# Random Forests: Presidential Contributions

Let's look at a random forests models for the presidential dataset.

We are going to try to predict two variables:

1. Amount of contribution (regression)
2. Candidate of Contribution (classification).

In [1]:
# initialize Spark Session
import os
import sys
top_dir = os.path.abspath(os.path.join(os.getcwd(), "../"))
if top_dir not in sys.path:
    sys.path.append(top_dir)

from init_spark import init_spark
spark = init_spark()
spark

Initializing Spark...
Spark found in :  /home/ubuntu/spark
Spark config:
	 spark.app.name=TestApp
	spark.master=local[*]
	executor.memory=2g
	spark.sql.warehouse.dir=/tmp/tmp4nenodek
	some_property=some_value
Spark UI running on port 4043


In [2]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import isnan, when, count, col, split, trim, countDistinct, abs 
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType

import pyspark.sql.functions

In [4]:
dataset = spark.read.csv("/data/presidential_election_contribs/2016/2016-with-names.csv", header=True, inferSchema=True)


In [5]:
dataset.show()

+-----------------+--------------------+------------+------------+---------+---------+-----------+--------------------+--------------------+
|CONTB_RECEIPT_AMT|             CAND_NM|    LASTNAME|   FIRSTNAME|CONTBR_ST|      LAT|        LNG|     CONTBR_EMPLOYER|   CONTBR_OCCUPATION|
+-----------------+--------------------+------------+------------+---------+---------+-----------+--------------------+--------------------+
|              5.0|Clinton, Hillary ...|      RIGNEY|     FARRELL|       CA|33.147294|-117.322181|       SELF-EMPLOYED|          CONTRACTOR|
|            100.0|    Sanders, Bernard|      ARNOLD|         IRA|       CA| 38.34642|-122.694127|                NONE|        NOT EMPLOYED|
|             24.0|Cruz, Rafael Edwa...|    VANDOREN|HELEN E. MS.|       MD|39.002745| -76.931721|             RETIRED|             RETIRED|
|            100.0|Clinton, Hillary ...|    RICHARDS|        MARC|       CA| 34.07041|-118.350411|       SELF-EMPLOYED|                 ART|
|            

In [6]:

feature_columns = ['CAND_NM', 'LASTNAME', 'FIRSTNAME', 'CONTBR_ST', 'LAT', 'LNG', 'CONTBR_EMPLOYER', "CONTBR_OCCUPATION"]
numeric_columns = ['LAT', 'LNG']
categorical_columns = ['CAND_NM', 'LASTNAME', 'FIRSTNAME', 'CONTBR_ST', 'CONTBR_EMPLOYER', "CONTBR_OCCUPATION"]
categorical_index = ['CAND_NM_index', 'FIRSTNAME_index', 'LASTNAME_index', 'CONTBR_ST_index', 'CONTBR_EMPLOYER_index', 
                     "CONTBR_OCCUPATION_index"]
prediction_column = ['CONTB_RECEIPT_AMT']


In [7]:
indexers = [StringIndexer(inputCol=column, outputCol=column+"_index", handleInvalid="keep").fit(dataset) for column in categorical_columns ]
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(dataset).transform(dataset)

In [8]:
assembler = VectorAssembler(inputCols=numeric_columns + categorical_index, outputCol="features")
fv = assembler.transform(df_r.na.drop())

In [9]:
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(fv)


In [10]:

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = fv.randomSplit([0.7, 0.3])


In [11]:

# Train a RandomForest model.
rf = RandomForestRegressor(featuresCol="indexedFeatures", maxBins=12000)

# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, rf])


In [12]:
# Train model.  This also runs the indexers.


trainingData = trainingData.withColumn("label",trainingData.CONTB_RECEIPT_AMT)


model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)


In [13]:

# Select example rows to display.
#predictions.select("prediction", "indexedLabel", "features").show(5)

predictions.select('CONTB_RECEIPT_AMT', 'prediction').show(100)


+-----------------+------------------+
|CONTB_RECEIPT_AMT|        prediction|
+-----------------+------------------+
|              0.0| 3568.155913978495|
|              1.0|1223.9425518382345|
|              1.0| 3083.572690733082|
|              1.0|2770.6230265628315|
|              1.0| 1756.259655813835|
|              1.0|  4137.15742913001|
|              1.0| 4172.666666666666|
|              1.0| 2861.940490933067|
|              1.0| 2865.750110087921|
|              1.0|3966.1696375234687|
|              1.0| 3626.650473587871|
|              1.0| 3792.937412587413|
|              1.0|           3760.75|
|              1.0| 2546.110432582149|
|              1.0| 3270.453643114034|
|              1.0|4077.1574291300103|
|              1.0| 3865.280913978495|
|              1.0| 2900.250806159426|
|              1.0|3843.4818086513565|
|              1.0|  674.579257538643|
|              1.0|3426.4554809122224|
|              1.0|3172.9034084504397|
|              1.0|1667.2

In [14]:

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="CONTB_RECEIPT_AMT", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

rfModel = model.stages[1]
print(rfModel)  # summary only


Root Mean Squared Error (RMSE) on test data = 3230.13
RandomForestRegressionModel (uid=RandomForestRegressor_b12320c4496b) with 20 trees


In [15]:
rfModel.featureImportances

SparseVector(8, {0: 0.002, 1: 0.0018, 2: 0.016, 3: 0.206, 4: 0.375, 5: 0.0182, 6: 0.321, 7: 0.0601})

In [16]:
print(numeric_columns + categorical_columns)

['LAT', 'LNG', 'CAND_NM', 'LASTNAME', 'FIRSTNAME', 'CONTBR_ST', 'CONTBR_EMPLOYER', 'CONTBR_OCCUPATION']


We see the following Variables in Order of Importnace
1. CONTBR_ST
2. LASTNAME
3. FIRSTNAME
4. CONTBR_EMPLOYER
5. CONTBR_OCCUPATION

LAT, LONG, and CAND_NM had virtually no impact.

In [17]:

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="CONTB_RECEIPT_AMT", predictionCol="prediction", metricName="r2")
r2 = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % r2)



Root Mean Squared Error (RMSE) on test data = -66.5649


Negative R squared means our data fit worse than the null hypothesis.