## Spark Setup

In [1]:
!apt-get update

# Download Java Virtual Machine (JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz

Get:1 http://security.ubuntu.com/ubuntu focal-security InRelease [114 kB]
Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu focal InRelease [18.1 kB]
Get:5 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
Ign:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
Get:7 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2004/x86_64  InRelease [1,581 B]
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  Release
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu focal InRelease
Hit:10 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu focal InRelease
Get:11 http://archive.ubuntu.com/ubuntu focal-backports InRelease [108 kB]
Get:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu focal InRelease [24.3 kB]
Hit:13 http://ppa.launc

In [5]:
# Unzip the file
!tar xf spark-3.2.3-bin-hadoop3.2.tgz

# Install findspark
!pip3 install -q findspark

# Install pypsark
!pip3 install pyspark

import os
import findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = '/content/spark-3.2.3-bin-hadoop3.2'
findspark.init()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m19.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=e28478faa2e3a76e302dbc43fcfe2cfae6bd91a5d6a8a137bb6a60924b512c4d
  Stored in directory: /root/.cache/pip/wheels/b1/59/a0/a1a0624b5e865fd389919c1a10f53aec9b12195d6747710baf
Successfully built pyspark
Installing collected packages: py4j, pyspa

## Imports

In [6]:
# Standard library
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = 'all'
import multiprocessing
import warnings

# Third Party
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Spark
import pyspark.pandas as ps
from pyspark.sql import SparkSession, DataFrame, Window
import pyspark.sql.functions as f
from pyspark.sql.types import *

from google.colab import drive
drive.mount('/content/drive')

# Create spark_session
spark = SparkSession.builder.master('local[*]') \
                    .appName('imputation_strategies') \
                    .config('spark.sql.repl.eagerEval.enabled', True) \
                    .config('spark.sql.repl.eagerEval.maxNumRows', 15) \
                    .getOrCreate()

spark



Mounted at /content/drive


## Global Setting

In [7]:
seed = 1227
rs = np.random.RandomState(seed)

ps.set_option("display.max_rows", 15)

warnings.filterwarnings('ignore')

path_gdrive = '/content/drive/Shareddrives/DMP-W23/Repo/data/'

# Number of cpu's
multiprocessing.cpu_count()

2

## Data

In [8]:
df = spark.read.csv(path_gdrive + 'train_joined.csv', inferSchema=True, header=True)
df.count(), len(df.columns)

(122265, 30)

In [9]:
# Convert timestamp to date
df = df.withColumn('first_day_of_month', df.first_day_of_month.cast('string'))

