In [1]:
# import context manager: SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType
from pyspark.ml.linalg import Vectors
from pyspark.ml.stat import Correlation
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SQLContext
# set up the session
spark = SparkSession \
        .builder \
        .appName("project")\
        .config("spark.executor.memory", "100g")\
        .getOrCreate()
        
sqlContext = SQLContext(spark)

In [2]:
spark

In [3]:
import os
os.listdir()
os.getcwd()

'/sfs/qumulo/qhome/smn7ba/ds5110/project'

In [4]:
#import pandas too for visualizations
import pandas as pd
pd.set_option('display.max_rows', 200000)

In [5]:
%%time
#import mlLib libraries for classification
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder,TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA
from pyspark.mllib.evaluation import MulticlassMetrics,BinaryClassificationMetrics
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

CPU times: user 2.44 ms, sys: 2.54 ms, total: 4.98 ms
Wall time: 4.45 ms


### Read Data; Create a binary flag; rename columns; Drop if necessary

In [6]:
%%time
#import whole data from the census
data = spark.read.csv('/project/ds5559/ds5110_project_snoo/acs_15_19_south.csv', inferSchema="true", header="true")

CPU times: user 2.33 ms, sys: 2.6 ms, total: 4.93 ms
Wall time: 25.5 s


In [7]:
%%time
#writing a user defined function to create a Educated or Not Flag - if EDUC>6 then it is 1 and if not 0
#https://towardsdatascience.com/5-ways-to-add-a-new-column-in-a-pyspark-dataframe-4e75c2fd8c08
def EDUCFunc(value):
  if   value > 6: 
      return 1
  else:
      return 0

#create the function to be applied and create a new column EDUC_FLAG
udfsomefunc = F.udf(EDUCFunc, IntegerType())
data = data.withColumn("EDUC_FLAG", udfsomefunc("EDUC"))
#see sample data
data.select('EDUC_FLAG').show(5)

+---------+
|EDUC_FLAG|
+---------+
|        0|
|        1|
|        0|
|        1|
|        0|
+---------+
only showing top 5 rows

CPU times: user 3.56 ms, sys: 1.93 ms, total: 5.49 ms
Wall time: 986 ms


In [8]:
%%time
#check the count for EDUC>6 or verify if flag was populated correctly
data.filter(data.EDUC>6).count()

CPU times: user 2.03 ms, sys: 1.17 ms, total: 3.19 ms
Wall time: 6.44 s


2470127

In [9]:
%%time
#Verify the flag count. Should match number above
data.filter(data.EDUC_FLAG!=0).count()

CPU times: user 3.33 ms, sys: 578 µs, total: 3.91 ms
Wall time: 8.29 s


2470127

### About data
AMERICAN COMMUNITY SURVEY 2015-2019 5-YEAR SAMPLE <br>
5-in-100 national random sample of the population <br>
Contains all households and persons from the 1% ACS samples for 2015, 2016, 2017, 2018, and 2019 identifiable by year. <br>
The data include persons in group quarters. <br>
This is a weighted sample. <br>
The smallest identifiable geographic unit is the PUMA, containing at least 100,000 persons. PUMAs do not cross state boundaries. <br>
Users should read the FAQ on the multi-year data. <br>


WHERE CAN I GET BETTER GEOGRAPHIC IDENTIFIERS? <br>
The lowest unit of geography in the microdata files is still the PUMA. PUMAs contain at least 100,000 people. <br>
Aggregate data (but not microdata) is currently available from the Census Bureau for geographic areas as small as block groups, but only for the entire 2005-2009 period. <br>


PERNUM numbers all persons within each household consecutively in the order in which they appear on the original census or survey form. <br>
When combined with SAMPLE and SERIAL, PERNUM uniquely identifies each person within the IPUMS. <br>

MULTYEAR identifies the actual year of survey in multi-year ACS/PRCS samples. <br>

<br>
For example, the 3-year ACS and PRCS data files each include cases from three single-year files. <br>
For these multi-year samples, the YEAR variable identifies the last year of data (2007 for the 2005-2007 3-year data; 2008 for the 2006-2008 data; and so on). <br>
MULTYEAR gives the single-year sample from which the case was drawn (2005, 2006, or 2007 for the 2005-2007 3-year data; 2006, 2007, or 2008 for the 2006-2008 3-year data; and so on). <br>

