#PySpark joins - beyond the basics

##Syntax - PySpark join

`dataframe1.join(other, on=None, how=None)`

**Parameters:**

  *other* : The other dataframe, which is the RIGHT side of the join

  *on* : What to join ON: a column name, a list of column names, a join expression (Column), or a list of join expressions

  *how* : Join type as a string. Defaults to `inner`.

**Returns:** Joined dataframe.

##Focus of this notebook: How to join dataframes?

Part 1 of this series focused on join types.

This notebook is part 2, and its primary focus is writing join conditions, that is, how to join dataframes.

##ER Diagram: tpch database

Note: `tpch` is a database (schema) in the `samples` catalog, which is **available in Databricks free edition**

<img src="./ERDiagram_Databricks_Samples_catalog_tpch_db.png" alt="ERDiagram_Databricks_Samples_catalog_tpch_db.png"/>

##Example data 1 (samples.tpch)

Note: The **samples catalog** and the **tpch schema** are available by default in Databricks free edition.

In [0]:
df_customer = spark.read.table("samples.tpch.customer")
df_orders = spark.read.table("samples.tpch.orders")
df_lineitem = spark.read.table("samples.tpch.lineitem")
df_partsupp = spark.read.table("samples.tpch.partsupp")
df_part = spark.read.table("samples.tpch.part")
df_supplier = spark.read.table("samples.tpch.supplier")

# df_customer.show(5, truncate=False)
# df_orders.show(5, truncate=False)
# df_lineitem.show(5, truncate=False)
# df_partsupp.show(5, truncate=False)
# df_part.show(5, truncate=False)
# df_supplier.show(5, truncate=False)

##Before you start writing PySpark joins

Before we start writing PySpark joins, it's important to have clarity on a few key points:

1. Which join type to use
2. Which columns to join the dataframes on
3. Which column access method to use
4. Which dataframe aliasing approach to use


###(i) Join types - my blog post

Deciding on the join type is driven by the functional requirement. 

For a detailed explanation of the join types that Spark supports, see my blog post on Medium:

https://medium.com/@jpilli/write-pyspark-joins-like-a-pro-part-1-join-types-explained-a5a2f43678cb

###INNER JOIN is used in most of the examples here

Most of the examples in this notebook use INNER JOIN as the join type. Switching to a different join type is simply a matter of changing the join type string in the join transformation. The exception is the cross join: apart from specifying the join type as "cross", you also need to change the join condition to None.


###(ii) Which columns to join dataframes on?

This is driven by the entity relationships between the tables that you want to join.

For this demo, you can refer to the ER Diagram provided earlier in this notebook.

###(iii) Column Access Methods - best practice recommendations

A **hybrid approach** is recommended, as no single method covers all scenarios for accessing columns in a DataFrame.

**Option A**

* Use **col()** for most transformations, including when working with aliased/aggregated columns or column names containing special characters.
* Use **string literals** only when:
  - Specifying a new column name with `withColumn()`, or
  - Renaming a column with `withColumnRenamed()`.

**Option B**

* Use **dot notation** for concise, readable code. It is familiar to Python users and provides a consistent coding style.
* Use **col()** when working with aliased/aggregated columns or columns with special characters.
* Use **string literals** only when:
  - Specifying a new column name with `withColumn()`, or
  - Renaming a column with `withColumnRenamed()`.

**Either option works well**; choosing between them is a matter of what suits you best.

Note: The recommendations above are an extract from my blog post:

https://medium.com/@jpilli/pyspark-dataframes-which-column-access-method-should-you-use-best-practices-explained-f86c69d67fb8

###Column Access Methods used in these demos

Most of the code examples in this notebook are available in:
* Dot notation approach, as well as
* col() function approach

###(iv) Dataframe aliasing approaches

* Approach 1: Alias dataframes before join transformation
* Approach 2: Alias dataframes inline, in join transformation

####Alias dataframes before performing a join

When dataframes are aliased before the join transformation statement, then:
* Both Dot notation and `col()` function work fine

In [0]:
from pyspark.sql.functions import col

#Tip: Ensure the variable name and string name passed to df.alias() method are exactly the same
df_cust_aliased1 = df_customer.alias("df_cust_aliased1") 
df_ord_aliased1 = df_orders.alias("df_ord_aliased1")

df_result_DFsAliasedBefore = (
    df_cust_aliased1
    .join(df_ord_aliased1, 

        #using dot notation : Works fine
        (df_cust_aliased1.c_custkey == df_ord_aliased1.o_custkey),

        #using col() : Works fine
        # (col("df_cust_aliased1.c_custkey") == col("df_ord_aliased1.o_custkey")),  

        "inner"
   )
)

df_result_DFsAliasedBefore.display()

####Alias Dataframes inline while performing the join

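The **limitations** when the dataframes being joined are aliased inline:
* Dot notation doesn't work with the alias name, because the alias exists only as a string and not as a Python variable. Only the `col()` function can reference it.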

In [0]:
from pyspark.sql.functions import col

df_result_DFsAliasedInLine = (
    df_customer.alias("df_cust_aliased")
    .join(df_orders.alias("df_ord_aliased"), 

        #using dot notation : DOES NOT WORK
        #(df_cust_aliased.c_custkey == df_ord_aliased.o_custkey),

        #using col()
        (col("df_cust_aliased.c_custkey") == col("df_ord_aliased.o_custkey")),  

        "inner"
   )
)

df_result_DFsAliasedInLine.display()

####What if I don't alias dataframes while joining them?

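**The limitations** when the dataframes being joined aren't aliased:
* Dot notation works fine.
* `col()` with dataframe-qualified names, e.g. `col("df_customer.c_custkey")`, doesn't work, because no alias with that name has been defined.
* You cannot use multiple instances of the same dataframe (e.g. in a self-join), because there is nothing to tell the instances apart.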


In [0]:
from pyspark.sql.functions import col

df_result_NoDFAliases = (
    df_customer
    .join(df_orders, 

        #using dot notation. 
        (df_customer.c_custkey == df_orders.o_custkey),

        #using col() : DOES NOT WORK
        # You need to alias your DataFrames before using col() with qualified column names in join conditions.
        #(col("df_customer.c_custkey") == col("df_orders.o_custkey")),  

        "inner"
   )
)

df_result_NoDFAliases.display()

####Key Takeaways - Aliasing Dataframes being joined

To specify join condition in a transformation statement:

* (*1st preferred*) Alias dataframes **before** performing a join:
    - Benefit: you can use *either dot notation or `col()`* in join condition

* (*2nd preferred*) Alias dataframes **inline** while performing a join:
    - limitation: you can use `col()` but not dot notation in join condition

* If you don't alias dataframes while performing a join:
    - limitation: you can use *dot notation* but not `col()` with dataframe-qualified column names

##1.0 Single Column Join

**Join scenario**: 

* Dataframes to join: *df_customer, df_orders*
* Join condition: *df_customer.c_custkey = df_orders.o_custkey*

###1.1 Alias dataframes before performing the join

Benefits:
* Supports Dot notation as well as col() to access columns

####Using Dot notation

In [0]:
from pyspark.sql.functions import col

#Tip: Ensure the variable name and string name passed to df.alias() method are exactly the same
df_cust_alias = df_customer.alias("df_cust_alias") 
df_ord_alias = df_orders.alias("df_ord_alias")

df_result_scol_DfsAliasedB4_dotNotation = (
    df_cust_alias
    .join(df_ord_alias, 

        #using dot notation : Works fine
        (df_cust_alias.c_custkey == df_ord_alias.o_custkey),

        "inner"
   )
    #Given we aliased dfs before the join transformation, you could use either dot notation or col() to access columns
    .select(
    df_cust_alias.c_custkey.alias("cust_key"),
    df_cust_alias.c_name.alias("cust_name"),
    df_ord_alias.o_orderkey.alias("order_key"),
    df_ord_alias.o_custkey.alias("cust_key_in_orders"),
    df_ord_alias.o_orderstatus.alias("order_status"),
    df_ord_alias.o_totalprice.alias("order_total_price")
    )
    #Tip: Avoid sort() unless required
    .sort(col("cust_key").asc()) #For code brevity, prefer referencing aliased column names in sort() by using col()
          
    #To use dot notation in sort(), you'll need to reference original column name rather than aliased name
    #.sort(df_ord_alias.o_custkey.asc()) 
)

df_result_scol_DfsAliasedB4_dotNotation.display()

####Using col() function

In [0]:
from pyspark.sql.functions import col

#Tip: Ensure the variable name and string name passed to df.alias() method are exactly the same
df_cust_alias = df_customer.alias("df_cust_alias") 
df_ord_alias = df_orders.alias("df_ord_alias")

df_result_scol_DfsAliasedB4_col = (
    df_cust_alias
    .join(df_ord_alias, 

        #using col() : Works fine
        (col("df_cust_alias.c_custkey") == col("df_ord_alias.o_custkey")),  

        "inner"
   )
    #Given we aliased dfs before the join transformation, you could use either dot notation or col() to access columns
    .select(
    col("df_cust_alias.c_custkey").alias("cust_key"),
    col("df_cust_alias.c_name").alias("cust_name"),
    col("df_ord_alias.o_orderkey").alias("order_key"),
    col("df_ord_alias.o_custkey").alias("cust_key_in_orders"),
    col("df_ord_alias.o_orderstatus").alias("order_status"),
    col("df_ord_alias.o_totalprice").alias("order_total_price")
    )
    #Tip: Avoid sort() unless required
    .sort(col("cust_key_in_orders").asc()) #With col(), you get to reference aliased column names in sort()
)

df_result_scol_DfsAliasedB4_col.display()

###1.2 Alias Dataframes inline while performing the join

Benefits:
* Supports col() to access columns

Limitations:
* Doesn't support dot notation to access columns

####Using Dot notation - NOT SUPPORTED

####Using col() function

In [0]:
from pyspark.sql.functions import col

df_result_scol_DfsAliasedInLine_col = (
    df_customer.alias("df_cust_alias")
    .join(df_orders.alias("df_ord_alias"), 

        #using col() : Works fine
        (col("df_cust_alias.c_custkey") == col("df_ord_alias.o_custkey")),  

        "inner"
   )
    # IMPORTANT: Given we aliased dataframes inline, only col() function is supported to access columns.
    .select(
    col("df_cust_alias.c_custkey").alias("cust_key"),
    col("df_cust_alias.c_name").alias("cust_name"),
    col("df_ord_alias.o_orderkey").alias("order_key"),
    col("df_ord_alias.o_custkey").alias("cust_key_in_orders"),
    col("df_ord_alias.o_orderstatus").alias("order_status"),
    col("df_ord_alias.o_totalprice").alias("order_total_price")
    )
    #Tip: Avoid sort() unless required
    .sort(col("cust_key_in_orders").asc()) #With col(), you get to reference aliased column names in sort()
)

df_result_scol_DfsAliasedInLine_col.display()

##2.0 Multi-Column Joins

i.e. a join condition on two or more columns

**Join scenario**: 

* Dataframes to join: *df_lineitem, df_partsupp*
* Join condition: 
    - *df_lineitem.l_partkey = df_partsupp.ps_partkey*
    - *df_lineitem.l_suppkey = df_partsupp.ps_suppkey*

Tip 1: The syntax is the same whether your join condition is on two columns or many columns.

Tip 2: When specifying a join on multiple columns,
* enclose each column join expression in its own parentheses (), because Python's `&` operator binds more tightly than `==`
* optionally, enclose all column join expressions in a list []
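
Why the parentheses matter comes down to Python operator precedence: `&` is evaluated before `==`. The sketch below uses plain Python integers as stand-ins for columns, purely to show how the expression is grouped; no Spark is involved.

```python
# Plain integers stand in for columns here, only to show how Python groups the operators.
a, b, c, d = 1, 1, 2, 2

# Without parentheses, & is evaluated first, so this reads as: a == (b & c) == d
without_parens = a == b & c == d

# With parentheses, each comparison is evaluated first, then the results are combined with &
with_parens = (a == b) & (c == d)

print(without_parens)  # False
print(with_parens)     # True
```

With PySpark Column objects, the unparenthesized form typically fails with an error rather than returning a wrong answer, because the chained comparison tries to convert a Column to a Python boolean.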

###2.1 Alias dataframes before performing the join

####Using Dot notation

In [0]:
from pyspark.sql.functions import col

#Tip: Ensure the variable name and string name passed to df.alias() method are exactly the same
df_lineitem_alias = df_lineitem.alias("df_lineitem_alias") 
df_partsupp_alias = df_partsupp.alias("df_partsupp_alias")

df_result_mcols_DfsAliasedB4_dot = (
    df_lineitem_alias
    .join(df_partsupp_alias, 

        #using dot notation
        [(df_lineitem_alias.l_partkey == df_partsupp_alias.ps_partkey) &
        (df_lineitem_alias.l_suppkey == df_partsupp_alias.ps_suppkey)],

        "inner"
   )
    #Given we aliased dfs before the join transformation, you could use either dot notation or col() to access columns
    .select(
    df_lineitem_alias.l_orderkey.alias("orderkey"),
    df_lineitem_alias.l_partkey.alias("partkey_InLI"),
    df_lineitem_alias.l_suppkey.alias("suppkey_InLI"),
    df_lineitem_alias.l_linenumber.alias("linenumber"),
    df_lineitem_alias.l_quantity.alias("quantity"),
    df_partsupp_alias.ps_partkey.alias("partkey_inPartSupp"),
    df_partsupp_alias.ps_suppkey.alias("suppkey_inPartSupp"),
    df_partsupp_alias.ps_availqty.alias("availqty")
    )
    #Tip: Avoid sort() unless required
    .sort(col("orderkey").asc(), col("linenumber").asc()) #For code brevity, prefer referencing aliased column names in sort() by using col()

    #To use dot notation in sort(), you'll need to reference original column name rather than aliased name
    #.sort(df_lineitem_alias.l_orderkey.asc(), df_lineitem_alias.l_linenumber.asc()) 
)

df_result_mcols_DfsAliasedB4_dot.display()


####Using col() function

In [0]:
from pyspark.sql.functions import col

#Tip: Ensure the variable name and string name passed to df.alias() method are exactly the same
df_lineitem_alias = df_lineitem.alias("df_lineitem_alias") 
df_partsupp_alias = df_partsupp.alias("df_partsupp_alias")

df_result_mcols_DfsAliasedB4_col = (
    df_lineitem_alias
    .join(df_partsupp_alias, 

        #using col()
        [(col("df_lineitem_alias.l_partkey") == col("df_partsupp_alias.ps_partkey")) &
        (col("df_lineitem_alias.l_suppkey") == col("df_partsupp_alias.ps_suppkey"))],

        "inner"
   )
    #Given we aliased dfs before the join transformation, you could use either dot notation or col() to access columns
    .select(
    col("df_lineitem_alias.l_orderkey").alias("orderkey"),
    col("df_lineitem_alias.l_partkey").alias("partkey_InLI"),
    col("df_lineitem_alias.l_suppkey").alias("suppkey_InLI"),
    col("df_lineitem_alias.l_linenumber").alias("linenumber"),
    col("df_lineitem_alias.l_quantity").alias("quantity"),

    col("df_partsupp_alias.ps_partkey").alias("partkey_inPartSupp"),
    col("df_partsupp_alias.ps_suppkey").alias("suppkey_inPartSupp"),
    col("df_partsupp_alias.ps_availqty").alias("availqty")
    )
    #Tip: Avoid sort() unless required
    .sort(col("orderkey").asc(), col("linenumber").asc()) #With col(), you get to reference aliased column names in sort()
)

df_result_mcols_DfsAliasedB4_col.display()



###2.2 Alias Dataframes inline while performing the join

####Using Dot notation - NOT SUPPORTED

####Using col() function

In [0]:
from pyspark.sql.functions import col

df_result_mcols_DfsAliasedInLine_col = (
    df_lineitem.alias("df_lineitem_alias")
    .join(df_partsupp.alias("df_partsupp_alias"), 

        #using col()
        [(col("df_lineitem_alias.l_partkey") == col("df_partsupp_alias.ps_partkey")) &
        (col("df_lineitem_alias.l_suppkey") == col("df_partsupp_alias.ps_suppkey"))],

        "inner"
   )
    # IMPORTANT: Given we aliased dataframes inline, only col() function is supported to access columns.
    .select(
    col("df_lineitem_alias.l_orderkey").alias("orderkey"),
    col("df_lineitem_alias.l_partkey").alias("partkey_InLI"),
    col("df_lineitem_alias.l_suppkey").alias("suppkey_InLI"),
    col("df_lineitem_alias.l_linenumber").alias("linenumber"),
    col("df_lineitem_alias.l_quantity").alias("quantity"),

    col("df_partsupp_alias.ps_partkey").alias("partkey_inPartSupp"),
    col("df_partsupp_alias.ps_suppkey").alias("suppkey_inPartSupp"),
    col("df_partsupp_alias.ps_availqty").alias("availqty")
    )
    #Tip: Avoid sort() unless required
    .sort(col("orderkey").asc(), col("linenumber").asc()) #With col(), you get to reference aliased column names in sort()
)

df_result_mcols_DfsAliasedInLine_col.display()


##3.0 Multi-table Joins

**Join scenario**: 

* Dataframes to join: *df_lineitem, df_partsupp, df_part, df_supplier*
* Join condition: 
    - *df_lineitem.l_partkey = df_partsupp.ps_partkey*
    - *df_lineitem.l_suppkey = df_partsupp.ps_suppkey*
    - *df_partsupp.ps_partkey = df_part.p_partkey*
    - *df_partsupp.ps_suppkey = df_supplier.s_suppkey*

###3.1 Alias dataframes before performing the join

####Using Dot notation

In [0]:
from pyspark.sql.functions import col

#Tip: Ensure the variable name and string name passed to df.alias() method are exactly the same
df_lineitem_alias = df_lineitem.alias("df_lineitem_alias") 
df_partsupp_alias = df_partsupp.alias("df_partsupp_alias")
df_part_alias = df_part.alias("df_part_alias")
df_supplier_alias = df_supplier.alias("df_supplier_alias")

df_result_mdfs_DfsAliasedB4_dotNotation = (
    df_lineitem_alias
    #Join LineItems with PartSupplier table
    .join(df_partsupp_alias, 

        [(df_lineitem_alias.l_partkey == df_partsupp_alias.ps_partkey) &
        (df_lineitem_alias.l_suppkey == df_partsupp_alias.ps_suppkey)],

        "inner"
    )
    #Join PartSupplier table to Parts table
    .join(df_part_alias,
          (df_partsupp_alias.ps_partkey == df_part_alias.p_partkey),
        "inner"      
    )
    #Join PartSupplier table to Supplier table
    .join(df_supplier_alias,
          (df_partsupp_alias.ps_suppkey == df_supplier_alias.s_suppkey),
        "inner"      
    )
        
    #Given we aliased dfs before the join transformation, you could use either dot notation or col() to access columns
    .select(
    df_lineitem_alias.l_orderkey.alias("orderkey"),
    df_lineitem_alias.l_partkey.alias("partkey_InLI"),
    df_lineitem_alias.l_suppkey.alias("suppkey_InLI"),
    df_lineitem_alias.l_linenumber.alias("linenumber"),
    df_lineitem_alias.l_quantity.alias("quantity"),

    df_partsupp_alias.ps_partkey.alias("partkey_inPartSupp"),
    df_partsupp_alias.ps_suppkey.alias("suppkey_inPartSupp"),
    df_partsupp_alias.ps_availqty.alias("availqty"),
    df_partsupp_alias.ps_supplycost.alias("supplycost"),

    df_part_alias.p_partkey.alias("partkey_inPart"),
    df_part_alias.p_name.alias("partname"),

    df_supplier_alias.s_suppkey.alias("suppkey_inSupplier"),
    df_supplier_alias.s_name.alias("suppliername")    
    )
    #Tip: Avoid sort() unless required
    .sort(col("orderkey").asc(), col("linenumber").asc()) #For code brevity, prefer referencing aliased column names in sort() by using col()

    #To use dot notation in sort(), you'll need to reference original column name rather than aliased name
    #.sort(df_lineitem_alias.l_orderkey.asc(), df_lineitem_alias.l_linenumber.asc()) 
)

df_result_mdfs_DfsAliasedB4_dotNotation.display()



####Using col() function

In [0]:
from pyspark.sql.functions import col

#Tip: Ensure the variable name and string name passed to df.alias() method are exactly the same
df_lineitem_alias = df_lineitem.alias("df_lineitem_alias") 
df_partsupp_alias = df_partsupp.alias("df_partsupp_alias")
df_part_alias = df_part.alias("df_part_alias")
df_supplier_alias = df_supplier.alias("df_supplier_alias")

df_result_mdfs_DfsAliasedB4_col = (
    df_lineitem_alias
    #Join LineItems with PartSupplier table
    .join(
        df_partsupp_alias,
        (col("df_lineitem_alias.l_partkey") == col("df_partsupp_alias.ps_partkey")) &
        (col("df_lineitem_alias.l_suppkey") == col("df_partsupp_alias.ps_suppkey")),
        "inner"
    )
    #Join PartSupplier table to Parts table
    .join(
        df_part_alias,
        col("df_partsupp_alias.ps_partkey") == col("df_part_alias.p_partkey"),
        "inner"
    )
    #Join PartSupplier table to Supplier table
    .join(
        df_supplier_alias,
        col("df_partsupp_alias.ps_suppkey") == col("df_supplier_alias.s_suppkey"),
        "inner"
    )
    .select(
        col("df_lineitem_alias.l_orderkey").alias("orderkey"),
        col("df_lineitem_alias.l_partkey").alias("partkey_InLI"),
        col("df_lineitem_alias.l_suppkey").alias("suppkey_InLI"),
        col("df_lineitem_alias.l_linenumber").alias("linenumber"),
        col("df_lineitem_alias.l_quantity").alias("quantity"),
        col("df_partsupp_alias.ps_partkey").alias("partkey_inPartSupp"),
        col("df_partsupp_alias.ps_suppkey").alias("suppkey_inPartSupp"),
        col("df_partsupp_alias.ps_availqty").alias("availqty"),
        col("df_part_alias.p_partkey").alias("partkey_inPart"),
        col("df_part_alias.p_name").alias("partname"),
        col("df_supplier_alias.s_suppkey").alias("suppkey_inSupplier"),
        col("df_supplier_alias.s_name").alias("suppliername")
    )
    #Tip: Avoid sort() unless required
    .sort(col("orderkey").asc(), col("linenumber").asc()) 

)

df_result_mdfs_DfsAliasedB4_col.display()


###3.2 Alias Dataframes inline while performing the join

####Using Dot notation - NOT SUPPORTED

####Using col() function

In [0]:
from pyspark.sql.functions import col

df_resut_mdfs_DfsAliasedInLine_col = (
    df_lineitem.alias("df_lineitem_alias")
    #Join LineItems with PartSupplier table
    .join(
        df_partsupp.alias("df_partsupp_alias"),
        (col("df_lineitem_alias.l_partkey") == col("df_partsupp_alias.ps_partkey")) &
        (col("df_lineitem_alias.l_suppkey") == col("df_partsupp_alias.ps_suppkey")),
        "inner"
    )
    #Join PartSupplier table to Parts table    
    .join(
        df_part.alias("df_part_alias"),
        col("df_partsupp_alias.ps_partkey") == col("df_part_alias.p_partkey"),
        "inner"
    )
    #Join PartSupplier table to Supplier table
    .join(
        df_supplier.alias("df_supplier_alias"),
        col("df_partsupp_alias.ps_suppkey") == col("df_supplier_alias.s_suppkey"),
        "inner"
    )
    .select(
        col("df_lineitem_alias.l_orderkey").alias("orderkey"),
        col("df_lineitem_alias.l_partkey").alias("partkey_InLI"),
        col("df_lineitem_alias.l_suppkey").alias("suppkey_InLI"),
        col("df_lineitem_alias.l_linenumber").alias("linenumber"),
        col("df_lineitem_alias.l_quantity").alias("quantity"),
        col("df_partsupp_alias.ps_partkey").alias("partkey_inPartSupp"),
        col("df_partsupp_alias.ps_suppkey").alias("suppkey_inPartSupp"),
        col("df_partsupp_alias.ps_availqty").alias("availqty"),
        col("df_part_alias.p_partkey").alias("partkey_inPart"),
        col("df_part_alias.p_name").alias("partname"),
        col("df_supplier_alias.s_suppkey").alias("suppkey_inSupplier"),
        col("df_supplier_alias.s_name").alias("suppliername")
    )
    #Tip: Avoid sort() unless required
    .sort(col("orderkey").asc(), col("linenumber").asc()) 

)

df_resut_mdfs_DfsAliasedInLine_col.display()


##4.0 Self-join

###Example data 2 (self-join)

Join the same dataframe to itself.

**Join scenario**: 

* Dataframes to join: *df_employee (instance 1), df_employee (instance 2)*
* Join condition: *df_instance1.manager_id = df_instance2.emp_id*

Example data in *df_employee* looks as below:

| emp_id | emp_name | job_title | manager_id |
|--------|----------|-----------|------------|
|101|David|CEO|None|
|102|Kevin|GM|101|
|103|Lisa|EM|102|
|104|Mike|PM|103|
|105|Jack|Data Engineer|104|
|106|Melissa|Data Engineer|104|


Load *df_employee* dataframe with example data:

In [0]:
employee_data = (
    (101,"David","CEO",None),
    (102,"Kevin","GM",101),
    (103,"Lisa","EM",102),
    (104,"Mike","PM",103),
    (105,"Jack","Data Engineer",104),
    (106,"Melissa","Data Engineer",104)
  )
employee_schema = "emp_id: int, emp_name: string, job_title: string, manager_id: int"

df_employee = spark.createDataFrame(data = employee_data, schema = employee_schema)

df_employee.select("*").display()

###Using dot notation

In [0]:
from pyspark.sql.functions import col, expr

#Tip: Ensure the variable name and string name passed to alias() method are exactly the same
df_emp = df_employee.alias("df_emp")
df_mgr = df_employee.alias("df_mgr")

df_result_selfjoin = (
    df_emp
    .join(
        df_mgr,
        df_emp.manager_id == df_mgr.emp_id,
        "left_outer" #Tip: Use left_outer join to be able to include employees that don't have a manager e.g. CEO
    )
    .select(
        df_emp.emp_id.alias("Employee_Id"),
        df_emp.emp_name.alias("Employee_Name"),
        df_emp.job_title.alias("Employee_Job_Title"),
        df_mgr.emp_id.alias("Manager_Id"),
        df_mgr.emp_name.alias("Manager_Name"),
        df_mgr.job_title.alias("Manager_Job_Title")
    )
    #Tip: Avoid sort() unless required
    .sort(col("Employee_Id").asc()) #For code brevity, prefer referencing aliased column names in sort() by using col()
)

df_result_selfjoin.display()

###Using col() function

In [0]:
from pyspark.sql.functions import col, expr

#Tip: Ensure the variable name and string name passed to alias() method are exactly the same 
df_emp = df_employee.alias("df_emp")
df_mgr = df_employee.alias("df_mgr")

df_result_selfjoin = (
    df_emp
    .join(
        df_mgr,
        col("df_emp.manager_id") == col("df_mgr.emp_id"),
        "left_outer" #Tip: Use left_outer join to be able to include employees that don't have a manager e.g. CEO
    )
    .select(
        col("df_emp.emp_id").alias("Employee_Id"),
        col("df_emp.emp_name").alias("Employee_Name"),
        col("df_emp.job_title").alias("Employee_Job_Title"),
        col("df_mgr.emp_id").alias("Manager_Id"),
        col("df_mgr.emp_name").alias("Manager_Name"),
        col("df_mgr.job_title").alias("Manager_Job_Title")
    )
)

df_result_selfjoin.display()

##5.0 Using string literals as column names to join dataframes

**Pre-requisites / Limitations**:
* The column names used in the join condition must be identical in both the dataframes

**Join scenario**: 

* Dataframes to join: *df_mockup_orders, df_mockup_lineitems, df_mockup_partsupp*
* Join condition: 
    - *df_mockup_orders.orderkey = df_mockup_lineitems.orderkey*
    - *df_mockup_lineitems.partkey = df_mockup_partsupp.partkey*
    - *df_mockup_lineitems.suppkey = df_mockup_partsupp.suppkey*


###Example data 3 (string literals as joins)

In [0]:
from decimal import Decimal

#1. df_mockup_orders
df_mockup_orders = spark.createDataFrame(
    [
        (1001,21,'Open', Decimal('45.00')),
        (1002,22,'Open', Decimal('60.00')),
        (1003,23,'Completed', Decimal('70.00')),
        (1004,24,'Completed', Decimal('90.00')),
        (1005,25,'Pending', Decimal('77.55')) 
    ],
    "orderkey: bigint, custkey: bigint, orderstatus: string, totalprice: decimal(18,2)"
)

df_mockup_orders.display()

#2. df_mockup_lineitems
df_mockup_lineitems = spark.createDataFrame(
    [
        (1001,301,881,1,2),
        (1002,311,881,1,3),
        (1003,315,881,1,5),
        (1004,321,881,1,2),
        (1005,324,881,1,6)
    ],
    "orderkey: bigint, partkey: bigint, suppkey: bigint, linenumber: int, quantity: int"
)
df_mockup_lineitems.display()

#3. df_mockup_partsupp
df_mockup_partsupp = spark.createDataFrame(
    [
        (301,881,100),
        (311,881,100),
        (315,881,100),
        (321,881,100),
        (324,881,100) 
    ],
    "partkey: bigint, suppkey: bigint, availqty: int"
)

df_mockup_partsupp.display()


###Single column join

In [0]:

df_stringjoin_singlecol = (
  df_mockup_orders
  .join(df_mockup_lineitems,
        "orderkey", #single column - join condition
        "inner" #Default: INNER
  )
  .select(df_mockup_orders.orderkey, df_mockup_orders.custkey, df_mockup_orders.orderstatus, df_mockup_orders.totalprice, df_mockup_lineitems.linenumber, df_mockup_lineitems.quantity)
  
  #Tip: Avoid sort() unless required
  .sort("orderkey","linenumber")
)

df_stringjoin_singlecol.display()

###Multi-table join

In [0]:

df_stringjoin_multicol = (
  df_mockup_orders
  .join(df_mockup_lineitems,
        "orderkey", #single column - join condition
        "inner" #Default: INNER
  )
  .join(df_mockup_partsupp,
        ["partkey","suppkey"], #Multi-column join: List[] of column names as join condition
        "inner"
  )  
  .select(df_mockup_orders.orderkey, df_mockup_orders.custkey, df_mockup_orders.orderstatus, df_mockup_orders.totalprice, df_mockup_lineitems.linenumber, df_mockup_lineitems.quantity, df_mockup_partsupp.availqty)

  #Tip: Avoid sort() unless required
  .sort("orderkey","linenumber")
)

df_stringjoin_multicol.display()

##6.0 Tips and tricks

Last but not least, here are some useful tips and tricks to help you write PySpark joins more effectively.

Note: An explanation of each tip follows the illustration below:

<img src="./PySpark Joins - Tips.png" alt="PySpark Joins - Tips.png"/>

###Tip 1: If aliasing dataframes before performing a join, ensure the variable name and string passed as arg to df.alias() method are identical

The benefit and the reason behind it:
* Benefit: You can use the same aliased dataframe name regardless of whether you use dot notation or the `col()` function in a join transformation.
* Reason: In the join transformation,
    - dot notation references the variable that holds the dataframe returned by the `df.alias()` method,
    - whereas the `col()` function references the string argument that was passed to the `df.alias()` method.

In [0]:
from pyspark.sql.functions import col

#Tip: Notice here that the string argument that was passed to df.alias() was identical to the variable that the df.alias() method returned
df_lineitem_aliased1 = df_lineitem.alias("df_lineitem_aliased1") 
df_partsupp_aliased1 = df_partsupp.alias("df_partsupp_aliased1")


###Tip 2: Prefer parentheses () to support multi-line transformation

When assigning a join transformation to a new dataframe, prefer to enclose the entire transformation on the right-hand side in parentheses ().

For example, in the trimmed-down code below, the join transformation on the right-hand side of the assignment operator is enclosed between the opening parenthesis on line 9 and the closing parenthesis on line 24.

*Tip*: Put the opening parenthesis on the same line as the assignment operator (=), as on line 9, rather than on the next line of code.

**Benefits**:
* Eliminates the need to use a backslash ( \ ) at the end of each line in a multi-line transformation
* Multi-line statement improves readability, especially in a complex transformation
* Supports in-line comments

In [0]:
from pyspark.sql.functions import col

df_lineitem_aliased1 = df_lineitem.alias("df_lineitem_aliased1") 
df_partsupp_aliased1 = df_partsupp.alias("df_partsupp_aliased1")

#-----------------------------------------------------------------
#To keep it brief, I’m showing a trimmed-down version of the code.
#-----------------------------------------------------------------
df_tips = (
    df_lineitem_aliased1
    .join(df_partsupp_aliased1, 

        #using dot notation
        [(df_lineitem_aliased1.l_partkey == df_partsupp_aliased1.ps_partkey) &
        (df_lineitem_aliased1.l_suppkey == df_partsupp_aliased1.ps_suppkey)],

        "inner"
   )#.select( #Tip: You could write .select() in this line itself or in the next line
    .select(
    df_lineitem_aliased1.l_orderkey.alias("orderkey"),
    df_lineitem_aliased1.l_partkey.alias("partkey_InLI")
    )
    .sort(col("orderkey").asc())
)

df_tips.display()


**Alternative to using parentheses**: Use a **backslash ( \ )** at the end of each line of the multi-line statement

In this example, notice that the parentheses at lines 9 and 24 are replaced by a backslash at the end of each line of the multi-line transformation.

*Tips*: 
* Ensure there are no trailing spaces after each backslash ( \ )
* A backslash ( \ ) doesn't allow an in-line comment after it


In [0]:
from pyspark.sql.functions import col

df_lineitem_aliased1 = df_lineitem.alias("df_lineitem_aliased1") 
df_partsupp_aliased1 = df_partsupp.alias("df_partsupp_aliased1")

#-----------------------------------------------------------------
#To keep it brief, I’m showing a trimmed-down version of the code.
#-----------------------------------------------------------------
df_tips = \
    df_lineitem_aliased1 \
    .join(df_partsupp_aliased1, \

        #using dot notation (this comment line is legal only because it sits inside the join() parentheses)
        [(df_lineitem_aliased1.l_partkey == df_partsupp_aliased1.ps_partkey) & \
        (df_lineitem_aliased1.l_suppkey == df_partsupp_aliased1.ps_suppkey)], \

        "inner" \
   ) \
    .select( \
    df_lineitem_aliased1.l_orderkey.alias("orderkey"), \
    df_lineitem_aliased1.l_partkey.alias("partkey_InLI") \
    ) \
    .sort(col("orderkey").asc())


df_tips.display()
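
A quick way to see both backslash pitfalls without running Spark is to ask Python to compile a few small snippets. The sketch below is only an illustration of the line-continuation rules: `compiles()` is a hypothetical helper, and plain arithmetic stands in for a DataFrame chain.

```python
def compiles(source: str) -> bool:
    """Return True if the source text parses as valid Python."""
    try:
        compile(source, "<snippet>", "exec")
        return True
    except SyntaxError:
        return False


# "\\" is a single backslash character; "\n" is the line break that follows it.
clean_backslash = "total = 1 + \\\n    2\n"            # backslash is the last character
trailing_space = "total = 1 + \\ \n    2\n"            # a space follows the backslash
trailing_comment = "total = 1 + \\  # note\n    2\n"   # a comment follows the backslash
parenthesized = "total = (\n    1 +  # note\n    2\n)\n"  # parentheses allow the comment

print(compiles(clean_backslash))   # True
print(compiles(trailing_space))    # False
print(compiles(trailing_comment))  # False
print(compiles(parenthesized))     # True
```

The last case is the reason the parenthesized style is usually easier to maintain: comments and stray whitespace do not break it.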


###Tip 3: If the join condition is complex, consider defining it before performing the join

The example below pretends that the join condition is complex:
1. The join condition is defined and assigned to a variable (lines 7-10) before the join is performed
2. The join transformation (line 16) then references that variable

What's the benefit?
* Improved code readability

In [0]:
from pyspark.sql.functions import col

df_lineitem_aliased1 = df_lineitem.alias("df_lineitem_aliased1") 
df_partsupp_aliased1 = df_partsupp.alias("df_partsupp_aliased1")

#pretending that the join condition is complex, define the join condition and store it in a list variable
join_condition_list_1 = [
    (df_lineitem_aliased1.l_partkey == df_partsupp_aliased1.ps_partkey) &
    (df_lineitem_aliased1.l_suppkey == df_partsupp_aliased1.ps_suppkey)
]

df_tips = (
    df_lineitem_aliased1
    .join(df_partsupp_aliased1, 

        join_condition_list_1,

        "inner"
   )
    .select(
    df_lineitem_aliased1.l_orderkey.alias("orderkey"),
    df_partsupp_aliased1.ps_partkey.alias("partkey_inPartSupp")
    )
)

df_tips.display()


###Tip 4: In a multi-column join, enclose each column join expression in its own parentheses ()

When specifying a join condition on multiple columns,
* enclose each column join expression in its own parentheses ()
  - e.g. `(df_lineitem_aliased1.l_partkey == df_partsupp_aliased1.ps_partkey)`
  - this is required because Python's `&` operator binds more tightly than `==`
* optionally, enclose all column join expressions in a list `[]`; if the list holds several conditions, Spark combines them with AND

In [0]:
#-----------------------------------------------------------------
#To keep it brief, I’m showing a trimmed-down version of the code.
#-----------------------------------------------------------------
'''
df_tips = (
    df_lineitem_aliased1
    .join(df_partsupp_aliased1, 

        [
            (df_lineitem_aliased1.l_partkey == df_partsupp_aliased1.ps_partkey) &
            (df_lineitem_aliased1.l_suppkey == df_partsupp_aliased1.ps_suppkey)
        ],

        "inner"
   )
    #...
)
'''
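
The parentheses are more than a style choice, because of how Python groups `&` and `==`. The plain-Python sketch below uses integers as stand-ins for column values to show how the grouping changes. With real PySpark `Column` objects, the unparenthesized form typically fails with an error rather than quietly returning a wrong result.

```python
# Integers stand in for column values; Python's grouping rules are the same.
l_partkey, ps_partkey = 1, 1
l_suppkey, ps_suppkey = 2, 2

# Without parentheses Python evaluates `ps_partkey & l_suppkey` first, so this
# is really: l_partkey == (ps_partkey & l_suppkey) == ps_suppkey
without_parens = l_partkey == ps_partkey & l_suppkey == ps_suppkey

# With parentheses each comparison is evaluated on its own, then combined.
with_parens = (l_partkey == ps_partkey) & (l_suppkey == ps_suppkey)

print(without_parens)  # False, because 1 & 2 is 0 and 1 == 0 is False
print(with_parens)     # True
```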

###Tip 5: In a multi-line transformation, ".select()" can start either on the same line as the preceding ")" or on the next line

When you wrap a multi-line transformation in parentheses, the `.select()` call can start either on the same line as the preceding `)` or on the next line.

For example, in the trimmed-down code below, `.select()` could start on line 10 itself or on the next line (line 11).

In [0]:
#-----------------------------------------------------------------
#To keep it brief, I’m showing a trimmed-down version of the code.
#-----------------------------------------------------------------
'''
df_tips = (
    df_lineitem_aliased1
    .join(df_partsupp_aliased1, 
        ...,
        "inner"
   )#.select( #Tip: You could write .select() in this line itself or in the next line
    .select(
        ...
    )
)
'''

###Tip 6: Avoid using select('*'); instead, select only the columns you actually need

As a best practice, avoid `select('*')`, both in general and especially as part of a `join()` transformation. Instead, select only the columns you need.

**Why?**

A `join()` is a wide transformation, which means it typically triggers a **shuffle** (a.k.a. exchange). During a shuffle, Spark redistributes data across the cluster based on the join key and writes intermediate results to disk. This process is expensive in terms of both disk I/O and network transfer.

Shuffles are often unavoidable when performing joins, but their cost grows with the amount of data moved. Selecting only the necessary columns, and leaving out redundant ones, helps keep the shuffled rows as narrow as possible.

In [0]:
#-----------------------------------------------------------------
#To keep it brief, I’m showing a trimmed-down version of the code.
#-----------------------------------------------------------------
'''
df_tips = (
    #....
    .select(
    df_lineitem_aliased1.l_orderkey.alias("orderkey"),
    df_lineitem_aliased1.l_partkey.alias("partkey_InLI"),
    df_lineitem_aliased1.l_suppkey.alias("suppkey_InLI"),
    df_lineitem_aliased1.l_linenumber.alias("linenumber"),
    df_lineitem_aliased1.l_quantity.alias("quantity"),
    df_partsupp_aliased1.ps_partkey.alias("partkey_inPartSupp"),
    df_partsupp_aliased1.ps_suppkey.alias("suppkey_inPartSupp"),
    df_partsupp_aliased1.ps_availqty.alias("availqty")
    )
)
'''

###Tip 7: Alias columns to avoid ambiguous column names in the output of a transformation

**Problem scenario:**

What if the same column name appears in both of the dataframes being joined?

For example, suppose we want to join the Customer table with the Nation table, both tables have a column named 'NAME', and we want the output to contain both NAME columns.

**Solution:** Alias column names


**Example data for this demo:**

Customer dataframe:

| cust_id | NAME | nation_id |
|--------|----------|------------|
|101|David|1|
|102|Kevin|2|
|103|Lisa|2|

Nation dataframe:

| nation_id | NAME |
|--------|----------|
|1|Australia|
|2|India|


In [0]:
#1. load customer dataframe
customer_data = (
    (101,"David",1),
    (102,"Kevin",2),
    (103,"Lisa",2)
  )
customer_schema = "cust_id: int, NAME: string, nation_id: int"

df_customer_tip7 = spark.createDataFrame(data = customer_data, schema = customer_schema)

#2. load nation dataframe
nation_data = (
    (1,"Australia"),
    (2,"India")
  )
nation_schema = "nation_id: int, NAME: string"

df_nation_tip7 = spark.createDataFrame(data = nation_data, schema = nation_schema)


**Scenario 1**: Alias output column names in a transformation to avoid ambiguity in column names

The printSchema() output looks as below:

```
 |-- Customer_Id: integer (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
```
Notice that the printSchema() output contains no duplicate column names, because the two NAME columns were aliased to distinct names.


In [0]:
from pyspark.sql.functions import col, expr

df_cust = df_customer_tip7.alias("df_cust")
df_nat = df_nation_tip7.alias("df_nat")

df_tip7 = (
    df_cust
    .join(
        df_nat,
        df_cust.nation_id == df_nat.nation_id,
        "left_outer" 
    )
    .select(
        #Notice how the column names are aliased, especially the identical column names from both the dataframes
        df_cust.cust_id.alias("Customer_Id"),
        df_cust.NAME.alias("Customer_Name"),
        df_nat.NAME.alias("Customer_Country")
    )
)

df_tip7.printSchema()

df_tip7.display()

###Tip 8: Avoid sort()/orderBy() unless it is necessary

As a best practice, avoid `sort()` / `orderBy()` unless you actually need it.

**Why?**

Like `join()`, `sort()` / `orderBy()` is a wide transformation, which means it triggers a **shuffle** (a.k.a. exchange). During a shuffle, Spark redistributes data across the cluster, here based on the sort key(s), and writes intermediate results to disk. This process is expensive in terms of both disk I/O and network transfer.

So, avoid sorting data in a transformation unless you really need it.


In [0]:
#-----------------------------------------------------------------
#To keep it brief, I’m showing a trimmed-down version of the code.
#-----------------------------------------------------------------
'''
df_tips = (
    df_lineitem_aliased1
    .join(df_partsupp_aliased1, 
        ...,
        "inner"
   )
    .select(
    df_lineitem_aliased1.l_orderkey.alias("orderkey"),
    df_lineitem_aliased1.l_partkey.alias("partkey_InLI")
    )
    #.sort(col("orderkey").asc())
    .orderBy(col("orderkey").asc())
)
'''

###Tip 9: For code brevity, prefer col() to reference aliased column names in sort() / orderBy()

Focus on lines 16-19 in the trimmed-down code below.

Two different approaches to `sort()` the output are shown:
* Line 16 - using `col()`
* Line 19 - using dot notation

Of the two, the `col()` approach is more concise here, because it can reference the aliased names directly.

In [0]:
#-----------------------------------------------------------------
#To keep it brief, I’m showing a trimmed-down version of the code.
#-----------------------------------------------------------------
'''
df_tips = (
    df_lineitem_aliased1
    .join(df_partsupp_aliased1, 
        [(...) & (...)],
        "inner"
   )
    .select(
        ...
    )

    #For code brevity, prefer referencing aliased column names in sort() by using col()
    .sort(col("orderkey").asc(), col("linenumber").asc()) 

    #To use dot notation in sort(), you'll need to reference original column name rather than aliased name
    #.sort(df_lineitem_aliased1.l_orderkey.asc(), df_lineitem_aliased1.l_linenumber.asc()) 
)
'''

###Tip 10: Remove display() / show() function calls before promoting code to higher environments

e.g. `df_tips.display()`

`display()` and `show()` are actions. Unlike transformations, actions force the computation to occur.

So, to avoid unnecessary compute costs, remove the lines that call `display()` / `show()` before promoting the code to higher environments.
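
If removing preview calls by hand feels error-prone, one option is to route them through a small helper that is switched off outside development. This is only a sketch under stated assumptions: `ENABLE_PREVIEW` and `preview()` are hypothetical names, and `FakeDataFrame` is a stand-in so the snippet runs without Spark. In a notebook you would pass a real dataframe instead.

```python
ENABLE_PREVIEW = False  # hypothetical flag: True while developing, False once promoted


def preview(df, n=5, enabled=None):
    """Call df.show(n) only when previews are enabled; return whether it ran."""
    if enabled is None:
        enabled = ENABLE_PREVIEW
    if enabled:
        df.show(n)  # show() is an action, so this is where computation is triggered
    return enabled


class FakeDataFrame:
    """Stand-in that counts show() calls instead of running a Spark job."""

    def __init__(self):
        self.show_calls = 0

    def show(self, n=20):
        self.show_calls += 1


df_demo = FakeDataFrame()
preview(df_demo)                # flag is off, so nothing runs
preview(df_demo, enabled=True)  # explicitly enabled, so show() runs once
print(df_demo.show_calls)       # 1
```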

###Tip 11: Want to convert a SQL query into a PySpark query? AI Assistant could help!

Say you have a SQL query and, for whatever reason, you want to convert it into PySpark. How can you go about it?

Two options:
1. Write the PySpark query from scratch
2. Or, get the AI Assistant in the Databricks notebook to do the initial conversion of the SQL query into PySpark. Then, refine that initial version to meet your exact needs.

In [0]:
%sql
SELECT
  li.l_orderkey AS orderkey,
  li.l_partkey AS partkey_InLI,
  li.l_suppkey AS suppkey_InLI,
  li.l_linenumber AS linenumber,
  li.l_quantity AS quantity,
  ps.ps_partkey AS partkey_inPartSupp,
  ps.ps_suppkey AS suppkey_inPartSupp,
  ps.ps_availqty AS availqty,
  ps.ps_supplycost AS supplycost,
  p.p_partkey AS partkey_inPart,
  p.p_name AS partname,
  s.s_suppkey AS suppkey_inSupplier,
  s.s_name AS suppliername
FROM samples.tpch.lineitem li
INNER JOIN samples.tpch.partsupp ps
  ON li.l_partkey = ps.ps_partkey AND li.l_suppkey = ps.ps_suppkey
INNER JOIN samples.tpch.part p
  ON ps.ps_partkey = p.p_partkey
INNER JOIN samples.tpch.supplier s
  ON ps.ps_suppkey = s.s_suppkey
ORDER BY orderkey ASC, linenumber ASC


The PySpark query, as converted by the AI Assistant, looks as below:

In [0]:
# PySpark equivalent of the SQL query (as in the preceding cell), as converted by AI Assistant in Databricks notebook.

li = spark.table("samples.tpch.lineitem").alias("li")
ps = spark.table("samples.tpch.partsupp").alias("ps")
p = spark.table("samples.tpch.part").alias("p")
s = spark.table("samples.tpch.supplier").alias("s")

result = (
    li.join(ps, (li.l_partkey == ps.ps_partkey) & (li.l_suppkey == ps.ps_suppkey))
      .join(p, ps.ps_partkey == p.p_partkey)
      .join(s, ps.ps_suppkey == s.s_suppkey)
      .select(
          li.l_orderkey.alias("orderkey"),
          li.l_partkey.alias("partkey_InLI"),
          li.l_suppkey.alias("suppkey_InLI"),
          li.l_linenumber.alias("linenumber"),
          li.l_quantity.alias("quantity"),
          ps.ps_partkey.alias("partkey_inPartSupp"),
          ps.ps_suppkey.alias("suppkey_inPartSupp"),
          ps.ps_availqty.alias("availqty"),
          ps.ps_supplycost.alias("supplycost"),
          p.p_partkey.alias("partkey_inPart"),
          p.p_name.alias("partname"),
          s.s_suppkey.alias("suppkey_inSupplier"),
          s.s_name.alias("suppliername")
      )
      .orderBy("orderkey", "linenumber")
)

# To display the result in Databricks notebook
display(result)