In [1]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from itertools import combinations
import polars as pl

In [2]:
DATA_FOLDER = "../data/processed"

In [3]:
# Remap numeric column values to interpretable values.
sex_map = {
    1: "Male",
    2: "Female",
    9: "Participant did not self-identify",
}

employment_status_map = {
    1: "Employed",
    2: "Employed, but Received Notice of \
    Termination of Employment or Military \
    Separation is pending",
    3: "Not in labor force",
    0: "Unemployed"
}

race_map = {
    1: "Hispanic",
    2: "Asian (not Hispanic)",
    3: "Black (not Hispanic)",
    4: "Native Hawaiian or Pacific Islander (not Hispanic)",
    5: "American Indian or Alaska Native (not Hispanic)",
    6: "White (not Hispanic)",
    7: "Multiple Race (not Hispanic)",
}

def age_map(age):
  if (age < 5):
    return "Under 5 years"
  elif (5 <= age < 17):
    return "5 to 17 years"
  elif (17 <= age < 25):
    return "17 to 24 years"
  elif (25 <= age < 35):
    return "25 to 34 years"
  elif (35 <= age < 45):
    return "35 to 44 years"
  elif (45 <= age < 55):
    return "45 to 54 years"
  elif (55 <= age < 65):
    return "55 to 64 years"
  elif (65 <= age < 85):
    return "65 to 84 years"
  elif (85 <= age < 100):
    return "85 to 99 years"
  elif (100 <= age):
    return "100 years and over"
  else:
    return "Unknown"

highest_educational_level_map = {
  1: "Attained secondary school diploma",
  2: "Attained a secondary school equivalency",
  3: "The participant with a disability receives \
  a certificate of attendance/completion as a \
  result of successfully completing an \
  Individualized Education Program (IEP)",
  4: "Completed one of more years of postsecondary education",
  5: "Attained a postsecondary technical or vocational certificate (non-degree)",
  6: "Attained an Associate's degree",
  7: "Attained a Bachelor's degree",
  8: "Attained a degree beyond a Bachelor's degree",
  0: "No Educational Level Completed",
}

low_income_status_map = {
    1: "Yes",
    0: "No"
}

received_training_map = {
  1: "Yes",
  0: "No"
}

In [4]:
occupations = pl.scan_csv(f"{DATA_FOLDER}/occupations.csv")
workforce_boards = pl.scan_csv(f"{DATA_FOLDER}/workforce_boards.csv")

In [5]:
rti_by_subsector = pl.scan_parquet(f"{DATA_FOLDER}/rti_by_subsector.parquet")
rti_by_industry = pl.scan_parquet(f"{DATA_FOLDER}/rti_by_industry.parquet")
rti_by_occupation = pl.scan_parquet(f"{DATA_FOLDER}/rti_by_occupation.parquet")

In [6]:
data_lazy = pl.scan_parquet(f"{DATA_FOLDER}/wioa_data_filtered.parquet")

In [7]:
data_lazy = (
    data_lazy
    .with_columns([
      pl.col("sex").replace_strict(sex_map, default=None),
      pl.col("race").replace_strict(race_map, default=None),
      pl.col("age").map_elements(age_map, return_dtype=pl.String),
      pl.col("highest_educational_level").replace_strict(highest_educational_level_map, default=None),
      pl.col("low_income_status").replace_strict(low_income_status_map, default=None),
      pl.col("employment_status").replace_strict(employment_status_map, default=None),
      pl.col("received_training").replace_strict(received_training_map, default=None),
    ])
)

In [8]:
# Add an individaual's pre-program occupation title based on the occupation code.
data_lazy = (
    data_lazy.join(
        occupations,
        left_on="occupational_code_pre",
        right_on="occupation_code",
        how="left"
    )
    .drop(["soc_level", "occupation_code_prefix"])
    .rename({"occupation_title": "occupation_title_pre"})
)

# Use an individual's pre-program occupation code to get rti.
data_lazy = (
    data_lazy.join(
        rti_by_occupation,     
        left_on="occupational_code_pre",
        right_on="occupation_code",
        how="left"
    )
    .drop(["occ2000"])
    .rename({
      'r_cog': 'r_cog_pre',
      'r_man': 'r_man_pre',
      'offshor': 'offshor_pre'
    })
    .drop([
        "socwt",
        "nr_cog_anal",
        "nr_cog_pers",
        "nr_man_phys",	
        "nr_man_pers"
    ])
)

# Add an individaual's post-program occupation title based on the occupation code.
data_lazy = (
  data_lazy.join(
      occupations, 
      left_on="occupational_code_post", 
      right_on="occupation_code",
      how="left"
  )
  .drop(["soc_level", "occupation_code_prefix"])
  .rename({"occupation_title": "occupation_title_post"})
)

