# Basics of Spark Joins

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession\
            .builder\
            .appName("SparkDemo")\
            .getOrCreate()

In [0]:
orders_list = [("01", "02", 350, 1),
                ("01", "04", 580, 1),
                ("01", "07", 320, 2),
                ("02", "03", 450, 1),
                ("02", "06", 220, 1),
                ("03", "01", 195, 1),
                ("04", "09", 270, 3),
                ("04", "08", 410, 2),
                ("05", "02", 350, 1)]

order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")

product_list = [("01", "Scroll Mouse", 250, 20),
                ("02", "Optical Mouse", 350, 20), 
                ("03", "Wireless Mouse", 450, 50),
                ("04", "Wireless Keyboard", 580, 50),
                ("05", "Standard Keyboard", 360, 10),
                ("06", "16 GB Flash Storage", 240, 100),
                ("07", "32 GB Flash Storage", 320, 50),
                ("08", "64 GB Flash Storage", 430, 25)]

product_df = spark.createDataFrame (product_list).toDF("prod_id", "prod_name", "list_price", "qty")

# product_df.show()
# order_df.show()

# INNER JOIN

join_expr = order_df.prod_id == product_df.prod_id

order_df.join(product_df, join_expr, "inner")\
    .show()

+--------+-------+----------+---+-------+-------------------+----------+---+
|order_id|prod_id|unit_price|qty|prod_id|          prod_name|list_price|qty|
+--------+-------+----------+---+-------+-------------------+----------+---+
|      03|     01|       195|  1|     01|       Scroll Mouse|       250| 20|
|      01|     02|       350|  1|     02|      Optical Mouse|       350| 20|
|      05|     02|       350|  1|     02|      Optical Mouse|       350| 20|
|      02|     03|       450|  1|     03|     Wireless Mouse|       450| 50|
|      01|     04|       580|  1|     04|  Wireless Keyboard|       580| 50|
|      02|     06|       220|  1|     06|16 GB Flash Storage|       240|100|
|      01|     07|       320|  2|     07|32 GB Flash Storage|       320| 50|
|      04|     08|       410|  2|     08|64 GB Flash Storage|       430| 25|
+--------+-------+----------+---+-------+-------------------+----------+---+



In [0]:
# when having same name to multiple columns select will through ambiguity error as spark maps column names to internal ids while selecting columns
# to resolve this there are two steps
# 1. rename columns before joining
# 2. remove anyone column after join

product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")

order_df.join(product_renamed_df, join_expr, "inner")\
    .drop(product_renamed_df.prod_id)\
    .select("order_id", "prod_id", "prod_name", "unit_price", "qty")\
    .show()

+--------+-------+-------------------+----------+---+
|order_id|prod_id|          prod_name|unit_price|qty|
+--------+-------+-------------------+----------+---+
|      03|     01|       Scroll Mouse|       195|  1|
|      01|     02|      Optical Mouse|       350|  1|
|      05|     02|      Optical Mouse|       350|  1|
|      02|     03|     Wireless Mouse|       450|  1|
|      01|     04|  Wireless Keyboard|       580|  1|
|      02|     06|16 GB Flash Storage|       220|  1|
|      01|     07|32 GB Flash Storage|       320|  2|
|      04|     08|64 GB Flash Storage|       410|  2|
+--------+-------+-------------------+----------+---+



In [0]:
# OUTER JOIN

order_df.join(product_renamed_df, join_expr, "outer")\
    .drop(product_renamed_df.prod_id)\
    .select("*")\
    .sort("order_id")\
    .show()

