# Dataset Construction

This notebook documents the end-to-end process for constructing a cleaned, enriched career-trajectory dataset from individuals' job and education records (provided by Lightcast), combined with each individual's predicted race and gender attributes, state-level GDP, and occupational wage data (sourced from the Bureau of Labor Statistics, or BLS).

The final dataset (`trajectory_df`) is optimized for downstream regression analyses and other statistical modeling tasks.

**Important Note on Reproducibility**

* The data processing logic was developed using a proprietary, licensed commercial dataset (Lightcast data). Therefore, the raw data files are NOT included in this repository and cannot be publicly shared.
* The file names for the job and education records data used in the code are placeholders. To run this notebook, users must substitute them with their own compatible raw data files.

## Overview of Pipeline

The pipeline proceeds through the following major steps:

### 0. Load and Preprocess Raw Data
- Load job (`job_df`) and education (`edu_df`) files.
- Convert date columns to `datetime`.

### 1. Job Records Filtering
- Keep jobs with valid titles, company, city, state, and country.
- Filter for valid start/end dates.
- Sort jobs chronologically within each individual.

### 2. Job Titles Filtering
- Load SOC prediction (`occ_df`) file.
- Replace SOC titles (`ONET_2019_NAME`) and codes (`ONET_2019`) with FewSOC-predicted labels from `occ_df`, looked up by (job title, company name).
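The Step 2 cell further down only loads `occ_df`; the lookup itself is not shown there. A minimal sketch of one way to apply it, assuming `occ_df` carries the raw `TITLE_RAW`/`COMPANY_RAW` key columns alongside predicted `ONET_2019`/`ONET_2019_NAME` columns (these column names are assumptions and are exposed as parameters; `apply_predicted_soc` is a hypothetical helper):

```python
import pandas as pd

def apply_predicted_soc(job_df: pd.DataFrame, occ_df: pd.DataFrame,
                        key_cols=("TITLE_RAW", "COMPANY_RAW"),
                        pred_code_col="ONET_2019",
                        pred_name_col="ONET_2019_NAME") -> pd.DataFrame:
    """Overwrite ONET_2019 / ONET_2019_NAME in job_df with predictions keyed by (title, company).

    Column names in occ_df are assumptions; adjust them to the actual prediction file.
    """
    key_cols = list(key_cols)
    # Keep one prediction per (title, company) key so the merge cannot duplicate job rows
    preds = (
        occ_df[key_cols + [pred_code_col, pred_name_col]]
        .drop_duplicates(subset=key_cols)
        .rename(columns={pred_code_col: "_pred_code", pred_name_col: "_pred_name"})
    )
    out = job_df.merge(preds, on=key_cols, how="left")
    # Prefer the predicted label; fall back to the original value when no prediction exists
    out["ONET_2019"] = out["_pred_code"].combine_first(out["ONET_2019"])
    out["ONET_2019_NAME"] = out["_pred_name"].combine_first(out["ONET_2019_NAME"])
    return out.drop(columns=["_pred_code", "_pred_name"])
```

Usage would be `job_df = apply_predicted_soc(job_df, occ_df)`. Jobs without a matching prediction keep their original codes in this sketch; dropping them instead (an inner merge) is an equally plausible reading of "filtering".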

### 3. Education Records Filtering
- Retain only Bachelor’s and higher (BA, MA, PhD) degrees.
- Drop records with missing fields.
- Sort degrees chronologically per individual.

### 3b. User Intersection
- Keep only users that appear in both the job and education datasets.

### 4. Post-Graduation Gap Filtering
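- Compute the gap (in years) between BA graduation and the first job start.
- Drop users whose first job starts before BA graduation, or whose gap exceeds the maximum allowed for their highest degree (Bachelor's: 3.75, Master's: 5.59, Doctorate: 8.25 years).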
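For a single user, the gap rule reduces to the check below. The thresholds are copied from `gap_thresholds` in the Step 4 cell; `passes_gap_filter` is an illustrative name, not a function from the notebook.

```python
import pandas as pd

# Maximum allowed BA-to-first-job gap in years, keyed by highest degree
# (values copied from `gap_thresholds` in the Step 4 cell)
GAP_THRESHOLDS = {"Bachelor's Degree": 3.75, "Master's Degree": 5.59, "Doctorate": 8.25}

def passes_gap_filter(ba_grad: pd.Timestamp, first_job: pd.Timestamp, highest_degree: str) -> bool:
    """Return True if a user would survive the post-graduation gap filter."""
    # Users whose first job starts before BA graduation are dropped
    if first_job < ba_grad:
        return False
    # Gap in years, using the same 365.25-day year and 4-decimal rounding as the notebook
    gap_years = round((first_job - ba_grad).days / 365.25, 4)
    return gap_years <= GAP_THRESHOLDS[highest_degree]
```

For example, a BA completed on 2010-05-15 followed by a first job on 2015-01-01 is a gap of about 4.63 years: this fails for a user whose highest degree is a Bachelor's but passes for a Master's holder.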

### 5. Timeframe Filtering
- Restrict sample to users with:
  - Earliest job start year ≥ 1999
  - Latest job start year ≤ 2022

### 6. Construct Linear Career Trajectories
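- Sort jobs by start date (ascending) and end date (descending).
- Remove overlapping jobs: keep the earliest job, then keep each subsequent job only if it starts on or after the end of the last kept job.
- Drop trajectories spanning less than 5 years, and truncate the rest to jobs starting within the first 5 years after the first job start.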
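The overlap rule can be illustrated on plain `(start, end)` date pairs. The `linearize` helper below is hypothetical and simplified: it mirrors the selection loop in `construct_linear_job_history` (including pandas placing missing end dates last within a tied start date) but omits the 5-year duration filter and truncation.

```python
from datetime import date
from typing import Optional

# A job is (start, end); end is None for a current (open-ended) job
Job = tuple[date, Optional[date]]

def linearize(jobs: list[Job]) -> list[Job]:
    """Keep a non-overlapping, chronological subset of jobs (greedy)."""
    # Sort by start ascending, then end descending; open-ended jobs (None) go last
    # within a tied start date, matching pandas' default na_position='last'
    ordered = sorted(
        jobs,
        key=lambda j: (j[0], j[1] is None, -j[1].toordinal() if j[1] is not None else 0),
    )
    if not ordered:
        return []
    kept = [ordered[0]]  # always keep the earliest job
    for start, end in ordered[1:]:
        last_end = kept[-1][1]
        # Keep a job only if it starts on/after the last kept job's end;
        # an open-ended last job blocks every later job
        if last_end is not None and start >= last_end:
            kept.append((start, end))
    return kept
```

Because the earliest job is always kept and a job that overlaps it is skipped rather than trimmed, a long early position can hide several shorter concurrent ones.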

### 7. Flatten to `trajectory_df`
Each row represents a single individual’s career trajectory.  
Includes:
- First job attributes (SOC, NAICS, company, state, start year)
- Number of job changes
- Highest education attained

### 8. Enrich with Contextual Variables
- Encode demographics and merge demographic attributes (gender, race):
  - Gender: `1 = Male`, `2 = Female`
  - Race: `1 = White`, `2 = Black`, `3 = Asian`, `4 = Hispanic`
- Estimate birth year as BA graduation year minus 23 → assign generation cohort
- Add state GDP at first job year → assign GDP decile
- Add state-occupation-year wages for the first job
- Compute job change type indicators:
  - `move_1_1`, `move_1_2`, `move_2_1`, `move_2_2`
- Add `up_move` indicator if last job wage ≥5% higher than first job wage
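The GDP-decile and `up_move` code is not shown in full in the cells below. A minimal sketch, assuming the first-job state GDP is already in a hypothetical `state_gdp_x` column, the last job's wage is in a hypothetical `annual_state_wage_y` column, and deciles are computed across all rows of `trajectory_df` (the notebook may instead rank states within each year):

```python
import numpy as np
import pandas as pd

def add_gdp_decile(df: pd.DataFrame, gdp_col: str = "state_gdp_x") -> pd.DataFrame:
    """Assign deciles 1..10 of first-job state GDP across the rows of df (assumed basis)."""
    out = df.copy()
    # qcut makes 10 equal-frequency bins; labels=False returns 0..9, so shift to 1..10
    out["state_gdp_decile_x"] = pd.qcut(out[gdp_col], q=10, labels=False, duplicates="drop") + 1
    return out

def add_up_move(df: pd.DataFrame,
                first_col: str = "annual_state_wage_x",
                last_col: str = "annual_state_wage_y") -> pd.DataFrame:
    """Flag up_move = 1 when the last job wage is at least 5% above the first job wage."""
    out = df.copy()
    valid = out[[first_col, last_col]].notna().all(axis=1)
    up = (out[last_col] >= 1.05 * out[first_col]).astype(float)
    # Leave NaN where either wage is missing, so the final dropna removes those rows
    out["up_move"] = up.where(valid, np.nan)
    return out
```

Keeping `up_move` missing (rather than 0) when a wage is unavailable avoids silently counting unmatched occupations as non-upward moves.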

### 9. Final Cleaning
- Drop `ID` and detailed SOC columns no longer needed
- Drop rows with any missing values
- Log-transform first job wage:
  ```python
  trajectory_df['log_wage_x'] = np.log(trajectory_df['annual_state_wage_x'])
  ```
- Top-code `num_job_changes` at the 95th percentile to handle outliers
- Export cleaned dataset to Parquet:
  ```python
  trajectory_df.to_parquet('../data/career_trajectories.parquet', index=False)
  ```
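Top-coding replaces every value above a cutoff with the cutoff itself. A minimal sketch using pandas' `Series.quantile` and `Series.clip`; the helper name `top_code` is illustrative, not from the notebook:

```python
import pandas as pd

def top_code(s: pd.Series, q: float = 0.95) -> pd.Series:
    """Cap values above the q-th quantile at that quantile (top-coding)."""
    cap = s.quantile(q)  # linear interpolation by default
    return s.clip(upper=cap)
```

Usage would be `trajectory_df['num_job_changes'] = top_code(trajectory_df['num_job_changes'])`. Because `quantile` interpolates linearly by default, the cap can be fractional; passing `interpolation='lower'` to `quantile` keeps a count variable integer-valued.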

## Final Data Table

| Column                                         | Description                                                               |
| ---------------------------------------------- | ------------------------------------------------------------------------- |
| `max_edu_name`                                 | Highest degree attained within the first 5-year timeframe                 |
| `onet_major_x`                                 | Major SOC code (2-digit) of first job                                     |
| `naics6_major_x`                               | 2-digit NAICS sector of first job                                         |
| `company_x`                                    | First employer company name                                               |
| `state_x`                                      | First job state                                                           |
| `job_start_year_x`                             | Year of first job start                                                   |
| `num_job_changes`                              | Number of job changes in the 5-year window (top-coded at 95th percentile) |
| `gender`                                       | Gender (1 = Male, 2 = Female)                                             |
| `race`                                         | Race (1 = White, 2 = Black, 3 = Asian, 4 = Hispanic)                      |
| `generation`                                   | Estimated generation cohort                                               |
| `state_gdp_decile_x`                           | Decile of state GDP in first job year                                     |
| `annual_state_wage_x`                          | Occupational wage for the first job (state × 6-digit occupation × year)   |
| `log_wage_x`                                   | Log-transformed first job wage                                            |
| `move_1_1`, `move_1_2`, `move_2_1`, `move_2_2` | Job change type indicators for Type-1, Type-2, Type-3, respectively       |
| `up_move`                                      | Upward mobility indicator: last job wage ≥ 5% above first job wage        |


# Loading Data Files

In [4]:
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import re
from tabulate import tabulate
import time

# ============================
# 0. Load data
# ============================
job_filename = '../data/samples/job_1k.csv'  # Lightcast jobs data
edu_filename = '../data/samples/edu_1k.csv'  # Lightcast educations data

job_df = pd.read_csv(job_filename, encoding="utf-8")
edu_df = pd.read_csv(edu_filename, encoding="utf-8")

# ============================
# 1. Ensure date columns are datetime dtype
# ============================
def ensure_datetime(df: pd.DataFrame, cols: list[str]) -> None:
    """
    Convert specified columns to datetime if they are not already.
    This avoids redundant conversion for parquet-loaded datetime columns.
    """
    for col in cols:
        if col in df.columns and not np.issubdtype(df[col].dtype, np.datetime64):
            df[col] = pd.to_datetime(df[col], errors='coerce')

# Apply to relevant columns
ensure_datetime(job_df, ['JOB_START_DATE', 'JOB_END_DATE'])
ensure_datetime(edu_df, ['START_DATE', 'END_DATE'])

# Pre-processing Checks

In [12]:
def run_checks():
    global job_df, edu_df

    # ============================
    # Summary
    # ============================
    print("===== Dataset Summary =====")
    print(f"Job Table: {len(job_df['ID'].unique())} unique users, {len(job_df)} rows")
    print(f"Education Table: {len(edu_df['ID'].unique())} unique users, {len(edu_df)} rows")
    
    # ============================
    # Job Table Checks
    # ============================
    print("\n===== Job Table Checks =====")

    # Missing required job fields
    required_job_fields = ['TITLE_RAW', 'COMPANY_RAW', 'CITY_RAW', 'STATE_RAW', 'COUNTRY_RAW', 'JOB_START_DATE']
    missing_job_fields = job_df[required_job_fields].isna().sum().reset_index()
    missing_job_fields.columns = ['Field', 'Missing Count']
    print("Missing values in required job fields:")
    print(tabulate(missing_job_fields, headers="keys", tablefmt="github", showindex=False))

    # Chronological inversion (END < START when both present)
    invalid_chronology = job_df[
        job_df['JOB_START_DATE'].notna() &
        job_df['JOB_END_DATE'].notna() &
        (job_df['JOB_END_DATE'] < job_df['JOB_START_DATE'])
    ]
    print("Jobs with JOB_END_DATE before JOB_START_DATE:", len(invalid_chronology))
    
    # Missing date anomalies
    missing_start_but_end = job_df[job_df['JOB_START_DATE'].isna() & job_df['JOB_END_DATE'].notna()]
    missing_end_but_past = job_df[~job_df['IS_CURRENT'] & job_df['JOB_END_DATE'].isna()]
    print("Jobs with missing start but non-missing end:", len(missing_start_but_end))
    print("Past jobs missing end date:", len(missing_end_but_past))

    # Jobs outside 1999–2022 timeframe
    jobs_out_of_timeframe = job_df[
        job_df['JOB_START_DATE'].dt.year.lt(1999) |
        job_df['JOB_START_DATE'].dt.year.gt(2022)
    ]
    print("Jobs outside 1999–2022 timeframe:", len(jobs_out_of_timeframe))

    # Cross-table ID consistency
    ids_job = set(job_df['ID'])
    ids_edu = set(edu_df['ID'])
    ids_missing = ids_job - ids_edu
    print("IDs in job_df missing from edu_df", len(ids_missing))

    # ============================
    # Education Table Checks
    # ============================
    print("\n===== Education Table Checks =====")

    # Missing required education fields
    required_edu_fields = ['EDULEVEL_NAME', 'SCHOOL_RAW', 'START_DATE', 'END_DATE']
    missing_edu_fields = edu_df[required_edu_fields].isna().sum().reset_index()
    missing_edu_fields.columns = ['Field', 'Missing Count']
    print("Missing values in required education fields:")
    print(tabulate(missing_edu_fields, headers="keys", tablefmt="github", showindex=False))

    # Degrees filtered (BA+)
    valid_degrees = ["Bachelor's Degree", "Master's Degree", "Doctorate"]
    invalid_degrees = edu_df[~edu_df['EDULEVEL_NAME'].isin(valid_degrees)]
    print("Education records with degrees below BA or invalid:", len(invalid_degrees))
    
    # Graduation date < first job start date
    first_job = job_df.groupby('ID')['JOB_START_DATE'].min().rename('FIRST_JOB_DATE')
    ba_grad = edu_df[edu_df['EDULEVEL_NAME'] == "Bachelor's Degree"] \
                .groupby('ID')['END_DATE'].min().rename('BA_GRAD_DATE')
    gap_check = pd.concat([first_job, ba_grad], axis=1).dropna()
    violating_gap = gap_check[gap_check['FIRST_JOB_DATE'] < gap_check['BA_GRAD_DATE']]
    print("Profiles with first job before BA graduation:", len(violating_gap))
    
    # Post-grad gap thresholds
    degree_order = {"Bachelor's Degree": 1, "Master's Degree": 2, "Doctorate": 3}
    highest_degree = edu_df.groupby('ID')['EDULEVEL_NAME'].agg(
        lambda x: max(x, key=lambda y: degree_order.get(y, 0))
    )
    gap_check['HIGHEST_DEGREE'] = highest_degree
    gap_check['POST_GRAD_GAP_YEARS'] = (
        gap_check['FIRST_JOB_DATE'] - gap_check['BA_GRAD_DATE']
    ).dt.days / 365.25

    gap_thresholds = {"Bachelor's Degree": 3.75, "Master's Degree": 5.59, "Doctorate": 8.25}
    violating_threshold = gap_check[
        gap_check['POST_GRAD_GAP_YEARS'] >
        gap_check['HIGHEST_DEGREE'].map(gap_thresholds)
    ]
    print("Profiles exceeding post-graduation gap thresholds:", len(violating_threshold))

run_checks()

===== Dataset Summary =====
Job Table: 415 unique users, 1000 rows
Education Table: 565 unique users, 1000 rows

===== Job Table Checks =====
Missing values in required job fields:
| Field          |   Missing Count |
|----------------|-----------------|
| TITLE_RAW      |              16 |
| COMPANY_RAW    |              69 |
| CITY_RAW       |             372 |
| STATE_RAW      |             339 |
| COUNTRY_RAW    |             281 |
| JOB_START_DATE |             302 |
Jobs with JOB_END_DATE before JOB_START_DATE: 0
Jobs with missing start but non-missing end: 3
Past jobs missing end date: 93
Jobs outside 1999–2022 timeframe: 59
IDs in job_df missing from edu_df 262

===== Education Table Checks =====
Missing values in required education fields:
| Field         |   Missing Count |
|---------------|-----------------|
| EDULEVEL_NAME |             405 |
| SCHOOL_RAW    |              45 |
| START_DATE    |             259 |
| END_DATE      |             216 |
Education records with de

# Steps 1 - 3

In [13]:
# ============================
# 1. Job Records Filtering
# ============================

# Filter required fields in one go, then copy
job_df = job_df[
    (~job_df['TITLE_RAW'].isna()) &
    (~job_df['COMPANY_RAW'].isna()) &
    (~job_df['CITY_RAW'].isna()) &
    (~job_df['STATE_RAW'].isna()) &
    (~job_df['COUNTRY_RAW'].isna()) &
    (
        # Non-current jobs must have start & end
        ((~job_df['IS_CURRENT']) & (~job_df['JOB_START_DATE'].isna()) & (~job_df['JOB_END_DATE'].isna())) |
        # Current jobs must have start date
        ((job_df['IS_CURRENT']) & (~job_df['JOB_START_DATE'].isna()))
    )
].copy()  # single strategic copy

# Exclude records where end date is before start date
mask_valid_dates = (job_df['IS_CURRENT']) | (job_df['JOB_END_DATE'] >= job_df['JOB_START_DATE'])
job_df = job_df[mask_valid_dates].copy()  # copy so the in-place sort below does not act on a slice

# Sort jobs by individual and chronological order
job_df.sort_values(['ID', 'JOB_START_DATE', 'JOB_END_DATE'], ascending=[True, True, True], inplace=True)

# ============================
# 2. Job Titles Filtering
# ============================
# Replace SOC titles (ONET_2019_NAME) and codes (ONET_2019) with FewSOC-predicted labels in `occ_df`.
# Use "job title, company name" to look up the values.
occ_filename = '../data/predictions_gpt-3.5-turbo.csv'  # Predicted SOC titles and codes
occ_df = pd.read_csv(occ_filename, encoding="utf-8")


# ============================
# 3. Education Records Filtering
# ============================

# Filter required fields in one go, then copy
edu_df = edu_df[
    (~edu_df['EDUCATION_RAW'].isna()) &
    (~edu_df['SCHOOL_RAW'].isna()) &
    (~edu_df['START_DATE'].isna()) &
    (~edu_df['END_DATE'].isna())
].copy()

# Keep only BA+
valid_degrees = ["Bachelor's Degree", "Master's Degree", "Doctorate"]
edu_df = edu_df[edu_df['EDULEVEL_NAME'].isin(valid_degrees)].copy()  # copy before in-place sort

# Sort education by individual and chronological order
edu_df.sort_values(['ID', 'START_DATE', 'END_DATE'], ascending=[True, True, True], inplace=True)

# ============================
# 3b. Keep only users that appear in both the job and edu tables
# ============================
valid_ids = (
    job_df[['ID']].drop_duplicates()
    .merge(edu_df[['ID']].drop_duplicates(), on='ID', how='inner')
)['ID']

job_df = job_df[job_df['ID'].isin(valid_ids)]
edu_df = edu_df[edu_df['ID'].isin(valid_ids)]

run_checks()

===== Dataset Summary =====
Job Table: 72 unique users, 295 rows
Education Table: 72 unique users, 106 rows

===== Job Table Checks =====
Missing values in required job fields:
| Field          |   Missing Count |
|----------------|-----------------|
| TITLE_RAW      |               0 |
| COMPANY_RAW    |               0 |
| CITY_RAW       |               0 |
| STATE_RAW      |               0 |
| COUNTRY_RAW    |               0 |
| JOB_START_DATE |               0 |
Jobs with JOB_END_DATE before JOB_START_DATE: 0
Jobs with missing start but non-missing end: 0
Past jobs missing end date: 0
Jobs outside 1999–2022 timeframe: 17
IDs in job_df missing from edu_df 0

===== Education Table Checks =====
Missing values in required education fields:
| Field         |   Missing Count |
|---------------|-----------------|
| EDULEVEL_NAME |               0 |
| SCHOOL_RAW    |               0 |
| START_DATE    |               0 |
| END_DATE      |               0 |
Education records with degrees b

# Steps 4 - 5

In [14]:
# ---------------------------
# Step 4: Post-Graduation Gap Filtering
# ---------------------------

# Sort education table deterministically
edu_df = edu_df.sort_values(['ID', 'EDULEVEL_NAME', 'END_DATE'], 
                            key=lambda col: col.map({"Bachelor's Degree":1, "Master's Degree":2, "Doctorate":3}) 
                                             if col.name=='EDULEVEL_NAME' else col)

# Compute BA graduation date per user
ba_df = edu_df[edu_df['EDULEVEL_NAME'] == "Bachelor's Degree"]
ba_grad_dates = ba_df.groupby('ID')['END_DATE'].min().rename('BA_GRAD_DATE')

# First job start date
first_job_dates = job_df.groupby('ID')['JOB_START_DATE'].min().rename('FIRST_JOB_DATE')

# Highest degree per user
degree_order = {"Bachelor's Degree": 1, "Master's Degree": 2, "Doctorate": 3}
edu_df['DEGREE_ORDER'] = edu_df['EDULEVEL_NAME'].map(degree_order)
# Take the row with max degree order per user
idx = edu_df.groupby('ID')['DEGREE_ORDER'].idxmax()
highest_degree = edu_df.loc[idx, ['ID','EDULEVEL_NAME']].set_index('ID')['EDULEVEL_NAME'].rename('HIGHEST_DEGREE')

# Combine into gap dataframe
gap_df = pd.concat([ba_grad_dates, first_job_dates, highest_degree], axis=1).dropna()

# Remove negative gaps (first job before BA graduation)
gap_df = gap_df[gap_df['FIRST_JOB_DATE'] >= gap_df['BA_GRAD_DATE']]

# Compute post-grad gap in years (rounded to 4 decimals)
gap_df['POST_GRAD_GAP_YEARS'] = ((gap_df['FIRST_JOB_DATE'] - gap_df['BA_GRAD_DATE']).dt.days / 365.25).round(4)

# Apply maximum allowed gap thresholds according to highest degree
gap_thresholds = {"Bachelor's Degree": 3.75, "Master's Degree": 5.59, "Doctorate": 8.25}
valid_ids_gap = gap_df[gap_df['POST_GRAD_GAP_YEARS'] <= gap_df['HIGHEST_DEGREE'].map(gap_thresholds)].index

# Apply filter to main tables
job_df = job_df[job_df['ID'].isin(valid_ids_gap)].copy()
edu_df = edu_df[edu_df['ID'].isin(valid_ids_gap)].copy()

# ---------------------------
# Step 5: Timeframe filtering
# ---------------------------

start_year, end_year = 1999, 2022

# Compute first and last job start dates per user
job_start_minmax = job_df.groupby('ID')['JOB_START_DATE'].agg(['min','max'])

# Keep only users whose earliest job start year is >= 1999 and latest job start year is <= 2022
valid_ids_time = job_start_minmax[
    (job_start_minmax['min'].dt.year >= start_year) &
    (job_start_minmax['max'].dt.year <= end_year)
].index

# Filter all tables to these valid users
job_df = job_df[job_df['ID'].isin(valid_ids_time)].copy()
edu_df = edu_df[edu_df['ID'].isin(valid_ids_time)].copy()

run_checks()

===== Dataset Summary =====
Job Table: 19 unique users, 86 rows
Education Table: 19 unique users, 35 rows

===== Job Table Checks =====
Missing values in required job fields:
| Field          |   Missing Count |
|----------------|-----------------|
| TITLE_RAW      |               0 |
| COMPANY_RAW    |               0 |
| CITY_RAW       |               0 |
| STATE_RAW      |               0 |
| COUNTRY_RAW    |               0 |
| JOB_START_DATE |               0 |
Jobs with JOB_END_DATE before JOB_START_DATE: 0
Jobs with missing start but non-missing end: 0
Past jobs missing end date: 0
Jobs outside 1999–2022 timeframe: 0
IDs in job_df missing from edu_df 0

===== Education Table Checks =====
Missing values in required education fields:
| Field         |   Missing Count |
|---------------|-----------------|
| EDULEVEL_NAME |               0 |
| SCHOOL_RAW    |               0 |
| START_DATE    |               0 |
| END_DATE      |               0 |
Education records with degrees belo

# Career Trajectories Construction

In [15]:
# ---------------------------
# Career Trajectories Construction
# ---------------------------

def construct_linear_job_history():
    """
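    Construct linear career trajectories per user: sort jobs by (start_date asc,
    end_date desc), greedily drop jobs that overlap the last kept job, discard
    trajectories spanning less than 5 years, and truncate the rest to jobs
    starting within 5 years of the first job start.
    Returns linear_job_df.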
    """
    global job_df
    
    df = job_df.copy()
    df = df.sort_values(by=['ID', 'JOB_START_DATE', 'JOB_END_DATE'], ascending=[True, True, False])
    linear_jobs = []

    for uid, group in df.groupby('ID'):
        group = group.reset_index(drop=True)
        if len(group) == 0:
            continue

        trajectory = [group.iloc[0]]  # always keep earliest job

        for i in range(1, len(group)):
            last_job = trajectory[-1]
            current_job = group.iloc[i]

            if pd.notna(current_job['JOB_START_DATE']) and pd.notna(last_job['JOB_END_DATE']):
                if current_job['JOB_START_DATE'] >= last_job['JOB_END_DATE']:
                    trajectory.append(current_job)

        if len(trajectory) == 0:
            continue

        traj_df = pd.DataFrame(trajectory)
        traj_df['TRAJECTORY_ORDER'] = range(1, len(traj_df) + 1)

        # Compute trajectory start and end
        traj_start = traj_df['JOB_START_DATE'].min()
        traj_end = traj_df['JOB_END_DATE'].max()
        if pd.isna(traj_end):
            traj_end = traj_df['JOB_START_DATE'].max()
        duration_years = (traj_end - traj_start).days / 365.25

        # Filter out trajectories shorter than 5 years
        if duration_years < 5:
            continue

        # Truncate to first 5 years from trajectory start
        cutoff_date = traj_start + pd.DateOffset(years=5)
        traj_df = traj_df[traj_df['JOB_START_DATE'] <= cutoff_date]

        linear_jobs.append(traj_df)

    linear_job_df = pd.concat(linear_jobs, ignore_index=True)
    return linear_job_df


def flatten_to_trajectory_df():
    """
    Flatten linear_job_df to trajectory_df where each user is a single row with summary info.
    """
    global linear_job_df, edu_df
    trajectory_records = []
    degree_order = {"Bachelor's Degree": 1, "Master's Degree": 2, "Doctorate": 3}

    for uid, group in linear_job_df.groupby('ID'):
        group = group.sort_values(by='TRAJECTORY_ORDER')
        if len(group) == 0:
            continue

        first_job = group.iloc[0]
        last_job_end = group['JOB_END_DATE'].max()
        if pd.isna(last_job_end):
            last_job_end = group['JOB_START_DATE'].max()

        # -----------------------------
        # Max degree within trajectory period
        # -----------------------------
        edu_sub = edu_df[(edu_df['ID'] == uid) & (edu_df['END_DATE'] <= last_job_end)]
        if len(edu_sub) > 0:
            edu_sub = edu_sub.copy()
            edu_sub['degree_num'] = edu_sub['EDULEVEL_NAME'].map(degree_order).fillna(0)
            # Sort by numeric degree descending, then by END_DATE descending
            max_row = edu_sub.sort_values(['degree_num', 'END_DATE'], ascending=[False, False]).iloc[0]
            max_edu_name = max_row['EDULEVEL_NAME']
        else:
            max_edu_name = None

        # -----------------------------
        # First job attributes
        # -----------------------------
        onet_major_x = str(first_job['ONET_2019'])[:2] if pd.notna(first_job.get('ONET_2019')) else None
        naics6_major_x = str(first_job['NAICS6'])[:2] if pd.notna(first_job.get('NAICS6')) else None
        onet_detailed_x = str(first_job['ONET_2019'])[:7] if pd.notna(first_job.get('ONET_2019')) else None
        company_x = first_job.get('COMPANY_NAME', None)
        state_x = first_job.get('STATE_RAW', None)
        job_start_year_x = first_job['JOB_START_DATE'].year if pd.notna(first_job['JOB_START_DATE']) else None

        # Number of job changes in trajectory
        num_job_changes = len(group) - 1

        trajectory_records.append({
            'ID': uid,
            'max_edu_name': max_edu_name,
            'onet_major_x': onet_major_x,
            'onet_detailed_x': onet_detailed_x,
            'naics6_major_x': naics6_major_x,
            'company_x': company_x,
            'state_x': state_x,
            'job_start_year_x': job_start_year_x,
            'num_job_changes': num_job_changes
        })

    trajectory_df = pd.DataFrame(trajectory_records)
    return trajectory_df

In [16]:
# ----------------------------------------
# Verifying linear_job_df and trajectory_df
# ----------------------------------------

def run_checks_linear_jobs():
    """
    Compare original job_df and linear_job_df to verify linear trajectory construction.
    """
    global job_df, linear_job_df
    
    print(f"Original job_df: {len(job_df['ID'].unique())} unique users, {len(job_df)} rows")
    print(f"Linear job_df: {len(linear_job_df['ID'].unique())} unique users, {len(linear_job_df)} rows\n")

    # 1. Check all linear jobs exist in original job_df
    merged = linear_job_df.merge(job_df, on=['ID','JOB_START_DATE','JOB_END_DATE','TITLE_RAW','COMPANY_RAW','CITY_RAW','STATE_RAW','COUNTRY_RAW','IS_CURRENT'], how='left', indicator=True)
    missing_in_original = merged[merged['_merge']=='left_only']
    print("Jobs in linear_job_df missing from original job_df:", len(missing_in_original))
    
    # 2. Check chronological order and no overlaps per user
    overlap_issues = []
    for uid, group in linear_job_df.groupby('ID'):
        group = group.sort_values(['JOB_START_DATE','JOB_END_DATE'], ascending=[True, False]).reset_index(drop=True)
        for i in range(1, len(group)):
            prev_end = group.loc[i-1, 'JOB_END_DATE']
            curr_start = group.loc[i, 'JOB_START_DATE']
            if pd.notna(prev_end) and pd.notna(curr_start) and curr_start < prev_end:
                overlap_issues.append((uid, i, prev_end, curr_start))
    
    print("Users with overlapping jobs in linear_job_df:", len(set(uid for uid,_,_,_ in overlap_issues)))
    if overlap_issues:
        print("Sample overlap issues:")
        print(tabulate(overlap_issues[:10], headers=['ID','JobIndex','PrevEnd','CurrStart'], tablefmt='github'))
    
    # 3. Check first job matches original earliest job
    first_job_mismatch = []
    for uid, group in linear_job_df.groupby('ID'):
        first_linear = group.loc[group['JOB_START_DATE'].idxmin()]
        first_original = job_df[job_df['ID']==uid].sort_values('JOB_START_DATE').iloc[0]
        if first_linear['JOB_START_DATE'] != first_original['JOB_START_DATE']:
            first_job_mismatch.append(uid)
    
    print("Users where first job in linear_job_df does not match original first job:", len(first_job_mismatch))
    if first_job_mismatch:
        print("Sample user IDs with first job mismatch:", first_job_mismatch[:10])
    
    # 4. Summary statistics
    print("\n===== Summary =====")
    print("Original job count per user (mean, min, max):", job_df.groupby('ID').size().agg(['mean','min','max']).to_dict())
    print("Linear job count per user (mean, min, max):", linear_job_df.groupby('ID').size().agg(['mean','min','max']).to_dict())

linear_job_df = construct_linear_job_history()
run_checks_linear_jobs()

Original job_df: 19 unique users, 86 rows
Linear job_df: 13 unique users, 29 rows

Jobs in linear_job_df missing from original job_df: 0
Users with overlapping jobs in linear_job_df: 0
Users where first job in linear_job_df does not match original first job: 0

===== Summary =====
Original job count per user (mean, min, max): {'mean': 4.526315789473684, 'min': 1.0, 'max': 9.0}
Linear job count per user (mean, min, max): {'mean': 2.230769230769231, 'min': 1.0, 'max': 4.0}


In [17]:
def run_checks_flattened():
    """
    Verify that flattened trajectory_df is consistent with linear_job_df and edu_df.
    """
    global linear_job_df, trajectory_df, edu_df

    print(f"Original linear_job_df: {len(linear_job_df['ID'].unique())} unique users, {len(linear_job_df)} rows")
    print(f"Flattened trajectory_df: {len(trajectory_df)} rows\n")

    edu_violations = 0
    first_job_violations = 0
    job_change_violations = 0
    missing_jobs = 0

    # Prepare jobs per user
    jobs_per_user = {uid: df.sort_values('JOB_START_DATE') for uid, df in linear_job_df.groupby('ID')}

    degree_order = {"Bachelor's Degree": 1, "Master's Degree": 2, "Doctorate": 3}

    for idx, row in trajectory_df.iterrows():
        uid = row['ID']
        user_jobs = jobs_per_user.get(uid)
        if user_jobs is None or user_jobs.empty:
            missing_jobs += 1
            continue

        # First and last job end dates for trajectory
        first_job = user_jobs.iloc[0]
        last_job_end = user_jobs['JOB_END_DATE'].max()
        if pd.isna(last_job_end):
            last_job_end = user_jobs['JOB_START_DATE'].max()

        # ============================
        # 1. Max degree within trajectory period
        # ============================
        user_edu = edu_df[edu_df['ID'] == uid]
        user_edu_window = user_edu[user_edu['END_DATE'] <= last_job_end]

        if not user_edu_window.empty:
            user_edu_window = user_edu_window.copy()
            user_edu_window['degree_num'] = user_edu_window['EDULEVEL_NAME'].map(degree_order).fillna(0)
            # Sort by degree_num descending, then END_DATE descending
            max_row = user_edu_window.sort_values(['degree_num', 'END_DATE'], ascending=[False, False]).iloc[0]
            max_degree = max_row['EDULEVEL_NAME']
            if max_degree != row['max_edu_name']:
                edu_violations += 1

        # ============================
        # 2. First job attributes
        # ============================
        onet_major_first2 = str(first_job['ONET_2019'])[:2] if pd.notna(first_job.get('ONET_2019')) else None
        naics6_major_first2 = str(first_job['NAICS6'])[:2] if pd.notna(first_job.get('NAICS6')) else None
        state_first = first_job.get('STATE_RAW', None)

        if (onet_major_first2 != row['onet_major_x'] or
            naics6_major_first2 != row['naics6_major_x'] or
            state_first != row['state_x']):
            first_job_violations += 1

        # ============================
        # 3. Num job changes
        # ============================
        num_jobs = len(user_jobs)
        if row['num_job_changes'] != (num_jobs - 1):
            job_change_violations += 1

    # ============================
    # Summary Table
    # ============================
    summary = [
        ["Trajectory rows with jobs missing in linear_job_df", missing_jobs],
        ["Trajectory rows with incorrect max_edu_name", edu_violations],
        ["Trajectory rows with incorrect first job attributes", first_job_violations],
        ["Trajectory rows with incorrect num_job_changes", job_change_violations]
    ]

    print(tabulate(summary, headers=["Check", "Count"], tablefmt="grid"))

trajectory_df = flatten_to_trajectory_df()
run_checks_flattened()

Original linear_job_df: 13 unique users, 29 rows
Flattened trajectory_df: 13 rows

+-----------------------------------------------------+---------+
| Check                                               |   Count |
+=====================================================+=========+
| Trajectory rows with jobs missing in linear_job_df  |       0 |
+-----------------------------------------------------+---------+
| Trajectory rows with incorrect max_edu_name         |       0 |
+-----------------------------------------------------+---------+
| Trajectory rows with incorrect first job attributes |       0 |
+-----------------------------------------------------+---------+
| Trajectory rows with incorrect num_job_changes      |       0 |
+-----------------------------------------------------+---------+


# Attribute Values Extraction

In [18]:
# Demographic attribute coding
# Gender: 1 = male, 2 = female
# Race:   1 = white, 2 = black, 3 = asian, 4 = hispanic
# Race is taken from the first of the indicator columns L1-L4 that equals 1
def choose_race(row):
    if row['L1'] == 1:
        return 1
    elif row['L2'] == 1:
        return 2
    elif row['L3'] == 1:
        return 3
    elif row['L4'] == 1:
        return 4
    else:
        return None

def add_gender_race(demo_csv='../data/samples/dem_1k.csv'):
    """
    Add 'gender' and 'race' columns to trajectory_df based on predicted values from demo_csv.
    """
    global trajectory_df
    
    dem_df = pd.read_csv(demo_csv, encoding="utf-8")
    dem_df['race'] = dem_df.apply(choose_race, axis=1)
    dem_df.rename(columns={'L5': 'gender'}, inplace=True)
    # Keep rows with a race code and a valid gender code (1 or 2)
    keep = dem_df['race'].notna() & (dem_df['gender'] > 0) & (dem_df['gender'] < 3)
    dem_df = dem_df.loc[keep, ['ID', 'race', 'gender']]

    trajectory_df = trajectory_df.merge(dem_df, on='ID', how='left')

add_gender_race()

In [19]:
def add_generation_cohort():
    """
    Add 'generation' column to trajectory_df based on estimated birth year
    from Bachelor's degree end date.
    """
    global trajectory_df, edu_df

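    # Get each user's earliest bachelor's graduation (one row per user,
    # so the merge below cannot duplicate trajectory rows)
    ba_grads = (
        edu_df[edu_df['EDULEVEL_NAME'] == "Bachelor's Degree"]
        .sort_values('END_DATE')
        .drop_duplicates('ID', keep='first')
        .copy()
    )
    ba_grads['BA_GRAD_YEAR'] = ba_grads['END_DATE'].dt.year

    # Estimate birth year, assuming the BA is completed at age 23
    ba_grads['BIRTH_YEAR_EST'] = ba_grads['BA_GRAD_YEAR'] - 23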

    # Map to generation
    def map_generation(year):
        if pd.isna(year):
            return None
        year = int(year)
        if 1997 <= year <= 2012:
            return 'Generation Z'
        elif 1981 <= year <= 1996:
            return 'Millennials'
        elif 1965 <= year <= 1980:
            return 'Generation X'
        elif 1946 <= year <= 1964:
            return 'Baby Boomers'
        elif 1928 <= year <= 1945:
            return 'Silent Generation'
        else:
            return None

    ba_grads['generation'] = ba_grads['BIRTH_YEAR_EST'].map(map_generation)

    # Merge into trajectory_df
    trajectory_df = trajectory_df.merge(
        ba_grads[['ID', 'generation']], on='ID', how='left'
    )

add_generation_cohort()
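The `if`/`elif` chain in `map_generation` can also be written with `pandas.cut` and right-closed bins. This is a minimal sketch on hypothetical graduation years, keeping the same age-23 assumption; birth years outside 1928 to 2012 map to NaN, just as they map to `None` above. Note that `pd.cut` returns a categorical column rather than plain strings.

```python
import pandas as pd

# Hypothetical BA graduation years for four users
ba_grads = pd.DataFrame({'ID': [1, 2, 3, 4],
                         'BA_GRAD_YEAR': [1945, 1990, 2005, 2022]})

# Same assumption as add_generation_cohort: graduation at about age 23
ba_grads['BIRTH_YEAR_EST'] = ba_grads['BA_GRAD_YEAR'] - 23

# Right-closed bins reproduce the inclusive ranges of map_generation:
# (1927, 1945] = 1928-1945, (1945, 1964] = 1946-1964, and so on
bins = [1927, 1945, 1964, 1980, 1996, 2012]
labels = ['Silent Generation', 'Baby Boomers', 'Generation X',
          'Millennials', 'Generation Z']
ba_grads['generation'] = pd.cut(ba_grads['BIRTH_YEAR_EST'], bins=bins, labels=labels)
print(ba_grads)
```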

In [20]:
def normalize_state_name(name):
    """Lowercase a state name, strip punctuation, and collapse whitespace."""
    if pd.isna(name):
        return None
    name = str(name).lower()
    # Remove punctuation first so that it cannot leave double spaces behind
    name = re.sub(r'[^\w\s]', '', name)
    # Collapse runs of whitespace into a single space and trim the ends
    name = re.sub(r'\s+', ' ', name).strip()
    return name

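def add_state_gdp_decile(gdp_csv='../data/1998_2022_real_gdp_by_state.csv'):
    """
    Add 'state_gdp_decile_x': the decile (1 to 10) of real state GDP for the first job's
    state and start year, with deciles computed across all rows of trajectory_df.
    Rows without a matching (state, year) GDP value get NaN.
    """
    global trajectory_df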
    gdp_df = pd.read_csv(gdp_csv)

    # Melt to long format
    gdp_long = gdp_df.melt(id_vars=['GeoName'], var_name='Year', value_name='GDP')
    gdp_long = gdp_long[gdp_long['Year'].str.isdigit()]
    gdp_long['Year'] = gdp_long['Year'].astype(int)

    # Normalize state names
    gdp_long['GeoName_norm'] = gdp_long['GeoName'].apply(normalize_state_name)
    trajectory_df['state_x_norm'] = trajectory_df['state_x'].apply(normalize_state_name)

    # Build lookup dict {(state_norm, year): GDP}
    state_year_to_gdp = dict(zip(
        zip(gdp_long['GeoName_norm'], gdp_long['Year']),
        gdp_long['GDP'],
    ))

    # Map GDP to each row in trajectory_df
    def lookup_gdp(row):
        return state_year_to_gdp.get((row['state_x_norm'], row['job_start_year_x']), None)

    trajectory_df['state_gdp'] = trajectory_df.apply(lookup_gdp, axis=1)

    # Compute deciles
    trajectory_df['state_gdp_decile_x'] = pd.qcut(
        trajectory_df['state_gdp'], 10, labels=False, duplicates='drop'
    ) + 1

    # Drop temporary helper columns
    trajectory_df.drop(columns=['state_x_norm', 'state_gdp'], inplace=True)

add_state_gdp_decile('../data/1998_2022_real_gdp_by_state.csv')
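To make the state GDP step concrete, here is a small sketch of the same reshape, lookup, and binning on a hypothetical three-state table. It swaps the dictionary lookup for an equivalent left merge and uses 3 bins instead of 10 only because the toy data has three matched rows. A state or year missing from the GDP table (here `ohio`) stays NaN and receives no bin. Because `qcut` runs over the rows of the trajectory table, the bins rank job observations rather than states.

```python
import pandas as pd

# Hypothetical wide GDP table: one 'GeoName' column plus one column per year
gdp_df = pd.DataFrame({
    'GeoName': ['California', 'Texas', 'Vermont'],
    '2000': [1.5, 1.0, 0.03],
    '2001': [1.6, 1.1, 0.03],
})

# Melt to long format: one row per (state, year)
gdp_long = gdp_df.melt(id_vars=['GeoName'], var_name='Year', value_name='GDP')
gdp_long['Year'] = gdp_long['Year'].astype(int)
gdp_long['state_norm'] = gdp_long['GeoName'].str.lower().str.strip()
lookup = gdp_long.rename(columns={'Year': 'year'})[['state_norm', 'year', 'GDP']]

# Hypothetical first jobs: normalized state and start year
jobs = pd.DataFrame({'state_norm': ['texas', 'california', 'vermont', 'ohio'],
                     'year': [2000, 2001, 2001, 2000]})

# Left merge performs the (state, year) -> GDP lookup and keeps row order
jobs = jobs.merge(lookup, on=['state_norm', 'year'], how='left')

# labels=False yields 0-based bin numbers; +1 makes them 1-based (NaN stays NaN)
jobs['gdp_bin'] = pd.qcut(jobs['GDP'], 3, labels=False, duplicates='drop') + 1
print(jobs)
```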

In [21]:
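def add_occupational_wage(wage_csv='../data/wage_interpolated_1999_2022_soc_2019.csv'):
    """
    Add 'annual_state_wage_x': the mean annual wage (A_MEAN) for the first job's
    state, start year, and 6-digit SOC code.
    """
    global trajectory_df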

    # -----------------------------
    # 1. Load wage table
    # -----------------------------
    wage_df = pd.read_csv(wage_csv, dtype={'OCC_CODE': str, 'AREA_TITLE': str, 'year': int})
    
    # Normalize strings
    wage_df['OCC_CODE'] = wage_df['OCC_CODE'].astype(str).str[:7]  # keep the 7-character SOC code, e.g. '15-1252'
    wage_df['AREA_TITLE'] = wage_df['AREA_TITLE'].str.strip().str.lower()

    # -----------------------------
    # 2. Prepare MultiIndex Series
    # -----------------------------
    wage_series = wage_df.set_index(['AREA_TITLE', 'year', 'OCC_CODE'])['A_MEAN']

    # -----------------------------
    # 3. Normalize trajectory_df keys
    # -----------------------------
    trajectory_df['state_norm'] = trajectory_df['state_x'].str.strip().str.lower()
    trajectory_df['onet_norm'] = trajectory_df['onet_detailed_x'].astype(str).str[:7]  # truncate to match wage table

    # -----------------------------
    # 4. Build MultiIndex for lookup
    # -----------------------------
    trajectory_index = pd.MultiIndex.from_arrays([
        trajectory_df['state_norm'],
        trajectory_df['job_start_year_x'],
        trajectory_df['onet_norm']
    ])

    # -----------------------------
    # 5. Vectorized lookup
    # -----------------------------
    trajectory_df['annual_state_wage_x'] = wage_series.reindex(trajectory_index).to_numpy()

    # -----------------------------
    # 6. Cleanup
    # -----------------------------
    trajectory_df.drop(columns=['state_norm', 'onet_norm'], inplace=True)

    print(f"Added 'annual_state_wage_x', missing values: {trajectory_df['annual_state_wage_x'].isna().sum()}")

start = time.time()
add_occupational_wage('../data/wage_interpolated_1999_2022_soc_2019.csv')
end = time.time()
print(f"Done in {end - start:.4f} sec")

Added 'annual_state_wage_x', missing values: 3
Done in 2.0336 sec
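The vectorized wage lookup relies on `Series.reindex` with a three-level `MultiIndex`. The sketch below reproduces it on a hypothetical wage table: O*NET codes such as `15-1252.00` are cut to the 7-character SOC code, and any (state, year, code) key absent from the table comes back as NaN, which is what the "missing values" count above reports. `reindex` requires the keys to be unique in the wage table; duplicate keys raise a `ValueError`.

```python
import pandas as pd

# Hypothetical wage table keyed by (area, year, SOC code)
wage_df = pd.DataFrame({
    'AREA_TITLE': ['texas', 'texas', 'ohio'],
    'year': [2010, 2011, 2010],
    'OCC_CODE': ['15-1252', '15-1252', '13-2011'],
    'A_MEAN': [95000.0, 97000.0, 72000.0],
})
wage_series = wage_df.set_index(['AREA_TITLE', 'year', 'OCC_CODE'])['A_MEAN']

# Hypothetical jobs with raw state names and full O*NET codes
jobs = pd.DataFrame({
    'state': [' Texas', 'Ohio', 'Ohio'],
    'year': [2011, 2010, 2012],
    'onet': ['15-1252.00', '13-2011.01', '13-2011.01'],
})

# Normalize the keys the same way as the wage table
keys = pd.MultiIndex.from_arrays([
    jobs['state'].str.strip().str.lower(),
    jobs['year'],
    jobs['onet'].str[:7],
])

# reindex aligns on all three levels at once; unmatched keys give NaN
jobs['wage'] = wage_series.reindex(keys).to_numpy()
print(jobs)
```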


In [22]:
def add_job_change_types():
    """
    Add binary indicators for the kinds of consecutive job changes each user makes.
    Naming is move_<occupation>_<company>, where 1 = changed and 2 = unchanged:
      move_1_1: new company, new occupation
      move_1_2: same company, new occupation
      move_2_1: new company, same occupation
      move_2_2: same company, same occupation
    """
    global trajectory_df, linear_job_df

    # Initialize columns in trajectory_df
    trajectory_df[['move_1_1', 'move_1_2', 'move_2_1', 'move_2_2']] = 0

    # Ensure jobs are sorted by trajectory
    linear_job_df_sorted = linear_job_df.sort_values(['ID', 'TRAJECTORY_ORDER'])

    # For each user
    for uid, group in linear_job_df_sorted.groupby('ID'):
        if len(group) < 2:
            continue  # No job change

        # Pair each job with the next one. The last job has no successor and is
        # excluded: comparing it with the NaN produced by shift(-1) evaluates as
        # "different", which would flag move_1_1 for every user with 2+ jobs.
        curr_company = group['COMPANY_NAME'].to_numpy()[:-1]
        next_company = group['COMPANY_NAME'].to_numpy()[1:]
        curr_onet = group['ONET_2019'].to_numpy()[:-1]
        next_onet = group['ONET_2019'].to_numpy()[1:]

        same_company = curr_company == next_company
        same_onet = curr_onet == next_onet

        # If the user has at least one change of a given type, set 1 in trajectory_df
        user_mask = trajectory_df['ID'] == uid
        trajectory_df.loc[user_mask, 'move_1_1'] = int((~same_company & ~same_onet).any())
        trajectory_df.loc[user_mask, 'move_1_2'] = int((same_company & ~same_onet).any())
        trajectory_df.loc[user_mask, 'move_2_1'] = int((~same_company & same_onet).any())
        trajectory_df.loc[user_mask, 'move_2_2'] = int((same_company & same_onet).any())

    print("Added job movement type indicators: move_1_1, move_1_2, move_2_1, move_2_2")

add_job_change_types()


Added job movement type indicators: move_1_1, move_1_2, move_2_1, move_2_2


In [23]:
def run_checks_job_change_types():
    """
    Verify that the job change type indicators in trajectory_df are consistent
    with linear_job_df.
    """
    global linear_job_df, trajectory_df

    type_1_violations = 0
    type_2_violations = 0
    type_3_violations = 0
    type_4_violations = 0

    # Ensure jobs sorted
    linear_job_df_sorted = linear_job_df.sort_values(['ID', 'TRAJECTORY_ORDER'])

    for uid, group in linear_job_df_sorted.groupby('ID'):
        if len(group) < 2:
            continue

        # Compare consecutive pairs only; the last job has no successor
        curr_company = group['COMPANY_NAME'].to_numpy()[:-1]
        next_company = group['COMPANY_NAME'].to_numpy()[1:]
        curr_onet = group['ONET_2019'].to_numpy()[:-1]
        next_onet = group['ONET_2019'].to_numpy()[1:]

        type_1 = ((curr_company != next_company) & (curr_onet != next_onet)).any()
        type_2 = ((curr_company == next_company) & (curr_onet != next_onet)).any()
        type_3 = ((curr_company != next_company) & (curr_onet == next_onet)).any()
        type_4 = ((curr_company == next_company) & (curr_onet == next_onet)).any()

        # Compare with trajectory_df
        traj_row = trajectory_df[trajectory_df['ID'] == uid].iloc[0]

        if traj_row['move_1_1'] != int(type_1):
            type_1_violations += 1
        if traj_row['move_1_2'] != int(type_2):
            type_2_violations += 1
        if traj_row['move_2_1'] != int(type_3):
            type_3_violations += 1
        if traj_row['move_2_2'] != int(type_4):
            type_4_violations += 1

    summary = [
        ['move_1_1 (Type 1) violations', type_1_violations],
        ['move_1_2 (Type 2) violations', type_2_violations],
        ['move_2_1 (Type 3) violations', type_3_violations],
        ['move_2_2 (Type 4) violations', type_4_violations]
    ]

    print(tabulate(summary, headers=['Job Change Type', 'Count'], tablefmt='grid'))

run_checks_job_change_types()

+------------------------------+---------+
| Job Change Type              |   Count |
+==============================+=========+
| move_1_1 (Type 1) violations |       0 |
+------------------------------+---------+
| move_1_2 (Type 2) violations |       0 |
+------------------------------+---------+
| move_2_1 (Type 3) violations |       0 |
+------------------------------+---------+
| move_2_2 (Type 4) violations |       0 |
+------------------------------+---------+
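The per-user loop that builds the job change indicators can also be replaced by a grouped shift. In this sketch on hypothetical records, `groupby('ID').shift(-1)` aligns each job with the next job of the same user, and `cumcount(ascending=False) > 0` marks jobs that actually have a successor. That mask matters: a user's last job is compared against NaN, and `value != NaN` evaluates to True, so without it every user with two or more jobs would be flagged with `move_1_1` (new company, new occupation).

```python
import pandas as pd

# Hypothetical linear job records for two users (already in trajectory order)
jobs = pd.DataFrame({
    'ID': ['a', 'a', 'a', 'b', 'b'],
    'TRAJECTORY_ORDER': [1, 2, 3, 1, 2],
    'COMPANY_NAME': ['Acme', 'Acme', 'Beta', 'Gamma', 'Gamma'],
    'ONET_2019': ['11-1021.00', '15-1252.00', '15-1252.00',
                  '13-2011.01', '13-2011.01'],
}).sort_values(['ID', 'TRAJECTORY_ORDER'])

# Next job of the same user; each user's last job gets NaN
nxt = jobs.groupby('ID')[['COMPANY_NAME', 'ONET_2019']].shift(-1)
# True for every job except the last one of each user
has_next = jobs.groupby('ID').cumcount(ascending=False) > 0

same_company = jobs['COMPANY_NAME'] == nxt['COMPANY_NAME']
same_onet = jobs['ONET_2019'] == nxt['ONET_2019']

# One indicator per consecutive pair, named move_<occupation>_<company>
pairs = pd.DataFrame({
    'ID': jobs['ID'],
    'move_1_1': has_next & ~same_company & ~same_onet,  # new company, new occupation
    'move_1_2': has_next & same_company & ~same_onet,   # same company, new occupation
    'move_2_1': has_next & ~same_company & same_onet,   # new company, same occupation
    'move_2_2': has_next & same_company & same_onet,    # same company, same occupation
})

# A user gets 1 for a type if at least one of their job changes is of that type
moves = pairs.groupby('ID').any().astype(int)
print(moves)
```

User `a` changes occupation within Acme and then moves to Beta in the same occupation; user `b` stays in the same company and occupation.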


In [24]:
def add_up_move(wage_csv='../data/wage_interpolated_1999_2022_soc_2019.csv', threshold=0.05):
    """
    Add an 'up_move' indicator to trajectory_df based on the wage change from the first to the last job.
    up_move = 1 if (last job wage - first job wage) / first job wage > threshold, else 0.
    Users with a missing first or last job wage are also coded 0.
    """

    global trajectory_df, linear_job_df

    # -----------------------------
    # 1. Read and prepare wage table
    # -----------------------------
    wage_df = pd.read_csv(wage_csv, dtype={'OCC_CODE': str, 'AREA_TITLE': str, 'year': int})
    # Truncate to the 7-character SOC code, as in add_occupational_wage
    wage_df['OCC_CODE'] = wage_df['OCC_CODE'].astype(str).str[:7]
    wage_df['AREA_TITLE'] = wage_df['AREA_TITLE'].str.strip().str.lower()
    # Keep only necessary columns
    wage_df = wage_df[['AREA_TITLE', 'year', 'OCC_CODE', 'A_MEAN']]

    # -----------------------------
    # 2. Prepare last job info
    # -----------------------------
    # Normalize IDs
    trajectory_df['ID'] = trajectory_df['ID'].astype(str).str.strip()
    linear_job_df['ID'] = linear_job_df['ID'].astype(str).str.strip()

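    # Last job per user: keep the entire final row (groupby().last() would combine
    # the last non-null value of each column, possibly from different jobs)
    last_jobs = (linear_job_df.sort_values(['ID', 'TRAJECTORY_ORDER'])
                 .drop_duplicates('ID', keep='last')
                 .reset_index(drop=True))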
    last_jobs['state_norm'] = last_jobs['STATE_RAW'].astype(str).str.strip().str.lower()
    # Truncate ONET to 6-digit SOC code for matching wage table
    last_jobs['onet_norm'] = last_jobs['ONET_2019'].astype(str).str[:7]
    last_jobs['year'] = last_jobs['JOB_START_DATE'].dt.year

    # -----------------------------
    # 3. Merge with wage table (vectorized)
    # -----------------------------
    last_jobs = last_jobs.merge(
        wage_df.rename(columns={'AREA_TITLE':'state_norm','OCC_CODE':'onet_norm','A_MEAN':'annual_state_wage_y'}),
        on=['state_norm','year','onet_norm'],
        how='left'
    )
    last_jobs = last_jobs[['ID','annual_state_wage_y']]

    # -----------------------------
    # 4. Merge last job wage into trajectory_df
    # -----------------------------
    trajectory_df = trajectory_df.merge(last_jobs, on='ID', how='left')

    # -----------------------------
    # 5. Compute up_move
    # -----------------------------
    trajectory_df['up_move'] = ((trajectory_df['annual_state_wage_y'] - trajectory_df['annual_state_wage_x'])
                                / trajectory_df['annual_state_wage_x'] > threshold).astype(int)

    # -----------------------------
    # 6. Drop auxiliary column
    # -----------------------------
    trajectory_df.drop(columns=['annual_state_wage_y'], inplace=True)

    # -----------------------------
    # 7. Print stats
    # -----------------------------
    matched = last_jobs['annual_state_wage_y'].notna().sum()
    total = len(last_jobs)
    print(f"Last job wage lookup: {matched}/{total} matched ({matched/total*100:.2f}%)")
    print(f"Added 'up_move' (threshold={threshold}). Positive cases: {trajectory_df['up_move'].sum()}")


start = time.time()
add_up_move('../data/wage_interpolated_1999_2022_soc_2019.csv', 0.05)
end = time.time()
print(f"Done in {end - start:.4f} sec")

Last job wage lookup: 7/13 matched (53.85%)
Added 'up_move' (threshold=0.05). Positive cases: 3
Done in 2.1828 sec
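Two details of the last-job wage step are easy to miss, illustrated here on hypothetical data. First, `groupby('ID').last()` returns the last non-null value of each column separately rather than the last row, so a missing field can be filled in from an earlier job; `drop_duplicates('ID', keep='last')` keeps the whole final row. Second, because `NaN > threshold` is False, the `up_move` expression codes users with a missing wage as 0. Masking those rows with `.where`, shown as `up_move_masked`, is an alternative that keeps them missing so a later `dropna()` would remove them; it is not what the cell above does.

```python
import numpy as np
import pandas as pd

# Hypothetical job records; the last job of user 'a' has a missing state
jobs = pd.DataFrame({
    'ID': ['a', 'a', 'b'],
    'TRAJECTORY_ORDER': [1, 2, 1],
    'STATE_RAW': ['Texas', np.nan, 'Ohio'],
}).sort_values(['ID', 'TRAJECTORY_ORDER'])

# groupby().last() skips NaN per column, so user 'a' gets 'Texas' from job 1
print(jobs.groupby('ID').last())
# drop_duplicates keeps the entire final row, so user 'a' keeps NaN
last_rows = jobs.drop_duplicates('ID', keep='last').set_index('ID')
print(last_rows)

# up_move as in add_up_move: relative wage growth above a threshold
wages = pd.DataFrame({'first': [50000.0, 60000.0, 70000.0],
                      'last': [56000.0, 61000.0, np.nan]})
threshold = 0.05
growth = (wages['last'] - wages['first']) / wages['first']
wages['up_move'] = (growth > threshold).astype(int)
# Alternative: leave up_move missing where the growth could not be computed
wages['up_move_masked'] = (growth > threshold).astype(int).where(growth.notna())
print(wages)
```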


# Final Cleanup

In [26]:
output_filename = '../data/samples/career_trajectories.parquet'

# -----------------------------
# 1. Drop unnecessary columns
# -----------------------------
drop_cols = ['ID', 'onet_detailed_x']
trajectory_df = trajectory_df.drop(columns=[c for c in drop_cols if c in trajectory_df.columns])

# -----------------------------
# 2. Drop rows with null values
# -----------------------------
trajectory_df = trajectory_df.dropna().reset_index(drop=True)

# -----------------------------
# 3. Log transform wage
# -----------------------------
trajectory_df['log_wage_x'] = np.log(trajectory_df['annual_state_wage_x'])

# -----------------------------
# 4. Top-code num_job_changes at 95th percentile
# -----------------------------
if 'num_job_changes' in trajectory_df.columns:
    threshold = int(np.percentile(trajectory_df['num_job_changes'], 95))
    trajectory_df.loc[trajectory_df['num_job_changes'] > threshold, 'num_job_changes'] = threshold

# -----------------------------
# 5. Export to parquet
# -----------------------------
trajectory_df.to_parquet(output_filename, index=False)
print(f"trajectory_df cleaned and saved to {output_filename}. Shape: {trajectory_df.shape}")

trajectory_df cleaned and saved to ../data/samples/career_trajectories.parquet. Shape: (10, 18)

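The top-coding step in the cleanup cell caps values above the 95th percentile at that percentile. A minimal equivalent uses `Series.clip`, shown here on hypothetical counts with one outlier. `np.percentile` interpolates linearly by default, so the cap here is about 20.1 before `int` truncates it to 20.

```python
import numpy as np
import pandas as pd

# Hypothetical job change counts for 20 users, with one extreme value (60)
df = pd.DataFrame({'num_job_changes': list(range(19)) + [60],
                   'annual_state_wage_x': [50000.0] * 20})

# Top-code at the 95th percentile: values above the cap are set to the cap
cap = int(np.percentile(df['num_job_changes'], 95))
df['num_job_changes'] = df['num_job_changes'].clip(upper=cap)

# Natural log of the wage, as in the cleanup cell (wages must be positive)
df['log_wage_x'] = np.log(df['annual_state_wage_x'])
print(cap, df['num_job_changes'].max())
```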