In [10]:
df.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- cfips: integer (nullable = true)
 |-- county: string (nullable = true)
 |-- state: string (nullable = true)
 |-- first_day_of_month: string (nullable = true)
 |-- microbusiness_density: double (nullable = true)
 |-- active: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- pct_foreign_born: double (nullable = true)
 |-- pct_bb: double (nullable = true)
 |-- median_hh_inc: double (nullable = true)
 |-- pct_it_workers: double (nullable = true)
 |-- pct_college: double (nullable = true)
 |-- earning: double (nullable = true)
 |-- treasury_yield_10yr: double (nullable = true)
 |-- monthly_unemp_rate: double (nullable = true)
 |-- med_rent_zero_bed: double (nullable = true)
 |-- med_rent_one_bed: double (nullable = true)
 |-- med_rent_two_beds: double (nullable = true)
 |-- med_rent_three_beds: double (nullable = true)
 |-- annual_tax_rate: double (nullable = true)
 |-- quarter: string (n

## Missing Values

For each column, return the missing values if any of the missing conditions is met, or else return None. Count the occurences of missing values for each column.

In [11]:
df.select([f.count(f.when(
    f.col(col).contains('None') | \
    f.col(col).contains('NULL') | \
   (f.col(col) == '') | \
    f.col(col).isNull() | \
    f.isnan(col), col
  )).alias(col) for col in df.columns])

row_id,cfips,county,state,first_day_of_month,microbusiness_density,active,year,month,pct_foreign_born,pct_bb,median_hh_inc,pct_it_workers,pct_college,earning,treasury_yield_10yr,monthly_unemp_rate,med_rent_zero_bed,med_rent_one_bed,med_rent_two_beds,med_rent_three_beds,annual_tax_rate,quarter,holiday_season,earning_group_lower_25_percentile,earning_group_middle_50_percentile,earning_group_upper_75_percentile,annual_tax_group_lower_25_percentile,annual_tax_group_middle_50_percentile,annual_tax_group_upper_75_percentile
0,0,0,0,0,0,0,0,0,31350,31350,31374,31350,31350,0,0,39,0,0,0,0,0,0,0,0,0,0,0,0,0


### Census Columns Investigation

For the census columns--- `pct_foreign_born`, `pct_bb`, `median_hh_inc`, `pct_college`--- we investigate if removing observations where year == 2022 reduces nullity to zero. In other words, we wish to make sure that the missing values in these columns are not caused by all the reshaping we did earlier.



In [12]:
census_data = (df.select([col for col in df.columns if col.startswith('pct')] + ['median_hh_inc', 'year', 'cfips']))
census_data

pct_foreign_born,pct_bb,pct_it_workers,pct_college,median_hh_inc,year,cfips
2.3,80.6,0.7,16.1,58731.0,2019,1001
2.3,82.7,0.6,16.7,57982.0,2020,1001
2.1,85.5,1.1,16.4,62660.0,2021,1001
,,,,,2022,1001
3.7,81.8,1.4,21.0,58320.0,2019,1003
3.4,85.1,1.0,20.2,61756.0,2020,1003
3.5,87.9,1.3,20.6,64346.0,2021,1003
,,,,,2022,1003
2.7,60.5,0.8,7.6,32525.0,2019,1005
2.6,64.6,1.1,7.3,34990.0,2020,1005


In [13]:
census_data_no_2022 = census_data.filter(f.col('year') != 2022)
census_data_no_2022.select(
    [f.count(f.when(f.col(col).isNull(), col)).alias(col) for col in census_data.columns if col not in ['year', 'cfips']]
)

pct_foreign_born,pct_bb,pct_it_workers,pct_college,median_hh_inc
0,0,0,0,24


All the percentages columns are good. However, there are still missing values in `median_hh_inc`. We'll read in the original census data and compare the missing values. 

In [14]:
original_census = spark.read.csv(path_gdrive + 'original_census_starter.csv', inferSchema=True, header=True)
original_census = original_census.select([col for col in original_census.columns if col.startswith('median')] + ['cfips'])
original_census.count(), len(original_census.columns)

(3142, 6)

Check for missing values in the income columns in the original census data:

In [15]:
original_census.select([f.count(f.when(f.col(col).isNull(), col)).alias(col) for col in original_census.columns if col != 'cfips'])

median_hh_inc_2017,median_hh_inc_2018,median_hh_inc_2019,median_hh_inc_2020,median_hh_inc_2021
0,1,0,2,2


Check the 'cfips' of these missing rows in the original census data:

In [16]:
original_census.filter(f.col('median_hh_inc_2018').isNull() | f.col('median_hh_inc_2020').isNull() | f.col('median_hh_inc_2021').isNull())

median_hh_inc_2017,median_hh_inc_2018,median_hh_inc_2019,median_hh_inc_2020,median_hh_inc_2021,cfips
86019,82306.0,79867,,,2261
33422,,39952,42264.0,46994.0,35039
46534,53194.0,53088,,38659.0,48243
80938,81875.0,83750,44076.0,,48301


Check the 'cfips' of the missing rows in the new training data. Other than the missing values in 2022 (no census data), are there any missing values: 

In [17]:
census_data_no_2022.filter(f.col('median_hh_inc').isNull()).count()

24

In [18]:
census_data_no_2022.filter(f.col('median_hh_inc').isNull()).select('cfips').distinct()

cfips
48301
48243


This makes sense, since the two counties above each contribute 12 months of missing values in the `median_hh_inc` column of the new training set. County 48301 is missing median income values in year 2021 and county 48243 is missing median income values in year 2020. None of these years is 2022, and so removing 2022 did not remove the remaning 24 missing values in the median income column.

In [19]:
census_data_no_2022.filter(f.col('median_hh_inc').isNull() & (f.col('year') == 2020)).count()

12

In [20]:
census_data_no_2022.filter(f.col('median_hh_inc').isNull() & (f.col('year') == 2021)).count()

12

In the original census data, county 2261 also has missing values for 2020 and 2021 while county 35039 has missing median incomes in 2018. What happened to those in the new training set? 

In [21]:
census_data.filter((f.col('cfips') == '2261') & (f.col('year') == 2020))

pct_foreign_born,pct_bb,pct_it_workers,pct_college,median_hh_inc,year,cfips


In [22]:
census_data.filter((f.col('cfips') == '2261') & (f.col('year') == 2021))

pct_foreign_born,pct_bb,pct_it_workers,pct_college,median_hh_inc,year,cfips


In [23]:
census_data.filter((f.col('cfips') == '35039') & (f.col('year') == 2018))

pct_foreign_born,pct_bb,pct_it_workers,pct_college,median_hh_inc,year,cfips


1. The new training set does not have 2018 since the original train set only covers the period from 2019 to 2022. So even if county 35039 has missing median income values in the original census data in 2018, it will not be carried over to the new training set after joining. 

2. Why does missing median incomes for county 2261 in 2020 and 2021 not show up? This is because county 2261 does not exist in the new training set. 

In [24]:
df.filter(f.col('cfips') == '2261').count()

0

**Hypothesis**: Was county 2261 in the original census data but not the original training data? If so, we can explain why it isn't in the new training set.

In [25]:
# Read in original training set
original_train = spark.read.csv(path_gdrive + 'train.csv', inferSchema=True, header=True)
original_train = original_train.select('cfips')
original_train.count(), len(original_train.columns)

(122265, 1)

In [26]:
original_train.filter(f.col('cfips') == '2261').count()

0

In [27]:
original_census.filter(f.col('cfips') == '2261').count()

1

This all makes sense now. County 2261 did not appear in the original training set. It did exist in the original census data and contains missing values. However, because it is not in the original training set, these missing values are not carried over when we merged the census data with the training data. We left joined the census data onto the training set, preserving all rows in the training set and not all rows of the census data. 

In [28]:
del census_data_no_2022

### Unemployment Missing Values Investigation

For the unemployment column, there are 39 missing values:

In [29]:
df.filter(f.col('monthly_unemp_rate').isNull()).select('cfips').distinct()

cfips
15005


All missing values in this column belong to one specific county.

**Hypothesis**: Is it the case that the this county is present in the original trianing set but not in the unemployment data set? Therefore, a left join of the unemployment data set onto the original training set (keeping all the rows in the original training set) will lead to all values for county 15005 missing in the new training set.

In [30]:
unemp = spark.read.csv(path_gdrive + 'OUTPUT_unemployment.csv', header=True, inferSchema=True).select('cfips')
unemp.count(), len(unemp.columns)

(247589, 1)

In [31]:
original_train.filter(f.col('cfips') == '15005').count()

39

In [32]:
unemp.filter(f.col('cfips') == '15005').count()

0

We confirmed our hypothesis--- the missing values in the `monthly_unemp_rate` all belong to one county which was present in the original training set but not the external unemployment data. 

In [33]:
df.filter(f.col('cfips') == '15005').select(['monthly_unemp_rate', 'first_day_of_month']).show(39)

+------------------+------------------+
|monthly_unemp_rate|first_day_of_month|
+------------------+------------------+
|              null|        2019-08-01|
|              null|        2020-08-01|
|              null|        2021-08-01|
|              null|        2022-08-01|
|              null|        2019-09-01|
|              null|        2020-09-01|
|              null|        2021-09-01|
|              null|        2022-09-01|
|              null|        2019-10-01|
|              null|        2020-10-01|
|              null|        2021-10-01|
|              null|        2022-10-01|
|              null|        2019-11-01|
|              null|        2020-11-01|
|              null|        2021-11-01|
|              null|        2019-12-01|
|              null|        2020-12-01|
|              null|        2021-12-01|
|              null|        2020-01-01|
|              null|        2021-01-01|
|              null|        2022-01-01|
|              null|        2020-02-01|


County 15005 is Kalawao County, Hawaii. We can either impute this or, if possible, find the monthly unemployment data for this county for the years covered in the sample. For now, we will impute these values using Hawaii's state level monthly unemployment rates:

In [34]:
hawaii_unemp = spark.read.csv(path_gdrive + 'Archived/hawaii_monthly_unemployment_rate.csv', header=True, inferSchema=True)
hawaii_unemp = hawaii_unemp.withColumnRenamed('DATE', 'first_day_of_month').withColumnRenamed('HIURN', 'monthly_unemp_rate_state')
hawaii_unemp.show(5)

+------------------+------------------------+
|first_day_of_month|monthly_unemp_rate_state|
+------------------+------------------------+
|        2019-08-01|                     2.4|
|        2019-09-01|                     2.3|
|        2019-10-01|                     2.2|
|        2019-11-01|                     2.1|
|        2019-12-01|                     1.7|
+------------------+------------------------+
only showing top 5 rows



In [35]:
# Subset to obtain all rows with missing unemployment rates
missing_unemp_df = df.filter(f.col('cfips') == '15005')

# Assign state level rates to the subsetted dataframe
missing_unemp_df = missing_unemp_df.join(hawaii_unemp, how='inner', on='first_day_of_month')

# Rename and drop original unemployment rate column with missing values
missing_unemp_df = missing_unemp_df.drop('monthly_unemp_rate').withColumnRenamed('monthly_unemp_rate_state', 'monthly_unemp_rate')

# Reorder so we can row-bind the subsetted dataframe with the rest of the data
missing_unemp_df = missing_unemp_df.select(df.columns)

# Row-bind
df = df.filter(f.col('cfips') != '15005').union(missing_unemp_df)
df

row_id,cfips,county,state,first_day_of_month,microbusiness_density,active,year,month,pct_foreign_born,pct_bb,median_hh_inc,pct_it_workers,pct_college,earning,treasury_yield_10yr,monthly_unemp_rate,med_rent_zero_bed,med_rent_one_bed,med_rent_two_beds,med_rent_three_beds,annual_tax_rate,quarter,holiday_season,earning_group_lower_25_percentile,earning_group_middle_50_percentile,earning_group_upper_75_percentile,annual_tax_group_lower_25_percentile,annual_tax_group_middle_50_percentile,annual_tax_group_upper_75_percentile
1001_2019-08-01,1001,Autauga County,Alabama,2019-08-01,3.0076818,1249,2019,8,2.3,80.6,58731.0,0.7,16.1,984.159963088269,1.59,0.0289999999999999,620.0,743.0,880.0,1112.0,6.03,Q3,0,1,0,0,0,1,0
1001_2020-08-01,1001,Autauga County,Alabama,2020-08-01,3.174679,1328,2020,8,2.3,82.7,57982.0,0.6,16.7,1076.30848302157,0.71,0.053,615.0,741.0,876.0,1105.0,6.07,Q3,0,0,1,0,0,1,0
1001_2021-08-01,1001,Autauga County,Alabama,2021-08-01,3.2199171,1358,2021,8,2.1,85.5,62660.0,1.1,16.4,1072.86180471329,1.29,0.03,685.0,820.0,972.0,1229.0,6.05,Q3,0,0,1,0,0,1,0
1001_2022-08-01,1001,Autauga County,Alabama,2022-08-01,3.4238517,1455,2022,8,,,,,,993.996523292287,2.84,0.024,687.0,816.0,976.0,1234.0,6.04,Q3,0,1,0,0,0,1,0
1003_2019-08-01,1003,Baldwin County,Alabama,2019-08-01,7.2391562,11464,2019,8,3.7,81.8,58320.0,1.4,21.0,984.159963088269,1.59,0.0289999999999999,816.0,835.0,959.0,1352.0,6.03,Q3,0,1,0,0,0,1,0
1003_2020-08-01,1003,Baldwin County,Alabama,2020-08-01,7.853229,12756,2020,8,3.4,85.1,61756.0,1.0,20.2,1076.30848302157,0.71,0.06,806.0,811.0,992.0,1355.0,6.07,Q3,0,0,1,0,0,1,0
1003_2021-08-01,1003,Baldwin County,Alabama,2021-08-01,7.9186049,13192,2021,8,3.5,87.9,64346.0,1.3,20.6,1072.86180471329,1.29,0.031,821.0,827.0,1054.0,1428.0,6.05,Q3,0,0,1,0,0,1,0
1003_2022-08-01,1003,Baldwin County,Alabama,2022-08-01,8.4911499,14545,2022,8,,,,,,993.996523292287,2.84,0.026,872.0,878.0,1149.0,1523.0,6.04,Q3,0,1,0,0,0,1,0
1005_2019-08-01,1005,Barbour County,Alabama,2019-08-01,1.0731378,222,2019,8,2.7,60.5,32525.0,0.8,7.6,984.159963088269,1.59,0.0409999999999999,538.0,541.0,716.0,901.0,6.03,Q3,0,1,0,0,0,1,0
1005_2020-08-01,1005,Barbour County,Alabama,2020-08-01,0.9829942,200,2020,8,2.6,64.6,34990.0,1.1,7.3,1076.30848302157,0.71,0.091,508.0,512.0,674.0,840.0,6.07,Q3,0,0,1,0,0,1,0


Now, we should have no missing values for the unemployment column.

In [36]:
df.filter(f.col('monthly_unemp_rate').isNull()).count()

0

In [37]:
del original_train, unemp

## Imputation Strategy for Census Data

For the census columns that have yearly frequency, we will handle the imputation based on the `cfips` column. In other words, for each unique value of `cfips`, we will use the previous years (2019, 2020, 2021) to impute the values for year 2022, which are missing in the new training set.

To get a sense of the distributions of these columns, I will iterate through 5 random `cfips` (there are over 3000 unique `cfips`) at a time, group by year, and examine the year-over-year changes for each of the census columns.

* If the year-over-year changes for these columns are quite significant or they fluctuate a lot, it is more sensible to impute 2022 values using the medians computed based on the past three years.

* If the year-over-year changes for these columns are not notable, it might make sense to simply impute 2022 values using 2021 values.

In [38]:
# List of unique cfips
unique_cfips = list(census_data.select('cfips').distinct().rdd.flatMap(lambda x: x).collect())

In [39]:
for cfips in np.random.choice(unique_cfips, 5):
  (census_data.filter(f.col('cfips') == str(cfips))
              .groupBy('year')
              .agg(
                  f.first('pct_college').alias('pct_college'), # These columns have yearly frequency and so we can simply just get the first value for each year
                  f.first('pct_it_workers').alias('pct_it_workers'),
                  f.first('pct_bb').alias('pct_bb'),
                  f.first('pct_foreign_born').alias('pct_foreign_born'),
                  f.first('median_hh_inc').alias('median_hh_inc')
              ))
  print('\n')

year,pct_college,pct_it_workers,pct_bb,pct_foreign_born,median_hh_inc
2022,,,,,
2019,26.4,0.5,83.1,6.2,58136.0
2020,26.5,0.4,83.4,6.1,57471.0
2021,27.2,0.5,87.5,5.4,59178.0






year,pct_college,pct_it_workers,pct_bb,pct_foreign_born,median_hh_inc
2022,,,,,
2019,27.7,2.1,77.7,2.7,68685.0
2020,27.9,1.5,79.6,2.4,69291.0
2021,30.3,1.5,83.2,2.7,75089.0






year,pct_college,pct_it_workers,pct_bb,pct_foreign_born,median_hh_inc
2022,,,,,
2019,8.3,2.1,66.6,1.6,53056.0
2020,9.7,0.9,68.0,2.9,57589.0
2021,9.2,0.9,71.8,1.8,62125.0






year,pct_college,pct_it_workers,pct_bb,pct_foreign_born,median_hh_inc
2022,,,,,
2019,8.5,1.3,66.0,4.6,47811.0
2020,9.9,1.2,73.5,5.0,50912.0
2021,9.7,1.7,80.5,5.1,57893.0






year,pct_college,pct_it_workers,pct_bb,pct_foreign_born,median_hh_inc
2022,,,,,
2019,10.3,1.6,76.0,3.9,49462.0
2020,10.6,1.6,78.7,3.3,50146.0
2021,10.9,1.4,79.2,3.2,53533.0






A few runs of the loop above reveal that the census columns, especially the percentage `pct` columns, can fluctuate quite a bit from year to year--- sometimes increasing and sometimes decreasing. Therefore, for these columns, we will impute 2022's values based on the medians of the past three years. 

The `median_hh_inc` column generally exhibits upward trends, and so taking the median may lead to underestimations. For this column, we will impute 2022's values using 2021's values. Counties 48301 and 48243 also have missing values for years other than 2022. For those two counties, the values will be imputed using the medians as special cases.

In [40]:
county_48301_median = df.filter(f.col('cfips') == '48301').approxQuantile('median_hh_inc', [0.5], 0)[0] # Returns a list so subset to get float
county_48243_median = df.filter(f.col('cfips') == '48243').approxQuantile('median_hh_inc', [0.5], 0)[0]
county_48301_median, county_48243_median

(44076.0, 38659.0)

In [41]:
# Impute for counties 48301 and 48243 for years other than 2022 (special case since others are imputed using 2021 values)
df = df.withColumn('median_hh_inc', f.when(
    (f.col('cfips') == '48301') & (f.col('median_hh_inc').isNull()), county_48301_median
  ).otherwise(f.col('median_hh_inc')))

df = df.withColumn('median_hh_inc', f.when(
    (f.col('cfips') == '48243') & (f.col('median_hh_inc').isNull()), county_48301_median
  ).otherwise(f.col('median_hh_inc')))

Next, we impute the rest of the missing values (where year == 2022) in the `median_hh_inc` column for all other counties.

In [42]:
# Convert to pandas
df = df.toPandas()

df['median_hh_inc'] = df.groupby('cfips')['median_hh_inc'].transform(lambda col: col.fillna(method='ffill')) # Fill foward 

For the percentages columns:

In [43]:
# Compute the medians by county
(spark.createDataFrame(df).filter(f.col('year') != 2022)
   .groupBy('cfips')
   .agg(
       f.expr('percentile_approx(pct_college, 0.5)').alias('pct_college'), 
       f.expr('percentile_approx(pct_it_workers, 0.5)').alias('pct_it_workers'),
       f.expr('percentile_approx(pct_bb, 0.5)').alias('pct_bb'),
       f.expr('percentile_approx(pct_foreign_born, 0.5)').alias('pct_foreign_born')
   ))

cfips,pct_college,pct_it_workers,pct_bb,pct_foreign_born
1001,16.4,0.7,82.7,2.3
1003,20.6,1.3,85.1,3.5
1005,7.3,0.8,64.6,2.6
1007,7.4,1.7,74.6,1.5
1009,8.9,0.9,79.6,4.5
1011,7.4,0.3,60.1,1.2
1013,9.7,1.4,73.6,1.7
1015,10.5,1.0,79.8,2.5
1017,9.6,2.1,74.5,1.9
1019,6.3,0.9,75.0,1.9


In [44]:
# Impute using medians
df[[col for col in df.columns if col.startswith('pct')]] = df.groupby('cfips')[[col for col in df.columns if col.startswith('pct')]].transform(lambda col: col.fillna(col.median()))

Check the data frame for missing values:

In [45]:
df.isna().sum()

row_id                                   0
cfips                                    0
county                                   0
state                                    0
first_day_of_month                       0
microbusiness_density                    0
active                                   0
year                                     0
month                                    0
pct_foreign_born                         0
pct_bb                                   0
median_hh_inc                            0
pct_it_workers                           0
pct_college                              0
earning                                  0
treasury_yield_10yr                      0
monthly_unemp_rate                       0
med_rent_zero_bed                        0
med_rent_one_bed                         0
med_rent_two_beds                        0
med_rent_three_beds                      0
annual_tax_rate                          0
quarter                                  0
holiday_sea

## Write to Disk

In [46]:
df.to_csv(path_gdrive + 'train_imputed.csv', index=False)