1. **Immutable DF**
2. **Lazy eval**
3. **Col vs SQL expr**
4. **Filter pushdown**

In [0]:
# Session
# Load the raw NYSE CSV files. No header option is set, so the columns arrive
# with the auto-generated names _c0, _c1, ... and are renamed in the next cell.
df1 = (
    spark.read
    .option('path', "/FileStore/NYSEData")
    .format('csv')
    .option("inferSchema", "true")
    .load()
)

# Execution hierarchy: Jobs > Stages > Tasks
# 1 statement can trigger 1 or more jobs
# 1 job can have 1 or many stages
# 1 stage can have 1 or more tasks

In [0]:
# Replace the auto-generated CSV column names with meaningful ones and keep
# only those three columns.
df1 = (
    df1
    .withColumnRenamed("_c0", "ShortName")
    .withColumnRenamed("_c1", "Date")
    .withColumnRenamed("_c2", "StockValue")
    .select("ShortName", "Date", "StockValue")
)
display(df1)

In [0]:
# Column expression vs SQL expression
from pyspark.sql.functions import col

# SQL expression: each predicate is passed as a SQL string.
df2_sql = (
    df1
    .where("Date = 20160101")
    .where("`ShortName` like 'A%'")
)

# Column expression: the same two predicates built from Column objects.
df2_col = (
    df1
    .where(col('Date') == 20160101)
    .where(col('ShortName').like('A%'))
)

# Transformation vs action
# T1(where) > T2(filter) > T3(groupby) > Action(display of DF after T3) > T4 > T5 > Action(collect)

In [0]:
# Lazy Eval - Actions and Transformations
# The where() calls that built df2_sql are transformations; display() is the
# action that makes Spark actually execute them.

display(df2_sql)

In [0]:
# Immutable DF
# Print the id of the RDD behind the filtered DataFrame next to the id of the
# RDD behind df1.

print(f"{df2_sql.rdd.id()} {df1.rdd.id()}")

In [0]:
# Filter push down analysis
# Add one more predicate on Date (this time compared with a string literal)
# and print the physical plan to inspect which filters reach the file scan.

df2_sql = df2_sql.filter("`Date` = '20160101'")
df2_sql.explain()

# NO PUSHDOWN: all data is fetched from the backend, then filtered in Spark
# PUSHDOWN:    data is filtered in the backend, Spark only shows the result

In [0]:
# Datatype and Schema definition
from pyspark.sql.types import StructField, StructType, StringType, DateType

# Explicit schema: ShortName and StockValue are read as strings, Date as a date.
train_df_schema = StructType([
    StructField('ShortName', StringType()), 
    StructField('Date', DateType()), 
    StructField('StockValue', StringType())])

# The date pattern must use 'MM' (month-of-year). Lowercase 'mm' means
# minute-of-hour, so "yyyymmdd" never reads the month digits of values such
# as 20160101 and produces wrong dates.
df3_explicit_schema = spark.read \
                             .option('path', "/FileStore/NYSEData") \
                             .format('csv') \
                             .schema(train_df_schema) \
                             .option('dateFormat', "yyyyMMdd") \
                             .load()

# Compare the inferred schema (via df2_sql) with the explicitly declared one.
print("Previous data types are ", df2_sql.schema, '\n')
print("Explicitly defined data types are ", df3_explicit_schema.schema)

In [0]:
# Show the data as read with the explicitly declared schema.
display(df3_explicit_schema)

In [0]:
# Filter push down analysis

# explain() prints the plan itself and returns None, so wrapping it in print()
# only appended a stray "None" to the output.
df3_explicit_schema.where("StockValue > 50").explain()

df4_explicit_schema = df3_explicit_schema
# Filter was not done in the backend. ALL data was retrieved by Spark and the filtering was then done in Spark

1. **Simple aggregation**
2. **Grouped aggregation**
3. **Window aggregation**
4. **Cache**

In [0]:
# Aggregations
from pyspark.sql.functions import avg, count, expr

# Simple Aggregations - SQL expressions
# One output row: the total row count and the average StockValue.
display(
    df4_explicit_schema.selectExpr(
        "count(1) as `Total Rows`",
        "avg(StockValue) as `Average StockValue`",
    )
)

