In [2]:
import polars as pl

In [3]:
# State FIPS codes kept in the analysis, as zero-padded two-character strings:
# every code from 01 through 56 except 03, 07, 14, 43 and 52, followed by '72'.
# NOTE(review): '72' is presumably Puerto Rico -- confirm against the reference CSV.
states = [
    f'{code:02d}'
    for code in range(1, 57)
    if code not in (3, 7, 14, 43, 52)
] + ['72']

In [19]:
# Reference table with one row per distinct (region, division, state_fips,
# state_name) combination, restricted to the codes in `states` and ordered by
# state FIPS code.

# Rows for state code '72' get a hard-coded region and division below.
_is_fips_72 = pl.col('state_fips') == '72'

geo_codes = (
    pl.read_csv(
        '/Users/lowell/Projects/bls-revisions/data/reference/geographic_codes.csv',
        # Read the code columns as strings so they compare against the string
        # codes held in `states`.
        schema_overrides={
            column: pl.Utf8 for column in ('region', 'division', 'state_fips')
        },
    )
    .filter(pl.col('state_fips').is_in(states))
    .select(
        pl.when(_is_fips_72)
          .then(pl.lit('3'))
          .otherwise(pl.col('region'))
          .alias('region'),
        pl.when(_is_fips_72)
          .then(pl.lit('05'))
          .otherwise(pl.col('division'))
          .alias('division'),
        'state_fips',
        'state_name',
    )
    .unique()
    .sort('state_fips')
)

# Lookup tables keyed by state FIPS code: state -> region code and
# state -> division code. Each two-column row tuple becomes one dict entry.
region_dict = dict(geo_codes.select('state_fips', 'region').iter_rows())
division_dict = dict(geo_codes.select('state_fips', 'division').iter_rows())

In [15]:
# QCEW revisions, loaded as stored on disk.
qcew = pl.read_parquet(
    '/Users/lowell/Projects/bls-revisions/data/qcew_revisions.parquet'
)

# CES revisions. Both revision counters are cast to UInt8 here; this frame is
# later stacked with the QCEW and SAE frames (presumably the cast aligns the
# dtypes for that concat -- confirm against the parquet schemas).
ces = (
    pl
    .read_parquet(
        '/Users/lowell/Projects/bls-revisions/data/ces_revisions.parquet'
    )
    .with_columns(
        revision=pl.col('revision')
                   .cast(pl.UInt8),
        # Cast the column's own values. Sourcing this from 'revision' would
        # overwrite every benchmark_revision with the plain revision value.
        benchmark_revision=pl.col('benchmark_revision')
                             .cast(pl.UInt8)
    )
)

# SAE revisions, loaded as stored on disk.
sae = pl.read_parquet(
    '/Users/lowell/Projects/bls-revisions/data/sae_revisions.parquet'
)

In [23]:
# ------------------------------------------------------------------------------
# Combined QCEW / CES / SAE Revision Series
# ------------------------------------------------------------------------------

# Stack the three source frames (QCEW, CES, SAE) row-wise into one frame.
revisions_1 = pl.concat([qcew, ces, sae])

# Rows whose geographic_type is 'national'.
revisions_national = revisions_1.filter(
    pl.col('geographic_type') == 'national'
)

# Rows whose geographic_type is 'state', excluding the '00' geographic code.
revisions_state = revisions_1.filter(
    (pl.col('geographic_type') == 'state')
    & (pl.col('geographic_code') != '00')
)

# The national and state subsets must together account for every stacked row.
assert revisions_1.height == revisions_national.height + revisions_state.height

In [24]:
# Region-level series: relabel every state row with its region code, then sum
# employment across the states that share a region for each combination of the
# remaining key columns.
revisions_region = (
    revisions_state
    .with_columns(
        geographic_type=pl.lit('region', pl.Utf8),
        # default=None maps state codes missing from region_dict to null
        # instead of raising.
        geographic_code=pl.col('geographic_code')
                          .replace_strict(region_dict, default=None)
    )
    # Drop rows whose state code had no region mapping; otherwise they would be
    # summed together into a spurious region with a null code.
    .filter(
        pl.col('geographic_code').is_not_null()
    )
    .group_by(
        'source',
        'seasonally_adjusted',
        'geographic_type', 'geographic_code', 
        'industry_type', 'industry_code', 
        'ref_date', 'vintage_date',
        'revision', 'benchmark_revision', 
    )
    .agg(
        employment=pl.col('employment').sum()
    )
)

In [25]:
# Division-level series: relabel every state row with its division code, then
# sum employment across the states that share a division for each combination
# of the remaining key columns.
revisions_division = (
    revisions_state
    .with_columns(
        geographic_type=pl.lit('division', pl.Utf8),
        # default=None maps state codes missing from division_dict to null
        # instead of raising.
        geographic_code=pl.col('geographic_code')
                          .replace_strict(division_dict, default=None)
    )
    # Drop rows whose state code had no division mapping; otherwise they would
    # be summed together into a spurious division with a null code.
    .filter(
        pl.col('geographic_code').is_not_null()
    )
    .group_by(
        'source',
        'seasonally_adjusted',
        'geographic_type', 'geographic_code', 
        'industry_type', 'industry_code', 
        'ref_date', 'vintage_date',
        'revision', 'benchmark_revision', 
    )
    .agg(
        employment=pl.col('employment').sum()
    )
)

In [26]:
# Final table: national, state, region and division rows stacked together and
# ordered by the full set of key columns.
revisions_df = pl.concat(
    [revisions_national, revisions_state, revisions_region, revisions_division]
).sort(
    by=[
        'source',
        'seasonally_adjusted',
        'geographic_type',
        'geographic_code',
        'industry_type',
        'industry_code',
        'ref_date',
        'vintage_date',
        'revision',
        'benchmark_revision',
    ]
)

In [29]:
# Key-uniqueness check. Despite its name, `revisions_dups` holds the frame
# after de-duplicating on the key columns; if its height matches the full
# frame, no key combination occurs more than once.
revisions_dups = revisions_df.unique(
    subset=[
        'source',
        'seasonally_adjusted',
        'geographic_type',
        'geographic_code',
        'industry_type',
        'industry_code',
        'ref_date',
        'vintage_date',
        'revision',
        'benchmark_revision',
    ]
)

assert revisions_dups.height == revisions_df.height

In [30]:
# Persist the combined revisions table as parquet.
revisions_df.write_parquet(
    '/Users/lowell/Projects/bls-revisions/data/revisions.parquet'
)