# Load the clean NFL data

In [2]:
# The code was removed by DSX for sharing.

In [5]:
configuration_name = 'os_f02c10e2d3544fe3ae82e368ee5170ef_configs'
bmos = ibmos2spark.bluemix(sc, credentials, configuration_name)

df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true').option('inferschema','true')\
  .load(bmos.url('HWNFLDemo', 'fixed_rb_filtered.csv'))
df.take(2)

[Row(_c0=0, name=u'C.Newton', position=u'QB', rushing_yds=43, home=True, opponent=u'NO', stadium_city=u'Charlotte, NC', capacity=u'75,419', surface=u'Voyager Bermuda Grass', roof=u'Open', temp=65, wspd=7),
 Row(_c0=1, name=u'J.Stewart', position=u'RB', rushing_yds=46, home=True, opponent=u'NO', stadium_city=u'Charlotte, NC', capacity=u'75,419', surface=u'Voyager Bermuda Grass', roof=u'Open', temp=65, wspd=7)]

In [6]:
df.where("stadium_city like '%MN'").take(5)

[Row(_c0=1437, name=u'S.Bradford', position=u'QB', rushing_yds=6, home=True, opponent=u'NYG', stadium_city=u'Minneapolis, MN', capacity=u'66,655', surface=u'UBU Speed Series S5-M Synthetic Turf', roof=u'Fixed', temp=49, wspd=0),
 Row(_c0=1438, name=u'S.Diggs', position=u'WR', rushing_yds=-1, home=True, opponent=u'NYG', stadium_city=u'Minneapolis, MN', capacity=u'66,655', surface=u'UBU Speed Series S5-M Synthetic Turf', roof=u'Fixed', temp=49, wspd=0),
 Row(_c0=1439, name=u'J.McKinnon', position=u'RB', rushing_yds=85, home=True, opponent=u'NYG', stadium_city=u'Minneapolis, MN', capacity=u'66,655', surface=u'UBU Speed Series S5-M Synthetic Turf', roof=u'Fixed', temp=49, wspd=0),
 Row(_c0=1440, name=u'Z.Line', position=u'FB', rushing_yds=6, home=True, opponent=u'NYG', stadium_city=u'Minneapolis, MN', capacity=u'66,655', surface=u'UBU Speed Series S5-M Synthetic Turf', roof=u'Fixed', temp=49, wspd=0),
 Row(_c0=1441, name=u'C.Patterson', position=u'WR', rushing_yds=2, home=True, opponent=u'

In [22]:
df.count()

4391

# Remove null rows and rows with values that don't appear enough for training

In [23]:
df = df.where("not temp is Null")

In [24]:
df = df.withColumn('ishome', df['home'].cast('integer'))

In [25]:
df.groupBy("position").count().where('count < 10').createOrReplaceTempView("lowPosCount")

In [26]:
df.groupBy("name").count().where('count < 10').createOrReplaceTempView("lowNameCount")

In [27]:
df.createOrReplaceTempView("allRows")

In [28]:
morePlayers = spark.sql("""select * from allRows where 
                                            name not in (select name from lowNameCount) and
                                            position not in (select position from lowPosCount)""")

In [29]:
#features temp wspd roof surface home stadium_city away_team name position teamid
#label rushing_yds

# Create string indexers, one hot encoders, a vector assembler, and the SparkML pipeline

In [30]:
from pyspark.ml.feature import StringIndexer

nameInd = StringIndexer(inputCol="name", outputCol="nameInd")
posInd = StringIndexer(inputCol="position", outputCol="posInd")
#ampmInd = StringIndexer(inputCol="ampm", outputCol="ampmInd")
roofInd = StringIndexer(inputCol="roof", outputCol="roofInd")
surfInd = StringIndexer(inputCol="surface", outputCol="surfInd")
stadInd = StringIndexer(inputCol="stadium_city", outputCol="stadInd")
teamInd = StringIndexer(inputCol="teamid", outputCol="teamInd")
oppInd = StringIndexer(inputCol="away_team", outputCol="oppInd")

In [31]:
from pyspark.ml.feature import OneHotEncoder

nameEnc = OneHotEncoder(inputCol="nameInd", outputCol="nameEnc")
posEnc = OneHotEncoder(inputCol="posInd", outputCol="posEnc")
#ampmEnc = OneHotEncoder(inputCol="ampmInd", outputCol="ampmEnc")
roofEnc = OneHotEncoder(inputCol="roofInd", outputCol="roofEnc")
surfEnc = OneHotEncoder(inputCol="surfInd", outputCol="surfEnc")
stadEnc = OneHotEncoder(inputCol="stadInd", outputCol="stadEnc")
teamEnc = OneHotEncoder(inputCol="teamInd", outputCol="teamEnc")
oppEnc = OneHotEncoder(inputCol="oppInd", outputCol="oppEnc")

In [32]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

vecAss = VectorAssembler(
    inputCols=["nameEnc", "posEnc", "roofEnc", "surfEnc", "stadEnc", "temp", "wspd", "ishome", "teamEnc", "oppEnc"],
    outputCol="features")

In [33]:
from pyspark.ml.regression import RandomForestRegressor

rf = RandomForestRegressor(featuresCol="features",labelCol="rushing_yds")

In [34]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[nameInd,nameEnc,posInd,posEnc,roofInd,roofEnc,surfInd,surfEnc,
                           stadInd,stadEnc,teamInd,teamEnc,oppInd,oppEnc,vecAss,rf])

# Find the best model using a grid search and cross validation

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
import datetime

paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxBins, [40,60,80])\
    .addGrid(rf.maxDepth, [16,20,25])\
    .addGrid(rf.numTrees, [80,100,120])\
    .build()
    
