## Import PySpark

In [1]:
import findspark
# Called before `import pyspark` below; presumably it locates the local Spark
# installation and makes pyspark importable — keep this ordering. TODO confirm.
findspark.init()

import pyspark

## Initiate Spark Session with YARN Mode

In [2]:
from pyspark.conf import SparkConf
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as F

# Resource settings for this demo session: a 1 GB driver and three 1 GB executors.
conf = SparkConf().setAll(
    [
        ("spark.driver.memory", "1g"),
        ("spark.executor.memory", "1g"),
        ("spark.executor.instances", "3"),
    ]
)
# Optional: uncomment to pull in the Avro data source package.
# conf.set("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.2")

# Build (or reuse) a Hive-enabled session that runs on YARN with the settings above.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Spark BDP Example 5")
    .enableHiveSupport()
    .config(conf=conf)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Echo the SparkSession object to confirm the session was created.
spark

### Load Sample Data

In [4]:
# Create the HDFS input directory (-p also creates missing parent directories).
!hdfs dfs -mkdir -p /user/bigdatapedia/input/parquet

In [5]:
!hdfs dfs -put /home/bigdatapedia/data/customer_parq.parquet /user/bigdatapedia/input/parquet/

In [6]:
# List the input directory to verify the upload.
!hdfs dfs -ls /user/bigdatapedia/input/parquet

Found 1 items
-rw-r--r--   3 bigdatapedia supergroup     254648 2023-08-05 02:28 /user/bigdatapedia/input/parquet/customer_parq.parquet


In [7]:
# Load the Parquet data under the HDFS input directory (the customer file uploaded above) into a DataFrame.
df_cust = spark.read.parquet("/user/bigdatapedia/input/parquet")

                                                                                

In [8]:
# Print column names, types and nullability of the customer data.
df_cust.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)



In [9]:
# Echo the DataFrame repr (column names and types only; no rows are shown).
df_cust

DataFrame[customer_id: int, customer_fname: string, customer_lname: string, customer_email: string, customer_password: string, customer_street: string, customer_city: string, customer_state: string, customer_zipcode: string]

## Aggregation via the DataFrame API

In [25]:
# Per-state minimum, maximum and average customer_id, sorted by state.
# The aggregate functions are taken from the `F` alias imported with the session
# setup: bare min/max would resolve to the Python builtins on a fresh kernel,
# and avg is not defined anywhere in this notebook.
df_group = (
    df_cust
    .groupBy("customer_state")
    .agg(
        F.min("customer_id"),
        F.max("customer_id"),
        F.avg("customer_id"),
    )
    .orderBy("customer_state")
)

In [26]:
# Echo the aggregated DataFrame's repr (column names and types only).
df_group

DataFrame[customer_state: string, min(customer_id): int, max(customer_id): int, avg(customer_id): double]

In [27]:
# Run the aggregation and print up to 20 rows without truncating cell values.
df_group.show(n=20, truncate=False)



+--------------+----------------+----------------+-----------------+
|customer_state|min(customer_id)|max(customer_id)|avg(customer_id) |
+--------------+----------------+----------------+-----------------+
|AL            |1703            |10359           |5082.333333333333|
|AR            |965             |12074           |5454.416666666667|
|AZ            |25              |12362           |5974.723004694835|
|CA            |4               |12429           |6261.47564612326 |
|CO            |2               |12344           |6027.918032786885|
|CT            |213             |12421           |5823.260273972603|
|DC            |325             |12137           |6142.190476190476|
|DE            |294             |11508           |5121.0           |
|FL            |84              |12399           |6588.794117647059|
|GA            |42              |12364           |5603.443786982249|
|HI            |242             |12431           |6326.540229885058|
|IA            |265             |7

                                                                                

## Spark SQL

In [28]:
# Expose the customer DataFrame to Spark SQL under the view name `tbl_cust`.
df_cust.createOrReplaceTempView("tbl_cust")

In [29]:
# Spark SQL counterpart of the DataFrame aggregation above:
# min / max / avg of customer_id per state, ordered by state.
df_sql = spark.sql("""select customer_state, min(customer_id), max(customer_id), avg(customer_id) 
        from tbl_cust group by customer_state order by customer_state """)

In [30]:
# Echo the SQL result's repr (column names and types only).
df_sql

DataFrame[customer_state: string, min(customer_id): int, max(customer_id): int, avg(customer_id): double]

In [26]:
# NOTE(review): repeats the earlier `df_group` cell (same execution count, In [26]);
# presumably kept so its schema can be compared with `df_sql` above.
df_group

DataFrame[customer_state: string, min(customer_id): int, max(customer_id): int, avg(customer_id): double]

In [32]:
# Run the SQL aggregation and print up to 20 rows without truncating cell values.
df_sql.show(n=20, truncate=False)



