# DAIOE SSYK2012 Workflow

Organized end-to-end flow for preparing DAIOE + SCB SSYK data.


## 1) Setup Paths and Data Sources

Define imports, workspace paths, and source locations.


In [1]:
import polars as pl
from pathlib import Path


ROOT = Path.cwd().resolve()
DATA_DIR = ROOT / "data"
DATA_DIR.mkdir(parents=True, exist_ok=True)

DAIOE_SOURCE: str = (
    "https://raw.githubusercontent.com/joseph-data/07_translate_ssyk/main/"
    "03_translated_files/daioe_ssyk2012_translated.csv"
)

SCB_SOURCE: str = (
        "https://raw.githubusercontent.com/joseph-data/AI_Econ_daioe_years/daioe_pull/"
        "data/processed/ssyk12_aggregated_ssyk4_to_ssyk1.parquet"
)

## 2) Load DAIOE and SCB Lazily

Read source files as `LazyFrame` objects for efficient transformations.


In [2]:
daioe_lazy_lf = pl.scan_csv(
    DAIOE_SOURCE
)

scb_lazy_lf = pl.scan_parquet(
    SCB_SOURCE
)

## 3) Define Utility Helpers

Create helper functions used for lightweight pipeline inspection.


In [3]:
def inspect_lazy(lf: pl.LazyFrame) -> None:
    """
    Print the shape of a Polars LazyFrame in a memory-efficient manner.

    This function computes the number of rows using a lazy row-count
    aggregation (`pl.len()`) and retrieves the number of columns from
    the resolved schema without materializing the full dataset.

    Parameters
    ----------
    lf : pl.LazyFrame
        The LazyFrame to inspect.

    Notes
    -----
    - The row count triggers execution of the lazy query plan,
      but avoids collecting all columns into memory.
    - The column count is obtained from the schema metadata and
      does not require data materialization.
    - Intended for debugging and validation of large lazy pipelines.
    """
    n_rows = lf.select(pl.len()).collect().item()
    n_cols = len(lf.collect_schema())
    print(f"Rows: {n_rows:,}")
    print(f"Columns: {n_cols}")


## 4) Quick Sanity Checks and Early Military Removal

Preview both sources and remove code-0 military rows early from DAIOE and SCB.


In [4]:
print(daioe_lazy_lf.head(5).collect())

shape: (5, 27)
┌────────────┬──────┬────────────┬────────────┬───┬────────────┬───────────┬───────────┬───────────┐
│ ssyk2012_4 ┆ year ┆ daioe_alla ┆ daioe_stra ┆ … ┆ pctl_rank_ ┆ ssyk2012_ ┆ ssyk2012_ ┆ ssyk2012_ │
│ ---        ┆ ---  ┆ pps        ┆ tgames     ┆   ┆ genai      ┆ 1         ┆ 2         ┆ 3         │
│ str        ┆ i64  ┆ ---        ┆ ---        ┆   ┆ ---        ┆ ---       ┆ ---       ┆ ---       │
│            ┆      ┆ f64        ┆ f64        ┆   ┆ f64        ┆ str       ┆ str       ┆ str       │
╞════════════╪══════╪════════════╪════════════╪═══╪════════════╪═══════════╪═══════════╪═══════════╡
│ 0110 Commi ┆ 2010 ┆ null       ┆ null       ┆ … ┆ null       ┆ 0 Armed   ┆ 01        ┆ 011 Commi │
│ ssioned    ┆      ┆            ┆            ┆   ┆            ┆ forces    ┆ Officers  ┆ ssioned   │
│ armed      ┆      ┆            ┆            ┆   ┆            ┆ occupatio ┆           ┆ armed     │
│ forces…    ┆      ┆            ┆            ┆   ┆            ┆ ns        ┆

In [5]:
print(scb_lazy_lf.head(5).collect())

