<a href="https://colab.research.google.com/github/yijiaw0725/DataScience-Portfolio-yijia/blob/main/Data%20Engineering%20ETL%20Pipeline/Construction_ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook uses mock job data to practice building a database (including employee KPIs, work order information, housing information, etc.).

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Export a copy of the notebook from Drive (replace the placeholder path below with the real one)
!jupyter nbconvert --to notebook --output "/content/Construction_ETL.ipynb" "/content/drive/MyDrive/path/to/your/notebook.ipynb"


### Mimic Data

In [None]:
!pip -q install duckdb

In [None]:
import duckdb, random, math
from datetime import datetime, timedelta
import pandas as pd
import numpy as np

In [None]:
# Set random seeds for reproducibility
random.seed(42); np.random.seed(42)
con = duckdb.connect()

Generate mock data for people and work order issues.

In [None]:
# Generate some fake data for practice
# ===== generate employee KPI data =====
positions = ["Tech I","Tech II","Lead Tech","Supervisor"]
cities = ["Austin","Chicago","Dallas","Denver","Phoenix","Charlotte"]
vendor_types = ["Employee","Vendor"]

n_people = 40
people = []
for i in range(1, n_people+1):
    name = f"Worker {i:02d}"
    email = f"worker{i:02d}@example.com"
    people.append({
        "Name": name,
        "cleaned_name": name.lower(),
        "Email": email,
        "cleaned_email": email.lower(),
        "Position": random.choice(positions),
        "Vendor Type": random.choice(vendor_types),
        "City": random.choice(cities),
    })
people_df = pd.DataFrame(people)

# ===== generate WO details (around 3000) =====
n_orders = 3000
start = datetime(2025,1,1)
end   = datetime(2025,6,30)

wo_rows = []
for k in range(n_orders):
    p = people_df.sample(1).iloc[0]
    created = start + timedelta(days=int(np.random.randint(0, (end-start).days+1)),
                                hours=int(np.random.randint(0,24)))
    dur_days = max(0, int(np.random.normal(2.5, 1.8)))   # drawn from N(2.5, 1.8), truncated to whole days, floored at 0
    completed = created + timedelta(days=dur_days)
    wo_type = np.random.choice(["Move In Inspection","Turn","Rehab"], p=[0.4,0.35,0.25])
    status = np.random.choice(["Open","In Progress","Completed"], p=[0.1,0.2,0.7])
    if status != "Completed":
        completed = None

    wo_rows.append({
        "Vendor Name": p["Name"],
        "Vendor Type": p["Vendor Type"],
        "WO#": 100000 + k,
        "Unit": f"Apt {np.random.randint(1,400)}",
        "Address": f"{np.random.randint(100,9999)} Main St",
        "City": p["City"],
        "State": "TX",
        "Zip": f"{np.random.randint(10000,99999)}",
        "Type": wo_type,
        "Date Completed": completed,
        "Date Created": created,
        "Description": f"{wo_type} for unit",
        "Start Date": created.date(),
        "Status": status,
        "Email": p["Email"],
        "Duration": dur_days,
        "Status Mark": status,
        "Priority Tags": np.random.choice(["Normal","High","Low"], p=[0.7,0.2,0.1]),
        "City by Name": p["City"],
        "Date Created (MY)": created.strftime("%Y-%m"),
        "Cleaned Name": p["cleaned_name"],
        "Cleaned Email": p["cleaned_email"],
    })
issues_df = pd.DataFrame(wo_rows)

# Check the shape of both DataFrames
people_df.shape, issues_df.shape

((40, 7), (3000, 22))

Although the data is self-generated here, let's follow the usual workflow and get to know the data before creating the database.

### Know the Data

In [11]:
# Helper that prints a quick overview of a DataFrame
def quick_profile(df, name):
    print(f"=== {name} ===")
    print(f"Shape: {df.shape[0]} rows × {df.shape[1]} columns\n")

    print("Columns:")
    print(list(df.columns), "\n")

    print("Head (first 5 rows):")
    print(df.head(), "\n")

    print("Summary (numeric + object mix):")
    print(df.describe(include='all').T, "\n")

    print("Missing value count:")
    print(df.isnull().sum(), "\n")
    print("="*60, "\n")

