In [1]:
# Author: Srinivasa Rudraraju
# Description: PySpark ML notebook that reads datasets from Amazon S3, trains several standard classifiers from the Spark ML package, and runs a cross-validated grid search for each algorithm.

In [2]:
# AWS / mount configuration.
# SECURITY: the credentials are read from the environment instead of being
# hardcoded, so they never end up in the saved notebook or in version control.
# Any key pair that was ever stored in this cell should be treated as
# compromised and rotated. (On Databricks, a secret scope read through
# dbutils.secrets is another option.)
import os
import urllib

try:
    _quote = urllib.quote  # Python 2
except AttributeError:
    from urllib.parse import quote as _quote  # Python 3

# Both values fall back to "" when the variable is unset. They are only needed
# by the one-time mount command, which is commented out in the next cell.
ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
# URL-quote the secret so it can be embedded in an s3a:// URI.
SECRET_KEY = _quote(os.environ.get("AWS_SECRET_ACCESS_KEY", ""))
# quote() leaves "/" untouched by default, so escape it explicitly.
ENCODED_SECRET_KEY = SECRET_KEY.replace("/", "%2F")
AWS_BUCKET_NAME = "platformtesting"
MOUNT_NAME = "pftestingmountdataset1"

In [3]:
#dbutils.fs.mount("s3a://%s:%s@%s" % (ACCESS_KEY, ENCODED_SECRET_KEY, AWS_BUCKET_NAME), "/mnt/%s" % MOUNT_NAME)

In [4]:
# List the files found under the mount point.
mount_path = "/mnt/%s" % MOUNT_NAME
display(dbutils.fs.ls(mount_path))

In [5]:
# Disable warnings and load the pandas package.
# NOTE: the 'ignore' filter is installed without a category or module, so it
# applies to every warning raised after this cell runs.
import warnings
warnings.filterwarnings('ignore')
import pandas as pd

In [6]:
# Print the start time, then load the training split from the mounted bucket.
import datetime
# A bare expression in the middle of a cell is never displayed, so the
# timestamp has to be printed explicitly.
print("Start time: " + str(datetime.datetime.now()))

trainData = sqlContext.read.load(
    '/mnt/pftestingmountdataset1/1/01covtype-TrainingSplit1.csv',
    format='com.databricks.spark.csv',
    header='true',        # take column names from the first line
    inferSchema='true')   # let the reader infer the column types

# Keep the DataFrame cached; later cells read it more than once.
trainData.cache()
trainData.printSchema()

In [7]:
# Peek at the first five training rows, transposed so that every source
# column is shown as a row.
sample_rows = trainData.take(5)
pd.DataFrame(sample_rows, columns=trainData.columns).T

In [8]:
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def vectorizeData(data):
    """Turn a wide DataFrame into a two-column ('label', 'features') DataFrame.

    For every row, the last column minus 1 becomes the label and all the
    preceding columns are packed into a dense feature vector.
    (Presumably the subtraction shifts 1-based class ids to 0-based --
    confirm against the dataset.)

    :param data: Spark DataFrame whose last column is the numeric target.
    :return: Spark DataFrame with columns 'label' and 'features'.
    """
    def to_label_and_features(row):
        return [row[-1] - 1, Vectors.dense(row[:-1])]

    return data.rdd.map(to_label_and_features).toDF(['label', 'features'])

# Convert the training DataFrame into ('label', 'features') form
vectorized_CV_data = vectorizeData(trainData)

# Fit the label indexer: writes an 'indexedLabel' column derived from 'label'
labelIndexer = StringIndexer(
    outputCol='indexedLabel',
    inputCol='label',
).fit(vectorized_CV_data)

# Fit the feature indexer: writes 'indexedFeatures', with the features it
# identifies as categorical replaced by category indices
featureIndexer = VectorIndexer(
    outputCol='indexedFeatures',
    inputCol='features',
).fit(vectorized_CV_data)

In [9]:
from pyspark.ml.classification import DecisionTreeClassifier

# Start the wall-clock timer for this cell
import datetime
timestart = datetime.datetime.now()

# Decision tree estimator reading the indexed label / feature columns
dTree = DecisionTreeClassifier(featuresCol='indexedFeatures', labelCol='indexedLabel')

# Pipeline: label indexer -> feature indexer -> decision tree
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dTree])

