# PSTAT134 Project Code

### Aapthi Nagesh, Alex Roginski, Yuchen Wu, Nicholas Axl Andrian


### Setup

In [None]:
# In Jupyter notebook hosted by Google Cloud

from pyspark.sql import SparkSession


# Start (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("Read Voter File Data")
    .getOrCreate()
)

# Oregon voter file, stored as parquet (format edit by Alex).
# Use the local "oregondata" copy instead of the bucket path when running locally.
voter_reader = (
    spark.read.format("parquet")
    .option("header", "true")
    .option("inferSchema", "true")
)
df = voter_reader.load("gs://winter-2024-voter-file/VM2Uniform/VM2Uniform--OR--2021-02-05")

# By Andy
# uscounties.csv is read into raw_df2; df2 (below) keeps only the Oregon rows,
# which carry the county IDs used later in the join.
county_reader = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
)
raw_df2 = county_reader.load("gs://winter-2024-voter-file/uscounties.csv")

# Keep only the rows whose state_name is "Oregon"
df2 = raw_df2.where(raw_df2["state_name"] == "Oregon")

# Preview the Oregon county rows
df2.show()

### Cleaning data (select columns and remove NAs)


In [None]:
# By Alex

from pyspark.sql.functions import col

# Columns carried forward from the raw voter file (predictors plus the party outcome).
selectedColumns = [
    'County',
    'Ethnic_Description',
    'Voters_Gender',
    'Voters_Age',
    'Mailing_Families_HHCount',
    'Mailing_HHGender_Description',
    'Residence_Families_HHCount',
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
    'CommercialData_AreaMedianEducationYears',
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
    'Parties_Description',
]

cleaned_df = df.select(*selectedColumns)

# Drop every row that has a null in any of the selected columns.
data = cleaned_df.na.drop()




### Creating df_merged

In [None]:
# Andy 
from pyspark.sql.functions import upper

# Upper-case the county name on both sides so the join key compares equal
# regardless of how each source capitalises it.
df1 = data.withColumn("County", upper(data["County"]))
df2 = df2.withColumn("county", upper(df2["county"]))

# Inner join: voters whose county has no match in the county table are dropped.
county_match = df1["County"] == df2["county"]
df_joined = df1.join(df2, on=county_match, how="inner")

# Keep every voter column and add only the county FIPS code from the county table.
df_merged = df_joined.select(df1["*"], df2["county_fips"])

row_total = df_merged.count()
column_total = len(df_merged.columns)
print("Row count:", row_total, "Column count:", column_total)
df_merged.show()


### Checking for unique party descriptions

In [None]:


# Axl - list each distinct value that appears in Parties_Description
unique_parties = df_merged.select("Parties_Description").dropDuplicates()
unique_parties.show()


### Binning outcome variable in df_merged 

In [None]:
# Andy's code to merge bin
from pyspark.sql.functions import when, col, lit

# Parties kept as their own class; every other value is collapsed into "Other".
parties_to_keep = ['Republican', 'Democratic', 'Non-Partisan']

party_col = col("Parties_Description")
binned_party = when(party_col.isin(parties_to_keep), party_col).otherwise(lit("Other"))

# Overwrite the outcome column in place with its binned version.
df_merged = df_merged.withColumn("Parties_Description", binned_party)


###  Look at sample rows of data 

In [None]:

import pandas as pd

# Pull the first 10 rows to the driver and wrap them in a pandas frame,
# which renders as a readable table in the notebook.
test_rows = df_merged.head(10)

column_headers = df_merged.columns

new_df = pd.DataFrame(test_rows, columns=column_headers)

# Last expression of the cell so the sample is actually displayed
# (previously the frame was built but never shown).
new_df

### Missing Data


In [None]:
from pyspark.sql.functions import col
# Aliased on import so that Python's built-in sum() is not shadowed for the rest of the notebook.
from pyspark.sql.functions import sum as spark_sum

# ChatGPT:
# One aggregate per column: isNull() cast to 0/1 and summed gives the number of missing values.
missing_values = df.select(
    [spark_sum(col(column).isNull().cast("int")).alias(column) for column in df.columns]
)  # Get missing data for each column
# The following took a few minutes to load

missing_values_df = missing_values.toPandas()  # Turn the data into a pandas dataframe to make it readable


totalNumberOfRows = df.count()  # Count total number of rows in dataset

transposed_df = missing_values_df.T.reset_index()  # transpose it to make the data go down rows instead of across columns
transposed_df.columns = ['Column_Name', 'Count_Missing']  # Rename columns
transposed_df = transposed_df.sort_values(by='Count_Missing', ascending=False)  # sort values by number missing
# NOTE: despite its name this column holds a 0-1 fraction (count / total rows), not a 0-100 percentage.
transposed_df['Percent_Missing'] = transposed_df['Count_Missing'] / totalNumberOfRows

transposed_df


### Percent Missing Plot


In [None]:
# PERCENT MISSING

import matplotlib.pyplot as plt

# transposed_df['Percent_Missing'] is stored as a 0-1 fraction (count missing / total rows);
# scale it to 0-100 so the plotted values agree with the "Percent" axis label and title.
percent_missing = transposed_df['Percent_Missing'] * 100

plt.figure(figsize=(10, 6))
plt.hist(percent_missing, bins=10, color='lightgreen', edgecolor='black')
plt.xlabel('Percent Missing in Column')
plt.ylabel('Column Frequency')
plt.title('% Missing Values in All Columns of Oregon Data')
plt.grid(True)
plt.show()


# EDA


### Pie Chart of Party Counts


In [None]:
#  By Aapthi and Alex 

import matplotlib.pyplot as plt

import numpy as np

# Number of voters per (binned) party, aggregated in Spark.
parties_count_df = df_merged.groupBy("Parties_Description").count()

# Bring the small aggregate to pandas and order it largest-first so the wedge order is stable.
parties_count_pdf = parties_count_df.toPandas()
parties_count_pdf = parties_count_pdf.sort_values(by='count', ascending=False)

# Pie styling. Colours are applied by wedge position (count rank), not looked up by party name.
colors = [
    'tab:green',
    'tab:blue',
    'tab:red',
    'tab:grey',
]
wp = dict(linewidth=1, edgecolor="black")  # Wedge outline

