# Michael Kastanowski, mrk9fx
# Gary Mitchell , gm3gq

## Section 2 Group 7

In [31]:
import csv
import sys
import pandas as pd
import time
from pyspark.sql import SparkSession,SQLContext 
from pyspark.sql.functions import col
from pyspark.mllib.linalg import Vectors
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator,RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, OneHotEncoder, StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline  
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder


#import pyspark.sql.functions as F
#import pyspark.mllib.regression as reg
#from pyspark.mllib.classification import LogisticRegressionWithLBFGS
#from pyspark.mllib.evaluation import BinaryClassificationMetrics
#from pyspark.ml.feature import *  
# Obtain (or reuse) the Spark session for this notebook, then expose the
# underlying SparkContext and a SQLContext built on top of it.
spark = (
    SparkSession
    .builder
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)




In [2]:
# Load the vaccination-coverage CSV into a pandas DataFrame for quick inspection.
vacc_yc = pd.read_csv(
    filepath_or_buffer="Vaccination_Coverage_among_Young_Children__0-35_Months_.csv"
)

In [3]:
# Make sure the estimate column is numeric in the pandas copy of the data.
vacc_yc["Estimate (%)"] = pd.to_numeric(vacc_yc["Estimate (%)"])

In [4]:
vacc_yc # display the pandas frame for quick reference (the notebook renders a truncated preview)

Unnamed: 0,Vaccine,Dose,Geography Type,Geography,Birth Year/Birth Cohort,Age,Estimate (%),95% CI (%),Sample Size
0,DTaP,≥1 Dose,States/Local Areas,Alabama,2011,3 Months,87.6,81.6 to 91.8,218.0
1,DTaP,≥1 Dose,States/Local Areas,Alaska,2011,3 Months,87.9,85.5 to 89.9,288.0
2,DTaP,≥1 Dose,States/Local Areas,Arizona,2011,3 Months,84.8,79.5 to 88.9,266.0
3,DTaP,≥1 Dose,States/Local Areas,Arkansas,2011,3 Months,80.6,75.7 to 84.7,230.0
4,DTaP,≥1 Dose,States/Local Areas,California,2011,3 Months,89.6,84.2 to 93.3,298.0
...,...,...,...,...,...,...,...,...,...
58645,Influenza,,States/Local Areas,Virginia,2017,24 Months,63.2,51.6 to 74.7,201.0
58646,Influenza,,States/Local Areas,Washington,2017,24 Months,72.9,64.3 to 80.8,235.0
58647,Influenza,,States/Local Areas,West Virginia,2017,24 Months,54.0,44.6 to 64.1,175.0
58648,Influenza,,States/Local Areas,Wisconsin,2017,24 Months,68.2,57.9 to 78.1,161.0


In [5]:
# Read the same CSV with Spark, letting it infer column types from the data.
# dropna() with no arguments removes every row that has a null in any column.
# NOTE(review): the pandas preview shows Influenza rows with an empty Dose;
# if Spark reads those as null they are dropped here - confirm this is intended.
vacc_csv_path = "Vaccination_Coverage_among_Young_Children__0-35_Months_.csv"
df = spark.read.csv(vacc_csv_path, header=True, inferSchema=True).dropna()

In [15]:
# Cast the columns needed downstream to numeric types.
#
# 'est' is the regression label: every LinearRegression and RegressionEvaluator
# in the cells below is configured with labelCol='est', so this column must be
# created here for the notebook to run from a fresh kernel.
# withColumn replaces a same-named column, so re-running this cell is safe.
# NOTE(review): 'Estimate (%)' is inferred as a string column, so some values
# may not be plain numbers; those would become null after the cast - confirm
# none survive the earlier dropna().
df = df.withColumn("est", col('Estimate (%)').cast("float"))

# Numeric version of the birth year / cohort column.
# NOTE(review): not referenced by the feature assembler in this notebook section.
df = df.withColumn("year", col('Birth Year/Birth Cohort').cast("float"))

In [7]:
# Echo the Spark DataFrame so the notebook shows its column names and types.
df

DataFrame[Vaccine: string, Dose: string, Geography Type: string, Geography: string, Birth Year/Birth Cohort: string, Age: string, Estimate (%): string, 95% CI (%): string, Sample Size: int, est: float, year: int]

In [20]:
# Dummy coding of the categorical columns, plus assembly of the feature vector.

def _index_and_encode(column, prefix):
    """Build the StringIndexer / OneHotEncoder pair for one categorical column.

    The indexer writes to '<prefix>Index' and the encoder turns that column
    into '<prefix>Vec'.
    """
    index_col = prefix + "Index"
    indexer = StringIndexer(inputCol=column, outputCol=index_col)
    encoder = OneHotEncoder(inputCol=index_col, outputCol=prefix + "Vec")
    return indexer, encoder


vacc_string, vacc_encoder = _index_and_encode("Vaccine", "Vaccine")
geo_string, geo_encoder = _index_and_encode("Geography", "Geo")
age_string, age_encoder = _index_and_encode("Age", "Age")
year_string, year_encoder = _index_and_encode("Birth Year/Birth Cohort", "BirthYear")