# Profile the people data (the issues data is profiled in the next cell)
quick_profile(people_df, "People Data")

=== People Data ===
Shape: 40 rows × 7 columns

Columns:
['Name', 'cleaned_name', 'Email', 'cleaned_email', 'Position', 'Vendor Type', 'City'] 

Head (first 5 rows):
        Name cleaned_name                 Email         cleaned_email  \
0  Worker 01    worker 01  worker01@example.com  worker01@example.com   
1  Worker 02    worker 02  worker02@example.com  worker02@example.com   
2  Worker 03    worker 03  worker03@example.com  worker03@example.com   
3  Worker 04    worker 04  worker04@example.com  worker04@example.com   
4  Worker 05    worker 05  worker05@example.com  worker05@example.com   

    Position Vendor Type       City  
0     Tech I    Employee  Charlotte  
1  Lead Tech    Employee    Chicago  
2    Tech II    Employee  Charlotte  
3     Tech I      Vendor     Austin  
4     Tech I    Employee    Chicago   

Summary (numeric + object mix):
              count unique                   top freq
Name             40     40             Worker 01    1
cleaned_name     40     4

In [12]:
quick_profile(issues_df, "Issues Data")

=== Issues Data ===
Shape: 3000 rows × 22 columns

Columns:
['Vendor Name', 'Vendor Type', 'WO#', 'Unit', 'Address', 'City', 'State', 'Zip', 'Type', 'Date Completed', 'Date Created', 'Description', 'Start Date', 'Status', 'Email', 'Duration', 'Status Mark', 'Priority Tags', 'City by Name', 'Date Created (MY)', 'Cleaned Name', 'Cleaned Email'] 

Head (first 5 rows):
  Vendor Name Vendor Type     WO#     Unit       Address       City State  \
0   Worker 20      Vendor  100000   Apt 92  5586 Main St  Charlotte    TX   
1   Worker 28      Vendor  100001  Apt 139  5718 Main St     Austin    TX   
2   Worker 13      Vendor  100002  Apt 231  6992 Main St     Austin    TX   
3   Worker 21    Employee  100003  Apt 113  1736 Main St     Dallas    TX   
4   Worker 13      Vendor  100004  Apt 338  4836 Main St     Austin    TX   

     Zip                Type      Date Completed  ...  Start Date  \
0  33483  Move In Inspection                 NaT  ...  2025-03-14   
1  49504  Move In Inspection   

### Build Database

#### Bronze

The Bronze layer is for raw data storage.

Purpose:
*   Store raw, unmodified data exactly as received from the source.
*   Create a "single source of truth" that can be reloaded or audited later.

User/Creator Notes:
*   No cleaning or transformation in Bronze - keep the original format.
*   Load from CSV, API, external DB, etc.
*   Use CREATE OR REPLACE TABLE for idempotency.
*   Save load metadata if possible.

Check List:
*   Row count matches the source.
*   All expected columns present.
*   Keys (IDs) and dates look valid (no corruption).
*   Encoding and data types are correct (no garbled characters)

In [None]:
# 1) Register DataFrames as temp views
## This step lets DuckDB see the DataFrames
con.register("people", people_df)  # register people_df under the view name "people"
con.register("issues", issues_df)

# 2) Create the bronze schema and persist raw tables
con.sql("CREATE SCHEMA IF NOT EXISTS bronze")
## 2.1 Raw work orders (issues table)
con.sql("CREATE OR REPLACE TABLE bronze.move_in_issues_raw AS SELECT * FROM issues")