# Function to format the autopct
def func(pct, allvals):
    """Build a pie-wedge label showing the percentage and the absolute count.

    Parameters
    ----------
    pct : float
        Wedge share in percent (0-100), as passed by matplotlib's ``autopct``.
    allvals : sequence of numbers
        All wedge values; their sum is the total the percentage refers to.

    Returns
    -------
    str
        Two-line label: the percentage with one decimal, then the count in parentheses.
    """
    # Round instead of truncating: pct/100*total is a float, and a result such as
    # 28.999999999999996 must be reported as 29, not 28.
    absolute = int(round(pct / 100. * np.sum(allvals)))
    return "{:.1f}%\n({:d})".format(pct, absolute)

# Plotting
def _wedge_label(pct):
    # matplotlib hands over the wedge percentage; func turns it into "xx.x%" plus the count.
    return func(pct, parties_count_pdf['count'])


fig, ax = plt.subplots(figsize=(10, 7))
wedges, texts, autotexts = ax.pie(
    parties_count_pdf['count'],
    labels=parties_count_pdf['Parties_Description'],
    colors=colors,
    autopct=_wedge_label,
    startangle=30,
    wedgeprops=wp,
    textprops={"color": "black", "size": 12},
)

ax.set_title('Parties Description Distribution')
plt.show()


### Income vs. Party pref. (Andy)


In [None]:
# Originally created by Andy, editted by Alex

from pyspark.sql.functions import regexp_replace, col
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

# Bar colours, applied by bar position (i.e. by income rank after sorting), not by party name.
colors = ['tab:blue', 'tab:red', 'tab:green', 'tab:grey']

# Strip "$" and "," from the income string and cast it to a number.
income_as_number = F.regexp_replace(
    F.col('CommercialData_EstimatedHHIncomeAmount'), '[$,]', ''
).cast('float')
df_merged_temp = df_merged.withColumn('Income_Amount', income_as_number)

# Mean household income per party.
party_income_df = (
    df_merged_temp
    .groupBy('Parties_Description')
    .agg(F.avg('Income_Amount').alias('Average_Income'))
)

party_income_pdf = party_income_df.toPandas()

# Highest average income first
party_income_pdf_sorted = party_income_pdf.sort_values(by='Average_Income', ascending=False)

fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(
    party_income_pdf_sorted['Parties_Description'],
    party_income_pdf_sorted['Average_Income'],
    color=colors,
)
ax.set_xlabel('Party Preference')
ax.set_ylabel('Average Income')
ax.set_title('Average Income by Party Preference')
plt.xticks(rotation=0)

# Write the dollar amount just above each bar.
for bar in bars:
    yval = bar.get_height()
    x_center = bar.get_x() + bar.get_width() / 2
    ax.text(x_center, yval, f"${round(yval, 0):,.0f}", fontsize=12, va='bottom', ha='center')

plt.show()


### Education Level vs Party Pref.


In [None]:
# by Aapthi, edited by Alex
from pyspark.sql.functions import regexp_replace, col
from pyspark.sql import functions as F
import matplotlib.pyplot as plt

# Area median education years arrive as strings; strip "$"/"," and cast to int.
education_as_int = F.regexp_replace(
    F.col('CommercialData_AreaMedianEducationYears'), '[$,]', ''
).cast('int')
df_merged_temp = df_merged.withColumn('Education_Years', education_as_int)

# Mean education years per party (the aggregate reuses the Education_Years name).
education_df = (
    df_merged_temp
    .groupBy('Parties_Description')
    .agg(F.avg('Education_Years').alias('Education_Years'))
)

party_education_df = education_df.toPandas()

# Highest average first
party_education_df_sorted = party_education_df.sort_values(by='Education_Years', ascending=False)

# Bar chart of average education years per party; colours are applied by bar position.
fig, ax = plt.subplots(figsize=(10, 6))
bars = ax.bar(
    party_education_df_sorted['Parties_Description'],
    party_education_df_sorted['Education_Years'],
    color=['tab:blue', 'tab:Grey', 'tab:green', 'tab:red'],
)

ax.set_xlabel('Party Preference')
ax.set_ylabel('Average Education Years')
ax.set_title('Average Education Year by Party Preference')
plt.xticks(rotation=0)

# Print the rounded value above each bar.
for bar in bars:
    yval = bar.get_height()
    x_center = bar.get_x() + bar.get_width() / 2
    ax.text(x_center, yval, round(yval, 1), ha='center', va='bottom')

plt.show()



### Property type and Party Pref.


In [None]:
# Originally by Andy, editted by Alex

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np

# Cross-tabulate party (rows) against property type (columns); each cell is a voter count.
party_property_df = (
    df_merged
    .groupBy('Parties_Description')
    .pivot('CommercialData_PropertyType')
    .count()
)

# Move the small pivot to pandas and index it by party for plotting.
party_property_pdf = party_property_df.toPandas().set_index('Parties_Description')

# Turn each row into percentages of that party's total.
row_totals = party_property_pdf.sum(axis=1)
party_property_pdf = party_property_pdf.div(row_totals, axis=0) * 100

# Order the property-type columns by their summed percentage, largest first.
column_order = party_property_pdf.sum().sort_values(ascending=False).index
party_property_pdf = party_property_pdf.reindex(column_order, axis=1)

# Stacked bar chart, one bar per party.
fig, ax = plt.subplots(figsize=(12, 8))
party_property_pdf.plot(kind='bar', stacked=True, ax=ax)

ax.set_xlabel('Party Preference')
ax.set_ylabel('Percentage')
ax.set_title('Property Type Distribution by Party Preference')
plt.xticks(rotation=0)

# Legend anchored to the right of the axes, outside the plot area.
ax.legend(title='Property Type', loc='upper left', bbox_to_anchor=(1, 1))

# Custom function to add labels
def add_labels(ax):
    """Label every bar in ``ax`` with its height as a whole-number percentage.

    Bars whose height is 5 or less get an empty label. Labels are white and
    centred inside the bar. Returns None; the axes are modified in place.
    """
    for container in ax.containers:
        texts = []
        for bar in container:
            height = bar.get_height()
            texts.append(f'{height:.0f}%' if height > 5 else '')
        ax.bar_label(container, labels=texts, label_type='center', color='white', rotation=0, padding=3)

# Write the rounded percentage inside every bar segment taller than 5.
add_labels(ax)

# Tighten the figure layout once the labels and legend are in place.
plt.tight_layout()


### Ethnic Description EDA


In [None]:
# Axl - checking counts for unique values in Ethnic_Description

from pyspark.sql import functions as F
# Count voters per Ethnic_Description in the raw voter file (df), most frequent first.
ethnic_description_counts = (
    df.groupBy("Ethnic_Description")
    .count()
    .withColumnRenamed("count", "Counts")
    .sort(F.col("Counts").desc())
)

