In [2]:
# Print a fixed greeting; this cell does not touch Spark.
print("welcome to pyspark")

welcome to pyspark


### Spark Session

In [5]:
import findspark  # pip install findspark
# Must run before the first pyspark import further down.
# NOTE(review): presumably this locates the local Spark install (e.g. via
# SPARK_HOME) and adds pyspark to sys.path — confirm against the findspark docs.
findspark.init()

In [7]:
from pyspark.sql.session import SparkSession

In [9]:
# Build (or reuse) the notebook's SparkSession: local master with 2 worker
# threads, application name "DF Example Dec2020".
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("DF Example Dec2020")
    .getOrCreate()
)

### Create an RDD using the Spark Session

In [10]:
# Distribute the integers 1..6 as an RDD through the session's SparkContext.
rdd = spark.sparkContext.parallelize(list(range(1, 7)))

In [11]:
# collect() returns the RDD's elements to the driver as a Python list.
rdd.collect()

[1, 2, 3, 4, 5, 6]

### DataFrame Reader -> Transformation

In [12]:
# Load the local emp.csv with default CSV options. No header or schema is
# supplied, so the columns come back named _c0, _c1, _c2.
df = spark.read.format("csv").load("file:///home/bigdatapedia/01Dataset/emp.csv")

In [13]:
# Confirm that the reader returned a DataFrame rather than an RDD.
type(df)

pyspark.sql.dataframe.DataFrame

In [14]:
# show() is an action: it runs the job and prints the rows as a text table.
df.show()

+---+------+---+
|_c0|   _c1|_c2|
+---+------+---+
|  1|Dinesh| 66|
|  2| Kumar| 65|
+---+------+---+



### DataFrame Writer -> Action

In [15]:
# Write df out as CSV part files under the target directory.
# mode("overwrite") replaces any existing output. With the default
# "errorifexists" save mode this cell fails on every re-run of the notebook,
# because the directory already exists from the previous run.
df.write.mode("overwrite").csv("/user/bigdatapedia/dec2020/emp")

### Create a DataFrame from a Parquet file

In [16]:
# Load the customer dataset stored as Parquet; no reader options are passed.
df_parquet = spark.read.format("parquet").load("/user/bigdatapedia/dec2020/dataset/parquet")

In [17]:
# Preview only the first 4 customer rows.
df_parquet.show(4)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

### Create a DataFrame from an Avro file

In [27]:
# NOTE(review): Avro load left disabled. "com.databricks.spark.avro" is the
# legacy format name; since Spark 2.4 the short name is "avro" (it needs the
# external spark-avro module on the classpath) — confirm the Spark version
# and available packages before re-enabling.
# df_avro = spark.read.format("com.databricks.spark.avro").load("/user/bigdatapedia/dec2020/dataset/avro")

In [26]:
# NOTE(review): same disabled Avro load as the cell directly above (and the
# execution counts of the two cells are out of order); one of them can go.
# df_avro = spark.read.format("com.databricks.spark.avro").load("/user/bigdatapedia/dec2020/dataset/avro")

### Create a DataFrame from an ORC file

In [29]:
# Load the orders dataset stored as ORC; no reader options are passed.
df_orc = spark.read.format("orc").load("/user/bigdatapedia/dec2020/dataset/orc/orders")

In [30]:
# Preview the first 2 order rows.
df_orc.show(2)

+--------+----------+-----------+-------+
|order_id|order_date|customer_id| status|
+--------+----------+-----------+-------+
|       2|2020-01-11|      10359|PENDING|
|       1|2020-01-11|       1234|PENDING|
+--------+----------+-----------+-------+
only showing top 2 rows



## DataFrame Transformations

### select

In [31]:
# Project two of the order columns into a new DataFrame, then print up to
# 5 of its rows.
df_1 = df_orc.select(["order_date", "status"])
df_1.show(5)

+----------+-------+
|order_date| status|
+----------+-------+
|2020-01-11|PENDING|
|2020-01-11|PENDING|
|2020-01-11|PENDING|
+----------+-------+



### printSchema

In [32]:
# Print column names, types and nullability of the ORC-backed DataFrame.
df_orc.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- status: string (nullable = true)



In [33]:
# The projected DataFrame contains only the columns that were selected.
df_1.printSchema()

root
 |-- order_date: string (nullable = true)
 |-- status: string (nullable = true)



In [34]:
# A second projection of df_orc, this time mixing a string column and an
# integer column; the schema shows each keeps its type.
df_2 = df_orc.select(["order_date", "customer_id"])
df_2.printSchema()