# Use an individual's pre-program occupation code to get rti.
data_lazy = (
    data_lazy.join(
        rti_by_occupation,
        left_on="occupational_code_post",
        right_on="occupation_code",
        how="left"
    )
    .drop(["occ2000"])
    .rename({
      'r_cog': 'r_cog_post',
      'r_man': 'r_man_post',
      'offshor': 'offshor_post'
    })
    .drop([
        "socwt",
        "nr_cog_anal",
        "nr_cog_pers",
        "nr_man_phys",	
        "nr_man_pers"
    ])
)

In [9]:
# Join with industry-level rti for pre-program industry codes
data_lazy = (
   data_lazy.join(
      rti_by_subsector,
      left_on="subsector_code_pre",
      right_on="subsector_code",
      how="left"
   )
   .rename({
      "industry_title": "industry_title_pre",
      "subsector_title": "subsector_title_pre",
      "r_cog_industry": "r_cog_industry_pre",
      "r_man_industry": "r_man_industry_pre",
      "offshor_industry": "offshor_industry_pre"
  })
)

# Join with industry-level rti for pre-program industry codes
data_lazy = (
   data_lazy.join(
      rti_by_subsector,
      left_on="subsector_code_post",
      right_on="subsector_code",
      how="left"
   )
   .rename({
      "industry_title": "industry_title_post",
      "subsector_title": "subsector_title_post",
      "r_cog_industry": "r_cog_industry_post",
      "r_man_industry": "r_man_industry_post",
      "offshor_industry": "offshor_industry_post"
  })
)

In [10]:
# Calculate pre- and post-program difference in wages
data_lazy = data_lazy.with_columns([
    pl.mean_horizontal("wages_1q_pre", "wages_2q_pre", "wages_3q_pre").alias("wages_mean_pre"),
    pl.mean_horizontal("wages_1q_post", "wages_2q_post", "wages_3q_post", "wages_4q_post").alias("wages_mean_post")
])

data_lazy = data_lazy.with_columns([
    (pl.col("wages_mean_post") - pl.col("wages_mean_pre")).alias("diff_wages_mean"),
])

In [11]:
# Calculate pre- and post-program difference in rti
# A positive difference in pre- and post-program correspond to 
# an occupation change that is has more routine cognitive/manual task 
# or has more offshorable
data_lazy = data_lazy.with_columns([
   (pl.col("r_cog_post") - pl.col("r_cog_pre")).alias("diff_r_cog"),
   (pl.col("r_man_post") - pl.col("r_man_pre")).alias("diff_r_man"),
   (pl.col("offshor_post") - pl.col("offshor_pre")).alias("diff_offshor"),
   (pl.col("r_cog_industry_post") - pl.col("r_cog_industry_pre")).alias("diff_r_cog_industry"),
   (pl.col("r_man_industry_post") - pl.col("r_man_industry_pre")).alias("diff_r_man_industry"),
   (pl.col("offshor_industry_post") - pl.col("offshor_industry_pre")).alias("diff_offshor_industry"),
])

In [12]:
# Create indicator variables for pre- and post-program differences in 
# routine cognitive, routine manual, offshorability, and wages
data_lazy = data_lazy.with_columns([
    # True if routine cognitive tasks (strictly) increase
    (pl.col("diff_r_cog") > 0).alias("is_r_cog_increased"), 
    # True if routine manual tasks (strictly) increase
    (pl.col("diff_r_man") > 0).alias("is_r_man_increased"), 
    # True if offshorability (strictly) increase
    (pl.col("diff_offshor") > 0).alias("is_offshor_increased"), 
    # True if routine cognitive tasks (strictly) increase
    (pl.col("diff_r_cog_industry") > 0).alias("is_r_cog_industry_increased"), 
    # True if routine manual tasks (strictly) increase
    (pl.col("diff_r_man_industry") > 0).alias("is_r_man_industry_increased"), 
    # True if offshorability (strictly) increase
    (pl.col("diff_offshor_industry") > 0).alias("is_offshor_industry_increased"), 
    # True if wages (strictly) increase
    (pl.col("diff_wages_mean") > 0).alias("is_wages_mean_increased"), 
])

