# This notebook is going to teach you how to perform  pyspark Joins

In [1]:
# Initialise Input And Output Dataset Path Variables
input_dataset_path = "C:/Users/Jasjyot Singh Jaswal/Documents/JupyterWB/BigDataOrclProject/InputDataset"
output_datset_path = "C:/Users/Jasjyot Singh Jaswal/Documents/JupyterWB/BigDataOrclProject/OutputDataset"
output_datset_path_csv = f"{output_datset_path}/CSV"
output_datset_path_pipe = f"{output_datset_path}/PIPE"
output_datset_path_tab = f"{output_datset_path}/TAB"
output_datset_path_json = f"{output_datset_path}/JSON"

# Import Dependencies

In [2]:
import findspark
findspark.init()
import pyspark # only run after findspark.init()
from pyspark.sql.functions import col, explode,coalesce
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
from pyspark.sql.types import StructType
from pyspark.sql.functions import udf
from pyspark.sql import functions as f
spark = SparkSession.builder.getOrCreate()
import datetime
from datetime import timedelta 
from pyspark.sql.types import ArrayType
from pyspark.sql.functions import split
from pyspark.sql.types import StringType
from pyspark.sql import functions
from pyspark.sql.window import Window
from pyspark.sql.functions import *

# Load Customer Information and Purchase Information dataframe.

### Find total counts for each and distinct customers present in each dataframe.
### Total count for Customer information dataframe and distinct customers present in it should be same in an ideal world
### Customer dataset has unmatched customer ID's from custID31,custID32,custID33.... all the way to custID50 (20 such customers)
### Purchase Dataset has unmatched customer ID's from custID3074,custID3074,custID3074,custID75,custID87,custID133,custID167 (5 Such Customers)
### Purchase Dataset has 4 ignorable sales with Purchase ID 393,496,456,429

In [9]:
cust_info = spark.read.option("header", "true").csv(f"{input_dataset_path}/customer_information.csv")
cust_info.show(truncate=False)
cust_info_count = cust_info.count()
cust_info_distinct_count = cust_info.select("CustomerID").distinct().count()
print(f"Total Customer Info Records are {cust_info_count} and Total Distinct Customer IDs present are {cust_info_distinct_count} ")

+----------+-----------------+----------------+------------------------+
|CustomerID|CustomerFirstName|CustomerLastName|StreetAddress           |
+----------+-----------------+----------------+------------------------+
|custID01  |Mikel            |Rankling        |2901 American Ash Road  |
|custID02  |Nial             |Rentcome        |08928 Sunfield Plaza    |
|custID03  |Leon             |Coales          |546 Darwin Park         |
|custID04  |Timothy          |Southon         |79507 Scott Court       |
|custID05  |Thedric          |Firbank         |755 Ramsey Terrace      |
|custID06  |Gerrie           |Barr            |477 Havey Hill          |
|custID07  |Hagen            |Maris           |7864 Hoepker Crossing   |
|custID08  |Winthrop         |Lantaph         |92172 Stoughton Court   |
|custID09  |Martainn         |Couch           |167 Calypso Terrace     |
|custID10  |Marcos           |Perrin          |223 Paget Road          |
|custID11  |Trev             |Stanyer         |8 Fa

In [10]:
purchase_info = spark.read.option("header", "true").csv(f"{input_dataset_path}/purchase_information.csv")
purchase_info.show(truncate=False)
purchase_info_count = purchase_info.count()
purchase_info_distinct_count = purchase_info.select("CustomerID").distinct().count()
print(f"Total Purchase Info Records are {purchase_info_count} and Total Distinct Customer IDs present are {purchase_info_distinct_count} ")

+----------+----------+------------+--------------+
|PurchaseID|CustomerID|PurchaseDate|PurchaseAmount|
+----------+----------+------------+--------------+
|1         |custID04  |09-04-2020  |28238         |
|2         |custID15  |10/14/2020  |29528         |
|3         |custID27  |6/18/2020   |22763         |
|4         |custID22  |10/15/2020  |24043         |
|5         |custID28  |01-01-2020  |20814         |
|6         |custID15  |01-02-2020  |27609         |
|7         |custID11  |10-10-2020  |23244         |
|8         |custID05  |7/21/2020   |28114         |
|9         |custID19  |9/14/2020   |26720         |
|10        |custID17  |11-09-2020  |29969         |
|11        |custID29  |01-09-2020  |28532         |
|12        |custID30  |10/23/2020  |26970         |
|13        |custID09  |07-05-2020  |25937         |
|14        |custID02  |04-06-2020  |25585         |
|15        |custID26  |08-01-2020  |21140         |
|16        |custID05  |3/24/2020   |29687         |
|17        |

