In [2]:
#!aws s3 cp s3://msan694-group/final_nba.csv final_nba.csv
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import GBTClassifier
from pyspark.ml import Pipeline

In [9]:
rdd_vec = sc.textFile("final_nba.csv")
header = rdd_vec.first() #extract header
print header
rdd_vec.take(5)

label,SHOT_NUMBER,DRIBBLES,TOUCH_TIME,SHOT_DIST,PTS_TYPE,CLOSE_DEF_DIST,home,time


[u'label,SHOT_NUMBER,DRIBBLES,TOUCH_TIME,SHOT_DIST,PTS_TYPE,CLOSE_DEF_DIST,home,time',
 u'1,1,2,1.9,7.7,2,1.3,0,10.85',
 u'0,2,0,0.8,28.2,3,6.1,0,11.76666667',
 u'0,3,3,2.7,10.1,2,0.9,0,12',
 u'0,4,2,1.9,17.2,2,3.4,0,12.21666667']

In [10]:
data = rdd_vec.filter(lambda row: row not in header) #filter out the header!
new_rdd = data.map(lambda line: line.split(',')) # split the wide vector by ","
new_rdd.count()

128069

In [4]:
#turn to Vectors.dense (with label out front)
split_rdd = new_rdd.map(lambda line: (float(line[0]), Vectors.dense([float(c) for c in line[1:len(line)]]))) 

# Create the DataFrame from the collected RDD
full_df = sqlContext.createDataFrame(split_rdd.collect(), ["label", "features"])
full_df.show()

