In [1]:
from pyspark.sql.functions import col, when

In [2]:
# Load the survey CSV (its first row holds the column names) and keep only
# the rows where the census region is present.
data = (spark
        .read
        .csv("../data/FiveThirtyEight_Midwest_Survey.csv", header=True)
        .filter(col('Location (Census Region)').isNotNull()))

In [3]:
# Encoding nulls: missing answers in these columns become the explicit
# category "__null" so they are treated like any other answer downstream.
columns_with_null = [
    'Location (Census Region)',
    'Gender',
    'Age',
    'Household Income',
    'Education',
]
for name in columns_with_null:
    answer = col(name)
    # Keep the answer when present, otherwise substitute the placeholder.
    data = data.withColumn(
        name, when(answer.isNotNull(), answer).otherwise("__null"))

In [4]:
# Print the column names and types. No schema inference is requested when the
# CSV is loaded, so the columns are read as strings.
data.printSchema()

root
 |-- RespondentID: string (nullable = true)
 |-- In your own words, what would you call the part of the country you live in now?: string (nullable = true)
 |-- Personally identification as a Midwesterner?: string (nullable = true)
 |-- Illinois in MW?: string (nullable = true)
 |-- Indiana in MW?: string (nullable = true)
 |-- Iowa in MW?: string (nullable = true)
 |-- Kansas in MW?: string (nullable = true)
 |-- Michigan in MW?: string (nullable = true)
 |-- Minnesota in MW?: string (nullable = true)
 |-- Missouri in MW?: string (nullable = true)
 |-- Nebraska in MW?: string (nullable = true)
 |-- North Dakota in MW?: string (nullable = true)
 |-- Ohio in MW?: string (nullable = true)
 |-- South Dakota in MW?: string (nullable = true)
 |-- Wisconsin in MW?: string (nullable = true)
 |-- Arkansas in MW?: string (nullable = true)
 |-- Colorado in MW?: string (nullable = true)
 |-- Kentucky in MW?: string (nullable = true)
 |-- Oklahoma in MW?: string (nullable = true)
 |-- Pennsylv

## Splitting data into train and test 

In [5]:
# 60% of the rows go to training and 40% to testing; the seed is fixed so the
# split can be repeated.
data_train, data_test = data.randomSplit(weights=[0.6, 0.4], seed=5)

## Separating clean and dirty columns, as well as the column we will try to predict

In [6]:
# Column the classifier is trained to predict.
target_column = 'Location (Census Region)'

# Free-text answer: the "dirty" categorical column.
dirty_column = 'In your own words, what would you call the part of the country you live in now?'

# States that have a "<state> in MW?" question. The order below is kept
# exactly as listed because it fixes the order of the assembled features.
_mw_question_states = [
    'Illinois', 'Indiana', 'Kansas', 'Iowa',
    'Michigan', 'Minnesota', 'Missouri', 'Nebraska',
    'North Dakota', 'Ohio', 'South Dakota', 'Wisconsin',
    'Arkansas', 'Colorado', 'Kentucky', 'Oklahoma',
    'Pennsylvania', 'West Virginia', 'Montana', 'Wyoming',
]
_demographic_columns = ['Gender', 'Age', 'Household Income', 'Education']

# Well-formed categorical columns: the self-identification question, one
# question per state, then the demographic columns.
clean_columns = (
    ['Personally identification as a Midwesterner?']
    + [state + ' in MW?' for state in _mw_question_states]
    + _demographic_columns
)

In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.classification import RandomForestClassifier

In [8]:
from dirty_cat_spark.feature.encoder import SimilarityEncoder


# Similarity encoding of the free-text column, configured with the "nGram"
# similarity type and a vocabulary size of 200.
encoder_similarity = (
    SimilarityEncoder()
    .setInputCol(dirty_column)
    .setOutputCol("encoded")
    .setSimilarityType("nGram")
    .setVocabSize(200)
)

# Alternative encoding of the same column: index the raw strings, then
# one-hot encode the index. Both encoders write to "encoded", so a pipeline
# must contain only one of them.
string_indexer_dirty = StringIndexer(
    inputCol=dirty_column,
    outputCol=dirty_column + "_indexed",
    handleInvalid="keep",
)

encoder_hot = OneHotEncoder(
    inputCol=dirty_column + "_indexed",
    outputCol="encoded",
)