https://usa.ipums.org/usa/acs_multyr.shtml


In [10]:
%%time
#renaming dependent variable to label because the classfier is not recognizing other names. Skip thsi if you are trying other classifiers

df = data.withColumn("label",data.EDUC_FLAG) \
      .drop("EDUC_FLAG")

CPU times: user 703 µs, sys: 824 µs, total: 1.53 ms
Wall time: 45.4 ms


In [11]:
#saving col names in case if we can use it later ot iterate or use the list for labels etc.
cols = df.columns
#spark.createDataFrame(cols,StringType()).toPandas()

### EDA

In [12]:
#displaying number of rows and columns in the data
print((df.count(), len(df.columns)))

(5965249, 206)


In [13]:
%%time
#number of years in the data set
df.select('MULTYEAR').distinct().show()

+--------+
|MULTYEAR|
+--------+
|    2018|
|    2015|
|    2019|
|    2016|
|    2017|
+--------+

CPU times: user 1.88 ms, sys: 860 µs, total: 2.74 ms
Wall time: 6.87 s


In [14]:
#sampling data to use more effeciently; seed = 42
#https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.sampleBy.html
#https://towardsdatascience.com/exploratory-data-analysis-eda-with-pyspark-on-databricks-e8d6529626b1
#https://www.kaggle.com/tientd95/advanced-pyspark-for-exploratory-data-analysis
sampled = df.sampleBy("MULTYEAR", fractions={2015: 0.1, 2016: 0.1, 2017:0.1, 2018:0.1, 2019:0.1}, seed=42)
sampled.groupBy("MULTYEAR").count().orderBy("MULTYEAR").show()

+--------+------+
|MULTYEAR| count|
+--------+------+
|    2015|117141|
|    2016|117882|
|    2017|119767|
|    2018|119761|
|    2019|121997|
+--------+------+



In [15]:
#map to create meanign ful table # wip
'''
hhtype_dict = {'0':'N/A',\
            '1': 'Married-couple family household',\
            '2': 'Male householder, no wife present',\
            '3': 'Female householder, no husband present',\
            '4': 'Male householder, living alone',\
            '5': 'Male householder, not living alone',\
            '6': 'Female householder, living alone',\
            '7': 'Female householder, not living alone',\
            '9': 'HHTYPE could not be determined'}
'''

"\nhhtype_dict = {'0':'N/A',            '1': 'Married-couple family household',            '2': 'Male householder, no wife present',            '3': 'Female householder, no husband present',            '4': 'Male householder, living alone',            '5': 'Male householder, not living alone',            '6': 'Female householder, living alone',            '7': 'Female householder, not living alone',            '9': 'HHTYPE could not be determined'}\n"

In [16]:
#sampled.select('HHTYPE').rdd.map(lambda x: hhtype_dict.get(x) ).take(5)

In [17]:
%%time
##aggregating counts by Year and HHtype for sampple dataset - add labels - convert to visualization
#sampled_n = sampled.select('HHTYPE').rdd.map(lambda x: hhtype_dict.get(x) )
#sampled.filter((sampled.HHTYPE!=0) & (sampled.HHTYPE!=9)).groupBy('MULTYEAR','HHTYPE').count()\
    #.orderBy('MULTYEAR','count', ascending=False).show(100,truncate=False)
sampled.filter((sampled.HHTYPE!=0) & (sampled.HHTYPE!=9)).groupBy('MULTYEAR','HHTYPE').count()\
    .orderBy('MULTYEAR','count', ascending=False).show(100,truncate=False)

+--------+------+-----+
|MULTYEAR|HHTYPE|count|
+--------+------+-----+
|2019    |1     |73194|
|2019    |3     |14782|
|2019    |6     |7962 |
|2019    |4     |5680 |
|2019    |2     |4338 |
|2019    |5     |1489 |
|2019    |7     |1249 |
|2018    |1     |71736|
|2018    |3     |14839|
|2018    |6     |7782 |
|2018    |4     |5600 |
|2018    |2     |4378 |
|2018    |5     |1541 |
|2018    |7     |1280 |
|2017    |1     |71065|
|2017    |3     |15179|
|2017    |6     |7732 |
|2017    |4     |5502 |
|2017    |2     |4087 |
|2017    |5     |1627 |
|2017    |7     |1217 |
|2016    |1     |69943|
|2016    |3     |15249|
|2016    |6     |7804 |
|2016    |4     |5353 |
|2016    |2     |4034 |
|2016    |5     |1471 |
|2016    |7     |1308 |
|2015    |1     |69494|
|2015    |3     |15353|
|2015    |6     |7832 |
|2015    |4     |5316 |
|2015    |2     |4008 |
|2015    |5     |1526 |
|2015    |7     |1159 |
+--------+------+-----+