shape: (5, 7)
┌───────┬───────────┬───────┬───────┬──────┬───────┬─────────────────────────────────┐
│ level ┆ ssyk_code ┆ age   ┆ sex   ┆ year ┆ count ┆ occupation                      │
│ ---   ┆ ---       ┆ ---   ┆ ---   ┆ ---  ┆ ---   ┆ ---                             │
│ str   ┆ str       ┆ str   ┆ str   ┆ i64  ┆ i64   ┆ str                             │
╞═══════╪═══════════╪═══════╪═══════╪══════╪═══════╪═════════════════════════════════╡
│ SSYK4 ┆ 2132      ┆ 45-49 ┆ women ┆ 2024 ┆ 80    ┆ Plant and animal biologists     │
│ SSYK4 ┆ 8163      ┆ 55-59 ┆ women ┆ 2017 ┆ 184   ┆ Machine operators, baked-goods… │
│ SSYK4 ┆ 2413      ┆ 35-39 ┆ men   ┆ 2016 ┆ 556   ┆ Financial and investment advis… │
│ SSYK4 ┆ 1222      ┆ 16-24 ┆ men   ┆ 2016 ┆ 7     ┆ Human resource managers, level… │
│ SSYK4 ┆ 8199      ┆ 25-29 ┆ women ┆ 2019 ┆ 62    ┆ Process control technicians no… │
└───────┴───────────┴───────┴───────┴──────┴───────┴─────────────────────────────────┘


In [6]:
# daioe_lazy_lf.collect_schema()

In [7]:
## Removed Military Personnel

scb_lazy_lf = scb_lazy_lf.filter(
    pl.col("ssyk_code").str.starts_with("0").not_()
)

daioe_lazy_lf = daioe_lazy_lf.filter(
    pl.col("ssyk2012_4").str.starts_with("0").not_()
)

In [8]:
#scb_lazy_lf.collect().collect_schema()

## 5) Derive SSYK Levels in DAIOE

Split DAIOE SSYK4 into SSYK1-4 and keep SSYK2012-era years.


In [9]:
daioe_lazy_lf_ssyk12 = (
    daioe_lazy_lf\
    .with_columns([
    pl.col("ssyk2012_4").str.slice(0, 1).alias("code_1"),
    pl.col("ssyk2012_4").str.slice(0, 2).alias("code_2"),
    pl.col("ssyk2012_4").str.slice(0, 3).alias("code_3"),
    pl.col("ssyk2012_4").str.slice(0, 4).alias("code_4")
])\
    .drop(pl.col("^ssyk2012.*$"))\
        .filter(pl.col("year") >= 2014) ## The Year stretch from the first SSYK12 publication
)

## 6) Align DAIOE Years to SCB Coverage

Extend DAIOE series forward when SCB has later years.


In [10]:
## Here I extend the years to Latest according to the pulled SCB data (2024, yearly)

base = daioe_lazy_lf_ssyk12

daioe_max = base.select(pl.max("year")).collect().item()
scb_max   = scb_lazy_lf.select(pl.max("year")).collect().item()

missing = list(range(daioe_max + 1, scb_max + 1))

daioe_lazy_lf_extended = (
    base
    if not missing
    else pl.concat(
        [
            base,
            base
            .filter(pl.col("year") == daioe_max)
            .drop("year")
            .join(pl.LazyFrame({"year": missing}), how="cross")
            .select(base.collect_schema().names()),  # ensure same column order/schema
        ],
        how="vertical",
    )
)



In [11]:
inspect_lazy(daioe_lazy_lf_extended)


Rows: 4,686
Columns: 27


## 7) Build SCB SSYK4 Employment Counts

Aggregate SCB counts at year + 4-digit SSYK level.


In [12]:
scb_lazy_lf_level4 = (
    scb_lazy_lf
        .filter(pl.col("ssyk_code").str.len_chars() == 4)
        .group_by(["year", "ssyk_code"])
        .agg(pl.col("count").sum().alias("total_count"))
)



In [13]:
inspect_lazy(scb_lazy_lf_level4)

Rows: 4,686
Columns: 3


## 8) Merge DAIOE with SCB SSYK4 Counts

Join by `year` and 4-digit code, then inspect merged coverage.


In [14]:
daioe_lazy_lf_extended.head(5).collect()

