# Unified ETL Pipeline

This notebook reproduces the steps of the standalone Python ETL scripts in a
single, well-documented workflow. It generates the same output tables as the
original scripts for the O*NET, AEI, and CAGED data.

## 1. Setup

In [None]:

import os
import unicodedata

import pandas as pd
import numpy as np
import basedosdados as bd
from dotenv import load_dotenv


def strip_accents(text: str) -> str:
    """Return ``text`` with accent marks removed.

    The input is decomposed with Unicode NFKD normalization and every
    character whose Unicode category is ``Mn`` is dropped; all remaining
    characters are kept in their original order.
    """
    kept = []
    for char in unicodedata.normalize('NFKD', text):
        if unicodedata.category(char) == 'Mn':
            # Combining mark produced by the decomposition: skip it.
            continue
        kept.append(char)
    return ''.join(kept)


def merge_onet_soc_data(
    onet_path: str = 'data/input/aei_data/onet_task_statements.csv',
    soc_path: str = 'data/input/aei_data/SOC_Structure.csv',
) -> pd.DataFrame:
    """Merge O*NET task statements with SOC major group titles.

    Args:
        onet_path: CSV of O*NET task statements; must contain an
            ``O*NET-SOC Code`` column. Defaults to the AEI input file.
        soc_path: CSV of the SOC structure; must contain ``Major Group`` and
            ``SOC or O*NET-SOC 2019 Title`` columns. Defaults to the AEI
            input file.

    Returns:
        The task statements with two extra columns: ``soc_major_group`` (the
        first two characters of the O*NET-SOC code) and the matching
        ``SOC or O*NET-SOC 2019 Title``. Tasks whose major group has no match
        in the SOC file are kept with a missing title (left join).
    """
    onet_df = pd.read_csv(onet_path)
    onet_df['soc_major_group'] = onet_df['O*NET-SOC Code'].str[:2]

    soc_df = pd.read_csv(soc_path)
    # Only rows with a filled ``Major Group`` cell describe a major group;
    # the other rows of the SOC file are discarded before the join.
    soc_df = soc_df.dropna(subset=['Major Group']).copy()
    soc_df['soc_major_group'] = soc_df['Major Group'].str[:2]

    merged = onet_df.merge(
        soc_df[['soc_major_group', 'SOC or O*NET-SOC 2019 Title']],
        on='soc_major_group',
        how='left',
    )
    return merged


## 2. Loading

In [None]:

# Load variables from a local .env file into the process environment.
load_dotenv()

# AEI inputs: task usage shares and the automation/augmentation breakdown.
task_pct = pd.read_csv('data/input/aei_data/task_pct_v2.csv')
automation_vs_augmentation_by_task = pd.read_csv('data/input/aei_data/automation_vs_augmentation_by_task.csv')

# O*NET task statements and the SOC structure shipped alongside the AEI data.
onet_tasks = pd.read_csv('data/input/aei_data/onet_task_statements.csv')
soc_structure_aei = pd.read_csv('data/input/aei_data/SOC_Structure.csv')

# SOC structure file stored directly under data/input (used by the ONET ETL).
soc_structure_full = pd.read_csv('data/input/SOC_Structure.csv')

# Raw CAGED extract, saved before any treatment.
caged_raw = pd.read_parquet('data/input/caged_national_UNTREATED.parquet')


## 3. ONET ETL

In [None]:

def _code_prefix(row):
    """Return the first four characters of the row's first non-null cell.

    Assumes that cell is a string code (e.g. a SOC code) -- TODO confirm
    against the SOC structure file.
    """
    first_filled = row.dropna().iloc[0]
    return first_filled[:4]


# Tag every SOC structure row with the 4-character prefix of its code.
soc_structure_full['minor_group'] = soc_structure_full.apply(_code_prefix, axis=1)

# One row per prefix, with all titles sharing it joined by '; '.
minor_groups = (
    soc_structure_full
    .groupby('minor_group')['SOC or O*NET-SOC 2019 Title']
    .agg(lambda titles: '; '.join(titles))
    .to_frame('title')
)
minor_groups.to_csv('data/output/onet_minor_groups.csv', index=True)


## 4. AEI ETL

In [None]:

# --- Task usage joined with occupation metadata ----------------------------
onet_with_soc = merge_onet_soc_data()
# Lower-case and trim the task text so it can be matched against `task_name`.
onet_with_soc['task_normalized'] = onet_with_soc['Task'].str.lower().str.strip()

# For each normalized task, the number of distinct `Title` values sharing it.
titles_by_task = onet_with_soc.groupby('task_normalized')['Title']
onet_with_soc['n_occurrences'] = titles_by_task.transform('nunique')

grouped_with_occ = pd.merge(
    task_pct,
    onet_with_soc,
    how='left',
    left_on='task_name',
    right_on='task_normalized',
)