In [17]:
### A . Perform Inner Join between Customer and Purchase Dataset on CustomerID column
### This will hold all matching records between Customer & Purchase Dataset on CustomerID column
### Remove 1 of the CustomerID columns as column comes from both dataframes after joining

In [21]:
cust_inner_join = cust_info.join(purchase_info, [cust_info.CustomerID == purchase_info.CustomerID], 'inner').drop("CustomerID")
cust_inner_join_cnt = cust_inner_join.count()

In [22]:
cust_inner_join.show(truncate=False)
print(f"Target Count Records after joining {cust_inner_join_cnt} ")

+-----------------+----------------+------------------------+----------+------------+--------------+
|CustomerFirstName|CustomerLastName|StreetAddress           |PurchaseID|PurchaseDate|PurchaseAmount|
+-----------------+----------------+------------------------+----------+------------+--------------+
|Timothy          |Southon         |79507 Scott Court       |1         |09-04-2020  |28238         |
|Shelton          |Reddings        |78147 Hayes Park        |2         |10/14/2020  |29528         |
|Gibbie           |Purcell         |875 Harper Plaza        |3         |6/18/2020   |22763         |
|Marius           |Warbys          |59 Redwing Avenue       |4         |10/15/2020  |24043         |
|Merrick          |Wymer           |053 Pankratz Trail      |5         |01-01-2020  |20814         |
|Shelton          |Reddings        |78147 Hayes Park        |6         |01-02-2020  |27609         |
|Trev             |Stanyer         |8 Fairfield Crossing    |7         |10-10-2020  |23244 

In [18]:
### B . Perform Inner Join between Customer and Purchase Dataset on CustomerID column for significant sales i.e. sale value >1000 
### You can provide additional conditions apart from joining conditions

In [23]:
cust_inner_join_high_purch = cust_info.join(purchase_info, [cust_info.CustomerID == purchase_info.CustomerID,purchase_info.PurchaseAmount > 1000 ], 'inner').drop("CustomerID")
cust_inner_join_high_purch_cnt = cust_inner_join_high_purch.count()

In [24]:
cust_inner_join_high_purch.show(truncate=False)
print(f"Target Count Records after joining {cust_inner_join_high_purch_cnt} ")

+-----------------+----------------+------------------------+----------+------------+--------------+
|CustomerFirstName|CustomerLastName|StreetAddress           |PurchaseID|PurchaseDate|PurchaseAmount|
+-----------------+----------------+------------------------+----------+------------+--------------+
|Timothy          |Southon         |79507 Scott Court       |1         |09-04-2020  |28238         |
|Shelton          |Reddings        |78147 Hayes Park        |2         |10/14/2020  |29528         |
|Gibbie           |Purcell         |875 Harper Plaza        |3         |6/18/2020   |22763         |
|Marius           |Warbys          |59 Redwing Avenue       |4         |10/15/2020  |24043         |
|Merrick          |Wymer           |053 Pankratz Trail      |5         |01-01-2020  |20814         |
|Shelton          |Reddings        |78147 Hayes Park        |6         |01-02-2020  |27609         |
|Trev             |Stanyer         |8 Fairfield Crossing    |7         |10-10-2020  |23244 

In [25]:
### Total Output Records after inner join and significant purchase accounted for =  991
### Total Output Records after normal inner join only = 995
### No. of Insignificant Purchase records = 4
### 991 = 995-4

In [26]:
### C . Perform Left Outer Join between Customer and Purchase Dataset on CustomerID column 
### This will have all records from Customer Dataset including IDs from CustID31 to CustID50 but not unmatched Customers from Purchase Dataset

In [27]:
cust_left_join = cust_info.join(purchase_info, [cust_info.CustomerID == purchase_info.CustomerID], 'left').drop("CustomerID")
cust_left_join_cnt = cust_left_join.count()

In [28]:
cust_left_join.show(truncate=False)
print(f"Target Count Records after joining {cust_left_join_cnt} ")

+-----------------+----------------+----------------------+----------+------------+--------------+
|CustomerFirstName|CustomerLastName|StreetAddress         |PurchaseID|PurchaseDate|PurchaseAmount|
+-----------------+----------------+----------------------+----------+------------+--------------+
|Mikel            |Rankling        |2901 American Ash Road|987       |8/26/2020   |28643         |
|Mikel            |Rankling        |2901 American Ash Road|986       |2/24/2020   |27647         |
|Mikel            |Rankling        |2901 American Ash Road|940       |1/19/2020   |22662         |
|Mikel            |Rankling        |2901 American Ash Road|904       |7/29/2020   |22284         |
|Mikel            |Rankling        |2901 American Ash Road|899       |7/25/2020   |29865         |
|Mikel            |Rankling        |2901 American Ash Road|865       |6/15/2020   |29858         |
|Mikel            |Rankling        |2901 American Ash Road|859       |09-07-2020  |26229         |
|Mikel    