# Grid over maxDepth x maxBins (2 x 2 = 4 candidate settings)
paramGrid = (ParamGridBuilder()
             .addGrid(dTree.maxDepth, (3, 10))
             .addGrid(dTree.maxBins, (50, 200))
             .build())

# F1 on the indexed label is the model-selection metric
evaluator = MulticlassClassificationEvaluator(
    metricName='f1', labelCol='indexedLabel', predictionCol='prediction')

# 2-fold cross-validation over the grid
crossval = CrossValidator(numFolds=2,
                          evaluator=evaluator,
                          estimatorParamMaps=paramGrid,
                          estimator=pipeline)

CV_model = crossval.fit(vectorized_CV_data)

# The classifier is the last of the three pipeline stages
tree_model = CV_model.bestModel.stages[-1]

# Report how long the cell took
timeend = datetime.datetime.now()
timedelta = round((timeend - timestart).total_seconds(), 2)
print("Time taken to execute above cell: " + str(timedelta) + " seconds")

print(tree_model)

In [10]:
# Score the cross-validated model on each of the ten CVSplit files.
for i in range(1, 11):

  testData = sqlContext.read.load(
      '/mnt/pftestingmountdataset1/1/01covtype-CVSplit' + str(i) + '.csv',
      format='com.databricks.spark.csv',
      header='true',
      inferSchema='true')

  vectorized_test_data = vectorizeData(testData)

  transformed_data = CV_model.transform(vectorized_test_data)
  # The score is whatever metric the evaluator was configured with (not
  # necessarily accuracy), so label it with the evaluator's own metric name.
  print("CVSplit%d %s: %s" % (i, evaluator.getMetricName(),
                              evaluator.evaluate(transformed_data)))

  # Keep both label encodings next to the prediction so it can be compared
  # with whichever one the model was trained on.
  predictions = transformed_data.select('label', 'indexedLabel', 'prediction')
  # A bare expression inside a loop is never displayed, and toPandas() on the
  # full result collects every row to the driver; print a 5-row preview instead.
  print(predictions.limit(5).toPandas())
                                                                             

In [11]:
from pyspark.ml.classification import RandomForestClassifier

# Start the wall-clock timer for this cell
import datetime
timestart = datetime.datetime.now()

# Random forest estimator reading the indexed label / feature columns
rForest = RandomForestClassifier(featuresCol='indexedFeatures', labelCol='indexedLabel')

# Pipeline: label indexer -> feature indexer -> random forest
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rForest])

# Grid over maxDepth x maxBins (2 x 2 = 4 candidate settings)
paramGrid = (ParamGridBuilder()
             .addGrid(rForest.maxDepth, (3, 10))
             .addGrid(rForest.maxBins, (50, 200))
             .build())

# F1 on the indexed label is the model-selection metric
evaluator = MulticlassClassificationEvaluator(
    metricName='f1', labelCol='indexedLabel', predictionCol='prediction')

# 2-fold cross-validation over the grid
crossval = CrossValidator(numFolds=2,
                          evaluator=evaluator,
                          estimatorParamMaps=paramGrid,
                          estimator=pipeline)

CV_model = crossval.fit(vectorized_CV_data)

# The classifier is the last of the three pipeline stages
rf_model = CV_model.bestModel.stages[-1]

# Report how long the cell took
timeend = datetime.datetime.now()
timedelta = round((timeend - timestart).total_seconds(), 2)
print("Time taken to execute above cell: " + str(timedelta) + " seconds")

print(rf_model)

In [12]:
# Score the cross-validated model on each of the ten CVSplit files.
for i in range(1, 11):

  testData = sqlContext.read.load(
      '/mnt/pftestingmountdataset1/1/01covtype-CVSplit' + str(i) + '.csv',
      format='com.databricks.spark.csv',
      header='true',
      inferSchema='true')

  vectorized_test_data = vectorizeData(testData)

  transformed_data = CV_model.transform(vectorized_test_data)
  # The score is whatever metric the evaluator was configured with (not
  # necessarily accuracy), so label it with the evaluator's own metric name.
  print("CVSplit%d %s: %s" % (i, evaluator.getMetricName(),
                              evaluator.evaluate(transformed_data)))

  # Keep both label encodings next to the prediction so it can be compared
  # with whichever one the model was trained on.
  predictions = transformed_data.select('label', 'indexedLabel', 'prediction')
  # A bare expression inside a loop is never displayed, and toPandas() on the
  # full result collects every row to the driver; print a 5-row preview instead.
  print(predictions.limit(5).toPandas())

