In [0]:
# Sample employee and department DataFrames used by the join examples below.
# Column names are declared first so the schema reads ahead of the rows.
emp_col = ["Emp_id", "Emp_name", "Dept_id"]
emp_data = [
    (1, "Arjun", 10),
    (2, "Meena", 20),
    (3, "Ravi", 30),
    (4, "Priya", 40),
]

dept_col = ["Dept_id", "Dept_name"]
dept_data = [
    (10, "Sales"),
    (20, "Marketing"),
    (30, "Finance"),
    (40, "IT"),
]

# A plain list of column names as the schema: Spark infers the types from the rows.
emp_df = spark.createDataFrame(emp_data, schema=emp_col)
dept_df = spark.createDataFrame(dept_data, schema=dept_col)


In [0]:
# inner join: keep only employees whose Dept_id matches a department,
# then exclude department 30 (Finance) from the result.

from pyspark.sql.functions import col

# Columns are referenced by their exact names (Dept_id, Emp_id, ...) so the
# join does not depend on Spark's case-insensitive resolution; lower-case
# spellings such as "e.dept_id" would fail if spark.sql.caseSensitive were on.
df_join = (
    emp_df.alias("e")
    .join(
        dept_df.alias("d"),
        col("e.Dept_id") == col("d.Dept_id"),
        "inner",
    )
    .filter(~col("d.Dept_id").isin(30))
    .select("e.Emp_id", "e.Emp_name", "d.Dept_id", "d.Dept_name")
)
# Always alias both sides of a join: both DataFrames have a Dept_id column,
# so unqualified references to it would be ambiguous.


In [0]:
# Render df_join as an interactive table using the notebook's display().
display(df_join)

In [0]:
# left join: every employee row is kept; department columns are null when an
# employee's Dept_id has no match in dept_df.

from pyspark.sql.functions import col

# Exact column names so the join does not rely on case-insensitive resolution.
emp_df.alias("e").join(
    dept_df.alias("d"),
    col("e.Dept_id") == col("d.Dept_id"),
    "left",
).show()

In [0]:
from pyspark.sql.functions import col

# left anti join: rows of emp_df with NO matching Dept_id in dept_df.
# Only emp_df's columns are returned.
# use case -> finding unmatched records, data-quality checks

emp_df.alias("e").join(
    dept_df.alias("d"),
    col("e.Dept_id") == col("d.Dept_id"),
    "leftanti",
).show()

# left semi join: rows of emp_df that DO have a matching Dept_id in dept_df.
# Only emp_df's columns are returned, and each employee appears at most once.
# use case -> filtering to matched records without pulling in the right side

emp_df.alias("e").join(
    dept_df.alias("d"),
    col("e.Dept_id") == col("d.Dept_id"),
    "leftsemi",
).show()

# cross join: every employee paired with every department
# (4 employees x 4 departments = 16 rows for the sample data).
# use case -> generating all possible combinations of records
emp_df.crossJoin(dept_df).show()


In [0]:
# describe() returns a new (lazy) DataFrame of summary statistics; as a bare
# expression the notebook only prints its schema, so .show() is needed to
# actually compute and render the statistics.
df_join.describe().show()

In [0]:
# Print df_join as a plain-text table (show() prints the first 20 rows by default).
df_join.show()

In [0]:
# Select every column that came from the "e" (emp_df) side of the join.
# NOTE(review): this relies on the "e" alias remaining resolvable after the
# .select() that built df_join — confirm if df_join is ever re-aliased/refactored.
df_join.select("e.*").show()

In [0]:
# Pick individual columns by their join-side qualifier ("e" = employees,
# "d" = departments); plain strings resolve exactly like col(...) here.
df_join.select("e.emp_id", "e.emp_name", "d.dept_id").show()

In [0]:
%sql
-- Preview every row of dev_data.test.products.
-- NOTE(review): no cell in this notebook creates this table; the query fails
-- unless it already exists in the workspace.
select * from dev_data.test.products

In [0]:
# Displays the active SparkSession object; inspection only — no later cell depends on this.
spark

In [0]:
# Online orders sample dataset: order_id, cust_name, product, amount.
online_ord_col = ["order_id", "cust_name", "product", "amount"]
online_ord_data = [
    (1, "Anu", "Laptop", 50000),
    (2, "Arjun", "Mobile", 20000),
    (3, "Ravi", "Tablet", 30000),
]

df_online = spark.createDataFrame(online_ord_data, schema=online_ord_col)
df_online.show()

In [0]:
# Store orders: same columns in the same order as df_online, but different
# rows (order_ids 4 and 5), so the two can be stacked with union().
store_ord_col = ["order_id", "cust_name", "product", "amount"]
store_ord_data = [
    (4, "Sita", "Laptop", 48000),
    (5, "kumar", "Mobile", 22000),
]

df_store = spark.createDataFrame(store_ord_data, schema=store_ord_col)
df_store.show()

In [0]:
# union(): appends the rows of df_store to df_online, matching columns by
# POSITION — both sides must have the same columns in the same order.
# Unlike SQL UNION it does not remove duplicates: it behaves like UNION ALL
# (unionAll() is just an alias of union()); chain .distinct() to de-duplicate.
df_union = df_online.union(df_store)
df_union.show()

In [0]:
# Store orders with the same columns as df_online but in a DIFFERENT order
# (product first). Used below to contrast union() with unionByName().
# The column is spelled "product" exactly as in df_online, so the only
# difference is the order; unionByName() then matches it even when
# spark.sql.caseSensitive is enabled.
store_ord_diff_cols = [
    ("Laptop", 6, "Vsoul", 47000),
    ("Mobile", 7, "Vijay", 25000),
]

store_ord_dif_schema = ["product", "order_id", "cust_name", "amount"]

