#Spark Join Cheatsheet By Deepak Goyal (Azurelib Academy)
Course link: https://adeus.azurelib.com <br>
Mail at: admin@azurelib.com

In [0]:

# create two dataframes
df1 = spark.createDataFrame([(1, "John", 25), (2, "Jane", 30), (3, "Jim", 35)], 
                           ["id", "name", "age"])
df2 = spark.createDataFrame([(1, "NYC"), (2, "LA"), (3, "DC")], 
                           ["id", "city"])

# perform the inner join
join_df = df1.join(df2, df1.id == df2.id, "inner")

# display the joined dataframe
join_df.show()


+---+----+---+---+----+
| id|name|age| id|city|
+---+----+---+---+----+
|  1|John| 25|  1| NYC|
|  2|Jane| 30|  2|  LA|
|  3| Jim| 35|  3|  DC|
+---+----+---+---+----+



In [0]:
#left join
join_df = df1.join(df2, df1.id == df2.id, "left")



In [0]:
#right join
join_df = df1.join(df2, df1.id == df2.id, "right")

#Broadcast join

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast


# create two dataframes
df1 = spark.createDataFrame([(1, "John", 25), (2, "Jane", 30), (3, "Jim", 35)], 
                           ["id", "name", "age"])
df2 = spark.createDataFrame([(1, "NYC"), (2, "LA")], 
                           ["id", "city"])

# broadcast the smaller dataframe
join_df = df1.join(broadcast(df2), df1.id == df2.id)

# display the joined dataframe
join_df.show()


+---+----+---+---+----+
| id|name|age| id|city|
+---+----+---+---+----+
|  1|John| 25|  1| NYC|
|  2|Jane| 30|  2|  LA|
+---+----+---+---+----+



#Join opitimization Partition based on id

In [0]:
#Join opitimization Partition based on id

from pyspark.sql import SparkSession



# create two dataframes
df1 = spark.createDataFrame([(1, "John", 25), (2, "Jane", 30), (3, "Jim", 35)], 
                           ["id", "name", "age"])
df2 = spark.createDataFrame([(1, "NYC"), (2, "LA")], 
                           ["id", "city"])

# repartition the dataframes on the join key
df1 = df1.repartition(df1.id)
df2 = df2.repartition(df2.id)

# perform the join
join_df = df1.join(df2, df1.id == df2.id)

# display the joined dataframe
join_df.show()


+---+----+---+---+----+
| id|name|age| id|city|
+---+----+---+---+----+
|  1|John| 25|  1| NYC|
|  2|Jane| 30|  2|  LA|
+---+----+---+---+----+



#SortMerge Join
In this example, we first create two dataframes df1 and df2, and then sort each dataframe on the join key id using the sortWithinPartitions method. Finally, we perform the join using the join method and display the resulting joined dataframe.

Note that the sortWithinPartitions method sorts the data within each partition, not the entire dataframe. The data in each partition is sorted in ascending order by default, but you can specify a different sort order by passing the ascending argument.

In [0]:
from pyspark.sql import SparkSession


# create two dataframes
df1 = spark.createDataFrame([(1, "John", 25), (2, "Jane", 30), (3, "Jim", 35)], 
                           ["id", "name", "age"])
df2 = spark.createDataFrame([(1, "NYC"), (2, "LA")], 
                           ["id", "city"])

# sort the dataframes on the join key
df1 = df1.sortWithinPartitions("id")
df2 = df2.sortWithinPartitions("id")

# perform the join
join_df = df1.join(df2, df1.id == df2.id)

# display the joined dataframe
join_df.show()


+---+----+---+---+----+
| id|name|age| id|city|
+---+----+---+---+----+
|  1|John| 25|  1| NYC|
|  2|Jane| 30|  2|  LA|
+---+----+---+---+----+



#Bucketing concept

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# create a Spark session
spark = SparkSession.builder.appName("BucketingExample").getOrCreate()

# create a dataframe
df = spark.createDataFrame([(1, "John", 25), (2, "Jane", 30), (3, "Jim", 35)], 
                           ["id", "name", "age"])

# bucket the dataframe
#df = df.repartition(1, expr("id")).write.bucketBy(2, "id").sortBy("id").mode("overwrite").parquet("/tmp/bucketed_data").saveAsTable("bucketExample")

df.write.format("parquet").bucketBy(2, "id").sortBy("id").option("path", "/tmp/bucketed_data").saveAsTable("bucketExample")


# read the bucked data
bucketed_df = spark.read.parquet("/tmp/bucketed_data")

# display the bucked data
bucketed_df.show()


+---+----+---+
| id|name|age|
+---+----+---+
|  1|John| 25|
|  2|Jane| 30|
|  3| Jim| 35|
+---+----+---+



In [0]:
%fs
ls /tmp/bucketed_data

