In [None]:
import numpy as np
rng = np.random.default_rng()
import os
import polars as pl
import polars.selectors as cs
import shutil
from ipumspy import IpumsApiClient, MicrodataExtract, readers, ddi

import rpy2.robjects as ro
from rpy2.robjects import pandas2ri
from rpy2.robjects.conversion import localconverter

# Variables and Setup

In [None]:
# --- Credentials and input locations: read from the environment (None when unset) ---
IPUMS_API_KEY = os.getenv("IPUMS_API_KEY")
SIPP_PU_FILE_PATH = os.getenv("SIPP_PU_FILE_PATH")
SIPP_RW_FILE_PATH = os.getenv("SIPP_RW_FILE_PATH")

# --- IPUMS download settings ---
IPUMS_DOWNLOAD_DIR = "ipums_data"
DOWNLOAD_IPUMS = False

# --- Study area ---
# Alternative study area (NC / CLT):
#   STATEFIP_CODE = 37
#   PUMA_CODES = [3106, 3107, 3108]
STATEFIP_CODE = 22  # LA
PUMA_CODES = [
    2400, 2401, 2402, 2500,
    2300, 2301, 2302,
    2200, 2201,
    1900,
]

# Set Up R Library and Install R Packages
Comment out the `install.packages` lines once the packages have been installed.

In [None]:
# Run an R snippet through rpy2 that:
#   1. builds a per-user library path
#      <USERPROFILE>/Documents/R/win-library/<R.version major>.<R.version minor>,
#   2. creates that directory when it does not exist, and
#   3. makes it the library search path via .libPaths().
# The two install.packages() lines inside the script are commented out; they
# install VIM and srvyr into that library when enabled.
# NOTE(review): USERPROFILE is normally only defined on Windows — confirm this
# notebook is not expected to run on other platforms.
ro.r("""
userlib <- file.path(
  Sys.getenv("USERPROFILE"),
  "Documents", "R", "win-library",
  paste0(R.version$major, ".", R.version$minor)
)

if (!dir.exists(userlib)) {
  dir.create(userlib, recursive = TRUE, showWarnings = FALSE)
}

.libPaths(userlib)

#install.packages("VIM",lib=userlib,repos="https://cloud.r-project.org")
#install.packages("srvyr",lib=userlib,repos="https://cloud.r-project.org") 
"""
)

# SIPP Data

SIPP stands for the Survey of Income and Program Participation.
It’s a longitudinal household survey run by the U.S. Census Bureau to understand how people’s income, employment, public assistance, and family circumstances change over time.

* The same people and households are interviewed repeatedly 
* Typically monthly data, released in panels (e.g., 2018 SIPP panel)

SIPP is designed specifically to measure:

* Participation in government programs
* Eligibility vs receipt
* Duration and churn

Key Fields
* SSUID - Sample Unit\Household
* PNUM - Identifies person within the household
* MONTHCODE - Identifies reference month
* SPANEL - Identifies the panel
* SHHADID - Physical housing unit
* SWAVE - Interview wave
* (SSUID, SHHADID) → household-level logic
* (SSUID, PNUM) → person-level logic

Two files:
* PU file - Public-use primary data file (monthly person-level core data)
* RW file - Replicate weights file (person-month replicate weights that accompany the primary file)

## Download and Load PU File
Below is a link to the 2018 file that can be downloaded and read in directly using Polars.

