# MGMT 467 — Prompt-Driven Lab (with Commented Examples)
## Kaggle ➜ Google Cloud Storage ➜ BigQuery ➜ Data Quality (DQ)

**How to use this notebook**
- Each section gives you a **Build Prompt** to paste into Gemini/Vertex AI (or Gemini in Colab).
- Below each prompt, you’ll see a **commented example** of what a good LLM answer might look like.
- **Do not** just uncomment and run. Use the prompt to generate your own code, then compare to the example.
- After every step, run the **Verification Prompt**, and write the **Reflection** in Markdown.

> Goal today: Download the Netflix dataset (Kaggle) → Stage on GCS → Load into BigQuery → Run DQ profiling (missingness, duplicates, outliers, anomaly flags).


### Academic integrity & LLM usage
- Use the prompts here to generate your own code cells.
- Read concept notes and write the reflection answers in your own words.
- Keep credentials out of code. Upload `kaggle.json` when asked.


## Learning objectives
1) Explain **why** we stage data in GCS and load it to BigQuery.  
2) Build an **idempotent**, auditable pipeline.  
3) Diagnose **missingness**, **duplicates**, and **outliers** and justify cleaning choices.  
4) Connect DQ decisions to **business/ML impact**.


## 0) Environment setup — What & Why
Authenticate Colab to Google Cloud so we can use `gcloud`, GCS, and BigQuery. Set **PROJECT_ID** and **REGION** once for consistency (cost/latency).

### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [1]:
from google.colab import files
import os

# Prompt the user to upload their kaggle.json file.
# This file contains your Kaggle API credentials.
print("Upload your kaggle.json (Kaggle > Account > Create New API Token)")
uploaded = files.upload()

# Create the .kaggle directory if it doesn't exist.
# This is where Kaggle expects to find the credentials file.
os.makedirs('/root/.kaggle', exist_ok=True)

# Save the uploaded file to the correct location.
# Using the first uploaded file as we expect only one (kaggle.json).
with open('/root/.kaggle/kaggle.json', 'wb') as f:
    f.write(uploaded[list(uploaded.keys())[0]])

# Set file permissions to 0600 (owner read/write only).
# This is crucial for security to prevent other users from accessing your API key.
os.chmod('/root/.kaggle/kaggle.json', 0o600)

# Verify the Kaggle installation by printing the version.
# This confirms the CLI is installed and can access the credentials.
!kaggle --version

# Done: Kaggle setup

Upload your kaggle.json (Kaggle > Account > Create New API Token)


Saving kaggle.json to kaggle.json
Kaggle API 1.7.4.5


### Build Prompt (paste to LLM)
You are my cloud TA. Generate a single **Colab code cell** that:
1) Authenticates to Google Cloud in Colab,  
2) Prompts for `PROJECT_ID` via `input()` and sets `REGION="us-central1"` (editable),  
3) Exports `GOOGLE_CLOUD_PROJECT`,  
4) Runs `gcloud config set project $GOOGLE_CLOUD_PROJECT`,  
5) Prints both values. Add 2–3 comments explaining what/why.
End with a comment: `# Done: Auth + Project/Region set`.


In [2]:
#EXAMPLE (from LLM) — Auth + Project/Region (commented; write your own cell using the prompt)
from google.colab import auth
auth.authenticate_user()

import os
PROJECT_ID = input("Enter your GCP Project ID: ").strip()
REGION = "us-central1"  # keep consistent; change if instructed
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
os.environ["REGION"] = REGION # Ensure REGION is also exported as an environment variable
print("Project:", PROJECT_ID, "| Region:", REGION)

# Set active project for gcloud/BigQuery CLI
!gcloud config set project $GOOGLE_CLOUD_PROJECT
!gcloud config get-value project
# Done: Auth + Project/Region set

Enter your GCP Project ID: upbeat-aspect-471118-v8
Project: upbeat-aspect-471118-v8 | Region: us-central1
Updated property [core/project].
upbeat-aspect-471118-v8


### Verification Prompt
Generate a short cell that prints the active project using `gcloud config get-value project` and echoes the `REGION` you set.


In [3]:
# Verification step: Check active project and region
!gcloud config get-value project
import os
print("Project:", PROJECT_ID, "| Region:", REGION)

upbeat-aspect-471118-v8
Project: upbeat-aspect-471118-v8 | Region: us-central1


**Reflection:** Why do we set `PROJECT_ID` and `REGION` at the top? What can go wrong if we don’t?

## 1) Kaggle API — What & Why
Use Kaggle CLI for reproducible downloads. Store `kaggle.json` at `~/.kaggle/kaggle.json` with `0600` permissions to protect secrets.

### Build Prompt
Generate a **single Colab code cell** that:
- Prompts me to upload `kaggle.json`,
- Saves to `~/.kaggle/kaggle.json` with `0600` permissions,
- Prints `kaggle --version`.
Add comments about security and reproducibility.


In [6]:
# Query BigQuery's INFORMATION_SCHEMA to check the dataset region
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
    schema_name,
    location
FROM
    `{project_id}`.INFORMATION_SCHEMA.SCHEMATA
WHERE
    schema_name = 'netflix';
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

### Verification Prompt
Generate a one-liner that runs `kaggle --help | head -n 20` to show the CLI is ready.


**Reflection:** Why require strict `0600` permissions on API tokens? What risks are we avoiding?