path,name,size,modificationTime
dbfs:/tmp/bucketed_data/_SUCCESS,_SUCCESS,0,1675936832000
dbfs:/tmp/bucketed_data/_committed_1731950025197263571,_committed_1731950025197263571,339,1675936832000
dbfs:/tmp/bucketed_data/_started_1731950025197263571,_started_1731950025197263571,0,1675936828000
dbfs:/tmp/bucketed_data/part-00002-tid-1731950025197263571-c59e3e92-734f-4eca-8d89-f15e89cfec6f-86-1_00001.c000.snappy.parquet,part-00002-tid-1731950025197263571-c59e3e92-734f-4eca-8d89-f15e89cfec6f-86-1_00001.c000.snappy.parquet,1080,1675936831000
dbfs:/tmp/bucketed_data/part-00005-tid-1731950025197263571-c59e3e92-734f-4eca-8d89-f15e89cfec6f-89-1_00000.c000.snappy.parquet,part-00005-tid-1731950025197263571-c59e3e92-734f-4eca-8d89-f15e89cfec6f-89-1_00000.c000.snappy.parquet,1079,1675936831000
dbfs:/tmp/bucketed_data/part-00007-tid-1731950025197263571-c59e3e92-734f-4eca-8d89-f15e89cfec6f-91-1_00001.c000.snappy.parquet,part-00007-tid-1731950025197263571-c59e3e92-734f-4eca-8d89-f15e89cfec6f-91-1_00001.c000.snappy.parquet,1073,1675936831000


#Incremental Data load using the append mode

In [0]:
# Load new data into a dataframe
new_data_df = spark.read.parquet("/data/new_data")

# Append the new data to an existing Delta Lake table
new_data_df.write.format("delta").mode("append").save("/delta/events")


#Incremental Data load using the delta table Merge command

In [0]:
# Load new data into a dataframe
new_data_df = spark.read.parquet("/data/new_data")

# Create a temporary table for the new data
new_data_df.createOrReplaceTempView("new_data")

# Perform a MERGE statement to update the data in the storage system
spark.sql("""
  MERGE INTO events 
  USING new_data 
  ON events.id = new_data.id 
  WHEN MATCHED THEN 
    UPDATE SET * 
  WHEN NOT MATCHED THEN 
    INSERT * 
""")


# Map Join /Brodacast Join

In [0]:
from pyspark.sql.functions import broadcast

# Load the large table
large_table = spark.read.parquet("/data/large_table")

# Load the small table
small_table = spark.read.parquet("/data/small_table")

# Broadcast the small table
broadcast_small_table = broadcast(small_table)

# Perform the join
result = large_table.join(broadcast_small_table, "id")


#UDF example

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define the UDF
def to_upper(s):
    return s.upper()

# Register the UDF
to_upper_udf = udf(to_upper, StringType())

# Create a DataFrame
df = spark.createDataFrame([("Hello",), ("World",)], ["text"])

# Use the UDF in Spark SQL
df.select(to_upper_udf(df["text"]).alias("text_upper")).show()

# Output:
# +---------+
# |text_upper|
# +---------+
# |     HELLO|
# |     WORLD|
# +---------+




In [0]:
# Another example

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType, DecimalType
from decimal import Decimal

# Define the UDF
def string_float_to_decimal(string, float_value):
    return Decimal(string) * Decimal(float_value)

# Register the UDF
string_float_to_decimal_udf = udf(string_float_to_decimal, DecimalType(10, 2))

# Create a DataFrame
df = spark.createDataFrame([("1", 1.0), ("2", 2.0)], ["string", "float_value"])

# Use the UDF in Spark SQL
df.select(string_float_to_decimal_udf(df["string"], df["float_value"]).alias("decimal_result")).show()

# Output:
# +-------------+
# |decimal_result|
# +-------------+
# |         1.00|
# |         4.00|
# +-------------+

In [0]:
from pyspark.sql.functions import *
base = spark.range(16000000)

#Write non-bucketed table
base.write.format("parquet").saveAsTable("unbucketed2")

#// Write bucketed table
base.write.format("parquet").bucketBy(16, "id").saveAsTable("bucketed2")

t1 = spark.table("unbucketed2")
t2 = spark.table("bucketed2")
t3 = spark.table("bucketed2")

#// Unbucketed - bucketed join. Both sides need to be repartitioned.



[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
[0;32m<command-3990509121387465>[0m in [0;36m<cell line: 5>[0;34m()[0m
[1;32m      3[0m [0;34m[0m[0m
[1;32m      4[0m [0;31m#Write non-bucketed table[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 5[0;31m [0mbase[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"parquet"[0m[0;34m)[0m[0;34m.[0m[0msaveAsTable[0m[0;34m([0m[0;34m"unbucketed2"[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m      6[0m [0;34m[0m[0m
[1;32m      7[0m [0;31m#// Write bucketed table[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m

[0;32m/databricks/spark/python/pyspark/instrumentation_utils.py[0m in [0;36mwrapper[0;34m(*args, **kwargs)[0m
[1;32m     46[0m             [0mstart[0m [0;34m=[0m [0mtime[0m[0;34m.[0m[0mperf_counter[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0

In [0]:
t1.join(t2, "id").explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#47L]
   +- SortMergeJoin [id#47L], [id#49L], Inner
      :- Sort [id#47L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#47L, 16), ENSURE_REQUIREMENTS, [plan_id=129]
      :     +- Filter isnotnull(id#47L)
      :        +- FileScan parquet spark_catalog.default.unbucketed2[id#47L] Batched: true, DataFilters: [isnotnull(id#47L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/user/hive/warehouse/unbucketed2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
      +- Sort [id#49L ASC NULLS FIRST], false, 0
         +- Filter isnotnull(id#49L)
            +- FileScan parquet spark_catalog.default.bucketed2[id#49L] Batched: true, Bucketed: true, DataFilters: [isnotnull(id#49L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/user/hive/warehouse/bucketed2], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>,