year,daioe_allapps,daioe_stratgames,daioe_videogames,daioe_imgrec,daioe_imgcompr,daioe_imggen,daioe_readcompr,daioe_lngmod,daioe_translat,daioe_speechrec,daioe_genai,pctl_rank_allapps,pctl_rank_stratgames,pctl_rank_videogames,pctl_rank_imgrec,pctl_rank_imgcompr,pctl_rank_imggen,pctl_rank_readcompr,pctl_rank_lngmod,pctl_rank_translat,pctl_rank_speechrec,pctl_rank_genai,code_1,code_2,code_3,code_4
i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str,str,str,str
2014,,,,,,,,,,,,,,,,,,,,,,,"""1""","""11""","""111""","""1111"""
2015,,,,,,,,,,,,,,,,,,,,,,,"""1""","""11""","""111""","""1111"""
2016,,,,,,,,,,,,,,,,,,,,,,,"""1""","""11""","""111""","""1111"""
2017,,,,,,,,,,,,,,,,,,,,,,,"""1""","""11""","""111""","""1111"""
2018,,,,,,,,,,,,,,,,,,,,,,,"""1""","""11""","""111""","""1111"""


In [15]:
scb_lazy_lf_level4.head(5).collect()

year,ssyk_code,total_count
i64,str,i64
2020,"""7221""",2909
2020,"""1722""",6110
2020,"""7521""",109
2021,"""3451""",2900
2022,"""2414""",2021


In [16]:
daioe_scb_years = daioe_lazy_lf_extended\
    .join(
        scb_lazy_lf_level4,
        left_on=["year", "code_4"],
        right_on=["year", "ssyk_code"],
        how="left"
    )

In [17]:
inspect_lazy(daioe_scb_years)

Rows: 4,686
Columns: 28


## 9) Inspect Unmatched DAIOE Codes

Identify DAIOE rows that have no SCB SSYK4 match.


In [18]:
# DAIOE codes with no SCB match
daioe_scb_years_unmatched = daioe_lazy_lf_extended\
    .join(
        scb_lazy_lf_level4,
        left_on=["year", "code_4"],
        right_on=["year", "ssyk_code"],
        how="anti"
    )



In [19]:
inspect_lazy(daioe_scb_years_unmatched)

Rows: 0
Columns: 27


## 10) Post-Merge Validation

Run quick schema and shape checks on the filtered joined frame before aggregation.


In [20]:
daioe_scb_years.collect_schema()

