<a href="https://colab.research.google.com/github/susanchen475/mgmt467-analytics-portfolio/blob/main/Unit2_Lab1_DataQuality.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [45]:
# Authenticate to Google Cloud
from google.colab import auth
auth.authenticate_user() # This authenticates your Colab environment to Google Cloud

# Set Project ID and Region
import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip() # Prompt user for Project ID
REGION = "us-central1"  # Set the default region

# Export GOOGLE_CLOUD_PROJECT environment variable
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID # Required by some gcloud commands

# Set active project for gcloud/BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT

# Print Project ID and Region
print("Project:", PROJECT_ID, "| Region:", REGION)

# Done: Auth + Project/Region set

Enter your GCP Project ID: mgmt467-lab
Updated property [core/project].
Project: mgmt467-lab | Region: us-central1


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [47]:
from google.colab import files
import os

# Prompt the user to upload the kaggle.json file
print("Please upload your kaggle.json file (from Kaggle > Account > Create New API Token)")
uploaded = files.upload()

# Ensure the .kaggle directory exists and set permissions
kaggle_dir = '/root/.kaggle'
os.makedirs(kaggle_dir, exist_ok=True)

# Save the uploaded file to the correct location
for filename in uploaded.keys():
    file_path = os.path.join(kaggle_dir, 'kaggle.json')
    with open(file_path, 'wb') as f:
        f.write(uploaded[filename])

    # Set file permissions to 0600 (owner read/write only) for security
    os.chmod(file_path, 0o600)
    print(f"Saved {filename} to {file_path} with permissions 0600.")

# Verify the Kaggle installation by printing the version
!kaggle --version

Please upload your kaggle.json file (from Kaggle > Account > Create New API Token)


Saving kaggle (4).json to kaggle (4) (1).json
Saved kaggle (4) (1).json to /root/.kaggle/kaggle.json with permissions 0600.
Kaggle API 1.7.4.5


In [48]:
# # EXAMPLE (from LLM) — Kaggle setup (commented)
# # from google.colab import files
# # print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
# # uploaded = files.upload()
# #
# # import os
# # os.makedirs('/root/.kaggle', exist_ok=True)
# # with open('/root/.kaggle/kaggle.json', 'wb') as f:
# #     f.write(uploaded[list(uploaded.keys())[0]])
# # os.chmod('/root/.kaggle/kaggle.json', 0o600)  # owner-only
# #
# # !kaggle --version

### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


In [49]:
!kaggle --help | head -n 20