+--------------+----------------+----------------+-----------------+
|customer_state|min(customer_id)|max(customer_id)|avg(customer_id) |
+--------------+----------------+----------------+-----------------+
|AL            |1703            |10359           |5082.333333333333|
|AR            |965             |12074           |5454.416666666667|
|AZ            |25              |12362           |5974.723004694835|
|CA            |4               |12429           |6261.47564612326 |
|CO            |2               |12344           |6027.918032786885|
|CT            |213             |12421           |5823.260273972603|
|DC            |325             |12137           |6142.190476190476|
|DE            |294             |11508           |5121.0           |
|FL            |84              |12399           |6588.794117647059|
|GA            |42              |12364           |5603.443786982249|
|HI            |242             |12431           |6326.540229885058|
|IA            |265             |7

                                                                                

In [33]:
# Print the physical plan for the raw customer DataFrame.
df_cust.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [customer_id#0,customer_fname#1,customer_lname#2,customer_email#3,customer_password#4,customer_street#5,customer_city#6,customer_state#7,customer_zipcode#8] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customer_id:int,customer_fname:string,customer_lname:string,customer_email:string,customer...




In [34]:
# Print the physical plan for the per-state aggregation query held in df_sql.
df_sql.explain()

== Physical Plan ==
*(3) Sort [customer_state#7 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(customer_state#7 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#163]
   +- *(2) HashAggregate(keys=[customer_state#7], functions=[min(customer_id#0), max(customer_id#0), avg(cast(customer_id#0 as bigint))])
      +- Exchange hashpartitioning(customer_state#7, 200), ENSURE_REQUIREMENTS, [id=#159]
         +- *(1) HashAggregate(keys=[customer_state#7], functions=[partial_min(customer_id#0), partial_max(customer_id#0), partial_avg(cast(customer_id#0 as bigint))])
            +- *(1) ColumnarToRow
               +- FileScan parquet [customer_id#0,customer_state#7] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[hdfs://hdfs-bigdatapedia:9000/user/bigdatapedia/input/parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<customer_id:int,customer_state:string>




In [43]:
# Add a derived New_State column: "Texas" when customer_state is 'TX', otherwise "Non Texas".
# `when` is reached through the `F` alias imported with the session setup;
# the bare name is not defined anywhere in this notebook.
df_casewhen = df_cust.select(
    "customer_id",
    "customer_fname",
    "customer_city",
    "customer_state",
    F.when(df_cust.customer_state == "TX", "Texas")
    .otherwise("Non Texas")
    .alias("New_State"),
)

In [44]:
# Preview the first five rows without truncating cell values.
df_casewhen.show(n=5, truncate=False)

+-----------+--------------+-------------+--------------+---------+
|customer_id|customer_fname|customer_city|customer_state|New_State|
+-----------+--------------+-------------+--------------+---------+
|1          |Richard       |Brownsville  |TX            |Texas    |
|2          |Mary          |Littleton    |CO            |Non Texas|
|3          |Ann           |Caguas       |PR            |Non Texas|
|4          |Mary          |San Marcos   |CA            |Non Texas|
|5          |Robert        |Caguas       |PR            |Non Texas|
+-----------+--------------+-------------+--------------+---------+
only showing top 5 rows



In [50]:
# Spark SQL counterpart of df_casewhen: label each customer as 'Texas' or 'Non Texas'.
# Note that this rebinds df_sql, which previously held the per-state aggregation.
df_sql = spark.sql("""select customer_id, customer_fname, customer_city, customer_state, 
                case when customer_state='TX' then 'Texas' else 'Non Texas' end as new_state 
                from tbl_cust""")

# Preview the first five rows without truncating cell values.
df_sql.show(n=5, truncate=False)

+-----------+--------------+-------------+--------------+---------+
|customer_id|customer_fname|customer_city|customer_state|new_state|
+-----------+--------------+-------------+--------------+---------+
|1          |Richard       |Brownsville  |TX            |Texas    |
|2          |Mary          |Littleton    |CO            |Non Texas|
|3          |Ann           |Caguas       |PR            |Non Texas|
|4          |Mary          |San Marcos   |CA            |Non Texas|
|5          |Robert        |Caguas       |PR            |Non Texas|
+-----------+--------------+-------------+--------------+---------+
only showing top 5 rows



In [51]:
# Persist the CASE WHEN result as an ORC table named demo_de30t.case_tbl_orc_1.
# mode("overwrite") keeps the cell re-runnable; the default save mode raises
# once the table already exists.
df_sql.write.format("orc").mode("overwrite").saveAsTable("demo_de30t.case_tbl_orc_1")

In [52]:
# Write the same result as ORC files to an explicit HDFS path.
# mode("overwrite") keeps the cell re-runnable; the default save mode raises
# once the target path already exists.
df_sql.write.format("orc").mode("overwrite").save("/user/bigdatapedia/orc/case_tbl_orc_1")

### User-Defined Functions (UDFs)

In [59]:
# Preview the data the UDF examples below start from (first five rows, untruncated).
df_sql.show(n=5, truncate=False)

+-----------+--------------+-------------+--------------+---------+
|customer_id|customer_fname|customer_city|customer_state|new_state|
+-----------+--------------+-------------+--------------+---------+
|1          |Richard       |Brownsville  |TX            |Texas    |
|2          |Mary          |Littleton    |CO            |Non Texas|
|3          |Ann           |Caguas       |PR            |Non Texas|
|4          |Mary          |San Marcos   |CA            |Non Texas|
|5          |Robert        |Caguas       |PR            |Non Texas|
+-----------+--------------+-------------+--------------+---------+
only showing top 5 rows



In [53]:
def convert_upper(x):
    """Return ``x`` converted to upper case, passing ``None`` through unchanged.

    This function is registered as a Spark UDF and applied to ``customer_city``,
    which the schema reports as nullable. A null column value reaches Python
    as ``None``, so it is returned as-is instead of raising ``AttributeError``.

    Args:
        x: The string to convert, or ``None``.

    Returns:
        The upper-cased string, or ``None`` when ``x`` is ``None``.
    """
    if x is None:
        return None
    return x.upper()

In [54]:
# Quick local check of the plain Python function before wiring it into Spark.
convert_upper('hello')

'HELLO'

In [56]:
from pyspark.sql.types import StringType

In [57]:
# Register convert_upper under the SQL name "upper_convert" with a string return type,
# so it can be called from spark.sql queries.
spark.udf.register("upper_convert", convert_upper, StringType())

<function __main__.convert_upper(x)>

In [50]:
# NOTE(review): this cell repeats the earlier CASE WHEN cell (In [50]) verbatim;
# it rebuilds df_sql and previews it again. Consider removing the duplicate.
df_sql = spark.sql("""select customer_id, customer_fname, customer_city, customer_state, 
                case when customer_state='TX' then 'Texas' else 'Non Texas' end as new_state 
                from tbl_cust""")

df_sql.show(5, 0)

+-----------+--------------+-------------+--------------+---------+
|customer_id|customer_fname|customer_city|customer_state|new_state|
+-----------+--------------+-------------+--------------+---------+
|1          |Richard       |Brownsville  |TX            |Texas    |
|2          |Mary          |Littleton    |CO            |Non Texas|
|3          |Ann           |Caguas       |PR            |Non Texas|
|4          |Mary          |San Marcos   |CA            |Non Texas|
|5          |Robert        |Caguas       |PR            |Non Texas|
+-----------+--------------+-------------+--------------+---------+
only showing top 5 rows



In [61]:
# Call the registered upper_convert UDF from SQL to add an upper-cased city column,
# then preview five rows without truncating cell values.
spark.sql("""select customer_id, customer_fname, customer_city, customer_state, 
                case when customer_state='TX' then 'Texas' else 'Non Texas' end as new_state , 
                upper_convert(customer_city) as caps_city from tbl_cust""").show(n=5, truncate=False)

[Stage 22:>                                                         (0 + 1) / 1]

+-----------+--------------+-------------+--------------+---------+-----------+
|customer_id|customer_fname|customer_city|customer_state|new_state|caps_city  |
+-----------+--------------+-------------+--------------+---------+-----------+
|1          |Richard       |Brownsville  |TX            |Texas    |BROWNSVILLE|
|2          |Mary          |Littleton    |CO            |Non Texas|LITTLETON  |
|3          |Ann           |Caguas       |PR            |Non Texas|CAGUAS     |
|4          |Mary          |San Marcos   |CA            |Non Texas|SAN MARCOS |
|5          |Robert        |Caguas       |PR            |Non Texas|CAGUAS     |
+-----------+--------------+-------------+--------------+---------+-----------+
only showing top 5 rows



                                                                                

In [62]:
# Inspect the udf factory's signature (its default return type is StringType).
# Referenced through the `F` alias because the bare name `udf` is never
# imported in this notebook.
F.udf

<function pyspark.sql.functions.udf(f=None, returnType=StringType)>

In [63]:
# Wrap convert_upper as a DataFrame-API UDF with an explicit string return type.
# `udf` is reached through the `F` alias because the bare name is never imported here.
# The lambda wrapper is kept so the generated column name stays "<lambda>(customer_city)".
# NOTE(review): despite the df_ prefix, this name holds a UDF, not a DataFrame.
df_caps_city = F.udf(lambda z: convert_upper(z), StringType())

In [64]:
# Apply the UDF to customer_city alongside all existing columns and preview
# five rows without truncating cell values.
df_casewhen.select("*", df_caps_city("customer_city")).show(n=5, truncate=False)

+-----------+--------------+-------------+--------------+---------+-----------------------+
|customer_id|customer_fname|customer_city|customer_state|New_State|<lambda>(customer_city)|
+-----------+--------------+-------------+--------------+---------+-----------------------+
|1          |Richard       |Brownsville  |TX            |Texas    |BROWNSVILLE            |
|2          |Mary          |Littleton    |CO            |Non Texas|LITTLETON              |
|3          |Ann           |Caguas       |PR            |Non Texas|CAGUAS                 |
|4          |Mary          |San Marcos   |CA            |Non Texas|SAN MARCOS             |
|5          |Robert        |Caguas       |PR            |Non Texas|CAGUAS                 |
+-----------+--------------+-------------+--------------+---------+-----------------------+
only showing top 5 rows