label,SHOT_NUMBER,DRIBBLES,TOUCH_TIME,SHOT_DIST,PTS_TYPE,CLOSE_DEF_DIST,home,time
+-----+--------------------+
|label|            features|
+-----+--------------------+
|  1.0|[1.0,2.0,1.9,7.7,...|
|  0.0|[2.0,0.0,0.8,28.2...|
|  0.0|[3.0,3.0,2.7,10.1...|
|  0.0|[4.0,2.0,1.9,17.2...|
|  0.0|[5.0,2.0,2.7,3.7,...|
|  0.0|[6.0,2.0,4.4,18.4...|
|  0.0|[7.0,11.0,9.0,20....|
|  1.0|[8.0,3.0,2.5,3.5,...|
|  0.0|[9.0,0.0,0.8,24.6...|
|  0.0|[1.0,0.0,1.1,22.4...|
|  0.0|[2.0,8.0,7.5,24.5...|
|  1.0|[3.0,14.0,11.9,14...|
|  1.0|[4.0,2.0,2.9,5.9,...|
|  0.0|[1.0,0.0,0.8,26.4...|
|  0.0|[1.0,0.0,0.5,22.8...|
|  1.0|[2.0,3.0,2.7,24.7...|
|  0.0|[3.0,6.0,5.1,25.0...|
|  0.0|[4.0,1.0,0.9,25.6...|
|  1.0|[5.0,0.0,1.2,24.2...|
|  0.0|[1.0,2.0,2.2,25.4...|
+-----+--------------------+
only showing top 20 rows



In [11]:
%%time
(trainingData, testData) = full_df.randomSplit([0.7, 0.3])
trainingData = trainingData.cache()
testData = testData.cache()

CPU times: user 2.1 ms, sys: 880 µs, total: 2.98 ms
Wall time: 155 ms


# GBT

In [13]:
gbt = GBTClassifier(maxIter=2, maxDepth=2, labelCol="label")
pipeline = Pipeline(stages=[gbt])
model = pipeline.fit(trainingData)

In [22]:
# Make predictions.
predictions = model.transform(testData)
predictions.show(20)

+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.0|[1.0,0.0,0.0,1.5,...|       1.0|
|  0.0|[1.0,0.0,0.0,2.2,...|       1.0|
|  0.0|[1.0,0.0,0.0,24.5...|       0.0|
|  0.0|[1.0,0.0,0.1,1.2,...|       1.0|
|  0.0|[1.0,0.0,0.2,4.1,...|       1.0|
|  0.0|[1.0,0.0,0.4,4.1,...|       1.0|
|  0.0|[1.0,0.0,0.6,3.7,...|       1.0|
|  0.0|[1.0,0.0,0.6,6.6,...|       0.0|
|  0.0|[1.0,0.0,0.6,7.6,...|       0.0|
|  0.0|[1.0,0.0,0.6,15.3...|       0.0|
|  0.0|[1.0,0.0,0.6,15.6...|       0.0|
|  0.0|[1.0,0.0,0.6,17.3...|       0.0|
|  0.0|[1.0,0.0,0.6,24.3...|       0.0|
|  0.0|[1.0,0.0,0.6,25.1...|       0.0|
|  0.0|[1.0,0.0,0.7,1.2,...|       1.0|
|  0.0|[1.0,0.0,0.7,2.9,...|       1.0|
|  0.0|[1.0,0.0,0.7,5.0,...|       1.0|
|  0.0|[1.0,0.0,0.7,6.3,...|       0.0|
|  0.0|[1.0,0.0,0.7,10.1...|       0.0|
|  0.0|[1.0,0.0,0.7,13.7...|       0.0|
+-----+--------------------+----------+
only showing top 20 rows



In [23]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction")
accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % (accuracy*100))

Accuracy = 59.2201


In [39]:
evaluator = MulticlassClassificationEvaluator(
labelCol="label", predictionCol="prediction", metricName="accuracy")

In [49]:
# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
cv = CrossValidator().setEstimator(gbt).setEvaluator(evaluator).setNumFolds(5)
paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth, [3,6]).addGrid(gbt.maxIter, [4,20]).build()
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(trainingData)

In [50]:
evaluator.evaluate(cvmodel.bestModel.transform(testData))

0.616441908713693

In [51]:
cvmodel.bestModel

GBTClassificationModel (uid=GBTClassifier_4b05842593e4a7f2e2ac) with 4 trees

# Logistic Regression

In [26]:
#Train the model.
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(regParam=0.01, maxIter=1000, fitIntercept=True)
lrmodel = lr.fit(trainingData)
lrmodel = lr.setParams(regParam=0.01, maxIter=500, fitIntercept=True).fit(trainingData)

In [28]:
#Evaluate models using test dataset.
validpredicts = lrmodel.transform(testData)
validpredicts.show()

+-----+--------------------+--------------------+--------------------+----------+
|label|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+----------+
|  0.0|[1.0,0.0,0.0,1.5,...|[-0.3641878432672...|[0.40994618653634...|       1.0|
|  0.0|[1.0,0.0,0.0,2.2,...|[-0.3613138932369...|[0.41064154668971...|       1.0|
|  0.0|[1.0,0.0,0.0,24.5...|[0.44815089963004...|[0.61019950393247...|       0.0|
|  0.0|[1.0,0.0,0.1,1.2,...|[-0.4337087769319...|[0.39324105851018...|       1.0|
|  0.0|[1.0,0.0,0.2,4.1,...|[-0.4323223519640...|[0.39357191193067...|       1.0|
|  0.0|[1.0,0.0,0.4,4.1,...|[-0.1179658659393...|[0.47054268612612...|       1.0|
|  0.0|[1.0,0.0,0.6,3.7,...|[-0.4332154709255...|[0.39335876876738...|       1.0|
|  0.0|[1.0,0.0,0.6,6.6,...|[0.06772491382556...|[0.51692475993533...|       0.0|
|  0.0|[1.0,0.0,0.6,7.6,...|[-0.2229847253367...|[0.44448366108112...|       1.0|
|  0.0|[1.0,0.0,

In [29]:
#Evaluate the model. default metric : Area Under ROC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
bceval = BinaryClassificationEvaluator()
print (bceval.getMetricName() +":" + str(bceval.evaluate(validpredicts)))

areaUnderROC:0.630023701057


In [52]:
# n-fold validation and the results.
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.tuning import ParamGridBuilder
cv = CrossValidator().setEstimator(lr).setEvaluator(bceval).setNumFolds(5)
paramGrid = ParamGridBuilder().addGrid(lr.maxIter, [1000]).addGrid(lr.regParam, [0.0001, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5]).build()
cv.setEstimatorParamMaps(paramGrid)
cvmodel = cv.fit(trainingData)

In [54]:
BinaryClassificationEvaluator().evaluate(cvmodel.bestModel.transform(testData))

0.6300237010572356

In [48]:
cvmodel.bestModel

GBTClassificationModel (uid=GBTClassifier_4b05842593e4a7f2e2ac) with 4 trees