In [1]:
import findspark
# findspark.init() has to run before `import pyspark` so the package can be found.
findspark.init()

import pyspark
# getOrCreate() reuses an already-running context, so re-running this cell
# (or Restart-less "Run All") does not fail with "Cannot run multiple
# SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate()

In [2]:
from pyspark.sql import SQLContext
# Import only the schema types this notebook uses instead of `import *`,
# so it is clear where each name comes from.
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

sqlContext = SQLContext(sc)

In [3]:
def to_24_hour(time):
    """Convert a 12-hour clock string such as '7:30p' to an HHMM integer.

    Parameters
    ----------
    time : str
        A time like '7:30p' or '12:05a'. An 'a' or 'p' anywhere in the
        string (case-insensitive) marks a.m. / p.m.; 'p' wins if both occur.
        Only the first two characters after the colon are read as minutes.

    Returns
    -------
    int or str
        The 24-hour time as an int, e.g. 1930 for '7:30p', 1200 for '12:00p'
        and 5 for '12:05a'; or '' when `time` is empty.

    Raises
    ------
    ValueError
        If there is no a.m./p.m. marker, or the hour/minute digits are not
        valid integers.
    IndexError
        If `time` is non-empty but contains no ':'.
    """
    if len(time) == 0:
        return ""
    marker = time.lower()
    if 'p' in marker:
        offset = 12
    elif 'a' in marker:
        offset = 0
    else:
        # Previously this fell through and returned None, which crashed later
        # as int(None); fail here with a clear message instead.
        raise ValueError("no am/pm marker in time %r" % time)
    pieces = time.split(':')
    # 12 a.m. is hour 0 and 12 p.m. is hour 12, so reduce the hour mod 12
    # before adding the p.m. offset (the old code produced 2430 for 12:30p).
    hour = int(pieces[0]) % 12 + offset
    minute = int(pieces[1][:2])
    return hour * 100 + minute
    
def clean_games(line):
    """Parse one raw CSV schedule line into a list of typed game fields.

    After dropping the leading game number and the trailing empty column,
    the raw columns are: date, time, tv, box score, location ('@' when
    away), opponent, outcome, OT, score1, score2, season wins, season
    losses, streak.

    The returned list starts with
        [day_of_week, month, day, year, time, location, opponent, outcome]
    where day and year are ints, time is an HHMM int on a 24-hour clock
    (or '' when no time is listed) and location is 'home' or 'away'. The
    remaining raw columns (OT, scores, record, streak) follow unchanged,
    except that the two score columns are ordered so the Warriors' score
    comes first.

    Parameters
    ----------
    line : str
        One comma-separated line of the schedule file.

    Returns
    -------
    list
        The cleaned fields described above.
    """
    data = line.split(',')[1:-1]  # drop game number and trailing empty column

    # Positions in the trimmed raw row (see the column list above). The old
    # code checked data[4] (the location column) for 'W' and "swapped" with
    # `a, b = a, b`, so this fix-up never had any effect.
    outcome_idx, score_a_idx, score_b_idx = 6, 8, 9
    if len(data) > score_b_idx:
        score_a, score_b = data[score_a_idx], data[score_b_idx]
        if score_a.isdigit() and score_b.isdigit():
            # Put the Warriors' score in the first score column: after a win
            # it is the larger score, after a loss the smaller one. Compare
            # as ints, because string comparison ranks '99' above '100'.
            won = data[outcome_idx] == 'W'
            lost = data[outcome_idx] == 'L'
            first_is_higher = int(score_a) > int(score_b)
            if (won and not first_is_higher) or (lost and first_is_higher):
                data[score_a_idx], data[score_b_idx] = score_b, score_a

    date_parts = data[0].split()              # e.g. ['Wed', 'Oct', '28', '2009']
    time_parts = data[1].split() or ['', '']  # no listed time -> blank time and zone

    to_return = date_parts + time_parts + data[2:]
    to_return[4] = to_24_hour(to_return[4])
    # Delete box score, tv provider and time zone (all EST). Go from the
    # highest index down so earlier positions do not shift.
    del to_return[7]
    del to_return[6]
    del to_return[5]

    to_return[5] = 'away' if to_return[5] == '@' else 'home'

    for idx in (2, 3, 4):  # day, year, time
        if to_return[idx] != '':
            to_return[idx] = int(to_return[idx])
    return to_return

def remove_postgame_data(data):
    """Return only the leading game fields of a cleaned row.

    The first eight fields are kept: day_of_week, month, day, year, time,
    location, opponent and outcome (the label). The OT flag, scores, season
    record and streak that follow are dropped.
    """
    kept_field_count = 8
    return data[0:kept_field_count]