# Show the ten most frequent values
ethnic_description_counts.show(10)


### Top 10 ethnic descriptions’ party preference


In [None]:
# Axl - stacked bar chart of party preference for the 10 largest ethnic descriptions
# (limited to 10 because a larger number made the visualization hard to read)
# Edited by Alex
import builtins  # explicit handle on Python's own sum(); `sum` may be rebound by a pyspark.sql.functions import

from pyspark.sql.functions import col, count
import matplotlib.pyplot as plt


# df for plotting: one row per ethnic description, one count column per party (missing combinations -> 0)
result_df = df_merged.groupBy('Ethnic_Description').pivot('Parties_Description').count().na.fill(0)
# Row total across the party columns. builtins.sum adds the Column objects together;
# `builtins` is the supported way to reach the built-in (``__builtins__`` is an implementation detail).
result_df = result_df.withColumn('Total', builtins.sum(result_df[c] for c in result_df.columns[1:]))
result_df = result_df.orderBy(col('Total').desc()).limit(10)
result_df = result_df.drop('Total')

# Define the desired stacking order; any party not listed is appended afterwards
desired_order = ["Democratic", "Republican", "Non-Partisan", "Registered Independent", "Other"]
parties_ordered = [party for party in desired_order if party in result_df.columns]
parties_ordered += [party for party in result_df.columns if party not in desired_order and party != 'Ethnic_Description']

result_data = result_df.select(['Ethnic_Description'] + parties_ordered).collect()
categories = [row['Ethnic_Description'] for row in result_data]
counts = [list(row[1:]) for row in result_data]  # Exclude the first column which is 'Ethnic_Description'
total_per_group = [builtins.sum(row) for row in counts]  # per-group totals (not used by the plot below)
grand_total = df_merged.count()  # bar heights are percentages of ALL voters, not of each group

fig, ax = plt.subplots(figsize=(15, 8))
bar_width = 0.9
bottoms = [0] * len(categories)


party_colors = {
    "Democratic": "blue",
    "Republican": "red",
    "Non-Partisan": "green",
    "Other": "grey",
}


for i, party in enumerate(parties_ordered):
    party_counts = [row[i + 1] for row in result_data]  # Offset due to 'Ethnic_Description'
    party_counts = [n / grand_total * 100 for n in party_counts]
    ax.bar(categories, party_counts, width=bar_width, label=party, bottom=bottoms, color=party_colors.get(party, "black"))
    # Raise the baseline so the next party stacks on top of this one
    bottoms = [bottom + party_counts[j] for j, bottom in enumerate(bottoms)]



ax.set_xlabel('Ethnic Description')
ax.set_ylabel('Percent of Voters')
ax.set_title('Top 10 Ethnic Descriptions by Party Preference')
ax.set_xticks(range(len(categories)))
ax.set_xticklabels(categories, rotation=45)


from matplotlib.ticker import PercentFormatter
plt.gca().yaxis.set_major_formatter(PercentFormatter(decimals=0))

ax.legend(title='Party Preference', bbox_to_anchor=(1.05, 1), loc='upper left')
plt.tight_layout()
plt.show()





### Gender

In [None]:
# Aapthi

# Cross-tabulate party (rows) against voter gender (columns); each cell is a voter count.
party_gender_df = (
    df_merged
    .groupBy('Parties_Description')
    .pivot('Voters_Gender')
    .count()
)

# Move to pandas and index by party.
party_gender_pdf = party_gender_df.toPandas().set_index('Parties_Description')

# Convert each row to percentages of that party's total.
gender_row_totals = party_gender_pdf.sum(axis=1)
party_gender_pdf = party_gender_pdf.div(gender_row_totals, axis=0) * 100

# Order the gender columns by their summed percentage, largest first.
gender_column_order = party_gender_pdf.sum().sort_values(ascending=False).index
party_gender_pdf = party_gender_pdf.reindex(gender_column_order, axis=1)

# Grouped (side-by-side) bar chart, one group per party.
fig, ax = plt.subplots(figsize=(12, 8))
party_gender_pdf.plot(kind='bar', stacked=False, ax=ax)
ax.set_xlabel('Party Preference')
ax.set_ylabel('Percentage')
ax.set_title('Voter\'s Gender Distribution by Party Preference')
plt.xticks(rotation=0)
ax.legend(title='Gender', loc='upper left', bbox_to_anchor=(1, 1))

# Custom function to add percent labels
def add_labels(ax):
    """Put a centred, white "NN%" label on each bar of ``ax`` that is taller than 5.

    Bars with a height of 5 or less are given an empty label. Note that this
    redefines the helper of the same name from the property-type cell.
    """
    def _text_for(bar):
        # Whole-number percentage, or nothing for bars that are too short.
        return f'{bar.get_height():.0f}%' if bar.get_height() > 5 else ''

    for group in ax.containers:
        ax.bar_label(
            group,
            labels=[_text_for(bar) for bar in group],
            label_type='center',
            color='white',
            rotation=0,
            padding=3,
        )

# Write the rounded percentage inside every bar taller than 5.
add_labels(ax)

# Tighten the figure layout once the labels and legend are in place.
plt.tight_layout()



### Age

In [None]:
#By Aapthi, adapted from Andy’s code
# Bucket voters into three age bands. The column is added to df_merged directly, so the cell
# does not depend on whichever earlier cell happened to assign df_merged_temp last.
df_merged_temp = df_merged.withColumn('Age_Level',
                                 when(col('Voters_Age') < 36, '18-35')
                                 .when(col('Voters_Age') < 65, '36-64')
                                 .otherwise('65+'))

# Voter count for every (age band, party) pair, brought to pandas for plotting.
age_party_dist = df_merged_temp.groupBy('Age_Level', 'Parties_Description').count()

age_party_pdf = age_party_dist.toPandas()


# Plotting
age_levels = ['18-35', '36-64', '65+']
party_colors = {'Democratic': 'tab:blue', 'Republican': 'tab:red', 'Other': 'tab:grey', 'Non-Partisan': 'tab:orange'}
fig, axs = plt.subplots(1, len(age_levels), figsize=(18, 6))  # 1 row, one pie per age band

for idx, level in enumerate(age_levels):
    subset = age_party_pdf[age_party_pdf['Age_Level'] == level]
    # Look colours up by party name so each party keeps the same colour in every pie.
    colors = [party_colors[party] for party in subset['Parties_Description']]
    axs[idx].pie(subset['count'], labels=subset['Parties_Description'],
                 autopct='%1.1f%%', startangle=140, colors=colors)
    axs[idx].set_title(f'Age {level}')

