![CRISP-DM](https://raw.githubusercontent.com/zaratsian/Spark/master/nfl_banner2.png)


## Use Case:  Predicting NFL plays

## Loading Libraries

In [1]:
# Install Spark dependencies
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!rm spark-3.2.1-bin-hadoop3.2.tgz
!wget --no-cookies --no-check-certificate https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar zxvf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark

rm: cannot remove 'spark-3.2.1-bin-hadoop3.2.tgz': No such file or directory
--2022-03-13 02:21:57--  https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 300971569 (287M) [application/x-gzip]
Saving to: ‘spark-3.2.1-bin-hadoop3.2.tgz’


2022-03-13 02:22:11 (135 MB/s) - ‘spark-3.2.1-bin-hadoop3.2.tgz’ saved [300971569/300971569]

spark-3.2.1-bin-hadoop3.2/
spark-3.2.1-bin-hadoop3.2/LICENSE
spark-3.2.1-bin-hadoop3.2/NOTICE
spark-3.2.1-bin-hadoop3.2/R/
spark-3.2.1-bin-hadoop3.2/R/lib/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/DESCRIPTION
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/INDEX
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta/Rd.rds
spark-3.2.1-bin-hadoop3.2/R/lib/SparkR/Meta

In [2]:
import os

# Tell PySpark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.2.1-bin-hadoop3.2",
})

from pyspark import SparkContext
from pyspark.sql import SparkSession

import datetime, time 
import re, random, sys
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType, StringType, FloatType, LongType, DateType
from pyspark.sql.functions import struct, array, lit, monotonically_increasing_id, col, expr, when, concat, udf, split, size, lag, count, isnull
from pyspark.sql import Window
from pyspark.ml.linalg import Vectors
from pyspark.ml.regression import GBTRegressor, LinearRegression, GeneralizedLinearRegression, RandomForestRegressor
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier
from pyspark.ml.feature import VectorIndexer, VectorAssembler, StringIndexer, IndexToString
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator

In [3]:
# Create (or reuse) a local Spark session that uses every available core.
session_builder = SparkSession.builder.appName("Spark NFL Predictions").master("local[*]")
spark = session_builder.getOrCreate()

## Data Ingestion

### Download Data

In [6]:
!wget https://raw.githubusercontent.com/zaratsian/Datasets/master/NFLPlaybyPlay.csv

--2022-03-13 02:36:13--  https://raw.githubusercontent.com/zaratsian/Datasets/master/NFLPlaybyPlay.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15488579 (15M) [text/plain]
Saving to: ‘NFLPlaybyPlay.csv’


2022-03-13 02:36:14 (70.3 MB/s) - ‘NFLPlaybyPlay.csv’ saved [15488579/15488579]



### Load data into Spark Dataframe

