In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import split, explode
from pyspark.sql.types import IntegerType

import numpy as np
import pandas as pd

import seaborn as sns
import matplotlib.pyplot as plt

spark = SparkSession.builder.appName("More models").getOrCreate()

In [2]:
# Import required libraries
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator

In [3]:
# load AZ data
bucket_dir="gs://pstat135-voter-file/VM2Uniform/"
df = spark.read.parquet(bucket_dir + 'VM2Uniform--AZ--2021-05-20')

# Read MA data
dfma = spark.read.parquet(bucket_dir + 'VM2Uniform--MA--2021-01-19')

# load ND data
dfnd = spark.read.parquet(bucket_dir + 'VM2Uniform--ND--2021-03-18')

                                                                                

In [24]:
# convert string columns to numeric 
df = df.withColumn(
    "ElectionReturns_G18CountyTurnoutAllRegisteredVoters", 
    regexp_replace(
        "ElectionReturns_G18CountyTurnoutAllRegisteredVoters", "%", ""
    ).cast(IntegerType())
)
df = df.withColumn(
    "ElectionReturns_G14CountyTurnoutAllRegisteredVoters", 
    regexp_replace(
        "ElectionReturns_G14CountyTurnoutAllRegisteredVoters", "%", ""
    ).cast(IntegerType())
)
df = df.withColumn(
    "ElectionReturns_G08CountyTurnoutAllRegisteredVoters", 
    regexp_replace(
        "ElectionReturns_G08CountyTurnoutAllRegisteredVoters", "%", ""
    ).cast(IntegerType())
)

df = df.withColumn(
    "CommercialData_EstimatedHHIncomeAmount", 
    expr(
        "substring(CommercialData_EstimatedHHIncomeAmount, 2, length(CommercialData_EstimatedHHIncomeAmount))"
    ).cast(IntegerType())
)
df = df.withColumn(
    "CommercialData_EstHomeValue", 
    expr(
        "substring(CommercialData_EstHomeValue, 2, length(CommercialData_EstHomeValue))"
    ).cast(IntegerType())
)
df = df.withColumn("Voters_Age", df["Voters_Age"].cast(IntegerType()))

In [25]:
# select some variables we are interested in from the data
spark.conf.set("spark.sql.debug.maxToStringFields", 1000) 
df_need = df.select(
    "County", "Voters_Age", "Voters_Gender",
    "CommercialData_Education", "EthnicGroups_EthnicGroup1Desc",
    "CommercialData_EstimatedHHIncomeAmount", 
    "CommercialData_EstHomeValue",
    "ElectionReturns_G18CountyTurnoutAllRegisteredVoters",
    "ElectionReturns_G14CountyTurnoutAllRegisteredVoters",
    "ElectionReturns_G08CountyTurnoutAllRegisteredVoters"
)
# Drop missing values 
df_need = df_need.na.drop()

df_need.show(10)

+------+----------+-------------+------------------------+-----------------------------+--------------------------------------+---------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+
|County|Voters_Age|Voters_Gender|CommercialData_Education|EthnicGroups_EthnicGroup1Desc|CommercialData_EstimatedHHIncomeAmount|CommercialData_EstHomeValue|ElectionReturns_G18CountyTurnoutAllRegisteredVoters|ElectionReturns_G14CountyTurnoutAllRegisteredVoters|ElectionReturns_G08CountyTurnoutAllRegisteredVoters|
+------+----------+-------------+------------------------+-----------------------------+--------------------------------------+---------------------------+---------------------------------------------------+---------------------------------------------------+---------------------------------------------------+
|NAVAJO|        63|            F|     HS Diploma - Likely|      

In [6]:
from pyspark.ml.feature import Bucketizer
from pyspark.sql.functions import count


# Define the age groups or bins
splits = [17, 25, 35, 45, 55, 65, float("inf")]

# Create the Bucketizer object and set the input and output columns
bucketizer = Bucketizer(splits=splits, inputCol="Voters_Age", outputCol="AgeGroup")

# Apply the Bucketizer transformation to the dataframe
age_bucketed_df = bucketizer.transform(df_need)