plt.show()

# Model Building 


In [None]:
# Remove the county_fips column that was added by the county join; it is not one of the
# feature columns listed for the models below.
df_merged = df_merged.drop('county_fips')


## Data Preparations

In [None]:
# Converting $ values to numeric AND Convert numeric columns stored as string to numeric

# By Alex

import pandas as pd 

from pyspark.sql.functions import regexp_replace, col


# Dollar-formatted string columns: strip "$" and "," and cast the remainder to float.
dollar_value_columns = [
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
]
for dollar_column in dollar_value_columns:
    cleaned_amount = regexp_replace(col(dollar_column), '[$,]', '').cast('float')
    df_merged = df_merged.withColumn(dollar_column, cleaned_amount)


# Numeric columns that are stored as strings: cast them straight to float.
columns_to_convert = [
    "Voters_Age",
    "Mailing_Families_HHCount",
    "Residence_Families_HHCount",
    "CommercialData_AreaMedianEducationYears",
]
for column in columns_to_convert:
    df_merged = df_merged.withColumn(column, col(column).cast('float'))


# PRINT RESULT
# First 10 converted rows, wrapped in pandas so the notebook renders them as a table.
test_rows = df_merged.head(10)
column_headers = df_merged.columns
new_df = pd.DataFrame(test_rows, columns=column_headers)

new_df


In [None]:
# List each column of df_merged with its Spark type, to check the casts made in the previous cell.
df_merged.dtypes

### Random Forest Model (IN JUPYTER NOTEBOOK)


In [None]:
# Fitting the Model, Accuracy


# TA CYRUS


# Every column involved in the model (predictors plus the Parties_Description outcome).
# 'Mailing_Addresses_City' was part of the original selection and is left out here.
selectedColumns = [
    'Ethnic_Description',
    'Voters_Gender',
    'Voters_Age',
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
    'Parties_Description',
    "Mailing_Families_HHCount",
    "Residence_Families_HHCount",
    'CommercialData_AreaMedianEducationYears',
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
    'County',
    'Mailing_HHGender_Description',
]   # FILL IN HERE

# Categorical predictors (string-indexed later). "Mailing_Addresses_City" is another option.
# The order of this list fixes the order of the indexed features in the assembled vector.
categoricalColumns = [
    'County',
    'Mailing_HHGender_Description',
    "Ethnic_Description",
    "Voters_Gender",
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
]  # FILL IN HERE

# Predictors that are formatted as dollar amounts.
columns_with_dollar_signs = [
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
]   # FILL IN HERE

# Remaining numeric predictors (no dollar formatting).
numeric_columns_in_data_not_dollar_sign = [
    "Voters_Age",
    "Mailing_Families_HHCount",
    "Residence_Families_HHCount",
    'CommercialData_AreaMedianEducationYears',
]  # FILL IN HERE

# ----------------------------

from pyspark.sql.functions import regexp_replace, col

# Strip "$" and "," from each dollar column and cast it to float.
for money_column in columns_with_dollar_signs:
    stripped = regexp_replace(col(money_column), '[$,]', '')
    df_merged = df_merged.withColumn(money_column, stripped.cast('float'))

# Cast the remaining numeric predictors to float.
for numeric_column in numeric_columns_in_data_not_dollar_sign:
    df_merged = df_merged.withColumn(numeric_column, col(numeric_column).cast('float'))






import time

# Start time (covers pipeline construction and fitting)
start_time = time.time()


# By Alex

# TA Cyrus helped me find the following code and I used the following source:

# https://stackoverflow.com/questions/35804755/apply-onehotencoder-for-several-categorical-columns-in-sparkmlib


from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# One StringIndexer per categorical column; handleInvalid='keep' gives values that were
# not seen during fitting their own extra index instead of raising.
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='keep') for column in categoricalColumns]

# The indexed categoricals go into the feature vector directly (no one-hot encoding),
# followed by the plain numeric columns and the dollar columns, in that order.
assembler = VectorAssembler(
    inputCols=[indexer.getOutputCol() for indexer in indexers] + numeric_columns_in_data_not_dollar_sign + columns_with_dollar_signs,
    outputCol="features"
)


(trainData, testData) = df_merged.randomSplit([0.7, 0.3], seed=123)

labelIndexer = StringIndexer(inputCol="Parties_Description", outputCol="label")

# seed is pinned (same value as the split) so the fitted forest, and therefore the reported
# accuracy and feature importances, can be reproduced after a kernel restart.
randomForest = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=10, maxBins=100, seed=123)

pipeline = Pipeline(stages=indexers + [labelIndexer, assembler, randomForest])



# By Alex

# FOR THE FOLLOWING CODE, YOU MUST RUN IT ON A CLUSTER ON GOOGLE CLOUD.
# YOU WILL RUN OUT OF MEMORY RUNNING LOCALLY (java.lang.OutOfMemoryError: Java heap space)

model = pipeline.fit(trainData)


# End time
end_time = time.time()

# Calculate elapsed time in seconds
elapsed_time_seconds = end_time - start_time

# Convert elapsed time to minutes
elapsed_time_minutes = elapsed_time_seconds / 60

print("Time taken:", elapsed_time_minutes, "minutes")







from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import time

# Time the scoring of the held-out set.
start_time = time.time()

# Score the test split with the fitted random-forest pipeline.
predictions = model.transform(testData)

# Accuracy = share of test rows whose predicted label index equals the true one.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

end_time = time.time()

# Elapsed wall-clock time, reported in minutes.
elapsed_time_seconds = end_time - start_time
elapsed_time_minutes = elapsed_time_seconds / 60

print(f"Time taken: {elapsed_time_minutes} minutes")





Output

Time taken: 7.387498692671458 minutes


Accuracy: 0.459



#### Feature Importance


In [None]:
import matplotlib.pyplot as plt
import pandas as pd

# Last pipeline stage is the fitted random forest; the stage before it is the VectorAssembler,
# whose input columns name the features in vector order.
rf_model = model.stages[-1]
feature_importance = rf_model.featureImportances.toArray()
feature_cols = model.stages[-2].getInputCols()

# Pair each feature with its importance and rank them, most important first.
feature_df = pd.DataFrame(
    {"Feature": feature_cols, "Importance": feature_importance}
).sort_values(by="Importance", ascending=False)

