# ETL Pipeline - Freelance Job Earnings Data Warehouse

This notebook extracts data from the raw source files in `data_raw/`, transforms it into a star schema, and loads the resulting dimension and fact tables as CSV files into `data_cleaned/`.

## Output Tables:
- **fact_job_earnings.csv** - Fact table with job earnings metrics
- **dim_worker.csv** - Worker dimension
- **dim_platform.csv** - Platform dimension
- **dim_region.csv** - Region dimension
- **dim_project.csv** - Project dimension
- **dim_date.csv** - Date dimension

In [None]:
import pandas as pd
import json
import os
from pathlib import Path

## 1. Configuration

In [None]:
# Resolve project folders relative to the parent of the current working directory
# (presumably the notebook is launched from a subfolder such as notebooks/ — confirm)
BASE_DIR = Path.cwd().parent
RAW_DATA_DIR, CLEANED_DATA_DIR = BASE_DIR / 'data_raw', BASE_DIR / 'data_cleaned'

# Output folder must exist before any to_csv call writes into it
CLEANED_DATA_DIR.mkdir(exist_ok=True, parents=True)

print(f"Raw data directory: {RAW_DATA_DIR}")
print(f"Cleaned data directory: {CLEANED_DATA_DIR}")

---
## 2. EXTRACT - Load Raw Data

In [None]:
# Extract: the job transactions are the source rows of the fact table
jobs_path = RAW_DATA_DIR / 'jobs_transactions.csv'
jobs_df = pd.read_csv(jobs_path)
print(f"Jobs transactions: {len(jobs_df)} records")
jobs_df.head()

In [None]:
# Extract workers dimension (JSON).
# JSON text is UTF-8 by specification, so decode it explicitly as UTF-8 instead of
# relying on the platform's default locale encoding (e.g. cp1252 on Windows).
with open(RAW_DATA_DIR / 'workers.json', 'r', encoding='utf-8') as f:
    workers_raw = json.load(f)
# assumes the JSON is a list of worker records (one object per worker) — confirm
workers_df = pd.DataFrame(workers_raw)
print(f"Workers: {len(workers_df)} records")
workers_df.head()

In [None]:
# Extract platforms dimension (CSV); shown in full rather than via .head()
platforms_df = pd.read_csv(RAW_DATA_DIR / 'platforms.csv')
n_platforms = len(platforms_df)
print(f"Platforms: {n_platforms} records")
platforms_df

In [None]:
# Extract regions dimension (JSON).
# Decode explicitly as UTF-8 (the JSON standard encoding) so the read does not
# depend on the platform's default locale encoding.
with open(RAW_DATA_DIR / 'regions.json', 'r', encoding='utf-8') as f:
    regions_raw = json.load(f)
# assumes the JSON is a list of region records — confirm
regions_df = pd.DataFrame(regions_raw)
print(f"Regions: {len(regions_df)} records")
regions_df

In [None]:
# Extract projects dimension (CSV) and preview the first ten rows
projects_path = RAW_DATA_DIR / 'projects.csv'
projects_df = pd.read_csv(projects_path)
print(f"Projects: {len(projects_df)} records")
projects_df.head(10)

In [None]:
# Extract the pre-built date dimension (CSV)
dim_date_path = RAW_DATA_DIR / 'dim_date.csv'
dim_date_df = pd.read_csv(dim_date_path)
print(f"Dates: {len(dim_date_df)} records")
dim_date_df.head()

---
## 3. TRANSFORM - Create Dimension Tables

### 3.1 dim_worker

In [None]:
# Build dim_worker: the data-dictionary columns in their documented order, with
# worker_id stored as text (the fact table's worker_id is also cast to str).
WORKER_COLUMNS = ['worker_id', 'experience_level', 'primary_skill']
dim_worker = workers_df.loc[:, WORKER_COLUMNS].copy()
dim_worker['worker_id'] = dim_worker['worker_id'].astype(str)

print(f"dim_worker shape: {dim_worker.shape}")
dim_worker.head()

### 3.2 dim_platform