# Group by county and age group and count the number of records in each group
counties_age_count_df = age_bucketed_df.groupBy("County", "AgeGroup").agg(count("*").alias("Count"))

# Pivot the data to have age groups as columns
age_pivot_df = counties_age_count_df.groupBy("County").pivot("AgeGroup").sum("Count")

# Rename the columns to reflect the age groups
age_groups = [
    "age18-25", "age26-35", "age36-45", "age46-55", "age56-65", "age66+"
]
for i in range(len(age_groups)):
    age_pivot_df = age_pivot_df.withColumnRenamed(
        str(float(i)), age_groups[i]
    )

age_pivot_df.show()

                                                                                

+----------+--------+--------+--------+--------+--------+------+
|    County|age18-25|age26-35|age36-45|age46-55|age56-65|age66+|
+----------+--------+--------+--------+--------+--------+------+
|      YUMA|    2452|    5981|    6821|    7060|    8335| 14603|
|  COCONINO|    1984|    4043|    5041|    5558|    6877| 10859|
|   COCHISE|    1354|    3475|    4694|    5225|    8112| 17116|
|  GREENLEE|      89|     241|     296|     352|     388|   600|
|    MOHAVE|    1582|    4365|    5481|    7154|   13474| 33306|
|SANTA CRUZ|     820|    1698|    1701|    2063|    2520|  4424|
|   YAVAPAI|    2028|    4969|    6521|    8693|   16249| 44347|
|      GILA|     377|     925|    1285|    1753|    3374|  8918|
|    LA PAZ|     119|     245|     300|     364|     599|  1452|
|    APACHE|     143|     478|     858|    1091|    1820|  3095|
|      PIMA|   14266|   35469|   40343|   45635|   62966|126401|
|    GRAHAM|     454|    1119|    1448|    1498|    1662|  3055|
|     PINAL|    4837|   1

In [7]:
# bucketize household income
splits = [-float("inf"), 50000, 100000, 150000, 200000, float("inf")]
bucketizer = Bucketizer(
    splits=splits, inputCol="CommercialData_EstimatedHHIncomeAmount", 
    outputCol="HHIncomeGroup"
)
income_bucketed_df = bucketizer.transform(df_need)
counties_income_count_df = income_bucketed_df.groupBy("County", "HHIncomeGroup") \
    .agg(count("*").alias("Count"))
income_pivot_df = counties_income_count_df.groupBy("County").pivot("HHIncomeGroup").sum("Count")

# rename columns
income_groups = [
    "inc0-50k", "inc50k-100k", "inc100k-150k", "inc150k-200k", "inc200k+"
]
for i in range(len(income_groups)):
    income_pivot_df = income_pivot_df.withColumnRenamed(
        str(float(i)), income_groups[i]
    )

income_pivot_df.show()



+----------+--------+-----------+------------+------------+--------+
|    County|inc0-50k|inc50k-100k|inc100k-150k|inc150k-200k|inc200k+|
+----------+--------+-----------+------------+------------+--------+
|      YUMA|   14834|      21199|        5981|        1681|    1557|
|  COCONINO|    7044|      14341|        7324|        2688|    2965|
|   COCHISE|   12552|      17862|        6545|        1608|    1409|
|  GREENLEE|     465|       1110|         279|          45|      67|
|    MOHAVE|   24260|      29423|        7197|        2238|    2244|
|SANTA CRUZ|    5730|       5236|        1356|         461|     443|
|   YAVAPAI|   22085|      38550|       12441|        4567|    5164|
|      GILA|    6610|       6863|        1850|         646|     663|
|    LA PAZ|    1488|       1199|         249|          61|      82|
|    APACHE|    3376|       3058|         720|         173|     158|
|      PIMA|   88456|     135292|       59159|       20441|   21732|
|    GRAHAM|    2750|       4614| 

                                                                                

In [8]:
# bucketize home value
splits = [-float("inf"), 50000, 100000, 150000, 200000, 250000, float("inf")]
bucketizer = Bucketizer(
    splits=splits, inputCol="CommercialData_EstHomeValue", 
    outputCol="HomeValueGroup"
)
homevalue_bucketed_df = bucketizer.transform(df_need)
counties_homevalue_count_df = homevalue_bucketed_df.groupBy("County", "HomeValueGroup") \
    .agg(count("*").alias("Count"))