evaluator = RegressionEvaluator(labelCol="rushing_yds", predictionCol="prediction", metricName="rmse")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

print datetime.datetime.now()
cvModel = crossval.fit(morePlayers)
print datetime.datetime.now()

2017-09-13 16:27:41.642298


In [38]:
rmse = evaluator.evaluate(cvModel.bestModel.transform(morePlayers))
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
#22.6746

Root Mean Squared Error (RMSE) on test data = 22.6746


In [39]:
print cvModel.bestModel.stages[-1]._java_obj.getMaxBins()#<--40
print cvModel.bestModel.stages[-1]._java_obj.getMaxDepth()#25-->
print cvModel.bestModel.stages[-1]._java_obj.getNumTrees()#120-->

40
25
120


In [40]:
#features temp wspd roof surface home stadium_city away_team name position teamid
#label rushing_yds
df = df.select('temp','wspd','roof','surface','ishome','stadium_city','away_team','name','position','teamid','rushing_yds')

# Train the final model on the full dataset

In [41]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml import Pipeline

rf = RandomForestRegressor(featuresCol="features",labelCol="rushing_yds",maxBins=60,maxDepth=25,numTrees=80)
pipeline = Pipeline(stages=[nameInd,nameEnc,posInd,posEnc,roofInd,roofEnc,surfInd,surfEnc,
                           stadInd,stadEnc,teamInd,teamEnc,oppInd,oppEnc,vecAss,rf])

model = pipeline.fit(df)

In [42]:
predictions = model.transform(df)
predictions.select("rushing_yds","prediction").take(10)

[Row(rushing_yds=26, prediction=30.990729166666664),
 Row(rushing_yds=5, prediction=8.366956569974738),
 Row(rushing_yds=5, prediction=6.2899855277760315),
 Row(rushing_yds=47, prediction=32.01399980970929),
 Row(rushing_yds=22, prediction=29.92765865639912),
 Row(rushing_yds=10, prediction=16.792291666666664),
 Row(rushing_yds=-2, prediction=5.651267773709917),
 Row(rushing_yds=44, prediction=33.0953426656544),
 Row(rushing_yds=59, prediction=35.19739571416388),
 Row(rushing_yds=41, prediction=37.45950587606837)]

# Save the model to Watson Machine Learning to use as an API

In [None]:
# The code was removed by DSX for sharing.

In [43]:
ml_repository_client = MLRepositoryClient(service_path)
ml_repository_client.authorize(username, password)

model_artifact = MLRepositoryArtifact(model, training_data=df, name="Rush")

saved_model = ml_repository_client.models.save(model_artifact)

print saved_model.meta.available_props()
print
print "modelType: " + saved_model.meta.prop("modelType")
print "trainingDataSchema: " + str(saved_model.meta.prop("trainingDataSchema"))
print "creationTime: " + str(saved_model.meta.prop("creationTime"))
print "modelVersionHref: " + saved_model.meta.prop("modelVersionHref")
print "label: " + saved_model.meta.prop("label")

['inputDataSchema', 'evaluationMetrics', 'pipelineVersionHref', 'modelVersionHref', 'trainingDataRef', 'pipelineType', 'creationTime', 'lastUpdated', 'label', 'authorEmail', 'trainingDataSchema', 'authorName', 'version', 'modelType', 'runtime', 'evaluationMethod']

modelType: sparkml-model-2.0
trainingDataSchema: {u'fields': [{u'nullable': True, u'type': u'integer', u'name': u'temp', u'metadata': {}}, {u'nullable': True, u'type': u'integer', u'name': u'wspd', u'metadata': {}}, {u'nullable': True, u'type': u'string', u'name': u'roof', u'metadata': {}}, {u'nullable': True, u'type': u'string', u'name': u'surface', u'metadata': {}}, {u'nullable': True, u'type': u'integer', u'name': u'ishome', u'metadata': {}}, {u'nullable': True, u'type': u'string', u'name': u'stadium_city', u'metadata': {}}, {u'nullable': True, u'type': u'string', u'name': u'away_team', u'metadata': {}}, {u'nullable': True, u'type': u'string', u'name': u'name', u'metadata': {}}, {u'nullable': True, u'type': u'string', u'n

In [44]:
loadedModelArtifact = ml_repository_client.models.get(saved_model.uid)

print str(loadedModelArtifact.name)

Rush
