# Data Import & Preprocessing 
### Project: Exploring Trends in US Happiness with Census Data
Team Members: Taylor Witte, Donald Yu, Praveen Manimaran, Vitush Agarwal, Parker Aman

UCSD Spring 2024 232R Big Data Analytics Using Spark 


## Environment Setup

In [1]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
import logging
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, element_at, udf
from pyspark.sql.functions import explode
from pyspark_dist_explore import Histogram, hist, distplot, pandas_histogram
from pyspark.sql import Row
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import when
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml import Pipeline
from sklearn.model_selection import train_test_split
from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import VectorAssembler, MinMaxScaler, PCA, PCAModel
from pyspark.sql.types import ArrayType, DoubleType, NumericType
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import pylab as pl
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

## Import Datasets

### Import World Happiness Data

In [2]:
# Create (or reuse) the SparkSession for this notebook.
# SparkSession is already imported in the imports cell at the top.
# NOTE(review): 64g executor/driver memory is sized for the author's machine — tune to
# the available RAM. Memory settings only apply when this call starts a new session;
# getOrCreate() returns an already-running session unchanged.
spark = (
    SparkSession.builder
    .appName("BigDataGroupProject")
    .config("spark.executor.memory", "64g")
    .config("spark.driver.memory", "64g")
    .config("spark.memory.fraction", "0.8")
    .config("spark.memory.storageFraction", "0.3")
    .getOrCreate()
)

sc = spark.sparkContext
# Only surface errors; the default WARN level floods notebook output
sc.setLogLevel("ERROR")

24/06/08 08:32:32 WARN Utils: Your hostname, Taylors-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.4.60 instead (on interface en0)
24/06/08 08:32:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/08 08:32:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# World Happiness Report: one row per country and year
world_happiness = pd.read_csv("C:/Users/Parker Aman/dev/Dev_School/Big Data/Group Project/World Happiness Report.csv")

# Keep only US rows for the census years: 2012 onward, excluding 2020
# NOTE(review): 2020 is presumably excluded because there is no matching census extract — confirm
is_us = world_happiness['Country Name'] == 'United States'
US_happiness = world_happiness.loc[is_us]
in_census_years = (US_happiness['Year'] >= 2012) & (US_happiness['Year'] != 2020)
US_happiness = US_happiness.loc[in_census_years]


In [4]:
# World Happiness Index: yearly happiness score and rank per country
happiness_rank = pd.read_csv("C:/Users/Parker Aman/dev/Dev_School/Big Data/Group Project/World_Happiness_Index.csv")

# Keep US rows for the census years (drop 2023 and 2020), then give the
# columns names that line up with the other happiness table
happiness_rank = (
    happiness_rank
    .loc[happiness_rank['Country'] == 'United States']
    .loc[lambda frame: ~frame['Year'].isin([2023, 2020])]
    .rename(columns={'Country': 'Country Name', 'Year': 'Year',
                     'Index': 'Happiness_Index', 'Rank': 'Happiness_Rank'})
)
happiness_rank

Unnamed: 0,Country Name,Year,Happiness_Index,Happiness_Rank
1444,United States,2013,7.082,17
1445,United States,2015,7.119,15
1446,United States,2016,7.104,13
1447,United States,2017,6.993,14
1448,United States,2018,6.886,18
1449,United States,2019,6.892,19
1451,United States,2021,6.951,19
1452,United States,2022,6.977,16
1523,United States,2012,7.27,6


### Import Individual Census Data

In [3]:
# Import individual-level census data

file_path = "usa_00006.csv"  # change this to your own!!

df_id = spark.read.csv(file_path, header=True, inferSchema=True)

# Inflation-adjust monetary values with the IPUMS CPI99 multiplier
# (IPUMS documents CPI99 as converting to constant 1999 dollars).
# Each adjusted copy gets the original name prefixed with "A".
# NOTE(review): IPUMS encodes N/A / missing income with sentinel values
# (e.g. 9999999 for INCTOT and FTOTINC). These are multiplied here as if they
# were dollars. TODO confirm against the codebook and null them out first.
id_money_cols = ['INCTOT', 'FTOTINC', 'INCWELFR', 'INCINVST']
for money_col in id_money_cols:
    df_id = df_id.withColumn(f"A{money_col}", F.col(money_col) * F.col('CPI99'))

