# 03 - Feature Engineering

**Team3 - Singapore Jobs Analytics**

This notebook creates derived features from the cleaned dataset for use in the Streamlit dashboard.

**Features Engineered:**
- **`jobs_base`** — Normalized salary (avg of min/max), date casting, application rate
- **`jobs_enriched`** — Salary bands, experience bands, time dimensions (year/month/quarter/DOW), days active
- **`jobs_categories`** — Flattened category JSON array via cross-join expansion

**Pipeline:** `jobs_cleaned.parquet` → `jobs_base` → `jobs_enriched` → `jobs_categories`

**Tools:** DuckDB (in-memory OLAP), Pandas

---
## 1. Setup & Data Loading

In [4]:
import duckdb
import pandas as pd
import os

con = duckdb.connect(':memory:')
print('DuckDB version:', duckdb.__version__)

DuckDB version: 1.1.3


In [6]:
# Load cleaned data from 01_data_cleaning output
con.execute("""
    CREATE TABLE jobs_raw AS
    SELECT * FROM read_parquet('../data/processed/jobs_cleaned.parquet')
""")

row_count = con.execute('SELECT COUNT(*) FROM jobs_raw').fetchone()[0]
print(f'Loaded {row_count:,} rows from cleaned dataset')

# Quick schema check
schema = con.execute('DESCRIBE jobs_raw').fetchdf()
print(f'Columns: {len(schema)}')
schema

Loaded 1,042,793 rows from cleaned dataset
Columns: 22


Unnamed: 0,column_name,column_type,null,key,default,extra
0,categories,VARCHAR,YES,,,
1,employmentTypes,VARCHAR,YES,,,
2,metadata_expiryDate,DATE,YES,,,
3,metadata_isPostedOnBehalf,BOOLEAN,YES,,,
4,metadata_jobPostId,VARCHAR,YES,,,
5,metadata_newPostingDate,DATE,YES,,,
6,metadata_originalPostingDate,DATE,YES,,,
7,metadata_repostCount,BIGINT,YES,,,
8,metadata_totalNumberJobApplication,BIGINT,YES,,,
9,metadata_totalNumberOfView,BIGINT,YES,,,


---
## 2. Feature Layer 1: `jobs_base`

This view normalizes the raw data into a consistent analytical format:
- **Salary normalization:** Computes `avg_salary` as `(min + max) / 2` when both are present, falls back to `average_salary`
- **Salary range:** `salary_maximum - salary_minimum` to measure pay band width
- **Application rate:** `applications / views` as a competition metric
- **Date casting:** Ensures posting/expiry dates are proper DATE types
- **Column renaming:** Shortens verbose column names for readability

In [None]:
# Create jobs_base view
con.execute("""
CREATE OR REPLACE VIEW jobs_base AS
SELECT
  metadata_jobPostId as job_id,
  title,
  postedCompany_name as company_name,
  salary_minimum,
  salary_maximum,
  salary_type,
  CASE
    WHEN salary_minimum IS NOT NULL AND salary_maximum IS NOT NULL
    THEN (salary_minimum + salary_maximum) / 2
    ELSE average_salary
  END as avg_salary,
  salary_maximum - salary_minimum as salary_range,
  minimumYearsExperience as min_experience,
  numberOfVacancies as vacancies,
  status_jobStatus as job_status,
  TRY_CAST(metadata_originalPostingDate AS DATE) as posting_date,
  TRY_CAST(metadata_expiryDate AS DATE) as expiry_date,
  metadata_totalNumberJobApplication as applications,
  metadata_totalNumberOfView as views,
  CASE
    WHEN metadata_totalNumberOfView > 0
    THEN CAST(metadata_totalNumberJobApplication AS FLOAT) / metadata_totalNumberOfView
    ELSE NULL
  END as application_rate,
  metadata_repostCount as repost_count,
  categories
FROM jobs_raw
WHERE metadata_jobPostId IS NOT NULL
""")

base_count = con.execute('SELECT COUNT(*) FROM jobs_base').fetchone()[0]
print(f'jobs_base: {base_count:,} rows')

# Preview derived fields
con.execute("""
    SELECT job_id, title, avg_salary, salary_range, application_rate, posting_date
    FROM jobs_base LIMIT 5
""").fetchdf()

---
## 3. Feature Layer 2: `jobs_enriched`