+--------+-------+----------+----+-------------------+----------+-----------+
|order_id|prod_id|unit_price| qty|          prod_name|list_price|reorder_qty|
+--------+-------+----------+----+-------------------+----------+-----------+
|    NULL|   NULL|      NULL|NULL|  Standard Keyboard|       360|         10|
|      01|     02|       350|   1|      Optical Mouse|       350|         20|
|      01|     04|       580|   1|  Wireless Keyboard|       580|         50|
|      01|     07|       320|   2|32 GB Flash Storage|       320|         50|
|      02|     03|       450|   1|     Wireless Mouse|       450|         50|
|      02|     06|       220|   1|16 GB Flash Storage|       240|        100|
|      03|     01|       195|   1|       Scroll Mouse|       250|         20|
|      04|     08|       410|   2|64 GB Flash Storage|       430|         25|
|      04|     09|       270|   3|               NULL|      NULL|       NULL|
|      05|     02|       350|   1|      Optical Mouse|       350

In [0]:
# LEFT JOIN

order_df.join(product_renamed_df, join_expr, "left")\
    .drop(product_renamed_df.prod_id)\
    .select("*")\
    .sort("order_id")\
    .show()

+--------+-------+----------+---+-------------------+----------+-----------+
|order_id|prod_id|unit_price|qty|          prod_name|list_price|reorder_qty|
+--------+-------+----------+---+-------------------+----------+-----------+
|      01|     04|       580|  1|  Wireless Keyboard|       580|         50|
|      01|     02|       350|  1|      Optical Mouse|       350|         20|
|      01|     07|       320|  2|32 GB Flash Storage|       320|         50|
|      02|     06|       220|  1|16 GB Flash Storage|       240|        100|
|      02|     03|       450|  1|     Wireless Mouse|       450|         50|
|      03|     01|       195|  1|       Scroll Mouse|       250|         20|
|      04|     08|       410|  2|64 GB Flash Storage|       430|         25|
|      04|     09|       270|  3|               NULL|      NULL|       NULL|
|      05|     02|       350|  1|      Optical Mouse|       350|         20|
+--------+-------+----------+---+-------------------+----------+-----------+

In [0]:
# coleasce function

from pyspark.sql.functions import expr

order_df.join(product_renamed_df, join_expr, "left")\
    .drop(product_renamed_df.prod_id)\
    .select("*")\
    .withColumn("prod_name", expr("coalesce(prod_name, prod_id)"))\
    .withColumn("list_price", expr("coalesce(list_price, unit_price)"))\
    .sort("order_id")\
    .show()

+--------+-------+----------+---+-------------------+----------+-----------+
|order_id|prod_id|unit_price|qty|          prod_name|list_price|reorder_qty|
+--------+-------+----------+---+-------------------+----------+-----------+
|      01|     07|       320|  2|32 GB Flash Storage|       320|         50|
|      01|     02|       350|  1|      Optical Mouse|       350|         20|
|      01|     04|       580|  1|  Wireless Keyboard|       580|         50|
|      02|     06|       220|  1|16 GB Flash Storage|       240|        100|
|      02|     03|       450|  1|     Wireless Mouse|       450|         50|
|      03|     01|       195|  1|       Scroll Mouse|       250|         20|
|      04|     09|       270|  3|                 09|       270|       NULL|
|      04|     08|       410|  2|64 GB Flash Storage|       430|         25|
|      05|     02|       350|  1|      Optical Mouse|       350|         20|
+--------+-------+----------+---+-------------------+----------+-----------+

In [0]:
# Revision


# The list of joins provided by Spark SQL is:

# Inner Join
# Left / Left Outer Join
# Right / Right Outer Join
# Outer / Full Join
# Cross Join
# Left Anti Join
# Left Semi Join
# Self Join

# Different Example

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession\
            .builder\
                .appName("Spark Joins")\
                    .getOrCreate()

orders_list = [("01", "02", 350, 1),
                ("01", "04", 580, 1),
                ("01", "07", 320, 2),
                ("02", "03", 450, 1),
                ("02", "06", 220, 1),
                ("03", "01", 195, 1),
                ("04", "09", 270, 3),
                ("04", "08", 410, 2),
                ("05", "02", 350, 1)]