usage: kaggle [-h] [-v] [-W]
              {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
              ...

options:
  -h, --help            show this help message and exit
  -v, --version         Print the Kaggle API version

commands:
  {competitions,c,datasets,d,kernels,k,models,m,files,f,config}
                        Use one of:
                        competitions {list, files, download, submit, submissions, leaderboard}
                        datasets {list, files, download, create, version, init, metadata, status}
                        kernels {list, files, init, push, pull, output, status}
                        models {instances, get, list, init, create, delete, update}
                        models instances {versions, get, files, init, create, delete, update}
                        models instances versions {init, create, download, delete, files}
                        config {view, set, unset}
    competitions (c)    Commands related to Kaggle compe

**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

With 0600 permission it means that only the owner of the file can read or write it. It is an important step for security because API tokens are passwords that grant access to the account and resources. With this permission, we can avoid unauthorized access, token theft, and accidental modification or deletion.

## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [50]:
# Create the raw data directory
!mkdir -p /content/data/raw

# Download the dataset using Kaggle CLI
# The dataset is downloaded to /content/data
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
# -o flag means overwrite existing files
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes in a neat table
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
netflix-2025user-behavior-dataset-210k-records.zip: Skipping, found more recently modified local copy (use --force to force download)
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root root 1.6M Aug  2 1

In [51]:
# # EXAMPLE (from LLM) — Download & unzip (commented)
# # !mkdir -p /content/data/raw
# # !kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data
# # !unzip -o /content/data/*.zip -d /content/data/raw
# # # List CSV inventory
# # !ls -lh /content/data/raw/*.csv

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


In [52]:
import glob
import os

# List all CSV files in the raw data directory
csv_files = glob.glob('/content/data/raw/*.csv')

# Assert that there are exactly six CSV files
expected_files = 6
actual_files = len(csv_files)

assert actual_files == expected_files, f"Expected {expected_files} CSV files, but found {actual_files}"

# Print the names of the CSV files
print(f"Successfully found {actual_files} CSV files:")
for csv_file in csv_files:
    print(os.path.basename(csv_file))

Successfully found 6 CSV files:
movies.csv
reviews.csv
users.csv
recommendation_logs.csv
search_logs.csv
watch_history.csv


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

A clean file is important because it can keep a record of the raw data that we have, and if we want to run the process later, we know which files were used. Other reasons are to identify problems from the sources files, and validate if the files that are downloaded are incomlete. It also helps others or the owner itself to have a clear documentation of the raw data.

## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [53]:
import uuid
import os

# Generate a unique bucket name
# Bucket names must be globally unique
bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Create the GCS bucket in the specified region
# --location flag sets the bucket's geographic location
# Use 'US' for the multi-region location
print(f"Creating bucket: gs://{bucket_name} in region US")
!gcloud storage buckets create gs://$BUCKET_NAME --location=US

# Upload all CSV files from the raw data directory to the bucket
# The files will be placed under the 'netflix/' prefix in the bucket
print(f"Uploading CSV files to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/*.csv gs://$BUCKET_NAME/netflix/

# Print the bucket name
print("\nSuccessfully created bucket and uploaded files.")
print("Bucket Name:", bucket_name)

# Explain staging benefits
print("\nBenefits of staging data in GCS:")
print("- **Centralized Storage:** Provides a single, durable location for your raw data.")
print("- **Versionability:** GCS can be configured to keep versions of your data, aiding reproducibility and recovery.")
print("- **Accessibility:** Data in GCS is easily accessible by various Google Cloud services like BigQuery, Dataflow, Dataproc, etc.")
print("- **Cost-Effective:** Generally more cost-effective for large amounts of data compared to storing directly in BigQuery tables.")

Creating bucket: gs://mgmt467-netflix-f12cd620 in region US
Creating gs://mgmt467-netflix-f12cd620/...
Uploading CSV files to gs://mgmt467-netflix-f12cd620/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-f12cd620/netflix/movies.csv
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-f12cd620/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-f12cd620/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-f12cd620/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-f12cd620/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-f12cd620/netflix/watch_history.csv

Average throughput: 38.7MiB/s

Successfully created bucket and uploaded files.
Bucket Name: mgmt467-netflix-f12cd620

Benefits of staging data in GCS:
- **Centralized Storage:** Provides a single, durable loc

In [54]:
# Verification: List the contents of the 'netflix/' prefix in the bucket
# and show object sizes.
import os
bucket_name = os.environ.get("BUCKET_NAME")
if bucket_name:
  print(f"Listing contents of gs://{bucket_name}/netflix/")
  !gcloud storage ls -l gs://$BUCKET_NAME/netflix/
else:
  print("BUCKET_NAME environment variable not set. Please run the previous cell.")

Listing contents of gs://mgmt467-netflix-f12cd620/netflix/
    115942  2025-10-23T05:40:20Z  gs://mgmt467-netflix-f12cd620/netflix/movies.csv
   4695557  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/recommendation_logs.csv
   1861942  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/reviews.csv
   2250902  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/search_logs.csv
   1606820  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/users.csv
   9269425  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88MiB)


In [55]:
## # EXAMPLE (from LLM) — GCS staging (commented)
#import uuid, os
#bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
#os.environ["BUCKET_NAME"] = bucket_name
#!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION
#!gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/
#print("Bucket:", bucket_name)
# # # Verify contents
#!gcloud storage ls gs://$BUCKET_NAME/netflix/

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


In [56]:
import os
bucket_name = os.environ.get("BUCKET_NAME")
if bucket_name:
  print(f"Listing contents of gs://{bucket_name}/netflix/")
  !gcloud storage ls -l gs://$BUCKET_NAME/netflix/
else:
  print("BUCKET_NAME environment variable not set. Please run the previous cell.")

Listing contents of gs://mgmt467-netflix-f12cd620/netflix/
    115942  2025-10-23T05:40:20Z  gs://mgmt467-netflix-f12cd620/netflix/movies.csv
   4695557  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/recommendation_logs.csv
   1861942  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/reviews.csv
   2250902  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/search_logs.csv
   1606820  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/users.csv
   9269425  2025-10-23T05:40:21Z  gs://mgmt467-netflix-f12cd620/netflix/watch_history.csv
TOTAL: 6 objects, 19800588 bytes (18.88MiB)


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

When data is staged in GCS it is persisten and available to various google cloud services outside of the colab runtime. While if loading directly from colab, the data is only available within the session of the colab. Another benefit is the scalability and performance. Since GCS is designed for large scale data storage and retrieval, loading it from GCS is for performant and scalable, whereas Colab might gave limitations on local storage and network bandwidth.

## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [57]:
# Cell A: Create BigQuery dataset 'netflix'
DATASET = "netflix"
# Attempt to create the dataset; ignore if it already exists
!bq --location=US mk -d --description "MGMT467 Netflix dataset" {DATASET} || echo "Dataset may already exist."

BigQuery error in mk operation: Dataset 'mgmt467-lab:netflix' already exists.
Dataset may already exist.


In [58]:
# Cell B: Load tables from GCS
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}