homevalue_pivot_df = counties_homevalue_count_df.groupBy("County").pivot("HomeValueGroup").sum("Count")

# rename columns
homevalue_groups = [
    "hvalue0-50k", "hvalue50k-100k", "hvalue100k-150k", "hvalue150k-200k", 
    "hvalue200k-250k", "hvalue250k+"
]
for i in range(len(homevalue_groups)):
    homevalue_pivot_df = homevalue_pivot_df.withColumnRenamed(
        str(float(i)), homevalue_groups[i]
    )

homevalue_pivot_df.show()



+----------+-----------+--------------+---------------+---------------+---------------+-----------+
|    County|hvalue0-50k|hvalue50k-100k|hvalue100k-150k|hvalue150k-200k|hvalue200k-250k|hvalue250k+|
+----------+-----------+--------------+---------------+---------------+---------------+-----------+
|      YUMA|        688|          3120|          11375|          11108|           7609|      11352|
|  COCONINO|        319|          1599|           2109|           1685|           2246|      26404|
|   COCHISE|        814|          4673|           8675|          10170|           5863|       9781|
|  GREENLEE|        158|          1050|            553|             92|             47|         66|
|    MOHAVE|       1157|          5972|           7751|          11271|           8411|      30800|
|SANTA CRUZ|        142|           540|           2819|           4868|           2075|       2782|
|   YAVAPAI|        483|          1471|           4072|           7153|           7491|      62137|


                                                                                

In [9]:
# pivot education
edu_count_df = df_need.select("County", "CommercialData_Education") \
    .groupBy("County", "CommercialData_Education").agg(count("*").alias("Count"))
edu_pivot_df = edu_count_df.groupBy("County").pivot("CommercialData_Education").sum("Count")

edu_pivot_df.show(5)



+--------+------------------------------+--------------------+------------------------------+--------------------+-----------------------------+-------------------+------------------------------+-----------------------------+---------------------+------------------------------+----------------------------------------------+
|  County|Bach Degree - Extremely Likely|Bach Degree - Likely|Grad Degree - Extremely Likely|Grad Degree - Likely|HS Diploma - Extremely Likely|HS Diploma - Likely|Less than HS Diploma - Ex Like|Less than HS Diploma - Likely|Some College - Likely|Some College -Extremely Likely|Vocational Technical Degree - Extremely Likely|
+--------+------------------------------+--------------------+------------------------------+--------------------+-----------------------------+-------------------+------------------------------+-----------------------------+---------------------+------------------------------+----------------------------------------------+
|    YUMA|            

                                                                                

In [10]:
# combine columns in edu_pivot_df
edu_pivot_df = edu_pivot_df.withColumn(
    "Bach", 
    edu_pivot_df['Bach Degree - Extremely Likely'] + edu_pivot_df['Bach Degree - Likely']
)
edu_pivot_df = edu_pivot_df.withColumn(
    "Grad", 
    edu_pivot_df['Grad Degree - Extremely Likely'] + edu_pivot_df['Grad Degree - Likely']
)
edu_pivot_df = edu_pivot_df.withColumn(
    "HighSchool", 
    edu_pivot_df['HS Diploma - Extremely Likely'] + edu_pivot_df['HS Diploma - Likely']
)
edu_pivot_df = edu_pivot_df.withColumn(
    "LessHS", 
    edu_pivot_df['Less than HS Diploma - Ex Like'] + edu_pivot_df['Less than HS Diploma - Likely']
)
edu_pivot_df = edu_pivot_df.withColumn(
    "College", 
    edu_pivot_df['Some College - Likely'] + edu_pivot_df['Some College -Extremely Likely']
)
edu_pivot_df = edu_pivot_df.withColumn(
    "VocTech", 
    edu_pivot_df['Vocational Technical Degree - Extremely Likely']
)

edu_pivot_df = edu_pivot_df.select(
    "County",
    "Bach", "Grad", "HighSchool", "LessHS", 
    "College", "VocTech"
)
edu_pivot_df.show()