# Divide each task's share by its number of occurrences, then rescale the
# result so that the column adds up to 100.
pct_per_occurrence = grouped_with_occ['pct'] / grouped_with_occ['n_occurrences']
grouped_with_occ['pct_occ_scaled'] = 100 * pct_per_occurrence / pct_per_occurrence.sum()

automation_vs_augmentation_with_occ = pd.merge(
    grouped_with_occ,
    automation_vs_augmentation_by_task,
    how='left',
    on='task_name',
)
# Sanity check: the left join must not have changed the number of rows.
assert len(automation_vs_augmentation_with_occ) == len(grouped_with_occ)

# --- Task -> 4-character SOC code prefix lookup ----------------------------
onet_tasks_tmp = pd.DataFrame(
    {
        'soc_minor_group': onet_tasks['O*NET-SOC Code'].str[:4],
        'task_name': onet_tasks['Task'].str.lower().str.strip(),
    }
).drop_duplicates()

# --- Augmentation / automation totals per group ----------------------------
df = task_pct.merge(
    automation_vs_augmentation_by_task, on='task_name', how='left'
).fillna(0)
task_share = df['pct']
# "aug" weights the learning + validation columns by the task share;
# "aut" does the same for feedback_loop + directive + task_iteration.
df['aug'] = (df['learning'] + df['validation']) * task_share
df['aut'] = (df['feedback_loop'] + df['directive'] + df['task_iteration']) * task_share
df = df.drop(
    columns=['learning', 'validation', 'feedback_loop', 'directive', 'task_iteration']
)

df = df.merge(onet_tasks_tmp, on='task_name', how='left')
occ_aut_aug_lvl = (
    df.groupby('soc_minor_group')[['pct', 'aug', 'aut']]
    .sum()
    .reset_index()
)
# NOTE(review): groups whose `aut` total is 0 yield inf/NaN here -- confirm
# that downstream consumers expect that.
occ_aut_aug_lvl['aug_aut_ratio'] = occ_aut_aug_lvl['aug'] / occ_aut_aug_lvl['aut']
occ_aut_aug_lvl.to_parquet('data/output/occ_aut_aug_lvl.parquet')


# --- Top / bottom 10 classification ----------------------------------------
def _ten_rows_by(column, label, ascending=False):
    """Return the first ten rows of `occ_aut_aug_lvl` sorted by `column`,
    with a `class` column set to `label`."""
    ranked = occ_aut_aug_lvl.sort_values(column, ascending=ascending)
    return ranked.head(10).assign(**{'class': label})


top_10_pct = _ten_rows_by('pct', 'Top 10 pct')
bottom_10_pct = _ten_rows_by('pct', 'Bottom 10 pct', ascending=True)
top_10_aut = _ten_rows_by('aut', 'Top 10 aut')
top_10_aug = _ten_rows_by('aug', 'Top 10 aug')

classified = pd.concat([top_10_pct, bottom_10_pct, top_10_aut, top_10_aug])
classified.to_parquet('data/output/occ_aut_aug_lvl_classified.parquet')


## 5. CAGED ETL

In [None]:

# Example query using basedosdados (already executed offline)
# billing_id = os.getenv('BILLING_ID')
# with open('data/config/caged_bd_national.SQL') as f:
#     query = f.read()
# df = bd.read_sql(query=query, billing_project_id=billing_id)
# df.to_parquet('data/input/caged_national_UNTREATED.parquet')

# Work on a copy of the raw extract and drop rows without a subgroup label.
df = caged_raw.copy()
df = df[~df['cbo_2002_descricao_subgrupo_principal'].isna()]

cols_to_keep = [
    'ano',
    'mes',
    'cbo_2002_descricao_subgrupo_principal',
    'cbo_2002_descricao_grande_grupo'
]

# Sum the movement balance per year / month / subgroup / group. dropna=False
# keeps groups whose key columns contain missing values. Object (string)
# columns are whitespace-trimmed after the aggregation.
# NOTE(review): trimming after grouping means labels that differ only by
# surrounding whitespace stay as separate rows -- confirm this is intended.
df = (
    df.groupby(cols_to_keep, dropna=False)['saldo_movimentacao']
    .sum()
    .reset_index()
    .apply(lambda col: col.str.strip() if col.dtype == 'object' else col)
)

# Translate the column names to English.
df = df.rename(
    columns={
        'saldo_movimentacao': 'net_jobs',
        'ano': 'year',
        'mes': 'month',
        'cbo_2002_descricao_subgrupo_principal': 'cbo_subgroup',
        'cbo_2002_descricao_grande_grupo': 'cbo_group'
    }
)

# Remove accents from the labels. Because the grouping keeps missing keys,
# `cbo_group` may contain NaN; na_action='ignore' leaves those values as-is
# instead of passing a float to strip_accents (which would raise TypeError).
df['cbo_subgroup'] = df['cbo_subgroup'].map(strip_accents, na_action='ignore')
df['cbo_group'] = df['cbo_group'].map(strip_accents, na_action='ignore')

df.to_parquet('data/input/caged_national.parquet')