import os
bucket_name = os.environ.get("BUCKET_NAME")
DATASET = "netflix" # Ensure DATASET is defined
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT") # Get the project ID


if bucket_name:
  for tbl, fname in tables.items():
    src = f"gs://{bucket_name}/netflix/{fname}"
    print(f"Loading table {DATASET}.{tbl} from {src}")
    !bq load --skip_leading_rows=1 --autodetect --source_format=CSV {DATASET}.{tbl} {src}
else:
  print("BUCKET_NAME environment variable not set. Please run the GCS bucket creation cell.")

# Row counts for verification
print("\nVerifying row counts:")
for tbl in tables.keys():
  print(f"Fetching row count for {DATASET}.{tbl}")
  # Construct the bq query command string using f-string for direct embedding
  !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM \`{PROJECT_ID}.{DATASET}.{tbl}\`"

Loading table netflix.users from gs://mgmt467-netflix-f12cd620/netflix/users.csv
Waiting on bqjob_r1ae45da739d81c91_0000019a0f953ec6_1 ... (1s) Current status: DONE   
Loading table netflix.movies from gs://mgmt467-netflix-f12cd620/netflix/movies.csv
Waiting on bqjob_r3565871d6330011_0000019a0f955510_1 ... (1s) Current status: DONE   
Loading table netflix.watch_history from gs://mgmt467-netflix-f12cd620/netflix/watch_history.csv
Waiting on bqjob_r36c5aeeb0d55d019_0000019a0f956a12_1 ... (2s) Current status: DONE   
Loading table netflix.recommendation_logs from gs://mgmt467-netflix-f12cd620/netflix/recommendation_logs.csv
Waiting on bqjob_r78e4b7432e6c2f4f_0000019a0f9584e5_1 ... (2s) Current status: DONE   
Loading table netflix.search_logs from gs://mgmt467-netflix-f12cd620/netflix/search_logs.csv
Waiting on bqjob_r60419f01a38b5d6a_0000019a0f959e50_1 ... (2s) Current status: DONE   
Loading table netflix.reviews from gs://mgmt467-netflix-f12cd620/netflix/reviews.csv
Waiting on bqjob_r

In [59]:
# # EXAMPLE (from LLM) — BigQuery dataset (commented)
# DATASET="netflix"
# # # Attempt to create; ignore if exists
# !bq --location=US mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

In [60]:
# # EXAMPLE (from LLM) — Load tables (commented)
# tables = {
 #"watch_history":
# "watch_history.csv",
#   "recommendation_logs": "recommendation_logs.csv",
#   "search_logs": "search_logs.csv",
#   "reviews": "reviews.csv",
# }
#import os
# for tbl, fname in tables.items():
#   src = f"gs://{os.environ['BUCKET_NAME']}/netflix/{fname}"
#   print("Loading", tbl, "from", src)
#   !bq load --skip_leading_rows=1 --autodetect --source_format=CSV $DATASET.$tbl $src

# # # Row counts
# for tbl in tables.keys():
#   !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `${GOOGLE_CLOUD_PROJECT}.netflix.{tbl}`".format(tbl=tbl)

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [61]:
from google.cloud import bigquery
import os

PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Construct the SQL query string
query = f"""
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{PROJECT_ID}.netflix.users`
UNION ALL
SELECT 'movies' AS table_name, COUNT(*) AS row_count FROM `{PROJECT_ID}.netflix.movies`
UNION ALL
SELECT 'watch_history' AS table_name, COUNT(*) AS row_count FROM `{PROJECT_ID}.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs' AS table_name, COUNT(*) AS row_count FROM `{PROJECT_ID}.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs' AS table_name, COUNT(*) AS row_count FROM `{PROJECT_ID}.netflix.search_logs`
UNION ALL
SELECT 'reviews' AS table_name, COUNT(*) AS row_count FROM `{PROJECT_ID}.netflix.reviews`
"""

# Initialize the BigQuery client
client = bigquery.Client(project=PROJECT_ID)

# Run the query and store the results in a DataFrame
query_job = client.query(query)
results = query_job.to_dataframe()

# Display the results
display(results)

Unnamed: 0,table_name,row_count
0,users,216300
1,movies,21840
2,reviews,309000
3,watch_history,2205000
4,recommendation_logs,1092000
5,search_logs,556500


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

It is acceptable fir initial data exploration and quick loading. It is convenient for rapid prototyping and when you don't need strict type enforcement from the start. But you should enforce explicit schemas when the data quality and consistency are critical. It should also be used when the data has complex or nested structure, since autodetec is only for quick loading. When building a production pipeline, schemas should be enforced to prevent risk of potential variations that may lead to inference errors and pipeline failures. Enforce explicit schemas for reliability, data quality, performance, and maintainability in more mature data pipelines.

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [62]:
# Total rows and % missing in country, subscription_plan, age from users
%%bigquery --project {PROJECT_ID}
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `netflix.users`
)
SELECT n,
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,n,pct_missing_country,pct_missing_subscription_plan,pct_missing_age
0,216300,0.0,0.0,11.93


In [63]:
# % subscription_plan missing by country ordered descending
%%bigquery --project {PROJECT_ID}
SELECT country,
       COUNT(*) AS n,
       ROUND(100*COUNTIF(subscription_plan IS NULL)/COUNT(*),2) AS pct_missing_subscription_plan
FROM `netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC;
# This query checks if the percentage of missing 'subscription_plan' values varies across different 'country' values.
# If the missingness percentage is significantly different for different countries, it suggests that the missingness of 'subscription_plan'
# might be related to the 'country' and could be Missing at Random (MAR).

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,country,n,pct_missing_subscription_plan
0,Canada,65016,0.0
1,USA,151284,0.0


### Verification Prompt
Generate a query that prints the three missingness percentages from (1), rounded to two decimals.


In [66]:
%%bigquery --project {PROJECT_ID}
WITH Missingness AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `netflix.users`
)
SELECT ROUND(100*miss_country/n, 2) AS pct_missing_country,
       ROUND(100*miss_plan/n, 2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n, 2)    AS pct_missing_age
FROM Missingness;

Query is running:   0%|          |

Downloading:   0%|          |

Unnamed: 0,pct_missing_country,pct_missing_subscription_plan,pct_missing_age
0,0.0,0.0,11.93


**Reflection:** Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

Age is the most missed columns with 11.93% are missing. It might be MAR or MNAR. For MAR, Missingness of age might be related to other observed variables, like perhaps users in certain regions or on certain plan tiers are less likely to provide their age. For MNAR, The missingness of age could be related to age itself in a way not explained by other variables. Whether or nor the users refuse to provide age, which can't be captured by country or subscription plan.

### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Report duplicate groups on `(user_id, movie_id, event_ts, device_type)` with counts (top 20).
2) Create table `watch_history_dedup` that keeps one row per group (prefer higher `progress_ratio`, then `minutes_watched`). Add comments.


In [74]:
# Report duplicate groups on (user_id, movie_id, watch_date, device_type) with counts (top 20)
from google.cloud import bigquery
import os

print("Starting query to find duplicate groups...")

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{PROJECT_ID}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
"""