## 2.2 Raw bonus-master "shell" built from the people list (placeholders for KPIs)
con.sql("""
CREATE OR REPLACE TABLE bronze.bonus_master_raw AS
SELECT
  City         AS "City",
  Name         AS "Name",
  Position     AS "Position",
  0            AS "Total Rehabs",
  0            AS "Total Turns",
  0            AS "Rehab+Turn",
  NULL::DOUBLE AS "Completion %",
  0            AS "# of property with move in issues",
  0            AS "# of valid move in issues",
  0            AS "Late Arrival",
  0            AS "Daily Logs",
  NULL::DOUBLE AS "Daily Logs %",
  0            AS "Turns more than 3 days to scope",
  0            AS "Rehabs more than 5 days to scope",
  0            AS "More than 2 days to create profile",
  0            AS "Wrong title in BT",
  0            AS "Missing Job title in SF",
  0            AS "More than 2 days to release scope to GC",
  0            AS "Incorrect PO creation",
  '2025-Q1Q2'  AS "Period"
FROM people;  -- use the temp view name you registered
""")

Verify tables exist, row counts match expectations, sample rows look sane, and key/date health is okay.

In [34]:
# 3) Sanity Checks
## List bronze tables
print(con.sql("""
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema = 'bronze'
ORDER BY table_name;
""").df())

  table_schema          table_name
0       bronze    bonus_master_raw
1       bronze  move_in_issues_raw


In [35]:
## row counts for both raw tables
print("Row counts:")
print(con.sql("""
SELECT 'move_in_issues_raw' AS table_name, COUNT(*) AS rows FROM bronze.move_in_issues_raw
UNION ALL
SELECT 'bonus_master_raw' , COUNT(*) FROM bronze.bonus_master_raw
""").df(), "\n")

Row counts:
           table_name  rows
0  move_in_issues_raw  3000
1    bonus_master_raw    40 



In [None]:
## sample first few rows
print(con.sql("SELECT * FROM bronze.move_in_issues_raw LIMIT 3;").df())

In [None]:
print(con.sql("SELECT * FROM bronze.bonus_master_raw LIMIT 3;").df())

In [38]:
## key & date health check for work orders
print("WO key & date health check:")
print(con.sql("""
SELECT
  COUNT(*)                              AS rows,
  COUNT(DISTINCT "WO#")                 AS distinct_wo,
  SUM(CASE WHEN "WO#" IS NULL THEN 1 ELSE 0 END) AS wo_nulls,
  MIN("Date Created") AS min_created,
  MAX("Date Created") AS max_created,
  MIN("Date Completed") AS min_completed,
  MAX("Date Completed") AS max_completed
FROM bronze.move_in_issues_raw;
""").df())

WO key & date health check:
   rows  distinct_wo  wo_nulls         min_created         max_created  \
0  3000         3000       0.0 2025-01-01 03:00:00 2025-06-30 23:00:00   

        min_completed       max_completed  
0 2025-01-01 03:00:00 2025-07-06 23:00:00  


#### Silver

Purpose:
*   Apply cleaning and normalization so that downstream processes use consistent formats.
*   Fix naming issues, data types, nulls, and create stable IDs for entities.

User/Creator Notes:
*   Standardize keys (such as cleaned_name).
*   Convert date strings to proper DATE values.
*   Normalize categories (for example, map both Move In and move_in to Move In).
*   Create dimension tables like dim_person and fact tables with relationships.

Check List:
*   No duplicates in keys.
*   Nulls are handled according to business rules.
*   Consistent naming, types and categories.
*   Joins to dimensions work correctly.

In [44]:
!git config --global user.name "yijiaw0725"
!git config --global user.email "yijiaw0725@gmail.com"

In [45]:
!git clone https://github.com/yijiaw0725/DataScience-Portfolio-yijia.git
%cd DataScience-Portfolio-yijia

Cloning into 'DataScience-Portfolio-yijia'...
remote: Enumerating objects: 91, done.
remote: Counting objects: 100% (91/91), done.
remote: Compressing objects: 100% (80/80), done.
remote: Total 91 (delta 22), reused 0 (delta 0), pack-reused 0 (from 0)
Receiving objects: 100% (91/91), 15.22 MiB | 11.82 MiB/s, done.
Resolving deltas: 100% (22/22), done.
/content/DataScience-Portfolio-yijia


In [50]:
!pwd

/content/DataScience-Portfolio-yijia


In [51]:
!cp /content/Construction_ETL.ipynb "/content/DataScience-Portfolio-yijia/Data Engineering ETL Pipeline/"

cp: cannot stat '/content/Construction_ETL.ipynb': No such file or directory