+----------+------+------+----------+------+-------+-------+
|    County|  Bach|  Grad|HighSchool|LessHS|College|VocTech|
+----------+------+------+----------+------+-------+-------+
|      YUMA|  9814|  3837|     12072|  6499|  12960|     70|
|  COCONINO|  9775|  6930|      7115|  1397|   9101|     44|
|   COCHISE|  9369|  5071|     10051|  2634|  12786|     65|
|  GREENLEE|   335|   109|       762|  null|    635|      7|
|    MOHAVE| 11587|  5109|     23065|  5043|  20404|    154|
|SANTA CRUZ|  3269|  1274|      4070|  null|   2861|     18|
|   YAVAPAI| 19338| 12185|     21784|  3941|  25450|    109|
|      GILA|  3442|  1856|      5335|  1183|   4786|     30|
|    LA PAZ|   498|   231|      1201|  null|    795|      9|
|    APACHE|  1546|   496|      2967|  null|   1821|     49|
|      PIMA| 85320| 58965|     74454| 16921|  88919|    501|
|    GRAHAM|  1969|   880|      2699|   621|   3053|     14|
|     PINAL| 25005| 11422|     33948|  7857|  38348|    176|
|    NAVAJO|  4476|  226

                                                                                

In [11]:
# pivot ethnic 
eth_count_df = df_need.select("County", "EthnicGroups_EthnicGroup1Desc") \
    .groupBy("County", "EthnicGroups_EthnicGroup1Desc").agg(count("*").alias("Count"))
eth_pivot_df = eth_count_df.groupBy("County").pivot("EthnicGroups_EthnicGroup1Desc").sum("Count")

eth_pivot_df.show()



+----------+--------------------+--------+-----------------------+-----------------------+-----+
|    County|East and South Asian|European|Hispanic and Portuguese|Likely African-American|Other|
+----------+--------------------+--------+-----------------------+-----------------------+-----+
|  COCONINO|                 567|   28490|                   3647|                    114| 1544|
|   COCHISE|                 592|   29403|                   9215|                    371|  395|
|  GREENLEE|                   7|    1196|                    737|                      3|   23|
|    MOHAVE|                 596|   57230|                   6563|                     81|  892|
|SANTA CRUZ|                  90|    3372|                   9648|                      3|  113|
|   YAVAPAI|                 844|   74273|                   6388|                     73| 1229|
|      GILA|                 129|   13978|                   2227|                     14|  284|
|      YUMA|                 5

                                                                                

In [12]:
# pivot gender
gender_count_df = df_need.select("County", "Voters_Gender") \
    .groupBy("County", "Voters_Gender").agg(count("*").alias("Count"))
gender_pivot_df = gender_count_df.groupBy("County").pivot("Voters_Gender").sum("Count")

gender_pivot_df.show()



+----------+------+------+
|    County|     F|     M|
+----------+------+------+
|      YUMA| 23339| 21913|
|  COCONINO| 17841| 16521|
|  GREENLEE|   985|   981|
|    MOHAVE| 33505| 31857|
|SANTA CRUZ|  6774|  6452|
|   COCHISE| 20710| 19266|
|   YAVAPAI| 43911| 38896|
|    LA PAZ|  1644|  1435|
|    APACHE|  4419|  3066|
|      PIMA|172745|152335|
|    GRAHAM|  4788|  4448|
|    NAVAJO| 11284|  9652|
|      GILA|  8654|  7978|
|     PINAL| 60548| 56208|
|  MARICOPA|647374|584677|
+----------+------+------+



                                                                                

In [13]:
# turnout by county
turnout_df = df_need.select("County", "ElectionReturns_G18CountyTurnoutAllRegisteredVoters")
turnout_df = turnout_df.groupBy("County").agg(
    mean("ElectionReturns_G18CountyTurnoutAllRegisteredVoters").alias(
        "G18Turnout"
    )
)
turnout_df.show()