In [7]:
# Explicit schema for the play-by-play CSV, listed in file column order as (name, Spark type).
# Several numeric-looking columns are read as strings here and cast in a later cell.
schema_fields = [
    ("id", IntegerType),
    ("Date", DateType),
    ("GameID", IntegerType),
    ("Drive", IntegerType),
    ("qtr", IntegerType),
    ("down", StringType),
    ("time", StringType),
    ("TimeUnder", IntegerType),
    ("TimeSecs", StringType),
    ("PlayTimeDiff", StringType),
    ("SideofField", StringType),
    ("yrdln", StringType),
    ("yrdline100", StringType),
    ("ydstogo", IntegerType),
    ("ydsnet", IntegerType),
    ("GoalToGo", StringType),
    ("FirstDown", StringType),
    ("posteam", StringType),
    ("DefensiveTeam", StringType),
    ("desc", StringType),
    ("PlayAttempted", IntegerType),
    ("Yards_Gained", IntegerType),
    ("sp", IntegerType),
    ("Touchdown", IntegerType),
    ("ExPointResult", StringType),
    ("TwoPointConv", StringType),
    ("DefTwoPoint", StringType),
    ("Safety", IntegerType),
    ("PuntResult", StringType),
    ("PlayType", StringType),
    ("Passer", StringType),
    ("PassAttempt", IntegerType),
    ("PassOutcome", StringType),
    ("PassLength", StringType),
    ("PassLocation", StringType),
    ("InterceptionThrown", IntegerType),
    ("Interceptor", StringType),
    ("Rusher", StringType),
    ("RushAttempt", IntegerType),
    ("RunLocation", StringType),
    ("RunGap", StringType),
    ("Receiver", StringType),
    ("Reception", IntegerType),
    ("ReturnResult", StringType),
    ("Returner", StringType),
    ("BlockingPlayer", StringType),
    ("Tackler1", StringType),
    ("Tackler2", StringType),
    ("FieldGoalResult", StringType),
    ("FieldGoalDistance", StringType),
    ("Fumble", IntegerType),
    ("RecFumbTeam", StringType),
    ("RecFumbPlayer", StringType),
    ("Sack", IntegerType),
    ("Challenge.Replay", IntegerType),
    ("ChalReplayResult", StringType),
    ("Accepted.Penalty", IntegerType),
    ("PenalizedTeam", StringType),
    ("PenaltyType", StringType),
    ("PenalizedPlayer", StringType),
    ("Penalty.Yards", IntegerType),
    ("PosTeamScore", StringType),
    ("DefTeamScore", StringType),
    ("ScoreDiff", StringType),
    ("AbsScoreDiff", StringType),
    ("Season", IntegerType),
]

schema = StructType([StructField(field_name, field_type()) for field_name, field_type in schema_fields])

# Read the downloaded CSV (first line is the header) using the schema above.
rawdata = spark.read.format("csv").load('NFLPlaybyPlay.csv', schema=schema, header=True)

In [8]:
# Preview the first 10 raw rows without truncating long cell values.
rawdata.show(n=10, truncate=False)

+---+----------+----------+-----+---+----+-----+---------+--------+------------+-----------+-----+----------+-------+------+--------+---------+-------+-------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+------------+---+---------+-------------+------------+-----------+------+----------+--------+----------------+-----------+-----------+----------+------------+------------------+-----------+-----------+-----------+-----------+------+--------+---------+------------+--------+--------------+-----------+--------+---------------+-----------------+------+-----------+-------------+----+----------------+----------------+----------------+-------------+-----------+---------------+-------------+------------+------------+---------+------------+------+
|id |Date      |GameID    |Drive|qtr|down|time |TimeUnder|TimeSecs|PlayTimeDiff|SideofField|yrdln|yrdline1

## Data Cleaning, Transformations, Enrichment

### Data Cleaning & Transformations

In [9]:
# Columns retained for modelling; every other raw column is dropped.
# (Not kept for now: "Passer", "Rusher", "InterceptionThrown", "Season")
columns_to_keep = [
    "Date", "GameID", "Drive", "qtr", "down", "time", "TimeUnder", "TimeSecs",
    "PlayTimeDiff", "yrdline100", "ydstogo", "ydsnet", "FirstDown", "posteam",
    "DefensiveTeam", "Yards_Gained", "Touchdown", "PlayType", "PassLength",
    "PassLocation", "RunLocation",
    "PosTeamScore", "DefTeamScore",
]

# Filter columns (keep)
nfldata = rawdata.select(columns_to_keep)

# Drop rows whose "down" is the literal string 'NA'
nfldata = nfldata.filter(nfldata.down != 'NA')

# Deciles of the target, printed before and after the outlier filter so the effect is visible.
# (Previously both results were computed and then discarded.)
decile_probabilities = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
print("Yards_Gained deciles (before outlier filter): "
      + str(nfldata.approxQuantile(col='Yards_Gained', probabilities=decile_probabilities, relativeError=0.05)))