In [13]:
data_lazy = data_lazy.with_columns(
   (
    # Tier 1: Has wages + all individual-level variables (most complete data)
    pl.col("is_wages_mean_increased").is_not_null() &
    pl.col("is_r_cog_increased").is_not_null() &
    pl.col("is_r_man_increased").is_not_null() &
    pl.col("is_offshor_increased").is_not_null()
   ).alias("has_occupational_outcomes"),
   (
    # Tier 2: Has wages + all industry-level variables (fallback data)
    pl.col("is_wages_mean_increased").is_not_null() &
    pl.col("is_r_cog_industry_increased").is_not_null() &
    pl.col("is_r_man_industry_increased").is_not_null() &
    pl.col("is_offshor_industry_increased").is_not_null()
   ).alias("has_industry_outcomes"),
   (
    # Tier 3: Has wages only (minimal usable data)
    pl.col("is_wages_mean_increased").is_not_null()
   ).alias("has_wages_outcomes")
)

In [14]:
dimensions = [
    'low_income_status', 'employment_status',
    'race', 'sex', 'age', 'state', 'highest_educational_level',
    'subsector_title_pre', 'subsector_title_post', 'funding_stream'
]

metrics = [
    "percent_r_cog_industry_increased",
    "percent_r_man_industry_increased",
    "percent_offshor_industry_increased",
    "percent_wages_mean_increased",

    "diff_r_cog_industry_median",
    "diff_r_cog_industry_25th",
    "diff_r_cog_industry_75th",
    "diff_r_cog_industry_mean",

    "diff_r_man_industry_median",
    "diff_r_man_industry_25th",
    "diff_r_man_industry_75th",
    "diff_r_man_industry_mean",

    "diff_offshor_industry_median",
    "diff_offshor_industry_25th",
    "diff_offshor_industry_75th",
    "diff_offshor_industry_mean",

    "diff_wages_mean_median",
    "diff_wages_mean_25th",
    "diff_wages_mean_75th",
    "diff_wages_mean_mean",

    "count",
]

column_order = dimensions + metrics

# Clean data - drop rows with nulls in any dimension column
data_lazy = data_lazy.drop_nulls(subset=dimensions)

In [15]:
# Create all combinations of dimensions from 1 to N
grouping_sets = [list(c) for i in range(1, len(dimensions)+1) for c in combinations(dimensions, i)]

# Filter to "industry" tier data only
data_lazy = data_lazy.filter(pl.col("has_industry_outcomes"))

In [16]:
data = data_lazy.collect(engine="streaming")

In [17]:
def consolidate_multiple_columns(data, columns, min_percentage=0.05):
    """Consolidate multiple columns at once, overwriting originals"""
    
    consolidated_data = data
    consolidated_columns = []
    non_consolidated_columns = []
    
    for column in columns:
        # Skip if column is not string/categorical
        if consolidated_data[column].dtype not in [pl.String, pl.Categorical]:
            non_consolidated_columns.append(column)
            print(f"{column}: skipped (not categorical)")
            continue
            
        total_count = consolidated_data.height
        value_counts = consolidated_data.select(pl.col(column).value_counts()).unnest(column)
        
        # Calculate percentages
        value_counts = value_counts.with_columns(
            (pl.col("count") / total_count).alias("percentage")
        )
        
        # Keep categories above threshold
        keep_categories = value_counts.filter(
            pl.col("percentage") >= min_percentage
        ).select(column).to_series().to_list()
        
        original_categories = value_counts.height
        kept_categories = len(keep_categories)
        
        # Only consolidate if we're actually reducing categories
        if kept_categories < original_categories:
            # Overwrite original column with consolidated version
            consolidated_data = consolidated_data.with_columns(
                pl.when(pl.col(column).is_in(keep_categories))
                .then(pl.col(column))
                .otherwise(pl.lit("Other"))
                .alias(column)  # Same name as original
            )
            consolidated_columns.append(column)
            print(f"{column}: consolidated to {kept_categories} categories (from {original_categories})")
        else:
            # No consolidation needed
            non_consolidated_columns.append(column)
            print(f"{column}: no consolidation needed ({original_categories} categories)")
    
    return consolidated_data, consolidated_columns, non_consolidated_columns

In [18]:
consolidated_data, consolidated_cols, non_consolidated_cols = consolidate_multiple_columns(
    data, dimensions, min_percentage=0.02
)

low_income_status: no consolidation needed (2 categories)
employment_status: consolidated to 2 categories (from 4)
race: consolidated to 4 categories (from 7)
sex: consolidated to 2 categories (from 3)
age: consolidated to 6 categories (from 8)
state: consolidated to 13 categories (from 53)
highest_educational_level: consolidated to 8 categories (from 9)
subsector_title_pre: consolidated to 10 categories (from 89)
subsector_title_post: consolidated to 10 categories (from 89)
funding_stream: consolidated to 4 categories (from 5)


In [19]:
aggregates = []

