In [1]:
!pip3 install findspark



In [2]:
import findspark
findspark.init()

In [3]:
import os
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [4]:
#executing the shell.py inside the 'SPARK_HOME' environment

exec(open(os.path.join(os.environ["SPARK_HOME"],'python/pyspark/shell.py')).read())

:: loading settings :: url = jar:file:/Users/tania/Downloads/spark/spark-3.3.2-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/tania/.ivy2/cache
The jars for the packages stored in: /Users/tania/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cbc73c42-f349-4348-9ffe-5332d9935cf1;1.0
	confs: [default]
	found org.apache.spark#spark-avro_2.12;3.3.2 in central
	found org.tukaani#xz;1.9 in central
	found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 364ms :: artifacts dl 35ms
	:: modules in use:
	org.apache.spark#spark-avro_2.12;3.3.2 from central in [default]
	org.spark-project.spark#unused;1.0.0 from central in [default]
	org.tukaani#xz;1.9 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     

23/08/09 20:15:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Python version 3.9.13 (main, Aug 25 2022 18:29:29)
Spark context Web UI available at http://192.168.0.25:4040
Spark context available as 'sc' (master = local[*], app id = local-1691622919291).
SparkSession available as 'spark'.


# Creating Dataframe

## Example 1

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.read.json('/Users/tania/Downloads/spark/Spark-Programming-In-Python-master/notebook/data/people.json').show()

                                                                                

+----+--------+
| age|    name|
+----+--------+
|null|Prashant|
|  30|   Abdul|
|  19|  Justin|
|  43|    Andy|
+----+--------+



In [6]:
def to_date_df(df, fmt, field):
    return df.withColumn(field, to_date(col(field), fmt))

In [7]:
my_schema = StructType([
    StructField('ID', StringType()),
    StructField('EventDate', StringType())
])

my_rows = [
    Row('1', '4/5/2020'),
    Row('2', '08/15/2022'),
    Row('3', '01/01/2023'),
    Row('4', '12/31/2022')
]

my_rdd = spark.sparkContext.parallelize(my_rows)
my_df = spark.createDataFrame(my_rdd, my_schema)
my_df.show()

                                                                                

+---+----------+
| ID| EventDate|
+---+----------+
|  1|  4/5/2020|
|  2|08/15/2022|
|  3|01/01/2023|
|  4|12/31/2022|
+---+----------+



In [8]:
new_df = to_date_df(my_df, 'M/d/y', 'EventDate')
new_df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- EventDate: date (nullable = true)



In [9]:
new_df.show()

+---+----------+
| ID| EventDate|
+---+----------+
|  1|2020-04-05|
|  2|2022-08-15|
|  3|2023-01-01|
|  4|2022-12-31|
+---+----------+



## Example 2

In [10]:
data_list = [
    ('Tania', 3,11,1987),
    ('Daniel', 12,12,1980),
    ('Juliana', 31,1,1983),
    ('Pedro', 31,1,84),
    ('Pedro', 31,1,84),
    ('Maria', 15,5,1) # duplicate
]

raw_df = spark.createDataFrame(data_list).toDF('Name', 'Day', 'Month', 'Year')
raw_df.show()

+-------+---+-----+----+
|   Name|Day|Month|Year|
+-------+---+-----+----+
|  Tania|  3|   11|1987|
| Daniel| 12|   12|1980|
|Juliana| 31|    1|1983|
|  Pedro| 31|    1|  84|
|  Pedro| 31|    1|  84|
|  Maria| 15|    5|   1|
+-------+---+-----+----+



In [11]:
# cleaning data
raw_df_cleaned = raw_df.withColumn('Year', 
expr(
'''case 
     when Year < 23 then cast(Year as int) + 2000
     when Year <100 then cast(Year as int) + 1900
     else Year 
end'''))
raw_df_cleaned.show()

+-------+---+-----+----+
|   Name|Day|Month|Year|
+-------+---+-----+----+
|  Tania|  3|   11|1987|
| Daniel| 12|   12|1980|
|Juliana| 31|    1|1983|
|  Pedro| 31|    1|1984|
|  Pedro| 31|    1|1984|
|  Maria| 15|    5|2001|
+-------+---+-----+----+



In [12]:
raw_df.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Day: long (nullable = true)
 |-- Month: long (nullable = true)
 |-- Year: long (nullable = true)



