In [None]:
from pyspark.sql import SparkSession
import os
from pyspark.sql.functions import when, col
from pyspark.sql import functions as F



In [2]:
# Start a Spark session, or reuse the active one if it already exists
session_builder = SparkSession.builder
spark = session_builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/07 17:25:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Base directory that holds the processed PUMS parquet files.
# The path is relative, so it resolves against the notebook's working directory.
BASE = "../data/processed/parquet_pums"

In [4]:
# List the per-year PUMS parquet files (named pums_<year>.parquet).
# Only names whose middle part is all digits are kept, so combined outputs that
# live in the same folder (e.g. "pums_all.parquet", or the
# "pums_all_cleaned.parquet" export this notebook writes at the end) are not
# picked up again when the notebook is re-run from a fresh kernel.
YEAR_FILE_PREFIX, YEAR_FILE_SUFFIX = "pums_", ".parquet"
files = sorted(
    name
    for name in os.listdir(BASE)
    if name.startswith(YEAR_FILE_PREFIX)
    and name.endswith(YEAR_FILE_SUFFIX)
    and name[len(YEAR_FILE_PREFIX):-len(YEAR_FILE_SUFFIX)].isdigit()
)
files

['pums_2015.parquet',
 'pums_2016.parquet',
 'pums_2017.parquet',
 'pums_2018.parquet',
 'pums_2019.parquet',
 'pums_2021.parquet',
 'pums_2022.parquet',
 'pums_2023.parquet']

In [5]:
# Read every yearly parquet file into its own Spark DataFrame
dfs = []

for filename in files:
    # int() raises ValueError when the name is not of the form
    # pums_<number>.parquet, so this parse also acts as a filename check.
    year = int(filename.replace("pums_", "").replace(".parquet", ""))
    path = os.path.join(BASE, filename)

    print("Loading:", path)
    # Cast YEAR to int -- presumably so the column has the same type in every
    # file before the frames are combined (confirm against the raw files).
    year_df = spark.read.parquet(path).withColumn("YEAR", F.col("YEAR").cast("int"))
    dfs.append(year_df)

Loading: ../data/processed/parquet_pums/pums_2015.parquet


                                                                                

Loading: ../data/processed/parquet_pums/pums_2016.parquet
Loading: ../data/processed/parquet_pums/pums_2017.parquet
Loading: ../data/processed/parquet_pums/pums_2018.parquet
Loading: ../data/processed/parquet_pums/pums_2019.parquet
Loading: ../data/processed/parquet_pums/pums_2021.parquet
Loading: ../data/processed/parquet_pums/pums_2022.parquet
Loading: ../data/processed/parquet_pums/pums_2023.parquet


In [6]:
# Stack the yearly DataFrames into one, matching columns by name rather than
# by position
pums = dfs[0]
for position in range(1, len(dfs)):
    pums = pums.unionByName(dfs[position])

In [7]:
# Print the schema of the combined DataFrame (column names, types, nullability)
pums.printSchema()

root
 |-- ST: integer (nullable = true)
 |-- PUMA: integer (nullable = true)
 |-- AGEP: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- RAC1P: integer (nullable = true)
 |-- HISP: integer (nullable = true)
 |-- MAR: integer (nullable = true)
 |-- SCHL: integer (nullable = true)
 |-- ESR: integer (nullable = true)
 |-- OCCP: integer (nullable = true)
 |-- SOCP: string (nullable = true)
 |-- WAGP: integer (nullable = true)
 |-- PINCP: integer (nullable = true)
 |-- WKW: integer (nullable = true)
 |-- DIS: integer (nullable = true)
 |-- DEAR: integer (nullable = true)
 |-- DEYE: integer (nullable = true)
 |-- DOUT: integer (nullable = true)
 |-- CIT: integer (nullable = true)
 |-- NATIVITY: integer (nullable = true)
 |-- YEAR: integer (nullable = true)



Column names:

- ST = State Codes (FIPS)
- PUMA = Public Use Microdata Area (an area with ~100k population)
- AGEP = Age of the person in years
- SEX = Sex of the person (1 male, 2 female)
- RAC1P = Race
- HISP = Hispanic origin (0 not hispanic, 1-4 hispanic)
- MAR = Marital status (1 married, 3 widowed, 4 divorced, 6 never married)
- SCHL = level of education
- ESR = Employment Status Recode (employed/unemployed, armed forces, not in labor force)
- OCCP = Occupation code (job type)
- SOCP = SOC occupation code
- WAGP = Wage or salary income
- PINCP = Total personal income
- WKW = Weeks worked last year (the smaller the number, the MORE weeks)
- DIS = Disability status (1 yes, 2 no)
- DEAR = Hearing difficulty (1 yes, 2 no)
- DEYE = Vision difficulty (1 yes, 2 no)
- DOUT = Mobility difficulty (1 yes, 2 no)
- CIT = Citizenship status (1 US born citizen, 2 born in PR/US territories, 3 born abroad to American parents, 4 naturalized citizen, 5 not a citizen)
- YRNAT = Year naturalized (if applicable); note that this column does not appear in the schema printed above
- NATIVITY = Nativity status (1 native, 2 foreign-born)
- YEAR = ACS survey year (2015-2023; no 2020 file is loaded)

While converting the .csv files to Parquet, we dropped many columns. We kept the 21 columns that are most important for our analysis.

In [8]:
# Count the rows of the combined DataFrame (this triggers a full Spark job)
total_rows = pums.count()
print("Total rows:", total_rows)



Total rows: 25979410


                                                                                

We have 25,979,410 rows. Now, we need to clean and standardize the data.

In [9]:
# Summary statistics for every column of the combined data
pums_summary = pums.describe()
pums_summary.show()

25/12/07 17:26:34 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

+-------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+-----------------+------------------+------------------+-------------------+------------------+------------------+------------------+-------------------+------------------+
|summary|                ST|             PUMA|              AGEP|                SEX|             RAC1P|              HISP|               MAR|             SCHL|               ESR|             OCCP|              SOCP|              WAGP|            PINCP|               WKW|               DIS|               DEAR|              DEYE|              DOUT|               CIT|           NATIVITY|              YEAR|
+-------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-----------------+------------------+-----

From this, we can see that in some columns, a lot of values are missing, especially in "ST" (state code), "OCCP" (occupation), "SOCP" (SOC occupation code), "WKW" (weeks worked), and "ESR" (employment status recode) columns. Since we have a lot of data overall, we will remove rows with missing data in "ST", "SCHL", and "ESR". For the other ones, more complex data manipulation is needed.

In [12]:
# Build one aggregate per column: the number of null entries, labelled
# "<column>_missing". Nothing is computed until the result is shown.
null_count_exprs = []
for column_name in pums.columns:
    is_null_flag = F.col(column_name).isNull().cast("int")
    null_count_exprs.append(F.sum(is_null_flag).alias(column_name + "_missing"))

missing_counts = pums.select(null_count_exprs)

In [13]:
# Show the per-column null counts, one column per line (vertical layout)
missing_counts.show(vertical=True)



-RECORD 0--------------------
 ST_missing       | 3405809  
 PUMA_missing     | 0        
 AGEP_missing     | 0        
 SEX_missing      | 0        
 RAC1P_missing    | 0        
 HISP_missing     | 0        
 MAR_missing      | 0        
 SCHL_missing     | 744163   
 ESR_missing      | 4531141  
 OCCP_missing     | 10584298 
 SOCP_missing     | 10584298 
 WAGP_missing     | 4213088  
 PINCP_missing    | 4213088  
 WKW_missing      | 19416650 
 DIS_missing      | 0        
 DEAR_missing     | 0        
 DEYE_missing     | 0        
 DOUT_missing     | 4213088  
 CIT_missing      | 0        
 NATIVITY_missing | 0        
 YEAR_missing     | 0        



                                                                                

In [14]:
# Keep only rows that have a state code, an education level, and an
# employment status; rows missing any of the three are dropped.
has_required_fields = (
    F.col("ST").isNotNull()
    & F.col("SCHL").isNotNull()
    & F.col("ESR").isNotNull()
)
pums_clean = pums.filter(has_required_fields)