# Horizontal bar chart of the ranked importances.
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(feature_df["Feature"], feature_df["Importance"], color='skyblue')
ax.set_xlabel('Importance')
ax.set_ylabel('Feature')
ax.set_title('Feature Importance')
ax.invert_yaxis()  # highest importance at the top
plt.show()


### Multinomial Logistic Regression


In [None]:
# Fitting the Model, Accuracy
# Axl and Alex
import time

from pyspark.ml.classification import LogisticRegression

from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline


from pyspark.ml.feature import StandardScaler

# Start the timer for THIS fit. Without this, the "Time taken" printed at the end of the
# cell was measured from whatever start_time an earlier cell had left behind.
start_time = time.time()

# One StringIndexer per categorical predictor (the outcome column is indexed separately below).
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid='keep') for column in categoricalColumns if column != 'Parties_Description']


# NOTE: the encoders collected here are never added to `stages`, so no one-hot encoding is
# applied; the model sees the raw category indices.
one_hot_encoder_list = []
for indexer in indexers:
    ohe_col = "{0}_encoded".format(indexer.getOutputCol())

    one_hot_encoder = OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=ohe_col)
    one_hot_encoder_list.append(one_hot_encoder)


# NOTE: this assembler is also not part of `stages`; va2 below builds the "features" column.
assembler = VectorAssembler(
    inputCols=[indexer.getOutputCol() for indexer in indexers] + numeric_columns_in_data_not_dollar_sign + columns_with_dollar_signs,
    outputCol="features"
)


# Final feature list: indexed categoricals followed by the standardized numeric columns.
featureArr = [index.getOutputCol() for index in indexers] + [('scaled_' + f) for f in numeric_columns_in_data_not_dollar_sign + columns_with_dollar_signs]

# StandardScaler works on vector columns, so each numeric column is first wrapped in its own
# one-element vector ("vec_<name>") and then standardized into "scaled_<name>".
va1 = [VectorAssembler(inputCols = [f], outputCol=('vec_' + f)) for f in numeric_columns_in_data_not_dollar_sign + columns_with_dollar_signs]
ss = [StandardScaler(inputCol="vec_" + f, outputCol="scaled_" + f, withMean=True, withStd=True) for f in numeric_columns_in_data_not_dollar_sign + columns_with_dollar_signs]

va2 = VectorAssembler(inputCols=featureArr, outputCol="features")
lr = LogisticRegression()



labelIndexer = StringIndexer(inputCol="Parties_Description", outputCol="label")


# Stage order matters to later cells: stages[-3] is va2, stages[-2] the label indexer, stages[-1] the model.
stages = va1 + ss + indexers + [va2, labelIndexer, lr]

log_pipeline = Pipeline(stages=stages)

(trainData, testData) = df_merged.randomSplit([0.7, 0.3], seed=123)

log_model = log_pipeline.fit(trainData)


# End time
end_time = time.time()

# Calculate elapsed time in seconds
elapsed_time_seconds = end_time - start_time

# Convert elapsed time to minutes
elapsed_time_minutes = elapsed_time_seconds / 60

print("Time taken:", elapsed_time_minutes, "minutes")






from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import time

# Time the scoring of the held-out set.
start_time = time.time()

# Score the test split with the fitted logistic-regression pipeline.
predictions = log_model.transform(testData)

# Accuracy = share of test rows whose predicted label index equals the true one.
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

end_time = time.time()

# Elapsed wall-clock time, reported in minutes.
elapsed_time_seconds = end_time - start_time
elapsed_time_minutes = elapsed_time_seconds / 60

print(f"Time taken: {elapsed_time_minutes} minutes")








Output

Time taken: 5.091986711819967 minutes

Accuracy: 0.46134799364665036



#### Feature Importance


In [None]:
# Class order is read from the fitted label StringIndexer (second-to-last pipeline stage)
# rather than typed in by hand, so row i of the coefficient matrix is always paired with the
# party that was actually encoded as label index i.
orderedPartyList = list(log_model.stages[-2].labels)


# Last stage is the fitted logistic regression: one row of coefficients per class.
coefficients = log_model.stages[-1].coefficientMatrix.toArray()

# Third-from-last stage is the final VectorAssembler; its inputs name the features in order.
feature_names = log_model.stages[-3].getInputCols()


# One bar plot of coefficients per party
for i, party_label in enumerate(orderedPartyList):
    plt.figure(figsize=(10, 6))
    plt.bar(feature_names, coefficients[i])
    plt.xlabel('Feature')
    plt.ylabel('Coefficient Value')
    plt.title('Coefficients for {}'.format(party_label))
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()


#### Getting the top input for each of the top 5 predictors given a party preference


In [None]:
# Axl - coefficient table: one row per party, one column per feature
coefficients_df = pd.DataFrame(coefficients, columns=feature_names, index=orderedPartyList)

# For each party keep the 5 features with the largest coefficients.
# NOTE: nlargest ranks the signed values, so large negative coefficients are not selected.
top_predictors = {
    party_name: coefficients_df.loc[party_name].nlargest(5)
    for party_name in orderedPartyList
}

# Print the 5 selected predictors for each party preference
for party, predictors in top_predictors.items():
    print(f"Top 5 predictors for {party}:")
    print(predictors)
    print()


#### Finding the top input for each predictor within each party group (standardized)


In [None]:
# Axl - find the top input for each predictor within each party group (standardized)
def find_top_inputs(predictions, top_predictors):
    """Return the most frequent value of each top predictor among each party's rows.

    Parameters
    ----------
    predictions : Spark DataFrame
        Must contain a "Parties_Description" column and every predictor column named below.
    top_predictors : dict
        Maps a party name to an object whose ``.index`` lists predictor column names
        (e.g. the pandas Series produced by ``nlargest``).

    Returns
    -------
    dict
        ``{party: {predictor: most_frequent_value}}``. The value is ``None`` when the
        party has no rows in ``predictions``.
    """
    top_inputs = {}

    # Iterate over each party preference
    for party, predictors in top_predictors.items():
        top_inputs[party] = {}

        # Filter predictions DataFrame for the current party
        party_df = predictions.filter(predictions["Parties_Description"] == party)

        # Iterate over the top predictors for the current party
        for predictor in predictors.index:
            # Group by the predictor and count occurrences of each input value
            grouped_df = party_df.groupBy(predictor).count()

            # Order by count in descending order
            ordered_df = grouped_df.orderBy("count", ascending=False)

            # first() yields None when the party subset is empty; guard before subscripting
            top_row = ordered_df.first()
            top_input = top_row[predictor] if top_row is not None else None

            # Store the top input for the predictor
            top_inputs[party][predictor] = top_input

    return top_inputs