In [None]:
# Transform platform dimension: keep the data-dictionary columns, in order.
# Platform names are kept exactly as they appear in the source; case-insensitive
# matching against the jobs data happens when platform_id is looked up for the
# fact table, so no standardized-name helper column is needed here.
dim_platform = platforms_df[['platform_id', 'platform_name', 'category', 'payment_cycle']].copy()

print(f"dim_platform shape: {dim_platform.shape}")
dim_platform

### 3.3 dim_region

In [None]:
# Build dim_region with the columns listed in the data dictionary, in that order
REGION_COLUMNS = ['region_id', 'region', 'cost_of_living_index']
dim_region = regions_df.loc[:, REGION_COLUMNS].copy()

print(f"dim_region shape: {dim_region.shape}")
dim_region

### 3.4 dim_project

In [None]:
# Build dim_project with the columns listed in the data dictionary, in that order
PROJECT_COLUMNS = ['project_id', 'project_type', 'job_category']
dim_project = projects_df.loc[:, PROJECT_COLUMNS].copy()

print(f"dim_project shape: {dim_project.shape}")
dim_project

### 3.5 dim_date

In [None]:
# Build dim_date: reorder the raw date dimension to the data-dictionary layout
DATE_COLUMNS = [
    'date_id', 'full_date', 'day_of_week', 'is_weekend', 'is_holiday',
    'month_name', 'month_number', 'quarter', 'year',
]
dim_date = dim_date_df.loc[:, DATE_COLUMNS].copy()

print(f"dim_date shape: {dim_date.shape}")
dim_date.head()

---
## 4. TRANSFORM - Create Fact Table

In [None]:
# Seed the fact table from the job transactions, with work_date parsed to
# datetime so a date key can be derived from it (assign returns a new frame).
fact_job_earnings = jobs_df.assign(work_date=pd.to_datetime(jobs_df['work_date']))

n_initial = len(fact_job_earnings)
print(f"Initial fact table: {n_initial} records")
fact_job_earnings.head()

In [None]:
# Create date_id from work_date as a YYYYMMDD integer (e.g. 2024-03-07 -> 20240307).
# The key is built arithmetically into pandas' nullable Int64 dtype, so a missing
# work_date (NaT) yields a null date_id. A strftime(...).astype(int) cast would
# crash on NaT instead. Null keys are consistent with the other FKs, which are
# left null when unmapped.
_work_date = fact_job_earnings['work_date']
fact_job_earnings['date_id'] = (
    _work_date.dt.year * 10000 + _work_date.dt.month * 100 + _work_date.dt.day
).astype('Int64')

print(f"Date ID mapping check (null count): {fact_job_earnings['date_id'].isna().sum()}")
fact_job_earnings[['work_date', 'date_id']].head()

In [None]:
# Look up platform_id for each job by case-insensitive platform name
# (handles case differences such as 'PeoplePerHour' vs 'Peopleperhour').
platform_lookup = dim_platform[['platform_id', 'platform_name']].copy()
platform_lookup['platform_name_lower'] = platform_lookup['platform_name'].str.lower()

fact_job_earnings['platform_lower'] = fact_job_earnings['platform'].str.lower()

# Left join keeps every job; platforms with no match get a null platform_id.
# validate='many_to_one' makes pandas raise MergeError if two dim_platform rows
# share the same lowercase name. Without it, such a duplicate would silently
# duplicate the matching fact rows.
fact_job_earnings = fact_job_earnings.merge(
    platform_lookup[['platform_id', 'platform_name_lower']],
    left_on='platform_lower',
    right_on='platform_name_lower',
    how='left',
    validate='many_to_one',
)

# Drop temporary columns
fact_job_earnings = fact_job_earnings.drop(columns=['platform_lower', 'platform_name_lower'])

print(f"Platform ID mapping check (null count): {fact_job_earnings['platform_id'].isna().sum()}")

In [None]:
# Look up region_id for each job's client_region, case-insensitively
# (handles case differences such as 'UK' vs 'Uk', 'USA' vs 'Usa').
region_lookup = dim_region[['region_id', 'region']].copy()
region_lookup['region_lower'] = region_lookup['region'].str.lower()