# Filter target variable (Yards_Gained) to remove outliers: keep plays between -5 and 20 yards inclusive
nfldata = nfldata.filter( (col('Yards_Gained') <= 20 ) & (col('Yards_Gained') >= -5 ) )
print("Yards_Gained deciles (after outlier filter):  "
      + str(nfldata.approxQuantile(col='Yards_Gained', probabilities=decile_probabilities, relativeError=0.05)))

# Group column names by dtype. "Date" is a DateType column, so its dtype string is 'date';
# it has to be listed with 'timestamp', otherwise it is counted as numeric.
temporal_dtypes = ['date', 'timestamp']
numeric_columns     = [name for name, dtype in nfldata.dtypes if dtype != 'string' and dtype not in temporal_dtypes]
categorical_columns = [name for name, dtype in nfldata.dtypes if dtype == 'string']
datetime_columns    = [name for name, dtype in nfldata.dtypes if dtype in temporal_dtypes]


### Data Enrichment & Additional Transformations

In [10]:
# Target Spark type for every column that is cast; "Date" is absent and passes through unchanged.
# NOTE(review): string values that are not integers (e.g. 'NA') are expected to become null
# after the int cast under Spark's default (non-ANSI) settings — confirm.
column_casts = {
    "GameID": "int", "Drive": "int", "qtr": "int", "down": "int",
    "time": "string", "TimeUnder": "int", "TimeSecs": "int", "PlayTimeDiff": "int",
    "yrdline100": "int", "ydstogo": "int", "ydsnet": "int", "FirstDown": "int",
    "posteam": "string", "DefensiveTeam": "string", "Yards_Gained": "int", "Touchdown": "int",
    "PlayType": "string", "PassLength": "string", "PassLocation": "string", "RunLocation": "string",
    "PosTeamScore": "int", "DefTeamScore": "int",
}

# One select instead of a long withColumn chain; column order and names are preserved.
nfldata2 = nfldata.select([
    col(name).cast(column_casts[name]).alias(name) if name in column_casts else col(name)
    for name in nfldata.columns
])

# Group column names by dtype. DateType columns report 'date', so both temporal dtype strings
# are excluded from the numeric list.
temporal_dtypes = ['date', 'timestamp']
numeric_columns     = [name for name, dtype in nfldata2.dtypes if dtype != 'string' and dtype not in temporal_dtypes]
categorical_columns = [name for name, dtype in nfldata2.dtypes if dtype == 'string']
datetime_columns    = [name for name, dtype in nfldata2.dtypes if dtype in temporal_dtypes]

# Category Levels: show up to five levels and their counts for each string column.
# A plain loop avoids the "[None, None, ...]" cell output that a list comprehension produces.
for category in categorical_columns:
    nfldata2.groupBy(category).count().show(5, False)

+-----+-----+
|time |count|
+-----+-----+
|03:15|34   |
|02:39|38   |
|07:24|36   |
|06:43|37   |
|04:02|28   |
+-----+-----+
only showing top 5 rows

+-------+-----+
|posteam|count|
+-------+-----+
|NYJ    |1182 |
|CAR    |1146 |
|TB     |1121 |
|OAK    |1113 |
|DET    |1117 |
+-------+-----+
only showing top 5 rows

+-------------+-----+
|DefensiveTeam|count|
+-------------+-----+
|NYJ          |1105 |
|CAR          |1122 |
|TB           |1154 |
|OAK          |1200 |
|DET          |1064 |
+-------------+-----+
only showing top 5 rows

+----------+-----+
|PlayType  |count|
+----------+-----+
|Field Goal|985  |
|Run       |12699|
|No Play   |2463 |
|QB Kneel  |424  |
|Timeout   |2    |
+----------+-----+
only showing top 5 rows

+----------+-----+
|PassLength|count|
+----------+-----+
|NA        |18366|
|Deep      |2755 |
|Short     |14933|
+----------+-----+

+------------+-----+
|PassLocation|count|
+------------+-----+
|NA          |18366|
|left        |6648 |
|middle      |3924 |
|

[None, None, None, None, None, None, None]

### Data Enrichment & Additional Transformations (Continued...)