In [4]:
def load_games_rdd(path):
    """Read raw schedule CSV text at `path` into an RDD of cleaned game rows.

    Lines that are blank or start with 'G' (header rows) are skipped. Each
    remaining line is parsed with clean_games and truncated by
    remove_postgame_data. Rows without a listed start time are discarded.
    """
    return (sc.textFile(path)
            # The old `line[0] != 'G'` raised IndexError on blank lines.
            .filter(lambda line: line != '' and not line.startswith('G'))
            .map(clean_games)
            .map(remove_postgame_data)
            .filter(lambda row: row[4] != ''))  # index 4 is the game time


all_games_RDD = load_games_rdd("./data/train")
playoffs_2016_RDD = load_games_rdd("./data/test")

In [5]:
# Number of cleaned training games that have a listed start time.
training_game_count = all_games_RDD.count()
training_game_count

504

In [6]:
# Raw columns: date time tv box_score away opponent outcome OT score1 score2 season_wins season_losses streak
# Cleaned row columns; only the first 8 are kept by remove_postgame_data.
data_strings = enumerate(
    "day_of_week month day year time location opponent outcome OT "
    "warriors_score opponents_score season_wins season_losses streak".split())
for pair in data_strings:
    if pair[0] >= 8:
        continue
    print(pair)


(0, 'day_of_week')
(1, 'month')
(2, 'day')
(3, 'year')
(4, 'time')
(5, 'location')
(6, 'opponent')
(7, 'outcome')


In [7]:
# Peek at one cleaned row. first() fetches a single element, whereas
# collect()[0] pulls the whole RDD back to the driver just to index it.
all_games_RDD.first()

['Wed', 'Oct', 28, 2009, 1930, 'home', 'Houston Rockets', 'L']

In [8]:
# Column names and types of the 8-field rows produced by the loading step;
# every column is nullable.
schema = StructType([
    StructField(column, column_type, True)
    for column, column_type in [
        ("day_of_week", StringType()),
        ("month", StringType()),
        ("day", IntegerType()),
        ("year", IntegerType()),
        ("time", IntegerType()),
        ("location", StringType()),
        ("opponent", StringType()),
        ("outcome", StringType()),
    ]
])

In [9]:
# Both splits use the same explicit schema, so the pipeline sees identical column types.
train_df = sqlContext.createDataFrame(all_games_RDD, schema=schema)
test_df = sqlContext.createDataFrame(playoffs_2016_RDD, schema=schema)

In [10]:
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml.feature import VectorAssembler

def _indexer_for(column):
    """Return a StringIndexer that maps `column` to a numeric `<column>_indexed` column."""
    return StringIndexer(inputCol=column, outputCol=column + '_indexed')


# One indexer per categorical feature, plus one for the label (outcome).
day_of_week_indexer = _indexer_for('day_of_week')
month_indexer = _indexer_for('month')
location_indexer = _indexer_for('location')
opponent_indexer = _indexer_for('opponent')
outcome_indexer = _indexer_for('outcome')

# Feature columns: indexed categoricals first, then the raw numeric columns.
string_cols = [name + '_indexed' for name in ('day_of_week', 'month', 'location', 'opponent')]
numeric_cols = ['day', 'year', 'time']

assembler = VectorAssembler(inputCols=string_cols + numeric_cols, outputCol='features')

In [25]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier

In [26]:
classifier = DecisionTreeClassifier(labelCol='outcome_indexed', featuresCol='features')

# Stage order matters. The indexers must run before the assembler that reads
# their *_indexed outputs, and the assembler before the classifier that
# reads 'features'.
indexer_stages = [day_of_week_indexer, month_indexer, location_indexer,
                  opponent_indexer, outcome_indexer]
pipeline = Pipeline(stages=indexer_stages + [assembler, classifier])
model = pipeline.fit(train_df)

In [27]:
# Run the fitted pipeline (indexers, assembler, tree) over the 2016 playoff games.
predictions = model.transform(test_df)

In [28]:
#predictions.select("prediction", "outcome_indexed").toPandas()

In [29]:
# IndexToString is experimental
# To map numeric predictions back to outcome strings, pass the labels learned
# by the *fitted* outcome indexer (pipeline stage 4), not the unfitted estimator,
# and select from the converted frame:
# converter = IndexToString(inputCol="prediction", outputCol="predicted_val", labels=model.stages[4].labels)
# converted = converter.transform(predictions)
# converted.select("predicted_val", "outcome").toPandas()

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Select (prediction, true label) and compute test error
# Score predictions against the indexed true outcome and report the error rate.
# NOTE(review): metricName="precision" is the Spark 1.x name for overall
# precision, which equals accuracy in multiclass evaluation. Spark 2.0+ rejects
# it in favour of "accuracy"; confirm against the installed Spark version.
evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="outcome_indexed",
    metricName="precision",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy
print("Test Error = %g " % test_error)

Test Error = 0.375 