fact_job_earnings['client_region_lower'] = fact_job_earnings['client_region'].str.lower()

# Left join keeps every job; regions with no match get a null region_id.
# validate='many_to_one' makes pandas raise MergeError if two dim_region rows
# share the same lowercase name, instead of silently duplicating fact rows.
fact_job_earnings = fact_job_earnings.merge(
    region_lookup[['region_id', 'region_lower']],
    left_on='client_region_lower',
    right_on='region_lower',
    how='left',
    validate='many_to_one',
)

# Drop temporary columns
fact_job_earnings = fact_job_earnings.drop(columns=['client_region_lower', 'region_lower'])

print(f"Region ID mapping check (null count): {fact_job_earnings['region_id'].isna().sum()}")

In [None]:
# Look up project_id for each job. A project is identified by the
# (project_type, job_category) pair, matched case-insensitively.
project_lookup = dim_project[['project_id', 'project_type', 'job_category']].copy()
project_lookup['project_type_lower'] = project_lookup['project_type'].str.lower()
project_lookup['job_category_lower'] = project_lookup['job_category'].str.lower()

fact_job_earnings['project_type_lower'] = fact_job_earnings['project_type'].str.lower()
fact_job_earnings['job_category_lower'] = fact_job_earnings['job_category'].str.lower()

# Left join keeps every job; pairs with no match get a null project_id.
# validate='many_to_one' makes pandas raise MergeError if dim_project contains the
# same lowercase (project_type, job_category) pair twice, instead of silently
# duplicating fact rows.
fact_job_earnings = fact_job_earnings.merge(
    project_lookup[['project_id', 'project_type_lower', 'job_category_lower']],
    on=['project_type_lower', 'job_category_lower'],
    how='left',
    validate='many_to_one',
)

# Drop temporary columns
fact_job_earnings = fact_job_earnings.drop(columns=['project_type_lower', 'job_category_lower'])

print(f"Project ID mapping check (null count): {fact_job_earnings['project_id'].isna().sum()}")

In [None]:
# worker_id is stored as text in dim_worker, so store it as text here as well
fact_job_earnings['worker_id'] = fact_job_earnings['worker_id'].astype(str)

# is_gap_day is 1 when the row earned nothing (earnings missing or zero) or the
# job was not completed (job_completed == 0); otherwise 0
earnings = fact_job_earnings['earnings_usd']
no_earnings = earnings.isna() | earnings.eq(0)
not_completed = fact_job_earnings['job_completed'].eq(0)
fact_job_earnings['is_gap_day'] = (no_earnings | not_completed).astype(int)

print(f"is_gap_day distribution:\n{fact_job_earnings['is_gap_day'].value_counts()}")

In [None]:
# Select and order columns for final fact table as per data dictionary
fact_columns = [
    'job_id',           # DK - Degenerate key
    'worker_id',        # FK -> dim_worker
    'platform_id',      # FK -> dim_platform
    'region_id',        # FK -> dim_region
    'project_id',       # FK -> dim_project
    'date_id',          # FK -> dim_date
    'earnings_usd',     # Metric
    'job_completed',    # Metric
    'job_duration_days',# Metric
    'hourly_rate',      # Metric
    'job_success_rate', # Metric
    'client_rating',    # Metric
    'rehire_rate',      # Metric
    'marketing_spend',  # Metric
    'is_gap_day'        # Calculated flag
]

fact_job_earnings_final = fact_job_earnings[fact_columns].copy()

# The left-join lookups leave these FKs as float (NaN for unmatched rows).
# Store them as pandas' nullable integer dtype: matched keys become integers and
# unmatched rows stay <NA>, which is written to CSV as an empty field. This avoids
# a -1 sentinel round-trip, which would produce an object-dtype column and could
# collide with a genuine key value of -1.
for col in ['platform_id', 'region_id', 'project_id']:
    fact_job_earnings_final[col] = fact_job_earnings_final[col].astype('Int64')

print(f"Final fact_job_earnings shape: {fact_job_earnings_final.shape}")
fact_job_earnings_final.head(10)