new_orders_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")
new_df.show()

product_list = [("01", "Scroll Mouse", 250, 20),
                ("02", "Optical Mouse", 350, 20), 
                ("03", "Wireless Mouse", 450, 50),
                ("04", "Wireless Keyboard", 580, 50),
                ("05", "Standard Keyboard", 360, 10),
                ("06", "16 GB Flash Storage", 240, 100),
                ("07", "32 GB Flash Storage", 320, 50),
                ("08", "64 GB Flash Storage", 430, 25)]

new_product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")
new_product_df.show()

+--------+-------+----------+---+
|order_id|prod_id|unit_price|qty|
+--------+-------+----------+---+
|      01|     02|       350|  1|
|      01|     04|       580|  1|
|      01|     07|       320|  2|
|      02|     03|       450|  1|
|      02|     06|       220|  1|
|      03|     01|       195|  1|
|      04|     09|       270|  3|
|      04|     08|       410|  2|
|      05|     02|       350|  1|
+--------+-------+----------+---+

+-------+-------------------+----------+---+
|prod_id|          prod_name|list_price|qty|
+-------+-------------------+----------+---+
|     01|       Scroll Mouse|       250| 20|
|     02|      Optical Mouse|       350| 20|
|     03|     Wireless Mouse|       450| 50|
|     04|  Wireless Keyboard|       580| 50|
|     05|  Standard Keyboard|       360| 10|
|     06|16 GB Flash Storage|       240|100|
|     07|32 GB Flash Storage|       320| 50|
|     08|64 GB Flash Storage|       430| 25|
+-------+-------------------+----------+---+



In [0]:
# Inner Join
# Returns only the rows from both the dataframes that have matching values in both columns specified as the join keys.

# Inner Join Example 1 -
join_exp = new_orders_df.prod_id == new_product_df.prod_id

new_orders_df.join(new_product_df, join_exp, "inner")\
                            .show()

+--------+-------+----------+---+-------+-------------------+----------+---+
|order_id|prod_id|unit_price|qty|prod_id|          prod_name|list_price|qty|
+--------+-------+----------+---+-------+-------------------+----------+---+
|      03|     01|       195|  1|     01|       Scroll Mouse|       250| 20|
|      01|     02|       350|  1|     02|      Optical Mouse|       350| 20|
|      05|     02|       350|  1|     02|      Optical Mouse|       350| 20|
|      02|     03|       450|  1|     03|     Wireless Mouse|       450| 50|
|      01|     04|       580|  1|     04|  Wireless Keyboard|       580| 50|
|      02|     06|       220|  1|     06|16 GB Flash Storage|       240|100|
|      01|     07|       320|  2|     07|32 GB Flash Storage|       320| 50|
|      04|     08|       410|  2|     08|64 GB Flash Storage|       430| 25|
+--------+-------+----------+---+-------+-------------------+----------+---+



In [0]:
# Inner Join Example 2 -

df1 = spark.createDataFrame([("A", 1), ("B", 2), ("C", 3)], ["letter", "number"])
df2 = spark.createDataFrame([("A", 4), ("B", 5), ("D", 6)], ["letter", "value"])

join_exp_eg2 = df1['letter'] == df2['letter']

df1.join(df2, join_exp_eg2, "inner")\
                .show()

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
+------+------+------+-----+



In [0]:
# Left Outer Join
# Returns all the rows from the left dataframe and the matching rows from the right dataframe. If there are no matching values in the right dataframe, then it returns a null.

df1.join(df2, df1['letter'] == df2['letter'], "left")\
                    .show()
# OR
df1.join(df2, df1['letter'] == df2['letter'], "leftouter")\
                    .show()
# OR
df1.join(df2, df1['letter'] == df2['letter'], "left_outer")\
                    .show()

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|     C|     3|  NULL| NULL|
+------+------+------+-----+

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|     C|     3|  NULL| NULL|
+------+------+------+-----+

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|     C|     3|  NULL| NULL|
+------+------+------+-----+