In [0]:
# Group Aggregations - SQL Expressions

# Register the DataFrame as a temp view so it can be queried with SQL.
df4_explicit_schema.createOrReplaceTempView("df4_explicit_schema_tempview")
# Per Date, average StockValue of the tickers starting with 'A'; only dates
# whose summed StockValue exceeds 100 are kept, ordered by Date.
df5_explicit_schema = spark.sql("""select 
                                  `Date`, 
                                  avg(StockValue) as `Aggregated StockValue` 
                                 from 
                                  df4_explicit_schema_tempview 
                                 where `ShortName` like ('A%')
                                 group by 
                                  `Date`
                                 having sum(StockValue) > 100
                                 order by 
                                  `Date` 
                                """)

display(df5_explicit_schema)

In [0]:
# Cache
# The cache()/take(1) pair below is left commented out: cache() only marks the
# DataFrame for caching and an action such as take(1) is what materializes it.

# df4_explicit_schema.cache()
# df4_explicit_schema.take(1)

# Reports whether the DataFrame is currently marked as cached.
df4_explicit_schema.is_cached

# Optional follow-ups: inspect the storage level, or drop the cached data.
# df4_explicit_schema.storageLevel
# df4_explicit_schema.unpersist()

In [0]:
# Window Func Running Sum

# NOTE: this import shadows Python's built-in sum() in the notebook namespace.
from pyspark.sql.functions import sum
from pyspark.sql import Window

# Preparatory steps: treat StockValue as "Sales" and total it per
# ShortName and Date.
df4_explicit_schema = (
    df4_explicit_schema
    .withColumnRenamed("StockValue", "Sales")
    .groupBy("ShortName", "Date")
    .agg(sum("Sales").alias("Sales"))
    .orderBy("ShortName", "Date")
)