# Most frequent value of every top predictor, per party
top_inputs = find_top_inputs(predictions, top_predictors)

# Report: one section per party, one "predictor: value" line per predictor
for party, predictors in top_inputs.items():
    print(f"Top inputs for {party}:")
    for predictor in predictors:
        top_input = predictors[predictor]
        print(f"{predictor}: {top_input}")
    print()



#### Finding the top input for each predictor within each party group (True values)


In [None]:
# Axl - top 5 predictors for each party preference, given as the original column names
# (unscaled and not indexed) instead of the "scaled_"/"_index" feature names
top_5_unscaled_predictors_Democratic = [
    'Voters_Gender',
    'CommercialData_AreaMedianEducationYears',
    'Voters_Age',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'Residence_Families_HHCount',
]
top_5_unscaled_predictors_Non_Partisan = [
    'Voters_Age',
    'Voters_Gender',
    'Mailing_HHGender_Description',
    'CommercialData_PropertyType',
    'Residence_Families_HHCount',
]
top_5_unscaled_predictors_Republican = [
    'Voters_Age',
    'CommercialData_AreaMedianEducationYears',
    'Mailing_HHGender_Description',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'Residence_Families_HHCount',
]
top_5_unscaled_predictors_Other = [
    'Voters_Gender',
    'Voters_Age',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
    'CommercialData_PropertyType',
]

# Axl - find the top input for each predictor within each party group (true, unscaled values)
def find_top_inputs(predictions, top_predictors):
    """Return the most frequent value of each listed predictor among each party's rows.

    This redefines the earlier helper of the same name; here each dictionary value is
    iterated directly (e.g. a list of column names) rather than through ``.index``.

    Parameters
    ----------
    predictions : Spark DataFrame
        Must contain a "Parties_Description" column and every predictor column named below.
    top_predictors : dict
        Maps a party name to an iterable of predictor column names.

    Returns
    -------
    dict
        ``{party: {predictor: most_frequent_value}}``. The value is ``None`` when the
        party has no rows in ``predictions``.
    """
    top_inputs = {}

    # Iterate over each party preference
    for party, predictors in top_predictors.items():
        top_inputs[party] = {}

        # Filter predictions DataFrame for the current party
        party_df = predictions.filter(predictions["Parties_Description"] == party)

        # Iterate over the top predictors for the current party
        for predictor in predictors:
            # Group by the predictor and count occurrences of each input value
            grouped_df = party_df.groupBy(predictor).count()

            # Order by count in descending order
            ordered_df = grouped_df.orderBy("count", ascending=False)

            # first() yields None when the party subset is empty; guard before subscripting
            top_row = ordered_df.first()
            top_input = top_row[predictor] if top_row is not None else None

            # Store the top input for the predictor
            top_inputs[party][predictor] = top_input

    return top_inputs

# Party name -> its list of top-5 (unscaled) predictor columns
top_predictors = dict(zip(
    ['Democratic', 'Non-Partisan', 'Republican', 'Other'],
    [
        top_5_unscaled_predictors_Democratic,
        top_5_unscaled_predictors_Non_Partisan,
        top_5_unscaled_predictors_Republican,
        top_5_unscaled_predictors_Other,
    ],
))

# Most frequent value of every listed predictor, per party
top_inputs = find_top_inputs(predictions, top_predictors)

# Report: one section per party, one "predictor: value" line per predictor
for party, predictors in top_inputs.items():
    print(f"Top inputs for {party}:")
    for predictor in predictors:
        top_input = predictors[predictor]
        print(f"{predictor}: {top_input}")
    print()



## Generalizability (Running on Washington Data)


### Setting up the cluster


In [None]:
# Written by Alex Roginski
#
# NOTE: everything below is shell syntax (export / gsutil / gcloud) meant for the Google
# Cloud Shell; it is not Python and is not meant to be executed as a notebook code cell.

# 1. Create a project

# 2. Go to Cloud Storage --> Buckets and create one

# Now, run this code in the Cloud Shell:
# The three variables below are referenced by the gsutil and gcloud commands that follow.
export PROJECT_ID="" ## Your project ID here. NO LEADING OR TRAILING SPACES AROUND "=".

export BUCKET_NAME="" ## Your Bucket Name that you just created

export CLUSTER_NAME="" # Make your own cluster name here (whatever you want)

# The following comes from the project assignment pdf. Run line by line.

# List the contents of the shared voter-file bucket, using your project via -u.
gsutil -u $PROJECT_ID ls gs://winter-2024-voter-file/

# Optional (left commented out): copy one state's voter file into your own bucket.
# gsutil -u $PROJECT_ID -m cp -r gs://winter-2024-voter-file/VM2Uniform/VM2Uniform--WY--2021-01-13 gs://$BUCKET_NAME


# Create a single-node Dataproc cluster in us-central1 with the Jupyter optional component
# and the component gateway enabled, backed by the bucket set above.
gcloud dataproc clusters create $CLUSTER_NAME  --enable-component-gateway  --bucket $BUCKET_NAME  --region us-central1  --single-node  --master-machine-type e2-standard-4  --master-boot-disk-size 500  --image-version 2.1-debian11  --optional-components JUPYTER  --project $PROJECT_ID

#


### Setup OREGON

In [None]:
# In Jupyter notebook hosted by Google Cloud

from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for the Oregon run.
# NOTE(review): getOrCreate() returns an already-running session when one
# exists, in which case the executor-memory setting below may not be applied;
# restart the kernel first if the 8g setting matters.
spark = (
    SparkSession.builder
    .appName("Read Voter File Data")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)

# Oregon voter file (Parquet).  Edit by Alex
# The CSV-only "header"/"inferSchema" reader options that used to be set here
# have no effect on a Parquet read (the files carry their own schema), so they
# were dropped.
df = spark.read.parquet(
    "gs://winter-2024-voter-file/VM2Uniform/VM2Uniform--OR--2021-02-05"
)

# By Andy
# NOTE: uscounties.csv (county IDs) is not loaded in this section; the cleaned
# voter data is used directly as df_merged further down.




# By Alex

from pyspark.sql.functions import col

# Columns carried forward for modelling; Parties_Description is the label.
selectedColumns = [
    'County',
    'Ethnic_Description',
    'Voters_Gender',
    'Voters_Age',
    'Mailing_Families_HHCount',
    'Mailing_HHGender_Description',
    'Residence_Families_HHCount',
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
    'CommercialData_AreaMedianEducationYears',
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
    'Parties_Description',
]

