### JOINS

- A PySpark join combines two DataFrames; by chaining `join` calls you can combine more than two.
- It supports the join types available in traditional SQL:
    - INNER (default) 
    - LEFT OUTER 
    - RIGHT OUTER
    - FULL OUTER 
    - LEFT ANTI 
    - LEFT SEMI 
    - CROSS 
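    - SELF JOIN (a DataFrame joined to itself using any of the types above)
- PySpark joins are wide transformations: Spark generally has to shuffle data across the network so that rows with the same key end up in the same partition.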

**INNER:**
An inner join combines two DataFrames on the given key (a common column) and returns only the rows that have a match in both. Rows whose key has no match are dropped from both DataFrames.

**LEFT OUTER:**
A left outer join returns every row from the left DataFrame, whether or not a match exists on the right. Where the join expression finds no match, the right-side columns are set to null. Rows from the right DataFrame that have no match are dropped.

**RIGHT OUTER:**
A right outer join is the mirror image of a left outer join: it returns every row from the right DataFrame, whether or not a match exists on the left. Where the join expression finds no match, the left-side columns are set to null. Rows from the left DataFrame that have no match are dropped.

**FULL OUTER:**
A full outer join combines the results of the left and right outer joins: every row from both DataFrames appears in the result, and the columns of whichever side has no match are filled with nulls.

**LEFT SEMI:**
A left semi join returns only the rows from the left DataFrame (the first DataFrame mentioned in the join operation) that have a match in the right DataFrame (the second DataFrame). The result contains no columns from the right DataFrame. Use it when you only want to filter the left DataFrame by whether a matching key exists on the right.

**LEFT ANTI:**
A left anti join returns only the rows from the left DataFrame (the first DataFrame mentioned in the join operation) that have no match in the right DataFrame (the second DataFrame). Like a left semi join, it returns left-side columns only. Use it when you want to filter out the left rows whose key also appears on the right.
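As a cross-check on the six definitions above, the following plain-Python sketch models each join type on simple key pairs. It models only the row-level semantics, not how Spark executes a join. The function name `model_join` is hypothetical, and the pairs mirror the join keys of the sample data used in the cells below.

```python
# (emp_id, emp_dept_id) and (dept_name, dept_id) pairs, reduced to the join keys.
emps = [(1, 10), (2, 20), (3, 10), (4, 30), (5, 60), (6, 70), (7, 30)]
depts = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40), ("HR", 50)]


def model_join(left, right, how):
    """Model a join on the key; None plays the role of Spark's null.

    Outer and inner joins return (emp_id, dept_name) pairs.
    Semi and anti joins return left-side values (emp_id) only.
    """
    left_keys = {key for _, key in left}
    right_keys = {key for _, key in right}
    # One output row per matching (left row, right row) combination.
    matched = [(e, d) for e, lk in left for d, rk in right if lk == rk]
    left_only = [(e, None) for e, lk in left if lk not in right_keys]
    right_only = [(None, d) for d, rk in right if rk not in left_keys]

    if how == "inner":
        return matched
    if how == "left":
        return matched + left_only
    if how == "right":
        return matched + right_only
    if how == "full":
        return matched + left_only + right_only
    if how == "leftsemi":
        # Each left row appears at most once, however many right rows match.
        return [e for e, lk in left if lk in right_keys]
    if how == "leftanti":
        return [e for e, lk in left if lk not in right_keys]
    raise ValueError(f"unsupported join type: {how}")


join_types = ("inner", "left", "right", "full", "leftsemi", "leftanti")
counts = {how: len(model_join(emps, depts, how)) for how in join_types}
print(counts)

# A duplicated right-side key multiplies inner-join rows but not left-semi rows.
dup_depts = depts + [("Finance-2", 10)]
dup_counts = (
    len(model_join(emps, dup_depts, "inner")),
    len(model_join(emps, dup_depts, "leftsemi")),
)
print(dup_counts)
```

For these pairs the model gives 5 rows for the inner join, 7 for left, 7 for right, 9 for full, 5 for left semi and 2 for left anti. With the duplicated key the inner join grows to 7 rows while the left semi join stays at 5, which is the practical difference between a semi join and an inner join followed by a column selection.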


In [0]:
# Employee data (some dept IDs intentionally unmatched)
emp = [
    (1, "Aarav",   -1, "2019", "10", "M", 5500),   # Matches Finance
    (2, "Diya",     1, "2015", "20", "F", 7200),   # Matches Marketing
    (3, "Karan",    1, "2017", "10", "M", 4800),   # Matches Finance
    (4, "Isha",     2, "2012", "30", "F", 6100),   # Matches Sales
    (5, "Rahul",    2, "2016", "60", "M", 5300),   # No matching dept
    (6, "Neha",     3, "2020", "70", "F", 4600),   # No matching dept
    (7, "Vikram",   3, "2018", "30", "M", 5800)    # Matches Sales
]

empColumns = ["emp_id","name","superior_emp_id","year_joined","emp_dept_id","gender","salary"]
empDF = spark.createDataFrame(data=emp, schema=empColumns)
empDF.show(truncate=False)

# Department data (some depts without employees)
dept = [
    ("Finance",   10),
    ("Marketing", 20),
    ("Sales",     30),
    ("IT",        40),    # No employees
    ("HR",        50)     # No employees
]

deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)
deptDF.show(truncate=False)


In [0]:
# Inner join: keeps only employees whose emp_dept_id matches a dept_id
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "inner") \
    .show(truncate=False)

In [0]:
# Left outer join: keeps every employee; department columns are null where no dept_id matches
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "left") \
    .show(truncate=False)

In [0]:
# Right outer join: keeps every department; employee columns are null where no employee matches
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "right") \
    .show(truncate=False)

In [0]:
# Full outer join: keeps every employee and every department, with nulls on the unmatched side
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "full") \
    .show(truncate=False)

In [0]:
# Left semi join: employees that have a matching department, employee columns only
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "leftsemi") \
    .show(truncate=False)

In [0]:
# Left anti join: employees that have no matching department, employee columns only
empDF.join(deptDF, empDF.emp_dept_id == deptDF.dept_id, "leftanti") \
    .show(truncate=False)

In [0]:
# Self join: pair each employee with their superior by joining empDF to itself.
# Employees whose superior_emp_id matches no emp_id (here -1) are dropped by the inner join.

from pyspark.sql.functions import col

empDF.alias("emp1").join(empDF.alias("emp2"),
    col("emp1.superior_emp_id") == col("emp2.emp_id"), "inner") \
    .select(col("emp1.emp_id"), col("emp1.name"),
      col("emp2.emp_id").alias("superior_emp_id"),
      col("emp2.name").alias("superior_emp_name")) \
    .show(truncate=False)

In [0]:
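# Cross join: pairs every employee with every department (7 x 5 = 35 rows);
# show() prints only the first 20 rows by default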
empDF.join(deptDF, how="cross") \
     .show(truncate=False)