root
 |-- order_date: string (nullable = true)
 |-- customer_id: integer (nullable = true)



### Filter

In [37]:
# Print the unfiltered customer table for comparison with the filters below.
# NOTE(review): the printed output includes customer names and street
# addresses; consider clearing outputs before sharing the notebook.
df_parquet.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

In [38]:
# Keep only the customers whose city is exactly "Littleton".
# NOTE(review): df_1 previously held the order_date/status projection of
# df_orc; this assignment rebinds the name to a customer DataFrame.
df_1 = df_parquet.filter(df_parquet.customer_city == "Littleton")

In [39]:
# Every row printed here should have customer_city equal to "Littleton".
df_1.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|       1067|          Mary|        Hansen|     XXXXXXXXX|        XXXXXXXXX|    4212 Honey Trail|    Littleton|            CO|           80126|
|       1385|     Catherine|         Smith|     XXXXXXXXX|        XXXXXXXXX|      174 Hazy Pines|    Littleton|            CO|           80126|
|       6859|          Mary|         Smith|     XXXXXXXXX|        XXXXXXXXX|3538 Golden Bluff...|    Littleton|            CO|          

In [40]:
# Check the column types before filtering on customer_id.
df_parquet.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- customer_password: string (nullable = true)
 |-- customer_street: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: string (nullable = true)



In [41]:
# Equality filter on the integer customer_id column.
# NOTE(review): df_2 previously held a projection of df_orc; this assignment
# rebinds the name to a customer DataFrame.
df_2 = df_parquet.filter(df_parquet.customer_id == 7211)

In [42]:
# Print the row(s) matching customer_id 7211.
df_2.show()

+-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password| customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+
|       7211|      Kathleen|         Smith|     XXXXXXXXX|        XXXXXXXXX|9203 Dusty Vista|    Littleton|            CO|           80126|
+-----------+--------------+--------------+--------------+-----------------+----------------+-------------+--------------+----------------+



In [45]:
# Range filter: customers whose id is strictly below 10.
df_3 = df_parquet.filter(df_parquet.customer_id < 10)

In [46]:
# Print the customers with customer_id below 10.
df_3.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

### show vs limit

show -> Action

limit -> Transformation

In [48]:
# limit() is a transformation: it returns a new DataFrame capped at 5 rows
# instead of printing anything.
df_4 = df_3.limit(5)

In [50]:
# show() is the action that prints the (at most 5) rows of the limited frame.
df_4.show(5)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|          

### Order By

In [51]:
# Sort the 5-row sample by city; ascending order is the default, spelled out
# here for contrast with the descending variant.
df_5 = df_4.orderBy("customer_city", ascending=True)

In [52]:
# Print the sample sorted by customer_city in ascending order.
df_5.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          5|        Robert|        Hudson|     XXXXXXXXX|        XXXXXXXXX|10 Crystal River ...|       Caguas|            PR|           00725|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|          

In [53]:
# Same sort key as before, but descending, expressed through the column's
# desc() ordering; then print the result.
df_6 = df_4.orderBy(df_4["customer_city"].desc())
df_6.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          4|          Mary|         Jones|     XXXXXXXXX|        XXXXXXXXX|  8324 Little Common|   San Marcos|            CA|           92069|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|       Caguas|            PR|           00725|
|          5|        Robert|        Hudson|     XXXXXXXXX|        XXXXXXXXX|10 Crystal River ...|       Caguas|            PR|          

### Group By

In [54]:
# Count how many sampled customers fall in each city. The result has one row
# per city with a bigint "count" column.
df_7 = (
    df_6
    .groupBy("customer_city")
    .count()
)

In [55]:
# A bare DataFrame expression displays only its column names and types.
df_7

DataFrame[customer_city: string, count: bigint]

In [56]:
# Print the per-city counts. The rows do not come back in the descending
# city order of df_6.
df_7.show()

+-------------+-----+
|customer_city|count|
+-------------+-----+
|  Brownsville|    1|
|    Littleton|    1|
|       Caguas|    2|
|   San Marcos|    1|
+-------------+-----+



In [57]:
# collect() returns every result row to the driver as a list of Row objects.
df_7.collect()

[Row(customer_city='Brownsville', count=1),
 Row(customer_city='Littleton', count=1),
 Row(customer_city='Caguas', count=2),
 Row(customer_city='San Marcos', count=1)]