query_job = client.query(query)
results = query_job.to_dataframe()

print("Query finished. Displaying results.")
display(results)
print("Display command executed.")

Starting query to find duplicate groups...
Query finished. Displaying results.


Unnamed: 0,user_id,movie_id,watch_date,device_type,dup_count
0,user_00391,movie_0893,2024-08-26,Laptop,84
1,user_03310,movie_0640,2024-09-08,Smart TV,84
2,user_01182,movie_0794,2025-07-03,Desktop,63
3,user_02126,movie_0642,2025-02-09,Desktop,63
4,user_00564,movie_0234,2024-01-09,Laptop,63
5,user_06462,movie_0588,2025-02-10,Laptop,63
6,user_03043,movie_0465,2024-02-03,Laptop,63
7,user_01581,movie_0933,2024-03-30,Desktop,63
8,user_04506,movie_0244,2025-05-27,Desktop,63
9,user_09973,movie_0342,2025-03-22,Desktop,63


Display command executed.


In [76]:
# Create table watch_history_dedup that keeps one row per group
# Keeps the row with the highest progress_percentage, then highest watch_duration_minutes
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{PROJECT_ID}.netflix.watch_history` h
)
WHERE rk = 1;
"""

query_job = client.query(query)
print(f"Table `{PROJECT_ID}.netflix.watch_history_dedup` created or replaced.")

Table `mgmt467-lab.netflix.watch_history_dedup` created or replaced.


### Verification Prompt
Generate a before/after count query comparing raw vs `watch_history_dedup`.


In [78]:
# Generate a before/after count query comparing raw vs watch_history_dedup
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT
  (SELECT COUNT(*) FROM `{PROJECT_ID}.netflix.watch_history`) AS raw_count,
  (SELECT COUNT(*) FROM `{PROJECT_ID}.netflix.watch_history_dedup`) AS deduped_count
"""

query_job = client.query(query)
results = query_job.to_dataframe()
display(results)

Unnamed: 0,raw_count,deduped_count
0,2205000,100000


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

For natural duplicates, the real world data colletion processes can naturally lead to identical or similar records. For example, the user may stream the same movies multiple times, which increases the record of watch history. The system-generated duplicates occurs when there are issues in data pipelines, integrations, or data entry. Duplicates can corrupt the labels and KPIs by  having infated counts, biased analytics, corrupted labels for ML, overfitting, and incorrect aggregations. Due to all the bad causes of duplicates, it is important to identfy and handle them for building robust and trustworthy data pipelines and models.



### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.


In [81]:
# Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `{PROJECT_ID}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
"""

query_job = client.query(query)
results = query_job.to_dataframe()
display(results)

Unnamed: 0,outliers,total,pct_outliers
0,3456,100000,3.46


In [82]:
# Create table watch_history_robust with watch_duration_minutes_capped at P01/P99; return quantile summaries before/after.
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

# Query to create the watch_history_robust table with capped duration
create_table_query = f"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(98)] AS p99
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `{PROJECT_ID}.netflix.watch_history_dedup` h, q;
"""