# Frame: every row from the start of the ShortName partition (ordered by Date)
# up to and including the current row.
spark_sales_custom_window1 = (
    Window
    .partitionBy("ShortName")
    .orderBy("Date")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
sales_df3 = df4_explicit_schema.withColumn(
    "RunningTotalSales",
    sum("Sales").over(spark_sales_custom_window1),
)

display(sales_df3)

In [0]:
# Window Func mean

from pyspark.sql.functions import mean

# Frame: the current row plus the two rows before it within each ShortName
# (ordered by Date), i.e. a moving average over up to 3 rows.
spark_sales_custom_window2 = (
    Window
    .partitionBy("ShortName")
    .orderBy("Date")
    .rowsBetween(-2, Window.currentRow)
)
sales_df4 = df4_explicit_schema.withColumn(
    "MovingAverageSales",
    mean("Sales").over(spark_sales_custom_window2),
)

display(sales_df4)

In [0]:
# Window Func Rank

from pyspark.sql.functions import dense_rank

# Preparatory step: pick the columns in the order wanted for the ranking output.
sales_df5 = df4_explicit_schema.select("Date", "ShortName", "Sales")

# Within each Date, rows are ordered by Sales and dense-ranked.
spark_sales_custom_window3 = (
    Window
    .partitionBy("Date")
    .orderBy("Sales")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
sales_df5 = sales_df5.withColumn(
    "RankSales",
    dense_rank().over(spark_sales_custom_window3),
)
display(sales_df5)

# Lineage without cache:
# df1 .........df4_explicit_schema > sales_df5
# df1..........df4_explicit_schema > sales_df4

# Lineage with cache on the shared DataFrame:
# df1 .........df4_explicit_schema (cache) > sales_df5
#              df4_explicit_schema (cache)> sales_df4



# Dense Rank - 5 Jobs with 1 Stage each

# Generic picture, without cache:
# DF1(CSV) > DF2(T) > DF3 > DF4 > DF5 > DF6(action - display)
# DF1(CSV) > DF2(T) > DF3 > DF7 > DF8(action - display)

# Generic picture, with DF3 cached:
# DF1(CSV) > DF2(T) > DF3(CACHE) > DF4 > DF5 > DF6(action - display)
#                     DF3(CACHE) > DF7 > DF8(action - display)

1. **Repartition**
2. **Max records per file**
3. **Bucketing**

In [0]:
# Write data, file formats and files created upon write
# Write the aggregated DataFrame as Parquet, replacing any previous output.

(
    df4_explicit_schema.write
    .format('parquet')
    .mode('overwrite')
    .option('path', "/FileStore/ParquetOutput0/")
    .save()
)

In [0]:
# Write data, file formats and files created upon write
from pyspark.sql.functions import spark_partition_id

print("number of partitions of DF are", df4_explicit_schema.rdd.getNumPartitions())
print("\npartitions size of DF are shown below")
# show() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" to the output.
df4_explicit_schema.groupBy(spark_partition_id()).count().show()

In [0]:
# Write data, file formats and files created upon write

# Redistribute the rows over 6 partitions before writing.
df4_explicit_schema = df4_explicit_schema.repartition(6)
print("number of partitions of DF are", df4_explicit_schema.rdd.getNumPartitions())

print("\npartitions size of DF are shown below")
# show() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" to the output.
df4_explicit_schema.groupBy(spark_partition_id()).count().show()

# NOTE(review): the target path says "Parquet" but the format written is JSON -
# confirm which of the two is intended.
df4_explicit_schema.write \
                     .format('json') \
                     .mode('overwrite') \
                     .option('path', "/FileStore/ParquetOutputRepartitioned0/") \
                     .save()

In [0]:
# Write data, file formats and files created upon write
# maxRecordsPerFile caps each output file at 100000 records.

(
    df4_explicit_schema.write
    .format('json')
    .mode('overwrite')
    .option('path', "/FileStore/JSONOutput0/")
    .option('maxRecordsPerFile', 100000)
    .save()
)

In [0]:
# Write data, file formats and files created upon write
# Read back the part-00000 file(s) matching the glob from the JSON output above.

json_output_df = (
    spark.read
    .format('json')
    .option('path', '/FileStore/JSONOutput0/part-00000-tid-*-c000.json')
    .load()
)
display(json_output_df)

In [0]:
# Write data, file formats and files created upon write
from pyspark.sql.functions import col, lower, spark_partition_id

print("\npartitions size of DF are shown below")
# show() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" to the output.
df4_explicit_schema.groupBy(spark_partition_id()).count().show()

# partitionBy('Date') writes one sub-folder per distinct Date value.
df4_explicit_schema.write \
                     .format('json') \
                     .mode('overwrite') \
                     .option('path', "/FileStore/JSONOutputPartitioned0/") \
                     .partitionBy('Date') \
                     .save()

In [0]:
# Write data, file formats and files created upon write
# Read back one file from a single Date partition folder of the output above.

json_output_df2 = (
    spark.read
    .format('json')
    .option('path', '/FileStore/JSONOutputPartitioned0/Date=1997-01-01/part-00000-tid-*.c000.json')
    .load()
)
display(json_output_df2)

In [0]:
# Bucketing 
# Save as a table whose rows are spread over 2 buckets keyed on ShortName.

(
    df4_explicit_schema.write
    .format('json')
    .mode('overwrite')
    .bucketBy(2, 'ShortName')
    .saveAsTable("bucket_table_0")
)

# PartitionBy - Date (31 values - Jan): each date (e.g. 1 Jan, 10,000 stocks) gets 1 file in its own folder - 31 folders
# 3100 dates - 3100 folders .....

# Bucket - HASH of the key - 2 buckets

1. **UDF**
2. **Scalar UDF**
3. **Grouped UDF**

In [0]:
# Show the aggregated DataFrame (ShortName, Date, Sales) before applying UDFs.
display(df4_explicit_schema)

In [0]:
# UDF
# udf, lower and col are imported here so this cell does not rely on an
# earlier cell (or the runtime) having brought them into scope.
from pyspark.sql.functions import col, lower, udf, upper

# UDF - convert ShortName to lower case, once with a Python UDF and once with
# Spark's built-in lower(), so both results can be compared side by side.
@udf('string')
def udf_func(ShortName):
    """Return ShortName in lower case; a null input yields null."""
    # Python UDFs receive SQL NULL as None, which has no .lower() method.
    if ShortName is None:
        return None
    return ShortName.lower()

sales_df5 = sales_df5.select("Date", "ShortName", "Sales")
sales_df7 = sales_df5.withColumn("UpdatedShortName_UDF", udf_func("ShortName"))
sales_df7 = sales_df7.withColumn("UpdatedProductName_Spark", lower(col("ShortName")))
display(sales_df7)

In [0]:
# Pandas Scalar Vectorized (Arrow) UDF 

from pyspark.sql.functions import pandas_udf, PandasUDFType

# UDF - convert ShortName to upper case using withColumn
@pandas_udf('string', PandasUDFType.SCALAR)
def func_scalarUDF(ShortName):
    """Upper-case a batch of ShortName values.

    A scalar pandas UDF receives a pandas Series, not a single string, so the
    vectorized .str accessor is required; calling .upper() directly on the
    Series raises AttributeError.
    """
    return ShortName.str.upper()

sales_df8 = sales_df7.withColumn("UpdatedProductName_Pandas_Scalar_UDF", func_scalarUDF("ShortName"))
display(sales_df8)

In [0]:
# Pandas Scalar Vectorized (Arrow) UDF 
# Working version: the UDF is handed a pandas Series, so the string method is
# applied through the .str accessor.

@pandas_udf('string', PandasUDFType.SCALAR)
def func_scalarUDF(ShortName):
    """Return the upper-cased values of the ShortName Series."""
    upper_cased = ShortName.str.upper()
    return upper_cased

sales_df8 = sales_df7.withColumn(
    "UpdatedProductName_Pandas_Scalar_UDF",
    func_scalarUDF("ShortName"),
)
display(sales_df8)

In [0]:
# Pandas Grouped Map Vectorized (Arrow) UDF 

from pyspark.sql.types import IntegerType, DoubleType

# Output schema of the grouped-map UDF: the three input columns plus the
# running total computed in pandas.
schema = StructType([
    StructField('ShortName', StringType(), True),
    StructField('Date', DateType(), True),
    StructField('Sales', DoubleType(), True),
    StructField('running_total_sales_pandasUDF', DoubleType(), True)
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def grouped_map_UDF(group_of_shortnames):
    """Add a running total of Sales, ordered by Date, to one ShortName group.

    Args:
        group_of_shortnames: pandas DataFrame holding all rows of one group,
            with the columns ShortName, Date and Sales.

    Returns:
        A new pandas DataFrame sorted by Date with the extra column
        running_total_sales_pandasUDF.
    """
    # Rows of a group arrive in no guaranteed order; sort by Date so the
    # cumulative sum is a true running total. sort_values returns a new
    # frame, so the input is not mutated.
    ordered_group = group_of_shortnames.sort_values("Date")
    ordered_group["running_total_sales_pandasUDF"] = ordered_group["Sales"].cumsum()
    return ordered_group

sales_df9 = sales_df8.select("ShortName", "Date", "Sales")
sales_df10 = sales_df9.groupBy('ShortName').apply(grouped_map_UDF)

display(sales_df10)

In [0]:
# Pandas Grouped Map Vectorized (Arrow) UDF 

# Test the UDF as a plain Python function on the driver: pull one group
# (ShortName 'A') into pandas and call the function wrapped by the UDF (.func).
testDF = sales_df9.where("ShortName = 'A'").toPandas()
grouped_map_UDF.func(testDF)

In [0]:
# Pandas Grouped Map Vectorized (Arrow) UDF 

# Produce the same running total with Spark's native window function
sales_df11 = sales_df10.withColumn(
    "RunningTotalSales",
    sum("Sales").over(spark_sales_custom_window1),
)

# Produce it once more, this time wrapped in a plain Python function
def spark_native_func1(df):
    """Return df with the windowed running total of Sales added as a new column."""
    running_total = sum("Sales").over(spark_sales_custom_window1)
    return df.withColumn("RunningTotalSales_SeperateFunction", running_total)

sales_df12 = spark_native_func1(sales_df11)
display(sales_df12)

1. **Broadcast Joins**
2. **Shuffle Sort Merge Joins**

In [0]:
# Show the stock price DataFrame that is joined with the metadata below.
display(df1)

In [0]:
# Read the pipe-delimited NYSE metadata file (ticker short name and full name).
meta_df = spark.read \
                 .option('path', "/FileStore/NYSEMetadata") \
                 .format('csv') \
                 .option('delimiter', "|") \
                 .load()
meta_df = meta_df.withColumnRenamed("_c0", "ShortName").withColumnRenamed("_c1", "FullName")

# show() prints the table itself and returns None, so wrapping it in print()
# only appended a stray "None" to the output.
meta_df.groupBy(spark_partition_id()).count().show()

# Expose both DataFrames to the %sql cells below.
df1.createOrReplaceTempView("df_view")
meta_df.createOrReplaceTempView("meta_df_view")

In [0]:
%sql 
-- Broadcast Joins
-- Print the plan for joining the stock data with its metadata,
-- restricted to one company name.

EXPLAIN
SELECT
  table_a.ShortName,
  table_b.FullName AS FN
FROM df_view AS table_a
INNER JOIN meta_df_view AS table_b
  ON table_a.ShortName = table_b.ShortName
WHERE table_b.FullName = 'Alcoa Inc.'

/*
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [ShortName#119, FullName#108 AS FN#203]
   +- BroadcastHashJoin [ShortName#119], [ShortName#96], Inner, BuildRight, false
      :- Project [_c0#45 AS ShortName#119]
      :  +- Filter isnotnull(_c0#45)
      :     +- FileScan csv [_c0#45] Batched: false, DataFilters: [isnotnull(_c0#45)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/nysedata], PartitionFilters: [], PushedFilters: [IsNotNull(_c0)], ReadSchema: struct<_c0:string>
      +- BroadcastExchange HashedRelationBroadcastMode(ArrayBuffer(input[0, string, true]),false), [plan_id=284]
         +- Project [_c0#76 AS ShortName#96, _c1#77 AS FullName#108]
            +- Filter ((isnotnull(_c1#77) AND (_c1#77 = Alcoa Inc.)) AND isnotnull(_c0#76))
               +- FileScan csv [_c0#76,_c1#77] Batched: false, DataFilters: [isnotnull(_c1#77), (_c1#77 = Alcoa Inc.), isnotnull(_c0#76)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/nysemetadataMAIN], PartitionFilters: [], PushedFilters: [IsNotNull(_c1), EqualTo(_c1,Alcoa Inc.), IsNotNull(_c0)], ReadSchema: struct<_c0:string,_c1:string>
*/

-- Inferences:
-- explain plan - table_b (short name and full name columns) is broadcast
-- explain plan - FullName filter on table_b is pushed to the source

In [0]:
%sql 
-- Broadcast Joins
-- Run the same join for real (no EXPLAIN).

SELECT
  table_a.ShortName,
  table_b.FullName AS FN
FROM df_view AS table_a
INNER JOIN meta_df_view AS table_b
  ON table_a.ShortName = table_b.ShortName
WHERE table_b.FullName = 'Alcoa Inc.'
  
-- Observe the broadcast join happening in logs - SQL/Dataframe

In [0]:
# Join column Ambiguity in spark
from pyspark.sql.utils import AnalysisException

# Both join inputs have a ShortName column, so selecting "ShortName" by name
# after the join is ambiguous and Spark rejects it. The error is the point of
# this cell, so it is caught and printed instead of aborting a full notebook run.
try:
    output_df = meta_df.join(df1, meta_df.ShortName == df1.ShortName, "inner").select("ShortName", "FullName").where("FullName = 'Alcoa Inc.'")
except AnalysisException as ambiguity_error:
    print(ambiguity_error)

In [0]:
# Join column Ambiguity in spark
# Dropping df1's copy of ShortName right after the join leaves a single
# ShortName column, so it can then be selected by name.

output_df = (
    meta_df
    .join(df1, meta_df.ShortName == df1.ShortName, "inner")
    .drop(df1.ShortName)
    .select("ShortName", "FullName")
    .where("FullName = 'Alcoa Inc.'")
)

output_df.display()

In [0]:
# Lower the auto-broadcast size limit of the small DF to force a shuffle sort merge join instead of a broadcast join.
# Per the Spark docs the threshold is expressed in bytes (not MB); 1 byte means practically no table qualifies for broadcast.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1)

In [0]:
%sql 
--Shuffle Sort Merge Join
-- Print the plan for the same join now that the broadcast threshold is lowered.

EXPLAIN
SELECT
  table_a.ShortName,
  table_b.FullName AS FN
FROM df_view AS table_a
INNER JOIN meta_df_view AS table_b
  ON table_a.ShortName = table_b.ShortName
WHERE table_b.FullName = 'Alcoa Inc.'
  
/*
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [ShortName#921, FullName#910 AS FN#968]
   +- SortMergeJoin [ShortName#921], [ShortName#898], Inner
      :- Sort [ShortName#921 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(ShortName#921, 200), ENSURE_REQUIREMENTS, [plan_id=1966]
      :     +- Project [_c0#847 AS ShortName#921]
      :        +- Filter isnotnull(_c0#847)
      :           +- FileScan csv [_c0#847] Batched: false, DataFilters: [isnotnull(_c0#847)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/nysedata], PartitionFilters: [], PushedFilters: [IsNotNull(_c0)], ReadSchema: struct<_c0:string>
      +- Sort [ShortName#898 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(ShortName#898, 200), ENSURE_REQUIREMENTS, [plan_id=1967]
            +- Project [_c0#878 AS ShortName#898, _c1#879 AS FullName#910]
               +- Filter ((isnotnull(_c1#879) AND (_c1#879 = Alcoa Inc.)) AND isnotnull(_c0#878))
                  +- FileScan csv [_c0#878,_c1#879] Batched: false, DataFilters: [isnotnull(_c1#879), (_c1#879 = Alcoa Inc.), isnotnull(_c0#878)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/nysemetadataMAIN], PartitionFilters: [], PushedFilters: [IsNotNull(_c1), EqualTo(_c1,Alcoa Inc.), IsNotNull(_c0)], ReadSchema: struct<_c0:string,_c1:string>
*/

-- Inferences:
-- explain plan - Observe shuffle sort in plan

In [0]:
%sql 
--Shuffle Sort Merge Join
-- Run the same join for real (no EXPLAIN).

SELECT
  table_a.ShortName,
  table_b.FullName AS FN
FROM df_view AS table_a
INNER JOIN meta_df_view AS table_b
  ON table_a.ShortName = table_b.ShortName
WHERE table_b.FullName = 'Alcoa Inc.'

-- Inferences:
-- Logs - Observe Shuffle sort happening in logs - SQL/DataFrame

1. **AQE - Shuffle Partitions**

In [0]:
# Default shuffle partitions is 200 and it kicks in once you do a wide transformation to an existing DF. 

# Switch AQE off so the configured shuffle partition count is used as-is.
spark.conf.set("spark.sql.adaptive.enabled", False)
print(
    spark.conf.get("spark.sql.adaptive.enabled"),
    spark.conf.get("spark.sql.shuffle.partitions"),
)
print(df1.rdd.getNumPartitions())

# distinct() is a wide transformation, so its result is shuffled.
dist_shortname = (
    df1
    .selectExpr("ShortName")
    .distinct()
)
print(dist_shortname.rdd.getNumPartitions())

In [0]:
# Above behaviour can be changed by hardcoding the no of shuffle partitions

spark.conf.set("spark.sql.shuffle.partitions", 10)
dist_shortname = (
    df1
    .selectExpr("ShortName")
    .distinct()
)
print(dist_shortname.rdd.getNumPartitions())

In [0]:
# Above behaviour can also be changed by switching on AQE, which adjusts the number of shuffle partitions dynamically at runtime.
# NOTE(review): Spark documents this coalescing as driven by shuffle data size rather than by available cores - confirm the intended wording.

spark.conf.set("spark.sql.adaptive.enabled", True)
dist_shortname = df1.selectExpr("ShortName").distinct()
print(dist_shortname.rdd.getNumPartitions())
display(dist_shortname)

# Observe logs (SQL/Dataframe) for additional AQE Shuffle which reduces the partitions to 1
# NOTE(review): the starting count is 10 if the previous cell was run (it sets spark.sql.shuffle.partitions to 10), otherwise the default 200.