# Combine the raw sample size with the one-hot vectors into a single 'features' column.
# NOTE(review): 'AgeVec' is produced by the age stages but is not listed here,
# so age does not reach the models - confirm whether that is intentional.
assembler = VectorAssembler(
    inputCols=["Sample Size", 'GeoVec', 'VaccineVec', "BirthYearVec"],
    outputCol='features',
)

In [21]:
# Linear regression with a mixed L1/L2 penalty (elasticNetParam=0.8).
lr = LinearRegression(
    featuresCol='features',  # assembled feature vector
    labelCol='est',          # target variable
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Feature-preparation stages followed by the regressor itself.
lr_stages = [
    vacc_string, vacc_encoder,
    geo_string, geo_encoder,
    age_string, age_encoder,
    year_string, year_encoder,
    assembler,
    lr,
]
lr_pipeline = Pipeline(stages=lr_stages)

# Fit on the full DataFrame and score the same rows (no hold-out split here).
lrModel = lr_pipeline.fit(df)
lrPred = lrModel.transform(df)

In [22]:
# Shared evaluator comparing the 'prediction' column against the 'est' label.
ev = RegressionEvaluator(predictionCol="prediction", labelCol="est")

# Report MSE and R^2 for the elastic-net model's predictions.
print('-' * 20)
print("METRICS")
for metric_label, metric_name in (("Mean Squared Error:", "mse"), ("R Squared:", 'r2')):
    print(metric_label, ev.evaluate(lrPred, {ev.metricName: metric_name}))

--------------------
METRICS
Mean Squared Error: 161.20379798463597
R Squared: 0.16798294161265048


In [24]:
# Lasso regression: pure L1 penalty (elasticNetParam=1).
lasso = LinearRegression(
    featuresCol='features',  # assembled feature vector
    labelCol='est',          # target variable
    maxIter=10,
    regParam=0.3,
    elasticNetParam=1,
)

# Same preparation stages as the other models, ending in the lasso regressor.
lasso_stages = [
    vacc_string, vacc_encoder,
    geo_string, geo_encoder,
    age_string, age_encoder,
    year_string, year_encoder,
    assembler,
    lasso,
]
lasso_pipeline = Pipeline(stages=lasso_stages)

# Fit on the full DataFrame and score the same rows.
lassoModel = lasso_pipeline.fit(df)
lassoPred = lassoModel.transform(df)


In [25]:
# Report MSE and R^2 for the lasso model's predictions.
print('-' * 20)
print("METRICS")
for metric_label, metric_name in (("Mean Squared Error:", "mse"), ("R Squared:", 'r2')):
    print(metric_label, ev.evaluate(lassoPred, {ev.metricName: metric_name}))

--------------------
METRICS
Mean Squared Error: 161.89803621688833
R Squared: 0.1643997874994101


In [26]:
# Ridge regression: pure L2 penalty (elasticNetParam=0).
ridge = LinearRegression(
    featuresCol='features',  # assembled feature vector
    labelCol='est',          # target variable
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0,
)

# Same preparation stages as the other models, ending in the ridge regressor.
ridge_stages = [
    vacc_string, vacc_encoder,
    geo_string, geo_encoder,
    age_string, age_encoder,
    year_string, year_encoder,
    assembler,
    ridge,
]
ridge_pipeline = Pipeline(stages=ridge_stages)

# Fit on the full DataFrame and score the same rows.
ridgeModel = ridge_pipeline.fit(df)
ridgePred = ridgeModel.transform(df)

In [27]:
# Report MSE and R^2 for the ridge model's predictions.
print('-' * 20)
print("METRICS")
for metric_label, metric_name in (("Mean Squared Error:", "mse"), ("R Squared:", 'r2')):
    print(metric_label, ev.evaluate(ridgePred, {ev.metricName: metric_name}))

--------------------
METRICS
Mean Squared Error: 158.3815072936676
R Squared: 0.1825495587021394


In [28]:
# Cross Validation

In [37]:
# Candidate regularisation strengths for the ridge model.
# The grid must be keyed on the params of the estimator that is actually inside
# the pipeline being tuned (`ridge`). Params belong to a specific estimator
# instance, so a grid built on another instance's regParam (e.g. `lr`) is not
# applied to any stage of `ridge_pipeline`, and every candidate would train
# with the same settings.
paramGrid = (
    ParamGridBuilder()
    .addGrid(ridge.regParam, [0.1, 0.01])
    .build()
)

# 2-fold cross-validation over the ridge pipeline, scoring against the 'est' label.
crossval = CrossValidator(estimator=ridge_pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator().setLabelCol('est'),
                          numFolds=2)

In [38]:
# Time the cross-validated fit. Parallelism is set to 1, so the candidate
# models are not fitted concurrently.
t0 = time.time()
crossval.setParallelism(1)
cvModel = crossval.fit(df)
print("train time:", time.time() - t0)
print('-' * 30)
print('-'*30)

train time: 10.26500391960144
------------------------------


In [39]:
# Echo the fitted cross-validation model (the notebook shows its repr).
cvModel

CrossValidatorModel_f5859eee88db