# CS 451/651: Data-Intensive Distributed Computing (Fall 2025)
# Assignment 2: ETL

Read the setup of this assignment in the <a href="https://lintool.github.io/cs451-2025f/assignments/assignment2.html">assignment 2 landing page</a> before you get started here.

## 1. Querying the Operational Database

Let's run a query to verify that the operational database has been properly restored and that we can issue a query to PostgreSQL:

In [None]:
!psql "cs451" -v ON_ERROR_STOP=1 -c "SELECT COUNT(DISTINCT user_id) AS number_users FROM retail.user;"

**The correct answer should be 3022290.**

If running the cell above gives you the same answer, the everything should be in order.

If you're getting an error, fix it before moving on.

As a warmup exercise, write SQL queries against the operational database to answer the following questions and report the answers.
Place both your SQL queries and answers in the following cell, replacing the placeholder texts that exist there.
Each question needs to be answered by a _single_ SQL query (that is, it is not acceptable to run multiple SQL queries and then compute the answer yourself).

1. For `session_id` `789d3699-028e-4367-b515-b82e2cb5225f`, what was the purchase price?
2. How many products are sold by the brand "sokolov"?
3. What is the average purchase price of items purchased from the brand "febest"?
4. What is average number of events per user? (Report answer to two digits after the decimal point, i.e., XX.XX)

<font color="red">**Below, you'll have to write some code!**</font>

// qcell_1b76x2 (keep this id for tracking purposes)

**Q1 SQL:**

SELECT....

**Q1 answer:**

XXXX

**Q2 SQL:**

SELECT....

**Q2 answer:**

XXXX

**Q3 SQL:**

SELECT....

**Q3 answer:**

XXXX

**Q4 SQL:**

SELECT....

**Q4 answer:**

XXXX


## 2. Setup