In [9]:
# One StringIndexer per clean column. "keep" sends values that were not seen
# while fitting to an extra index instead of raising an error.
string_indexer_clean = [(StringIndexer()
                         .setInputCol(clean_column)
                         .setOutputCol(clean_column + "_indexed")
                         .setHandleInvalid("keep"))
                        for clean_column in clean_columns]

# Concatenate the indexed clean columns with the encoded dirty column
# (the "encoded" column is produced by whichever encoder the pipeline uses).
assembler = (VectorAssembler()
             .setInputCols([c + "_indexed" for c in clean_columns] + ["encoded"])
             .setOutputCol("features"))

# Features with at most 10 distinct values are treated as categorical.
# handleInvalid is "keep" rather than "skip": "skip" silently removes every
# row holding a category value that was not seen while fitting, so the
# pipelines could end up being scored on different subsets of the test data.
# "keep" assigns such values to an additional bucket and retains the row.
vector_indexer = (VectorIndexer()
                  .setInputCol("features")
                  .setOutputCol("featuresIndexed")
                  .setMaxCategories(10)
                  .setHandleInvalid("keep"))

# Scale to unit standard deviation without centering (withMean=False).
# NOTE(review): tree ensembles do not normally need feature scaling, and the
# scaled column may not carry the categorical metadata added by VectorIndexer
# - confirm whether this stage is wanted.
scaler = (StandardScaler()
          .setInputCol("featuresIndexed")
          .setOutputCol("scaledFeatures")
          .setWithMean(False))


# Convert the string target into the numeric label column the classifier reads.
indexed_label = StringIndexer(inputCol=target_column,
                              outputCol="indexedLabel")

# Random forest with 10 trees; the seed is fixed so runs can be repeated.
classifier = (RandomForestClassifier()
              .setFeaturesCol("scaledFeatures")
              .setLabelCol("indexedLabel")
              .setNumTrees(10)
              .setSeed(5))

In [10]:
# Pipeline that encodes the dirty column with the similarity encoder.
pipeline_similarity = Pipeline().setStages(
    string_indexer_clean
    + [encoder_similarity,
       assembler,
       vector_indexer,
       scaler,
       indexed_label,
       classifier])

In [11]:
# Pipeline that encodes the dirty column by string indexing followed by
# one-hot encoding; every other stage is shared with the similarity pipeline.
pipeline_similarity_hot = Pipeline().setStages(
    string_indexer_clean
    + [string_indexer_dirty,
       encoder_hot,
       assembler,
       vector_indexer,
       scaler,
       indexed_label,
       classifier])

In [None]:
# Fit the one-hot pipeline on the training split.
pipeline_similarity_hot_model = pipeline_similarity_hot.fit(data_train)

In [None]:
# Fit the similarity-encoder pipeline on the same training split.
pipeline_similarity_model = pipeline_similarity.fit(data_train)

In [None]:
# Apply both fitted pipelines to the test split.
res = pipeline_similarity_model.transform(data_test)
res_hot = pipeline_similarity_hot_model.transform(data_test)

In [None]:
# Bring only the columns used for scoring into pandas.
_scoring_columns = ["probability", "indexedLabel", "prediction"]
res_df = res.select(*_scoring_columns).toPandas()
res_hot_df = res_hot.select(*_scoring_columns).toPandas()

In [None]:
%matplotlib inline
import numpy as np
from matplotlib import pyplot as plt

In [None]:
# Accuracy of the similarity-encoder pipeline on the scored test rows.
y_true = res_df["indexedLabel"].values
y_pred = res_df["prediction"].values

# The mean of the boolean match mask is the fraction of correct predictions.
# Unlike `np.sum(...) / n`, it stays a float on a Python 2 kernel, where
# integer division would floor the result to 0.
accuracy = np.mean(y_pred == y_true)

accuracy

In [None]:
# Accuracy of the one-hot pipeline on the scored test rows.
y_true = res_hot_df["indexedLabel"].values
y_hot_pred = res_hot_df["prediction"].values

# The mean of the boolean match mask is the fraction of correct predictions.
# Unlike `np.sum(...) / n`, it stays a float on a Python 2 kernel, where
# integer division would floor the result to 0.
accuracy_hot = np.mean(y_hot_pred == y_true)

accuracy_hot