+----------+----------+
|    County|G18Turnout|
+----------+----------+
|  COCONINO|      61.0|
|   COCHISE|      60.0|
|  GREENLEE|      56.0|
|    MOHAVE|      53.0|
|SANTA CRUZ|      47.0|
|   YAVAPAI|      73.0|
|      GILA|      66.0|
|      YUMA|      43.0|
|    LA PAZ|      47.0|
|      PIMA|      67.0|
|    GRAHAM|      57.0|
|    NAVAJO|      53.0|
|     PINAL|      59.0|
|    APACHE|      48.0|
|  MARICOPA|      62.0|
+----------+----------+



                                                                                

In [14]:
import copy


def count2ratio(pivot_df):
    """
    convert count to ratio
    """
    pivot_pddf = pivot_df.toPandas()
    pivot_arr = pivot_pddf.iloc[:, 1:].to_numpy()
    
    new_pivot_arr = copy.deepcopy(pivot_arr).astype('float')
    
    # calculate values in np.arr
    for rowi in range(pivot_arr.shape[0]):
        for colj in range(pivot_arr.shape[1]):
            new_pivot_arr[rowi, colj] = pivot_arr[rowi, colj] / np.nansum(pivot_arr[rowi, :])

    # update values in pd.df
    for rowi in range(pivot_pddf.shape[0]):
        pivot_pddf.iloc[rowi, 1:] = new_pivot_arr[rowi, :]

    new_pivot_df = spark.createDataFrame(pivot_pddf)
    
    return new_pivot_df

In [15]:
# convert count to ratio: age
age_ratio_pivot_df = count2ratio(age_pivot_df)
gender_ratio_pivot_df = count2ratio(gender_pivot_df)
edu_ratio_pivot_df = count2ratio(edu_pivot_df)
income_ratio_pivot_df = count2ratio(income_pivot_df)
homevalue_ratio_pivot_df = count2ratio(homevalue_pivot_df)
eth_ratio_pivot_df = count2ratio(eth_pivot_df)

                                                                                

In [16]:
# rename county columns
age_ratio_pivot_df = age_ratio_pivot_df.withColumn(
    "ageCounty", age_ratio_pivot_df["County"]
)
age_ratio_pivot_df = age_ratio_pivot_df.drop("County")
###
edu_ratio_pivot_df = edu_ratio_pivot_df.withColumn(
    "eduCounty", edu_ratio_pivot_df["County"]
)
edu_ratio_pivot_df = edu_ratio_pivot_df.drop("County")
###
income_ratio_pivot_df = income_ratio_pivot_df.withColumn(
    "incCounty", income_ratio_pivot_df["County"]
)
income_ratio_pivot_df = income_ratio_pivot_df.drop("County")
###
homevalue_ratio_pivot_df = homevalue_ratio_pivot_df.withColumn(
    "hvalCounty", homevalue_ratio_pivot_df["County"]
)
homevalue_ratio_pivot_df = homevalue_ratio_pivot_df.drop("County")
###
eth_ratio_pivot_df = eth_ratio_pivot_df.withColumn(
    "ethCounty", eth_ratio_pivot_df["County"]
)
eth_ratio_pivot_df = eth_ratio_pivot_df.drop("County")
###
turnout_df = turnout_df.withColumn(
    "toutCounty", turnout_df["County"]
)
turnout_df = turnout_df.drop("County")

# All factors

In [40]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    edu_ratio_pivot_df, joined_pivot_df.County==edu_ratio_pivot_df.eduCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    income_ratio_pivot_df, joined_pivot_df.County==income_ratio_pivot_df.incCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    homevalue_ratio_pivot_df, joined_pivot_df.County==homevalue_ratio_pivot_df.hvalCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    eth_ratio_pivot_df, joined_pivot_df.County==eth_ratio_pivot_df.ethCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "eduCounty", "incCounty", "hvalCounty", "ethCounty","toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- Bach: double (nullable = true)
 |-- Grad: double (nullable = true)
 |-- HighSchool: double (nullable = true)
 |-- LessHS: double (nullable = true)
 |-- College: double (nullable = true)
 |-- VocTech: double (nullable = true)
 |-- inc0-50k: double (nullable = true)
 |-- inc50k-100k: double (nullable = true)
 |-- inc100k-150k: double (nullable = true)
 |-- inc150k-200k: double (nullable = true)
 |-- inc200k+: double (nullable = true)
 |-- hvalue0-50k: double (nullable = true)
 |-- hvalue50k-100k: double (nullable = true)
 |-- hvalue100k-150k: double (nullable = true)
 |-- hvalue150k-200k: double (nullable = true)
 |-- hvalue200k-250k: double 