In [11]:
# Keep only rushing and passing plays
nfldata2 = nfldata2.filter(col("PlayType").isin("Run", "Pass"))

# month_day: characters 6-7 and 9-10 of Date, concatenated and cast to int
# (assumes Date renders as yyyy-MM-dd, giving MMDD — TODO confirm)
date_column = col("Date")
nfldata2 = nfldata2.withColumn(
    "month_day",
    concat(date_column.substr(6, 2), date_column.substr(9, 2)).cast("int"),
)

# Previous play type within the same game and drive, ordered by descending TimeSecs.
# The first play of each partition has no predecessor and is labelled 'FirstPlay'.
w = Window.partitionBy('GameID', 'Drive').orderBy('GameID', 'Drive', col('TimeSecs').desc())
previous_play_type = lag("PlayType").over(w)
nfldata2 = (
    nfldata2
    .withColumn("PlayType_lag", previous_play_type)
    .withColumn("PlayType_lag", when(col("PlayType_lag").isNull(), 'FirstPlay').otherwise(col("PlayType_lag")))
    .orderBy('GameID', 'Drive', col('TimeSecs').desc())
)

# To inspect the result:
#nfldata2.select(["GameID","Drive","qtr","down","TimeSecs","PlayType","PlayType_lag","yrdline100","posteam","month_day"]).show(50,False)

# One data set per play type, because a separate model is built for runs and for passes
is_run = col('PlayType') == 'Run'
is_pass = col('PlayType') == 'Pass'
nfldata2_run  = nfldata2.filter(is_run)
nfldata2_pass = nfldata2.filter(is_pass)

print(f"Total Number of Records:   {nfldata2.count()}")
print(f"Number of Running Records: {nfldata2_run.count()}")
print(f"Number of Passing Records: {nfldata2_pass.count()}")

Total Number of Records:   29412
Number of Running Records: 12699
Number of Passing Records: 16713


## Model Building