In [16]:
# Compare row counts before and after dropping incomplete rows
# (each count runs its own Spark job)
for label, frame in (("Original rows:", pums), ("Cleaned rows:", pums_clean)):
    print(label, frame.count())


Original rows: 25979410




Cleaned rows: 18600624


                                                                                

In [17]:
# Fill missing SOCP codes with the placeholder string "Unknown"
socp_filled = F.coalesce(F.col("SOCP"), F.lit("Unknown"))
pums_clean = pums_clean.withColumn("SOCP", socp_filled)

In [18]:
# Integer columns whose missing values are replaced with 0
num_cols = ["OCCP", "WAGP", "PINCP", "WKW", "DOUT"]

# NOTE(review): after this step 0 stands for "missing" in these columns. Per
# the column notes in this notebook DOUT is coded 1 = yes / 2 = no, so 0 is a
# new sentinel value there -- confirm that downstream code expects it.
zero_filled = {name: F.coalesce(F.col(name), F.lit(0)) for name in num_cols}
pums_clean = pums_clean.withColumns(zero_filled)

In [19]:
# Verify the cleaned data: summary statistics first, then the row count
cleaned_summary = pums_clean.describe()
cleaned_summary.show()
final_row_count = pums_clean.count()
print("Final row count:", final_row_count)

                                                                                

+-------+------------------+-----------------+-----------------+-------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+-------------------+-------------------+------------------+-------------------+------------------+------------------+------------------+
|summary|                ST|             PUMA|             AGEP|                SEX|             RAC1P|              HISP|              MAR|              SCHL|               ESR|              OCCP|              SOCP|              WAGP|            PINCP|               WKW|                DIS|               DEAR|              DEYE|               DOUT|               CIT|          NATIVITY|              YEAR|
+-------+------------------+-----------------+-----------------+-------------------+------------------+------------------+-----------------+------------------+------------------+----

                                                                                

In [20]:
# Re-count nulls per column on the cleaned data, labelled "<column>_missing"
remaining_null_exprs = []
for column_name in pums_clean.columns:
    is_null_flag = F.col(column_name).isNull().cast("int")
    remaining_null_exprs.append(F.sum(is_null_flag).alias(column_name + "_missing"))

missing_counts_2 = pums_clean.select(remaining_null_exprs)

In [21]:
# Show the null counts of the cleaned data, one column per line
missing_counts_2.show(vertical=True)



-RECORD 0---------------
 ST_missing       | 0   
 PUMA_missing     | 0   
 AGEP_missing     | 0   
 SEX_missing      | 0   
 RAC1P_missing    | 0   
 HISP_missing     | 0   
 MAR_missing      | 0   
 SCHL_missing     | 0   
 ESR_missing      | 0   
 OCCP_missing     | 0   
 SOCP_missing     | 0   
 WAGP_missing     | 0   
 PINCP_missing    | 0   
 WKW_missing      | 0   
 DIS_missing      | 0   
 DEAR_missing     | 0   
 DEYE_missing     | 0   
 DOUT_missing     | 0   
 CIT_missing      | 0   
 NATIVITY_missing | 0   
 YEAR_missing     | 0   



                                                                                

We removed the rows where missing values (in "ST", "SCHL", or "ESR") would prevent us from doing proper analysis. In all other columns, we replaced missing values with "Unknown" (SOCP) or 0 (the numeric columns). The data now has 18,600,624 rows.

In [24]:
# Set the session's Parquet compression codec option to snappy before writing
spark.conf.set("spark.sql.parquet.compression.codec", "snappy")

In [None]:
# Persist the cleaned dataset as parquet, replacing any previous export.
# NOTE(review): the target lies inside the folder that the file-listing cell
# scans, so it is present in that folder on later runs.
cleaned_output_path = "../data/processed/parquet_pums/pums_all_cleaned.parquet"
cleaned_writer = pums_clean.write.mode("overwrite")
cleaned_writer.parquet(cleaned_output_path)
print("Saved cleaned file!")



Saved cleaned file!


                                                                                

25/12/07 18:48:21 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 152620 ms exceeds timeout 120000 ms
25/12/07 18:48:21 WARN SparkContext: Killing executors is not supported by current scheduler.
25/12/07 18:48:25 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$