Schema([('year', Int64),
        ('daioe_allapps', Float64),
        ('daioe_stratgames', Float64),
        ('daioe_videogames', Float64),
        ('daioe_imgrec', Float64),
        ('daioe_imgcompr', Float64),
        ('daioe_imggen', Float64),
        ('daioe_readcompr', Float64),
        ('daioe_lngmod', Float64),
        ('daioe_translat', Float64),
        ('daioe_speechrec', Float64),
        ('daioe_genai', Float64),
        ('pctl_rank_allapps', Float64),
        ('pctl_rank_stratgames', Float64),
        ('pctl_rank_videogames', Float64),
        ('pctl_rank_imgrec', Float64),
        ('pctl_rank_imgcompr', Float64),
        ('pctl_rank_imggen', Float64),
        ('pctl_rank_readcompr', Float64),
        ('pctl_rank_lngmod', Float64),
        ('pctl_rank_translat', Float64),
        ('pctl_rank_speechrec', Float64),
        ('pctl_rank_genai', Float64),
        ('code_1', String),
        ('code_2', String),
        ('code_3', String),
        ('code_4', String),
        ('tot

In [21]:
daioe_scb_years.collect_schema().names()

['year',
 'daioe_allapps',
 'daioe_stratgames',
 'daioe_videogames',
 'daioe_imgrec',
 'daioe_imgcompr',
 'daioe_imggen',
 'daioe_readcompr',
 'daioe_lngmod',
 'daioe_translat',
 'daioe_speechrec',
 'daioe_genai',
 'pctl_rank_allapps',
 'pctl_rank_stratgames',
 'pctl_rank_videogames',
 'pctl_rank_imgrec',
 'pctl_rank_imgcompr',
 'pctl_rank_imggen',
 'pctl_rank_readcompr',
 'pctl_rank_lngmod',
 'pctl_rank_translat',
 'pctl_rank_speechrec',
 'pctl_rank_genai',
 'code_1',
 'code_2',
 'code_3',
 'code_4',
 'total_count']

In [22]:
inspect_lazy(daioe_scb_years)

Rows: 4,686
Columns: 28


## 11) Identify DAIOE Measure Columns

Collect DAIOE metric columns and define the weight expression.


In [23]:
daioe_cols = [
    c for c in daioe_scb_years.collect_schema().names()
    if c.startswith("daioe_")
]

w = pl.col("total_count")


## 12) Manual Aggregation Checks (Legacy)

Compute SSYK3 and SSYK2 weighted/simple aggregates as intermediate checks.


In [24]:

daioe_scb_lv3 = (
    daioe_scb_years
    .select(["code_3", "year", "total_count", *daioe_cols])
    .group_by(["year", "code_3"])
    .agg(
        [
            w.sum().alias("weight_sum"),

            # simple averages
            *[pl.col(c).mean().alias(f"{c}_avg") for c in daioe_cols],

            # employment-weighted averages
            *[
                pl.when(w.sum() > 0)
                  .then((pl.col(c) * w).sum() / w.sum())
                  .otherwise(None)
                  .alias(f"{c}_wavg")
                for c in daioe_cols
            ],
        ]
    )
    .with_columns(pl.lit("SSYK3").alias("level"))
    .rename({"code_3": "ssyk_code"})
)

# preview
daioe_scb_lv3.limit(10).collect()


year,ssyk_code,weight_sum,daioe_allapps_avg,daioe_stratgames_avg,daioe_videogames_avg,daioe_imgrec_avg,daioe_imgcompr_avg,daioe_imggen_avg,daioe_readcompr_avg,daioe_lngmod_avg,daioe_translat_avg,daioe_speechrec_avg,daioe_genai_avg,daioe_allapps_wavg,daioe_stratgames_wavg,daioe_videogames_wavg,daioe_imgrec_wavg,daioe_imgcompr_wavg,daioe_imggen_wavg,daioe_readcompr_wavg,daioe_lngmod_wavg,daioe_translat_wavg,daioe_speechrec_wavg,daioe_genai_wavg,level
i64,str,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str
2016,"""611""",17674,12.284982,0.217759,3.706761,0.155617,0.035179,0.221653,0.062775,0.017113,0.004518,0.150854,0.330999,12.524768,0.222846,3.682604,0.159775,0.036677,0.23503,0.069213,0.01874,0.004853,0.156388,0.353084,"""SSYK3"""
2022,"""511""",7410,25.536781,0.224151,4.174304,0.36945,0.144485,0.549917,0.181519,0.597564,0.03316,0.40922,1.953793,25.401552,0.224819,4.245089,0.372049,0.143683,0.548772,0.176305,0.574576,0.031923,0.39395,1.91329,"""SSYK3"""
2016,"""331""",56687,16.707202,0.330474,3.195515,0.23513,0.061758,0.397765,0.180369,0.049043,0.011904,0.331323,0.65615,16.747446,0.331487,3.262468,0.23273,0.06103,0.386884,0.175894,0.048655,0.011906,0.333789,0.641218,"""SSYK3"""
2014,"""535""",10192,5.090045,0.173984,2.340456,0.112397,,,0.002018,0.008183,,0.031734,0.008183,5.090045,0.173984,2.340456,0.112397,0.0,0.0,0.002018,0.008183,0.0,0.031734,0.008183,"""SSYK3"""
2022,"""233""",29573,31.384315,0.284569,3.893297,0.459472,0.186725,0.810267,0.285678,0.910982,0.047103,0.535188,2.933913,31.384315,0.284569,3.893297,0.459472,0.186725,0.810267,0.285678,0.910982,0.047103,0.535188,2.933913,"""SSYK3"""
2014,"""818""",8511,5.487439,0.182718,2.676367,0.121536,,,0.001555,0.00613,,0.025079,0.00613,5.465643,0.181862,2.670867,0.120094,0.0,0.0,0.001534,0.006047,0.0,0.024986,0.006047,"""SSYK3"""
2022,"""443""",3375,31.477972,0.280412,4.874263,0.467895,0.18231,0.670565,0.253734,0.797723,0.043899,0.49811,2.492945,31.477972,0.280412,4.874263,0.467895,0.18231,0.670565,0.253734,0.797723,0.043899,0.49811,2.492945,"""SSYK3"""
2021,"""832""",19730,22.93358,0.215376,5.157314,0.392134,0.122473,0.506349,0.104151,0.199391,0.021498,0.253269,1.212998,22.932401,0.215385,5.156184,0.392152,0.122488,0.506507,0.104169,0.199401,0.021496,0.253244,1.213253,"""SSYK3"""
2018,"""172""",7807,13.856059,0.223537,3.174193,0.15131,0.047101,0.269803,0.10394,0.068404,0.013356,0.223616,0.537527,13.856059,0.223537,3.174193,0.15131,0.047101,0.269803,0.10394,0.068404,0.013356,0.223616,0.537527,"""SSYK3"""
2021,"""352""",4615,28.968531,0.289234,5.571247,0.447116,0.14796,0.699028,0.170661,0.322556,0.034047,0.39927,1.77796,28.915054,0.288635,5.566486,0.44627,0.147684,0.697867,0.170206,0.32171,0.033926,0.39777,1.774345,"""SSYK3"""


In [25]:
inspect_lazy(daioe_scb_lv3)

Rows: 1,595
Columns: 26


In [26]:

daioe_scb_lv2 = (
    daioe_scb_years
    .select(["code_2", "year", "total_count", *daioe_cols])
    .group_by(["year", "code_2"])
    .agg(
        [
            w.sum().alias("weight_sum"),

            # simple averages
            *[pl.col(c).mean().alias(f"{c}_avg") for c in daioe_cols],

            # employment-weighted averages
            *[
                pl.when(w.sum() > 0)
                  .then((pl.col(c) * w).sum() / w.sum())
                  .otherwise(None)
                  .alias(f"{c}_wavg")
                for c in daioe_cols
            ],
        ]
    )
    .with_columns(pl.lit("SSYK2").alias("level"))
    .rename({"code_2": "ssyk_code"})
)

# preview
daioe_scb_lv2.limit(10).collect()

year,ssyk_code,weight_sum,daioe_allapps_avg,daioe_stratgames_avg,daioe_videogames_avg,daioe_imgrec_avg,daioe_imgcompr_avg,daioe_imggen_avg,daioe_readcompr_avg,daioe_lngmod_avg,daioe_translat_avg,daioe_speechrec_avg,daioe_genai_avg,daioe_allapps_wavg,daioe_stratgames_wavg,daioe_videogames_wavg,daioe_imgrec_wavg,daioe_imgcompr_wavg,daioe_imggen_wavg,daioe_readcompr_wavg,daioe_lngmod_wavg,daioe_translat_wavg,daioe_speechrec_wavg,daioe_genai_wavg,level
i64,str,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,str
2014,"""42""",54913,5.393706,0.207738,2.03942,0.150045,,,0.003412,0.014579,,0.059763,0.014579,5.504238,0.212455,2.075473,0.154182,0.0,0.0,0.003496,0.014911,0.0,0.060745,0.014911,"""SSYK2"""
2020,"""44""",19754,25.156848,0.267595,5.13977,0.362387,0.082101,0.452638,0.16934,0.287568,0.033611,0.36443,1.283225,23.405394,0.247555,5.352829,0.328971,0.072249,0.387084,0.136181,0.232221,0.027188,0.304652,1.071085,"""SSYK2"""
2021,"""43""",113995,27.667724,0.275908,4.868517,0.435456,0.15102,0.625769,0.19027,0.367729,0.03731,0.408674,1.760706,27.816823,0.280759,5.127493,0.435371,0.149998,0.610656,0.185794,0.355966,0.035496,0.38526,1.712003,"""SSYK2"""
2021,"""54""",38819,23.307648,0.228124,4.430831,0.368527,0.123384,0.503175,0.146101,0.277227,0.029569,0.346298,1.371693,22.49679,0.226622,4.351479,0.363554,0.119555,0.488952,0.140342,0.258532,0.026519,0.305739,1.313898,"""SSYK2"""
2021,"""31""",145409,28.189569,0.290318,5.199152,0.465574,0.155478,0.714877,0.180831,0.331247,0.032133,0.345302,1.82304,28.06428,0.292341,5.253365,0.461967,0.153823,0.701072,0.179247,0.32626,0.031488,0.334986,1.791942,"""SSYK2"""
2019,"""21""",150935,26.670737,0.33759,4.823092,0.377766,0.092134,0.608492,0.247657,0.164581,0.039429,0.37985,1.250758,27.403358,0.350316,4.952707,0.384065,0.094219,0.626584,0.254334,0.168513,0.040418,0.389566,1.286028,"""SSYK2"""
2017,"""23""",304568,14.901769,0.268629,2.596679,0.211183,0.071943,0.362631,0.170266,0.048996,0.016818,0.33123,0.60541,14.666204,0.267617,2.646086,0.210192,0.069567,0.363211,0.155546,0.045475,0.015206,0.31106,0.595685,"""SSYK2"""
2022,"""71""",175995,23.378634,0.220423,5.256511,0.362579,0.12524,0.512976,0.114264,0.353816,0.018393,0.229828,1.466672,24.197901,0.229434,5.129328,0.378918,0.132959,0.559011,0.1288,0.397432,0.020491,0.249897,1.619954,"""SSYK2"""
2021,"""16""",5102,27.728081,0.286123,3.890456,0.425825,0.160132,0.721497,0.241011,0.454782,0.045244,0.447635,2.093645,27.728081,0.286123,3.890456,0.425825,0.160132,0.721497,0.241011,0.454782,0.045244,0.447635,2.093645,"""SSYK2"""
2022,"""35""",51302,35.204817,0.340063,5.499943,0.523157,0.199607,0.873368,0.28004,0.852586,0.042206,0.472826,2.941231,34.996493,0.335521,5.401076,0.511851,0.198241,0.85654,0.285864,0.870051,0.0433,0.473579,2.943618,"""SSYK2"""


In [27]:
inspect_lazy(daioe_scb_lv2)

Rows: 473
Columns: 26


## 12b) Optional SSYK1 Check

This inspection assumes `daioe_scb_lv1` exists in the session.


In [28]:
if "daioe_scb_lv1" in globals():
    inspect_lazy(daioe_scb_lv1)
else:
    print("`daioe_scb_lv1` is not defined in this notebook flow; skipping this optional check.")


`daioe_scb_lv1` is not defined in this notebook flow; skipping this optional check.


## 13) Generalized Aggregation + Percentiles

Aggregate SSYK4-1 with a reusable function and add within-year percentiles.


In [29]:
def aggregate_daioe_level(
    lf: pl.LazyFrame,
    code_col: str,
    level_label: str,
    weight_col: str = "total_count",
    prefix: str = "daioe_",
    add_percentiles: bool = True,
    pct_scale: int = 100,
    descending: bool = False,
) -> pl.LazyFrame:

    daioe_cols = [c for c in lf.collect_schema().names() if c.startswith(prefix)]
    w = pl.col(weight_col)

    out = (
        lf
        .group_by(["year", code_col])
        .agg(
            w.sum().alias("weight_sum"),
            pl.col(daioe_cols).mean().name.suffix("_avg"),
            ((pl.col(daioe_cols) * w).sum() / w.sum()).name.suffix("_wavg"),
        )
        .with_columns(pl.lit(level_label).alias("level"))
        .rename({code_col: "ssyk_code"})
    )

    if not add_percentiles:
        return out

    group_keys = ["year", "level"]

    rank_expr = (
        pl.col(f"^{prefix}.*_(avg|wavg)$")
        .rank(method="average", descending=descending)
        .over(group_keys)
    )

    n_expr = pl.len().over(group_keys)

    return out.with_columns(
        (
            pl.when(n_expr > 1)
            .then((rank_expr - 1) / (n_expr - 1))
            .otherwise(0.0)
            * pct_scale
        ).name.prefix("pctl_")
    )


In [30]:
levels = {
    "code_4": "SSYK4",
    "code_3": "SSYK3",
    "code_2": "SSYK2",
    "code_1": "SSYK1",
}

aggregated = [
    aggregate_daioe_level(daioe_scb_years, col, label)
    for col, label in levels.items()
]

daioe_all_levels = (
    pl.concat(aggregated)
    .sort(["level", "year", "ssyk_code"])
)


In [31]:
inspect_lazy(daioe_all_levels)

Rows: 6,853
Columns: 48


In [32]:
print(
    daioe_all_levels
    .group_by("level")
    .len()
    .collect()
)


shape: (4, 2)
┌───────┬──────┐
│ level ┆ len  │
│ ---   ┆ ---  │
│ str   ┆ u32  │
╞═══════╪══════╡
│ SSYK3 ┆ 1595 │
│ SSYK1 ┆ 99   │
│ SSYK2 ┆ 473  │
│ SSYK4 ┆ 4686 │
└───────┴──────┘


## 14) Build 1-5 Level Exposure Columns

Create `daioe_<index>_Level_Exposure` columns from weighted percentile ranks (`pctl_daioe_*_wavg`) using quintile-style bins.


In [33]:
# Convert weighted percentile ranks (0..100) into 1-5 exposure levels
pct_cols = [
    c
    for c in daioe_all_levels.collect_schema().names()
    if c.startswith("pctl_daioe_") and c.endswith("_wavg")
]

exposure_exprs = []
for col_name in pct_cols:
    metric = col_name[len("pctl_daioe_"):-len("_wavg")]
    out_col = f"daioe_{metric}_Level_Exposure"
    p = pl.col(col_name)

    exposure_exprs.append(
        pl.when(p.is_null())
        .then(None)
        .when(p <= 20)
        .then(1)
        .when(p <= 40)
        .then(2)
        .when(p <= 60)
        .then(3)
        .when(p <= 80)
        .then(4)
        .otherwise(5)
        .cast(pl.Int8)
        .alias(out_col)
    )

daioe_all_levels = daioe_all_levels.with_columns(exposure_exprs)

print([c for c in daioe_all_levels.collect_schema().names() if c.endswith("_Level_Exposure")])


['daioe_allapps_Level_Exposure', 'daioe_stratgames_Level_Exposure', 'daioe_videogames_Level_Exposure', 'daioe_imgrec_Level_Exposure', 'daioe_imgcompr_Level_Exposure', 'daioe_imggen_Level_Exposure', 'daioe_readcompr_Level_Exposure', 'daioe_lngmod_Level_Exposure', 'daioe_translat_Level_Exposure', 'daioe_speechrec_Level_Exposure', 'daioe_genai_Level_Exposure']


## 15) Pre-Merge Diagnostics

Inspect schemas and frame shapes before final integration.


In [34]:
daioe_all_levels.collect_schema()

Schema([('year', Int64),
        ('ssyk_code', String),
        ('weight_sum', Int64),
        ('daioe_allapps_avg', Float64),
        ('daioe_stratgames_avg', Float64),
        ('daioe_videogames_avg', Float64),
        ('daioe_imgrec_avg', Float64),
        ('daioe_imgcompr_avg', Float64),
        ('daioe_imggen_avg', Float64),
        ('daioe_readcompr_avg', Float64),
        ('daioe_lngmod_avg', Float64),
        ('daioe_translat_avg', Float64),
        ('daioe_speechrec_avg', Float64),
        ('daioe_genai_avg', Float64),
        ('daioe_allapps_wavg', Float64),
        ('daioe_stratgames_wavg', Float64),
        ('daioe_videogames_wavg', Float64),
        ('daioe_imgrec_wavg', Float64),
        ('daioe_imgcompr_wavg', Float64),
        ('daioe_imggen_wavg', Float64),
        ('daioe_readcompr_wavg', Float64),
        ('daioe_lngmod_wavg', Float64),
        ('daioe_translat_wavg', Float64),
        ('daioe_speechrec_wavg', Float64),
        ('daioe_genai_wavg', Float64),
        

In [35]:
scb_lazy_lf.collect_schema()

Schema([('level', String),
        ('ssyk_code', String),
        ('age', String),
        ('sex', String),
        ('year', Int64),
        ('count', Int64),
        ('occupation', String)])

In [36]:
inspect_lazy(scb_lazy_lf)

Rows: 123,354
Columns: 7


In [37]:
inspect_lazy(daioe_all_levels)

Rows: 6,853
Columns: 59


## 16) Final Merge

Attach DAIOE aggregates back to the SCB base table.


In [38]:
final_merge = scb_lazy_lf\
    .join(
        daioe_all_levels,
        left_on=["year", "ssyk_code"],
        right_on=["year", "ssyk_code"],
        how="left"
    )

In [39]:
inspect_lazy(final_merge)

Rows: 123,354
Columns: 64


In [40]:
#dd = final_merge.limit(30).collect()

## 17) Export Final Dataset

Write the merged output to parquet for downstream use.


In [41]:
output_path = DATA_DIR / "daioe_scb_years_all_levels.parquet"

final_merge.sink_parquet(output_path)