In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkEnrichment") \
    .master("local[2]") \
    .config("spark.driver.memory", "2g") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
spark.range(5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



### Load Cleaned Transactions Dataset

This cell loads the cleaned transaction data produced in the previous notebook.

The dataset is stored in Parquet format for efficient reading and consistent schema.
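A small preview of the first rows and the total row count are displayed to confirm that the data loaded correctly.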
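A quick way to confirm that the cleaned data loaded as expected is to compare the loaded column names against the columns the cleaning notebook should have produced. The sketch below is plain Python. `EXPECTED_COLUMNS` is copied from the preview output and assumed to be complete, and `missing_columns` is a hypothetical helper, not part of PySpark. Since a DataFrame's `.columns` attribute is a plain list of names, it could be called in the notebook as `missing_columns(transactions.columns)`.

```python
# Column names as they appear in the preview of the cleaned transactions (assumed complete)
EXPECTED_COLUMNS = {
    "product_id", "order_id", "add_to_cart_order", "reordered",
    "department_id", "aisle_id", "product_name", "aisle", "department",
}

def missing_columns(actual_columns, expected=EXPECTED_COLUMNS):
    """Hypothetical helper: return expected column names absent from actual_columns, sorted."""
    return sorted(expected - set(actual_columns))

# A plain list stands in for DataFrame.columns; "department" is deliberately left out
loaded = ["product_id", "order_id", "add_to_cart_order", "reordered",
          "department_id", "aisle_id", "product_name", "aisle"]
print(missing_columns(loaded))  # ['department']
```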


In [3]:
transactions = spark.read.parquet("../data_clean/transactions")

transactions.show(5)
transactions.count()


+----------+--------+-----------------+---------+-------------+--------+--------------------+--------------------+----------+
|product_id|order_id|add_to_cart_order|reordered|department_id|aisle_id|        product_name|               aisle|department|
+----------+--------+-----------------+---------+-------------+--------+--------------------+--------------------+----------+
|     48370|       5|                6|        1|           17|      54|Sensitive Toilet ...|         paper goods| household|
|     10096|      14|                8|        1|            1|      58|Corn Meal Pizza C...|frozen breads doughs|    frozen|
|     16974|      35|                4|        0|           19|     107|Sea Salt Brown Ri...|      chips pretzels|    snacks|
|     16974|      61|               12|        1|           19|     107|Sea Salt Brown Ri...|      chips pretzels|    snacks|
|     18306|      84|                9|        0|            7|      94|Chai Green Tea Ba...|                 tea| bev

32434489

### Load External Pricing Data

This cell loads external pricing data that simulates information retrieved from an API.

The pricing dataset contains average product prices by department.
This data will be used to enrich the transaction dataset with monetary values.

Displaying the data helps verify that the pricing information was loaded correctly.
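Because the next cell calls `spark.read.csv` with `header=True` but without `inferSchema=True`, every column, including `avg_price`, is read as a string. The plain-Python sketch below mirrors that behaviour with the standard `csv` module. The inline CSV text is an assumed copy of the values shown in this cell's output, and `parse_prices` is a hypothetical helper. In Spark, passing `inferSchema=True` or casting the column explicitly addresses the same issue.

```python
import csv
import io

# Assumed copy of api_prices.csv, using the values displayed by prices_df.show()
PRICES_CSV = """department,avg_price
produce,1.5
dairy eggs,2.5
snacks,2.0
beverages,1.8
frozen,3.0
"""

def parse_prices(text):
    """Hypothetical helper: return {department: avg_price} with prices cast to float."""
    prices = {}
    for row in csv.DictReader(io.StringIO(text)):  # header row supplies the field names
        # csv yields strings (like Spark without inferSchema), so cast explicitly
        prices[row["department"]] = float(row["avg_price"])
    return prices

raw_rows = list(csv.DictReader(io.StringIO(PRICES_CSV)))
print(type(raw_rows[0]["avg_price"]).__name__)  # str
prices = parse_prices(PRICES_CSV)
print(prices["frozen"])  # 3.0
```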


In [4]:
prices_df = spark.read.csv(
    "../data_raw/api_prices.csv",
    header=True
)

prices_df.show()


+----------+---------+
|department|avg_price|
+----------+---------+
|   produce|      1.5|
|dairy eggs|      2.5|
|    snacks|      2.0|
| beverages|      1.8|
|    frozen|      3.0|
+----------+---------+



### Enrich Transactions with Pricing Information

This cell joins the cleaned transaction data with external pricing data.

- Transactions are joined with prices using the `department` field
- A left join ensures all transactions are retained
- The `avg_price` column is added to each transaction; departments with no matching price (such as `household`) receive `NULL`

This enrichment step enables revenue-based analysis in later stages.
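A left join keeps every row from the left side (`transactions`) and fills the right-side columns (`prices_df`) with `NULL` when no key matches. This is why `household` has no price in the output of the next cell. The plain-Python sketch below reproduces that behaviour with a dictionary lookup. The rows are a subset of the preview, and `left_join_prices` is a hypothetical helper. It assumes each department appears at most once in the pricing data; with duplicate keys, Spark's join would instead emit one output row per matching price row.

```python
# Subset of the transaction preview (only the columns needed for the join)
transactions = [
    {"product_id": 48370, "department": "household"},
    {"product_id": 10096, "department": "frozen"},
    {"product_id": 16974, "department": "snacks"},
]
prices = {"produce": 1.5, "dairy eggs": 2.5, "snacks": 2.0, "beverages": 1.8, "frozen": 3.0}

def left_join_prices(rows, price_lookup):
    """Hypothetical helper: keep every row and attach avg_price, or None when no match."""
    return [{**row, "avg_price": price_lookup.get(row["department"])} for row in rows]

enriched = left_join_prices(transactions, prices)
for row in enriched:
    print(row["department"], row["avg_price"])
# household None
# frozen 3.0
# snacks 2.0
```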


In [5]:
from pyspark.sql.functions import col

transactions_enriched = transactions.join(
    prices_df,
    on="department",
    how="left"
)

transactions_enriched.show(5)


+----------+----------+--------+-----------------+---------+-------------+--------+--------------------+--------------------+---------+
|department|product_id|order_id|add_to_cart_order|reordered|department_id|aisle_id|        product_name|               aisle|avg_price|
+----------+----------+--------+-----------------+---------+-------------+--------+--------------------+--------------------+---------+
| household|     48370|       5|                6|        1|           17|      54|Sensitive Toilet ...|         paper goods|     NULL|
|    frozen|     10096|      14|                8|        1|            1|      58|Corn Meal Pizza C...|frozen breads doughs|      3.0|
|    snacks|     16974|      35|                4|        0|           19|     107|Sea Salt Brown Ri...|      chips pretzels|      2.0|
|    snacks|     16974|      61|               12|        1|           19|     107|Sea Salt Brown Ri...|      chips pretzels|      2.0|
| beverages|     18306|      84|                

### Clean and Standardize Pricing Data

This cell ensures the pricing information is usable for analysis.

- The `avg_price` column, read from the CSV as a string, is cast to a numeric (`double`) type
- Missing prices (departments without a match in the pricing data) are filled with a default value of 2.0

This step keeps `NULL` values from propagating into downstream calculations and ensures every transaction has a monetary value.
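The two operations in the next cell can be pictured row by row: convert the string price to a floating-point number, and substitute the default `2.0` when the value is missing. The sketch below is plain Python with a hypothetical `clean_price` helper; in the notebook the same effect comes from `col("avg_price").cast("double")` followed by `fillna({"avg_price": 2.0})`. It only models missing values, not malformed price strings.

```python
DEFAULT_PRICE = 2.0  # the same default value passed to fillna in the notebook

def clean_price(value, default=DEFAULT_PRICE):
    """Hypothetical helper: cast a raw price to float, using `default` for missing values."""
    if value is None:      # e.g. departments absent from the pricing table
        return default
    return float(value)    # string -> float, like col("avg_price").cast("double")

raw_prices = ["3.0", "2.0", None, "1.8"]
cleaned = [clean_price(p) for p in raw_prices]
print(cleaned)  # [3.0, 2.0, 2.0, 1.8]
```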


In [6]:
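# The CSV was read without schema inference, so avg_price is a string; cast it to double
transactions_enriched = transactions_enriched.withColumn(
    "avg_price",
    col("avg_price").cast("double")
)

# Departments with no matching price row got NULL from the left join; use 2.0 as a default
transactions_enriched = transactions_enriched.fillna(
    {"avg_price": 2.0}
)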


### Create Sample Dataset for Analysis

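This cell draws a random sample of roughly 10% of the enriched transaction data.

Sampling reduces data size, making experimentation and analysis faster.
A fixed random seed (`seed=42`) makes the sample reproducible when the notebook is rerun on the same input.

This sampled dataset is intended for data science and analysis tasks. Because the result is assigned back to `transactions_enriched`, all later steps, including the saved Parquet output, operate on this sample rather than on the full dataset.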
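Without replacement, `sample(fraction=0.1, seed=42)` decides independently for each row whether to keep it, so the result contains approximately, not exactly, 10% of the rows. The plain-Python sketch below illustrates this Bernoulli sampling idea with a hypothetical `bernoulli_sample` helper. It uses Python's `random` module rather than Spark's internal generator, so it will not select the same rows Spark does. In Spark, getting the same sample from the same seed also generally assumes unchanged input data and partitioning.

```python
import random

def bernoulli_sample(rows, fraction, seed):
    """Hypothetical helper: keep each row independently with probability `fraction`."""
    rng = random.Random(seed)  # a fixed seed makes the selection repeatable
    return [row for row in rows if rng.random() < fraction]

rows = list(range(100_000))
sample_a = bernoulli_sample(rows, fraction=0.1, seed=42)
sample_b = bernoulli_sample(rows, fraction=0.1, seed=42)
print(len(sample_a))         # approximately 10,000
print(sample_a == sample_b)  # True: same seed and same input order
```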


In [7]:
transactions_enriched = transactions_enriched.sample(
    fraction=0.1,   # 10% sample
    seed=42
)


### Optimize Data Partitions for Local Processing

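This cell redistributes the enriched dataset into two partitions.

`repartition(2)` performs a full shuffle that spreads rows evenly across the partitions, matching the two worker threads requested with `local[2]`.
The partition count also determines how many Parquet part files are written later.
Using a small number of partitions is suitable for development and analysis on a single machine.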
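Called with only a number, `repartition(2)` uses round-robin partitioning, which deals rows out to the target partitions in turn so that they end up roughly equal in size. The sketch below shows the idea in plain Python with a hypothetical `round_robin_partition` helper. It simplifies Spark's behaviour; for example, Spark starts each input partition at a randomly chosen target rather than always at partition 0.

```python
def round_robin_partition(rows, num_partitions):
    """Hypothetical helper: assign rows to partitions in turn: 0, 1, ..., n-1, 0, 1, ..."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions

parts = round_robin_partition(list(range(7)), 2)
print([len(p) for p in parts])  # [4, 3]
print(parts[0])                 # [0, 2, 4, 6]
```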


In [8]:
transactions_enriched = transactions_enriched.repartition(2)


### Prepare Output Directory for Enriched Data

This cell ensures that the parent directory for enriched datasets exists.

Spark's Parquet writer creates missing output directories itself, so this step is not strictly required.
Creating the directory explicitly keeps the project structure clean and predictable.


In [9]:
import os
os.makedirs("../data_enriched", exist_ok=True)


### Persist Enriched Transactions Dataset

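This cell writes the enriched transaction data to disk in Parquet format.

The dataset now includes a numeric `avg_price` column, and Parquet's columnar, compressed layout (Spark uses Snappy compression by default) makes it efficient to read for analytics.
Overwrite mode replaces any existing output at the target path, so the dataset can be regenerated during development without a "path already exists" error.

This output represents the final handoff from data engineering to data science and analysis.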
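Spark's `DataFrameWriter.mode` accepts `"append"`, `"overwrite"`, `"ignore"`, and `"error"`/`"errorifexists"` (the default). The plain-Python sketch below mimics those semantics for a local directory to show why `"overwrite"` lets the notebook be rerun safely. `save_rows` is a hypothetical helper that writes text files, not Parquet, and it ignores details such as Spark's commit protocol.

```python
import os
import shutil
import tempfile

def save_rows(path, rows, mode="errorifexists"):
    """Hypothetical writer that mimics Spark's save modes for a directory of part files."""
    exists = os.path.exists(path)
    if exists and mode in ("error", "errorifexists"):
        raise FileExistsError(f"path {path} already exists")
    if exists and mode == "ignore":
        return  # keep the existing output and write nothing
    if exists and mode == "overwrite":
        shutil.rmtree(path)  # delete the old output before writing new files
    os.makedirs(path, exist_ok=True)
    # "append" (or a first write) adds a new part file next to any existing ones
    part_name = f"part-{len(os.listdir(path)):05d}.txt"
    with open(os.path.join(path, part_name), "w") as f:
        f.write("\n".join(str(r) for r in rows))

with tempfile.TemporaryDirectory() as tmp:
    out = os.path.join(tmp, "transactions_enriched")
    save_rows(out, [1, 2, 3], mode="overwrite")  # first run: path does not exist yet
    save_rows(out, [4, 5], mode="overwrite")     # rerun: old output is replaced
    files_after_rerun = sorted(os.listdir(out))
    with open(os.path.join(out, files_after_rerun[0])) as f:
        content_after_rerun = f.read()
    try:
        save_rows(out, [6])  # default mode refuses to touch an existing path
        default_mode_raised = False
    except FileExistsError:
        default_mode_raised = True

print(files_after_rerun)    # ['part-00000.txt']
print(default_mode_raised)  # True
```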


In [10]:
transactions_enriched.write \
    .mode("overwrite") \
    .parquet("../data_enriched/transactions_enriched")



### Verify Enriched Data Output Files

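This cell lists the files Spark wrote for the enriched dataset.

A notebook displays only the value of a cell's last expression, so the output shows the contents of `transactions_enriched`, not of `../data_enriched`.
The `_SUCCESS` marker confirms that the write job committed successfully, and the two `part-*.snappy.parquet` files correspond to the two partitions created by `repartition(2)`.
The hidden `.crc` files are checksums written by Hadoop's local file system layer.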
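The same check can be automated instead of read by eye. The sketch below uses a hypothetical `summarize_parquet_output` helper that reports whether the `_SUCCESS` marker exists and how many Parquet data files were written, skipping the hidden `.crc` checksum files. The usage example builds a temporary directory with file names shaped like the ones listed in the output (shortened here). In the notebook it could be called as `summarize_parquet_output("../data_enriched/transactions_enriched")`.

```python
import os
import tempfile

def summarize_parquet_output(path):
    """Hypothetical check: report the _SUCCESS marker and count part-*.parquet data files."""
    names = os.listdir(path)
    # Hidden ".part-...crc" files start with "." and are excluded by the "part-" prefix test
    data_files = [n for n in names if n.startswith("part-") and n.endswith(".parquet")]
    return {"success": "_SUCCESS" in names, "num_parts": len(data_files)}

with tempfile.TemporaryDirectory() as tmp:
    # Empty stand-ins shaped like the files Spark wrote (names shortened)
    for name in ["part-00000-x.snappy.parquet", ".part-00000-x.snappy.parquet.crc",
                 "part-00001-x.snappy.parquet", ".part-00001-x.snappy.parquet.crc",
                 "_SUCCESS", "._SUCCESS.crc"]:
        open(os.path.join(tmp, name), "w").close()
    summary = summarize_parquet_output(tmp)

print(summary)  # {'success': True, 'num_parts': 2}
```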


In [11]:
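import os

# A notebook shows only the last expression's value, so this first listing is not displayed
os.listdir("../data_enriched")

# List the Parquet part files, checksum files, and _SUCCESS marker written by Spark
os.listdir("../data_enriched/transactions_enriched")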


['part-00000-68bc476e-dd89-4f66-acc7-00b26880e312-c000.snappy.parquet',
 '._SUCCESS.crc',
 '.part-00001-68bc476e-dd89-4f66-acc7-00b26880e312-c000.snappy.parquet.crc',
 'part-00001-68bc476e-dd89-4f66-acc7-00b26880e312-c000.snappy.parquet',
 '_SUCCESS',
 '.part-00000-68bc476e-dd89-4f66-acc7-00b26880e312-c000.snappy.parquet.crc']