cleaned_df = df.select(*selectedColumns)

# Discard every row with a null in any selected column.
data = cleaned_df.dropna()

# No county-ID join in this section: the cleaned frame is used as-is.
df_merged = data


# Andy's code to merge bin
from pyspark.sql.functions import when, col, lit

parties_to_keep = ['Republican', 'Democratic', 'Non-Partisan']  # List of parties to keep

# Parties outside parties_to_keep are relabelled "Other".
df_merged = df_merged.withColumn(
    "Parties_Description",
    when(
        col("Parties_Description").isin(parties_to_keep),
        col("Parties_Description"),
    ).otherwise(lit("Other")),
)








# By Alex

import pandas as pd

from pyspark.sql.functions import regexp_replace, col


# Dollar-formatted columns: strip "$" and "," and cast the result to float.
for column in (
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
):
    df_merged = df_merged.withColumn(
        column,
        regexp_replace(col(column), '[$,]', '').cast('float'),
    )


# Remaining numeric-valued columns: cast straight to float.
columns_to_convert = ["Voters_Age", "Mailing_Families_HHCount", "Residence_Families_HHCount", "CommercialData_AreaMedianEducationYears"]

for column in columns_to_convert:
    df_merged = df_merged.withColumn(column, col(column).cast('float'))


# PRINT RESULT: first 10 converted rows as a pandas DataFrame.
column_headers = df_merged.columns

test_rows = df_merged.head(10)

new_df = pd.DataFrame(test_rows, columns=column_headers)

new_df


#### Random Forest setup


In [None]:

# TA CYRUS

# Every column used by the model (features plus the Parties_Description label).
# The TA template's original list also contained 'Mailing_Addresses_City',
# which is left out here.
selectedColumns = [
    'Ethnic_Description',
    'Voters_Gender',
    'Voters_Age',
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
    'Parties_Description',
    "Mailing_Families_HHCount",
    "Residence_Families_HHCount",
    'CommercialData_AreaMedianEducationYears',
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
    'County',
    'Mailing_HHGender_Description',
]

# String-valued features; each one is later passed through a StringIndexer.
# (Template option not used: "Mailing_Addresses_City".)
categoricalColumns = [
    'County',
    'Mailing_HHGender_Description',
    "Ethnic_Description",
    "Voters_Gender",
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
]

# Features whose values are formatted with "$" and "," separators.
columns_with_dollar_signs = [
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
]

# Other numeric features (no currency formatting).
numeric_columns_in_data_not_dollar_sign = [
    "Voters_Age",
    "Mailing_Families_HHCount",
    "Residence_Families_HHCount",
    'CommercialData_AreaMedianEducationYears',
]

# ----------------------------


from pyspark.sql.functions import regexp_replace, col

# NOTE(review): the setup cell above already performs these same casts on
# df_merged; confirm whether this second pass is still needed.

# Strip "$" and "," from the currency columns, then cast to float.
for column in columns_with_dollar_signs:
    without_symbols = regexp_replace(col(column), '[$,]', '')
    df_merged = df_merged.withColumn(column, without_symbols.cast('float'))

# Cast the remaining numeric columns to float.
for column in numeric_columns_in_data_not_dollar_sign:
    as_float = col(column).cast('float')
    df_merged = df_merged.withColumn(column, as_float)






import time

# Start time
start_time = time.time()


# By Alex

# TA Cyrus helped me find the following code and I used the following source:
# https://stackoverflow.com/questions/35804755/apply-onehotencoder-for-several-categorical-columns-in-sparkmlib


from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

# One StringIndexer per categorical column, writing to "<column>_index".
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index", handleInvalid='keep')
    for column in categoricalColumns
]

# One-hot encoding was tried and left disabled: the indexed columns go
# straight into the assembler together with the numeric columns.
assembler = VectorAssembler(
    inputCols=(
        [indexer.getOutputCol() for indexer in indexers]
        + numeric_columns_in_data_not_dollar_sign
        + columns_with_dollar_signs
    ),
    outputCol="features",
)

# 70/30 train/test split with a fixed seed.
trainData, testData = df_merged.randomSplit([0.7, 0.3], seed=123)

labelIndexer = StringIndexer(inputCol="Parties_Description", outputCol="label")

randomForest = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    numTrees=10,
    maxBins=100,
)

# Stage order: categorical indexers -> label indexer -> assembler -> classifier.
pipeline = Pipeline(stages=[*indexers, labelIndexer, assembler, randomForest])


# By Alex

# FOR THE FOLLOWING CODE, YOU MUST RUN IT ON A CLUSTER ON GOOGLE CLOUD.
# YOU WILL RUN OUT OF MEMORY RUNNING LOCALLY (java.lang.OutOfMemoryError: Java heap space)

model = pipeline.fit(trainData)


# End time
end_time = time.time()

# Elapsed wall-clock time, in seconds and then in minutes
elapsed_time_seconds = end_time - start_time
elapsed_time_minutes = elapsed_time_seconds / 60

print("Time taken:", elapsed_time_minutes, "minutes")







from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import time

# Time the scoring of the held-out split produced by randomSplit above.
start_time = time.time()

predictions = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


# End time
end_time = time.time()

# Elapsed wall-clock time, in seconds and then in minutes
elapsed_time_seconds = end_time - start_time
elapsed_time_minutes = elapsed_time_seconds / 60

print("Time taken:", elapsed_time_minutes, "minutes")





### Setup WASHINGTON


In [None]:
# In Jupyter notebook hosted by Google Cloud

from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for the Washington run.
# NOTE(review): getOrCreate() returns an already-running session when one
# exists, in which case the executor-memory setting below may not be applied;
# restart the kernel first if the 8g setting matters.
spark = (
    SparkSession.builder
    .appName("Read Voter File Data")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)


# Washington voter file (Parquet).
DATA_DIRECTORY = "gs://winter-2024-voter-file/VM2Uniform/VM2Uniform--WA--2020-12-09/"


# Edit by Alex
# The CSV-only "header"/"inferSchema" reader options that used to be set here
# have no effect on a Parquet read (the files carry their own schema), so they
# were dropped.
df = spark.read.parquet(DATA_DIRECTORY)

# By Andy
# NOTE: uscounties.csv (county IDs) is not loaded in this section; the cleaned
# voter data is used directly as df_merged further down.




# By Alex

from pyspark.sql.functions import col