In [41]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 02:33:40 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 3.0138568866708533


                                                                                

# Gender + Age

In [45]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- G18Turnout: double (nullable = true)



In [46]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 02:37:08 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 8.794529549668932


                                                                                

# Gender + Age + Education

In [19]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    edu_ratio_pivot_df, joined_pivot_df.County==edu_ratio_pivot_df.eduCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "eduCounty", "toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- Bach: double (nullable = true)
 |-- Grad: double (nullable = true)
 |-- HighSchool: double (nullable = true)
 |-- LessHS: double (nullable = true)
 |-- College: double (nullable = true)
 |-- VocTech: double (nullable = true)
 |-- G18Turnout: double (nullable = true)



In [20]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 02:12:15 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 6.18238424881534


                                                                                

# Gender + Age + Income

In [24]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    income_ratio_pivot_df, joined_pivot_df.County==income_ratio_pivot_df.incCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "incCounty", "toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- inc0-50k: double (nullable = true)
 |-- inc50k-100k: double (nullable = true)
 |-- inc100k-150k: double (nullable = true)
 |-- inc150k-200k: double (nullable = true)
 |-- inc200k+: double (nullable = true)
 |-- G18Turnout: double (nullable = true)



In [25]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 02:15:16 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 3.337507802737445


                                                                                

# Gender + Age + Housevalue

In [26]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    homevalue_ratio_pivot_df, joined_pivot_df.County==homevalue_ratio_pivot_df.hvalCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "hvalCounty", "toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- hvalue0-50k: double (nullable = true)
 |-- hvalue50k-100k: double (nullable = true)
 |-- hvalue100k-150k: double (nullable = true)
 |-- hvalue150k-200k: double (nullable = true)
 |-- hvalue200k-250k: double (nullable = true)
 |-- hvalue250k+: double (nullable = true)
 |-- G18Turnout: double (nullable = true)



In [27]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 02:16:52 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 5.851317515454675


                                                                                

# Gender + Age + Ethnic

In [52]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    eth_ratio_pivot_df, joined_pivot_df.County==eth_ratio_pivot_df.ethCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "ethCounty", "toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- East and South Asian: double (nullable = true)
 |-- European: double (nullable = true)
 |-- Hispanic and Portuguese: double (nullable = true)
 |-- Likely African-American: double (nullable = true)
 |-- Other: double (nullable = true)
 |-- G18Turnout: double (nullable = true)



In [53]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 02:45:29 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 8.283278839525645


                                                                                

# Gender + Age + Income + Housevalue

In [19]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    income_ratio_pivot_df, joined_pivot_df.County==income_ratio_pivot_df.incCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    homevalue_ratio_pivot_df, joined_pivot_df.County==homevalue_ratio_pivot_df.hvalCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "eduCounty", "incCounty", "hvalCounty", "toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- inc0-50k: double (nullable = true)
 |-- inc50k-100k: double (nullable = true)
 |-- inc100k-150k: double (nullable = true)
 |-- inc150k-200k: double (nullable = true)
 |-- inc200k+: double (nullable = true)
 |-- hvalue0-50k: double (nullable = true)
 |-- hvalue50k-100k: double (nullable = true)
 |-- hvalue100k-150k: double (nullable = true)
 |-- hvalue150k-200k: double (nullable = true)
 |-- hvalue200k-250k: double (nullable = true)
 |-- hvalue250k+: double (nullable = true)
 |-- G18Turnout: double (nullable = true)



In [20]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 03:54:23 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 3.241527417746146


                                                                                

# Gender + Age + Education + Income + Housevalue