In [29]:
### D . Perform Full Outer Join between Customer and Purchase Dataset on CustomerID column 
### This will have all records from Customer Dataset including IDs from CustID31 to CustID50 also all records from Purchase Dataset including for IDS custID3074,custID75,custID87

In [31]:
cust_full_join = cust_info.join(purchase_info, [cust_info.CustomerID == purchase_info.CustomerID], 'full').drop("CustomerID")
cust_full_join_cnt = cust_full_join.count()

In [32]:
cust_full_join.show(truncate=False)
print(f"Target Count Records after joining {cust_full_join_cnt} ")

+-----------------+----------------+---------------+----------+------------+--------------+
|CustomerFirstName|CustomerLastName|StreetAddress  |PurchaseID|PurchaseDate|PurchaseAmount|
+-----------------+----------------+---------------+----------+------------+--------------+
|Leon             |Coales          |546 Darwin Park|46        |12/31/2019  |28319         |
|Leon             |Coales          |546 Darwin Park|57        |9/15/2020   |24450         |
|Leon             |Coales          |546 Darwin Park|62        |8/29/2020   |29697         |
|Leon             |Coales          |546 Darwin Park|92        |7/26/2020   |20162         |
|Leon             |Coales          |546 Darwin Park|144       |7/24/2020   |25351         |
|Leon             |Coales          |546 Darwin Park|160       |07-11-2020  |23886         |
|Leon             |Coales          |546 Darwin Park|176       |7/31/2020   |24463         |
|Leon             |Coales          |546 Darwin Park|194       |11-10-2020  |2926

In [None]:
### E . Perform Left Semi Join between Customer and Purchase Dataset on CustomerID column 
### This is similar to inner join except that it returns only Columns from the table on left i.e.  Customer

In [33]:
cust_leftsemi_join = cust_info.join(purchase_info, [cust_info.CustomerID == purchase_info.CustomerID], 'leftsemi').drop("CustomerID")
cust_leftsemi_join_cnt = cust_leftsemi_join.count()

In [34]:
cust_leftsemi_join.show(truncate=False)
print(f"Target Count Records after joining {cust_leftsemi_join_cnt} ")

+-----------------+----------------+------------------------+
|CustomerFirstName|CustomerLastName|StreetAddress           |
+-----------------+----------------+------------------------+
|Mikel            |Rankling        |2901 American Ash Road  |
|Nial             |Rentcome        |08928 Sunfield Plaza    |
|Leon             |Coales          |546 Darwin Park         |
|Timothy          |Southon         |79507 Scott Court       |
|Thedric          |Firbank         |755 Ramsey Terrace      |
|Gerrie           |Barr            |477 Havey Hill          |
|Hagen            |Maris           |7864 Hoepker Crossing   |
|Winthrop         |Lantaph         |92172 Stoughton Court   |
|Martainn         |Couch           |167 Calypso Terrace     |
|Marcos           |Perrin          |223 Paget Road          |
|Trev             |Stanyer         |8 Fairfield Crossing    |
|Glenn            |Leport          |8 Tomscot Place         |
|Odell            |Wakley          |52 Orin Avenue          |
|Vale   

In [36]:
### F . Perform Left Anti Join between Customer and Purchase Dataset on CustomerID column 
### This will have all unmatched records from Customer Dataset i.e.  IDs from CustID31 to CustID50 and returns columns only from Customer Dataset

In [35]:
cust_leftanti_join = cust_info.join(purchase_info, [cust_info.CustomerID == purchase_info.CustomerID], 'leftanti').drop("CustomerID")
cust_leftanti_join_cnt = cust_leftanti_join.count()

In [37]:
cust_leftanti_join.show(truncate=False)
print(f"Target Count Records after joining {cust_leftanti_join_cnt} ")

+-----------------+----------------+----------------------+
|CustomerFirstName|CustomerLastName|StreetAddress         |
+-----------------+----------------+----------------------+
|Toiboid          |Farnon          |751 Hanson Drive      |
|Normand          |Eriksson        |7 Village Green Drive |
|Shelton          |Yansons         |95 Grim Park          |
|Jeromy           |Gallimore       |9 Hermina Alley       |
|Joachim          |Stoeck          |66452 Lakeland Street |
|Bruno            |Thatcham        |298 Scofield Court    |
|Markos           |Doley           |3410 Hanover Hill     |
|Alexio           |Hubbock         |093 Village Green Road|
|Lucian           |Overstreet      |981 Jay Hill          |
|Norman           |Ranns           |39 Mandrake Plaza     |
|Dov              |Bentote         |1 Mcbride Trail       |
|Lonnie           |Jira            |65 Spenser Way        |
|Carrol           |Balkwill        |316 Mesta Plaza       |
|Westbrook        |Camm            |1647