print(f"Creating or replacing table: {PROJECT_ID}.netflix.watch_history_robust")
create_job = client.query(create_table_query)
create_job.result() # Wait for the job to complete
print("Table created or replaced.")

# Query to get quantile summaries before and after capping
quantile_query = f"""
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM `{PROJECT_ID}.netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;
"""

print("\nFetching quantile summaries before and after capping...")
quantile_job = client.query(quantile_query)
quantile_results = quantile_job.to_dataframe()

print("Quantile Summaries:")
display(quantile_results)

Creating or replacing table: mgmt467-lab.netflix.watch_history_robust
Table created or replaced.

Fetching quantile summaries before and after capping...
Quantile Summaries:


Unnamed: 0,which,q
0,before,"[0.2, 24.8, 41.7, 61.4, 91.7, 799.3]"
1,after,"[4.4, 24.6, 41.5, 61.5, 92.0, 203.6]"


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.


In [83]:
# Generate a query that shows min/median/max before vs after capping.
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
WITH before AS (
  SELECT
    'before' AS which,
    MIN(watch_duration_minutes) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes) AS max_duration
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
after AS (
  SELECT
    'after' AS which,
    MIN(watch_duration_minutes_capped) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes_capped) AS max_duration
  FROM `{PROJECT_ID}.netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;
"""

query_job = client.query(query)
results = query_job.to_dataframe()
display(results)

Unnamed: 0,which,min_duration,median_duration,max_duration
0,after,4.4,51.4,203.6
1,before,0.2,51.2,799.3


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

It can be harmful when the "outliers" are actually genuine, important data points that carry significant information. It may cause loss of information, distorted relationships, and have misleading insights for the dataset. Tree-based models can be less sensitive to outliers since it makes decisions based on splitting data at specific thresholds of feature values. The exact extreme value doesn't drastically change where the split occurs or how the data is partitioned beyond the threshold.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.


In [84]:
# In watch_history_robust, compute and summarize flag_binge for sessions > 8 hours.
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT
  COUNTIF(watch_duration_minutes_capped > 8*60) AS sessions_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct
FROM `{PROJECT_ID}.netflix.watch_history_robust`;
"""

query_job = client.query(query)
results = query_job.to_dataframe()
display(results)

Unnamed: 0,sessions_over_8h,total,pct
0,0,100000,0.0


In [87]:
# In users, compute and summarize flag_age_extreme for ages <10 or >100 (using the age column).
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT
  COUNTIF(age IS NOT NULL AND (age < 10 OR age > 100)) AS extreme_age_rows,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(age IS NOT NULL AND (age < 10 OR age > 100))/COUNT(*),2) AS pct
FROM `{PROJECT_ID}.netflix.users`;
"""
# Check for non-NULL age values before applying the extreme age condition.

query_job = client.query(query)
results = query_job.to_dataframe()
display(results)

Unnamed: 0,extreme_age_rows,total,pct
0,3759,216300,1.74


In [90]:
# In movies, compute and summarize flag_duration_anomaly where duration_minutes < 15 or > 480.
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT
  COUNTIF(duration_minutes IS NOT NULL AND duration_minutes < 15) AS titles_under_15m,
  COUNTIF(duration_minutes IS NOT NULL AND duration_minutes > 480) AS titles_over_480m,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(duration_minutes IS NOT NULL AND (duration_minutes < 15 OR duration_minutes > 480))/COUNT(*),2) AS pct_duration_anomaly
FROM `{PROJECT_ID}.netflix.movies`;
"""
# Check for non-NULL duration_minutes values before applying the anomaly conditions.

query_job = client.query(query)
results = query_job.to_dataframe()
display(results)

Unnamed: 0,titles_under_15m,titles_over_480m,total,pct_duration_anomaly
0,252,231,21840,2.21


In [None]:
# # EXAMPLE (from LLM) — flag_binge (commented)
# # SELECT
# #   COUNTIF(minutes_watched > 8*60) AS sessions_over_8h,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(minutes_watched > 8*60)/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.watch_history_robust`;