This view adds categorical and temporal features on top of `jobs_base`:

### Salary Bands (6 tiers)
| Band | Range |
|------|-------|
| < 3K | Below $3,000 |
| 3K - 5K | $3,000 - $4,999 |
| 5K - 8K | $5,000 - $7,999 |
| 8K - 12K | $8,000 - $11,999 |
| 12K - 20K | $12,000 - $19,999 |
| 20K+ | $20,000 and above |

### Experience Bands (4 tiers)
| Band | Years |
|------|-------|
| Entry (0-2 years) | 0-2 |
| Mid (3-5 years) | 3-5 |
| Senior (6-10 years) | 6-10 |
| Executive (10+ years) | 10+ |

### Time Dimensions
- `posting_year`, `posting_month`, `posting_quarter`, `posting_day_of_week`
- `days_active`: duration from posting to expiry

In [None]:
# Create jobs_enriched view
con.execute("""
CREATE OR REPLACE VIEW jobs_enriched AS
SELECT
  *,
  CASE
    WHEN avg_salary < 3000 THEN '< 3K'
    WHEN avg_salary < 5000 THEN '3K - 5K'
    WHEN avg_salary < 8000 THEN '5K - 8K'
    WHEN avg_salary < 12000 THEN '8K - 12K'
    WHEN avg_salary < 20000 THEN '12K - 20K'
    ELSE '20K+'
  END as salary_band,
  CASE
    WHEN min_experience IS NULL OR min_experience <= 2 THEN 'Entry (0-2 years)'
    WHEN min_experience <= 5 THEN 'Mid (3-5 years)'
    WHEN min_experience <= 10 THEN 'Senior (6-10 years)'
    ELSE 'Executive (10+ years)'
  END as experience_band,
  EXTRACT(YEAR FROM posting_date) as posting_year,
  EXTRACT(MONTH FROM posting_date) as posting_month,
  EXTRACT(QUARTER FROM posting_date) as posting_quarter,
  EXTRACT(DOW FROM posting_date) as posting_day_of_week,
  expiry_date - posting_date as days_active
FROM jobs_base
WHERE avg_salary IS NOT NULL AND avg_salary > 0
""")

enriched_count = con.execute('SELECT COUNT(*) FROM jobs_enriched').fetchone()[0]
print(f'jobs_enriched: {enriched_count:,} rows')

# Preview derived features
con.execute("""
    SELECT job_id, avg_salary, salary_band, experience_band,
           posting_year, posting_quarter, days_active
    FROM jobs_enriched LIMIT 10
""").fetchdf()

In [None]:
# Verify salary band distribution
salary_bands = con.execute("""
    SELECT salary_band, COUNT(*) as count,
           ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pct
    FROM jobs_enriched
    GROUP BY salary_band
    ORDER BY
        CASE salary_band
            WHEN '< 3K' THEN 1
            WHEN '3K - 5K' THEN 2
            WHEN '5K - 8K' THEN 3
            WHEN '8K - 12K' THEN 4
            WHEN '12K - 20K' THEN 5
            WHEN '20K+' THEN 6
        END
""").fetchdf()
print('Salary band distribution:')
salary_bands

In [None]:
# Verify experience band distribution
exp_bands = con.execute("""
    SELECT experience_band, COUNT(*) as count,
           ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pct
    FROM jobs_enriched
    GROUP BY experience_band
    ORDER BY
        CASE experience_band
            WHEN 'Entry (0-2 years)' THEN 1
            WHEN 'Mid (3-5 years)' THEN 2
            WHEN 'Senior (6-10 years)' THEN 3
            WHEN 'Executive (10+ years)' THEN 4
        END
""").fetchdf()
print('Experience band distribution:')
exp_bands

---
## 4. Feature Layer 3: `jobs_categories`

This view flattens the JSON `categories` array (e.g., `[{"id":21,"category":"Information Technology"}]`)
into individual rows via a CROSS JOIN with an index CTE.

Each job posting can have 1-10 categories, so this view has more rows than `jobs_enriched`.
This enables category-level aggregation and filtering in the dashboard.