aggregations = [
    pl.col("is_r_cog_industry_increased").mean().alias("percent_r_cog_industry_increased"),
    pl.col("is_r_man_industry_increased").mean().alias("percent_r_man_industry_increased"),
    pl.col("is_offshor_industry_increased").mean().alias("percent_offshor_industry_increased"),
    pl.col("is_wages_mean_increased").mean().alias("percent_wages_mean_increased"),

    pl.col("diff_r_cog_industry").median().alias("diff_r_cog_industry_median"),
    pl.col("diff_r_cog_industry").quantile(0.25, interpolation="linear").alias("diff_r_cog_industry_25th"),
    pl.col("diff_r_cog_industry").quantile(0.75, interpolation="linear").alias("diff_r_cog_industry_75th"),
    pl.col("diff_r_cog_industry").mean().alias("diff_r_cog_industry_mean"),

    pl.col("diff_r_man_industry").median().alias("diff_r_man_industry_median"),
    pl.col("diff_r_man_industry").quantile(0.25, interpolation="linear").alias("diff_r_man_industry_25th"),
    pl.col("diff_r_man_industry").quantile(0.75, interpolation="linear").alias("diff_r_man_industry_75th"),
    pl.col("diff_r_man_industry").mean().alias("diff_r_man_industry_mean"),

    pl.col("diff_offshor_industry").median().alias("diff_offshor_industry_median"),
    pl.col("diff_offshor_industry").quantile(0.25, interpolation="linear").alias("diff_offshor_industry_25th"),
    pl.col("diff_offshor_industry").quantile(0.75, interpolation="linear").alias("diff_offshor_industry_75th"),
    pl.col("diff_offshor_industry").mean().alias("diff_offshor_industry_mean"),

    pl.col("diff_wages_mean").median().alias("diff_wages_mean_median"),
    pl.col("diff_wages_mean").quantile(0.25, interpolation="linear").alias("diff_wages_mean_25th"),
    pl.col("diff_wages_mean").quantile(0.75, interpolation="linear").alias("diff_wages_mean_75th"),
    pl.col("diff_wages_mean").mean().alias("diff_wages_mean_mean"),

    pl.len().alias("count"),
]

# Process each grouping combination
num_groups = len(grouping_sets)

for i, group in enumerate(grouping_sets):
    print(f"Aggregating group {i + 1} of {num_groups}")

    # Aggregate by current group
    grouped = consolidated_data.group_by(group).agg(aggregations)
    
    # Add "All" for dimensions not in current group
    for dim in dimensions:
        if dim not in group:
            grouped = grouped.with_columns(pl.lit("All").alias(dim))

    # Reorder columns to match final schema
    grouped = grouped.select(column_order)
    aggregates.append(grouped)

# Grand total row - aggregate all data
grand_total = consolidated_data.select(aggregations)

# Add all dimension columns as "All" and groupby marker
for dim in dimensions:
    grand_total = grand_total.with_columns(pl.lit("All").alias(dim))

# Reorder grand total columns to match other aggregates
grand_total = grand_total.select(column_order)
aggregates.append(grand_total)

Aggregating group 1 of 1023
Aggregating group 2 of 1023
Aggregating group 3 of 1023
Aggregating group 4 of 1023
Aggregating group 5 of 1023
Aggregating group 6 of 1023
Aggregating group 7 of 1023
Aggregating group 8 of 1023
Aggregating group 9 of 1023
Aggregating group 10 of 1023
Aggregating group 11 of 1023
Aggregating group 12 of 1023
Aggregating group 13 of 1023
Aggregating group 14 of 1023
Aggregating group 15 of 1023
Aggregating group 16 of 1023
Aggregating group 17 of 1023
Aggregating group 18 of 1023
Aggregating group 19 of 1023
Aggregating group 20 of 1023
Aggregating group 21 of 1023
Aggregating group 22 of 1023
Aggregating group 23 of 1023
Aggregating group 24 of 1023
Aggregating group 25 of 1023
Aggregating group 26 of 1023
Aggregating group 27 of 1023
Aggregating group 28 of 1023
Aggregating group 29 of 1023
Aggregating group 30 of 1023
Aggregating group 31 of 1023
Aggregating group 32 of 1023
Aggregating group 33 of 1023
Aggregating group 34 of 1023
Aggregating group 35 of

In [20]:
# Combine all aggregations into single dataframe
index_df = pl.concat(aggregates)

index_df = index_df.with_columns([
    pl.col(c).cast(pl.Categorical) for c in dimensions
])

In [22]:
index_df.write_parquet("../data/processed/index.parquet", compression="zstd")