[pu2018.csv.gz](https://www2.census.gov/programs-surveys/sipp/data/datasets/2018/pu2018.csv.gz)

### PU Columns

**RMOVER** was not found in the file.

In [None]:
# Columns to read from the SIPP PU file.
# The order of this list is the column order of the selected frame, so entries
# are kept in a fixed sequence. Numbered families (job 1..7, vehicle 1..3, ...)
# are generated instead of being spelled out one by one.
pu_cols = [
    # ---- Common case identification variables ----
    "SSUID", "PNUM", "MONTHCODE", "ERESIDENCEID", "SHHADID",
    "ERELRPE", "TLIVQTR", "SPANEL", "SWAVE",
    "TEHC_ST", "TEHC_METRO",

    # ---- Move indicator (RMOVER was not found in the file) ----
    "TMOVER",

    # ---- Base weight ----
    "WPFINWGT",

    # ---- Demographics ----
    "ESEX", "TAGE", "TRACE", "EORIGIN", "ESPEAK", "EEDUC", "ECITIZEN",
    "RFAMKIND", "RFAMNUM", "RHNUMPER", "RFRELU18",
    "AEDUC",
    "AHTOTINC",

    # ---- Additional variables for analysis ----
    "TPTOTINC",  # monthly income, individual; can be negative depending on how investments etc. are summed
    "THTOTINC",  # monthly income, household; can be negative for the same reason
    "ETENURE",   # living quarters owned, rented, or occupied without rent payment
    "EMS",       # marital status
    "RDIS",      # disability recode

    # ---- Added from the RH IPUMS pull ----
    "TFINCPOV",  # family poverty ratio in this month (person-month level)
    # TJB<n>_OINCAMT: spell variable, additional income received through business n (n = 1..7)
    *[f"TJB{job}_OINCAMT" for job in range(1, 8)],

    "THINC_BOND",  # income from interest-earning assets
    "THINC_BANK",  # income from interest-earning assets

    "THVAL_HOME",  # home value
    "THEQ_HOME",   # home equity

    "TSSSAMT",    # social security benefit payments received this month
    "TPSCININC",  # monthly sum received from VA benefits (except VA pension), workers comp, unemployment, or social security
    # 'TRETINCAMT' is intentionally not selected: monthly income recode summing
    # retirement income (pension income, retirement account withdrawals, life
    # insurance benefits, and lump-sum pension / retirement account income).

    # ---- Monthly employment status recode ----
    "RMESR",

    # ---- Class of worker ----
    # EJB<n>_CLWRK is conditional on EJB<n>_JBORSE in (1, 2, 3).
    # TJB<n>_MWKHRS is the average hours worked per week at job n during the
    # reference month; it can be used to check that job 1 really is the job
    # the respondent spent the most time at.
    *[
        name
        for job in range(1, 8)
        for name in (f"EJB{job}_CLWRK", f"TJB{job}_MWKHRS")
    ],

    # ---- Welfare calc ----
    "TSSI_AMT",  # SSI payments received
    "TTANF_AMT",
    "TGA_AMT",

    # ---- INCRETIRE calc ----
    "TRET1AMT",  # retirement income: company/union pension
    "TRET2AMT",  # retirement income: federal government pension
    "TRET3AMT",  # retirement income: state government pension
    "TRET4AMT",  # retirement income: local government pension
    "TSUR3AMT",  # survivor income: federal
    "TSUR4AMT",  # survivor income: US government railroad
    "TSUR5AMT",  # survivor income: state government pension
    "TSUR6AMT",  # survivor income: local government pension
    "TSUR7AMT",  # survivor income: life insurance policy
    "TSUR8AMT",  # survivor income: military retirement pay
    "TDIS3AMT",  # disability income: company/union pension
    "TDIS4AMT",  # disability income: federal pension
    "TDIS5AMT",  # disability income: state government pension
    "TDIS6AMT",  # disability income: local government pension
    "TIRAKEOVAL",

    # ---- Debts (following BWDC guidance) ----
    "THDEBT_AST",  # total debt recodes at household level
    "THDEBT_USEC",
    "THDEBT_SEC",

    # Individual-level debt types, used to create an "any debt" variable that
    # is rolled up to household level after it is built per individual.
    "EDEBT_CC", "EDEBT_MED", "EDEBT_ED", "EDEBT_OT",
    "EPRDEBT", "APRDEBT", "EMHDEBT",
    *[f"EVEH{vehicle}DEBT" for vehicle in range(1, 4)],
    "EMCYCDEBT", "EBOATDEBT", "ERVDEBT", "EORECDEBT",
    "TDEBT_BUS",

    "THDEBT_HOME", "TORPDEBTVAL", "THDEBT_RE", "THDEBT_VEH",
    "TBOATDEBTVAL", "TMCYCDEBTVAL", "TRVDEBTVAL", "TORECDEBTVAL",
    *[f"TBSJ{business}DEBTVAL" for business in range(1, 6)],
    *[f"TBSI{business}DEBTVAL" for business in range(1, 4)],
    "THDEBT_CC", "THDEBT_ED", "THDEBT_OT",
    "TMED_AMT",

    # ---- Assets (following BWDC guidance) ----
    "THVAL_AST",  # total asset recodes at household level
    "THVAL_BANK", "THVAL_BOND", "THVAL_STMF", "THVAL_RENT",
    "THVAL_RE", "THVAL_BUS",
    # THVAL_HOME belongs to this group but is already selected above.
    "THVAL_VEH", "THVAL_ESAV", "THVAL_RET", "THVAL_OTH",

    # Individual-level asset types, used to create an "any asset" variable that
    # is rolled up to household level after it is built per individual.
    "EOWN_TREQ", "EOWN_ANNEQ", "EOWN_BSI",

    # EOWN_BSJ is built later from all 12 months of EJB<n>_JBORSE
    # (business as a job, n = 1..7 for up to 7 reported jobs).
    *[f"EJB{job}_JBORSE" for job in range(1, 8)],

    "EOWN_CD", "EOWN_CHK", "EOWN_GOVS", "EOWN_IRAKEO",
    "EOWN_LIFE", "ELIFE_TYPE",
    "EOWN_MCBD", "EOWN_MF", "EOWN_MM", "EOWN_OINV",
    "EOWN_RE", "EOWN_RP", "EOWN_SAV", "EOWN_ST",
    "EOWN_THR401", "EOWN_ESAV", "EOWN_RECV", "EOWN_VEH",
    "TVEH_NUM",
]

### Lazy Loading
Lazy load the file, select only the fields needed for the analysis, and then load it completely. Rename columns to uppercase.

In [None]:
# Scan the pipe-delimited PU file lazily, keep only the columns listed in
# pu_cols, and only then materialize the result.
pu = (
    pl.scan_csv(SIPP_PU_FILE_PATH, separator="|")
    .select(pl.col(pu_cols))
    .collect()
)
# Force every column name to uppercase.
pu = pu.rename(dict(zip(pu.columns, map(str.upper, pu.columns))))
pu

## Download and Load RW File
Below is a link to the 2018 file that can be downloaded and read in directly using Polars.

[rw2018.csv.gz](https://www2.census.gov/programs-surveys/sipp/data/datasets/2018/rw2018.csv.gz)


### Lazy Loading
Lazy load the file and rename columns to uppercase.

In [None]:
# Display the RW file path read from the environment; None here means the
# SIPP_RW_FILE_PATH environment variable is not set.
SIPP_RW_FILE_PATH

In [None]:
# Scan the pipe-delimited RW file and materialize all of its columns.
rw = pl.scan_csv(SIPP_RW_FILE_PATH, separator="|").collect()
# Force every column name to uppercase.
rw = rw.rename(dict(zip(rw.columns, map(str.upper, rw.columns))))
rw

## Join Datasets Together

Join the pu and rw files together to develop the **sipp_us** data frame.

In [None]:
# Inner join: keep only the rows whose full key is present in both PU and RW.
sipp_us = pu.join(
    rw,
    how="inner",
    on=["SSUID", "PNUM", "MONTHCODE", "SPANEL", "SWAVE"],
)
sipp_us

## Transformations

### First Set of Transformations
* Household Income
    * hh_inc_yr - Group by "PNUM", "SSUID", "SHHADID" to get the sum of household income.
    * hh_inc_over200k - Indicator to show if **hh_inc_yr** >= 200,000
    * hh_income - Binning of **hh_inc_yr** into different categories based on value ranges.
    * homevalue - Binning of the home value field (THVAL_HOME) into categories based on value ranges; records with ETENURE 2 or 3 are assigned category 1.
* Demographics
    * age - Binning of age field into different categories based on value ranges.
    * male - Indicator to show if the individual is a Male.
    * edu - Category field showing the level of education of the individual.
    * hisp_fct - Category field showing if the individual is Hispanic or Non-Hispanic.
    * race_fct - Category field capturing the race of the individual.
    * race_eth - Combining the **hisp_fct** and **race_fct** field into a new category field.
    * citizen - Indicator to show if the individual is a citizen.
    * disability - Indicator to show if the individual has a disability.
* Sample Month
    * sample_month - For each "SSUID", "SHHADID" grouping assign a random sample month ranging from 1 to 12 and assign the value for each grouping to the new field.
* Class Worker
    * class_worker - For records matching the sample month create a category capturing the worker group the individual would fall into.
* Children
    * child_flag - Based on the age create an indicator to show if the individual is a child.
    * child_sum - For each "SSUID", "SHHADID" grouping sum the **child_flag** field to determine the number of children in the household.
* Welfare
    Calculate the different programs amounts (SSI, TANF, GA, SSS) and sum by the SSUID, PNUM fields.  These would be considered person level calculations.

    * welfare_pp - Sum of the person-level SSI, TANF, and GA amounts that were calculated.
    * ss_pp - The person level SSS amount
    * poverty - Indicator to identify if an individual is in poverty.
    * welfare_hh - Summing the **welfare_pp** amounts grouped by "SSUID", "SHHADID".
    * ss_hh - Summing the **ss_pp** amounts grouped by "SSUID", "SHHADID".
    * public_assistance - Indicator of 1 or 0 to indicate if the field **welfare_hh** has amount greater than 0.
    * social_security - Indicator of 1 or 0 to indicate if the field **ss_hh** has amount greater than 0.
* Language
    * english_at_home - Indicator to show if an individual can speak English.
* State
    * state - Code related to the state associated with the record.


In [None]:
# First set of transformations on the joined person-month frame.
# Every step only appends (or overwrites) columns through with_columns; no rows
# are added or removed. Derived categorical columns are integer-coded starting
# at 1 (hh_income 1..18, age 1..15, edu 1..5, race_eth 1..5, class_worker 1..5,
# household_type 1..4, homevalue 1..8).
sipp_us_t = sipp_us

sipp_us_t = \
    (sipp_us_t
        # summed household income: THTOTINC is kept only on rows where both
        # TLIVQTR and ERELRPE are in {1, 2}, then summed within each
        # (PNUM, SSUID, SHHADID) group
        .with_columns(
            hh_inc_yr=(
                pl.when(
                    (pl.col("TLIVQTR").is_in([1,2])) &
                    (pl.col("ERELRPE").is_in([1,2]))
                )
                .then(pl.col("THTOTINC"))
                .otherwise(None)
                .sum()
                .over(["PNUM", "SSUID", "SHHADID"])
            ) 
        )
        #  income >= 200k indicator
        .with_columns(
            hh_inc_over200k=(
                pl.when(pl.col("hh_inc_yr") >= 200_000).then(1).otherwise(0)
            )
        )
        # household income factor: first matching upper bound wins, so each
        # branch is implicitly bounded below by the previous threshold
        .with_columns(
            hh_income=(
                    pl.when(pl.col("hh_inc_yr") <= 0).then(1)
                    .when(pl.col("hh_inc_yr") < 5_000).then(2)
                    .when(pl.col("hh_inc_yr") < 15_000).then(3)
                    .when(pl.col("hh_inc_yr") < 20_000).then(4)
                    .when(pl.col("hh_inc_yr") < 25_000).then(5)
                    .when(pl.col("hh_inc_yr") < 30_000).then(6)
                    .when(pl.col("hh_inc_yr") < 35_000).then(7)
                    .when(pl.col("hh_inc_yr") < 45_000).then(8)
                    .when(pl.col("hh_inc_yr") < 55_000).then(9)
                    .when(pl.col("hh_inc_yr") < 65_000).then(10)
                    .when(pl.col("hh_inc_yr") < 75_000).then(11)
                    .when(pl.col("hh_inc_yr") < 90_000).then(12)
                    .when(pl.col("hh_inc_yr") < 105_000).then(13)
                    .when(pl.col("hh_inc_yr") < 125_000).then(14)
                    .when(pl.col("hh_inc_yr") < 150_000).then(15)
                    .when(pl.col("hh_inc_yr") < 200_000).then(16)
                    .when(pl.col("hh_inc_yr") < 500_000).then(17)
                    .when(pl.col("hh_inc_yr") >= 500_000).then(18)
                    .otherwise(None)
                ),
        )
        # age factor (is_between bounds are inclusive on both ends)
        .with_columns(
            age=(
                pl.when(pl.col("TAGE") < 15).then(1)
                .when(pl.col("TAGE").is_between(15, 24)).then(2)
                .when(pl.col("TAGE").is_between(25, 29)).then(3)
                .when(pl.col("TAGE").is_between(30, 34)).then(4)
                .when(pl.col("TAGE").is_between(35, 39)).then(5)
                .when(pl.col("TAGE").is_between(40, 44)).then(6)
                .when(pl.col("TAGE").is_between(45, 49)).then(7)
                .when(pl.col("TAGE").is_between(50, 54)).then(8)
                .when(pl.col("TAGE").is_between(55, 59)).then(9)
                .when(pl.col("TAGE").is_between(60, 64)).then(10)
                .when(pl.col("TAGE").is_between(65, 69)).then(11)
                .when(pl.col("TAGE").is_between(70, 74)).then(12)
                .when(pl.col("TAGE").is_between(75, 79)).then(13)
                .when(pl.col("TAGE").is_between(80, 84)).then(14)
                .when(pl.col("TAGE") >= 85).then(15)
                .otherwise(None)
                .cast(pl.Int8)
            )
        )
        # male indicator: ESEX 1 -> 1, ESEX 2 -> 0, anything else -> null
        .with_columns(
            male=(
                pl.when(pl.col("ESEX") == 1).then(pl.lit(1))
                    .when(pl.col("ESEX") == 2).then(pl.lit(0))
                    .otherwise(None)
                )
        )
        # education factor
        .with_columns(
            edu=(
                    pl.when(pl.col("EEDUC").is_between(31,38)).then(1)
                    .when(pl.col("EEDUC") == 39).then(pl.lit(2))
                    .when(pl.col("EEDUC").is_between(40,42)).then(3)
                    .when(pl.col("EEDUC") == 43).then(4)
                    .when(pl.col("EEDUC").is_between(44,46)).then(5)
                    .otherwise(None)
            )
        )
        # hispanic and race factors
        .with_columns(
            hisp_fct = (
                pl.when(pl.col("EORIGIN") == 1).then(pl.lit("hispanic"))
                .when(pl.col("EORIGIN") == 2).then(pl.lit("non-hispanic"))
            ),
            race_fct = (
                pl.when(pl.col("TRACE") == 1)
                .then(pl.lit("white"))
                .when(pl.col("TRACE") == 2).then(pl.lit("black"))
                .when(pl.col("TRACE") == 4).then(pl.lit("asian"))
                .otherwise(pl.lit("other"))
            )
        )
        # race ethnicity factor:
        # 1 = non-hispanic white, 2 = non-hispanic black, 3 = hispanic,
        # 4 = non-hispanic asian, 5 = non-hispanic other
        .with_columns(
            race_eth = (
                pl.when(pl.col("hisp_fct") == "hispanic").then(3)
                .when(
                    (pl.col("hisp_fct") == "non-hispanic") & 
                    (pl.col("race_fct") == "white")
                ).then(1)
                .when(
                    (pl.col("hisp_fct") == "non-hispanic") & 
                    (pl.col("race_fct") == "black")
                ).then(2)
                .when(
                    (pl.col("hisp_fct") == "non-hispanic") & 
                    (pl.col("race_fct") == "asian")
                ).then(4)
                .when(
                    (pl.col("hisp_fct") == "non-hispanic") & 
                    (pl.col("race_fct") == "other")
                ).then(5)
            )
        )
        # citizenship
        .with_columns(
            citizen = pl.when(pl.col("ECITIZEN") == 1).then(1)
            .when(pl.col("ECITIZEN") == 2).then(0)
        )
        # disability
        .with_columns(
            disability = pl.when(pl.col("RDIS") == 1).then(1)
            .when(pl.col("RDIS") == 2).then(0)
        )
        # sample month: one random month in 1..12 per (SSUID, SHHADID).
        # int_range's upper bound is exclusive, so (1, 13) yields 1..12.
        # Starting the range at 0 would allow a draw of 0, which cannot equal
        # a MONTHCODE in 1..12 and would leave class_worker null for every
        # person in that household.
        # NOTE(review): shuffle() is called without a seed, so sample_month
        # (and everything derived from it) changes between runs.
        .with_columns(
            sample_month=(
                pl.int_range(1, 13)
                .shuffle()
                .first()
                .over(["SSUID", "SHHADID"])
                .cast(pl.Int8)
            )
        )
        # class worker, evaluated only on the row whose MONTHCODE equals the
        # household's sample_month:
        #   1 = job 1 with EJB1_CLWRK == 7
        #   2 = job 1 with EJB1_CLWRK == 8
        #   3 = job 1 with EJB1_CLWRK in 1..6
        #   4 = age 65+ with no EJB1_JBORSE
        #   5 = age 16-64 with no EJB1_JBORSE
        .with_columns(
            class_worker=(
                pl.when(
                    (pl.col("MONTHCODE") == pl.col("sample_month")) 
                    & (pl.col("TAGE") >= 16)
                    & (pl.col("EJB1_JBORSE").is_in([1,2,3]))
                    & (pl.col("EJB1_CLWRK") == 7)
                )
                .then(1)
                .when(
                    (pl.col("MONTHCODE") == pl.col("sample_month")) 
                    & (pl.col("TAGE") >= 16)
                    & (pl.col("EJB1_JBORSE").is_in([1,2,3]))
                    & (pl.col("EJB1_CLWRK") == 8)
                )
                .then(pl.lit(2))
                .when(
                    (pl.col("MONTHCODE") == pl.col("sample_month")) 
                    & (pl.col("TAGE") >= 16)
                    & (pl.col("EJB1_JBORSE").is_in([1,2,3]))
                    & (pl.col("EJB1_CLWRK").is_in([1,2,3,4,5,6]))
                )
                .then(3)
                .when(
                    (pl.col("MONTHCODE") == pl.col("sample_month")) 
                    & (pl.col("TAGE") >= 65)
                    & (pl.col("EJB1_JBORSE").is_null())
                )
                .then(4)
                .when(
                    (pl.col("MONTHCODE") == pl.col("sample_month")) 
                    & (pl.col("TAGE").is_between(16,64))
                    & (pl.col("EJB1_JBORSE").is_null())
                )
                .then(5)
                .otherwise(None)
            )
        )
        # broadcast the sample-month value to every row of the person;
        # if there are multiple non-nulls, it takes the last by row order.
        .with_columns(
            class_worker=pl.col("class_worker").drop_nulls().last().over(["SSUID", "PNUM"])
        )
        # child indicator
        .with_columns(
            child_flag = (
                pl.when(pl.col("TAGE") < 18).then(1).otherwise(0)
            )
        )
        # sum of child_flag over all rows of the household; this counts
        # person-month rows, so it is only used below as ">0" / "== 0"
        .with_columns(
            child_sum=(
                pl.col("child_flag").sum().over(["SSUID", "SHHADID"])
            )
        )
        # household type factor:
        # 1 = EMS in {1,2} with children, 2 = EMS in {1,2} without children,
        # 3 = EMS in {3,4,5,6} with children, 4 = EMS in {3,4,5,6} without children.
        # Rows with TAGE < 18 use RHNUMPER to pick the category instead.
        .with_columns(
            household_type=(
                pl.when((pl.col("TAGE") >= 18) & 
                        (pl.col("EMS").is_in([1,2])) &
                        (pl.col("child_sum") > 0)
                    ).then(1)
                .when((pl.col("TAGE") >= 18) & 
                        (pl.col("EMS").is_in([1,2])) &
                        (pl.col("child_sum") == 0)
                    ).then(2)
                .when((pl.col("TAGE") >= 18) & 
                        (pl.col("EMS").is_in([3,4,5,6])) &
                        (pl.col("child_sum") > 0)
                    ).then(3)
                .when((pl.col("TAGE") >= 18) & 
                        (pl.col("EMS").is_in([3,4,5,6])) &
                        (pl.col("child_sum") == 0)
                    ).then(4)
                .when((pl.col("TAGE") < 18) & 
                        (pl.col("EMS").is_in([1,2])) &
                        (pl.col("child_sum") > 0) &
                        (pl.col("RHNUMPER") > 2)
                    ).then(1)
                .when((pl.col("TAGE") < 18) & 
                        (pl.col("EMS").is_in([1,2])) &
                        (pl.col("child_sum") > 0) &
                        (pl.col("RHNUMPER") == 2)
                    ).then(2)
                .when((pl.col("TAGE") < 18) & 
                        (pl.col("EMS").is_in([3,4,5,6])) &
                        (pl.col("child_sum") > 0) &
                        (pl.col("RHNUMPER") > 1)
                    ).then(3)
                .when((pl.col("TAGE") < 18) & 
                        (pl.col("EMS").is_in([3,4,5,6])) &
                        (pl.col("child_sum") > 0) &
                        (pl.col("RHNUMPER") == 1)
                    ).then(4)
            )
        )
        # english at home indicator: ESPEAK 1 -> 0, ESPEAK 2 -> 1, null stays null
        .with_columns(
            english_at_home=(
                pl.when(pl.col("ESPEAK") == 1).then(0)
                .when(pl.col("ESPEAK") == 2).then(1)
                .when(pl.col("ESPEAK").is_null()).then(None)
            )
        )
        # welfare
        # person level: program amounts summed over all rows of each person
        .with_columns(
            TSSI_yr = pl.col("TSSI_AMT").cast(pl.Int64,strict=True).sum().over(["SSUID","PNUM"]),
            TTANF_yr = pl.col("TTANF_AMT").cast(pl.Int64,strict=True).sum().over(["SSUID","PNUM"]),
            TGA_yr = pl.col("TGA_AMT").cast(pl.Int64,strict=True).sum().over(["SSUID","PNUM"]),
            TSSS_yr = pl.col("TSSSAMT").cast(pl.Int64,strict=True).sum().over(["SSUID","PNUM"]),   
        )
        .with_columns(
            welfare_pp=pl.col("TSSI_yr") + pl.col("TTANF_yr") + pl.col("TGA_yr"),
            ss_pp=pl.col("TSSS_yr"),
            poverty=pl.when(pl.col("TFINCPOV") <= 1).then(1).otherwise(0).cast(pl.Int8),
        )
        # household level: the person totals repeat on every row of a person,
        # so these sums are only meaningful as ">0" flags (used just below)
        .with_columns(
            welfare_hh=pl.col("welfare_pp").sum().over(["SSUID", "SHHADID"]),
            ss_hh=pl.col("ss_pp").sum().over(["SSUID", "SHHADID"]),
        )
        .with_columns(
            public_assistance=pl.when(pl.col("welfare_hh") > 0).then(1).otherwise(0).cast(pl.Int8),
            social_security=pl.when(pl.col("ss_hh") > 0).then(1).otherwise(0).cast(pl.Int8),
        )
        # home value: ETENURE 2/3 gets category 1 regardless of THVAL_HOME;
        # everyone else is binned by THVAL_HOME
        .with_columns(
            homevalue=(
                pl.when(pl.col("ETENURE").is_in([2,3])).then(1)
                .when(pl.col("THVAL_HOME") < 50000).then(2)
                .when(pl.col("THVAL_HOME").is_between(50_000, 99_999)).then(3)
                .when(pl.col("THVAL_HOME").is_between(100_000, 299_999)).then(4)
                .when(pl.col("THVAL_HOME").is_between(300_000, 499_999)).then(5)
                .when(pl.col("THVAL_HOME").is_between(500_000, 749_999)).then(6)
                .when(pl.col("THVAL_HOME").is_between(750_000, 999_999)).then(7)
                .when(pl.col("THVAL_HOME") >= 1_000_000).then(8)
            )
        )
        # state code
        .with_columns(
            state=pl.col("TEHC_ST")
        )
    
)
sipp_us_t

### Imputing Metro

* Create a data frame used for the imputation.
* Remove any records that have any column with a null.
* Create a categorical field **metro_fct** that is calculated based on the value of the **TEHC_METRO** field that will either be "metro", "nonmetro" or null.
* Impute the records where **metro_fct is null**.
* Call the R function **VIM::kNN** to impute the missing values.



In [None]:
# Build the frame used to impute metro status.
# Rows kept: ERELRPE in {1, 2}, MONTHCODE == 12, TLIVQTR in {1, 2}.
# Rows with a null in any selected column are dropped BEFORE metro_fct is
# created, so metro_fct is the only column that can contain nulls afterwards
# (it is null where TEHC_METRO is 0 or any value other than 1 / 2).
sipp_impute_metro = (sipp_us_t
 .filter(
     (pl.col("ERELRPE").is_in([1,2])) 
     & (pl.col("MONTHCODE") == 12) 
     & (pl.col("TLIVQTR").is_in([1,2]))
 )
 .with_columns(
     hh_income_rank=pl.col("hh_income") + 1,
     edu_rank=pl.col("edu") + 1,
     # The dummies below must use the same codes the first transformation cell
     # assigns. Those codes start at 1, so testing 0-based values shifts every
     # dummy by one category (e.g. "white" would never be set).
     # race_eth: 1 white, 2 black, 3 hispanic, 4 asian, 5 other
     white=pl.when(pl.col("race_eth") == 1).then(1).otherwise(0),
     black=pl.when(pl.col("race_eth") == 2).then(1).otherwise(0),
     hispanic=pl.when(pl.col("race_eth") == 3).then(1).otherwise(0),
     asian=pl.when(pl.col("race_eth") == 4).then(1).otherwise(0),
     other_race=pl.when(pl.col("race_eth") == 5).then(1).otherwise(0),
     own_home=pl.when(pl.col("ETENURE") == 1).then(1).otherwise(0),
     homevalue_rank=pl.col("homevalue") + 1,
     # class_worker: 1-2 from EJB1_CLWRK 7 / 8, 3 from EJB1_CLWRK 1..6,
     # 4-5 no job 1 (age 65+ / age 16-64)
     self_emp=pl.when(pl.col("class_worker").is_in([1,2])).then(1).otherwise(0),
     wage_salary_emp=pl.when(pl.col("class_worker").is_in([3])).then(1).otherwise(0),
     not_employed=pl.when(pl.col("class_worker").is_in([4,5])).then(1).otherwise(0),
     age_rank=pl.col('age') + 1,
     # household_type: 1 married with kids, 2 married no kids,
     # 3 single with kids, 4 single no kids
     married_withkids=pl.when(pl.col("household_type").is_in([1])).then(1).otherwise(0),
     married_nokids=pl.when(pl.col("household_type").is_in([2])).then(1).otherwise(0),
     single_withkids=pl.when(pl.col("household_type").is_in([3])).then(1).otherwise(0),
     single_nokids=pl.when(pl.col("household_type").is_in([4])).then(1).otherwise(0),
 )
 .select(
     pl.col("SSUID"),
     pl.col("SHHADID"),
     pl.col("state"),
     pl.col("TEHC_METRO"),
     pl.col("hh_income_rank"),
     pl.col("male"),
     pl.col("age_rank"),
     pl.col("edu_rank"),
     pl.col("white"),
     pl.col("black"),
     pl.col("hispanic"),
     pl.col("asian"),
     pl.col("other_race"),
     pl.col("own_home"),
     pl.col("homevalue_rank"),
     pl.col("disability"),
     pl.col("self_emp"),
     pl.col("wage_salary_emp"),
     pl.col("not_employed"),
     pl.col("married_withkids"),
     pl.col("married_nokids"),
     pl.col("single_withkids"),
     pl.col("single_nokids"),
     pl.col("english_at_home"),
     pl.col("public_assistance"),
     pl.col("social_security"),
     pl.col("poverty"),   
 )
 .drop_nulls()
 # imputation target: "metro" / "nonmetro", null where it has to be imputed
 .with_columns(
     metro_fct=(
                    pl.when(pl.col("TEHC_METRO") == 0).then(None)
                    .when(pl.col("TEHC_METRO").is_in([1])).then(pl.lit("metro"))
                    .when(pl.col("TEHC_METRO").is_in([2])).then(pl.lit("nonmetro"))
                    .otherwise(None)
                    .cast(pl.Categorical)
                )
 )
)
sipp_impute_metro.shape

In [None]:
# create pandas version
sipp_impute_metro_pdf = sipp_impute_metro.to_pandas()
# create deep copy
sipp_impute_metro_pdf = sipp_impute_metro_pdf.copy()
# loop through columns
for c in sipp_impute_metro_pdf.columns:
    # detect string data types
    if str(sipp_impute_metro_pdf[c].dtype).startswith("string"):
        # cast string data types as object data type
        sipp_impute_metro_pdf[c] = sipp_impute_metro_pdf[c].astype(object)

In [None]:
# python to r conversion
with localconverter(ro.default_converter + pandas2ri.converter):
    r_sipp_impute_metro_pdf = ro.conversion.py2rpy(sipp_impute_metro_pdf)
# assign variable to R global environment
ro.globalenv["r_sipp_impute_metro_pdf"] = r_sipp_impute_metro_pdf

In [None]:
# execute R command to impute field
ro.r("""
suppressPackageStartupMessages(library(VIM))
suppressPackageStartupMessages(library(dplyr))

sipp_impute_metro <- VIM::kNN(r_sipp_impute_metro_pdf, variable = "metro_fct", imp_var = FALSE)
""")

In [None]:
# get the results from R and put them back into Polars data frame
with localconverter(ro.default_converter + pandas2ri.converter):
    sipp_impute_metro = pl.from_pandas(ro.conversion.rpy2py(ro.globalenv["sipp_impute_metro"]))
    sipp_impute_metro = (sipp_impute_metro
                         .with_columns(
                             pl.col("SSUID").cast(pl.Int64),
                             pl.col("SHHADID").cast(pl.Int64)
                         )
                        )
sipp_impute_metro.null_count()

### Second Set of Transformations
* Left join the metro imputation data frame back to the SIPP data frame using "SSUID" and "SHHADID" fields.
* Create binary (0,1) field **metro** based on if the **metro_fct** field is "metro" or "nonmetro".
* Create field **owned_withdebt** to indicate if there are mortgages or loans associated with the residence.  This will be saved with string values including "NA".
* Drop duplicate columns (_right) that are created as a result of the left join.

In [None]:
sipp_us_t2 = (
    sipp_us_t
    # attach the household-level imputation result to every row of the household
    .join(sipp_impute_metro, on=["SSUID", "SHHADID"], how="left")
    # metro: take the (imputed) metro_fct when the household was part of the
    # imputation frame, otherwise fall back to the raw TEHC_METRO code
    .with_columns(
        metro=(
            pl.when(pl.col("metro_fct") == "metro")
            .then(pl.lit(1))
            .when(pl.col("metro_fct") == "nonmetro")
            .then(pl.lit(0))
            .when(
                pl.col("metro_fct").is_null()
                & (pl.col("TEHC_METRO").cast(pl.Int64) == 1)
            )
            .then(pl.lit(1))
            .when(
                pl.col("metro_fct").is_null()
                & (pl.col("TEHC_METRO").cast(pl.Int64) == 2)
            )
            .then(pl.lit(0))
            .otherwise(None)
            .cast(pl.Int8)
        )
    )
    # owned_withdebt: EPRDEBT 1 -> "2", EPRDEBT 2 -> "1", null -> the string "NA"
    .with_columns(
        pl.when(pl.col("EPRDEBT") == 1)
        .then(pl.lit("2"))
        .when(pl.col("EPRDEBT") == 2)
        .then(pl.lit("1"))
        .when(pl.col("EPRDEBT").is_null())
        .then(pl.lit("NA"))
        .alias("owned_withdebt")
    )
)
# keep everything except the "_right" duplicates produced by the join
sipp_us_t2 = sipp_us_t2.select(
    [name for name in sipp_us_t2.columns if not name.endswith("_right")]
)
sipp_us_t2

### Imputing Tenure

* Create a data frame used for the imputation.
* Remove any records that have any column with a null.
* Update field **owned_withdebt** casting as an integer.
* Impute the records where **owned_withdebt is null**.
* Call the R function **VIM::kNN** to impute the missing values.

In [None]:
# Build the frame used to impute owned_withdebt.
# Rows kept: ERELRPE in {1, 2}, MONTHCODE == 12, TLIVQTR in {1, 2}, ETENURE == 1.
# owned_withdebt carries the string "NA" for missing values so those rows
# survive drop_nulls(); "NA" is turned into a real null (and the column into
# Int8) only afterwards, which leaves it as the sole column to impute.
sipp_impute_tenure = (sipp_us_t2.
                      filter(
                          (pl.col("ERELRPE").is_in([1,2])) 
                          & (pl.col("MONTHCODE").is_in([12]))
                          & (pl.col("TLIVQTR").is_in([1,2]))
                          & (pl.col("ETENURE").is_in([1]))
                      )
                      .with_columns(
                          hh_income_rank=pl.col("hh_income") + 1,
                          edu_rank=pl.col("edu") + 1,
                          # The dummies below must use the same codes the first
                          # transformation cell assigns. Those codes start at 1,
                          # so testing 0-based values shifts every dummy by one
                          # category (e.g. "white" would never be set).
                          # race_eth: 1 white, 2 black, 3 hispanic, 4 asian, 5 other
                          white=pl.when(pl.col("race_eth") == 1).then(1).otherwise(0),
                          black=pl.when(pl.col("race_eth") == 2).then(1).otherwise(0),
                          hispanic=pl.when(pl.col("race_eth") == 3).then(1).otherwise(0),
                          asian=pl.when(pl.col("race_eth") == 4).then(1).otherwise(0),
                          other_race=pl.when(pl.col("race_eth") == 5).then(1).otherwise(0),
                          homevalue_rank=pl.col("homevalue") + 1,
                          # class_worker: 1-2 from EJB1_CLWRK 7 / 8,
                          # 3 from EJB1_CLWRK 1..6, 4-5 no job 1
                          self_emp=pl.when(pl.col("class_worker").is_in([1,2])).then(1).otherwise(0),
                          wage_salary_emp=pl.when(pl.col("class_worker").is_in([3])).then(1).otherwise(0),
                          not_employed=pl.when(pl.col("class_worker").is_in([4,5])).then(1).otherwise(0),
                          age_rank=pl.col('age') + 1,
                          # household_type: 1 married with kids, 2 married no kids,
                          # 3 single with kids, 4 single no kids
                          married_withkids=pl.when(pl.col("household_type").is_in([1])).then(1).otherwise(0),
                          married_nokids=pl.when(pl.col("household_type").is_in([2])).then(1).otherwise(0),
                          single_withkids=pl.when(pl.col("household_type").is_in([3])).then(1).otherwise(0),
                          single_nokids=pl.when(pl.col("household_type").is_in([4])).then(1).otherwise(0),
                        )
                      .select(
                          pl.col("SSUID"),
                          pl.col("SHHADID"),
                          pl.col("state"),
                          pl.col("TEHC_METRO"),
                          pl.col("hh_income_rank"),
                          pl.col("male"),
                          pl.col("age_rank"),
                          pl.col("edu_rank"),
                          pl.col("white"),
                          pl.col("black"),
                          pl.col("hispanic"),
                          pl.col("asian"),
                          pl.col("other_race"),
                          pl.col("owned_withdebt"),
                          pl.col("disability"),
                          pl.col("self_emp"),
                          pl.col("wage_salary_emp"),
                          pl.col("not_employed"),
                          pl.col("married_withkids"),
                          pl.col("married_nokids"),
                          pl.col("single_withkids"),
                          pl.col("single_nokids"),
                          pl.col("english_at_home"),
                          pl.col("public_assistance"),
                          pl.col("social_security"),
                          pl.col("poverty"),   
                        )
                        .drop_nulls()
                        # cast to integer for imputation ("NA" becomes null first)
                        .with_columns(
                            pl.col("owned_withdebt").replace({"NA":None}).cast(pl.Int8).alias("owned_withdebt")
                        )
                    )

sipp_impute_tenure.null_count()

In [None]:
# Build a pandas copy of the imputation frame; the explicit .copy() keeps the
# casts below away from the object returned by to_pandas().
sipp_impute_tenure_pdf = sipp_impute_tenure.to_pandas().copy()
# Columns whose dtype name starts with "string" are re-cast to plain object
# (presumably so the rpy2 pandas converter can handle them -- TODO confirm).
string_typed_cols = [
    col_name
    for col_name in sipp_impute_tenure_pdf.columns
    if str(sipp_impute_tenure_pdf[col_name].dtype).startswith("string")
]
for col_name in string_typed_cols:
    sipp_impute_tenure_pdf[col_name] = sipp_impute_tenure_pdf[col_name].astype(object)

In [None]:
# Convert the pandas frame to its R counterpart using the pandas-aware converter,
# then publish it in R's global environment under the same name so the R snippet
# in the next cell can reference it.
pandas_to_r_converter = ro.default_converter + pandas2ri.converter
with localconverter(pandas_to_r_converter):
    r_sipp_impute_tenure_pdf = ro.conversion.py2rpy(sipp_impute_tenure_pdf)
ro.globalenv["r_sipp_impute_tenure_pdf"] = r_sipp_impute_tenure_pdf

In [None]:
# Run the imputation in R: VIM::kNN fills the missing values of `owned_withdebt`
# in the frame pushed to R above and stores the result as `sipp_impute_tenure`
# in R's global environment.
# NOTE(review): no `dist_var` is passed, so the neighbour distance presumably uses
# every other column, including the SSUID/SHHADID identifiers -- confirm against the
# VIM::kNN documentation whether the IDs should be excluded.
ro.r("""
suppressPackageStartupMessages(library(VIM))
suppressPackageStartupMessages(library(dplyr))

sipp_impute_tenure <- VIM::kNN(r_sipp_impute_tenure_pdf, variable = "owned_withdebt", imp_var = FALSE)
""")

In [None]:
# Fetch the imputed frame from R's global environment as pandas ...
with localconverter(ro.default_converter + pandas2ri.converter):
    imputed_tenure_pdf = ro.conversion.rpy2py(ro.globalenv["sipp_impute_tenure"])

# ... then rebuild the Polars frame and restore the join keys to Int64 so they
# can be matched against the SIPP frame.
sipp_impute_tenure = pl.from_pandas(imputed_tenure_pdf).with_columns(
    pl.col("SSUID", "SHHADID").cast(pl.Int64)
)
sipp_impute_tenure

### Third Set of Transformations
* Drop owned_withdebt field from sipp_us_t2 data frame since the imputed version will be joined.
* Left join the imputed data frame on the "SSUID","SHHADID" fields.
* Calculate a tenure field that indicates whether the residence is "owned free and clear", "owned with mortgage or loan" or "not owned", using the **ETENURE** and **owned_withdebt** fields.

In [None]:
# Third set of transformations: swap in the imputed `owned_withdebt` and derive `tenure`.
#
# Only the join keys and the imputed `owned_withdebt` are taken from
# `sipp_impute_tenure`. That frame also carries the model features used for the
# imputation (state, male, race dummies, ...); joining it whole would add
# duplicate `*_right` columns and stray dummy columns to the SIPP frame.
sipp_us_t3 = (sipp_us_t2
 # drop the un-imputed version so the joined column keeps its plain name
 .select(~cs.contains("owned_withdebt"))
 .join(sipp_impute_tenure.select("SSUID", "SHHADID", "owned_withdebt"),
       on=["SSUID","SHHADID"],
       how="left")
 .with_columns(
     # tenure codes: 1 / 2 split owners (ETENURE == 1) by owned_withdebt,
     # 3 covers ETENURE 2 or 3; any other combination stays null.
     tenure=(
        pl.when(
            (pl.col("ETENURE") == 1) & (pl.col("owned_withdebt") == 1)
        ).then(pl.lit(1))
        .when(
            (pl.col("ETENURE") == 1) & (pl.col("owned_withdebt") == 2)
        ).then(pl.lit(2))
        .when(
            pl.col("ETENURE").is_in([2,3])
        ).then(pl.lit(3))
     )
  )
)
sipp_us_t3


### Calculate Outcome Variables

* EOWN_BSJ - Indicates if a person is self-employed/owns a business
* pp_any_debt - Indicates if a person has any debts looking at all the debt related fields.
* pp_any_asset - Indicates if a person has any assets looking at all the asset related fields including the calculated **EOWN_BSJ** field.
* sum_pp_debt - Summing the **pp_any_debt** grouped by fields "SSUID","SHHADID".
* sum_pp_asset - Summing the **pp_any_asset** grouped by fields "SSUID","SHHADID".
* hh_any_debt - Indicates if the household has any debt: set to 1 on rows with ERELRPE in (1, 2), MONTHCODE 12 and TLIVQTR in (1, 2) when sum_pp_debt > 0 and THDEBT_AST > 0, otherwise 0.
* hh_any_asset - Indicates if the household has any assets: set to 1 on rows with ERELRPE in (1, 2), MONTHCODE 12 and TLIVQTR in (1, 2) when sum_pp_asset > 0 and THVAL_AST > 0, otherwise 0.



In [None]:
# Outcome flags: person-level debt/asset indicators, their household sums, and
# the household-level any-debt / any-asset flags.

def _coded_as(column_name, code):
    """Expression that is true where `column_name`, strictly cast to Int8, equals `code`."""
    return pl.col(column_name).cast(pl.Int8, strict=True) == code


# Job-type fields for jobs 1-7; a value of 2 marks the person as a business owner.
job_type_cols = [f"EJB{job_num}_JBORSE" for job_num in range(1, 8)]

# Yes/no debt indicators (1 == has that debt).
debt_flag_cols = [
    "EDEBT_CC", "EDEBT_MED", "EDEBT_ED", "EDEBT_OT", "EPRDEBT", "EMHDEBT",
    "EVEH1DEBT", "EVEH2DEBT", "EVEH3DEBT", "EMCYCDEBT", "EBOATDEBT", "ERVDEBT",
    "EORECDEBT",
]

# Asset indicators that count on their own when equal to 1.
asset_flag_cols = [
    "EOWN_TREQ", "EOWN_ANNEQ", "EOWN_BSI", "EOWN_BSJ", "EOWN_CD", "EOWN_CHK",
    "EOWN_GOVS", "EOWN_IRAKEO", "EOWN_MCBD", "EOWN_MF", "EOWN_MM", "EOWN_OINV",
    "EOWN_RE", "EOWN_RP", "EOWN_SAV", "EOWN_ST", "EOWN_THR401", "EOWN_ESAV",
    "EOWN_RECV", "ETENURE",
]

# Shared row filter: month 12 and TLIVQTR 1 or 2.
in_december_quarters = (pl.col("MONTHCODE") == 12) & pl.col("TLIVQTR").is_in([1, 2])
household_keys = ["SSUID", "SHHADID"]

sipp_us_w_outcome = (
    sipp_us_t3
    # 1 when any job is flagged as a business, null otherwise
    .with_columns(
        EOWN_BSJ=(
            pl.when(pl.any_horizontal(*[_coded_as(col, 2) for col in job_type_cols]))
            .then(1)
            .otherwise(None)
            .cast(pl.Int8)
        )
    )
    .with_columns(
        pp_any_debt=(
            pl.when(
                in_december_quarters
                & pl.any_horizontal(
                    *[_coded_as(col, 1) for col in debt_flag_cols],
                    pl.col("TDEBT_BUS").cast(pl.Int64, strict=True) > 0,
                )
            )
            .then(1)
            .otherwise(0)
            .cast(pl.Int8)
        ),
        pp_any_asset=(
            pl.when(
                in_december_quarters
                & pl.any_horizontal(
                    *[_coded_as(col, 1) for col in asset_flag_cols],
                    # life insurance only counts for ELIFE_TYPE == 1
                    _coded_as("EOWN_LIFE", 1) & _coded_as("ELIFE_TYPE", 1),
                    # vehicle ownership only counts with at least one vehicle
                    _coded_as("EOWN_VEH", 1)
                    & (pl.col("TVEH_NUM").cast(pl.Int8, strict=True) > 0),
                )
            )
            .then(1)
            .otherwise(0)
            .cast(pl.Int8)
        ),
    )
    .with_columns(
        # household sums (nulls ignored by default; fill_null(0) is extra-safe)
        sum_pp_debt=pl.col("pp_any_debt").fill_null(0).sum().over(household_keys),
        sum_pp_asset=pl.col("pp_any_asset").fill_null(0).sum().over(household_keys),
    )
    .with_columns(
        # household any debt flag
        hh_any_debt=(
            pl.when(
                in_december_quarters
                & pl.col("ERELRPE").is_in([1, 2])
                & (pl.col("sum_pp_debt") > 0)
                & (pl.col("THDEBT_AST") > 0)
            )
            .then(1)
            .otherwise(0)
            .cast(pl.Int8)
        ),
        # household any asset flag
        hh_any_asset=(
            pl.when(
                in_december_quarters
                & pl.col("ERELRPE").is_in([1, 2])
                & (pl.col("sum_pp_asset") > 0)
                & (pl.col("THVAL_AST") > 0)
            )
            .then(1)
            .otherwise(0)
            .cast(pl.Int8)
        ),
    )
)
sipp_us_w_outcome

### Calculating 99th Percentiles

BWDC guidance (from talking with people at SIPP) said to filter out households above the 99th percentile of household assets, debts, and income. However, since we use a percent rank model, I chose to keep these people in the data. Since the top of the distribution might not be representative, I topcode the top to exactly the 99th percentile. This prevents some weird outliers where people with assets and debts above the 99th percentile have debts that supremely outweigh their assets, and avoids ultra-rich people ending up with the most extreme negative net worth.

In [None]:
# Column count of the outcome frame (sanity check before selecting survey fields).
sipp_us_w_outcome.width

In [None]:
# Keep only what the R survey design needs: household keys, the filter fields,
# the final weight, asset/debt values and flags, plus every replicate weight.
survey_input_cols = [
    "SSUID", "SHHADID",
    "MONTHCODE",
    "WPFINWGT",
    "ERELRPE",
    "TLIVQTR",
    "THVAL_AST", "THDEBT_AST",
    "hh_any_asset", "hh_any_debt",
]
sipp_svy_data = sipp_us_w_outcome.select(
    pl.col(survey_input_cols),
    cs.contains("REPWGT"),
)
sipp_svy_data

In [None]:
# Build a pandas copy of the survey frame; the explicit .copy() keeps the casts
# below away from the object returned by to_pandas().
sipp_svy_data_pdf = sipp_svy_data.to_pandas().copy()
# Columns whose dtype name starts with "string" are re-cast to plain object
# (presumably so the rpy2 pandas converter can handle them -- TODO confirm).
string_typed_cols = [
    col_name
    for col_name in sipp_svy_data_pdf.columns
    if str(sipp_svy_data_pdf[col_name].dtype).startswith("string")
]
for col_name in string_typed_cols:
    sipp_svy_data_pdf[col_name] = sipp_svy_data_pdf[col_name].astype(object)

In [None]:
# Convert the pandas survey frame to its R counterpart using the pandas-aware
# converter, then publish it in R's global environment under the same name so
# the R snippet in the next cell can reference it.
pandas_to_r_converter = ro.default_converter + pandas2ri.converter
with localconverter(pandas_to_r_converter):
    r_sipp_svy_data_pdf = ro.conversion.py2rpy(sipp_svy_data_pdf)
ro.globalenv["r_sipp_svy_data_pdf"] = r_sipp_svy_data_pdf

In [None]:
# Run the survey-weighted quantile estimation in R (no imputation happens here):
#   * build a replicate-weight survey design from WPFINWGT and REPWGT0..REPWGT240,
#   * on rows with ERELRPE in (1, 2), MONTHCODE == 12 and TLIVQTR in (1, 2),
#     estimate the .99 quantile of THVAL_AST for hh_any_asset == 1 (-> `asset_q`)
#     and of THDEBT_AST for hh_any_debt == 1 (-> `debt_q`).
# Both results are left in R's global environment for the next cell to fetch.
ro.r("""
suppressPackageStartupMessages(library(VIM))
suppressPackageStartupMessages(library(dplyr))
suppressPackageStartupMessages(library(srvyr))

svy_data <-  r_sipp_svy_data_pdf %>%
  as_survey_rep(weights = WPFINWGT,
                repweights = REPWGT0:REPWGT240,
                type = "BRR")
                
asset_q <- svy_data %>%
  filter(ERELRPE %in% c(1,2) & MONTHCODE == 12 & TLIVQTR %in% c(1,2) & hh_any_asset == 1) %>%
  srvyr::summarize(asset = survey_quantile(THVAL_AST, .99))
  
debt_q <- svy_data %>%
  filter(ERELRPE %in% c(1,2) & MONTHCODE == 12 & TLIVQTR %in% c(1,2) & hh_any_debt == 1) %>%
  srvyr::summarize(debt = survey_quantile(THDEBT_AST, .99))
""")

In [None]:
# Fetch both quantile tables from R's global environment as pandas frames,
# then wrap each one in a Polars frame.
with localconverter(ro.default_converter + pandas2ri.converter):
    asset_q_pdf = ro.conversion.rpy2py(ro.globalenv["asset_q"])
    debt_q_pdf = ro.conversion.rpy2py(ro.globalenv["debt_q"])

sipp_asset_q = pl.from_pandas(asset_q_pdf)
sipp_debt_q = pl.from_pandas(debt_q_pdf)

In [None]:
# Show the asset quantile table returned from R.
sipp_asset_q

In [None]:
# Show the debt quantile table returned from R.
sipp_debt_q

In [None]:
# Scalar 99th-percentile asset value used for top-coding below.
asset_q99 = sipp_asset_q.get_column("asset_q99").first()
asset_q99

In [None]:
# Scalar 99th-percentile debt value used for top-coding below.
debt_q99 = sipp_debt_q.get_column("debt_q99").first()
debt_q99

### Calculate Asset Fields

Check that every household whose asset value equals 0 has "hh_any_asset" == 0, and that no household flagged with hh_any_asset or hh_any_debt has a corresponding value equal to 0.


In [None]:
# Rows that carry household asset values: ERELRPE 1 or 2, month 12,
# TLIVQTR 1 or 2, and the household flagged as holding assets.
asset_cond = (
    pl.col("ERELRPE").is_in([1, 2])
    & pl.col("MONTHCODE").eq(12)
    & pl.col("TLIVQTR").is_in([1, 2])
    & pl.col("hh_any_asset").eq(1)
)

# output column -> SIPP source column for each asset component
asset_component_sources = {
    "assets_bank": "THVAL_BANK",
    "assets_bond": "THVAL_BOND",
    "assets_stock": "THVAL_STMF",
    "assets_rent": "THVAL_RENT",
    "assets_realest": "THVAL_RE",
    "assets_bus": "THVAL_BUS",
    "assets_home": "THVAL_HOME",
    "assets_ret": "THVAL_RET",
    "assets_veh": "THVAL_VEH",
    "assets_esav": "THVAL_ESAV",
    "assets_oth": "THVAL_OTH",
}

sipp_us_w_outcome_assets = (
    sipp_us_w_outcome
    .with_columns(
        # total assets, top-coded at the 99th percentile; null off the asset rows
        hh_assets=(
            pl.when(asset_cond & (pl.col("THVAL_AST") < asset_q99))
            .then(pl.col("THVAL_AST"))
            .when(asset_cond & (pl.col("THVAL_AST") >= asset_q99))
            .then(asset_q99)
            .otherwise(None)
        )
    )
    .with_columns(
        # component values are kept as reported, only masked to the asset rows
        [
            pl.when(asset_cond).then(pl.col(source_col)).otherwise(None).alias(output_col)
            for output_col, source_col in asset_component_sources.items()
        ]
    )
)
sipp_us_w_outcome_assets

### Calculate Debt Fields

Only some of the debt types are already broken out for the household. Medical debt and business debts are some that I have to manually roll up. Note that medical debt at the individual level is only reported for household members older than 15 and in month 12. Business debts are for businesses as jobs and businesses as investments. 

* **TORPDEBTVAL_hh** - Household sum of **TORPDEBTVAL**, only for HHs flagged with debt
* **business_job_pp** - Person-level business job debt (only if **hh_any_debt** == 1)
* **business_job_debts** - Household sum of person-level business job debts
* **business_invest_pp** - Person-level business investment debt (only if **hh_any_debt** == 1)
* **business_invest_debts** - Household sum of person-level business investment debts
* **medical_debt_pp** - Person-level medical debt: only age>=15 and December snapshot
* **medical_debt_hh** - Household sum of medical debt
* **hh_debts** - Household debts (only for ref person/spouse, Dec, in quarters, **hh_any_debt**==1)
* **debt_home, debt_CC, debt_medical, debt_edu, debt_oth** - Debts for different categories
* **debt_business** - Business related debts.
* **secured_debts, unsecured_debts** - Secured and Unsecured debts



In [None]:
# Rows that carry household debt values: ERELRPE 1 or 2, month 12,
# TLIVQTR 1 or 2, and the household flagged as holding debt.
debt_cond = (
    pl.col("ERELRPE").is_in([1, 2])
    & pl.col("MONTHCODE").eq(12)
    & pl.col("TLIVQTR").is_in([1, 2])
    & pl.col("hh_any_debt").eq(1)
)


In [None]:
# Debt fields: roll person-level business and medical debts up to the household,
# top-code total household debt at the 99th percentile, and expose the debt
# categories on the rows selected by `debt_cond`.
household_keys = ["SSUID", "SHHADID"]
hh_has_debt = pl.col("hh_any_debt") == 1
business_job_debt_cols = [f"TBSJ{n}DEBTVAL" for n in range(1, 6)]
business_invest_debt_cols = [f"TBSI{n}DEBTVAL" for n in range(1, 4)]


def _household_sum(person_col, household_col):
    """Sum `person_col` (nulls counted as 0) over each SSUID/SHHADID group."""
    return pl.col(person_col).fill_null(0).sum().over(household_keys).alias(household_col)


def _on_debt_rows(source_col, output_col):
    """Copy `source_col` on rows matching `debt_cond`; null elsewhere."""
    return pl.when(debt_cond).then(pl.col(source_col)).otherwise(None).alias(output_col)


# NOTE(review): hh_any_debt is built upstream with an ERELRPE / MONTHCODE / TLIVQTR
# filter, so the person-level business debt columns below may be null for other
# household members -- confirm that the household totals are meant to cover only
# those rows.
sipp_us_w_outcome_assets_debts = (
    sipp_us_w_outcome_assets
    .with_columns(
        pl.when(hh_has_debt)
        .then(
            pl.col("TORPDEBTVAL")
            .cast(pl.Float32, strict=True)
            .fill_null(0)
            .sum()
            .over(household_keys)
        )
        .otherwise(None)
        .cast(pl.Float32, strict=True)
        .alias("TORPDEBTVAL_hh"),
        pl.when(hh_has_debt)
        .then(pl.sum_horizontal(*[pl.col(c) for c in business_job_debt_cols]))
        .otherwise(None)
        .cast(pl.Float32, strict=False)
        .alias("business_job_pp"),
    )
    .with_columns(_household_sum("business_job_pp", "business_job_debts"))
    .with_columns(
        pl.when(hh_has_debt)
        .then(pl.sum_horizontal(*[pl.col(c) for c in business_invest_debt_cols]))
        .otherwise(None)
        .cast(pl.Float32, strict=False)
        .alias("business_invest_pp")
    )
    .with_columns(_household_sum("business_invest_pp", "business_invest_debts"))
    .with_columns(
        # medical debt is only taken for TAGE >= 15 in month 12
        pl.when((pl.col("TAGE") >= 15) & (pl.col("MONTHCODE") == 12))
        .then(pl.col("TMED_AMT"))
        .otherwise(None)
        .cast(pl.Float32, strict=False)
        .alias("medical_debt_pp")
    )
    .with_columns(_household_sum("medical_debt_pp", "medical_debt_hh"))
    .with_columns(
        # total debt, top-coded at the 99th percentile; null off the debt rows
        pl.when(debt_cond & (pl.col("THDEBT_AST") < debt_q99))
        .then(pl.col("THDEBT_AST"))
        .when(debt_cond & (pl.col("THDEBT_AST") >= debt_q99))
        .then(debt_q99)
        .otherwise(None)
        .alias("hh_debts"),
        _on_debt_rows("THDEBT_HOME", "debt_home"),
        _on_debt_rows("THDEBT_CC", "debt_CC"),
        _on_debt_rows("medical_debt_hh", "debt_medical"),
        _on_debt_rows("THDEBT_ED", "debt_edu"),
        _on_debt_rows("THDEBT_OT", "debt_oth"),
        pl.when(debt_cond)
        .then(pl.col("business_job_debts") + pl.col("business_invest_debts"))
        .otherwise(None)
        .alias("debt_business"),
        _on_debt_rows("THDEBT_SEC", "secured_debts"),
        _on_debt_rows("THDEBT_USEC", "unsecured_debts"),
    )
)
sipp_us_w_outcome_assets_debts

### Net Worth Calculation

* **hh_any_wealth** - If ref person/spouse in Dec/in-quarters AND (any asset OR any debt), else 0.
* **hh_networth** - Only defined for ref person/spouse in Dec/in-quarters to calculate household net worth.


In [None]:
# Rows on which household wealth is defined: ERELRPE 1 or 2, month 12, TLIVQTR 1 or 2.
net_worth_cond = (
    pl.col("ERELRPE").is_in([1, 2])
    & pl.col("MONTHCODE").eq(12)
    & pl.col("TLIVQTR").is_in([1, 2])
)

has_asset = pl.col("hh_any_asset") == 1
no_asset = pl.col("hh_any_asset") == 0
has_debt = pl.col("hh_any_debt") == 1
no_debt = pl.col("hh_any_debt") == 0

sipp_us_w_outcome_assets_debts_networth = sipp_us_w_outcome_assets_debts.with_columns(
    # 1 when the row qualifies and the household holds any asset or any debt
    hh_any_wealth=(
        pl.when(net_worth_cond & (has_asset | has_debt))
        .then(1)
        .otherwise(0)
        .cast(pl.Int8)
    ),
    # assets minus debts, using whichever side is present; null when the row
    # does not qualify or the household has neither assets nor debts
    hh_networth=(
        pl.when(net_worth_cond & has_asset & has_debt)
        .then(pl.col("hh_assets") - pl.col("hh_debts"))
        .when(net_worth_cond & has_asset & no_debt)
        .then(pl.col("hh_assets"))
        .when(net_worth_cond & no_asset & has_debt)
        .then(-pl.col("hh_debts"))
        .otherwise(None)
    ),
)
sipp_us_w_outcome_assets_debts_networth

## Output

In [None]:
# Ensure the relative "output" directory exists; exist_ok avoids an error on re-runs.
os.makedirs('output',exist_ok=True)

### SIPP Export

In [None]:
# Export frame: row identifiers and filter fields first, then the model fields,
# the final weight, and all replicate weights.
export_id_cols = ["SSUID", "SHHADID", "ERELRPE", "MONTHCODE", "TLIVQTR"]
export_field_cols = [
    "state", "metro", "hh_income", "male", "age", "edu", "race_eth", "tenure",
    "homevalue", "disability", "class_worker", "household_type", "citizen",
    "english_at_home", "public_assistance", "social_security", "poverty",
    "hh_any_asset", "hh_any_debt", "hh_any_wealth", "hh_assets", "hh_debts",
    "hh_networth",
]

sipp_us_w_outcome_assets_debts_networth_export = (
    sipp_us_w_outcome_assets_debts_networth.select(
        pl.col(export_id_cols + export_field_cols + ["WPFINWGT"]),
        cs.contains("REPWGT"),
    )
)
# persist the full export next to the other outputs
sipp_us_w_outcome_assets_debts_networth_export.write_parquet("output/sipp_us_save.parquet")
sipp_us_w_outcome_assets_debts_networth_export

### SIPP Model

In [None]:
# Modelling frame: one row per qualifying record (ERELRPE 1 or 2, month 12,
# TLIVQTR 1 or 2) with the model fields, the weights, and a row_id used to join
# the weighted ranks back on.
model_field_cols = [
    "state", "metro", "hh_income", "male", "age", "edu", "race_eth", "tenure",
    "homevalue", "disability", "class_worker", "household_type", "citizen",
    "english_at_home", "public_assistance", "social_security", "poverty",
    "hh_any_asset", "hh_any_debt", "hh_any_wealth", "hh_assets", "hh_debts",
    "hh_networth",
]

sipp_model = (
    sipp_us_w_outcome_assets_debts_networth_export
    .filter(
        pl.col("ERELRPE").is_in([1, 2])
        & pl.col("MONTHCODE").eq(12)
        & pl.col("TLIVQTR").is_in([1, 2])
    )
    .select(
        pl.col(model_field_cols + ["WPFINWGT"]),
        cs.contains("REPWGT"),
    )
    .with_row_index("row_id")
)

sipp_model

In [None]:
# asset weighted-rank table
# prank_assets = weight accumulated by the rows sorted ahead of this one, divided
# by (total weight - 1); rows with null hh_assets get no rank.
wtd_assets = (
    sipp_model
    .drop_nulls(subset=["hh_assets"])
    # maintain_order=True makes the sort stable: tied hh_assets values (e.g. the
    # top-coded rows) keep their row_id order, so ranks are reproducible run to run.
    .sort("hh_assets", maintain_order=True)
    .with_columns(
        # cumulative sum of weights
        pl.col("WPFINWGT").cum_sum().alias("cum_wgt")
    )
    .with_columns(
        # lag(cumsum(WPFINWGT), default = 0)
        pl.col("cum_wgt").shift(1).fill_null(0).alias("cum_wgt_lag")
    )
    .with_columns(
        # prank_assets = lag(cumsum(wgt)) / (sum(wgt) - 1)
        (pl.col("cum_wgt_lag") / (pl.col("WPFINWGT").sum() - 1))
        .alias("prank_assets")
    )
    .select(pl.col("row_id"),pl.col("prank_assets"))
)
# attach the rank to the model frame by row_id (left join keeps unranked rows)
sipp_model = sipp_model.join(wtd_assets,on="row_id",how="left")
sipp_model

In [None]:
# debt weighted-rank table
# prank_debts = weight accumulated by the rows sorted ahead of this one, divided
# by (total weight - 1); rows with null hh_debts get no rank.
wtd_debts = (
    sipp_model
    .drop_nulls(subset=["hh_debts"])
    # maintain_order=True makes the sort stable: tied hh_debts values (e.g. the
    # top-coded rows) keep their row_id order, so ranks are reproducible run to run.
    .sort("hh_debts", maintain_order=True)
    .with_columns(
        # cumulative sum of weights
        pl.col("WPFINWGT").cum_sum().alias("cum_wgt")
    )
    .with_columns(
        # lag(cumsum(WPFINWGT), default = 0)
        pl.col("cum_wgt").shift(1).fill_null(0).alias("cum_wgt_lag")
    )
    .with_columns(
        # prank_debts = lag(cumsum(wgt)) / (sum(wgt) - 1)
        (pl.col("cum_wgt_lag") / (pl.col("WPFINWGT").sum() - 1))
        .alias("prank_debts")
    )
    .select(pl.col("row_id"),pl.col("prank_debts"))
)
# attach the rank to the model frame by row_id (left join keeps unranked rows)
sipp_model = sipp_model.join(wtd_debts,on="row_id",how="left")
sipp_model

In [None]:
# networth weighted-rank table
# prank_networth = weight accumulated by the rows sorted ahead of this one,
# divided by (total weight - 1); rows with null hh_networth get no rank.
wtd_wealth = (
    sipp_model
    .drop_nulls(subset=["hh_networth"])
    # maintain_order=True makes the sort stable: tied hh_networth values keep
    # their row_id order, so ranks are reproducible run to run.
    .sort("hh_networth", maintain_order=True)
    .with_columns(
        # cumulative sum of weights
        pl.col("WPFINWGT").cum_sum().alias("cum_wgt")
    )
    .with_columns(
        # lag(cumsum(WPFINWGT), default = 0)
        pl.col("cum_wgt").shift(1).fill_null(0).alias("cum_wgt_lag")
    )
    .with_columns(
        # prank_networth = lag(cumsum(wgt)) / (sum(wgt) - 1)
        (pl.col("cum_wgt_lag") / (pl.col("WPFINWGT").sum() - 1))
        .alias("prank_networth")
    )
    .select(pl.col("row_id"),pl.col("prank_networth"))
)
# attach the rank to the model frame by row_id (left join keeps unranked rows)
sipp_model = sipp_model.join(wtd_wealth,on="row_id",how="left")
sipp_model

In [None]:
# Persist the modelling frame, including the three weighted-rank columns joined above.
sipp_model.write_parquet('output/sipp_model.parquet')

In [None]:
# Display the final SIPP modelling frame.
sipp_model

# IPUMS

IPUMS (Integrated Public Use Microdata Series) is a long-running project that harmonizes microdata from major U.S. and international surveys and censuses and makes them easy to access, compare, and analyze across time and geography.

Using years 2014 to 2018.  

## Get Data From API
Have the API key for IPUMS in an environment variable called **IPUMS_API_KEY**.  Filter the data to the state of interest by FIPS code (37 = North Carolina, 22 = Louisiana).

In [None]:
# Request, wait for, and download a fresh IPUMS USA extract (ACS 2014-2018 samples).
# Only runs when DOWNLOAD_IPUMS is True; otherwise the existing download is reused.
if DOWNLOAD_IPUMS:
    # Fail before touching the existing download if no API key is configured.
    if not IPUMS_API_KEY:
        raise RuntimeError("IPUMS_API_KEY environment variable is not set")

    # Start from an empty download directory. ignore_errors covers the first run,
    # when the directory does not exist yet, without a bare `except` that would
    # also swallow KeyboardInterrupt and programming errors.
    shutil.rmtree(IPUMS_DOWNLOAD_DIR, ignore_errors=True)
    os.makedirs(IPUMS_DOWNLOAD_DIR,exist_ok=True)

    ipums = IpumsApiClient(IPUMS_API_KEY)

    extract = MicrodataExtract(
        collection="usa",
        description="Extract",
        samples=["us2014a","us2015a","us2016a","us2017a","us2018a"],
        variables=[ "YEAR","STATEFIP","COUNTYFIP","AGE","SEX","RACE","HISPAN","HHINCOME",
                   "EDUCD","CITIZEN","DIFFSENS","DIFFREM","DIFFPHYS","DIFFCARE","DIFFMOB",
                   "EMPSTAT","CLASSWKRD","SERIAL","PERNUM","MARST","LANGUAGE",
                   "INCWELFR","POVERTY","INCSS","OWNERSHP","VALUEH","METRO","RELATE",
                   "PUMA"],
        data_format="csv"
    )
    ipums.submit_extract(extract)
    print("Submitted")
    # Progress messages are printed before the step they describe.
    print("Waiting for extract")
    ipums.wait_for_extract(extract)
    print("Downloading Extract")
    ipums.download_extract(extract, download_dir=IPUMS_DOWNLOAD_DIR)

In [None]:
# Locate the downloaded extract (*.csv.gz) in the IPUMS download directory.
# Candidates are sorted so the choice is deterministic when several extracts are
# present (the last name in sorted order wins), and a missing extract fails here
# with a clear message rather than later as a read of ".../None".
csv_gz_files = sorted(
    file_name
    for file_name in os.listdir(IPUMS_DOWNLOAD_DIR)
    if file_name.endswith('.csv.gz')
)
if not csv_gz_files:
    raise FileNotFoundError(
        f"No .csv.gz extract found in {IPUMS_DOWNLOAD_DIR!r}; "
        "set DOWNLOAD_IPUMS = True to download one."
    )
extract_files = {"file": csv_gz_files[-1]}
extract_file = extract_files["file"]
extract_file

In [None]:
# Load the extract located above into an in-memory Polars DataFrame.
ipums = pl.scan_csv(f'{IPUMS_DOWNLOAD_DIR}/{extract_file}').collect()

In [None]:
# IPUMS records for North Carolina (STATEFIP 37)
ipums_nc = ipums.filter(pl.col('STATEFIP') == 37)
# optional CSV dump, kept disabled:
# ipums_nc.write_csv(r"output\ipums_nc.csv")

In [None]:
# IPUMS records with STATEFIP 22 (the "LA" state in the setup cell)
ipums_la = ipums.filter(pl.col('STATEFIP') == 22)
# optional CSV dump, kept disabled:
# ipums_la.write_csv(r"output\ipums_la.csv")

In [None]:
# Working frame for the selected state. The state comes from STATEFIP_CODE in the
# setup cell, so switching states only needs that one constant changed rather than
# a second manual toggle here.
ipums_df = ipums.filter(pl.col('STATEFIP') == STATEFIP_CODE)

## Transformations

### First Set of Transformations

* Household
    * hh_income_adj - Household income adjustment
    * hh_income - Household income is reported for the 12 months preceding the interview month.
    * home_value - Binning of home value field into different categories based on value ranges.
    * household_type - Set the category of the type of household depending if the person is married or single and if they have children.
    * income_NA - Boolean indicator if household income is not available.
* Demographics
    * age - Binning of age field into different categories based on value ranges.
    * male - Indicator to show if the individual is a Male.
    * edu - Category field showing the level of education of the individual.
    * hisp_fct - Category field showing if the individual is Hispanic or Non-Hispanic.
    * race_fct - Category field capturing the race of the individual.
    * race_eth - Combining the **hisp_fct** and **race_fct** field into a new category field.
    * citizen - Indicator to show if the individual is a citizen.
    * disability - Indicator to show if the individual has a disability.
* Class Worker
    * class_worker - Create a category capturing the worker group the individual would fall into.
* Children
    * child_flag - Based on the age create an indicator to show if the individual is a child.
    * child_sum - For each "SSUID", "SHHADID" grouping sum the **child_flag** field to determine the number of children in the household.
* Welfare
    * public_assistance_pp - Indicates if a person uses public assistance services.
    * social_security_pp - Indicates if a person uses social security services.
    * poverty - Indicator to identify if an individual is in poverty.
    * public_assistance_hh - Sum of **public_assistance_pp**.
    * public_assistance - Indicator if household uses public assistance services.
    * social_security_hh - Sum of **social_security_pp**.
    * social_security - Indicator if household uses social security services.
    * social_security - Indicator of 1 or 0 to indicate if the field **ss_hh** has amount greater than 0.
    * poverty_NA - Boolean indicator if poverty status is not available.
* Language
    * english_at_home - Indicator to show if an individual can speak English.


In [None]:
ipums_df_t = ipums_df

ipums_df_t = (
    ipums_df_t
    # household income adjustment
    .with_columns(
        hh_income_adj=(
            pl.when(pl.col("HHINCOME") == 9999999).then(None)
            .when(pl.col("YEAR") == 2014).then(pl.col("HHINCOME") * 1.06)
            .when(pl.col("YEAR") == 2015).then(pl.col("HHINCOME") * 1.06)
            .when(pl.col("YEAR") == 2016).then(pl.col("HHINCOME") * 1.05)
            .when(pl.col("YEAR") == 2017).then(pl.col("HHINCOME") * 1.02)
            .when(pl.col("YEAR") == 2018).then(pl.col("HHINCOME"))
        )
    )
    # household income factor
    .with_columns(
        hh_income=(
            pl.when(pl.col('hh_income_adj') <= 0).then(1)
            .when(pl.col("hh_income_adj").is_between(1, 4_999)).then(2)
            .when(pl.col("hh_income_adj").is_between(5_000, 14_999)).then(3)
            .when(pl.col("hh_income_adj").is_between(15_000, 19_999)).then(4)
            .when(pl.col("hh_income_adj").is_between(20_000, 24_999)).then(5)
            .when(pl.col("hh_income_adj").is_between(25_000, 29_000)).then(6)
            .when(pl.col("hh_income_adj").is_between(30_000, 34_999)).then(7)
            .when(pl.col("hh_income_adj").is_between(35_000, 44_999)).then(8)
            .when(pl.col("hh_income_adj").is_between(45_000, 54_999)).then(9)
            .when(pl.col("hh_income_adj").is_between(55_000, 64_999)).then(10)
            .when(pl.col("hh_income_adj").is_between(65_000, 74_999)).then(11)
            .when(pl.col("hh_income_adj").is_between(75_000, 89_999)).then(12)
            .when(pl.col("hh_income_adj").is_between(90_000, 104_999)).then(13)
            .when(pl.col("hh_income_adj").is_between(105_000, 124_999)).then(14)
            .when(pl.col("hh_income_adj").is_between(125_000, 149_999)).then(15)
            .when(pl.col("hh_income_adj").is_between(150_000, 199_999)).then(16)
            .when(pl.col("hh_income_adj").is_between(200_000, 499_999)).then(17)
            .when(pl.col("hh_income_adj") >= 500_000).then(18)
            .otherwise(None)
        )
    )
    # age factor
    .with_columns(
        age=(
                pl.when(pl.col("AGE") < 15).then(1)
                .when(pl.col("AGE").is_between(15, 24)).then(2)
                .when(pl.col("AGE").is_between(25, 29)).then(3)
                .when(pl.col("AGE").is_between(30, 34)).then(4)
                .when(pl.col("AGE").is_between(35, 39)).then(5)
                .when(pl.col("AGE").is_between(40, 44)).then(6)
                .when(pl.col("AGE").is_between(45, 49)).then(7)
                .when(pl.col("AGE").is_between(50, 54)).then(8)
                .when(pl.col("AGE").is_between(55, 59)).then(9)
                .when(pl.col("AGE").is_between(60, 64)).then(10)
                .when(pl.col("AGE").is_between(65, 69)).then(11)
                .when(pl.col("AGE").is_between(70, 74)).then(12)
                .when(pl.col("AGE").is_between(75, 79)).then(13)
                .when(pl.col("AGE").is_between(80, 84)).then(14)
                .when(pl.col("AGE") >= 85).then(15)
                .otherwise(None)
                .cast(pl.Int8)
            )
    )
    # male indicator
    .with_columns(
        male=(
                pl.when(pl.col("SEX") == 1).then(pl.lit(1))
                    .when(pl.col("SEX") == 2).then(pl.lit(0))
                    .otherwise(None)
                )
    )
    # education factor
    .with_columns(
        edu=(
                    pl.when(pl.col("EDUCD").is_between(2,61)).then(1)
                    .when(pl.col("EDUCD").is_between(62,64)).then(pl.lit(2))
                    .when(pl.col("EDUCD").is_between(65,100)).then(3)
                    .when(pl.col("EDUCD") == 101).then(4)
                    .when(pl.col("EDUCD").is_between(110,116)).then(5)
                    .when(pl.col("EDUCD").is_in([999,1,0])).then(None)
                    .otherwise(None)
            )
    )
    # hispanic and race factors
    .with_columns(
        hisp_fct = (
            pl.when(pl.col("HISPAN").is_in([1,2,3,4])).then(pl.lit("hispanic"))
            .when(pl.col("HISPAN") == 0).then(pl.lit("non-hispanic"))
            .when(pl.col("HISPAN") == 9).then(pl.lit(None))
        ),
        race_fct = (
            pl.when(pl.col("RACE") == 1)
            .then(pl.lit("white"))
            .when(pl.col("RACE") == 2).then(pl.lit("black"))
            .when(pl.col("RACE").is_in([4,5,6])).then(pl.lit("asian"))
            .otherwise(pl.lit("other"))
        )
    )
    # race ethnicity factor  
    .with_columns(
        race_eth = (
            pl.when(pl.col("hisp_fct") == "hispanic").then(3)
            .when(
                (pl.col("hisp_fct") == "non-hispanic") & 
                (pl.col("race_fct") == "white")
            ).then(1)
            .when(
                (pl.col("hisp_fct") == "non-hispanic") & 
                (pl.col("race_fct") == "black")
            ).then(2)
            .when(
                (pl.col("hisp_fct") == "non-hispanic") & 
                (pl.col("race_fct") == "asian")
            ).then(4)
            .when(
                (pl.col("hisp_fct") == "non-hispanic") & 
                (pl.col("race_fct") == "other")
            ).then(5)
        )
    )
    # citizenship
    .with_columns(
        citizen = pl.when(pl.col("CITIZEN").is_in([0,1,2])).then(1)
        .when(pl.col("CITIZEN") == 3).then(0)
    )
    # disability
    .with_columns(
        disability = pl.when(
            (pl.col("DIFFSENS") == 2) |
            (pl.col("DIFFREM") == 2) |
            (pl.col("DIFFPHYS") == 2) |
            (pl.col("DIFFCARE") == 2) |
            (pl.col("DIFFMOB") == 2)
        ).then(1)
        .when(
            (pl.col("DIFFSENS") == 1) |
            (pl.col("DIFFREM") == 1) |
            (pl.col("DIFFPHYS") == 1) |
            (pl.col("DIFFCARE") == 1) |
            (pl.col("DIFFMOB") == 1)
        ).then(0)
    )
    # class worker
    .with_columns(
        class_worker=(
            pl.when(
                (pl.col("EMPSTAT") == 1) 
                & (pl.col("CLASSWKRD") == 13)
            )
            .then(1)
            .when(
                (pl.col("EMPSTAT") == 1) 
                & (pl.col("CLASSWKRD") == 14)
            )
            .then(2)
            .when(
                (pl.col("EMPSTAT") == 1) 
                & (pl.col("CLASSWKRD").is_in([22,23,25,27,28]))
            )
            .then(3)
            .when(
                (pl.col("EMPSTAT").is_in([0,2,3])) 
                & (pl.col("AGE") >= 65)
            )
            .then(4)
            .when(
                (pl.col("EMPSTAT").is_in([0,2,3])) 
                & (pl.col("AGE") < 65)
            )
            .then(5)
            .when(
                (pl.col("EMPSTAT") == 1) 
                & (pl.col("AGE") >= 65)
                & (pl.col("CLASSWKRD") == 29)
            )
            .then(4)
            .when(
                (pl.col("EMPSTAT") == 1) 
                & (pl.col("AGE") < 65)
                & (pl.col("CLASSWKRD") == 29)
            )
            .then(5)
            .otherwise(None)
        )
    )
    # child indicator
    .with_columns(
        child_flag = (
            pl.when(pl.col("AGE") < 18).then(1).otherwise(0)
        )
    )
    # sum of number of children and max household size
    .with_columns(
        child_sum=(
            pl.col("child_flag").sum().over("YEAR","SERIAL")
        )
    )
    # household size
    .with_columns(
        hh_size=(
            pl.col("PERNUM").max().over("YEAR","SERIAL")
        )
    )
    # household type factor
    .with_columns(
        household_type=(
            pl.when((pl.col("AGE") >= 18) & 
                    (pl.col("MARST").is_in([1,2])) &
                    (pl.col("child_sum") > 0)
                ).then(1)
            .when((pl.col("AGE") >= 18) & 
                    (pl.col("MARST").is_in([1,2])) &
                    (pl.col("child_sum") == 0)
                ).then(2)
            .when((pl.col("AGE") >= 18) & 
                    (pl.col("MARST").is_in([3,4,5,6])) &
                    (pl.col("child_sum") > 0)
                ).then(3)
            .when((pl.col("AGE") >= 18) & 
                    (pl.col("MARST").is_in([3,4,5,6])) &
                    (pl.col("child_sum") == 0)
                ).then(4)
            .when((pl.col("AGE") < 18) & 
                    (pl.col("MARST").is_in([1,2])) &
                    (pl.col("child_sum") > 0) &
                    (pl.col("hh_size") > 2)
                ).then(1)
            .when((pl.col("AGE") < 18) & 
                    (pl.col("MARST").is_in([1,2])) &
                    (pl.col("child_sum") > 0) & 
                    (pl.col("hh_size") == 2)
                ).then(2)
            .when((pl.col("AGE") < 18) & 
                    (pl.col("MARST").is_in([3,4,5,6])) &
                    (pl.col("child_sum") > 0) &
                    (pl.col("hh_size") > 1)
                ).then(3)
            .when((pl.col("AGE") < 18) & 
                    (pl.col("MARST").is_in([3,4,5,6])) &
                    (pl.col("child_sum") > 0) &
                    (pl.col("hh_size") == 1)
                ).then(4)
        )
    )
    # english as a first language factor
    .with_columns(
        english_at_home=(
            pl.when(pl.col("LANGUAGE") == 1).then(1)
            .when(pl.col("LANGUAGE").is_in([0,95,96,97])).then(None)
            .when(pl.col("LANGUAGE").is_between(2,94)).then(0)
        )
    )
    # welfare
    .with_columns(
        # public assistance person
        public_assistance_pp = (
            pl.when(pl.col("INCWELFR") == 99999).then(0)
            .when(
                (pl.col("INCWELFR") > 0) & (pl.col("INCWELFR") != 99999)
            )
            .then(1)
            .otherwise(0)
        ),
        # poverty
        poverty = (
            pl.when(pl.col("POVERTY") <= 100).then(1)
            .when(pl.col("POVERTY") == 1).then(1)
            .when(pl.col("POVERTY") == 0).then(None)
            .when(pl.col("POVERTY") == 501).then(0)
            .otherwise(0)
        ),
        # social security person
        social_security_pp = (
            pl.when(pl.col("INCSS") == 99999).then(0)
            .when(
                (pl.col("INCSS") > 0) &
                (pl.col("INCSS") != 99999)
            )
            .then(1)
            .otherwise(0)
        )
    )
    # household level
    .with_columns(
        # public assistance household
        public_assistance_hh=pl.col("public_assistance_pp").sum().over(["YEAR","SERIAL"]),
        # social security household
        social_security_hh=pl.col("social_security_pp").sum().over(["YEAR","SERIAL"]),
    )
    .with_columns(
        # public assistance
        public_assistance = (
            pl.when(pl.col("public_assistance_hh") > 0).then(1)
            .when(pl.col("public_assistance_hh") == 0).then(0)
        ),
        # social security
        social_security = (
            pl.when(pl.col("social_security_hh") > 0).then(1)
            .when(pl.col("social_security_hh") == 0).then(0)
        )
    )
    # income and poverty NA boolean indicators
    .with_columns(
        income_NA=pl.col("HHINCOME") == 9999999,
        poverty_NA=pl.col("POVERTY") == 0
    )
    # home value
    .with_columns(
        homevalue=(
            pl.when(pl.col("OWNERSHP") == 2).then(1)
            .when(pl.col("VALUEH") == 9999999).then(None)
            .when(pl.col("VALUEH") < 50000).then(2)
            .when(pl.col("VALUEH").is_between(50_000, 99_999)).then(3)
            .when(pl.col("VALUEH").is_between(100_000, 299_999)).then(4)
            .when(pl.col("VALUEH").is_between(300_000, 499_999)).then(5)
            .when(pl.col("VALUEH").is_between(500_000, 749_999)).then(6)
            .when(pl.col("VALUEH").is_between(750_000, 999_999)).then(7)
            .when(pl.col("VALUEH") >= 1_000_000).then(8)
        )
    )
    
)
ipums_df_t

### Imputing Metro
* Create a data frame used for the imputation.
* Remove any records that have any column with a null.
* Create a categorical field **metro_fct** that is calculated based on the value of the **METRO** field that will either be "metro", "nonmetro" or null.
* Impute the records where **metro_fct is null**.
* Call R function **VIM.knn** to impute the values where missing.


In [None]:
# Build the householder-level frame handed to the kNN imputation of metro status.
# The indicator columns below are derived from the 1-based codes assigned in the
# first set of transformations:
#   race_eth:       1 white, 2 black, 3 hispanic, 4 asian, 5 other
#   class_worker:   1-2 (EMPSTAT 1 with CLASSWKRD 13 / 14), 3 (CLASSWKRD 22-28),
#                   4-5 (not working or CLASSWKRD 29, split at age 65)
#   household_type: 1 married with kids, 2 married no kids,
#                   3 single with kids, 4 single no kids
ipums_impute_metro = (ipums_df_t
 # keep one record per household: 2018 householders (RELATE == 1) with GQ in 1 or 2
 .filter(
     (pl.col("YEAR") == 2018) &
     (pl.col("GQ").is_in([1,2])) &
     (pl.col("RELATE") == 1)
 )
 .with_columns(
     pl.col("STATEFIP").alias("state"),
     hh_income_rank=pl.col("hh_income") + 1,
     edu_rank=pl.col("edu") + 1,
     # race / ethnicity indicators (null race_eth yields 0 in every indicator)
     white=pl.when(pl.col("race_eth") == 1).then(1).otherwise(0),
     black=pl.when(pl.col("race_eth") == 2).then(1).otherwise(0),
     hispanic=pl.when(pl.col("race_eth") == 3).then(1).otherwise(0),
     asian=pl.when(pl.col("race_eth") == 4).then(1).otherwise(0),
     other_race=pl.when(pl.col("race_eth") == 5).then(1).otherwise(0),
     own_home=pl.when(pl.col("OWNERSHP") == 1).then(1).otherwise(0),
     homevalue_rank=pl.col("homevalue") + 1,
     # employment indicators
     self_emp=pl.when(pl.col("class_worker").is_in([1,2])).then(1).otherwise(0),
     wage_salary_emp=pl.when(pl.col("class_worker").is_in([3])).then(1).otherwise(0),
     not_employed=pl.when(pl.col("class_worker").is_in([4,5])).then(1).otherwise(0),
     age_rank=pl.col('age') + 1,
     # household type indicators
     married_withkids=pl.when(pl.col("household_type").is_in([1])).then(1).otherwise(0),
     married_nokids=pl.when(pl.col("household_type").is_in([2])).then(1).otherwise(0),
     single_withkids=pl.when(pl.col("household_type").is_in([3])).then(1).otherwise(0),
     single_nokids=pl.when(pl.col("household_type").is_in([4])).then(1).otherwise(0),
 )
 .select(
     pl.col("PERNUM"),
     pl.col("SERIAL"),
     pl.col("state"),
     pl.col("METRO"),
     pl.col("hh_income_rank"),
     pl.col("male"),
     pl.col("age_rank"),
     pl.col("edu_rank"),
     pl.col("white"),
     pl.col("black"),
     pl.col("hispanic"),
     pl.col("asian"),
     pl.col("other_race"),
     pl.col("own_home"),
     pl.col("homevalue_rank"),
     pl.col("disability"),
     pl.col("self_emp"),
     pl.col("wage_salary_emp"),
     pl.col("not_employed"),
     pl.col("married_withkids"),
     pl.col("married_nokids"),
     pl.col("single_withkids"),
     pl.col("single_nokids"),
     pl.col("english_at_home"),
     pl.col("public_assistance"),
     pl.col("social_security"),
     pl.col("poverty"),   
 )
 # drop incomplete records BEFORE metro_fct exists, so the nulls that are
 # about to be imputed are not removed here
 .drop_nulls()
 .with_columns(
     # METRO 0 -> null (to be imputed), 1 -> "nonmetro", 2/3/4 -> "metro"
     metro_fct=(
                    pl.when(pl.col("METRO") == 0).then(None)
                    .when(pl.col("METRO").is_in([2,3,4])).then(pl.lit("metro"))
                    .when(pl.col("METRO").is_in([1])).then(pl.lit("nonmetro"))
                    .otherwise(None)
                    .cast(pl.Categorical)
                )
 )
 # remember which rows needed imputation
 .with_columns(
    pl.col("metro_fct").is_null().alias("metro_fct_was_missing")
 )
)
ipums_impute_metro

In [None]:
# pandas copy of the imputation frame, used for the hand-off to R
ipums_impute_metro_pdf = ipums_impute_metro.to_pandas().copy()
# columns whose pandas dtype name starts with "string"
string_typed_cols = [
    col_name
    for col_name, col_dtype in ipums_impute_metro_pdf.dtypes.items()
    if str(col_dtype).startswith("string")
]
# re-cast those columns to plain object dtype
ipums_impute_metro_pdf = ipums_impute_metro_pdf.astype(
    {col_name: object for col_name in string_typed_cols}
)

In [None]:
# convert the pandas frame to an R object using the pandas-aware converter
pandas_r_converter = ro.default_converter + pandas2ri.converter
with localconverter(pandas_r_converter):
    r_ipums_impute_metro_pdf = ro.conversion.py2rpy(ipums_impute_metro_pdf)
# expose the converted frame to R code under the same name
ro.globalenv["r_ipums_impute_metro_pdf"] = r_ipums_impute_metro_pdf

In [None]:
# execute R command to impute metro_fct with k-nearest-neighbours (VIM::kNN)
ro.r("""
suppressPackageStartupMessages(library(VIM))
suppressPackageStartupMessages(library(dplyr))

# kNN measures distance on every column unless dist_var is given. Keep the
# distance on the household characteristics only and leave out:
#   * METRO                 - raw source of metro_fct; its "missing" code 0 is
#                             numerically closest to the nonmetro code 1
#   * SERIAL, PERNUM        - record identifiers
#   * metro_fct_was_missing - bookkeeping flag, identical for all rows to impute
#   * metro_fct             - the variable being imputed
metro_non_predictors <- c("metro_fct", "metro_fct_was_missing", "METRO", "SERIAL", "PERNUM")
metro_dist_vars <- setdiff(colnames(r_ipums_impute_metro_pdf), metro_non_predictors)

ipums_impute_metro <- VIM::kNN(
  r_ipums_impute_metro_pdf,
  variable = "metro_fct",
  dist_var = metro_dist_vars,
  imp_var = FALSE
)
""")

In [None]:
# pull the imputed frame out of the R global environment and rebuild the Polars frame
with localconverter(ro.default_converter + pandas2ri.converter):
    imputed_metro_pdf = ro.conversion.rpy2py(ro.globalenv["ipums_impute_metro"])
    ipums_impute_metro = pl.from_pandas(imputed_metro_pdf)
ipums_impute_metro

In [None]:
# per-column null counts after imputation
ipums_impute_metro.select(pl.all().null_count())

In [None]:
# True when every SERIAL appears on exactly one row of the imputation table
ipums_impute_metro.get_column("SERIAL").n_unique() == ipums_impute_metro.height

In [None]:
# True when (SERIAL, PERNUM, YEAR) identifies each row of ipums_df_t uniquely
ipums_df_t.select("SERIAL", "PERNUM", "YEAR").n_unique() == ipums_df_t.height

### Second Set of Transformations
* Join Metro imputation
* Calculate **metro** binary variable.
* Calculate **tenure** variable.
* Drop duplicate columns (_right) that are created as a result of the left join.

In [None]:
ipums_df_t2 = (ipums_df_t
 # left join on metro table
 # NOTE(review): the imputation table is built from YEAR == 2018 householders only
 # and is keyed here without YEAR; if ipums_df_t holds other years whose
 # SERIAL/PERNUM values collide, those rows receive the 2018 values too - confirm.
 .join(ipums_impute_metro,
       how='left',
       on=['SERIAL','PERNUM'])
#  metro: 1 = metro, 0 = nonmetro; rows without a match in the imputation table
#  (null metro_fct) stay null instead of being silently labelled nonmetro
 .with_columns(
       metro=(
              pl.when(pl.col("metro_fct") == "metro").then(1)
              .when(pl.col("metro_fct") == "nonmetro").then(0)
       )
 )
#  tenure: OWNERSHPD 12 -> 1, 13 -> 2, 21/22 -> 3, anything else -> null
 .with_columns(
       tenure=(
              pl.when(pl.col("OWNERSHPD") == 12).then(1)
              .when(pl.col("OWNERSHPD") == 13).then(2)
              .when(pl.col("OWNERSHPD").is_in([21,22])).then(3)
       )
 )
)
# drop duplicate columns as a result of the join (polars suffixes them with "_right")
ipums_df_t2 = ipums_df_t2.drop([col for col in ipums_df_t2.columns if col.endswith("_right")])
ipums_df_t2

## IPUMS Post Stratification Table

In [None]:
# columns that define a post-stratification cell (everything selected except HHWT)
non_hhwt_group_fields = [
    "PUMA",
    "COUNTYFIP",
    "state",
    "metro",
    "hh_income",
    "male",
    "age",
    "edu",
    "race_eth",
    "tenure",
    "homevalue",
    "disability",
    "class_worker",
    "citizen",
    "household_type",
    "english_at_home",
    "public_assistance",
    "social_security",
    "poverty",
]

# 2018 householders (RELATE == 1) with GQ in 1 or 2
is_2018_householder = (
    pl.col("YEAR").eq(2018)
    & pl.col("GQ").is_in([1, 2])
    & pl.col("RELATE").is_in([1])
)

# one row per cell, WGT = summed household weight of the records in that cell
ipums_post_strat = (
    ipums_df_t2
    .filter(is_2018_householder)
    .select(*non_hhwt_group_fields, "HHWT")
    .group_by(non_hhwt_group_fields)
    .agg(WGT=pl.col("HHWT").sum())
)
ipums_post_strat

### Race Props

In [None]:
# share of summed HHWT held by each race_eth code, over all rows of ipums_df_t2
race_prop_ipums = (
    ipums_df_t2
    .group_by("race_eth")
    .agg(racetot=pl.col("HHWT").sum())
    .select(
        pl.col("race_eth"),
        raceprop=pl.col("racetot") / pl.col("racetot").sum(),
    )
)
race_prop_ipums

In [None]:
# same HHWT share per race_eth code, restricted to rows whose PUMA is in PUMA_CODES
race_prop_ipums_puma = (
    ipums_df_t2
    .filter(pl.col("PUMA").is_in(PUMA_CODES))
    .group_by("race_eth")
    .agg(racetot=pl.col("HHWT").sum())
    .select(
        pl.col("race_eth"),
        raceprop=pl.col("racetot") / pl.col("racetot").sum(),
    )
)
race_prop_ipums_puma

## Output

In [None]:
# make sure the target folder exists so the write works on a fresh checkout
os.makedirs("output", exist_ok=True)
# person-level IPUMS table with all derived fields
ipums_df_t2.write_parquet('output/ipums.parquet')

In [None]:
# make sure the target folder exists so the write works on a fresh checkout
os.makedirs("output", exist_ok=True)
# post-stratification cells with their summed household weight (WGT)
ipums_post_strat.write_parquet('output/ipums_post_stratification.parquet')