# PySpark User Guide - Complete Reference

## Quick Start & Setup

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql.functions import col
# Create (or reuse) the notebook-wide SparkSession. The three config keys turn on
# adaptive query execution, partition coalescing and skew-join handling.
spark = (
    SparkSession.builder
    .appName("DataProcessing")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .getOrCreate()
)

## Sample Data for Testing

In [15]:
# Customer data: five rows of (customer_id, name, email, country, tier).
# Every field is declared nullable.
customers_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("customer_id", IntegerType()),
        ("name", StringType()),
        ("email", StringType()),
        ("country", StringType()),
        ("tier", StringType()),
    ]
])
customers_data = [
    (1, "John Doe", "john@email.com", "US", "Premium"),
    (2, "Jane Smith", "jane@email.com", "CA", "Standard"),
    (3, "Bob Johnson", "bob@email.com", "UK", "Premium"),
    (4, "Alice Brown", "alice@email.com", "US", "Standard"),
    (5, "Charlie Wilson", "charlie@email.com", "FR", "Premium"),
]
customers = spark.createDataFrame(customers_data, customers_schema)

# Orders data: (order_id, customer_id, order_date, amount, category).
# order_date is kept as a plain string; every field is declared nullable.
orders_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("order_id", IntegerType()),
        ("customer_id", IntegerType()),
        ("order_date", StringType()),
        ("amount", DoubleType()),
        ("category", StringType()),
    ]
])
orders_data = [
    (101, 1, "2024-01-15", 150.0, "Electronics"),
    (102, 2, "2024-01-16", 89.5, "Books"),
    (103, 1, "2024-01-17", 200.0, "Electronics"),
    (104, 3, "2024-01-18", 45.0, "Books"),
    (105, 2, "2024-01-19", 310.0, "Clothing"),
    (106, 4, "2024-01-20", 75.0, "Electronics"),
    (107, 1, "2024-01-21", 125.0, "Clothing"),
    # Orphaned order: customer_id 6 does not appear in customers_data.
    (108, 6, "2024-01-22", 95.0, "Books"),
]
orders = spark.createDataFrame(orders_data, orders_schema)

# Products catalog (small table, used later as the broadcast side of joins):
# one row per category with its commission rate and return window in days.
products_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("category", StringType()),
        ("commission_rate", DoubleType()),
        ("return_window_days", IntegerType()),
    ]
])
products_data = [
    ("Electronics", 0.15, 30),
    ("Books", 0.05, 14),
    ("Clothing", 0.20, 7),
]
products = spark.createDataFrame(products_data, products_schema)



## Advanced Joins

### All Join Types with Examples

#### Inner Join - Only Matching Records


In [3]:

# Inner join on customer_id: keeps only customer/order pairs that match on both sides.
inner_result = customers.join(orders, on="customer_id", how="inner")
print(f"Inner join: {inner_result.count()} records")  # 7 records
inner_result.select(["name", "order_id", "amount"]).show()

                                                                                

Inner join: 7 records
+-----------+--------+------+
|       name|order_id|amount|
+-----------+--------+------+
|   John Doe|     101| 150.0|
|   John Doe|     103| 200.0|
|   John Doe|     107| 125.0|
| Jane Smith|     102|  89.5|
| Jane Smith|     105| 310.0|
|Bob Johnson|     104|  45.0|
|Alice Brown|     106|  75.0|
+-----------+--------+------+




#### Left Join - All Customers


In [4]:
# Left join on customer_id: every customer is kept, with NULL order columns
# when the customer has no order.
left_result = customers.join(orders, on="customer_id", how="left")
print(f"Left join: {left_result.count()} records")  # 8 records (Charlie has no orders)
left_result.select(["name", "order_id", "amount"]).show()

Left join: 8 records




+--------------+--------+------+
|          name|order_id|amount|
+--------------+--------+------+
|      John Doe|     107| 125.0|
|      John Doe|     103| 200.0|
|      John Doe|     101| 150.0|
|    Jane Smith|     105| 310.0|
|    Jane Smith|     102|  89.5|
|   Bob Johnson|     104|  45.0|
|Charlie Wilson|    NULL|  NULL|
|   Alice Brown|     106|  75.0|
+--------------+--------+------+



                                                                                

In [9]:
# Left-join each order to the products catalog on category and display the result.
orders.join(products, on="category", how="left").show()

                                                                                

+-----------+--------+-----------+----------+------+---------------+------------------+
|   category|order_id|customer_id|order_date|amount|commission_rate|return_window_days|
+-----------+--------+-----------+----------+------+---------------+------------------+
|Electronics|     101|          1|2024-01-15| 150.0|           0.15|                30|
|Electronics|     103|          1|2024-01-17| 200.0|           0.15|                30|
|      Books|     102|          2|2024-01-16|  89.5|           0.05|                14|
|      Books|     104|          3|2024-01-18|  45.0|           0.05|                14|
|Electronics|     106|          4|2024-01-20|  75.0|           0.15|                30|
|   Clothing|     105|          2|2024-01-19| 310.0|            0.2|                 7|
|   Clothing|     107|          1|2024-01-21| 125.0|            0.2|                 7|
|      Books|     108|          6|2024-01-22|  95.0|           0.05|                14|
+-----------+--------+-----------+----------+------+---------------+------------------+


#### Anti Join - Customers Without Orders


In [5]:

# Anti join on customer_id: keeps only the customers that have no matching order.
anti_result = customers.join(orders, on="customer_id", how="anti")
print(f"Anti join: {anti_result.count()} records")  # 1 record (Charlie)
anti_result.select(["name", "email"]).show()
        

                                                                                

Anti join: 1 records


                                                                                

+--------------+-----------------+
|          name|            email|
+--------------+-----------------+
|Charlie Wilson|charlie@email.com|
+--------------+-----------------+




### Broadcast Joins - Performance Optimization

#### When to Use Broadcast Joins
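- The small table fits comfortably in memory (rule of thumb: < 200MB). Spark only broadcasts automatically below `spark.sql.autoBroadcastJoinThreshold` (10MB by default); above that, use an explicit `broadcast()` hint
- One table is much smaller than the other (size ratio greater than 10:1)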
- Avoiding shuffle operations for better performance


In [10]:
from pyspark.sql.functions import broadcast

# Manual broadcast: wrap the small products table in a broadcast() hint before
# left-joining it to orders on category.
broadcast_result = orders.join(broadcast(products), on="category", how="left")
broadcast_result.select(["order_id", "category", "commission_rate", "amount"]).show()


+--------+-----------+---------------+------+
|order_id|   category|commission_rate|amount|
+--------+-----------+---------------+------+
|     101|Electronics|           0.15| 150.0|
|     102|      Books|           0.05|  89.5|
|     103|Electronics|           0.15| 200.0|
|     104|      Books|           0.05|  45.0|
|     105|   Clothing|            0.2| 310.0|
|     106|Electronics|           0.15|  75.0|
|     107|   Clothing|            0.2| 125.0|
|     108|      Books|           0.05|  95.0|
+--------+-----------+---------------+------+



In [None]:

# Commission per order: look up the category's rate through a broadcast-hinted
# left join, then multiply it by the order amount.
commission_calc = (
    orders.join(broadcast(products), on="category", how="left")
    .withColumn("commission", col("amount") * col("commission_rate"))
    .select(["order_id", "amount", "commission_rate", "commission"])
)
commission_calc.show()


+--------+------+---------------+------------------+
|order_id|amount|commission_rate|        commission|
+--------+------+---------------+------------------+
|     101| 150.0|           0.15|              22.5|
|     102|  89.5|           0.05|4.4750000000000005|
|     103| 200.0|           0.15|              30.0|
|     104|  45.0|           0.05|              2.25|
|     105| 310.0|            0.2|              62.0|
|     106|  75.0|           0.15|             11.25|
|     107| 125.0|            0.2|              25.0|
|     108|  95.0|           0.05|              4.75|
+--------+------+---------------+------------------+



#### Performance Comparison

In [17]:

# Build the same left join twice so the two plans can be compared:
# once with a broadcast hint on products, once without any hint.
with_broadcast = orders.join(broadcast(products), on="category", how="left")
no_broadcast = orders.join(products, on="category", how="left")

# Print the extended plan (parsed, analyzed, optimized, physical) of the un-hinted join.
print("=== Without Broadcast ===")
no_broadcast.explain(extended=True)


=== Without Broadcast ===
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [category])
:- LogicalRDD [order_id#183, customer_id#184, order_date#185, amount#186, category#187], false
+- LogicalRDD [category#188, commission_rate#189, return_window_days#190], false

== Analyzed Logical Plan ==
category: string, order_id: int, customer_id: int, order_date: string, amount: double, commission_rate: double, return_window_days: int
Project [category#187, order_id#183, customer_id#184, order_date#185, amount#186, commission_rate#189, return_window_days#190]
+- Join LeftOuter, (category#187 = category#188)
   :- LogicalRDD [order_id#183, customer_id#184, order_date#185, amount#186, category#187], false
   +- LogicalRDD [category#188, commission_rate#189, return_window_days#190], false

== Optimized Logical Plan ==
Project [category#187, order_id#183, customer_id#184, order_date#185, amount#186, commission_rate#189, return_window_days#190]
+- Join LeftOuter, (category#187 = category#188)
   :

In [18]:

# Same inspection for the join that carries the broadcast hint.
print("\n=== With Broadcast ===")
with_broadcast.explain(extended=True)




=== With Broadcast ===
== Parsed Logical Plan ==
'Join UsingJoin(LeftOuter, [category])
:- LogicalRDD [order_id#183, customer_id#184, order_date#185, amount#186, category#187], false
+- ResolvedHint (strategy=broadcast)
   +- LogicalRDD [category#188, commission_rate#189, return_window_days#190], false

== Analyzed Logical Plan ==
category: string, order_id: int, customer_id: int, order_date: string, amount: double, commission_rate: double, return_window_days: int
Project [category#187, order_id#183, customer_id#184, order_date#185, amount#186, commission_rate#189, return_window_days#190]
+- Join LeftOuter, (category#187 = category#188)
   :- LogicalRDD [order_id#183, customer_id#184, order_date#185, amount#186, category#187], false
   +- ResolvedHint (strategy=broadcast)
      +- LogicalRDD [category#188, commission_rate#189, return_window_days#190], false

== Optimized Logical Plan ==
Project [category#187, order_id#183, customer_id#184, order_date#185, amount#186, commission_rate#1

### Complex Join Conditions


In [19]:
# Inner join with a compound condition: the customer ids must match, the customer
# must be in the Premium tier, and the order amount must exceed 100.
complex_join = orders.join(
    customers,
    on=(
        (orders["customer_id"] == customers["customer_id"])
        & (customers["tier"] == "Premium")
        & (orders["amount"] > 100)
    ),
    how="inner",
)
complex_join.select(["name", "order_id", "amount", "tier"]).show()

[Stage 67:>                                                         (0 + 2) / 2]

+--------+--------+------+-------+
|    name|order_id|amount|   tier|
+--------+--------+------+-------+
|John Doe|     101| 150.0|Premium|
|John Doe|     103| 200.0|Premium|
|John Doe|     107| 125.0|Premium|
+--------+--------+------+-------+



                                                                                

## Wide ⟷ Long Transformations
### Long to Wide (Pivot)

#### Basic Pivot


In [36]:
# Sample sales data in long format: one row per (month, category, country).
sales_data = [
    ("2024-01", "Electronics", "US", 1000),
    ("2024-01", "Books", "US", 500),
    ("2024-01", "Electronics", "CA", 800),
    ("2024-02", "Electronics", "US", 1200),
    ("2024-02", "Books", "US", 600),
    ("2024-02", "Clothing", "CA", 400),
]
sales = spark.createDataFrame(
    sales_data,
    schema=["month", "category", "country", "sales"],
)

# Long -> wide: one output column per distinct country, holding the summed
# sales for each (month, category) group.
wide_by_country = (
    sales.groupBy("month", "category")
    .pivot("country")
    .sum("sales")
)
wide_by_country.show()


+-------+-----------+----+----+
|  month|   category|  CA|  US|
+-------+-----------+----+----+
|2024-02|      Books|NULL| 600|
|2024-01|Electronics| 800|1000|
|2024-01|      Books|NULL| 500|
|2024-02|Electronics|NULL|1200|
|2024-02|   Clothing| 400|NULL|
+-------+-----------+----+----+



### Tidyverse inspiration

In [41]:
# Load the fish encounters CSV; the first line is the header and column types are inferred.
fish_encounters = spark.read.csv(
    "data/fish_encounters.csv",
    header=True,
    inferSchema=True,
)

In [42]:
# Preview the long-format data (show() prints at most 20 rows by default).
fish_encounters.show(n=20)

+----+-------+----+
|fish|station|seen|
+----+-------+----+
|4842|Release|   1|
|4842|  I80_1|   1|
|4842| Lisbon|   1|
|4842|   Rstr|   1|
|4842|Base_TD|   1|
|4842|    BCE|   1|
|4842|    BCW|   1|
|4842|   BCE2|   1|
|4842|   BCW2|   1|
|4842|    MAE|   1|
|4843|Release|   1|
|4843|  I80_1|   1|
|4843| Lisbon|   1|
|4843|    BCE|   0|
|4843|    BCW|   0|
|4843|   BCE2|   0|
|4843|   BCW2|   0|
|4843|    MAE|   0|
|4844|Release|   1|
|4844|  I80_1|   1|
+----+-------+----+
only showing top 20 rows


In [45]:
def smart_pivot(df, pivot_col, value_col, id_cols=None, values_fill=None, agg_func="first"):
    """
    Pivot a long DataFrame to wide format, auto-detecting the ID columns.

    Args:
        df: Long-format Spark DataFrame.
        pivot_col: Column whose distinct non-null values become the new columns.
        value_col: Column whose values populate the new columns.
        id_cols: Grouping column name(s). A single name may be passed as a string.
            When None, every column other than ``pivot_col`` and ``value_col`` is
            used, in the DataFrame's own column order.
        values_fill: Replacement for NULL cells in the pivoted columns only.
            Defaults to None, which keeps NULLs (like NA in R).
        agg_func: One of "first", "max", "min", "sum", "avg". Any other value
            triggers a warning and falls back to "first".

    Returns:
        The wide DataFrame: the ID columns followed by one column per pivot value.
    """
    # Namespaced, function-local imports: the notebook's star-import of
    # pyspark.sql.functions shadows the builtins max/min/sum, so the aggregates
    # are referenced explicitly through F instead of relying on that shadowing.
    import warnings
    from pyspark.sql import functions as F

    # Auto-detect the ID columns when not specified. Filtering df.columns (rather
    # than going through a set difference) keeps the grouping order deterministic.
    if id_cols is None:
        excluded = {pivot_col, value_col}
        id_cols = [name for name in df.columns if name not in excluded]
        print(f"üîç Colonnes ID d√©tect√©es automatiquement: {id_cols}")
    elif isinstance(id_cols, str):
        # A bare string would otherwise be unpacked character by character below.
        id_cols = [id_cols]
    else:
        id_cols = list(id_cols)

    # Discover the pivot values up front; NULL keys are dropped, the rest sorted.
    distinct_rows = df.select(pivot_col).distinct().collect()
    pivot_values = sorted(row[0] for row in distinct_rows if row[0] is not None)
    print(f"üìä Valeurs de pivot trouv√©es: {pivot_values}")

    # Aggregation applied to value_col inside each (id, pivot value) cell.
    aggregators = {
        "first": lambda col_name: F.first(col_name, ignorenulls=False),
        "max": F.max,
        "min": F.min,
        "sum": F.sum,
        "avg": F.avg,
    }
    if agg_func not in aggregators:
        # Keep the historical fallback to "first", but make it visible.
        warnings.warn(
            f"Unknown agg_func {agg_func!r}; falling back to 'first'. "
            f"Supported values: {sorted(aggregators)}",
            stacklevel=2,
        )
        agg_func = "first"
    agg_expr = aggregators[agg_func](value_col)

    # Perform the pivot with the explicit value list computed above.
    result = df.groupBy(*id_cols).pivot(pivot_col, pivot_values).agg(agg_expr)

    # Apply the fill only when requested, and only to the pivoted columns so that
    # NULLs in the ID columns are never overwritten.
    if values_fill is not None:
        pivoted_cols = [name for name in result.columns if name not in id_cols]
        if pivoted_cols:
            result = result.fillna(values_fill, subset=pivoted_cols)

    print(f"‚úÖ Pivot termin√©! Dimensions: {len(id_cols)} ID cols + {len(pivot_values)} pivot cols")

    return result




In [46]:
# Pivot with the defaults: station values become columns, ID columns are
# auto-detected, and missing cells are left as NULL.
print("Test de la fonction g√©n√©rique:")
result_generic = smart_pivot(fish_encounters, pivot_col="station", value_col="seen")
result_generic.show()


Test de la fonction g√©n√©rique:
üîç Colonnes ID d√©tect√©es automatiquement: ['fish']
üìä Valeurs de pivot trouv√©es: ['BCE', 'BCE2', 'BCW', 'BCW2', 'Base_TD', 'I80_1', 'Lisbon', 'MAE', 'Release', 'Rstr']
‚úÖ Pivot termin√©! Dimensions: 1 ID cols + 10 pivot cols
+----+---+----+---+----+-------+-----+------+---+-------+----+
|fish|BCE|BCE2|BCW|BCW2|Base_TD|I80_1|Lisbon|MAE|Release|Rstr|
+----+---+----+---+----+-------+-----+------+---+-------+----+
|4848|  1|   0|  1|   0|      1|    1|     1|  0|      1|   1|
|4843|  0|   0|  0|   0|   NULL|    1|     1|  0|      1|NULL|
|4851|  0|   0|  0|   0|      0|    1|     1|  0|      1|NULL|
|4844|  1|NULL|  0|   0|      1|    1|     1|  0|      1|   1|
|4842|  1|   1|  1|   1|      1|    1|     1|  1|      1|   1|
|4852|  0|   0|  0|   0|      1|    1|     1|  0|      1|   1|
|4846|  1|   1|  1|   1|      1|    1|     1|  1|      1|   1|
|4849|  0|   0|  0|   0|      0|    0|     0|  0|      1|   0|
|4847|  0|   0|  0|   0|      0|    1|   

In [47]:
# Same pivot, but with values_fill=0 so missing cells are replaced by 0.
print("Test de la fonction g√©n√©rique:")
pivot_options = {"pivot_col": "station", "value_col": "seen", "values_fill": 0}
result_generic = smart_pivot(fish_encounters, **pivot_options)
result_generic.show()


Test de la fonction g√©n√©rique:
üîç Colonnes ID d√©tect√©es automatiquement: ['fish']
üìä Valeurs de pivot trouv√©es: ['BCE', 'BCE2', 'BCW', 'BCW2', 'Base_TD', 'I80_1', 'Lisbon', 'MAE', 'Release', 'Rstr']
‚úÖ Pivot termin√©! Dimensions: 1 ID cols + 10 pivot cols
+----+---+----+---+----+-------+-----+------+---+-------+----+
|fish|BCE|BCE2|BCW|BCW2|Base_TD|I80_1|Lisbon|MAE|Release|Rstr|
+----+---+----+---+----+-------+-----+------+---+-------+----+
|4848|  1|   0|  1|   0|      1|    1|     1|  0|      1|   1|
|4843|  0|   0|  0|   0|      0|    1|     1|  0|      1|   0|
|4851|  0|   0|  0|   0|      0|    1|     1|  0|      1|   0|
|4844|  1|   0|  0|   0|      1|    1|     1|  0|      1|   1|
|4842|  1|   1|  1|   1|      1|    1|     1|  1|      1|   1|
|4852|  0|   0|  0|   0|      1|    1|     1|  0|      1|   1|
|4846|  1|   1|  1|   1|      1|    1|     1|  1|      1|   1|
|4849|  0|   0|  0|   0|      0|    0|     0|  0|      1|   0|
|4847|  0|   0|  0|   0|      0|    1|   