In [13]:
from pyspark.ml.classification import LogisticRegression,OneVsRest

# RECORD START TIME
import datetime
timestart = datetime.datetime.now()

# Binary logistic regression used as the base classifier for one-vs-rest
lRegression = LogisticRegression(labelCol='indexedLabel', featuresCol='indexedFeatures')

# OneVsRest passes its OWN labelCol / featuresCol down to the base classifier,
# overriding whatever was set on lRegression. Left at the defaults
# ('label' / 'features') the model would be trained and would predict in raw
# 'label' space while the evaluator below scores against 'indexedLabel', so
# the columns have to be set on the wrapper as well.
lr = OneVsRest(classifier=lRegression,
               labelCol='indexedLabel',
               featuresCol='indexedFeatures')

# Chain the indexers and the one-vs-rest classifier in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, lr])

# Grid over the base classifier's regParam x maxIter (2 x 2 = 4 settings)
paramGrid = ParamGridBuilder().addGrid(lRegression.regParam, (0.01,0.1))\
                              .addGrid(lRegression.maxIter, (5,15))\
                              .build()

# Set F-1 score as evaluation metric for best model selection
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel',predictionCol='prediction',metricName='f1')

# Set up 2-fold cross validation
crossval = CrossValidator(estimator=pipeline,estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=2)

CV_model = crossval.fit(vectorized_CV_data)

# Fetch best model (the classifier is the third pipeline stage)
lr_model = CV_model.bestModel.stages[2]

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print("Time taken to execute above cell: " + str(timedelta) + " seconds")

print(lr_model)

In [14]:
# Score the cross-validated model on each of the ten CVSplit files.
for i in range(1, 11):

  testData = sqlContext.read.load(
      '/mnt/pftestingmountdataset1/1/01covtype-CVSplit' + str(i) + '.csv',
      format='com.databricks.spark.csv',
      header='true',
      inferSchema='true')

  vectorized_test_data = vectorizeData(testData)

  transformed_data = CV_model.transform(vectorized_test_data)
  # The score is whatever metric the evaluator was configured with (not
  # necessarily accuracy), so label it with the evaluator's own metric name.
  print("CVSplit%d %s: %s" % (i, evaluator.getMetricName(),
                              evaluator.evaluate(transformed_data)))

  # Keep both label encodings next to the prediction so it can be compared
  # with whichever one the model was trained on.
  predictions = transformed_data.select('label', 'indexedLabel', 'prediction')
  # A bare expression inside a loop is never displayed, and toPandas() on the
  # full result collects every row to the driver; print a 5-row preview instead.
  print(predictions.limit(5).toPandas())

In [15]:
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Start the wall-clock timer for this cell
import datetime
timestart = datetime.datetime.now()

# Layer sizes handed to the perceptron, first to last
layers = [54, 20, 10, 7]
# Multilayer perceptron estimator reading the indexed label / feature columns
mlPreceptron = MultilayerPerceptronClassifier(layers=layers,
                                              featuresCol='indexedFeatures',
                                              labelCol='indexedLabel')

# Pipeline: label indexer -> feature indexer -> perceptron
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, mlPreceptron])

# Grid over blockSize x maxIter (2 x 2 = 4 candidate settings)
paramGrid = (ParamGridBuilder()
             .addGrid(mlPreceptron.blockSize, (128, 256))
             .addGrid(mlPreceptron.maxIter, (5, 10))
             .build())

# F1 on the indexed label is the model-selection metric
evaluator = MulticlassClassificationEvaluator(
    metricName='f1', labelCol='indexedLabel', predictionCol='prediction')

# 2-fold cross-validation over the grid
crossval = CrossValidator(numFolds=2,
                          evaluator=evaluator,
                          estimatorParamMaps=paramGrid,
                          estimator=pipeline)

CV_model = crossval.fit(vectorized_CV_data)

# The classifier is the last of the three pipeline stages
mlp_model = CV_model.bestModel.stages[-1]

# Report how long the cell took
timeend = datetime.datetime.now()
timedelta = round((timeend - timestart).total_seconds(), 2)
print("Time taken to execute above cell: " + str(timedelta) + " seconds")

print(mlp_model)

