In [None]:
import findspark

In [None]:
# Make the local Spark installation importable as `pyspark` (presumably located via SPARK_HOME — confirm for your setup)
findspark.init()

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Each application has a single SparkSession/SparkContext; getOrCreate() hands back
# the already-running session if this cell is executed again in the same kernel.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("DF Operations")
    .getOrCreate()
)

In [None]:
# Use only 3 shuffle partitions (instead of Spark's default) so the small local
# groupBy / orderBy / window jobs in this notebook don't spawn many tiny tasks.
spark.conf.set("spark.sql.shuffle.partitions",3)

In [None]:
# spark.read returns a DataFrameReader. header=True takes column names from the first
# line; inferSchema=True makes Spark scan the data to guess each column's type.
df = (
    spark.read
    .option("inferSchema", True)
    .option("header", True)
    .csv("d:\\data\\custs.txt")
)


In [None]:
# Inspect the column names and the types chosen by inferSchema
df.printSchema()

## String / SQL-like Expressions

In [None]:
# Exercises (solved below first with SQL-like strings, then with Column expressions):
# 1. get all the records where age > 40
# 2. get all the records where fname starts with 'S'
# 3. get all the records where desig is in ('Teacher','Pilot','Lawyer')
# 4. get all the records where age > 50 and desig is Pilot
# 5. get all the records where age is between 40 and 50
# 6. Get all the records where desig is null
# 7. Get designation wise count
# 8. Get top 10 designations
# 9. Get top 10 designations where the overall count is greater than 215

In [None]:
# 1. get all the records where age > 40
# filter() is an alias of where(); both accept a SQL expression string.
df.filter("age > 40").show()

In [None]:
# 2. get all the records where fname starts with 'S'  (SQL LIKE: % matches any suffix)
df.filter("fname like 'S%'").show()

In [None]:
# 3. get all the records where desig is in ('Teacher','Pilot','Lawyer')
df.filter("desig in ('Teacher','Pilot','Lawyer')").show()

In [None]:
# 4. get the records where age > 50 and desig is Pilot (only the first 5 are displayed)
pilots_over_50 = df.filter("desig = 'Pilot' and age > 50")
pilots_over_50.show(5)


In [None]:
# 5. get all the records where age is between 40 and 50 (SQL BETWEEN includes both ends)
df.filter("age between 40 and 50").show()

In [None]:
# 7. Get designation wise count
# SQL equivalent: select desig, count(*) from df group by desig;
desig_counts = df.groupBy("desig").count()
desig_counts.show()

In [None]:
# 8. Get top 10 designations (by number of records, highest first)
(df.groupBy("desig")
   .count()
   .sort("count", ascending=False)
   .show(10))

In [None]:
# 9. Get top 10 designations where the overall count is greater than 215
# Filtering before sorting returns the same rows and reads in query order.
(df.groupBy("desig")
   .count()
   .where("count > 215")
   .orderBy("count", ascending=False)
   .show(10))

## Column-Based Expressions

In [None]:
from pyspark.sql.functions import col

In [None]:
# The same queries written with Column expressions instead of SQL strings.
# Uncomment a line to run it.
# 1. get all the records where age > 40
# df.where(col("age") > 40).show()
# df.select("fname","lname","age").where("age > 40").show()

# 2. get all the records where fname starts with 'S'
# df.where(col("fname").startswith("S")).show()

# 3. get all the records where desig is in ('Teacher','Pilot','Lawyer')
# df.where(col("desig").isin('Teacher','Pilot','Lawyer')).show()

# 4. get all the records where age > 50 and desig is Pilot
# Column conditions are combined with & / |, and each condition must be parenthesized
# because & binds tighter than > and == in Python.
# df.where((col("age") > 50) & (col("desig")=='Pilot')).show()

# 5. get all the records where age is between 40 and 50 (inclusive)
# df.where(col("age").between(40,50)).show()

# 6. Get all the records where desig is null
df.where(col("desig").isNull()).show()

In [None]:
# Re-check the schema that inferSchema produced for the customer file
df.printSchema()

### Disadvantages of using the inferSchema option:
Spark runs a separate job that scans the data just to infer the schema, and the result can still be wrong — for example, dates that are not in the default format are inferred as strings. Both problems are avoided by defining your own schema.

In [None]:
# Transactions read with inferSchema (an extra job over the data). The inferred types
# are compared below with an explicitly defined schema.
txn_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("d:\\data\\txn_with_header.txt")
)

In [None]:
# Check the inferred types; txn_date values such as '06-26-2011' are presumably
# not recognised as dates by inference (verify the type printed here)
txn_df.printSchema()

### How to Define Schema

In [None]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType,FloatType,DateType,TimestampType,ArrayType

In [None]:
# Explicit schema for custs.txt, listed in the file's column order.
# StructType.add(name, type) creates the same nullable fields as StructField(name, type).
custSchema = (
    StructType()
    .add("cid", IntegerType())
    .add("fname", StringType())
    .add("lname", StringType())
    .add("age", IntegerType())
    .add("desig", StringType())
)

In [None]:
# Read the customers with the explicit schema, so no schema-inference job is needed.
custDF = spark.read.csv(
    "d:\\data\\custs.txt",
    schema=custSchema,
    header=True,
)

In [None]:
# Peek at the first 2 rows parsed with custSchema
custDF.show(2)

In [None]:
# Error raised when txn_date values like '06-26-2011' are parsed with Spark's default date format:
#   Fail to parse '06-26-2011' in the new parser.
#   You can set "spark.sql.legacy.timeParserPolicy" to "LEGACY" to restore the behavior before Spark 3.0,
#   or set to "CORRECTED" and treat it as an invalid datetime string
# Passing a matching "dateFormat" option to the reader avoids the error without changing the parser policy.

In [None]:
# Inferred schema of the transactions file, for comparison with txnSchema below
txn_df.printSchema()

In [None]:
from pyspark.sql.types import DoubleType

# Explicit schema for txn_with_header.txt, listed in the file's column order.
# amount is DoubleType rather than FloatType: 32-bit floats cannot hold values like 98.44
# exactly, and the error becomes visible in unrounded sums / running totals of amount.
# NOTE(review): cid is StringType here but IntegerType in custSchema -- confirm which type
# is intended before joining customers with transactions.
txnSchema = StructType([
                        StructField("txn_id",IntegerType()),
                        StructField("txn_date",DateType()),
                        StructField("cid",StringType()),
                        StructField("amount",DoubleType()),
                        StructField("prod_cat",StringType()),
                        StructField("prod",StringType()),
                        StructField("city",StringType()),
                        StructField("state",StringType()),
                        StructField("mode",StringType())
                      ])

# To keep the raw text of malformed lines in PERMISSIVE mode, append
#     StructField("bad_records",StringType())
# to the field list above and set .option("columnNameOfCorruptRecord","bad_records") on the reader.

In [None]:
from pyspark.sql.functions import col

In [None]:
# txn_date values look like 05-26-2011 (month-day-year), hence dateFormat "M-dd-y".
# PERMISSIVE mode keeps records whose fields don't fit txnSchema, with those fields set to null.
txnDF = (
    spark.read
    .schema(txnSchema)
    .options(header=True, mode="PERMISSIVE", dateFormat="M-dd-y")
    .csv("d:\\data\\txn_with_header.txt")
)

txnDF.show(2, truncate=False)

# Optional: .option("columnNameOfCorruptRecord","bad_records") together with a matching
# "bad_records" string field in the schema keeps the raw text of malformed lines.

### Exercise :

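* Create a DF by reading the file <B>date_format_3.txt</B>, whose contents look like:
<br>name,doj
<br>Ankit,18/03/2023 13:10
* Show the schema (make sure the 2nd field, doj, is a date/time type rather than a string — the values include a time of day, so TimestampType keeps it)
* Print the data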

### Reading JSON files with simple and complex schemas

In [None]:
# doj values look like 18/03/2023 13:10 (day/month/year hour:minute), so doj is declared
# as TimestampType and parsed with a matching timestampFormat.
dtSchema = (
    StructType()
    .add("name", StringType())
    .add("doj", TimestampType())
)

dateDF = spark.read.csv(
    "d:\\data\\dates\\date_format_3.txt",
    schema=dtSchema,
    header=True,
    timestampFormat="d/M/y HH:mm",
)

dateDF.show()

In [None]:
# Nested JSON schema: hobbies is an array of strings and address is a struct
# whose fields are later reachable with dot notation (e.g. "address.city").
addressSchema = StructType([
    StructField("rno", IntegerType()),
    StructField("city", StringType()),
])

jsonSchema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("hobbies", ArrayType(StringType())),
    StructField("address", addressSchema),
])

In [None]:
# Read people.json with the nested schema. The CSV-only "header" option is not passed:
# JSON input has no header line and the JSON source does not use that option.
jsonDF = spark.read\
          .schema(jsonSchema)\
          .json("d:\\data\\people.json")

# Fields inside the address struct are selected with dot notation
jsonDF.select("name","address.city").show()
jsonDF.show()

In [None]:
# Show the nested structure (array and struct columns) of jsonDF
jsonDF.printSchema()

### Read Modes:
* a) PERMISSIVE
* b) DROPMALFORMED
* c) FAILFAST

In [None]:
# txnDF.show(2)
# The reader's "mode" option decides what happens to records that don't fit the schema:
# PERMISSIVE    -> keep the record; fields that can't be parsed become null
# DROPMALFORMED -> drop records that don't match the defined schema
# FAILFAST      -> throw an exception at the first corrupt record and stop processing

# txnDF = spark.read\
#           .option("header",True)\
#           .schema(txnSchema)\
#           .option("mode","FAILFAST")\
#           .option("dateFormat","M-dd-y")\
#           .csv("d:\\data\\txn_with_header.txt")

# txnDF.show()

### Simple Aggregation (Table/DF Level)

In [None]:
# Reminder of the customer data used for the aggregations below
custDF.show(2)

In [None]:
# NOTE: the first import replaces Python's built-in sum/max/min/round in this kernel with
# the Spark column functions of the same names; the `f.` alias avoids that ambiguity.
from pyspark.sql.functions import sum,avg,max,min,round,count
from pyspark.sql import functions as f

In [None]:
# Whole-DataFrame aggregation (no grouping); DataFrame.agg is shorthand for groupBy().agg()
custDF.agg(f.avg("age"), f.min("age"), f.max("age")).show()

In [None]:
# Reminder of the transaction data used for the grouped aggregations below
txnDF.show(2)

### Multiple Aggregations

In [None]:
# State wise sale: total, average, minimum and maximum amount per state.
# Every aggregate gets a readable alias so the output columns are named consistently.
total_sale = f.round(f.sum("amount"),2).alias("Total_Sale")
average_sale = f.round(f.avg("amount"),2).alias("Avg_Sale")
min_sale = f.min("amount").alias("Min_Sale")
max_sale = f.max("amount").alias("Max_Sale")

txnDF.groupby("state")\
        .agg(
            total_sale,
            average_sale,
            min_sale,
            max_sale
        ).show()

### Window Aggregation:

In [None]:
# Total sale per (state, city). f.sum is used explicitly so this cell doesn't depend on the
# `from pyspark.sql.functions import sum` that shadows Python's built-in sum.
txn_state_city_sum = txnDF.groupby("state","city")\
                          .agg(f.sum("amount").alias("totalSale"))\
                          .orderBy("state","city")
txn_state_city_sum.show(50)


In [None]:
from pyspark.sql import Window

In [None]:
# Running-total window: one partition per state, rows ordered by city, and the frame
# spans from the partition's first row up to and including the current row.
running_total_window = (
    Window.partitionBy("state")
    .orderBy("city")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

In [None]:
# Add the cumulative totalSale per state (cities in alphabetical order) and show 3 rows
txn_state_city_sum.withColumn("RunningTotal",f.sum("totalSale").over(running_total_window)).orderBy("state","city").show(3)

In [None]:
# Rank cities within each state by total sale, highest first, so ranks 1-3 are the
# top 3 cities (ordering ascending would rank the lowest-selling cities first).
myWindow = Window.partitionBy("state")\
                 .orderBy(f.desc("totalSale"))

In [None]:
# Attach the per-state rank; sort by state and then rank so each state's rows are listed in rank order.
txn_state_city_sum_top_3 = txn_state_city_sum.withColumn("Rank",f.rank().over(myWindow)).orderBy("state","Rank")

In [None]:
# Keep ranks 1-3 per state (rank() can yield more than 3 rows per state when totals tie)
txn_state_city_sum_top_3.filter(f.col("Rank") <= 3).show()

### How to write a DF to disk (DataFrameWriter)

In [52]:
# Write txnDF as JSON files under d:\data\json, replacing any previous output
txnDF.write.json("d:\\data\\json", mode="overwrite")

In [54]:
# Write txnDF as Parquet files under d:\data\parquet, replacing any previous output
txnDF.write.parquet("d:\\data\\parquet", mode="overwrite")

In [74]:
# Read back the Parquet output. The path escapes its backslashes like every other path in
# this notebook ("D:\data\parquet" only worked because "\d" and "\p" are invalid escapes
# that Python leaves as-is, with a warning). The format is stated explicitly instead of
# relying on the session's default data source.
txn_par_df = spark.read.parquet("d:\\data\\parquet")

In [77]:
# Project a single column from the Parquet-backed DataFrame
txn_par_df.select("state").show()

+--------------+
|         state|
+--------------+
|    California|
|    California|
|     Wisconsin|
|     Tennessee|
|      Illinois|
|South Carolina|
|          Ohio|
|          Iowa|
|       Florida|
|        Nevada|
|          Ohio|
|    California|
|        Hawaii|
|    California|
|        Hawaii|
|South Carolina|
|      Nebraska|
|          Utah|
|    New Jersey|
|     Louisiana|
+--------------+
only showing top 20 rows



In [58]:
# Confirm the schema and values (e.g. parsed txn_date) survived the Parquet round trip
txn_par_df.show(3)

+------+----------+-------+------+------------------+--------------------+----------+----------+------+
|txn_id|  txn_date|    cid|amount|          prod_cat|                prod|      city|     state|  mode|
+------+----------+-------+------+------------------+--------------------+----------+----------+------+
|     1|2011-05-26|4006742| 98.44|Exercise & Fitness|Weightlifting Gloves|Long Beach|California|credit|
|     2|2011-06-01|4009775|  5.58|Exercise & Fitness|Weightlifting Mac...|   Anaheim|California|credit|
|     3|2011-06-05|4002199|198.19|        Gymnastics|    Gymnastics Rings| Milwaukee| Wisconsin|credit|
+------+----------+-------+------+------------------+--------------------+----------+----------+------+
only showing top 3 rows



In [None]:
# DataFrameWriter save modes, set with .mode(...):
#     append: Append contents of this DataFrame to existing data.
#     overwrite: Overwrite existing data.
#     error or errorifexists: Throw an exception if data already exists.
#     ignore: Silently ignore this operation if data already exists.

In [73]:
# Append mode adds new files next to the existing ones in d:\data\parquet.
# The appended rows keep the same columns as the existing data: appending only
# txn_id/cid/amount would leave files with different schemas in one dataset, and later
# reads (without mergeSchema) may then drop columns or fill them with nulls depending on
# which file's schema Spark uses. Note: each run of this cell appends the rows again.
txnDF.write\
     .mode("append")\
     .format("parquet")\
     .save("d:\\data\\parquet")