The following cell contains setup to measure wall clock time and memory usage. (Don't worry about the details, just run the cell)

In [None]:
!pip install -U numpy pandas pyarrow matplotlib scipy
import sys, subprocess
try:
    import psutil  # noqa: F401
except Exception:
    subprocess.check_call([sys.executable, "-m", "pip", "install", "psutil"])
print("psutil is installed.")


from IPython.core.magic import register_cell_magic
import time, os, platform

# Try to import optional modules
try:
    import psutil
except Exception:
    psutil = None

try:
    import resource  # not available on Windows
except Exception:
    resource = None


def _rss_bytes():
    """Resident Set Size in bytes (cross-platform via psutil if available)."""
    if psutil is not None:
        return psutil.Process(os.getpid()).memory_info().rss
    # Fallback: unknown RSS → 0 
    return 0


def _peak_bytes():
    """
    Best-effort peak memory in bytes.
    - Windows: psutil peak working set (peak_wset)
    - Linux:   resource.ru_maxrss (KB → bytes)
    - macOS:   resource.ru_maxrss (bytes)
    Fallback to current RSS if unavailable.
    """
    sysname = platform.system()

    # Windows path: use psutil peak_wset if present
    if sysname == "Windows" and psutil is not None:
        mi = psutil.Process(os.getpid()).memory_info()
        peak = getattr(mi, "peak_wset", None)  # should be available on Windows
        if peak is not None:
            return int(peak)
        return int(mi.rss)

    # POSIX path: resource may be available
    if resource is not None:
        try:
            ru = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            # On Linux ru_maxrss is in kilobytes; on macOS/BSD it is bytes
            if sysname == "Linux":
                return int(ru) * 1024
            else:
                return int(ru)
        except Exception:
            pass

    # Last resort
    return _rss_bytes()


@register_cell_magic
def timemem(line, cell):
    """
    Measure wall time and memory around the execution of this cell.

        %%timemem
        <your code>

    Notes:
    - RSS = resident memory after the cell.
    - Peak is OS-dependent (see _peak_bytes docstring).
    """
    ip = get_ipython()

    rss_before  = _rss_bytes()
    peak_before = _peak_bytes()
    t0 = time.perf_counter()

    # Execute the cell body
    result = ip.run_cell(cell)

    t1 = time.perf_counter()
    rss_after  = _rss_bytes()
    peak_after = _peak_bytes()

    wall = t1 - t0
    rss_delta_mb  = (rss_after  - rss_before)  / (1024 * 1024)
    peak_delta_mb = (peak_after - peak_before) / (1024 * 1024)

    print("======================================")
    print(f"Wall time: {wall:.3f} s")
    print(f"RSS Δ: {rss_delta_mb:+.2f} MB")
    print(f"Peak memory Δ: {peak_delta_mb:+.2f} MB (OS-dependent)")
    print("======================================")

    return result

## 3. The "Extract" in ETL

The operational database comprises the tables described [here](https://lintool.github.io/cs451-2025f/assignments/assignment2.html).

For the "Extract" in ETL, we're going to extract the following CSV files, each corresponding to a table in the operational database:

- **user.csv**: `user_id, gender, birthdate`
- **session.csv**: `session_id, user_id`
- **product.csv**: `product_id, brand, category, product_name`
- **product_name.csv**: `category, product_name, description`
- **events.csv**: `event_time, event_type, session_id, product_id, price`
- **category.csv**: `category, description`
- **brand.csv**: `brand, description`

From these files, we'll build a data warehouse organized in a standard star schema that has the following tables:

- Dimension tables: `dim_user`, `dim_age`, `dim_brand`, `dim_category`, `dim_product`, `dim_date`, `dim_session`
- The main fact table `fact_events` with foreign keys: `date_key, user_key, age_key, product_key, brand_key, category_key, session_key`


Let's specify a "base directory":

In [None]:
# Change to path on your local machine.
BASE_DIR = "/Users/jimmylin/cs451/a2"

These are the commands that perform the "extraction":

In [None]:
!psql cs451 -v ON_ERROR_STOP=1 -c '\copy "retail"."user"         TO '\''{BASE_DIR}/user.csv'\''         WITH (FORMAT csv, HEADER true)'
!psql cs451 -v ON_ERROR_STOP=1 -c '\copy "retail"."session"      TO '\''{BASE_DIR}/session.csv'\''      WITH (FORMAT csv, HEADER true)'
!psql cs451 -v ON_ERROR_STOP=1 -c '\copy "retail"."category"     TO '\''{BASE_DIR}/category.csv'\''     WITH (FORMAT csv, HEADER true)'
!psql cs451 -v ON_ERROR_STOP=1 -c '\copy "retail"."brand"        TO '\''{BASE_DIR}/brand.csv'\''        WITH (FORMAT csv, HEADER true)'
!psql cs451 -v ON_ERROR_STOP=1 -c '\copy "retail"."product_name" TO '\''{BASE_DIR}/product_name.csv'\'' WITH (FORMAT csv, HEADER true)'
!psql cs451 -v ON_ERROR_STOP=1 -c '\copy "retail"."product"      TO '\''{BASE_DIR}/product.csv'\''      WITH (FORMAT csv, HEADER true)'
!psql cs451 -v ON_ERROR_STOP=1 -c '\copy "retail"."events"       TO '\''{BASE_DIR}/events.csv'\''       WITH (FORMAT csv, HEADER true)'

(Note that the quote style above will _not_ work for Windows machines. Please adjust accordingly.)

After the extraction, you should have 7 CSV files, each corresponding to a table in the operational database.

The CSV files should be stored in `BASE_DIR`.

The following code snippet should "just work" to initialize Spark.

In [None]:
import findspark, os

# Change to path on your local machine.
os.environ["SPARK_HOME"] = "/Users/jimmylin/Dropbox/workspace/teaching/spark-4.0.0-bin-hadoop3"
findspark.init()

import os
from pyspark.sql import SparkSession, functions as F, types as T
from pyspark.sql.functions import col

py = sys.executable  # the Python of this notebook (e.g., .../envs/yourenv/bin/python)
os.environ["PYSPARK_DRIVER_PYTHON"] = py
os.environ["PYSPARK_PYTHON"] = py

spark = SparkSession.getActiveSession() or (
    SparkSession.builder
    .appName("A2")
    .master("local[*]")
    .config("spark.driver.memory", "8g")           # or 12g+
    .config("spark.sql.shuffle.partitions","400")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.pyspark.driver.python", py)
    .config("spark.pyspark.python", py)
    .config("spark.executorEnv.PYSPARK_PYTHON", py)
    .getOrCreate()
)

spark

At this point, Spark should be initialized.

Let's then load in CSV files into DataFrames.

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_30z8le (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, we've loaded each of the CSV files into a corresponding dataframe.
# Let's count the number of records in each:

print(f"user: {df_user.count()}")
print(f"session: {df_session.count()}")
print(f"product: {df_product.count()}")
print(f"product_name: {df_product_name.count()}")
print(f"events: {df_events.count()}")
print(f"category: {df_category.count()}")
print(f"brand: {df_brand.count()}")

How do you know if you've done everything correctly?

Well, issue the SQL query `select count(*) from retail.user;` to count the number of rows in the `user` table in the operational database.
It should match the output of `df_user.count()`; same for the other tables.
If the counts match, then you know everything is in order.

## 4. Build the Dimensions Tables

### 4.1 The `user` Dimension Table

Build the `dim_user` dimension table.
This table should include `user_key`, `user_id`, `gender`, `birthdate`, and `generation`. 

Set `generation` to one of the following values based on the birth year: 
- "Traditionalists": born 1925 to 1945
- "Boomers": born 1946 to 1964
- "GenX": born 1965 to 1980
- "Millennials": born 1981 to 2000
- "GenZ": born 2001 to 2020

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_41ax14 (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, "dim_user" should hold the user dimensions table according to the specification above.

dim_user.count()

**The correct answer should be 3022290.**

### 4.2 The `age` Dimension Table

Even though `birthdate` exists in `dim_user`, a separate `dim_age` is helpful because it:
- Simplifies analysis with ready-made bands.
- Ensures consistency across all queries.
- Improves performance via small surrogate keys.
- Preserves history by fixing age at event time.
- Adds flexibility to adjust bands without changing facts.

We're going to build a `dim_age` table that has 4 columns:
- `age_key`: (INT, surrogate PK)
- `age_band`: (STRING) following the age band rules below
- `min_age`: (INT)
- `max_age`: (INT)

Bands:
- "<18": min_age = NULL, max_age = 17
- "18-24": 18, 24
- "25-34": 25, 34
- "35-44": 35, 44
- "45-54": 45, 54
- "55-64": 55, 64
- "65-74": 65, 74
- "75-84": 75, 84
- "85-94": 85, 94
- "unknown": NULL, NULL

The construction of this table is a bit tricky, so we're going to show you how to do it, as follows:

In [None]:
%%timemem

# Static age bands
age_band_rows = [
    ("<18",   None, 17),
    ("18-24", 18, 24),
    ("25-34", 25, 34),
    ("35-44", 35, 44),
    ("45-54", 45, 54),
    ("55-64", 55, 64),
    ("65-74", 65, 74),
    ("75-84", 75, 84),
    ("85-94", 85, 94),
    ("unknown", None, None),
]
dim_age = spark.createDataFrame(age_band_rows, ["age_band", "min_age", "max_age"])

w_age = Window.orderBy(F.col("age_band"))
dim_age = dim_age.withColumn("age_key", F.dense_rank().over(w_age))

dim_age.count()

**The correct answer should be 10.**

### 4.3 The `brand`, `product`, and `category` Dimension Tables

Build the following dimension tables:

**`dim_brand`:**
- `brand_key` (INT, surrogate PK)
- `brand_code` (STRING) 
- `brand_desc` (STRING)

**`dim_category`:**
- `category_key` (INT, surrogate PK)
- `category_code` (STRING) 
- `category_desc` (STRING)

**`dim_product`:**
- `product_key`  (INT, surrogate PK)
- `product_id`   (STRING)
- `product_desc` (STRING)
- `brand_key`   (INT, FK → `dim_brand`)  
- `category_key`(INT, FK → `dim_category`)

The objective of `dim_product` is to keep all products in `product`, and add details from `product_names`, then join the results with `brand` and `category` dimension tables.

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_43k3n9 (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, "dim_brand", "dim_category", and "dim_product" should hold 
# the dimension tables according to the specifications above.

print(f"Number of rows in dim_brand: {dim_brand.count()}")
print(f"Number of rows in dim_category: {dim_category.count()}")
print(f"Number of rows in dim_product: {dim_product.count()}")

**Correct answers:**

+ Number of rows in `dim_brand`: 3444
+ Number of rows in `dim_category`: 13
+ Number of rows in `dim_product`: 166794

### 4.4  The `date` Dimension Table

This table is expected to have one row per calendar date. 

**`dim_date`:**
- `date_key`     (INT, surrogate PK; format YYYYMMDD)
- `date`         (DATE, the actual calendar date)
- `day`          (INT, 1–31)
- `day_of_week`  (INT, 1=Mon … 7=Sun)
- `day_name`     (STRING, e.g., Monday)
- `is_weekend`   (BOOLEAN)
- `week_of_year` (INT, 1–53, ISO week)
- `month`        (INT, 1–12)
- `month_name`   (STRING, e.g., January)
- `quarter`      (INT, 1–4)
- `year`         (INT)


There are 2025 years, each with 365 days. Do we need to have a table that big? 
We can, but we do not have to! 

Instead, follow these instructions to create only as many rows as we need:

1. Determine the date range (from the min and max `event_date` in `df_events`).
2. Generate all dates in that range with `F.sequence()`.
3. Derive attributes (`day`, `day_of_week`, ...).
4. Create `date_key` = `year * 10000 + month * 100 + day` (i.e., YYYYMMDD).
5. Assign `date_key` as the surrogate PK.

Build the `dim_date` table conforming to the specifications above.

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_44qm5c (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, "dim_date" should hold the dates dimension table according to the specification above.

print(dim_date.count())

**The correct answer should be 32.**

If you reach here, congratulations!
You have created all the dimension tables!

In [None]:
%%timemem

print(f"dim_user: {dim_user.count()}")
print(f"dim_age: {dim_age.count()}")
print(f"dim_brand: {dim_brand.count()}")
print(f"dim_category: {dim_category.count()}")
print(f"dim_product: {dim_product.count()}")
print(f"dim_date: {dim_date.count()}")

**Correct answers:**

- `dim_user`: 3022290
- `dim_age`: 10
- `dim_brand`: 3444
- `dim_category`: 13
- `dim_product`: 166794
- `dim_date`: 31

## 5. Build the Fact Table

Now it's time to build the fact table!

Our goal in this step is to create a clean `fact_events` table that joins the events from the operational database to the dimension tables you've just built above.
Along the way, we're going to enforce data quality and do a bit of data cleaning.

### 5.1 Clean Events

Create `events_clean` by removing any record that "does not make sense".
Specifically:

- Start from the `df_events` DataFrame.
- Keep only rows with non-null timestamps, `session_id`, and `product_id`.
- Cast price to double; keep `NULL` prices (views/carts can be price-less) and non-negative values only.
- Drop dates in the future.
- Restrict to valid event types: `view`, `cart`, `purchase`, `remove`.

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_51ep7v (keep this id for tracking purposes)

from pyspark.sql import functions as F
from functools import reduce
from operator import and_ as AND

valid_types = ["view", "cart", "purchase", "remove"]

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, "events_clean" should conform to the specification above.

events_clean.count()

### 5.2 Cap Silly Prices

Next, let us check some statistics about prices and then decide what we want to do.

What is the minimum, maximum, and average price in this database?

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_52hg6x (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, "minimum", "maximum", and "average" should conform to the specification above.

print(f"minimum: {minimum}")
print(f"maximum: {maximum}")
print(f"average: {average}")

Wait, something's not right! 
The average price is 864.27 but the maximum seems suss...
It is possible these high prices are just errors.

For simplicity, let us assume a threshold value equal to 100x the average, and remove anything more than that.
Filter `events_clean` as described.

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_52bf5d (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, "events_clean" should conform to the specification above.

events_clean.count()

Good, we still have about 42.4M records, but we've done some basic data cleaning.
Let us continue...

### 5.3 Build Tiny Lookup Tables (LKPs)

Create lookup tables that help us connect `events_clean` with the dimension tables we created:

- `user_lkp`: (`user_id` → `user_key`) from `dim_user`.
- `prod_lkp`:(`product_id` → `product_key`, `brand_key`, `category_key`) from `dim_product`.
- `date_lkp`: (`date` → `date_key`) from `dim_date`.
- session-to-user bridge: use the raw `df_session` (`session_id`, `user_id`) CSV (not a dimension) to pull `user_id`.

**Hint:** These LKPs are just calling `select` from the right sources with the right parameters.

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_53l2kp (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, the following variables should conform to the specification above.

print(session_bridge.count(), user_lkp.count(), prod_lkp.count(), date_lkp.count())

### 5.4 Join Everything Together

Finally, join everything together to create `fact_events`.
Follow the following steps:

- Start from `clean events` with these columns: (`event_time`, `event_type`, `session_id`, `product_id`, `price`, `date`).
- Join sessions first (to get `user_id`).
- Then join product, date, and user.
- Join with `dim_user` to find out the birthdate and compute user age at the day of the event in `age_on_event`.
- Join with `dim_age` to find the age band based on `age_on_event`.

**Hints:**

- You built the LKPs for a reason... use them.
- Left, right, or natural joins?

The final part above is a bit tricky, so we'll just give you the answer. But you'll need to figure out how it integrates with everything above.

```
        .withColumn("age_on_event", F.floor(F.months_between(F.col("date"), F.to_date("birthdate"))/12))
        .join(
           dim_age.select("age_key", "age_band", "min_age", "max_age"),
           (
               ((F.col("age_on_event") > F.col("min_age"))) &
               ((F.col("age_on_event") <= F.col("max_age")))
           ),
           "left"
       )
```

The final result (`fact_events`) should include the following columns:

- `date_key`
- `user_key`
- `age_key`
- `product_key`
- `brand_key`
- `category_key`
- `session_id`
- `event_time`
- `event_type`
- `price`

<font color="red">**Below, you'll have to write some code!**</font>

In [None]:
%%timemem
# codecell_54aaaa (keep this id for tracking purposes)

# TODO: Write your code below, but do not remove any lines already in this cell.


# By the time we get to here, "fact_events" should conform to the specification above.

print(fact_events.count())

Congrats, you've done it!
You've created the fact table successfuly! 🚀

Here is the summary of the schema:

- `date_key` (FK → `dim_date`)
- `user_key` (FK → `dim_user`)
- `age_key`  (FK → `dim_age`)
- `product_key` (FK → `dim_product`)
- `brand_key` (FK → `dim_brand`)
- `category_key` (FK → `dim_category`)
- `session_id` (STRING, business key, kept directly in this table)
- `event_time` (TIMESTAMP)
- `event_tpe` (STRING)
- `price` (DOUBLE)


## 6. Export the Fact Table

You now have a shiny `fact_events` table!
But how should you store it?
(Remember our discussion in class about row vs. column representations?)

Let's store `fact_events` in a few different ways and compare data sizes.

First, let's try writing out as CSV files, both compressed and uncompressed, per below.

Note that in Spark, we specify the output _directory_, which is then populated with many "part" files.

In [None]:
fact_events.write.mode("overwrite").option("header", True).csv(BASE_DIR + "/fact_events.csv")
fact_events.write.mode("overwrite").option("header", True).option("compression", "snappy").csv(BASE_DIR + "/fact_events.csv.snappy")

Let's then try Parquet:

In [None]:
fact_events.write.mode("overwrite").parquet(BASE_DIR + "/fact_events.parquet")

Let's compare the output sizes using the following bit of code:

In [None]:
import os
for f in [BASE_DIR + "/fact_events.csv", BASE_DIR + "/fact_events.csv.snappy", BASE_DIR + "/fact_events.parquet"]:
    try:
        size = sum(os.path.getsize(os.path.join(dp, fn))
                   for dp, dn, filenames in os.walk(f)
                   for fn in filenames)
        print(f"{f}: {size/(1024*1024*1024):.1f} GB")
    except FileNotFoundError:
        pass

<font color="red">**Put your answers below! Replace X.X with the actual answer.**</font>

// qcell_6a9876 (keep this id for tracking purposes)

- **Size of CSV output, no compression:** <font color="red">X.X</font> GB
- **Size of CSV output, Snappy compression:** <font color="red">X.X</font> GB
- **Size of Parquet output:** <font color="red">X.X</font> GB 

**Answer the following question:**

Q6.1 Why is columnar storage (Parquet) usually much smaller?

Q6.2 Which format is better for analytical queries and why?

<font color="red">**Put your answers below!**</font>

// qcell_6b1234 (keep this id for tracking purposes)

**Q6.1 Answer:**

<font color="red">blah blah (replace with your actual answer)</font>

**Q6.2 Answer:**

<font color="red">blah blah (replace with your actual answer)</font>


## 7. Submission

Details about the submission of this assignment are outlined in the <a href="https://lintool.github.io/cs451-2025f/assignments/assignment2.html">assignment 2 landing page</a>.

In [None]:
%%timemem
spark.stop()