In [88]:
# # EXAMPLE (from LLM) — flag_age_extreme (commented) - Updated for numerical age column
# # SELECT
# #   COUNTIF(age IS NOT NULL AND (age < 10 OR age > 100)) AS extreme_age_rows,
# #   COUNT(*) AS total,
# #   ROUND(100*COUNTIF(age IS NOT NULL AND (age < 10 OR age > 100))/COUNT(*),2) AS pct
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.users`;

In [91]:
# # EXAMPLE (from LLM) — flag_duration_anomaly (commented) - Updated for duration_minutes
# # SELECT
# #   COUNTIF(duration_minutes IS NOT NULL AND duration_minutes < 15) AS titles_under_15m,
# #   COUNTIF(duration_minutes IS NOT NULL AND duration_minutes > 8*60) AS titles_over_8h,
# #   COUNT(*) AS total
# # FROM `${GOOGLE_CLOUD_PROJECT}.netflix.movies`;

### Verification Prompt
Generate a single compact summary query that returns two columns per flag: `flag_name, pct_of_rows`.


In [92]:
# Generate a single compact summary query that returns two columns per flag: flag_name, pct_of_rows.
from google.cloud import bigquery
import os

# Use the PROJECT_ID variable defined in a previous cell
client = bigquery.Client(project=PROJECT_ID)

query = f"""
SELECT 'flag_binge' AS flag_name,
       ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct_of_rows
FROM `{PROJECT_ID}.netflix.watch_history_robust`
UNION ALL
SELECT 'flag_age_extreme' AS flag_name,
       ROUND(100*COUNTIF(age IS NOT NULL AND (age < 10 OR age > 100))/COUNT(*),2) AS pct_of_rows
FROM `{PROJECT_ID}.netflix.users`
UNION ALL
SELECT 'flag_duration_anomaly' AS flag_name,
       ROUND(100*COUNTIF(duration_minutes IS NOT NULL AND (duration_minutes < 15 OR duration_minutes > 480))/COUNT(*),2) AS pct_of_rows
FROM `{PROJECT_ID}.netflix.movies`;
"""

query_job = client.query(query)
results = query_job.to_dataframe()
display(results)

Unnamed: 0,flag_name,pct_of_rows
0,flag_binge,0.0
1,flag_age_extreme,1.74
2,flag_duration_anomaly,2.21


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

Flag duration is the most common flag among the 3 flags, but I think keeping all 3 features is important. In a real world case, the 3 features are important to keep to identify higly engaged users and have content to keep their engagement. Age extreme keep up with users that are too young or old for the services and can be added to the demographice features. Lastly, the duration see if the content is too long or short for the streaming platform.

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)


In [95]:
# Step 2 & 3: Combine SQL Queries and Write to .sql file

# --- Instructions: ---
# 1. Manually copy the SQL queries you want to export from the notebook cells.
#    Make sure to get the full SQL string for each query.
# 2. Paste each SQL query string into the list below, enclosed in triple quotes ('''SQL_STRING''').
#    Add a comment before each string to indicate which query it is (e.g., # Missingness 1).
# 3. Execute this cell. It will combine the queries and write them to the specified file.
# ---------------------

sql_queries_to_export = [
    # Example: Paste your SQL queries here
    # # Missingness 1 (Total rows and % missing)
    '''
%%bigquery --project {PROJECT_ID}
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `netflix.users`
)
SELECT n,
       ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;
    ''',

    # # Duplicates 1 (Report duplicate groups)
    '''
%%bigquery --project {PROJECT_ID}
SELECT country,
       COUNT(*) AS n,
       ROUND(100*COUNTIF(subscription_plan IS NULL)/COUNT(*),2) AS pct_missing_subscription_plan
FROM `netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC;
    ''',
'''
%%bigquery --project {PROJECT_ID}
WITH Missingness AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `netflix.users`
)
SELECT ROUND(100*miss_country/n, 2) AS pct_missing_country,
       ROUND(100*miss_plan/n, 2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n, 2)    AS pct_missing_age
