## Combining DataFrames

On Spark, a `join()` merges DataFrame rows based on matching conditions, a `crossJoin()` returns the Cartesian product of all rows, and a `union()` concatenates DataFrames with identical schemas.

### Links and Resources
- [join](https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.join.html?highlight=join#pyspark.sql.DataFrame.join)
- [crossjoin](https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.crossJoin.html?highlight=join#pyspark.sql.DataFrame.crossJoin)
- [union](https://spark.apache.org/docs/3.5.3/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.union.html?highlight=join#pyspark.sql.DataFrame.union)

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define schema for df_1 (Sales) with transaction_id included
schema_sales = StructType([
    StructField("transaction_id", IntegerType(), True),
    StructField("store_id", IntegerType(), True),
    StructField("sale_date", StringType(), True),
    StructField("sales_amount", IntegerType(), True)
])

# Data for df_1: sales records
# Some store_ids (e.g., 106 and 107) are not present in df_2 to demonstrate join gaps.
data_sales = [
    (1001, 103, "2025-01-15", 5000),
    (1002, 104, "2025-01-16", 7000),
    (1003, 105, "2025-01-17", 6500),
    (1004, 106, "2025-01-18", 4800),
    (1005, 107, "2025-01-19", 5300)
]

# Create df_1 with the sales data
df_1 = spark.createDataFrame(data_sales, schema=schema_sales)
df_1.show()

# Define schema for df_2 (Stores)
schema_stores = StructType([
    StructField("id", IntegerType(), True),
    StructField("store_name", StringType(), True),
    StructField("city", StringType(), True)
])

# Data for df_2: stores
data_stores = [
    (101, "Store A", "New York"),
    (102, "Store B", "Los Angeles"),
    (103, "Store C", "Chicago"),
    (104, "Store D", "Houston"),
    (105, "Store E", "Phoenix")
]

# Create df_2 with the stores data
df_2 = spark.createDataFrame(data_stores, schema=schema_stores)
df_2.show()

+--------------+--------+----------+------------+
|transaction_id|store_id| sale_date|sales_amount|
+--------------+--------+----------+------------+
|          1001|     103|2025-01-15|        5000|
|          1002|     104|2025-01-16|        7000|
|          1003|     105|2025-01-17|        6500|
|          1004|     106|2025-01-18|        4800|
|          1005|     107|2025-01-19|        5300|
+--------------+--------+----------+------------+

+---+----------+-----------+
| id|store_name|       city|
+---+----------+-----------+
|101|   Store A|   New York|
|102|   Store B|Los Angeles|
|103|   Store C|    Chicago|
|104|   Store D|    Houston|
|105|   Store E|    Phoenix|
+---+----------+-----------+



### Join

In [0]:
# left join returns all records from the left DataFrame and only matching records from the right

left_join_df = df_1.join(df_2, df_1.store_id == df_2.id, "left")

left_join_df.display()

transaction_id,store_id,sale_date,sales_amount,id,store_name,city
1001,103,2025-01-15,5000,103.0,Store C,Chicago
1002,104,2025-01-16,7000,104.0,Store D,Houston
1003,105,2025-01-17,6500,105.0,Store E,Phoenix
1004,106,2025-01-18,4800,,,
1005,107,2025-01-19,5300,,,


In [0]:
# right join returns all records from the left DataFrame and only matching records from the left

right_join_df = df_1.join(df_2, df_1.store_id == df_2.id, "right")

right_join_df.display()

transaction_id,store_id,sale_date,sales_amount,id,store_name,city
,,,,101,Store A,New York
,,,,102,Store B,Los Angeles
1001.0,103.0,2025-01-15,5000.0,103,Store C,Chicago
1002.0,104.0,2025-01-16,7000.0,104,Store D,Houston
1003.0,105.0,2025-01-17,6500.0,105,Store E,Phoenix


In [0]:
# inner join returns only matching records from both

inner_join_df = df_1.join(df_2, df_1.store_id == df_2.id, "inner")

inner_join_df.display()

transaction_id,store_id,sale_date,sales_amount,id,store_name,city
1001,103,2025-01-15,5000,103,Store C,Chicago
1002,104,2025-01-16,7000,104,Store D,Houston
1003,105,2025-01-17,6500,105,Store E,Phoenix


In [0]:
# full outer join returns all records from the left and right DataFrame, matched or unmatched

full_join_df = df_1.join(df_2, df_1.store_id == df_2.id, "fullouter")

full_join_df.display()

transaction_id,store_id,sale_date,sales_amount,id,store_name,city
,,,,101.0,Store A,New York
,,,,102.0,Store B,Los Angeles
1001.0,103.0,2025-01-15,5000.0,103.0,Store C,Chicago
1002.0,104.0,2025-01-16,7000.0,104.0,Store D,Houston
1003.0,105.0,2025-01-17,6500.0,105.0,Store E,Phoenix
1004.0,106.0,2025-01-18,4800.0,,,
1005.0,107.0,2025-01-19,5300.0,,,


In [0]:
# left andi returns all records unmatched records left DataFrame only

left_anti_df = df_1.join(df_2, df_1.store_id == df_2.id, "left_anti")

left_anti_df.display()

transaction_id,store_id,sale_date,sales_amount
1004,106,2025-01-18,4800
1005,107,2025-01-19,5300


### crossJoin

In [0]:
# crossJoin (also known as cartesian join) returns the cartesian product of both DataFrames

df_1.crossJoin(df_2).display()

transaction_id,store_id,sale_date,sales_amount,id,store_name,city
1001,103,2025-01-15,5000,101,Store A,New York
1001,103,2025-01-15,5000,102,Store B,Los Angeles
1001,103,2025-01-15,5000,103,Store C,Chicago
1001,103,2025-01-15,5000,104,Store D,Houston
1001,103,2025-01-15,5000,105,Store E,Phoenix
1002,104,2025-01-16,7000,101,Store A,New York
1002,104,2025-01-16,7000,102,Store B,Los Angeles
1002,104,2025-01-16,7000,103,Store C,Chicago
1002,104,2025-01-16,7000,104,Store D,Houston
1002,104,2025-01-16,7000,105,Store E,Phoenix


### Union

In [0]:
# Union appends two DataFrames

df_1.union(df_1).display()

transaction_id,store_id,sale_date,sales_amount
1001,103,2025-01-15,5000
1002,104,2025-01-16,7000
1003,105,2025-01-17,6500
1004,106,2025-01-18,4800
1005,107,2025-01-19,5300
1001,103,2025-01-15,5000
1002,104,2025-01-16,7000
1003,105,2025-01-17,6500
1004,106,2025-01-18,4800
1005,107,2025-01-19,5300