In [0]:
# Right Outer Join
# Returns all the rows from the right dataframe and the matching rows from the left dataframe. If there are no matching values in the left dataframe, then it returns a null.

df1.join(df2, df1['letter'] == df2['letter'], 'right').show()
# OR
df1.join(df2, df1['letter'] == df2['letter'], 'rightouter').show()
# OR
df1.join(df2, df1['letter'] == df2['letter'], 'right_outer').show()

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|  NULL|  NULL|     D|    6|
+------+------+------+-----+

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|  NULL|  NULL|     D|    6|
+------+------+------+-----+

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|  NULL|  NULL|     D|    6|
+------+------+------+-----+



In [0]:
# Outer / Full Join
# Returns all the rows from both the dataframes, including the matching and non-matching rows. If there are no matching values, then the result will contain a NULL value in place of the missing data.

df1.join(df2, df1['letter'] == df2['letter'], 'outer').show()
# OR
df1.join(df2, df1['letter'] == df2['letter'], 'full').show()
# OR
df1.join(df2, df1['letter'] == df2['letter'], 'fullouter').show()
# OR
df1.join(df2, df1['letter'] == df2['letter'], 'full_outer').show()

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|     C|     3|  NULL| NULL|
|  NULL|  NULL|     D|    6|
+------+------+------+-----+

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|     C|     3|  NULL| NULL|
|  NULL|  NULL|     D|    6|
+------+------+------+-----+

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|     C|     3|  NULL| NULL|
|  NULL|  NULL|     D|    6|
+------+------+------+-----+

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     B|     2|     B|    5|
|     C|     3|  NULL| NULL|
|  NULL|  NULL|     D|    6|
+------+------+------+-----+



In [0]:
# Cross Join
# Returns all possible combinations of rows from both the dataframes. In other words, it takes every row from one dataframe and matches it with every row in the other dataframe. The result is a new dataframe with all possible combinations of the rows from the two input dataframes.

df1.crossJoin(df2)\
        .show()

+------+------+------+-----+
|letter|number|letter|value|
+------+------+------+-----+
|     A|     1|     A|    4|
|     A|     1|     B|    5|
|     A|     1|     D|    6|
|     B|     2|     A|    4|
|     B|     2|     B|    5|
|     B|     2|     D|    6|
|     C|     3|     A|    4|
|     C|     3|     B|    5|
|     C|     3|     D|    6|
+------+------+------+-----+



In [0]:
# Left Anti Join
# A left anti join in Spark SQL is a type of left join operation that returns only the rows from the left dataframe that do not have matching values in the right dataframe. It is used to find the rows in one dataframe that do not have corresponding values in another dataframe.

df1.join(df2, df1['letter'] == df2['letter'], 'left_anti')\
                        .show()

+------+------+
|letter|number|
+------+------+
|     C|     3|
+------+------+



In [0]:
# Left Semi Join
# A left semi join in Spark SQL is a type of join operation that returns only the columns from the left dataframe that have matching values in the right dataframe. It is used to find the values in one dataframe that have corresponding values in another dataframe.

df1.join(df2, df1['letter'] == df2['letter'], "leftsemi")\
                            .show()

+------+------+
|letter|number|
+------+------+
|     A|     1|
|     B|     2|
+------+------+



In [0]:
# Self Join
# A self join in Spark SQL is a join operation in which a dataframe is joined with itself. It is used to compare the values within a single dataframe and return the rows that match specified criteria.

df1.alias("temp1").join(df1.alias("temp2"), df1['letter'] == df1['letter'])\
                                        .show()

+------+------+------+------+
|letter|number|letter|number|
+------+------+------+------+
|     A|     1|     A|     1|
|     B|     2|     B|     2|
|     C|     3|     C|     3|
+------+------+------+------+