CPU times: user 2.86 ms, sys: 2.97 ms, total: 5.83 ms
Wall time

In [18]:
sampled.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- YEAR: integer (nullable = true)
 |-- MULTYEAR: integer (nullable = true)
 |-- SAMPLE: integer (nullable = true)
 |-- SERIAL: integer (nullable = true)
 |-- CBSERIAL: long (nullable = true)
 |-- HHWT: double (nullable = true)
 |-- HHTYPE: integer (nullable = true)
 |-- CLUSTER: long (nullable = true)
 |-- REGION: integer (nullable = true)
 |-- STATEFIP: integer (nullable = true)
 |-- COUNTYFIP: integer (nullable = true)
 |-- METRO: integer (nullable = true)
 |-- STRATA: integer (nullable = true)
 |-- GQ: integer (nullable = true)
 |-- OWNERSHP: integer (nullable = true)
 |-- OWNERSHPD: integer (nullable = true)
 |-- MORTGAGE: integer (nullable = true)
 |-- TAXINCL: integer (nullable = true)
 |-- INSINCL: integer (nullable = true)
 |-- PROPINSR: integer (nullable = true)
 |-- COSTELEC: integer (nullable = true)
 |-- COSTGAS: integer (nullable = true)
 |-- COSTWATR: integer (nullable = true)
 |-- COSTFUEL: integer (nullable = true)
 |-- FOODST

### Transform Data; Scale; PCA; RF Classification - seed 42