# Creating new columns 

## with expressions

In [13]:
from pyspark.sql.functions import year, month, dayofmonth

new_df2 = new_df.select('EventDate', \
                       year('EventDate').alias('Year'), \
                       month('EventDate').alias('Month'), \
                       dayofmonth("EventDate").alias('day') )
new_df2.show()

+----------+----+-----+---+
| EventDate|Year|Month|day|
+----------+----+-----+---+
|2020-04-05|2020|    4|  5|
|2022-08-15|2022|    8| 15|
|2023-01-01|2023|    1|  1|
|2022-12-31|2022|   12| 31|
+----------+----+-----+---+



In [14]:
new_df2.withColumn('Birthday', expr('''to_date(concat(Year, '-', Month, '-', day))''')).show()

+----------+----+-----+---+----------+
| EventDate|Year|Month|day|  Birthday|
+----------+----+-----+---+----------+
|2020-04-05|2020|    4|  5|2020-04-05|
|2022-08-15|2022|    8| 15|2022-08-15|
|2023-01-01|2023|    1|  1|2023-01-01|
|2022-12-31|2022|   12| 31|2022-12-31|
+----------+----+-----+---+----------+



In [15]:
new_df2.withColumn('Birthday', to_date(expr('''concat(Year, '-', Month, '-', day)'''))).show()

+----------+----+-----+---+----------+
| EventDate|Year|Month|day|  Birthday|
+----------+----+-----+---+----------+
|2020-04-05|2020|    4|  5|2020-04-05|
|2022-08-15|2022|    8| 15|2022-08-15|
|2023-01-01|2023|    1|  1|2023-01-01|
|2022-12-31|2022|   12| 31|2022-12-31|
+----------+----+-----+---+----------+



## drop column

In [16]:
new_df2.drop('day', 'Month').show()

+----------+----+
| EventDate|Year|
+----------+----+
|2020-04-05|2020|
|2022-08-15|2022|
|2023-01-01|2023|
|2022-12-31|2022|
+----------+----+



## drop duplicates

In [17]:
new_df2.drop_duplicates(['Year']).show()

+----------+----+-----+---+
| EventDate|Year|Month|day|
+----------+----+-----+---+
|2020-04-05|2020|    4|  5|
|2022-08-15|2022|    8| 15|
|2023-01-01|2023|    1|  1|
+----------+----+-----+---+



## create a unique id column

In [18]:
from pyspark.sql.functions import monotonically_increasing_id

raw_df_partitioned = raw_df.repartition(2)
raw_df_partitioned_with_id = raw_df_partitioned.withColumn('id', monotonically_increasing_id())
raw_df_partitioned_with_id.show()

+-------+---+-----+----+----------+
|   Name|Day|Month|Year|        id|
+-------+---+-----+----+----------+
|Juliana| 31|    1|1983|         0|
|  Pedro| 31|    1|  84|         1|
|  Maria| 15|    5|   1|         2|
|  Tania|  3|   11|1987|8589934592|
| Daniel| 12|   12|1980|8589934593|
|  Pedro| 31|    1|  84|8589934594|
+-------+---+-----+----+----------+



## sort column

In [19]:
new_df2.sort(expr('Month')).show()

+----------+----+-----+---+
| EventDate|Year|Month|day|
+----------+----+-----+---+
|2023-01-01|2023|    1|  1|
|2020-04-05|2020|    4|  5|
|2022-08-15|2022|    8| 15|
|2022-12-31|2022|   12| 31|
+----------+----+-----+---+



In [20]:
new_df2.orderBy('Year').show()

+----------+----+-----+---+
| EventDate|Year|Month|day|
+----------+----+-----+---+
|2020-04-05|2020|    4|  5|
|2022-08-15|2022|    8| 15|
|2022-12-31|2022|   12| 31|
|2023-01-01|2023|    1|  1|
+----------+----+-----+---+



## Substring Column

In [21]:
df = spark.createDataFrame([('a.b.c.d',)], ['s'])
df.select(substring_index(df.s, '.', 1).alias('s')).collect()
df.select(substring_index(df.s, '.', -3).alias('s')).collect()

[Row(s='b.c.d')]

## Inline Column Type Cast