In [7]:
# Verification step: Check active project and region
!gcloud config get-value project
import os
print("Project:", PROJECT_ID, "| Region:", REGION)


upbeat-aspect-471118-v8
Project: upbeat-aspect-471118-v8 | Region: us-central1


## 2) Download & unzip dataset — What & Why
Keep raw files under `/content/data/raw` for predictable paths and auditing.
**Dataset:** `sayeeduddin/netflix-2025user-behavior-dataset-210k-records`

### Build Prompt
Generate a **Colab code cell** that:
- Creates `/content/data/raw`,
- Downloads the dataset to `/content/data` with Kaggle CLI,
- Unzips into `/content/data/raw` (overwrite OK),
- Lists all CSVs with sizes in a neat table.
Include comments describing each step.


In [8]:
# Create the directory to store raw data
!mkdir -p /content/data/raw

# Download the dataset using Kaggle CLI to /content/data
# The -d flag specifies the dataset, and -p specifies the download path
!kaggle datasets download -d sayeeduddin/netflix-2025user-behavior-dataset-210k-records -p /content/data

# Unzip the downloaded dataset into the raw data directory
# -o flag overwrites files if they exist
!unzip -o /content/data/*.zip -d /content/data/raw

# List all CSV files in the raw data directory with their sizes in a neat table
!ls -lh /content/data/raw/*.csv

Dataset URL: https://www.kaggle.com/datasets/sayeeduddin/netflix-2025user-behavior-dataset-210k-records
License(s): CC0-1.0
Downloading netflix-2025user-behavior-dataset-210k-records.zip to /content/data
  0% 0.00/4.02M [00:00<?, ?B/s]
100% 4.02M/4.02M [00:00<00:00, 826MB/s]
Archive:  /content/data/netflix-2025user-behavior-dataset-210k-records.zip
  inflating: /content/data/raw/README.md  
  inflating: /content/data/raw/movies.csv  
  inflating: /content/data/raw/recommendation_logs.csv  
  inflating: /content/data/raw/reviews.csv  
  inflating: /content/data/raw/search_logs.csv  
  inflating: /content/data/raw/users.csv  
  inflating: /content/data/raw/watch_history.csv  
-rw-r--r-- 1 root root 114K Aug  2 19:36 /content/data/raw/movies.csv
-rw-r--r-- 1 root root 4.5M Aug  2 19:36 /content/data/raw/recommendation_logs.csv
-rw-r--r-- 1 root root 1.8M Aug  2 19:36 /content/data/raw/reviews.csv
-rw-r--r-- 1 root root 2.2M Aug  2 19:36 /content/data/raw/search_logs.csv
-rw-r--r-- 1 root 

### Verification Prompt
Generate a snippet that asserts there are exactly **six** CSV files and prints their names.


**Reflection:** Why is keeping a clean file inventory (names, sizes) useful downstream?

In [9]:

# Verification step: Assert the number of CSV files and print their names
import glob
import os

csv_files = glob.glob('/content/data/raw/*.csv')
num_csv_files = len(csv_files)

# Assert that there are exactly six CSV files
assert num_csv_files == 6, f"Expected 6 CSV files, but found {num_csv_files}"

print(f"Found {num_csv_files} CSV files:")
for csv_file in csv_files:
    print(os.path.basename(csv_file))

Found 6 CSV files:
movies.csv
watch_history.csv
search_logs.csv
users.csv
recommendation_logs.csv
reviews.csv


## 3) Create GCS bucket & upload — What & Why
Stage in GCS → consistent, versionable source for BigQuery loads. Bucket names must be **globally unique**.

### Build Prompt
Generate a **Colab code cell** that:
- Creates a unique bucket in `${REGION}` (random suffix),
- Saves name to `BUCKET_NAME` env var,
- Uploads all CSVs to `gs://$BUCKET_NAME/netflix/`,
- Prints the bucket name and explains staging benefits.


In [10]:
# Create a unique bucket in the specified region
import uuid
import os

bucket_name = f"mgmt467-netflix-{uuid.uuid4().hex[:8]}"
os.environ["BUCKET_NAME"] = bucket_name

# Ensure REGION environment variable is set before using it in gcloud command
REGION = os.environ.get('REGION')
print(f"Creating bucket: gs://{bucket_name} in region: {REGION}")
!gcloud storage buckets create gs://$BUCKET_NAME --location=$REGION

# Upload all CSVs to the bucket
print(f"\nUploading files to gs://{bucket_name}/netflix/")
!gcloud storage cp /content/data/raw/* gs://$BUCKET_NAME/netflix/

# Print the bucket name and explain staging benefits
print(f"\nSuccessfully created bucket: {bucket_name} and uploaded files.")
print("\nBenefits of staging data in GCS:")
print("- **Consistent Source:** GCS provides a stable and versionable location for your data.")
print("- **Scalability:** GCS is highly scalable, handling large datasets easily.")
print("- **Integration:** GCS integrates seamlessly with other Google Cloud services like BigQuery.")
print("- **Cost-Effective:** GCS can be a cost-effective storage solution.")

# Verify contents (optional but recommended)
print(f"\nVerifying contents of gs://{bucket_name}/netflix/:")
!gcloud storage ls gs://$BUCKET_NAME/netflix/

Creating bucket: gs://mgmt467-netflix-7adb2cc1 in region: us-central1
Creating gs://mgmt467-netflix-7adb2cc1/...

Uploading files to gs://mgmt467-netflix-7adb2cc1/netflix/
Copying file:///content/data/raw/movies.csv to gs://mgmt467-netflix-7adb2cc1/netflix/movies.csv
Copying file:///content/data/raw/README.md to gs://mgmt467-netflix-7adb2cc1/netflix/README.md
Copying file:///content/data/raw/recommendation_logs.csv to gs://mgmt467-netflix-7adb2cc1/netflix/recommendation_logs.csv
Copying file:///content/data/raw/reviews.csv to gs://mgmt467-netflix-7adb2cc1/netflix/reviews.csv
Copying file:///content/data/raw/search_logs.csv to gs://mgmt467-netflix-7adb2cc1/netflix/search_logs.csv
Copying file:///content/data/raw/users.csv to gs://mgmt467-netflix-7adb2cc1/netflix/users.csv
Copying file:///content/data/raw/watch_history.csv to gs://mgmt467-netflix-7adb2cc1/netflix/watch_history.csv

Average throughput: 21.3MiB/s

Successfully created bucket: mgmt467-netflix-7adb2cc1 and uploaded files.

B

### Verification Prompt
Generate a snippet that lists the `netflix/` prefix and shows object sizes.


**Reflection:** Name two benefits of staging in GCS vs loading directly from local Colab.

In [11]:
# Verification step: List objects in the netflix/ prefix with sizes
import os

bucket_name = os.environ["BUCKET_NAME"]
print(f"Listing contents of gs://{bucket_name}/netflix/ with sizes:")
!gcloud storage ls -l gs://$BUCKET_NAME/netflix/

Listing contents of gs://mgmt467-netflix-7adb2cc1/netflix/ with sizes:
      8002  2025-10-27T00:42:51Z  gs://mgmt467-netflix-7adb2cc1/netflix/README.md
    115942  2025-10-27T00:42:51Z  gs://mgmt467-netflix-7adb2cc1/netflix/movies.csv
   4695557  2025-10-27T00:42:52Z  gs://mgmt467-netflix-7adb2cc1/netflix/recommendation_logs.csv
   1861942  2025-10-27T00:42:52Z  gs://mgmt467-netflix-7adb2cc1/netflix/reviews.csv
   2250902  2025-10-27T00:42:52Z  gs://mgmt467-netflix-7adb2cc1/netflix/search_logs.csv
   1606820  2025-10-27T00:42:52Z  gs://mgmt467-netflix-7adb2cc1/netflix/users.csv
   9269425  2025-10-27T00:42:52Z  gs://mgmt467-netflix-7adb2cc1/netflix/watch_history.csv
TOTAL: 7 objects, 19808590 bytes (18.89MiB)


## 4) BigQuery dataset & loads — What & Why
Create dataset `netflix` and load six CSVs with **autodetect** for speed (we’ll enforce schemas later).

### Build Prompt (two cells)
**Cell A:** Create (idempotently) dataset `netflix` in US multi-region; if it exists, print a friendly message.  
**Cell B:** Load tables from `gs://$BUCKET_NAME/netflix/`:
`users, movies, watch_history, recommendation_logs, search_logs, reviews`
with `--skip_leading_rows=1 --autodetect --source_format=CSV`.
Finish with row-count queries for each table.


In [12]:
# EXAMPLE (from LLM) — BigQuery dataset (commented)
DATASET="netflix"
# Attempt to create; ignore if exists
!bq --location=us-central1 mk -d --description "MGMT467 Netflix dataset" $DATASET || echo "Dataset may already exist."

BigQuery error in mk operation: Dataset 'upbeat-aspect-471118-v8:netflix'
already exists.
Dataset may already exist.


In [None]:
import os

# Set the BUCKET_NAME environment variable to the correct bucket name
os.environ["BUCKET_NAME"] = "mgmt467-netflix-81f821b1"
print(f"BUCKET_NAME environment variable set to: {os.environ['BUCKET_NAME']}")

BUCKET_NAME environment variable set to: mgmt467-netflix-81f821b1


In [13]:
# EXAMPLE (from LLM) — Load tables (commented)
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}
import os
DATASET = "netflix" # Define DATASET variable
PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT'] # Define PROJECT_ID variable
BUCKET_NAME = os.environ['BUCKET_NAME'] # Get bucket name from environment variable
print(f"Using bucket: {BUCKET_NAME} for loading data.") # Print bucket name being used

for tbl, fname in tables.items():
  src = f"gs://{BUCKET_NAME}/netflix/{fname}"
  table_ref = f"{PROJECT_ID}.{DATASET}.{tbl}" # Construct table reference separately
  print("Loading", tbl, "from", src)
  # Corrected bq load command syntax - use the constructed table_ref
  !bq load --source_format=CSV --skip_leading_rows=1 --autodetect {table_ref} {src}

# Row counts
print("\nChecking row counts:")
for tbl in tables.keys():
  # Corrected bq query command syntax
  !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `{PROJECT_ID}.{DATASET}.{tbl}`"

Using bucket: mgmt467-netflix-7adb2cc1 for loading data.
Loading users from gs://mgmt467-netflix-7adb2cc1/netflix/users.csv
BigQuery error in load operation: Not found: Dataset upbeat-
aspect-471118-v8:upbeat-aspect-471118-v8.netflix
Loading movies from gs://mgmt467-netflix-7adb2cc1/netflix/movies.csv
BigQuery error in load operation: Not found: Dataset upbeat-
aspect-471118-v8:upbeat-aspect-471118-v8.netflix
Loading watch_history from gs://mgmt467-netflix-7adb2cc1/netflix/watch_history.csv
BigQuery error in load operation: Not found: Dataset upbeat-
aspect-471118-v8:upbeat-aspect-471118-v8.netflix
Loading recommendation_logs from gs://mgmt467-netflix-7adb2cc1/netflix/recommendation_logs.csv
BigQuery error in load operation: Not found: Dataset upbeat-
aspect-471118-v8:upbeat-aspect-471118-v8.netflix
Loading search_logs from gs://mgmt467-netflix-7adb2cc1/netflix/search_logs.csv
BigQuery error in load operation: Not found: Dataset upbeat-
aspect-471118-v8:upbeat-aspect-471118-v8.netflix


In [None]:
# Corrected code to load tables and check row counts
tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}
import os
DATASET = "netflix" # Define DATASET variable
PROJECT_ID = os.environ['GOOGLE_CLOUD_PROJECT'] # Define PROJECT_ID variable
BUCKET_NAME = os.environ['BUCKET_NAME'] # Get bucket name from environment variable
print(f"Using bucket: {BUCKET_NAME} for loading data.") # Print bucket name being used

for tbl, fname in tables.items():
  src = f"gs://{BUCKET_NAME}/netflix/{fname}"
  table_ref = f"{PROJECT_ID}.{DATASET}.{tbl}" # Construct table reference
  print("Loading", tbl, "from", src)
  # Corrected bq load command syntax - removed the unnecessary project_id repetition
  !bq load --source_format=CSV --skip_leading_rows=1 --autodetect {DATASET}.{tbl} {src}

# Row counts
print("\nChecking row counts:")
for tbl in tables.keys():
  # Corrected bq query command syntax
  !bq query --nouse_legacy_sql "SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `{PROJECT_ID}.{DATASET}.{tbl}`"

Using bucket: mgmt467-netflix-81f821b1 for loading data.
Loading users from gs://mgmt467-netflix-81f821b1/netflix/users.csv
Waiting on bqjob_r4017af867f17b332_00000199ef17e240_1 ... (2s) Current status: DONE   
Loading movies from gs://mgmt467-netflix-81f821b1/netflix/movies.csv
Waiting on bqjob_r3e36c2c40f16acef_00000199ef17fbd6_1 ... (2s) Current status: DONE   
Loading watch_history from gs://mgmt467-netflix-81f821b1/netflix/watch_history.csv
Waiting on bqjob_r3bbafe6896ff9589_00000199ef18170b_1 ... (3s) Current status: DONE   
Loading recommendation_logs from gs://mgmt467-netflix-81f821b1/netflix/recommendation_logs.csv
Waiting on bqjob_r7be3c11949482e48_00000199ef183441_1 ... (2s) Current status: DONE   
Loading search_logs from gs://mgmt467-netflix-81f821b1/netflix/search_logs.csv
Waiting on bqjob_r42d18225bde97bf7_00000199ef184d75_1 ... (2s) Current status: DONE   
Loading reviews from gs://mgmt467-netflix-81f821b1/netflix/reviews.csv
Waiting on bqjob_r4d0cf9ebea8ef1ba_00000199e

In [14]:
from google.cloud import bigquery
import os

client = bigquery.Client()

tables = {
  "users": "users.csv",
  "movies": "movies.csv",
  "watch_history": "watch_history.csv",
  "recommendation_logs": "recommendation_logs.csv",
  "search_logs": "search_logs.csv",
  "reviews": "reviews.csv",
}

# Hardcoded dataset ID
DATASET_ID = "upbeat-aspect-471118-v8.netflix"
# Hardcoded bucket name from previous successful run - REPLACE WITH YOUR ACTUAL BUCKET NAME if needed
BUCKET_NAME = "mgmt467-netflix-7adb2cc1"


for tbl, fname in tables.items():
  uri = f"gs://{BUCKET_NAME}/netflix/{fname}"
  table_id = f"{DATASET_ID}.{tbl}"

  job_config = bigquery.LoadJobConfig(
      autodetect=True,
      skip_leading_rows=1,
      source_format=bigquery.SourceFormat.CSV,
  )

  print(f"Loading {tbl} from {uri} into {table_id}")
  # Use the client library to load the data
  load_job = client.load_table_from_uri(
      uri, table_id, job_config=job_config
  )  # Make an API request.

  load_job.result()  # Waits for the job to complete.

  print(f"Load job for {tbl} completed.")

# Row counts (Verification Prompt)
print("\nVerifying row counts:")
for tbl in tables.keys():
    table_id_full = f"{DATASET_ID}.{tbl}"
    query = f"SELECT '{tbl}' AS table_name, COUNT(*) AS n FROM `{table_id_full}`"
    query_job = client.query(query)
    results = query_job.result()
    for row in results:
        print(row)

Loading users from gs://mgmt467-netflix-7adb2cc1/netflix/users.csv into upbeat-aspect-471118-v8.netflix.users
Load job for users completed.
Loading movies from gs://mgmt467-netflix-7adb2cc1/netflix/movies.csv into upbeat-aspect-471118-v8.netflix.movies
Load job for movies completed.
Loading watch_history from gs://mgmt467-netflix-7adb2cc1/netflix/watch_history.csv into upbeat-aspect-471118-v8.netflix.watch_history
Load job for watch_history completed.
Loading recommendation_logs from gs://mgmt467-netflix-7adb2cc1/netflix/recommendation_logs.csv into upbeat-aspect-471118-v8.netflix.recommendation_logs
Load job for recommendation_logs completed.
Loading search_logs from gs://mgmt467-netflix-7adb2cc1/netflix/search_logs.csv into upbeat-aspect-471118-v8.netflix.search_logs
Load job for search_logs completed.
Loading reviews from gs://mgmt467-netflix-7adb2cc1/netflix/reviews.csv into upbeat-aspect-471118-v8.netflix.reviews
Load job for reviews completed.

Verifying row counts:
Row(('users',

### Verification Prompt
Generate a single query that returns `table_name, row_count` for all six tables in `${GOOGLE_CLOUD_PROJECT}.netflix`.


In [15]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT 'users' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.users`
UNION ALL
SELECT 'movies' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.movies`
UNION ALL
SELECT 'watch_history' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'recommendation_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.recommendation_logs`
UNION ALL
SELECT 'search_logs' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.search_logs`
UNION ALL
SELECT 'reviews' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.reviews`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('recommendation_logs', 156000), {'table_name': 0, 'row_count': 1})
Row(('users', 30900), {'table_name': 0, 'row_count': 1})
Row(('search_logs', 79500), {'table_name': 0, 'row_count': 1})
Row(('movies', 3120), {'table_name': 0, 'row_count': 1})
Row(('reviews', 46350), {'table_name': 0, 'row_count': 1})
Row(('watch_history', 315000), {'table_name': 0, 'row_count': 1})


**Reflection:** When is `autodetect` acceptable? When should you enforce explicit schemas and why?

## 5) Data Quality (DQ) — Concepts we care about
- **Missingness** (MCAR/MAR/MNAR). Impute vs drop. Add `is_missing_*` indicators.
- **Duplicates** (exact vs near). Double-counted engagement corrupts labels & KPIs.
- **Outliers** (IQR). Winsorize/cap vs robust models. Always **flag** and explain.
- **Reproducibility**. Prefer `CREATE OR REPLACE` and deterministic keys.


### 5.1 Missingness (users) — What & Why
Measure % missing and check if missingness depends on another variable (MAR) → potential bias & instability.

In [20]:
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
-- Verification: Print missingness percentages
WITH base AS (
  SELECT COUNT(*) n,
         COUNTIF(country IS NULL) miss_country,
         COUNTIF(subscription_plan IS NULL) miss_plan,
         COUNTIF(age IS NULL) miss_age
  FROM `{project_id}.netflix.users`
)
SELECT ROUND(100*miss_country/n,2) AS pct_missing_country,
       ROUND(100*miss_plan/n,2)   AS pct_missing_subscription_plan,
       ROUND(100*miss_age/n,2)    AS pct_missing_age
FROM base;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((0.0, 0.0, 11.93), {'pct_missing_country': 0, 'pct_missing_subscription_plan': 1, 'pct_missing_age': 2})


### Build Prompt
Generate **two BigQuery SQL cells**:
1) Total rows and % missing in `region`, `plan_tier`, `age_band` from `users`.
2) `% plan_tier missing by region` ordered descending. Add comments on MAR.


In [18]:
# Total rows and % missing in country, subscription_plan, age from users
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNT(*) AS total_rows,
  ROUND(100 * COUNTIF(country IS NULL) / COUNT(*), 2) AS pct_missing_country,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan,
  ROUND(100 * COUNTIF(age IS NULL) / COUNT(*), 2) AS pct_missing_age
FROM `{project_id}.netflix.users`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((30900, 0.0, 0.0, 11.93), {'total_rows': 0, 'pct_missing_country': 1, 'pct_missing_subscription_plan': 2, 'pct_missing_age': 3})


In [19]:
# % subscription_plan missing by country ordered descending (Checking for MAR - Missing At Random)
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  country,
  COUNT(*) AS n,
  ROUND(100 * COUNTIF(subscription_plan IS NULL) / COUNT(*), 2) AS pct_missing_subscription_plan
FROM `{project_id}.netflix.users`
GROUP BY country
ORDER BY pct_missing_subscription_plan DESC;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('USA', 21612, 0.0), {'country': 0, 'n': 1, 'pct_missing_subscription_plan': 2})
Row(('Canada', 9288, 0.0), {'country': 0, 'n': 1, 'pct_missing_subscription_plan': 2})


Reflection: Which columns are most missing? Hypothesize MCAR/MAR/MNAR and why.

Based on the results of the missingness analysis:

The age column has the highest percentage of missing values (11.93%). country and subscription_plan have no missing values.

Regarding hypotheses for the missingness:

Age: It's possible the missingness in age is MAR (Missing At Random). The reason for not providing age might be related to other demographic information or user behavior that is captured in the dataset. For example, users in a certain age range might be less likely to provide their age. It could also be MNAR (Missing Not At Random) if there's a systemic reason why certain users (e.g., very young or very old users) are less likely to have their age recorded, and this reason is not captured by other variables.
Country and Subscription Plan: Since there are no missing values for these columns, we don't need to hypothesize about MCAR/MAR/MNAR. The data for these columns appears to be complete.
We would need to perform further analysis to confirm if the missingness is indeed MAR or MNAR, for example, by examining the characteristics of users with missing age values compared to those with complete age data.



### 5.2 Duplicates (watch_history) — What & Why
Find exact duplicate interaction records and keep **one best** per group (deterministic policy).

In [21]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Report duplicate groups on (user_id, movie_id, watch_date, device_type) with counts (top 20)
SELECT user_id, movie_id, watch_date, device_type, COUNT(*) AS dup_count
FROM `{project_id}.netflix.watch_history`
GROUP BY user_id, movie_id, watch_date, device_type
HAVING dup_count > 1
ORDER BY dup_count DESC
LIMIT 20;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('user_00391', 'movie_0893', datetime.date(2024, 8, 26), 'Laptop', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_03310', 'movie_0640', datetime.date(2024, 9, 8), 'Smart TV', 12), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_03140', 'movie_0205', datetime.date(2025, 9, 11), 'Desktop', 9), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_08157', 'movie_0729', datetime.date(2025, 10, 26), 'Laptop', 9), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00928', 'movie_0913', datetime.date(2024, 1, 18), 'Laptop', 9), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_07529', 'movie_0686', datetime.date(2025, 7, 7), 'Laptop', 9), {'user_id': 0, 'movie_id': 1, 'watch_date': 2, 'device_type': 3, 'dup_count': 4})
Row(('user_00249', 'movie_0203', datetime.date(2024, 8

In [22]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']

client = bigquery.Client(project=project_id)

query = f"""
-- Create table watch_history_dedup keeping one row per group
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_dedup` AS
SELECT * EXCEPT(rk) FROM (
  SELECT h.*,
         ROW_NUMBER() OVER (
           PARTITION BY user_id, movie_id, watch_date, device_type
           ORDER BY progress_percentage DESC, watch_duration_minutes DESC
         ) AS rk
  FROM `{project_id}.netflix.watch_history` h
)
WHERE rk = 1;
"""

query_job = client.query(query)
query_job.result()

print(f"Table `{project_id}.netflix.watch_history_dedup` created successfully.")

Table `upbeat-aspect-471118-v8.netflix.watch_history_dedup` created successfully.


In [23]:
project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
-- Verification: Before/after count query comparing raw vs watch_history_dedup
SELECT 'watch_history_raw' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT 'watch_history_dedup' AS table_name, COUNT(*) AS row_count FROM `{project_id}.netflix.watch_history_dedup`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('watch_history_raw', 315000), {'table_name': 0, 'row_count': 1})
Row(('watch_history_dedup', 100000), {'table_name': 0, 'row_count': 1})


**Reflection:** Why do duplicates arise (natural vs system-generated)? How do they corrupt labels and KPIs?

Duplicates can arise from various sources, both natural and system-generated.

Natural Duplicates: These occur when real-world events are recorded multiple times. For example, a user might refresh a page, leading to the same watch event being logged twice in quick succession.
System-Generated Duplicates: These are often due to errors in data pipelines, ETL processes, or application logic. Retries, misconfigurations, or bugs can cause records to be inserted or processed more than once.
Duplicates can significantly corrupt labels and KPIs:

Inflated Metrics: Simple counts (like number of views or sessions) will be artificially high, leading to an overestimation of activity.
Incorrect Aggregations: Averages, sums, and other aggregations will be skewed. For instance, average watch duration could be underestimated if partial watch events are duplicated.
Biased Model Training: If duplicate data is used to train machine learning models, the models can become biased towards the characteristics of the duplicated records. This can lead to poor performance on clean data.
Skewed Analysis: Any analysis based on the duplicated data will be inaccurate, leading to flawed business decisions.
Handling duplicates is crucial for ensuring data integrity and the reliability of downstream analysis and models.

### 5.3 Outliers (minutes_watched) — What & Why
Estimate extreme values via IQR; report % outliers; **winsorize** to P01/P99 for robustness while also **flagging** extremes.

### Build Prompt
Generate **two BigQuery SQL cells**:
1) Compute IQR bounds for `minutes_watched` on `watch_history_dedup` and report % outliers.
2) Create `watch_history_robust` with `minutes_watched_capped` capped at P01/P99; return quantile summaries before/after.

In [24]:
# Compute IQR bounds for watch_duration_minutes on watch_history_dedup and report % outliers
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
WITH dist AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(1)] AS q1,
    APPROX_QUANTILES(watch_duration_minutes, 4)[OFFSET(3)] AS q3
  FROM `{project_id}.netflix.watch_history_dedup`
),
bounds AS (
  SELECT q1, q3, (q3-q1) AS iqr,
         q1 - 1.5*(q3-q1) AS lo,
         q3 + 1.5*(q3-q1) AS hi
  FROM dist
)
SELECT
  COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi) AS outliers,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(h.watch_duration_minutes < b.lo OR h.watch_duration_minutes > b.hi)/COUNT(*),2) AS pct_outliers
FROM `{project_id}.netflix.watch_history_dedup` h
CROSS JOIN bounds b;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((3433, 100000, 3.43), {'outliers': 0, 'total': 1, 'pct_outliers': 2})


In [25]:
# Create watch_history_robust with watch_duration_minutes_capped at P01/P99; return quantile summaries before/after.
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
CREATE OR REPLACE TABLE `{project_id}.netflix.watch_history_robust` AS
WITH q AS (
  SELECT
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(1)]  AS p01,
    APPROX_QUANTILES(watch_duration_minutes, 100)[OFFSET(98)] AS p99
  FROM `{project_id}.netflix.watch_history_dedup`
)
SELECT
  h.*,
  GREATEST(q.p01, LEAST(q.p99, h.watch_duration_minutes)) AS watch_duration_minutes_capped
FROM `{project_id}.netflix.watch_history_dedup` h, q;

-- Quantiles before vs after
WITH before AS (
  SELECT 'before' AS which, APPROX_QUANTILES(watch_duration_minutes, 5) AS q
  FROM `{project_id}.netflix.watch_history_dedup`
),
after AS (
  SELECT 'after' AS which, APPROX_QUANTILES(watch_duration_minutes_capped, 5) AS q
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT * FROM before UNION ALL SELECT * FROM after;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('before', [0.2, 24.9, 41.7, 61.4, 91.5, 799.3]), {'which': 0, 'q': 1})
Row(('after', [4.4, 24.6, 41.5, 61.5, 92.0, 203.6]), {'which': 0, 'q': 1})


### Verification Prompt
Generate a query that shows min/median/max before vs after capping.

In [26]:
# Verification: Show min/median/max before vs after capping
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
WITH before AS (
  SELECT
    'before' AS which,
    MIN(watch_duration_minutes) AS min_val,
    APPROX_QUANTILES(watch_duration_minutes, 2)[OFFSET(1)] AS median_val,
    MAX(watch_duration_minutes) AS max_val
  FROM `{project_id}.netflix.watch_history_dedup`
),
after AS (
  SELECT
    'after' AS which,
    MIN(watch_duration_minutes_capped) AS min_val,
    APPROX_QUANTILES(watch_duration_minutes_capped, 2)[OFFSET(1)] AS median_val,
    MAX(watch_duration_minutes_capped) AS max_val
  FROM `{project_id}.netflix.watch_history_robust`
)
SELECT * FROM before UNION ALL SELECT * FROM after;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row(('after', 4.4, 51.4, 203.6), {'which': 0, 'min_val': 1, 'median_val': 2, 'max_val': 3})
Row(('before', 0.2, 51.2, 799.3), {'which': 0, 'min_val': 1, 'median_val': 2, 'max_val': 3})


**Reflection:** When might capping be harmful? Name a model type less sensitive to outliers and why.

When considering capping outliers, it's important to think about when it might not be the best approach:

Loss of Information: If the outliers are genuine data points that hold important information (e.g., a customer with exceptionally high spending), capping them can lead to a loss of valuable insights.
Distorted Relationships: Capping can distort the relationship between variables, which might negatively impact models that rely on these relationships.
Misleading Results: If the goal is to understand the full range of the data or identify extreme cases, capping can mask these important aspects.
Model types that are generally less sensitive to outliers include tree-based models such as Decision Trees, Random Forests, and Gradient Boosting Machines (like XGBoost or LightGBM). These models partition the data based on feature values and are not influenced by the magnitude of extreme values in the same way that linear models or distance-based algorithms are. They are more focused on the rank order of data points and splits.

### 5.4 Business anomaly flags — What & Why
Human-readable flags help both product decisioning and ML features (e.g., binge behavior).

### Build Prompt
Generate **three BigQuery SQL cells** (adjust if columns differ):
1) In `watch_history_robust`, compute and summarize `flag_binge` for sessions > 8 hours.
2) In `users`, compute and summarize `flag_age_extreme` if age can be parsed from `age_band` (<10 or >100).
3) In `movies`, compute and summarize `flag_duration_anomaly` where `duration_min` < 15 or > 480 (if exists).
Each cell should output count and percentage and include 1–2 comments.

In [27]:
# In watch_history_robust, compute and summarize flag_binge for sessions > 8 hours.
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(watch_duration_minutes_capped > 8*60) AS sessions_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(watch_duration_minutes_capped > 8*60)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.watch_history_robust`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((0, 100000, 0.0), {'sessions_over_8h': 0, 'total': 1, 'pct': 2})


In [28]:
# In users, compute and summarize flag_age_extreme if age is <10 or >100.
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(age < 10 OR age > 100) AS extreme_age_rows,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct
FROM `{project_id}.netflix.users`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((537, 30900, 1.74), {'extreme_age_rows': 0, 'total': 1, 'pct': 2})


In [29]:
# In movies, compute and summarize flag_duration_anomaly where duration_minutes < 15 or > 480.
import os
from google.cloud import bigquery

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
client = bigquery.Client(project=project_id)

query = f"""
SELECT
  COUNTIF(duration_minutes < 15) AS titles_under_15m,
  COUNTIF(duration_minutes > 480) AS titles_over_8h,
  COUNT(*) AS total,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_duration_anomaly
FROM `{project_id}.netflix.movies`;
"""

query_job = client.query(query)
results = query_job.result()

# Print the results
for row in results:
    print(row)

Row((36, 33, 3120, 2.21), {'titles_under_15m': 0, 'titles_over_8h': 1, 'total': 2, 'pct_duration_anomaly': 3})


In [30]:
# Verification: Single compact summary query for anomaly flags
import os
# from google.cloud import bigquery # Comment out the import

project_id = os.environ['GOOGLE_CLOUD_PROJECT']
location = 'us-central1' # Specify location here

# client = bigquery.Client(project=project_id, location=location) # Comment out client initialization

query = f"""
SELECT
  'flag_binge' AS flag_name,
  ROUND(100*COUNTIF(watch_duration_minutes > 8*60)/COUNT(*),2) AS pct_of_rows
FROM `{project_id}.netflix.watch_history`
UNION ALL
SELECT
  'flag_age_extreme' AS flag_name,
  ROUND(100*COUNTIF(age < 10 OR age > 100)/COUNT(*),2) AS pct_of_rows
FROM `{project_id}.netflix.users`
UNION ALL
SELECT
  'flag_duration_anomaly' AS flag_name,
  ROUND(100*COUNTIF(duration_minutes < 15 OR duration_minutes > 480)/COUNT(*),2) AS pct_of_rows
FROM `{project_id}.netflix.movies`;
"""

# Use bq query command-line tool
#!bq query --nouse_legacy_sql --location={location} "{query}"

# The results from bq query are printed directly to stdout, no need for fetching results with client.
query_job = client.query(query)
results = query_job.result()

# # Print the results
for row in results:
    print(row)

Row(('flag_binge', 0.64), {'flag_name': 0, 'pct_of_rows': 1})
Row(('flag_age_extreme', 1.74), {'flag_name': 0, 'pct_of_rows': 1})
Row(('flag_duration_anomaly', 2.21), {'flag_name': 0, 'pct_of_rows': 1})


**Reflection:** Which anomaly flag is most common? Which would you keep as a feature and why?

Based on the anomaly flag analysis:

The flag_duration_anomaly is the most common, with 2.21% of movies having a duration less than 15 minutes or greater than 480 minutes.

As for which flag to keep as a feature, it depends on the specific downstream task. However, flag_binge (sessions > 8 hours) could be a valuable feature for several reasons:

Predictive Power: Binge-watching behavior is often a strong indicator of user engagement and could be highly predictive for models related to user retention, churn, or content recommendations.
Business Relevance: Identifying binge sessions is directly relevant to understanding user consumption patterns and can inform content strategy and platform design.
Actionable Insight: This flag provides a clear signal about a specific type of user behavior that can be targeted for analysis or interventions.
While flag_age_extreme and flag_duration_anomaly also identify interesting patterns, flag_binge seems to capture a more direct and potentially impactful user behavior for many common Netflix-related modeling tasks.

## 6) Save & submit — What & Why
Reproducibility: save artifacts and document decisions so others can rerun and audit.

### Build Prompt
Generate a checklist (Markdown) students can paste at the end:
- Save this notebook to the team Drive.
- Export a `.sql` file with your DQ queries and save to repo.
- Push notebook + SQL to the **team GitHub** with a descriptive commit.
- Add a README with your `PROJECT_ID`, `REGION`, bucket, dataset, and today’s row counts.


## Grading rubric (quick)
- Profiling completeness (30)  
- Cleaning policy correctness & reproducibility (40)  
- Reflection/insight (20)  
- Hygiene (naming, verification, idempotence) (10)


In [36]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [42]:
import json
import re
import os
from google.colab import drive

# Mount Google Drive to access the notebook file
# drive.mount('/content/drive')

# Replace 'YOUR_NOTEBOOK_PATH_IN_DRIVE' with the actual path to your notebook file in Google Drive
# For example: '/content/drive/My Drive/Colab Notebooks/your_notebook_name.ipynb'
notebook_path = '/content/drive/MyDrive/MGMT467/Labs/Lab4_MGMT467_PromptPlusExamples_Colab_Kaggle_GCS_BQ_DQ_prof.ipynb' # <--- **MODIFY THIS LINE**

try:
    # Read the notebook as a JSON file
    with open(notebook_path, 'r') as f:
        notebook_content = json.load(f)

    sql_queries = []

    # Regular expression to find the SQL query string within the f-string
    # It looks for `query = f"""` followed by any characters (non-greedily),
    # and captures the content before the closing `"""`.
    # It also handles the case where the f-string might span multiple lines.
    sql_pattern = re.compile(r'query\s*=\s*f"""(.*?)"""', re.DOTALL)

    # Iterate through cells and extract SQL queries from code cells
    for cell in notebook_content.get('cells', []):
        if cell.get('cell_type') == 'code':
            # Join the list of strings into a single string
            source_code = "".join(cell.get('source', []))
            match = sql_pattern.search(source_code)
            if match:
                # Extract the captured group (the SQL query string)
                query = match.group(1).strip()
                sql_queries.append(query)

    # Write the extracted SQL queries to a .sql file
    output_filename = 'dq_queries.sql'
    with open(output_filename, 'w') as f:
        for i, query in enumerate(sql_queries):
            f.write(f"-- Query {i+1}\n")
            f.write(query)
            f.write("\n\n")

    print(f"SQL queries exported to {output_filename}")

except FileNotFoundError:
    print(f"Error: Notebook file not found at {notebook_path}")
except json.JSONDecodeError:
    print(f"Error: Could not decode the notebook file as JSON. Ensure it's a valid .ipynb file.")
except Exception as e:
    print(f"An error occurred: {e}")

SQL queries exported to dq_queries.sql
