In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import DoubleType, ArrayType

In [2]:
%run /Users/mhuffer@allegient.com/nfl-dataset

In [3]:
# Columns kept from the raw play-by-play frame for the play-type model.
columns = [
    "GameID", "play_id", "Drive", "qtr", "down",
    "TimeSecs", "PlayTimeDiff", "SideofField", "yrdln", "yrdline100",
    "ydstogo", "posteam", "DefensiveTeam", "PlayType", "PosTeamScore",
    "DefTeamScore", "AbsScoreDiff", "HomeTeam", "AwayTeam", "Season",
]

# Project down to those columns, then keep only the two play types being classified.
nflSubset = nflDF.select(*columns)
nflSubset = nflSubset.filter(nflSubset["PlayType"].isin("Pass", "Run"))

# Discard every row that still has a null in any of the selected columns.
nflClean = nflSubset.na.drop()

In [4]:
# Numeric columns that are re-typed as doubles before being used as features.
intColumns = [
    "down", "TimeSecs", "PlayTimeDiff", "yrdln", "yrdline100",
    "PosTeamScore", "DefTeamScore", "AbsScoreDiff",
]

doubleType = DoubleType()
for columnName in intColumns:
  # withColumn on an existing name replaces that column in place.
  nflClean = nflClean.withColumn(columnName, nflClean[columnName].cast(doubleType))

# Replace nulls in numeric columns (e.g. values that could not be cast) with 0.
nflClean = nflClean.fillna(0)

In [5]:
# Index the target column. This indexer is fitted immediately on the cleaned frame
# because its learned labels are needed later to turn predictions back into strings.
labelIndexer = StringIndexer(outputCol="indexedLabel", inputCol="PlayType").fit(nflClean)


def _categoricalIndexer(sourceCol, indexedCol):
  """Build an (unfitted) StringIndexer mapping `sourceCol` to the numeric column `indexedCol`."""
  return StringIndexer(inputCol=sourceCol, outputCol=indexedCol)


# Every categorical string column is mapped to a numeric index: the feature vector
# only accepts numbers, so string "factor" levels (as in R) cannot be used directly.
PosTeamIndexer = _categoricalIndexer("posteam", "indexedPosTeam")
DefTeamIndexer = _categoricalIndexer("DefensiveTeam", "indexedDefTeam")
HomeTeamIndexer = _categoricalIndexer("HomeTeam", "indexedHomeTeam")
AwayTeamIndexer = _categoricalIndexer("AwayTeam", "indexedAwayTeam")
SideOfFieldIndexer = _categoricalIndexer("SideofField", "indexedSideOfField")

# The indexers have an issue with null values: all nulls must be removed from the
# dataset before the algorithm is fitted.
# The assembler below packs the listed columns of each record into a single vector,
# which is fed into the algorithm for both the training and the test set.
#
# Fix: the list previously contained "PosTeamScore" and "DefTeamScore" twice and
# never used "AbsScoreDiff" (which is selected and cast to double earlier), so two
# features were duplicated and the score differential was silently left out.
featureColumns = [
    "GameID", "play_id", "Drive", "qtr", "down", "TimeSecs", "PlayTimeDiff",
    "indexedSideOfField", "yrdln", "yrdline100", "ydstogo",
    "indexedPosTeam", "indexedDefTeam",
    "PosTeamScore", "DefTeamScore", "AbsScoreDiff",
    "indexedHomeTeam", "indexedAwayTeam", "Season",
]
assert len(featureColumns) == len(set(featureColumns)), "duplicate feature column"

assembler = VectorAssembler(inputCols=featureColumns, outputCol="indexedFeatures")

# Fixed seed so that the train/test split and the forest are reproducible across
# runs (given the same input data and partitioning).
SEED = 42

# Create the 70/30 split for train and test data sets
(trainData, testData) = nflClean.randomSplit([0.7, 0.3], seed=SEED)

# Random forest trained on the assembled feature vector.
# maxBins is raised from the default of 32 to 40 so that the indexed categorical
# variables above can be split upon (maxBins must be at least the number of
# categories of any categorical feature).
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=10,
    maxBins=40,
    seed=SEED,
)

# Return the string value for the predicted label index
# (the mapping is whatever labelIndexer learned; see labelIndexer.labels)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labelIndexer.labels)

# Chain all steps defined above: indexers -> assembler -> classifier -> label decoding
rfPipeline = Pipeline(stages=[labelIndexer, PosTeamIndexer, DefTeamIndexer, HomeTeamIndexer, AwayTeamIndexer, SideOfFieldIndexer, assembler, rf, labelConverter])

# Apart from labelIndexer (already fitted when it was created), none of the stages
# has touched the data yet; fitting the pipeline runs them on the training split.
model = rfPipeline.fit(trainData)

# Invokes all of the fitted pipeline steps on the test data split
# outputs a dataframe called "predictions"
predictions = model.transform(testData)

# Print out the predicted Play Type, Actual Play Type, and the vector of indexed features
predictions.select("predictedLabel", "PlayType", "indexedFeatures").show(5)