In [22]:
# with cast() as expression
raw_df.withColumn('Year', 
expr(
'''case 
     when Year < 23 then cast(Year as int) + 2000
     when Year <100 then cast(Year as int) + 1900
     else Year 
end''')).show()

+-------+---+-----+----+
|   Name|Day|Month|Year|
+-------+---+-----+----+
|  Tania|  3|   11|1987|
| Daniel| 12|   12|1980|
|Juliana| 31|    1|1983|
|  Pedro| 31|    1|1984|
|  Pedro| 31|    1|1984|
|  Maria| 15|    5|2001|
+-------+---+-----+----+



In [23]:
raw_df.withColumn('Year', \
                  when(col('Year') < 23, col('Year') + 2000) \
                  .when(col('Year') < 100, col('Year') + 1900)\
                  .otherwise(col('Year'))
                 ).show()

+-------+---+-----+----+
|   Name|Day|Month|Year|
+-------+---+-----+----+
|  Tania|  3|   11|1987|
| Daniel| 12|   12|1980|
|Juliana| 31|    1|1983|
|  Pedro| 31|    1|1984|
|  Pedro| 31|    1|1984|
|  Maria| 15|    5|2001|
+-------+---+-----+----+



In [24]:
# with cast() function
raw_df_cleaned = raw_df.withColumn('Year', 
expr(
'''case 
     when Year < 23 then Year + 2000
     when Year <100 then Year + 1900
     else Year 
end''').cast(IntegerType()))
raw_df_cleaned.show()

+-------+---+-----+----+
|   Name|Day|Month|Year|
+-------+---+-----+----+
|  Tania|  3|   11|1987|
| Daniel| 12|   12|1980|
|Juliana| 31|    1|1983|
|  Pedro| 31|    1|1984|
|  Pedro| 31|    1|1984|
|  Maria| 15|    5|2001|
+-------+---+-----+----+



# Aggregations

In [39]:
from pyspark.sql import functions as f

invoices = spark.read.csv('/Users/tania/PycharmProjects/MyFirstSparkProgram/data/invoices.csv').show(5)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|      _c0|      _c1|                 _c2|     _c3|            _c4|      _c5|       _c6|           _c7|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
|   536365|     null|WHITE HANGING HEA...|       6|01-12-2010 8.26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 8.26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



In [32]:
invoices_df = spark.read\
                   .format('csv')\
                   .option('header', 'true')\
                   .option('inferSchema', 'true')\
                   .load('/Users/tania/PycharmProjects/MyFirstSparkProgram/data/invoices.csv')
invoices_df.show(5)



+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|     null|WHITE HANGING HEA...|       6|01-12-2010 8.26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|01-12-2010 8.26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|01-12-2010 8.26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
only showing top 5 rows



                                                                                

In [41]:
invoices_df.select(f.count('*').alias('Count *'),
                   f.sum('Quantity').alias('TotalCount'),
                   f.avg('UnitPrice').alias('AveragePrice'),
                   f.countDistinct('InvoiceNo').alias('CountDistinct')).show()



+-------+----------+-----------------+-------------+
|Count *|TotalCount|     AveragePrice|CountDistinct|
+-------+----------+-----------------+-------------+
| 541909|   5176450|4.611113626088501|        25900|
+-------+----------+-----------------+-------------+



                                                                                

In [None]:
# another way
invoices_df.selectExpr(
    'count(1) as  count 1', # count 1 is similar to * and include null
    'count(StockCode) as count field', # do not include nulls
    'sum(Quantity) as TotalQuantity',
    'avg(UnitPrice) as AveragePrice'
).show()

In [50]:
# another way
invoices_df.selectExpr(
    'count(1) as  `count 1`',
    'count(StockCode) as `count field`', # do not include nulls
    'sum(Quantity) as TotalQuantity',
    'avg(UnitPrice) as AveragePrice'
).show()

+-------+-----------+-------------+-----------------+
|count 1|count field|TotalQuantity|     AveragePrice|
+-------+-----------+-------------+-----------------+
| 541909|     541908|      5176450|4.611113626088323|
+-------+-----------+-------------+-----------------+



In [52]:
# with SQL
invoices_df.createOrReplaceTempView('sales')
summary = spark.sql(
''' SELECT Country, InvoiceNo,
           sum(Quantity) as TotalQuantity,
           round(sum(Quantity * UnitPrice), 2) as InvoiceValue
    FROM sales
    GROUP BY Country, InvoiceNo
''')
summary.show()