In [19]:
%%time
#pass all the features into vector assembler to create a vector format to pass tto the classification model
assembler = VectorAssembler(inputCols=[cols for cols in cols if cols!='label'], outputCol="features") 
transformed = assembler.transform(sampled)
#register table as sql table and keep only columns fo interest and save in a new dataframe. This can be done without using SQl as well.
transformed.registerTempTable('transformed_tbl')
transformed_df = sqlContext.sql('select label,features from transformed_tbl')
transformed_df.show(5)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    1|(205,[0,1,2,3,4,5...|
|    0|(205,[0,1,2,3,4,5...|
|    1|(205,[0,1,2,3,4,5...|
|    0|(205,[0,1,2,3,4,5...|
|    0|(205,[0,1,2,3,4,5...|
+-----+--------------------+
only showing top 5 rows

CPU times: user 6.95 ms, sys: 6.3 ms, total: 13.3 ms
Wall time: 4.09 s


In [20]:
%%time
#splitting data before preprocessing will stop leakage
#randomly split data
training_data, test_data = transformed_df.randomSplit([0.7, 0.3], seed=42)
cached_tr = training_data.cache()

CPU times: user 937 µs, sys: 1.05 ms, total: 1.99 ms
Wall time: 110 ms


In [21]:
training_data.show(1)

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|(205,[0,1,2,3,4,5...|
+-----+--------------------+
only showing top 1 row



In [22]:
%%time
#scale the data
scaler_train = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel_train = scaler_train.fit(cached_tr)
scaledData_train = scalerModel_train.transform(cached_tr)

CPU times: user 7.83 ms, sys: 1.46 ms, total: 9.3 ms
Wall time: 24 s


In [23]:
%%time
#check sample scaled data
scaledData_train.select("label","scaledFeatures").show(5)

+-----+--------------------+
|label|      scaledFeatures|
+-----+--------------------+
|    0|(205,[0,1,2,3,4,5...|
|    0|(205,[0,1,2,3,4,5...|
|    0|(205,[0,1,2,3,4,5...|
|    0|(205,[0,1,2,3,4,5...|
|    0|(205,[0,1,2,3,4,5...|
+-----+--------------------+
only showing top 5 rows

CPU times: user 1.03 ms, sys: 1.13 ms, total: 2.17 ms
Wall time: 117 ms


In [24]:
%%time
#pca to reduce 200 odd features into principal components - on training data only because that is our model
#this takes a while to run. imagine it is running at least 9 combinations models with 3 folds and picking the best. Reduce parameters or folds if you want it to run faster
pca_model = PCA(inputCol = "scaledFeatures", outputCol = "pca_features_cv")

#create a randomforest classifier model to pass into pipeline
rf = RandomForestClassifier(labelCol = "label", featuresCol = "pca_features_cv")

#creating a pipeline with the pca and model to use in the cross validator
ppl_cv = Pipeline(stages = [pca_model, rf])


#create a param grid to pass to cross validator 
#k --> number of principal components
#number of treess in rf
#need to add more later
paramGrid = ParamGridBuilder() \
  .addGrid(pca_model.k, [10, 20, 30]) \
  .addGrid(rf.numTrees, [20, 30, 50]) \
  .build()

#passs the model with variosu combinations of the parameters and it will pick the best one. Using 3 folds to save time. Check seed=42.
crossval = CrossValidator(estimator = ppl_cv,\
                                        estimatorParamMaps=paramGrid,\
                                        evaluator = MulticlassClassificationEvaluator(),\
                                        numFolds= 3,seed=42)


#this is our best model - fit the training data
cv_model = crossval.fit(scaledData_train)

CPU times: user 1.09 s, sys: 397 ms, total: 1.49 s
Wall time: 3min 13s


In [25]:
#all the 9 model accuracies. The max one was picked as best
avgMetricsGrid = cv_model.avgMetrics
print(avgMetricsGrid)

#https://tsmatz.github.io/azure-databricks-exercise/exercise04-hyperparams-tuning.html
#https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.tuning.CrossValidator.html
# View all results (accuracy) by each params - these can be converted to pretty tables in pandas later
list(zip(cv_model.getEstimatorParamMaps()))

[0.773276344611239, 0.7734615740708852, 0.7735542859352948, 0.8041363061371772, 0.8039156300327328, 0.8054776937147163, 0.8031545556633992, 0.8038459555606521, 0.8053134868761425]


[({Param(parent='PCA_4bb6a73e0377', name='k', doc='the number of principal components'): 10,
   Param(parent='RandomForestClassifier_cc3bed092c3d', name='numTrees', doc='Number of trees to train (>= 1).'): 20},),
 ({Param(parent='PCA_4bb6a73e0377', name='k', doc='the number of principal components'): 10,
   Param(parent='RandomForestClassifier_cc3bed092c3d', name='numTrees', doc='Number of trees to train (>= 1).'): 30},),
 ({Param(parent='PCA_4bb6a73e0377', name='k', doc='the number of principal components'): 10,
   Param(parent='RandomForestClassifier_cc3bed092c3d', name='numTrees', doc='Number of trees to train (>= 1).'): 50},),
 ({Param(parent='PCA_4bb6a73e0377', name='k', doc='the number of principal components'): 20,
   Param(parent='RandomForestClassifier_cc3bed092c3d', name='numTrees', doc='Number of trees to train (>= 1).'): 20},),
 ({Param(parent='PCA_4bb6a73e0377', name='k', doc='the number of principal components'): 20,
   Param(parent='RandomForestClassifier_cc3bed092c3d', 

In [26]:
#scale test data
scaler_test = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel_test = scaler_test.fit(test_data)
scaledData_test = scalerModel_test.transform(test_data)

In [27]:
%%time
#predict and evaluate the model for accuracy
predictions = cv_model.transform(scaledData_test)
evaluator= MulticlassClassificationEvaluator(labelCol = "label", metricName= "accuracy")
accuracy = evaluator.evaluate(predictions)

CPU times: user 20.5 ms, sys: 6.01 ms, total: 26.5 ms
Wall time: 26.4 s


In [28]:
#increased accuracy with binary flag
print(accuracy)

0.7986235652797704


### Confusion matrix, threshold, roc and other fun stuff

In [29]:
##more to come

### Model without PCA but selective features and QQ variables dropped

In [30]:
#more to come

In [31]:
#References
#https://awesomeopensource.com/project/adornes/spark_python_ml_examples
#https://spark.apache.org/docs/latest/ml-tuning.html
#https://sparkbyexamples.com/pyspark/pyspark-rename-dataframe-column/
#https://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier
#https://people.stat.sc.edu/haigang/sparkCaseStudy.html