# Parquet ETL with PySpark
**Last updated:** 2025-11-13

This notebook demonstrates a minimal ETL using **PySpark** and **Parquet**:

1. Read a Parquet file (or generate a sample if missing)  
2. Inspect schema and preview records  
3. Filter and derive a new column  
4. Save results to Parquet (partitioned) and CSV

> **⚠️ Important Note (Prerequisites)**  
> To run this notebook successfully, the following components must be properly installed and configured on your system:  
>
> 1. **Java JDK (17 recommended)**  
>    - Spark 3.5.x, used in this notebook, runs on Java 8, 11, or 17; Spark 4.x requires Java 17 or 21, so **Java 17** works with both.  
>    - Make sure the `JAVA_HOME` environment variable points to the installed JDK.
>   
> 2. **Apache Spark**  
>    - Use a pre-built distribution such as *spark-3.x-bin-hadoop3*.  
>    - Set the `SPARK_HOME` environment variable and include `SPARK_HOME\bin` in your system `PATH`.  
>  
> 3. **WinUtils (Windows only)**  
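>    - Set the `HADOOP_HOME` environment variable and place `winutils.exe` in `HADOOP_HOME\bin` (this notebook points `HADOOP_HOME` to a `hadoop` folder inside `SPARK_HOME`).  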
>    - This is needed for local Hadoop compatibility and file permission handling.  
>  
> 4. **PySpark** and **findspark** installed in your Python environment (the notebook calls `findspark.init()` to locate `SPARK_HOME`)  
>    ```bash
>    pip install pyspark findspark
>    ```
>  
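> 5. (Optional) **pandas**, **NumPy**, and **PyArrow**, used only to generate the sample dataset (pandas writes the Parquet file with PyArrow, falling back to fastparquet; Spark itself reads Parquet natively)  
>    ```bash
>    pip install pandas numpy pyarrow
>    ```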
>  
> If any of these components are missing or incorrectly configured, the Spark session may fail to start or may be unable to read Parquet files.
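
Before starting Spark, a quick look at the environment can save a confusing JVM stack trace. The sketch below is a hypothetical helper (`check_prerequisites` is not part of PySpark or findspark) that only inspects environment variables and folders. It assumes the Windows layout used in this notebook, where Hadoop looks for `winutils.exe` under `HADOOP_HOME\bin`; if you set the variables from Python, run it after the environment cell.

```python
import os
import platform
from pathlib import Path


def check_prerequisites(env=None, system=None):
    """Hypothetical helper: return a list of problems; an empty list means the basic checks passed."""
    env = os.environ if env is None else env
    system = platform.system() if system is None else system
    problems = []

    java_home = env.get("JAVA_HOME")
    if not java_home:
        problems.append("JAVA_HOME is not set")
    elif not Path(java_home, "bin").is_dir():
        problems.append(f"JAVA_HOME has no bin folder: {java_home}")

    spark_home = env.get("SPARK_HOME")
    if not spark_home:
        problems.append("SPARK_HOME is not set")
    elif not Path(spark_home, "bin").is_dir():
        problems.append(f"SPARK_HOME has no bin folder: {spark_home}")

    # Only Windows needs winutils.exe, looked up under HADOOP_HOME\bin
    if system == "Windows":
        hadoop_home = env.get("HADOOP_HOME")
        if not hadoop_home:
            problems.append("HADOOP_HOME is not set (needed to locate winutils.exe)")
        elif not Path(hadoop_home, "bin", "winutils.exe").is_file():
            problems.append(f"winutils.exe not found in {Path(hadoop_home, 'bin')}")
    return problems


for problem in check_prerequisites():
    print("Missing:", problem)
```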


In [2]:
# Install dependencies (uncomment if needed)
#%pip install pyspark findspark --quiet

In [2]:
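import os

# Windows-specific paths: adjust them to your own JDK/Spark installation.
# HADOOP_HOME\bin is where Hadoop looks for winutils.exe.
os.environ["JAVA_HOME"] = r"C:\Program Files\Eclipse Adoptium\jdk-17.0.16.8-hotspot"
os.environ["SPARK_HOME"] = r"C:\spark\spark-3.5.7-bin-hadoop3"
os.environ["HADOOP_HOME"] = r"C:\spark\spark-3.5.7-bin-hadoop3\hadoop"
# Prepend the Hadoop and Spark bin folders to PATH (";" is the Windows path separator)
os.environ["PATH"] = os.environ["HADOOP_HOME"] + r"\bin;" + os.environ["SPARK_HOME"] + r"\bin;" + os.environ["PATH"]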
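
To confirm that the JDK behind `JAVA_HOME` is one Spark 3.5 supports, one option is to run `java -version` and parse its output. This is a sketch under stated assumptions: `parse_java_major` and `java_major_version` are hypothetical helpers, and the set `{8, 11, 17}` reflects the Java versions listed in the Spark 3.5 documentation; adjust it for other Spark releases.

```python
import os
import re
import subprocess
from pathlib import Path

# Assumption: Java major versions listed for Spark 3.5.x; change for other Spark releases
SPARK_35_JAVA_VERSIONS = {8, 11, 17}


def parse_java_major(version_output):
    """Hypothetical helper: '... version "17.0.16" ...' -> 17, '... version "1.8.0_392"' -> 8."""
    match = re.search(r'version "([^"]+)"', version_output)
    if match is None:
        return None
    parts = match.group(1).split(".")
    # Java 8 and earlier report themselves as 1.x
    if parts[0] == "1" and len(parts) > 1:
        return int(parts[1])
    leading = re.match(r"\d+", parts[0])
    return int(leading.group()) if leading else None


def java_major_version(java_home):
    """Hypothetical helper: run `java -version` from JAVA_HOME (or PATH); return the major version or None."""
    java = str(Path(java_home, "bin", "java")) if java_home else "java"
    try:
        result = subprocess.run([java, "-version"], capture_output=True, text=True, timeout=30)
    except (OSError, subprocess.SubprocessError):
        return None
    # `java -version` writes its banner to stderr
    return parse_java_major(result.stderr)


major = java_major_version(os.environ.get("JAVA_HOME", ""))
if major is None:
    print("Could not run java; check JAVA_HOME")
elif major not in SPARK_35_JAVA_VERSIONS:
    print(f"Java {major} is not among {sorted(SPARK_35_JAVA_VERSIONS)}")
else:
    print(f"Java {major} looks compatible with Spark 3.5")
```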

In [4]:
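import findspark

# Add $SPARK_HOME/python and its bundled Py4J to sys.path so `import pyspark` uses that distribution
findspark.init()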
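
`findspark.init()` makes the distribution under `SPARK_HOME` importable by putting its `python` folder and the bundled Py4J zip on `sys.path`. The stdlib sketch below approximates that step for illustration; `spark_python_paths` and `add_spark_to_sys_path` are hypothetical helpers, not findspark's own code, and they assume the `python/lib/py4j-*-src.zip` layout of the pre-built Spark downloads.

```python
import sys
from pathlib import Path


def spark_python_paths(spark_home):
    """Hypothetical helper: folders/zips a Spark distribution needs on sys.path for `import pyspark`."""
    python_dir = Path(spark_home, "python")
    # Pre-built distributions ship Py4J as python/lib/py4j-<version>-src.zip
    py4j_zips = sorted(python_dir.glob("lib/py4j-*-src.zip"))
    return [str(python_dir)] + [str(p) for p in py4j_zips]


def add_spark_to_sys_path(spark_home):
    """Hypothetical helper: prepend the Spark paths to sys.path, keeping their order and skipping duplicates."""
    for path in reversed(spark_python_paths(spark_home)):
        if path not in sys.path:
            sys.path.insert(0, path)
```

Prepending rather than appending means this distribution wins over any other `pyspark` installed in the environment, which is the point of pinning `SPARK_HOME`.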

In [6]:
from pathlib import Path
from pyspark.sql import SparkSession, functions as f

# --- Paths ---
DATA_DIR = Path("data")
DATA_DIR.mkdir(parents=True, exist_ok=True)
IN_PATH = str(DATA_DIR / "orders.parquet")
OUT_PARQUET = str(DATA_DIR / "orders_clean_spark.parquet")
OUT_CSV_DIR = str(DATA_DIR / "orders_clean_spark_csv")

# --- Create Spark session ---
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("ParquetETL-Spark")
    .getOrCreate()
)

spark

In [12]:
# If input doesn't exist, create a small sample with pandas and write Parquet

if not Path(IN_PATH).exists():
    import pandas as pd
    import numpy as np
    try:
        import pyarrow  # noqa: F401
        engine = "pyarrow"
    except ImportError:
        engine = "fastparquet"  # fallback Parquet engine for pandas

    rng = np.random.default_rng(42)
    n = 1000
    df_sample = pd.DataFrame({
        "order_id": range(1, n+1),
        "unit_price": rng.uniform(5, 500, size=n).round(2),
        "quantity": rng.integers(1, 10, size=n),
        "order_date": pd.date_range("2024-01-01", periods=n, freq="h"),  # hourly; "H" is deprecated since pandas 2.2
        "country": rng.choice(["Brazil","USA","Spain","Germany"], size=n, p=[0.4,0.3,0.2,0.1])
    })
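    # Spark stores timestamps with microsecond precision and Spark 3.x can fail on
    # nanosecond Parquet timestamps, so cast pandas' default ns resolution to us
    df_sample["order_date"] = df_sample["order_date"].astype("datetime64[us]")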
    
    df_sample.to_parquet(IN_PATH, engine=engine, compression="snappy", index=False)
    print(f"Sample dataset written to {IN_PATH}")
else:
    print(f"Found existing dataset at {IN_PATH}")

Sample dataset written to data\orders.parquet
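
The `datetime64[us]` cast only drops the sub-microsecond part of each timestamp. The stdlib sketch below shows that arithmetic on epoch integers; `ns_to_us` and `us_to_datetime` are hypothetical helpers, and the 123 ns offset is made up to show what gets lost.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ns_to_us(epoch_ns):
    """Hypothetical helper: drop the sub-microsecond part of a non-negative epoch-nanosecond value."""
    return epoch_ns // 1_000


def us_to_datetime(epoch_us):
    """Hypothetical helper: epoch microseconds -> aware UTC datetime (Python datetimes also stop at microseconds)."""
    return EPOCH + timedelta(microseconds=epoch_us)


# 2024-01-01 00:00:00 UTC plus a made-up 123 ns that cannot survive the conversion
ns = 1_704_067_200_000_000_123
us = ns_to_us(ns)
print(us, us_to_datetime(us))
```

Because the sample timestamps are whole hours, the conversion is lossless for this dataset.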


In [14]:
# 1) Read Parquet (lazy: when an action runs, Spark prunes unused columns and can push supported filters down to the Parquet scan)
df = spark.read.parquet(IN_PATH)

# Preview & schema
df.show(5, truncate=False)
df.printSchema()

+--------+----------+--------+-------------------+-------+
|order_id|unit_price|quantity|order_date         |country|
+--------+----------+--------+-------------------+-------+
|1       |388.11    |8       |2024-01-01 00:00:00|USA    |
|2       |222.24    |1       |2024-01-01 01:00:00|Germany|
|3       |430.01    |4       |2024-01-01 02:00:00|Brazil |
|4       |350.2     |5       |2024-01-01 03:00:00|USA    |
|5       |51.62     |8       |2024-01-01 04:00:00|Spain  |
+--------+----------+--------+-------------------+-------+
only showing top 5 rows

root
 |-- order_id: long (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: long (nullable = true)
 |-- order_date: timestamp_ntz (nullable = true)
 |-- country: string (nullable = true)
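
Filter pushdown pays off because Parquet keeps per-row-group statistics such as each column's minimum and maximum, so a reader can skip whole row groups that cannot match. The stdlib sketch below models that pruning decision on made-up statistics; `RowGroupStats` and the two helpers are hypothetical, and this illustrates the idea rather than Spark's implementation.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class RowGroupStats:
    """Hypothetical min/max statistics of one column in one Parquet row group."""
    row_group: int
    min_value: Any
    max_value: Any


def groups_for_lower_bound(stats, lower_bound):
    """Row groups that may hold values >= lower_bound; groups whose max is smaller are skipped."""
    return [s.row_group for s in stats if s.max_value >= lower_bound]


def groups_for_equality(stats, value):
    """Row groups whose [min, max] range could contain `value`."""
    return [s.row_group for s in stats if s.min_value <= value <= s.max_value]


# Made-up statistics for the `quantity` and `country` columns, for illustration only
quantity_stats = [RowGroupStats(0, 1, 1), RowGroupStats(1, 1, 9), RowGroupStats(2, 3, 9)]
country_stats = [RowGroupStats(0, "Brazil", "USA"), RowGroupStats(1, "Germany", "Spain")]

print(groups_for_lower_bound(quantity_stats, 2))
print(groups_for_equality(country_stats, "Brazil"))
```

Statistics only rule groups out: a group whose range spans `"Brazil"` must still be read even if it contains no Brazilian rows. In the notebook, `df.explain()` after the filters in the next cell should list them under `PushedFilters` in the Parquet scan node.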



In [16]:
# 2) Processing: filters and derived column
# - Keep country == 'Brazil'
# - Keep quantity >= 2
# - total_value = unit_price * quantity

if "country" in df.columns:
    df = df.filter(f.col("country") == f.lit("Brazil"))

if "quantity" in df.columns:
    df = df.filter(f.col("quantity") >= f.lit(2))

if "unit_price" in df.columns and "quantity" in df.columns:
    df = df.withColumn(
        "total_value",
        f.col("unit_price").cast("double") * f.col("quantity")  # long quantity is promoted to double
    )

df.show(5, truncate=False)

+--------+----------+--------+-------------------+-------+-----------+
|order_id|unit_price|quantity|order_date         |country|total_value|
+--------+----------+--------+-------------------+-------+-----------+
|3       |430.01    |4       |2024-01-01 02:00:00|Brazil |1720.04    |
|9       |68.42     |8       |2024-01-01 08:00:00|Brazil |547.36     |
|11      |188.55    |5       |2024-01-01 10:00:00|Brazil |942.75     |
|12      |463.75    |4       |2024-01-01 11:00:00|Brazil |1855.0     |
|16      |117.48    |3       |2024-01-01 15:00:00|Brazil |352.44     |
+--------+----------+--------+-------------------+-------+-----------+
only showing top 5 rows
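
To make the semantics of the cell above explicit, here is the same filter-and-derive logic over plain Python dictionaries, fed with the five preview rows printed after the read (`order_date` omitted for brevity). `clean_orders` is a hypothetical name; the sketch is meant for checking expectations on small samples, not as a replacement for the Spark version.

```python
def clean_orders(rows, country="Brazil", min_quantity=2):
    """Hypothetical helper mirroring the Spark cell: keep one country, quantity >= min_quantity, add total_value."""
    result = []
    for row in rows:
        if row["country"] != country or row["quantity"] < min_quantity:
            continue
        # unit_price is a float and quantity an int, so the product is a float (double in Spark)
        result.append({**row, "total_value": row["unit_price"] * row["quantity"]})
    return result


# The five rows shown by df.show(5) after reading the Parquet file
preview = [
    {"order_id": 1, "unit_price": 388.11, "quantity": 8, "country": "USA"},
    {"order_id": 2, "unit_price": 222.24, "quantity": 1, "country": "Germany"},
    {"order_id": 3, "unit_price": 430.01, "quantity": 4, "country": "Brazil"},
    {"order_id": 4, "unit_price": 350.2, "quantity": 5, "country": "USA"},
    {"order_id": 5, "unit_price": 51.62, "quantity": 8, "country": "Spain"},
]
for row in clean_orders(preview):
    print(row)
```

Only order 3 survives, with a `total_value` of 1720.04, matching the first row of the Spark output.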



In [18]:
# Optional: aggregation (daily sales), computed into a separate DataFrame so `df` is not modified
if "order_date" in df.columns and "total_value" in df.columns:
    daily = (df
             .withColumn("order_date_only", f.to_date("order_date"))
             .groupBy("order_date_only")
             .agg(f.sum("total_value").alias("daily_sales"))
             .orderBy("order_date_only"))
    daily.show(5, truncate=False)

+---------------+------------------+
|order_date_only|daily_sales       |
+---------------+------------------+
|2024-01-01     |12167.53          |
|2024-01-02     |20996.5           |
|2024-01-03     |13488.109999999999|
|2024-01-04     |5912.37           |
|2024-01-05     |20238.03          |
+---------------+------------------+
only showing top 5 rows
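
Values such as `13488.109999999999` come from summing binary floating-point doubles: the total is accurate, but not exactly representable in decimal. The stdlib sketch below performs the same daily grouping and rounds each day's total to cents for presentation; `daily_sales` is a hypothetical helper and `toy_rows` are made-up rows.

```python
from collections import defaultdict
from datetime import datetime


def daily_sales(rows, ndigits=2):
    """Hypothetical helper: group rows by calendar day of order_date and sum total_value."""
    totals = defaultdict(float)
    for row in rows:
        totals[row["order_date"].date()] += row["total_value"]
    # Sort by day, like orderBy("order_date_only"), and round only for presentation
    return {day: round(total, ndigits) for day, total in sorted(totals.items())}


# Made-up rows: 0.1 + 0.2 shows the same kind of float artifact as the Spark output
toy_rows = [
    {"order_date": datetime(2024, 1, 1, 2), "total_value": 0.1},
    {"order_date": datetime(2024, 1, 1, 8), "total_value": 0.2},
    {"order_date": datetime(2024, 1, 2, 10), "total_value": 5.5},
]
print(daily_sales(toy_rows))
```

In the Spark cell, the equivalent would be `f.round(f.sum("total_value"), 2).alias("daily_sales")`; for exact currency arithmetic, casting `total_value` to a `DecimalType` before summing is the usual alternative.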



In [20]:
# 3) Save results
# Parquet (partitioned by order_year if order_date exists)
if "order_date" in df.columns:
    # year() accepts the timestamp column directly, so no intermediate column is needed
    df = df.withColumn("order_year", f.year("order_date"))
    (df.write
       .mode("overwrite")
       .partitionBy("order_year")
       .parquet(OUT_PARQUET))
else:
    df.write.mode("overwrite").parquet(OUT_PARQUET)

# CSV: coalesce(1) yields a single part-*.csv inside the output folder (fine for a demo, but it funnels all data through one task)
(df.coalesce(1)
   .write
   .mode("overwrite")
   .option("header", "true")
   .csv(OUT_CSV_DIR))

print("Wrote (Parquet folder):", OUT_PARQUET)
print("Wrote (CSV folder):", OUT_CSV_DIR)

Wrote (Parquet folder): data\orders_clean_spark.parquet
Wrote (CSV folder): data\orders_clean_spark_csv
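
Both writers produce folders rather than single files: `partitionBy("order_year")` creates Hive-style subfolders named `order_year=<value>`, and `coalesce(1)` leaves one `part-*.csv` file in the CSV folder, typically next to marker and checksum files such as `_SUCCESS` and `.crc`. The sketch below lists what was written; `partition_values` and `single_csv_part` are hypothetical helpers, and the literal paths assume the `OUT_PARQUET` and `OUT_CSV_DIR` values defined earlier.

```python
from pathlib import Path


def partition_values(parquet_dir, column):
    """Hypothetical helper: partition values found as `<column>=<value>` subfolders, sorted."""
    prefix = f"{column}="
    return sorted(
        p.name[len(prefix):]
        for p in Path(parquet_dir).iterdir()
        if p.is_dir() and p.name.startswith(prefix)
    )


def single_csv_part(csv_dir):
    """Hypothetical helper: the only part-*.csv file in a Spark CSV output folder."""
    parts = sorted(Path(csv_dir).glob("part-*.csv"))
    if len(parts) != 1:
        raise FileNotFoundError(f"expected exactly one part-*.csv in {csv_dir}, found {len(parts)}")
    return parts[0]


# Assumed to match OUT_PARQUET and OUT_CSV_DIR from the setup cell
if Path("data/orders_clean_spark.parquet").is_dir():
    print(partition_values("data/orders_clean_spark.parquet", "order_year"))
if Path("data/orders_clean_spark_csv").is_dir():
    print(single_csv_part("data/orders_clean_spark_csv"))
```

The `order_year` values live only in the folder names; reading `OUT_PARQUET` back with `spark.read.parquet` rediscovers them as a column through partition discovery.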


In [22]:
# Stop the session (free resources)
spark.stop()