+--------------+---------+-------------+------------+
|       Country|InvoiceNo|TotalQuantity|InvoiceValue|
+--------------+---------+-------------+------------+
|United Kingdom|   536446|          329|      440.89|
|United Kingdom|   536508|          216|      155.52|
|United Kingdom|   537018|           -3|         0.0|
|United Kingdom|   537401|          -24|         0.0|
|United Kingdom|   537811|           74|      268.86|
|United Kingdom|  C537824|           -2|       -14.9|
|United Kingdom|   538895|          370|      247.38|
|United Kingdom|   540453|          341|      302.45|
|United Kingdom|   541291|          217|      305.81|
|United Kingdom|   542551|           -1|         0.0|
|United Kingdom|   542576|           -1|         0.0|
|United Kingdom|   542628|            9|      132.35|
|United Kingdom|   542886|          199|      320.51|
|United Kingdom|   542907|           75|      313.85|
|United Kingdom|   543131|          134|       164.1|
|United Kingdom|   543189|  

                                                                                

In [53]:
# another way

summary = invoices_df\
          .groupBy('Country', 'InvoiceNo')\
          .agg(f.sum('Quantity').alias('TotalQuantity'),
               f.round(f.sum(f.expr('Quantity * UnitPrice')), 2).alias('InvoiceValue'))
summary.show()



+--------------+---------+-------------+------------+
|       Country|InvoiceNo|TotalQuantity|InvoiceValue|
+--------------+---------+-------------+------------+
|United Kingdom|   536446|          329|      440.89|
|United Kingdom|   536508|          216|      155.52|
|United Kingdom|   537018|           -3|         0.0|
|United Kingdom|   537401|          -24|         0.0|
|United Kingdom|   537811|           74|      268.86|
|United Kingdom|  C537824|           -2|       -14.9|
|United Kingdom|   538895|          370|      247.38|
|United Kingdom|   540453|          341|      302.45|
|United Kingdom|   541291|          217|      305.81|
|United Kingdom|   542551|           -1|         0.0|
|United Kingdom|   542576|           -1|         0.0|
|United Kingdom|   542628|            9|      132.35|
|United Kingdom|   542886|          199|      320.51|
|United Kingdom|   542907|           75|      313.85|
|United Kingdom|   543131|          134|       164.1|
|United Kingdom|   543189|  

                                                                                

In [59]:
# Creating new columns

num_of_invoices = f.countDistinct('InvoiceNo').alias('NumInvoices')
total_quantity = f.sum('Quantity').alias('TotalQuantity')
total_invoice = f.round(f.sum(f.expr('Quantity * UnitPrice')), 2).alias('InvoiceValue')

summary_new = invoices_df\
              .withColumn('InvoiceDate', f.to_date(f.col('InvoiceDate'), 'dd-MM-yyyy H.mm'))\
              .where('year(InvoiceDate) == 2010')\
              .withColumn('WeekNumber', f.weekofyear(f.col('InvoiceDate')))\
              .groupBy('Country', 'WeekNumber')\
              .agg(num_of_invoices, total_quantity, total_invoice)

# save result to parquet format
# df.coalesce(1) will write to one file
summary_new.coalesce(1).write\
           .format('parquet')\
           .mode('overwrite')\
           .save('output_dir')

summary_new.sort('Country', 'WeekNumber').show()



+---------------+----------+-----------+-------------+------------+
|        Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|
+---------------+----------+-----------+-------------+------------+
|      Australia|        48|          1|          107|      358.25|
|      Australia|        49|          1|          214|       258.9|
|      Australia|        50|          2|          133|      387.95|
|        Austria|        50|          2|            3|      257.04|
|        Bahrain|        51|          1|           54|      205.74|
|        Belgium|        48|          1|          528|       346.1|
|        Belgium|        50|          2|          285|      625.16|
|        Belgium|        51|          2|          942|      838.65|
|Channel Islands|        49|          1|           80|      363.53|
|         Cyprus|        50|          1|          917|     1590.82|
|        Denmark|        49|          1|          454|      1281.5|
|           EIRE|        48|          7|        

                                                                                

# Windowing Aggregations

In [63]:
running_total_window = Window.partitionBy('Country')\
                             .orderBy('WeekNumber')\
                             .rowsBetween(Window.unboundedPreceding, Window.currentRow)