### Split into Train and Test
NOTE: A simple random split is used here. For a more robust estimate of model performance, consider a [cross validator](https://spark.apache.org/docs/latest/ml-tuning.html#cross-validation).

In [12]:
# 80/20 train/test split for each play type, with a fixed seed
split_weights = [0.8, 0.2]
split_seed = 12345

training_run, testing_run = nfldata2_run.randomSplit(split_weights, seed=split_seed)
training_pass, testing_pass = nfldata2_pass.randomSplit(split_weights, seed=split_seed)

### Building Model Pipeline

In [13]:
# String columns must be turned into numeric indices before the tree models can use them.
# Each StringIndexer maps column <name> to <name>_index.
si1, si2, si3, si4, si5 = (
    StringIndexer(inputCol=column_name, outputCol=column_name + "_index")
    for column_name in ("PlayType", "PlayType_lag", "PassLength", "PassLocation", "RunLocation")
)

# Regression target and the model inputs
target = 'Yards_Gained'
features = [
    'qtr', 'down', 'TimeSecs', 'yrdline100',
    'ydstogo', 'ydsnet', 'month_day', 'PlayType_lag_index',
]

# Index the target values into a 'label' column (fitted on the running-play training set)
label_indexer = StringIndexer(inputCol='Yards_Gained', outputCol='label')
fi = label_indexer.fit(training_run)

# The Pipelines API expects all input variables packed into a single vector column
va = VectorAssembler(inputCols=features, outputCol="features")

In [14]:
# Configure the candidate regressors. maxDepth, maxBins and seed are set explicitly;
# every other hyper-parameter keeps its Spark default.
# Both models read the assembled "features" vector. The random forest previously pointed
# featuresCol at "label" (the indexed target), which is not the feature vector.
rfr = RandomForestRegressor(featuresCol="features", labelCol=target, predictionCol="prediction", maxDepth=5, maxBins=350, seed=12345)
gbr = GBTRegressor(featuresCol="features", labelCol=target, predictionCol="prediction", maxDepth=5, maxBins=350, seed=12345)

# Convert indexed labels back to original labels, label converter
# NOTE(review): "prediction" is written by a regressor trained on Yards_Gained, so it holds
# predicted yards rather than an index produced by `fi`. Decoding it through fi.labels
# looks unintended — confirm whether predictedLabel is needed at all.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=fi.labels)

### Training the Model

In [15]:
# Stage order for running plays: index the lagged play type, index the target,
# assemble the feature vector, fit the gradient-boosted regressor, decode the prediction.
run_stages = [si2, fi, va, gbr, labelConverter]
pipeline_run = Pipeline(stages=run_stages)

# Fitting a Pipeline returns a PipelineModel holding the fitted model or transformer
# for each stage, in the same order.
model_run = pipeline_run.fit(training_run)

# Optionally persist the fitted pipeline model to HDFS:
#model_run.write().overwrite().save('hdfs://dzaratsian0.field.hortonworks.com:8020/models/nfl_model_run3')

In [16]:
# The fitted regressor is the second-to-last stage of the pipeline model
# (the last stage is the label converter).
fitted_regressor = model_run.stages[-2]
fitted_regressor.featureImportances


SparseVector(8, {0: 0.0056, 1: 0.0632, 2: 0.0461, 3: 0.3159, 4: 0.1073, 5: 0.4037, 6: 0.0319, 7: 0.0264})

## Making predictions against the Trained Model

In [17]:
# Score the held-out running plays with the fitted pipeline
predictions = model_run.transform(testing_run)

# Preview the first three scored rows
predictions.show(n=3)

+----------+----------+-----+---+----+-----+---------+--------+------------+----------+-------+------+---------+-------+-------------+------------+---------+--------+----------+------------+-----------+------------+------------+---------+------------+------------------+-----+--------------------+-----------------+--------------+
|      Date|    GameID|Drive|qtr|down| time|TimeUnder|TimeSecs|PlayTimeDiff|yrdline100|ydstogo|ydsnet|FirstDown|posteam|DefensiveTeam|Yards_Gained|Touchdown|PlayType|PassLength|PassLocation|RunLocation|PosTeamScore|DefTeamScore|month_day|PlayType_lag|PlayType_lag_index|label|            features|       prediction|predictedLabel|
+----------+----------+-----+---+----+-----+---------+--------+------------+----------+-------+------+---------+-------+-------------+------------+---------+--------+----------+------------+-----------+------------+------------+---------+------------+------------------+-----+--------------------+-----------------+--------------+
|2015-0

### Generate results of the regression model

In [18]:
# Keep only the actual yards, the decoded label and the raw model prediction
result_columns = ["Yards_Gained", "predictedLabel", "prediction"]
predictions = predictions.select(*result_columns)
type(predictions)


pyspark.sql.dataframe.DataFrame

In [19]:
# Preview five rows of actual vs. predicted values
predictions.show(n=5)

+------------+--------------+-----------------+
|Yards_Gained|predictedLabel|       prediction|
+------------+--------------+-----------------+
|           6|             0|4.851255886343654|
|           5|             4|3.026776673117564|
|           4|             1|2.946948295943713|
|           3|             0|4.154273148816881|
|          -1|             0|4.023135487322596|
+------------+--------------+-----------------+
only showing top 5 rows



### Model Evaluation

In [20]:
# Evaluate results on the held-out predictions.
# RegressionEvaluator supports rmse (default) | mse | r2 | mae; predictions are read from
# the evaluator's default "prediction" column and compared against `target`.
evaluator = RegressionEvaluator(metricName="rmse", labelCol=target)
RMSE = evaluator.evaluate(predictions)  # Root Mean Squared Error
print('RMSE: ' + str(RMSE))

evaluator = RegressionEvaluator(metricName="mae", labelCol=target)
MAE = evaluator.evaluate(predictions)  # Mean Absolute Error
# This value is the mean absolute error; it was previously printed with the label "MSE".
print('MAE: ' + str(MAE))

RMSE: 3.3627579488062893
MSE: 2.2484946882408487