In [None]:
# Create jobs_categories view
con.execute("""
CREATE OR REPLACE VIEW jobs_categories AS
WITH indices AS (
  SELECT 0 as idx UNION ALL SELECT 1 UNION ALL SELECT 2 UNION ALL SELECT 3 UNION ALL SELECT 4
  UNION ALL SELECT 5 UNION ALL SELECT 6 UNION ALL SELECT 7 UNION ALL SELECT 8 UNION ALL SELECT 9
)
SELECT
  j.job_id,
  j.title,
  j.company_name,
  CAST(json_extract_string(
    json_extract(j.categories::JSON, '$[' || i.idx || ']'),
    '$.id'
  ) AS INTEGER) as category_id,
  json_extract_string(
    json_extract(j.categories::JSON, '$[' || i.idx || ']'),
    '$.category'
  ) as category_name,
  j.salary_minimum,
  j.salary_maximum,
  j.avg_salary,
  j.posting_date,
  j.job_status,
  j.min_experience,
  j.vacancies,
  j.experience_band,
  j.salary_band
FROM jobs_enriched j
CROSS JOIN indices i
WHERE j.categories IS NOT NULL
  AND j.categories != ''
  AND j.categories != '[]'
  AND i.idx < json_array_length(j.categories::JSON)
  AND json_extract(j.categories::JSON, '$[' || i.idx || ']') IS NOT NULL
""")

cat_count = con.execute('SELECT COUNT(*) FROM jobs_categories').fetchone()[0]
print(f'jobs_categories: {cat_count:,} rows (expanded from {enriched_count:,} jobs)')
print(f'Expansion ratio: {cat_count/enriched_count:.2f}x')

In [None]:
# Verify category extraction
print('Top 10 categories by job count:')
con.execute("""
    SELECT category_name, COUNT(*) as job_count,
           ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) as pct
    FROM jobs_categories
    WHERE category_name IS NOT NULL
    GROUP BY category_name
    ORDER BY job_count DESC
    LIMIT 10
""").fetchdf()

In [None]:
# Categories per job distribution
cats_per_job = con.execute("""
    SELECT num_cats, COUNT(*) as num_jobs
    FROM (
        SELECT job_id, COUNT(*) as num_cats
        FROM jobs_categories
        GROUP BY job_id
    )
    GROUP BY num_cats
    ORDER BY num_cats
""").fetchdf()
print('Categories per job:')
cats_per_job

---
## 5. Feature Summary

### All Derived Features

| Feature | Source | Description |
|---------|--------|-------------|
| `avg_salary` | `(salary_minimum + salary_maximum) / 2` | Normalized salary; falls back to `average_salary` |
| `salary_range` | `salary_maximum - salary_minimum` | Width of the pay band |
| `application_rate` | `applications / views` | Competition metric (NULL if 0 views) |
| `salary_band` | `avg_salary` thresholds | 6-tier categorical: < 3K to 20K+ |
| `experience_band` | `min_experience` thresholds | 4-tier categorical: Entry to Executive |
| `posting_year` | `posting_date` | Year extracted |
| `posting_month` | `posting_date` | Month extracted |
| `posting_quarter` | `posting_date` | Quarter extracted |
| `posting_day_of_week` | `posting_date` | Day of week (0=Sun, 6=Sat) |
| `days_active` | `expiry_date - posting_date` | Duration posting was live |
| `category_id` | JSON `categories` array | Flattened category ID |
| `category_name` | JSON `categories` array | Flattened category name |

---
## 6. Export Enriched Data

In [None]:
# Export enriched data (materialized from views)
os.makedirs('../data/processed', exist_ok=True)

# Export jobs_enriched (flat, one row per job)
con.execute("""
    COPY (SELECT * FROM jobs_enriched) TO '../data/processed/jobs_enriched.parquet' (FORMAT PARQUET)
""")
enriched_export = con.execute("SELECT COUNT(*) FROM read_parquet('../data/processed/jobs_enriched.parquet')").fetchone()[0]
print(f'Exported jobs_enriched: {enriched_export:,} rows')

# Export jobs_categories (expanded, one row per job-category pair)
con.execute("""
    COPY (SELECT * FROM jobs_categories) TO '../data/processed/jobs_categories.parquet' (FORMAT PARQUET)
""")
cats_export = con.execute("SELECT COUNT(*) FROM read_parquet('../data/processed/jobs_categories.parquet')").fetchone()[0]
print(f'Exported jobs_categories: {cats_export:,} rows')

print(f'\nAll enriched data exported to ../data/processed/')

In [None]:
# Clean up
con.close()
print('Feature engineering complete. Enriched data exported to data/processed/')