# PS:
# unboundedPreceding: from the beginning. Can be a numeric value

summary_window = summary_new\
                 .withColumn('RunningTotal', 
                             sum('InvoiceValue').over(running_total_window))
summary_window.show()



+---------------+----------+-----------+-------------+------------+------------------+
|        Country|WeekNumber|NumInvoices|TotalQuantity|InvoiceValue|      RunningTotal|
+---------------+----------+-----------+-------------+------------+------------------+
|      Australia|        48|          1|          107|      358.25|            358.25|
|      Australia|        49|          1|          214|       258.9|            617.15|
|      Australia|        50|          2|          133|      387.95|1005.0999999999999|
|        Austria|        50|          2|            3|      257.04|            257.04|
|        Bahrain|        51|          1|           54|      205.74|            205.74|
|        Belgium|        48|          1|          528|       346.1|             346.1|
|        Belgium|        50|          2|          285|      625.16|            971.26|
|        Belgium|        51|          2|          942|      838.65|1809.9099999999999|
|Channel Islands|        49|          1|   

                                                                                

# Joins

In [65]:
orders_list = [("01", "02", 350, 1),
               ("01", "04", 580, 1),
               ("01", "07", 320, 2),
               ("02", "03", 450, 1),
               ("02", "06", 220, 1),
               ("03", "01", 195, 1),
               ("04", "09", 270, 3),
               ("04", "08", 410, 2),
               ("05", "02", 350, 1)]

order_df = spark.createDataFrame(orders_list).toDF("order_id", "prod_id", "unit_price", "qty")

product_list = [("01", "Scroll Mouse", 250, 20),
                ("02", "Optical Mouse", 350, 20),
                ("03", "Wireless Mouse", 450, 50),
                ("04", "Wireless Keyboard", 580, 50),
                ("05", "Standard Keyboard", 360, 10),
                ("06", "16 GB Flash Storage", 240, 100),
                ("07", "32 GB Flash Storage", 320, 50),
                ("08", "64 GB Flash Storage", 430, 25)]

product_df = spark.createDataFrame(product_list).toDF("prod_id", "prod_name", "list_price", "qty")
product_renamed_df = product_df.withColumnRenamed("qty", "reorder_qty")

product_renamed_df.show()
order_df.show()

+-------+-------------------+----------+-----------+
|prod_id|          prod_name|list_price|reorder_qty|
+-------+-------------------+----------+-----------+
|     01|       Scroll Mouse|       250|         20|
|     02|      Optical Mouse|       350|         20|
|     03|     Wireless Mouse|       450|         50|
|     04|  Wireless Keyboard|       580|         50|
|     05|  Standard Keyboard|       360|         10|
|     06|16 GB Flash Storage|       240|        100|
|     07|32 GB Flash Storage|       320|         50|
|     08|64 GB Flash Storage|       430|         25|
+-------+-------------------+----------+-----------+

+--------+-------+----------+---+
|order_id|prod_id|unit_price|qty|
+--------+-------+----------+---+
|      01|     02|       350|  1|
|      01|     04|       580|  1|
|      01|     07|       320|  2|
|      02|     03|       450|  1|
|      02|     06|       220|  1|
|      03|     01|       195|  1|
|      04|     09|       270|  3|
|      04|     08|     

## Inner

In [66]:
join_expr = order_df.prod_id == product_df.prod_id

order_df.join(product_renamed_df, join_expr, "inner") \
        .drop(product_renamed_df.prod_id) \
        .select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
        .show()

+--------+-------+-------------------+----------+----------+---+
|order_id|prod_id|          prod_name|unit_price|list_price|qty|
+--------+-------+-------------------+----------+----------+---+
|      03|     01|       Scroll Mouse|       195|       250|  1|
|      01|     02|      Optical Mouse|       350|       350|  1|
|      05|     02|      Optical Mouse|       350|       350|  1|
|      02|     03|     Wireless Mouse|       450|       450|  1|
|      01|     04|  Wireless Keyboard|       580|       580|  1|
|      02|     06|16 GB Flash Storage|       220|       240|  1|
|      01|     07|32 GB Flash Storage|       320|       320|  2|
|      04|     08|64 GB Flash Storage|       410|       430|  2|
+--------+-------+-------------------+----------+----------+---+