# Determine the accuracy of the model
# (other metrics can be selected through metricName)
evaluator = MulticlassClassificationEvaluator(labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")

# Calculate the test error
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

# Fix: the fitted forest is NOT at index 2 of this pipeline (index 2 is the fitted
# DefensiveTeam indexer). Fitted stages keep the order of the pipeline definition,
# so look up the position of `rf` there instead of hard-coding an index.
rfStageIndex = rfPipeline.getStages().index(rf)
rfModel = model.stages[rfStageIndex]
print(rfModel)

In [6]:
# Show the numeric label, the numeric prediction and the decoded prediction side by side.
labelComparison = predictions.select("indexedLabel", "prediction", "predictedLabel")
labelComparison.show(5)

In [7]:
# Persist the pipeline definition (the unfitted estimator stages; the fitted
# counterpart is `model`), replacing any previous copy.
# Fix: use an absolute path, matching the model save and the `%fs ls` listing;
# the former relative path depended on the filesystem's working directory.
rfPipeline.write().overwrite().save("/nfl-data/pipelines")

In [8]:
# Persist the stage held in `rfModel`, replacing anything already stored at the target path.
rfModelWriter = rfModel.write().overwrite()
rfModelWriter.save("/nfl-data/models")

In [9]:
%fs
ls /nfl-data/pipelines/stages

path,name,size
dbfs:/nfl-data/pipelines/stages/0_StringIndexer_40698afcc6da77938e44/,0_StringIndexer_40698afcc6da77938e44/,0
dbfs:/nfl-data/pipelines/stages/1_StringIndexer_404694a6a9a24c1e9f13/,1_StringIndexer_404694a6a9a24c1e9f13/,0
dbfs:/nfl-data/pipelines/stages/2_StringIndexer_43f9943c1fc17ffd5958/,2_StringIndexer_43f9943c1fc17ffd5958/,0
dbfs:/nfl-data/pipelines/stages/3_StringIndexer_4e978f2acf3ff8a733ca/,3_StringIndexer_4e978f2acf3ff8a733ca/,0
dbfs:/nfl-data/pipelines/stages/4_StringIndexer_41cca17389c9941010de/,4_StringIndexer_41cca17389c9941010de/,0
dbfs:/nfl-data/pipelines/stages/5_StringIndexer_4ae1924cadd4499cbd29/,5_StringIndexer_4ae1924cadd4499cbd29/,0
dbfs:/nfl-data/pipelines/stages/6_VectorAssembler_43908dd9c588127b3a59/,6_VectorAssembler_43908dd9c588127b3a59/,0
dbfs:/nfl-data/pipelines/stages/7_RandomForestClassifier_4a848c3f3cb447e7b11f/,7_RandomForestClassifier_4a848c3f3cb447e7b11f/,0
dbfs:/nfl-data/pipelines/stages/8_IndexToString_44b08f317369ab5310ce/,8_IndexToString_44b08f317369ab5310ce/,0


In [10]:
# Work in progress: a UDF that unpacks the probability vector so that upper and
# lower bounds can be derived from it.
# The CSV writer cannot serialize complex (vector) column types, so those columns
# are removed before the predictions are exported.
complexColumns = ["indexedFeatures", "rawPrediction", "probability"]
predDF = predictions.drop(*complexColumns)


In [11]:
# Expose the predictions to SQL cells under the name "predDF".
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; it is also safe to re-run because it replaces an existing view.
predDF.createOrReplaceTempView("predDF")

In [12]:
%sql
SELECT *
FROM predDF
LIMIT 2

GameID,play_id,Drive,qtr,down,TimeSecs,PlayTimeDiff,SideofField,yrdln,yrdline100,ydstogo,posteam,DefensiveTeam,PlayType,PosTeamScore,DefTeamScore,AbsScoreDiff,HomeTeam,AwayTeam,Season,indexedLabel,indexedPosTeam,indexedDefTeam,indexedHomeTeam,indexedAwayTeam,indexedSideOfField,prediction,predictedLabel
2015091000,101,1,1,2.0,3544.0,17.0,PIT,47.0,53.0,1,PIT,NE,Run,0.0,0.0,0.0,NE,PIT,2015,1.0,12.0,8.0,0.0,28.0,26.0,1.0,Run
2015091000,236,1,1,2.0,3353.0,27.0,NE,42.0,42.0,28,PIT,NE,Run,0.0,0.0,0.0,NE,PIT,2015,1.0,12.0,8.0,0.0,28.0,28.0,0.0,Pass


In [13]:
# Export the predictions as CSV (Spark writes a directory of part files).
# Fix: overwrite mode, consistent with the other persist cells; without it a
# re-run fails because the output path already exists.
# NOTE(review): no header row is written; consider header=True if consumers allow it.
predDF.write.mode("overwrite").csv('/nfl-data/playTypePredictions.csv')

In [14]:
%fs
ls  /nfl-data/playTypePredictions.csv

path,name,size
dbfs:/nfl-data/playTypePredictions.csv/_SUCCESS,_SUCCESS,0
dbfs:/nfl-data/playTypePredictions.csv/_committed_2898762148106643231,_committed_2898762148106643231,464
dbfs:/nfl-data/playTypePredictions.csv/_started_2898762148106643231,_started_2898762148106643231,0
dbfs:/nfl-data/playTypePredictions.csv/part-00000-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2665-c000.csv,part-00000-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2665-c000.csv,2382742
dbfs:/nfl-data/playTypePredictions.csv/part-00001-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2666-c000.csv,part-00001-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2666-c000.csv,2393808
dbfs:/nfl-data/playTypePredictions.csv/part-00002-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2667-c000.csv,part-00002-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2667-c000.csv,2341455
dbfs:/nfl-data/playTypePredictions.csv/part-00003-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2668-c000.csv,part-00003-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2668-c000.csv,2332206
dbfs:/nfl-data/playTypePredictions.csv/part-00004-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2669-c000.csv,part-00004-tid-2898762148106643231-2c1fd896-2d75-46f5-ae0f-6b471285284a-2669-c000.csv,1159753