In [16]:
# Score the cross-validated model on each of the ten CVSplit files.
for i in range(1, 11):

  testData = sqlContext.read.load(
      '/mnt/pftestingmountdataset1/1/01covtype-CVSplit' + str(i) + '.csv',
      format='com.databricks.spark.csv',
      header='true',
      inferSchema='true')

  vectorized_test_data = vectorizeData(testData)

  transformed_data = CV_model.transform(vectorized_test_data)
  # The score is whatever metric the evaluator was configured with (not
  # necessarily accuracy), so label it with the evaluator's own metric name.
  print("CVSplit%d %s: %s" % (i, evaluator.getMetricName(),
                              evaluator.evaluate(transformed_data)))

  # Keep both label encodings next to the prediction so it can be compared
  # with whichever one the model was trained on.
  predictions = transformed_data.select('label', 'indexedLabel', 'prediction')
  # A bare expression inside a loop is never displayed, and toPandas() on the
  # full result collects every row to the driver; print a 5-row preview instead.
  print(predictions.limit(5).toPandas())

In [17]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.feature import MinMaxScaler

# RECORD START TIME
import datetime
timestart = datetime.datetime.now()

# Scaler that writes a rescaled copy of 'features' to 'scaledFeatures'
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute summary statistics and generate MinMaxScalerModel
scalerModel = scaler.fit(vectorized_CV_data)

# Rescale each feature to the scaler's [min, max] range (defaults are used here)
# NOTE(review): the scaler is fitted on the full training set outside the
# pipeline, so each cross-validation fold sees statistics from its held-out part.
scaledData = scalerModel.transform(vectorized_CV_data)
scaledData.show()

# Train a NaiveBayes model on the rescaled features
nb = NaiveBayes(labelCol="indexedLabel", featuresCol="scaledFeatures", modelType="multinomial")

# Chain indexers and NaiveBayes in a Pipeline. featureIndexer still runs, but
# nb reads 'scaledFeatures', not 'indexedFeatures'.
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, nb])

# Create ParamGrid for Cross Validation: two smoothing values
paramGrid = ParamGridBuilder().addGrid(nb.smoothing, (0.0,1.0)).build()

# Weighted precision is the model-selection metric. labelCol must be
# 'indexedLabel': nb is trained on that column, so its predictions are in the
# indexed label space, not in the raw 'label' space the evaluator defaults to.
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel",
                                              predictionCol="prediction",
                                              metricName="weightedPrecision")

# Set up 2-fold cross validation
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,evaluator=evaluator,numFolds=2)

CV_model = crossval.fit(scaledData)

# Fetch best model (the classifier is the third pipeline stage)
nb_model = CV_model.bestModel.stages[2]

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2)
print("Time taken to execute above cell: " + str(timedelta) + " seconds")

print(nb_model)

In [18]:
# Score the cross-validated model on each of the ten CVSplit files.
for i in range(1, 11):

  testData = sqlContext.read.load(
      '/mnt/pftestingmountdataset1/1/01covtype-CVSplit' + str(i) + '.csv',
      format='com.databricks.spark.csv',
      header='true',
      inferSchema='true')

  vectorized_test_data = vectorizeData(testData)

  # NOTE(review): the scaler is re-fitted on every test split, so each split is
  # rescaled with its own min/max rather than the training statistics held in
  # scalerModel. Switching to scalerModel.transform(...) would be the usual
  # practice, but it can yield values outside [0, 1] -- confirm that the
  # multinomial NaiveBayes model accepts them before changing this.
  scalerTestModel = scaler.fit(vectorized_test_data)
  scaledTestData = scalerTestModel.transform(vectorized_test_data)

  transformed_data = CV_model.transform(scaledTestData)
  # The score is whatever metric the evaluator was configured with (not
  # necessarily accuracy), so label it with the evaluator's own metric name.
  print("CVSplit%d %s: %s" % (i, evaluator.getMetricName(),
                              evaluator.evaluate(transformed_data)))

  # Keep both label encodings next to the prediction so it can be compared
  # with whichever one the model was trained on.
  predictions = transformed_data.select('label', 'indexedLabel', 'prediction')
  # A bare expression inside a loop is never displayed, and toPandas() on the
  # full result collects every row to the driver; print a 5-row preview instead.
  print(predictions.limit(5).toPandas())

In [19]:
# Collect the predictions kept from the last evaluated split and display them.
predictions_pdf = predictions.toPandas()
predictions_pdf