In [17]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    edu_ratio_pivot_df, joined_pivot_df.County==edu_ratio_pivot_df.eduCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    income_ratio_pivot_df, joined_pivot_df.County==income_ratio_pivot_df.incCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    homevalue_ratio_pivot_df, joined_pivot_df.County==homevalue_ratio_pivot_df.hvalCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "eduCounty", "incCounty", "hvalCounty", "toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- Bach: double (nullable = true)
 |-- Grad: double (nullable = true)
 |-- HighSchool: double (nullable = true)
 |-- LessHS: double (nullable = true)
 |-- College: double (nullable = true)
 |-- VocTech: double (nullable = true)
 |-- inc0-50k: double (nullable = true)
 |-- inc50k-100k: double (nullable = true)
 |-- inc100k-150k: double (nullable = true)
 |-- inc150k-200k: double (nullable = true)
 |-- inc200k+: double (nullable = true)
 |-- hvalue0-50k: double (nullable = true)
 |-- hvalue50k-100k: double (nullable = true)
 |-- hvalue100k-150k: double (nullable = true)
 |-- hvalue150k-200k: double (nullable = true)
 |-- hvalue200k-250k: double 

In [18]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G18Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G18Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 03:35:29 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 4.153913817112722


                                                                                

# All + turnout 2014

In [27]:
# turnout by county
turnout_df = df_need.select("County", "ElectionReturns_G14CountyTurnoutAllRegisteredVoters")
turnout_df = turnout_df.groupBy("County").agg(
    mean("ElectionReturns_G14CountyTurnoutAllRegisteredVoters").alias(
        "G14Turnout"
    )
)
turnout_df.show()



+----------+----------+
|    County|G14Turnout|
+----------+----------+
|  COCONINO|      44.0|
|   COCHISE|      47.0|
|  GREENLEE|      41.0|
|    MOHAVE|      35.0|
|SANTA CRUZ|      37.0|
|   YAVAPAI|      50.0|
|      GILA|      48.0|
|      YUMA|      29.0|
|    LA PAZ|      32.0|
|      PIMA|      47.0|
|    GRAHAM|      36.0|
|    NAVAJO|      42.0|
|     PINAL|      37.0|
|    APACHE|      41.0|
|  MARICOPA|      39.0|
+----------+----------+



                                                                                

In [28]:
turnout_df = turnout_df.withColumn(
    "toutCounty", turnout_df["County"]
)
turnout_df = turnout_df.drop("County")

In [29]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    edu_ratio_pivot_df, joined_pivot_df.County==edu_ratio_pivot_df.eduCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    income_ratio_pivot_df, joined_pivot_df.County==income_ratio_pivot_df.incCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    homevalue_ratio_pivot_df, joined_pivot_df.County==homevalue_ratio_pivot_df.hvalCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    eth_ratio_pivot_df, joined_pivot_df.County==eth_ratio_pivot_df.ethCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "eduCounty", "incCounty", "hvalCounty", "ethCounty","toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- Bach: double (nullable = true)
 |-- Grad: double (nullable = true)
 |-- HighSchool: double (nullable = true)
 |-- LessHS: double (nullable = true)
 |-- College: double (nullable = true)
 |-- VocTech: double (nullable = true)
 |-- inc0-50k: double (nullable = true)
 |-- inc50k-100k: double (nullable = true)
 |-- inc100k-150k: double (nullable = true)
 |-- inc150k-200k: double (nullable = true)
 |-- inc200k+: double (nullable = true)
 |-- hvalue0-50k: double (nullable = true)
 |-- hvalue50k-100k: double (nullable = true)
 |-- hvalue100k-150k: double (nullable = true)
 |-- hvalue150k-200k: double (nullable = true)
 |-- hvalue200k-250k: double 

In [32]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G14Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G14Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 04:02:34 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 4.43245605355165


                                                                                

# All + turnout 2008

In [33]:
# turnout by county
turnout_df = df_need.select("County", "ElectionReturns_G08CountyTurnoutAllRegisteredVoters")
turnout_df = turnout_df.groupBy("County").agg(
    mean("ElectionReturns_G08CountyTurnoutAllRegisteredVoters").alias(
        "G08Turnout"
    )
)
turnout_df.show()