## Outer join (Full Outer)

In [68]:
join_expr = order_df.prod_id == product_df.prod_id

order_df.join(product_renamed_df, join_expr, "outer") \
        .drop(product_renamed_df.prod_id) \
        .select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
        .sort('order_id')\
        .show()

+--------+-------+-------------------+----------+----------+----+
|order_id|prod_id|          prod_name|unit_price|list_price| qty|
+--------+-------+-------------------+----------+----------+----+
|    null|   null|  Standard Keyboard|      null|       360|null|
|      01|     04|  Wireless Keyboard|       580|       580|   1|
|      01|     02|      Optical Mouse|       350|       350|   1|
|      01|     07|32 GB Flash Storage|       320|       320|   2|
|      02|     03|     Wireless Mouse|       450|       450|   1|
|      02|     06|16 GB Flash Storage|       220|       240|   1|
|      03|     01|       Scroll Mouse|       195|       250|   1|
|      04|     08|64 GB Flash Storage|       410|       430|   2|
|      04|     09|               null|       270|      null|   3|
|      05|     02|      Optical Mouse|       350|       350|   1|
+--------+-------+-------------------+----------+----------+----+



## Left Join

In [73]:
join_expr = order_df.prod_id == product_df.prod_id

order_df.join(product_renamed_df, join_expr, "left") \
        .drop(product_renamed_df.prod_id) \
        .select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
        .withColumn('prod_name', expr('coalesce(prod_name, prod_id)'))\
        .withColumn('list_price', expr('coalesce(list_price, unit_price)'))\
        .sort('order_id')\
        .show()

+--------+-------+-------------------+----------+----------+---+
|order_id|prod_id|          prod_name|unit_price|list_price|qty|
+--------+-------+-------------------+----------+----------+---+
|      01|     02|      Optical Mouse|       350|       350|  1|
|      01|     07|32 GB Flash Storage|       320|       320|  2|
|      01|     04|  Wireless Keyboard|       580|       580|  1|
|      02|     03|     Wireless Mouse|       450|       450|  1|
|      02|     06|16 GB Flash Storage|       220|       240|  1|
|      03|     01|       Scroll Mouse|       195|       250|  1|
|      04|     09|                 09|       270|       270|  3|
|      04|     08|64 GB Flash Storage|       410|       430|  2|
|      05|     02|      Optical Mouse|       350|       350|  1|
+--------+-------+-------------------+----------+----------+---+



----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 49514)
Traceback (most recent call last):
  File "/usr/local/anaconda3/lib/python3.9/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/local/anaconda3/lib/python3.9/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/usr/local/anaconda3/lib/python3.9/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/local/anaconda3/lib/python3.9/socketserver.py", line 747, in __init__
    self.handle()
  File "/Users/tania/Downloads/spark/spark-3.3.2-bin-hadoop3/python/pyspark/accumulators.py", line 281, in handle
    poll(accum_updates)
  File "/Users/tania/Downloads/spark/spark-3.3.2-bin-hadoop3/python/pyspark/accumulators.py", line 253, in poll
    if func():
  File "/Users/tania/Downloads/spark/spark-3.3.2

## Right Join

In [70]:
join_expr = order_df.prod_id == product_df.prod_id

order_df.join(product_renamed_df, join_expr, "right") \
        .drop(product_renamed_df.prod_id) \
        .select("order_id", "prod_id", "prod_name", "unit_price", "list_price", "qty") \
        .sort('order_id')\
        .show()

+--------+-------+-------------------+----------+----------+----+
|order_id|prod_id|          prod_name|unit_price|list_price| qty|
+--------+-------+-------------------+----------+----------+----+
|    null|   null|  Standard Keyboard|      null|       360|null|
|      01|     07|32 GB Flash Storage|       320|       320|   2|
|      01|     04|  Wireless Keyboard|       580|       580|   1|
|      01|     02|      Optical Mouse|       350|       350|   1|
|      02|     03|     Wireless Mouse|       450|       450|   1|
|      02|     06|16 GB Flash Storage|       220|       240|   1|
|      03|     01|       Scroll Mouse|       195|       250|   1|
|      04|     08|64 GB Flash Storage|       410|       430|   2|
|      05|     02|      Optical Mouse|       350|       350|   1|
+--------+-------+-------------------+----------+----------+----+