---
## 5. Data Quality Checks

In [None]:
# Report row counts and null (unmapped) foreign keys in the fact table, and the
# size of every dimension table.
print("=== Data Quality Report ===")
print(f"\nFact Table Records: {len(fact_job_earnings_final)}")

# Every FK that can be null after the left-join lookups / date key derivation
print("\nNull/Unmapped Foreign Keys:")
for _fk in ['platform_id', 'region_id', 'project_id', 'date_id']:
    print(f"  - {_fk}: {fact_job_earnings_final[_fk].isna().sum()}")

print("\nDimension Tables:")
for _dim_name, _dim_table in [
    ('dim_worker', dim_worker),
    ('dim_platform', dim_platform),
    ('dim_region', dim_region),
    ('dim_project', dim_project),
    ('dim_date', dim_date),
]:
    print(f"  - {_dim_name}: {len(_dim_table)} records")

In [None]:
# Verify referential integrity: every key used in the fact table should exist in
# its dimension. platform_id, region_id and project_id are produced by lookups
# against their dimensions, so only worker_id and date_id, which come straight
# from the raw jobs data, can be orphaned.

# --- worker_id ---
workers_in_fact = set(fact_job_earnings_final['worker_id'].unique())
workers_in_dim = set(dim_worker['worker_id'].unique())

orphan_workers = workers_in_fact - workers_in_dim
print(f"Workers in fact but not in dimension: {len(orphan_workers)}")
if orphan_workers:
    # sorted() so the sample is reproducible (set iteration order is arbitrary)
    print(f"  Sample orphan worker IDs: {sorted(orphan_workers)[:5]}")

# --- date_id ---
# Null date_ids are excluded; only non-null keys absent from dim_date are orphans.
# NOTE(review): assumes dim_date.date_id holds YYYYMMDD integers like the fact's
# date_id derived from work_date — confirm against the raw dim_date.csv.
dates_in_fact = set(fact_job_earnings_final['date_id'].dropna().unique())
dates_in_dim = set(dim_date['date_id'].unique())

orphan_dates = dates_in_fact - dates_in_dim
print(f"Dates in fact but not in dimension: {len(orphan_dates)}")
if orphan_dates:
    print(f"  Sample orphan date IDs: {sorted(orphan_dates)[:5]}")

---
## 6. LOAD - Save to data_cleaned folder

In [None]:
# Write each dimension table to data_cleaned/<name>.csv (no index column),
# in the same order as before, confirming each write.
for _table_name, _table in {
    'dim_worker': dim_worker,
    'dim_platform': dim_platform,
    'dim_region': dim_region,
    'dim_project': dim_project,
    'dim_date': dim_date,
}.items():
    _table.to_csv(CLEANED_DATA_DIR / f'{_table_name}.csv', index=False)
    print(f"✓ Saved {_table_name}.csv ({len(_table)} records)")

In [None]:
# Persist the fact table next to the dimension CSVs (no index column)
fact_path = CLEANED_DATA_DIR / 'fact_job_earnings.csv'
fact_job_earnings_final.to_csv(fact_path, index=False)
print(f"✓ Saved {fact_path.name} ({len(fact_job_earnings_final)} records)")

In [None]:
# Summarize the run: every CSV now in the output folder, sorted by path, with size in KB
print("\n=== ETL Pipeline Complete ===")
print(f"\nOutput files in {CLEANED_DATA_DIR}:")
for csv_path in sorted(CLEANED_DATA_DIR.glob('*.csv')):
    size_kb = csv_path.stat().st_size / 1024
    print(f"  - {csv_path.name} ({size_kb:.1f} KB)")

---
## 7. Summary Statistics

In [None]:
# Descriptive statistics (count, mean, std, quartiles, ...) for the fact measures
print("=== Fact Table Summary Statistics ===")
metric_columns = [
    'earnings_usd', 'job_completed', 'job_duration_days',
    'hourly_rate', 'job_success_rate', 'client_rating',
    'rehire_rate', 'marketing_spend',
]
fact_job_earnings_final[metric_columns].describe()