FROM Missingness;
''',

"""
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{PROJECT_ID}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
""",

"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{PROJECT_ID}.netflix.watch_history` h
)
WHERE rk = 1;
""",

"""
SELECT
  (SELECT COUNT(*) FROM `{PROJECT_ID}.netflix.watch_history`) AS raw_count,
  (SELECT COUNT(*) FROM `{PROJECT_ID}.netflix.watch_history_dedup`) AS deduped_count
""",

"""
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `{PROJECT_ID}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
""",

"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(98)] AS p99
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `{PROJECT_ID}.netflix.watch_history_dedup` h, q;
""",

"""
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM `{PROJECT_ID}.netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;
""",

"""
WITH before AS (
  SELECT
    'before' AS which,
    MIN(watch_duration_minutes) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes) AS max_duration
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
after AS (
  SELECT
    'after' AS which,
    MIN(watch_duration_minutes_capped) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes_capped) AS max_duration
  FROM `{PROJECT_ID}.netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;
""",

"""
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `{PROJECT_ID}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
""",

"""
CREATE OR REPLACE TABLE `{PROJECT_ID}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(98)] AS p99
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `{PROJECT_ID}.netflix.watch_history_dedup` h, q;
""",

"""
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM `{PROJECT_ID}.netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;
""",
"""
WITH before AS (
  SELECT
    'before' AS which,
    MIN(watch_duration_minutes) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes) AS max_duration
  FROM `{PROJECT_ID}.netflix.watch_history_dedup`
),
after AS (
  SELECT
    'after' AS which,
    MIN(watch_duration_minutes_capped) AS min_duration,
    APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_duration,
    MAX(watch_duration_minutes_capped) AS max_duration
  FROM `{PROJECT_ID}.netflix.watch_history_robust`
)
SELECT * FROM before
UNION ALL
SELECT * FROM after;
"""
,
]

# Combine the SQL queries into a single string
combined_sql_content = "-- Data Quality Queries Exported from Colab Notebook\n\n"

for i, query_block in enumerate(sql_queries_to_export):
    # Add a comment for the query block
    # This assumes the first line of the block is a comment
    lines = query_block.strip().split('\n')
    comment_line = lines[0] if lines[0].strip().startswith('#') else f"-- Query {i+1}"
    combined_sql_content += comment_line + "\n"
    combined_sql_content += "".join(lines[1:]).strip() + "\n\n" # Add the rest of the query

# Define the output file path
output_sql_file = "/content/data_quality_queries.sql"

# Write the combined SQL content to the file
with open(output_sql_file, "w") as f:
    f.write(combined_sql_content)

print(f"Successfully wrote SQL queries to {output_sql_file}")
print("You can now download this file from the Colab Files explorer (folder icon on the left).")

Successfully wrote SQL queries to /content/data_quality_queries.sql
You can now download this file from the Colab Files explorer (folder icon on the left).