df_store_diff = spark.createDataFrame(store_ord_diff_cols, store_ord_dif_schema)

In [0]:
# Show the store orders whose columns are in a different order from df_online.
df_store_diff.show()

In [0]:
# union() pairs columns by POSITION, not by name. df_store_diff has product
# first, then order_id, cust_name, amount, so its string product column lines
# up with the numeric order_id column, and so on.
# NOTE(review): depending on the session's type-coercion settings (e.g. ANSI
# mode) this either fails with a type-mismatch error or silently widens the
# mismatched columns to string and returns misaligned rows — do not rely on
# it throwing.
df_union.union(df_store_diff).show()

In [0]:
# Positional union of DataFrames whose column orders differ. The result keeps
# df_online's column names, so if it succeeds the df_store_diff rows land under
# the wrong headings (e.g. product text in the order_id column).
# Descriptive name instead of "df1", which the unionByName() cell reuses for
# an unrelated result.
df_union_by_position = df_online.union(df_store_diff)
df_union_by_position.show()

In [0]:
# Same positional union as the previous cell, rendered with display() instead of show().
display(df_online.union(df_store_diff))

In [0]:
# unionByName(): matches columns by NAME, so column order does not matter.
# The result uses df_online's (the left side's) column order.
# Use it instead of union() whenever column order is not guaranteed.
df1 = df_online.unionByName(df_store_diff)
df1.show()

In [0]:
# unionByName with missing columns: build a frame that lacks the "amount" column.

df_partial = df_store_diff.drop("amount")

df_partial.show()

In [0]:
# Show df_online again for comparison with df_partial (which has no "amount").
df_online.show()

In [0]:
# allowMissingColumns=True: columns present on only one side (here "amount",
# absent from df_partial) are filled with null instead of raising an error.
df_online.unionByName(df_partial, allowMissingColumns=True).show()

In [0]:
# NOTE(review): duplicate of the earlier df_online.show() cell; union operations
# return new DataFrames, so df_online itself is unchanged.
df_online.show()

In [0]:
# customer dataframe
# Explicit imports instead of `from pyspark.sql.types import *`, so it is
# clear where each name comes from. DateType and DoubleType are not used in
# this cell but are kept here because the orders cell below uses them.
from pyspark.sql.types import (
    DateType,
    DoubleType,
    IntegerType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema: cust_id, cust_name and email are declared non-nullable;
# city is nullable.
customer_schema = StructType([
    StructField("cust_id", IntegerType(), False),
    StructField("cust_name", StringType(), False),
    StructField("city", StringType(), True),
    StructField("email", StringType(), False)
])

customer_data = [
    (1, "Raj", "Delhi", "raj@gmail.com"),
    (2, "Ravi", "Mumbai", "ravi@gmail.com"),
    (3, "Rajesh", "Delhi", "rajesh@gmail.com"),
    (4, "naveen", "Chennai", "naveen@gmail.com")
]

df_customers = spark.createDataFrame(customer_data, schema=customer_schema)
# Persist to dev_data.test.customers_online; mode "overwrite" replaces any
# existing data in that table on every run.
df_customers.write.mode("overwrite").saveAsTable("dev_data.test.customers_online")
df_customers.show()

In [0]:
# Orders DataFrame
from datetime import date

# Imported here so this cell runs on its own instead of relying on types
# imported by another cell.
from pyspark.sql.types import (
    DateType,
    DoubleType,
    IntegerType,
    StructField,
    StructType,
)

# customer_id is the foreign key to df_customers.cust_id; every column is non-nullable.
orders_schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("order_date", DateType(), False),
    StructField("order_amount", DoubleType(), False)
])

# Orders exist only for customer_ids 1, 2 and 3; customer 4 has none, which
# the "customers with no orders" joins below surface.
orders_data = [
    (101, 1, date(2024, 1, 10), 250.50),
    (102, 2, date(2024, 1, 12), 120.00),
    (103, 1, date(2024, 1, 15), 75.25),
    (104, 3, date(2024, 1, 18), 300.00),
    (106, 2, date(2024, 1, 22), 90.00)
]

df_orders = spark.createDataFrame(orders_data, orders_schema)

# Persist to dev_data.test.orders_online; mode "overwrite" replaces any
# existing data in that table on every run.
df_orders.write.mode("overwrite").saveAsTable("dev_data.test.orders_online")
df_orders.show()

In [0]:
from pyspark.sql.functions import col

# Inner join: one output row per (customer, order) pair where cust_id equals
# customer_id; customers without orders are dropped.
df_customers.alias("c").join(
    other=df_orders.alias("o"),
    on=col("c.cust_id") == col("o.customer_id"),
    how="inner",
).show()


In [0]:
# Customers with no orders, via left join + null check. After the left join,
# o.order_id can only be null for customers that matched no order row.
(
    df_customers.alias("c")
    .join(
        other=df_orders.alias("o"),
        on=col("c.cust_id") == col("o.customer_id"),
        how="left",
    )
    .filter(col("o.order_id").isNull())
    .show()
)


In [0]:
# Left ANTI join: customers with no orders. Returns the same customers as the
# left join + isNull filter above, but only the customer columns.
(
    df_customers.alias("c")
    .join(
        other=df_orders.alias("o"),
        on=col("c.cust_id") == col("o.customer_id"),
        how="leftanti",
    )
    .show()
)

In [0]:
# Inner join, projecting only the customer key/name and the order details.
(
    df_customers.alias("c")
    .join(
        other=df_orders.alias("o"),
        on=col("c.cust_id") == col("o.customer_id"),
        how="inner",
    )
    .select(
        "c.cust_id",
        "c.cust_name",
        "o.order_id",
        "o.order_date",
        "o.order_amount",
    )
    .show()
)