# Store all output records in CSV,TSV,| as well as JSON from Example Transformations in A,B,C,D,E,F

In [39]:
# Persist all the output transformation to ensure they are not computed multiple times

In [40]:
cust_inner_join.persist()
cust_inner_join_high_purch.persist()
cust_left_join.persist()
cust_full_join.persist()
cust_leftsemi_join.persist()
cust_leftanti_join.persist()

DataFrame[CustomerFirstName: string, CustomerLastName: string, StreetAddress: string]

In [41]:
# Writing Records in CSV,TSV,| and JSON for cust_inner_join
cust_inner_join.write.format('csv').options(delimiter=',').option("header","true").save(f'{output_datset_path_csv}/cust_inner_join')
cust_inner_join.write.format('csv').options(delimiter='\t').option("header","true").save(f'{output_datset_path_tab}/cust_inner_join')
cust_inner_join.write.format('csv').options(delimiter='|').option("header","true").save(f'{output_datset_path_pipe}/cust_inner_join')
cust_inner_join.write.format('json').save(f'{output_datset_path_json}/cust_inner_join')

In [42]:
# Writing Records in CSV,TSV,| and JSON for cust_inner_join_high_purch
cust_inner_join_high_purch.write.format('csv').options(delimiter=',').option("header","true").save(f'{output_datset_path_csv}/cust_inner_join_high_purch')
cust_inner_join_high_purch.write.format('csv').options(delimiter='\t').option("header","true").save(f'{output_datset_path_tab}/cust_inner_join_high_purch')
cust_inner_join_high_purch.write.format('csv').options(delimiter='|').option("header","true").save(f'{output_datset_path_pipe}/cust_inner_join_high_purch')
cust_inner_join_high_purch.write.format('json').save(f'{output_datset_path_json}/cust_inner_join_high_purch')

In [43]:
# Writing Records in CSV,TSV,| and JSON for cust_left_join
cust_left_join.write.format('csv').options(delimiter=',').option("header","true").save(f'{output_datset_path_csv}/cust_left_join')
cust_left_join.write.format('csv').options(delimiter='\t').option("header","true").save(f'{output_datset_path_tab}/cust_left_join')
cust_left_join.write.format('csv').options(delimiter='|').option("header","true").save(f'{output_datset_path_pipe}/cust_left_join')
cust_left_join.write.format('json').save(f'{output_datset_path_json}/cust_left_join')

In [44]:
# Writing Records in CSV,TSV,| and JSON for cust_full_join
cust_full_join.write.format('csv').options(delimiter=',').option("header","true").save(f'{output_datset_path_csv}/cust_full_join')
cust_full_join.write.format('csv').options(delimiter='\t').option("header","true").save(f'{output_datset_path_tab}/cust_full_join')
cust_full_join.write.format('csv').options(delimiter='|').option("header","true").save(f'{output_datset_path_pipe}/cust_full_join')
cust_full_join.write.format('json').save(f'{output_datset_path_json}/cust_full_join')

In [45]:
# Writing Records in CSV,TSV,| and JSON for cust_leftsemi_join
cust_leftsemi_join.write.format('csv').options(delimiter=',').option("header","true").save(f'{output_datset_path_csv}/cust_leftsemi_join')
cust_leftsemi_join.write.format('csv').options(delimiter='\t').option("header","true").save(f'{output_datset_path_tab}/cust_leftsemi_join')
cust_leftsemi_join.write.format('csv').options(delimiter='|').option("header","true").save(f'{output_datset_path_pipe}/cust_leftsemi_join')
cust_leftsemi_join.write.format('json').save(f'{output_datset_path_json}/cust_leftsemi_join')

In [46]:
# Writing Records in CSV,TSV,| and JSON for cust_leftanti_join
cust_leftanti_join.write.format('csv').options(delimiter=',').option("header","true").save(f'{output_datset_path_csv}/cust_leftanti_join')
cust_leftanti_join.write.format('csv').options(delimiter='\t').option("header","true").save(f'{output_datset_path_tab}/cust_leftanti_join')
cust_leftanti_join.write.format('csv').options(delimiter='|').option("header","true").save(f'{output_datset_path_pipe}/cust_leftanti_join')
cust_leftanti_join.write.format('json').save(f'{output_datset_path_json}/cust_leftanti_join')