# Columns carried forward for modelling; Parties_Description is the label.
selectedColumns = [
    'County',
    'Ethnic_Description',
    'Voters_Gender',
    'Voters_Age',
    'Mailing_Families_HHCount',
    'Mailing_HHGender_Description',
    'Residence_Families_HHCount',
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
    'CommercialData_AreaMedianEducationYears',
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
    'Parties_Description',
]

cleaned_df = df.select(*selectedColumns)

# Discard every row with a null in any selected column.
data = cleaned_df.dropna()

# No county-ID join in this section: the cleaned frame is used as-is.
df_merged = data


# Andy's code to merge bin
from pyspark.sql.functions import when, col, lit

parties_to_keep = ['Republican', 'Democratic', 'Non-Partisan']  # List of parties to keep

# Parties outside parties_to_keep are relabelled "Other".
df_merged = df_merged.withColumn(
    "Parties_Description",
    when(
        col("Parties_Description").isin(parties_to_keep),
        col("Parties_Description"),
    ).otherwise(lit("Other")),
)








# By Alex

import pandas as pd

from pyspark.sql.functions import regexp_replace, col


# Dollar-formatted columns: strip "$" and "," and cast the result to float.
for column in (
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
):
    df_merged = df_merged.withColumn(
        column,
        regexp_replace(col(column), '[$,]', '').cast('float'),
    )


# Remaining numeric-valued columns: cast straight to float.
columns_to_convert = ["Voters_Age", "Mailing_Families_HHCount", "Residence_Families_HHCount", "CommercialData_AreaMedianEducationYears"]

for column in columns_to_convert:
    df_merged = df_merged.withColumn(column, col(column).cast('float'))


# PRINT RESULT: first 10 converted rows as a pandas DataFrame.
column_headers = df_merged.columns

test_rows = df_merged.head(10)

new_df = pd.DataFrame(test_rows, columns=column_headers)

new_df



### Random Forest Output


In [None]:
# Score the fitted random-forest pipeline (`model`, from the Random Forest
# setup cell) on the whole frame prepared above instead of a held-out split.
testData = df_merged

predictions = model.transform(testData)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.422 for Washington


### Logistic Regression Output

In [None]:
# Score the logistic-regression pipeline on the current testData.
# `log_model` is not defined in this part of the notebook; it must already
# exist in the kernel from an earlier cell.
predictions = log_model.transform(testData)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.436


### Setup MAINE


#### Random Forest


In [None]:
# In Jupyter notebook hosted by Google Cloud

from pyspark.sql import SparkSession

# Start (or reuse) the Spark session for this run.
# NOTE(review): getOrCreate() returns an already-running session when one
# exists, in which case the executor-memory setting below may not be applied;
# restart the kernel first if the 8g setting matters.
spark = (
    SparkSession.builder
    .appName("Read Voter File Data")
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)


# FIXME(review): this cell sits under "Setup MAINE" but DATA_DIRECTORY still
# names the Washington (WA) extract. Point it at the Maine folder in the same
# bucket before relying on the Maine accuracy figures.
DATA_DIRECTORY = "gs://winter-2024-voter-file/VM2Uniform/VM2Uniform--WA--2020-12-09/"


# Edit by Alex
# The CSV-only "header"/"inferSchema" reader options that used to be set here
# have no effect on a Parquet read (the files carry their own schema), so they
# were dropped.
df = spark.read.parquet(DATA_DIRECTORY)

# By Andy
# NOTE: uscounties.csv (county IDs) is not loaded in this section; the cleaned
# voter data is used directly as df_merged further down.




# By Alex

from pyspark.sql.functions import col

# Columns carried forward for modelling; Parties_Description is the label.
selectedColumns = [
    'County',
    'Ethnic_Description',
    'Voters_Gender',
    'Voters_Age',
    'Mailing_Families_HHCount',
    'Mailing_HHGender_Description',
    'Residence_Families_HHCount',
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
    'CommercialData_AreaMedianEducationYears',
    'CommercialData_PropertyType',
    'CommercialData_StateIncomeDecile',
    'Parties_Description',
]

cleaned_df = df.select(*selectedColumns)

# Discard every row with a null in any selected column.
data = cleaned_df.dropna()

# No county-ID join in this section: the cleaned frame is used as-is.
df_merged = data


# Andy's code to merge bin
from pyspark.sql.functions import when, col, lit

parties_to_keep = ['Republican', 'Democratic', 'Non-Partisan']  # List of parties to keep

# Parties outside parties_to_keep are relabelled "Other".
df_merged = df_merged.withColumn(
    "Parties_Description",
    when(
        col("Parties_Description").isin(parties_to_keep),
        col("Parties_Description"),
    ).otherwise(lit("Other")),
)








# By Alex

import pandas as pd

from pyspark.sql.functions import regexp_replace, col


# Dollar-formatted columns: strip "$" and "," and cast the result to float.
for column in (
    'CommercialData_EstHomeValue',
    'CommercialData_EstimatedHHIncomeAmount',
    'CommercialData_EstimatedAreaMedianHHIncome',
    'CommercialData_AreaMedianHousingValue',
):
    df_merged = df_merged.withColumn(
        column,
        regexp_replace(col(column), '[$,]', '').cast('float'),
    )


# Remaining numeric-valued columns: cast straight to float.
columns_to_convert = ["Voters_Age", "Mailing_Families_HHCount", "Residence_Families_HHCount", "CommercialData_AreaMedianEducationYears"]

for column in columns_to_convert:
    df_merged = df_merged.withColumn(column, col(column).cast('float'))


# PRINT RESULT: first 10 converted rows as a pandas DataFrame.
column_headers = df_merged.columns

test_rows = df_merged.head(10)

new_df = pd.DataFrame(test_rows, columns=column_headers)

new_df



Accuracy: 0.384

#### Logistic

In [None]:
# Score the logistic-regression pipeline on the whole frame prepared above.
# `log_model` is not defined in this part of the notebook; it must already
# exist in the kernel from an earlier cell.
testData = df_merged
predictions = log_model.transform(testData)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)


Accuracy: 0.393

### Setup LOUISIANA

#### Random Forest

Accuracy: 0.361

#### Logistic


In [None]:
# Score the logistic-regression pipeline on whatever df_merged currently holds.
# NOTE(review): no Louisiana load/cleaning cell appears in this section, so
# df_merged must already contain the Louisiana data when this runs — confirm.
testData = df_merged
predictions = log_model.transform(testData)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy",
)
accuracy = evaluator.evaluate(predictions)
print("Accuracy:", accuracy)