# Keep the unadjusted values (with SAMPLE / CBSERIAL) aside, then drop them
raw_id = df_id.select('SAMPLE', 'CBSERIAL', *id_money_cols)
df_id = df_id.drop(*id_money_cols)

# Schema and a small preview of the adjusted data
df_id.printSchema()
df_id.show(5)

                                                                                

root
 |-- YEAR: integer (nullable = true)
 |-- SAMPLE: integer (nullable = true)
 |-- SERIAL: integer (nullable = true)
 |-- CBSERIAL: long (nullable = true)
 |-- HHWT: double (nullable = true)
 |-- CLUSTER: long (nullable = true)
 |-- CPI99: double (nullable = true)
 |-- STRATA: integer (nullable = true)
 |-- GQ: integer (nullable = true)
 |-- PERNUM: integer (nullable = true)
 |-- CBPERNUM: integer (nullable = true)
 |-- PERWT: double (nullable = true)
 |-- FAMSIZE: integer (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- MARST: integer (nullable = true)
 |-- RACE: integer (nullable = true)
 |-- RACED: integer (nullable = true)
 |-- CITIZEN: integer (nullable = true)
 |-- HCOVANY: integer (nullable = true)
 |-- SCHOOL: integer (nullable = true)
 |-- EDUC: integer (nullable = true)
 |-- EDUCD: integer (nullable = true)
 |-- SCHLTYPE: integer (nullable = true)
 |-- EMPSTAT: integer (nullable = true)
 |-- EMPSTATD: integer (nullable = true)


### Import Household Census Data

In [4]:
# Import household-level census data

file_path = "usa_00007.csv"  # change this to your own!!

df_hh = spark.read.csv(file_path, header=True, inferSchema=True)

# Rows represent individuals but we want households, so collapse identical rows.
# NOTE(review): this yields one row per household only if every column in the
# extract is household-level — confirm no person-level variable is included.
df_hh = df_hh.dropDuplicates()

# Inflation-adjust monetary values with the IPUMS CPI99 multiplier.
# Each adjusted copy gets the original name prefixed with "A".
# NOTE(review): IPUMS uses sentinel codes for N/A and special cases (e.g. 9999999
# for HHINCOME/VALUEH, 999x codes for the COST* utilities). These are multiplied
# here as if they were dollars. TODO confirm against the codebook and null them first.
hh_money_cols = ['RENTGRS', 'CONDOFEE', 'MOBLHOME', 'HHINCOME',
                 'VALUEH', 'COSTELEC', 'COSTGAS', 'COSTWATR', 'COSTFUEL']
for money_col in hh_money_cols:
    df_hh = df_hh.withColumn(f"A{money_col}", F.col(money_col) * F.col('CPI99'))

# Keep the unadjusted household values aside under their own name so the
# individual-level raw_id from the previous cell is not overwritten.
# Include CBSERIAL so these rows can be linked back, as raw_id does.
raw_hh = df_hh.select('SAMPLE', 'CBSERIAL', *hh_money_cols)
# Remove the unadjusted values from the working DataFrame
df_hh = df_hh.drop(*hh_money_cols)

# Schema and a small preview of the adjusted data
df_hh.printSchema()
df_hh.show(5)

                                                                                

root
 |-- YEAR: integer (nullable = true)
 |-- SAMPLE: integer (nullable = true)
 |-- SERIAL: integer (nullable = true)
 |-- CBSERIAL: long (nullable = true)
 |-- HHWT: double (nullable = true)
 |-- HHTYPE: integer (nullable = true)
 |-- CLUSTER: long (nullable = true)
 |-- CPI99: double (nullable = true)
 |-- STATEICP: integer (nullable = true)
 |-- MET2023: integer (nullable = true)
 |-- STRATA: integer (nullable = true)
 |-- GQ: integer (nullable = true)
 |-- FARM: integer (nullable = true)
 |-- OWNERSHP: integer (nullable = true)
 |-- OWNERSHPD: integer (nullable = true)
 |-- TAXINCL: integer (nullable = true)
 |-- INSINCL: integer (nullable = true)
 |-- FOODSTMP: integer (nullable = true)
 |-- CINETHH: integer (nullable = true)
 |-- VEHICLES: integer (nullable = true)
 |-- COUPLETYPE: integer (nullable = true)
 |-- NFAMS: integer (nullable = true)
 |-- ARENTGRS: double (nullable = true)
 |-- ACONDOFEE: double (nullable = true)
 |-- AMOBLHOME: double (nullable = true)
 |-- AHHINCOM

                                                                                

+----+------+------+-------------+-------+------+-------------+-----+--------+-------+------+---+----+--------+---------+-------+-------+--------+-------+--------+----------+-----+--------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|YEAR|SAMPLE|SERIAL|     CBSERIAL|   HHWT|HHTYPE|      CLUSTER|CPI99|STATEICP|MET2023|STRATA| GQ|FARM|OWNERSHP|OWNERSHPD|TAXINCL|INSINCL|FOODSTMP|CINETHH|VEHICLES|COUPLETYPE|NFAMS|ARENTGRS|ACONDOFEE|         AMOBLHOME|         AHHINCOME|           AVALUEH|         ACOSTELEC|          ACOSTGAS|         ACOSTWATR|         ACOSTFUEL|
+----+------+------+-------------+-------+------+-------------+-----+--------+-------+------+---+----+--------+---------+-------+-------+--------+-------+--------+----------+-----+--------+---------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+
|

## Data Preprocessing
##### Variable Normalization, Encoding & Feature Expansion 

Note: Monetary values were already adjusted for inflation (CPI99) in the Import Datasets section.

### Individual Census Data

`id_norm` is the new min-max-normalized Spark DataFrame.

In [8]:
# FULLTIME = 1 when usual weekly hours worked (UHRSWORK) is 40 or more, otherwise 0
works_40_plus = F.col("UHRSWORK") >= 40
df_id = df_id.withColumn('FULLTIME', F.when(works_40_plus, 1).otherwise(0))

In [9]:
# Recoding (individual-level)

# LOOKING: combine "not reported" (3) with N/A (0)
df_idN = df_id.withColumn("LOOKING", when(df_id["LOOKING"] == 3, 0).otherwise(df_id["LOOKING"]))
# Group quarters: combine household types (2, 5 -> 1) and group-quarter types (4 -> 3)
df_idN = df_idN.withColumn("GQ", when(df_idN["GQ"] == 2, 1).when(df_idN["GQ"] == 5, 1).when(df_idN["GQ"] == 4, 3).otherwise(df_idN["GQ"]))

# Remove identifiers, weights, sampling-design columns and detailed duplicates of recoded variables
var_remove = ['SAMPLE', 'SERIAL', 'CBSERIAL', 'HHWT', 'PERNUM', 'CBPERNUM', 'CLUSTER', 'CPI99', 'STRATA',
              'PERWT', 'RACED', 'EDUCD', 'EMPSTATD', 'CLASSWKRD']
df_idN = df_idN.drop(*var_remove)

# Min-max normalize every numeric column
columns = df_idN.columns

def vector_to_array(v):
    """Convert a pyspark.ml vector into a plain list of Python floats."""
    return v.toArray().tolist()
vector_to_array_udf = udf(vector_to_array, ArrayType(DoubleType()))

def scale_column(df, input_col):
    """Return ``df`` with ``input_col`` min-max scaled (MinMaxScaler defaults: [0, 1]).

    The scaled column keeps its original name, but it moves to the end of the
    schema because the original column is dropped and the scaled copy is renamed.
    """
    assembler = VectorAssembler(inputCols=[input_col], outputCol=f"{input_col}_vec")
    scaler = MinMaxScaler(inputCol=f"{input_col}_vec", outputCol=f"{input_col}_scaled_vec")
    pipeline = Pipeline(stages=[assembler, scaler])
    df = pipeline.fit(df).transform(df)
    # Unpack the single-element scaled vector back into a scalar column
    df = df.withColumn(f"{input_col}_scaled", vector_to_array_udf(col(f"{input_col}_scaled_vec"))[0])
    df = df.drop(f"{input_col}_vec", f"{input_col}_scaled_vec")
    df = df.drop(input_col).withColumnRenamed(f"{input_col}_scaled", input_col)
    return df

# Thread the running result through each call. Starting every iteration from
# df_idN would discard the earlier scalings and leave only the last column scaled.
id_norm = df_idN
for col_name in columns:
    if isinstance(df_idN.schema[col_name].dataType, NumericType):
        id_norm = scale_column(id_norm, col_name)

### Household Census Data

`hh_norm` is the new min-max-normalized Spark DataFrame.

In [10]:
# Recoding (household-level)
gq_code = F.col("GQ")
hhtype_code = F.col("HHTYPE")
df_hhN = (
    df_hh
    # Group quarters: combine household types (2, 5 -> 1) and group-quarter types (4 -> 3)
    .withColumn("GQ", F.when(gq_code.isin(2, 5), 1).when(gq_code == 4, 3).otherwise(gq_code))
    # Household type: fold code 9 into code 0 (combines N/A and "could not be determined")
    .withColumn("HHTYPE", F.when(hhtype_code == 9, 0).otherwise(hhtype_code))
    # CINETHH: replace nulls with 0, which encodes N/A
    .fillna(0, subset=['CINETHH'])
)

# Remove identifiers, weights, sampling-design columns and unused variables
var_remove = ['SAMPLE', 'SERIAL', 'CBSERIAL', 'HHWT', 'CLUSTER', 'CPI99', 'STRATA',
              'MET2023', 'TAXINCL', 'INSINCL', 'COUPLETYPE']
utility_parts = ['ACOSTELEC', 'ACOSTGAS', 'ACOSTWATR', 'ACOSTFUEL']
df_hhN = (
    df_hhN
    .drop(*var_remove)
    # COSTUTIL: total utilities = electricity + gas + water + fuel; the parts are then dropped
    .withColumn('COSTUTIL', F.col('ACOSTELEC') + F.col('ACOSTGAS') + F.col('ACOSTWATR') + F.col('ACOSTFUEL'))
    .drop(*utility_parts)
)

def vector_to_array(v):
    """Turn a pyspark.ml vector into a Python list of floats."""
    as_numpy = v.toArray()
    return as_numpy.tolist()
vector_to_array_udf = udf(vector_to_array, ArrayType(DoubleType()))

def scale_column(df, input_col):
    """Min-max scale one numeric column in place (same name, moved to the end of the schema)."""
    print(f"Scaling column: {input_col}")
    vec_name = f"{input_col}_vec"
    scaled_vec_name = f"{input_col}_scaled_vec"
    array_name = f"{input_col}_scaled_array"
    scaled_name = f"{input_col}_scaled"
    fitted = Pipeline(stages=[
        VectorAssembler(inputCols=[input_col], outputCol=vec_name),
        MinMaxScaler(inputCol=vec_name, outputCol=scaled_vec_name),
    ]).fit(df)
    out = fitted.transform(df)
    out = out.withColumn(array_name, vector_to_array_udf(col(scaled_vec_name)))
    out = out.withColumn(scaled_name, col(array_name)[0])
    out = out.drop(vec_name, scaled_vec_name, array_name)
    return out.drop(input_col).withColumnRenamed(scaled_name, input_col)

# Scale every numeric column, accumulating the result in hh_norm
hh_norm = df_hhN
columns = df_hhN.columns
for col_name in columns:
    if not isinstance(df_hhN.schema[col_name].dataType, NumericType):
        continue
    hh_norm = scale_column(hh_norm, col_name)

Scaling column: YEAR
Scaling column: HHTYPE
Scaling column: STATEICP
Scaling column: GQ
Scaling column: FARM
Scaling column: OWNERSHP
Scaling column: OWNERSHPD
Scaling column: FOODSTMP
Scaling column: CINETHH
Scaling column: VEHICLES
Scaling column: NFAMS
Scaling column: ARENTGRS
Scaling column: ACONDOFEE
Scaling column: AMOBLHOME
Scaling column: AHHINCOME
Scaling column: AVALUEH
Scaling column: COSTUTIL


### Combining Individual and Household Datasets for Model 2

In [None]:
## Preprocessing for Individual (Model 2)
looking_code = F.col("LOOKING")
gq_code = F.col("GQ")
df_id = (
    df_id
    # LOOKING: combine "not reported" (3) with N/A (0)
    .withColumn("LOOKING", F.when(looking_code == 3, 0).otherwise(looking_code))
    # Group quarters: combine household types (2, 5 -> 1) and group-quarter types (4 -> 3)
    .withColumn("GQ", F.when(gq_code.isin(2, 5), 1).when(gq_code == 4, 3).otherwise(gq_code))
)

# Remove design/weight/detail columns. CBSERIAL and YEAR are kept as join keys.
# NOTE(review): CBPERNUM is also kept, presumably so distinct people in a household
# survive the later dropDuplicates() — confirm this is intentional.
var_remove = ['SAMPLE', 'SERIAL', 'HHWT', 'PERNUM', 'CLUSTER', 'CPI99', 'STRATA',
              'PERWT', 'RACED', 'EDUCD', 'EMPSTATD', 'CLASSWKRD']
df_id = df_id.drop(*var_remove)


In [None]:
## Preprocessing for Household (Model 2)
gq_code = F.col("GQ")
hhtype_code = F.col("HHTYPE")
df_hh = (
    df_hh
    # Group quarters: combine household types (2, 5 -> 1) and group-quarter types (4 -> 3)
    .withColumn("GQ", F.when(gq_code.isin(2, 5), 1).when(gq_code == 4, 3).otherwise(gq_code))
    # Household type: fold code 9 into code 0 (combines N/A and "could not be determined")
    .withColumn("HHTYPE", F.when(hhtype_code == 9, 0).otherwise(hhtype_code))
    # CINETHH: replace nulls with 0, which encodes N/A
    .fillna(0, subset=['CINETHH'])
)

# Remove design/weight columns and unused variables. CBSERIAL and YEAR are kept as join keys.
var_remove = ['SAMPLE', 'SERIAL', 'HHWT', 'CLUSTER', 'CPI99', 'STRATA',
              'MET2023', 'TAXINCL', 'INSINCL', 'COUPLETYPE']
utility_parts = ['ACOSTELEC', 'ACOSTGAS', 'ACOSTWATR', 'ACOSTFUEL']
df_hh = (
    df_hh
    .drop(*var_remove)
    # COSTUTIL: total utilities = electricity + gas + water + fuel; the parts are then dropped
    .withColumn('COSTUTIL', F.col('ACOSTELEC') + F.col('ACOSTGAS') + F.col('ACOSTWATR') + F.col('ACOSTFUEL'))
    .drop(*utility_parts)
)

In [None]:
# Test combining individual and household data.
# GQ exists in both frames; keep the individual-level copy.
df_h = df_hh.drop('GQ')
result_df = df_id.join(df_h, on=["CBSERIAL", "YEAR"], how="left")
result_df = result_df.dropDuplicates()

# count() runs a full Spark job. Store and print the result so the work is visible;
# a bare mid-cell expression is never displayed by Jupyter.
n_joined = result_df.count()
print(f"Joined rows: {n_joined:,}")

# Rows per census year (uses F.count rather than a mid-notebook import)
result_df.groupBy("YEAR").agg(F.count("*").alias("count")).show()

In [None]:
# Preview the first 10 joined rows
result_df.show(n=10)

### Split test/train data

In [None]:
# Split each normalized dataset 80/20 into train/test; the fixed seed keeps splits reproducible
split_weights, split_seed = [0.8, 0.2], 200
id_norm_train, id_norm_test = id_norm.randomSplit(weights=split_weights, seed=split_seed)
hh_norm_train, hh_norm_test = hh_norm.randomSplit(weights=split_weights, seed=split_seed)


DataFrame[YEAR: double, HHTYPE: double, STATEICP: double, GQ: double, FARM: double, OWNERSHP: double, OWNERSHPD: double, FOODSTMP: double, CINETHH: double, VEHICLES: double, NFAMS: double, ARENTGRS: double, ACONDOFEE: double, AMOBLHOME: double, AHHINCOME: double, AVALUEH: double, COSTUTIL: double, features: vector]