+----------+----------+
|    County|G08Turnout|
+----------+----------+
|  COCONINO|      62.0|
|   COCHISE|      64.0|
|  GREENLEE|      63.0|
|    MOHAVE|      56.0|
|SANTA CRUZ|      55.0|
|   YAVAPAI|      76.0|
|      GILA|      62.0|
|      YUMA|      55.0|
|    LA PAZ|      58.0|
|      PIMA|      72.0|
|    GRAHAM|      62.0|
|    NAVAJO|      55.0|
|     PINAL|      65.0|
|    APACHE|      53.0|
|  MARICOPA|      69.0|
+----------+----------+



                                                                                

In [34]:
turnout_df = turnout_df.withColumn(
    "toutCounty", turnout_df["County"]
)
turnout_df = turnout_df.drop("County")

In [35]:
# join pivot tables
joined_pivot_df = gender_ratio_pivot_df.join(
    age_ratio_pivot_df, gender_ratio_pivot_df.County==age_ratio_pivot_df.ageCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    edu_ratio_pivot_df, joined_pivot_df.County==edu_ratio_pivot_df.eduCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    income_ratio_pivot_df, joined_pivot_df.County==income_ratio_pivot_df.incCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    homevalue_ratio_pivot_df, joined_pivot_df.County==homevalue_ratio_pivot_df.hvalCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    eth_ratio_pivot_df, joined_pivot_df.County==eth_ratio_pivot_df.ethCounty,
    "inner"
)
joined_pivot_df = joined_pivot_df.join(
    turnout_df, joined_pivot_df.County==turnout_df.toutCounty,
    "inner"
)

# drop additional county columns
joined_pivot_df = joined_pivot_df.drop(
    "ageCounty", "eduCounty", "incCounty", "hvalCounty", "ethCounty","toutCounty"
)

joined_pivot_df.printSchema()

root
 |-- County: string (nullable = true)
 |-- F: double (nullable = true)
 |-- M: double (nullable = true)
 |-- age18-25: double (nullable = true)
 |-- age26-35: double (nullable = true)
 |-- age36-45: double (nullable = true)
 |-- age46-55: double (nullable = true)
 |-- age56-65: double (nullable = true)
 |-- age66+: double (nullable = true)
 |-- Bach: double (nullable = true)
 |-- Grad: double (nullable = true)
 |-- HighSchool: double (nullable = true)
 |-- LessHS: double (nullable = true)
 |-- College: double (nullable = true)
 |-- VocTech: double (nullable = true)
 |-- inc0-50k: double (nullable = true)
 |-- inc50k-100k: double (nullable = true)
 |-- inc100k-150k: double (nullable = true)
 |-- inc150k-200k: double (nullable = true)
 |-- inc200k+: double (nullable = true)
 |-- hvalue0-50k: double (nullable = true)
 |-- hvalue50k-100k: double (nullable = true)
 |-- hvalue100k-150k: double (nullable = true)
 |-- hvalue150k-200k: double (nullable = true)
 |-- hvalue200k-250k: double 

In [36]:
joined_pivot_df = joined_pivot_df.na.fill(value=0)
# Create vector assembler to combine all features
assembler = VectorAssembler(
    inputCols=joined_pivot_df.schema.names[1:-1],
    outputCol="features"
)
# Create random forest regressor
rf = RandomForestRegressor(featuresCol="features", labelCol="G08Turnout")

# Create a pipeline for all transformations and the model
pipeline = Pipeline(stages=[assembler, rf])

# Split the data into training and testing sets
(train_data, test_data) = joined_pivot_df.randomSplit([0.7, 0.3], seed=100)

# Fit the pipeline to the training data
model = pipeline.fit(train_data)

# Make predictions on the testing data
predictions = model.transform(test_data)

# Evaluate the performance of the model on the test data

evaluator = RegressionEvaluator(
    labelCol="G08Turnout", predictionCol="prediction", metricName="rmse"
)
rmse = evaluator.evaluate(predictions)

print(f"RMSE on the test data: {rmse}")

23/03/21 04:04:41 WARN org.apache.spark.ml.tree.impl.DecisionTreeMetadata: DecisionTree reducing maxBins from 32 to 12 (= number of training instances)

RMSE on the test data: 4.831580141803163


                                                                                