#IST718 Project - Google and NCAA Women's Basketball Tournament Prediction

#Logistic Regression Machine learning algorithm to build 20-year predictive model

@authors
Sanjana Rajagopala,
Shefali Vajramatti,
Apoorva Rajendra Angre,
Sandya Madhavan

In [2]:
#IMPORT ALL THE REQUIRED PACKAGES
import pandas as pd
from pyspark.ml import feature
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import classification
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql import functions as fn
from pyspark.sql.types import IntegerType
import numpy as np
from pyspark.mllib.evaluation import BinaryClassificationMetrics as metric
from pyspark.ml.feature import StringIndexer

In [3]:
#################################################################################################################
################################### Using 1998-2017 as training data ############################################

In [4]:
#Read the RegularCompact2018 CSV File
# header=True takes the column names from the first row; inferSchema=True lets
# Spark infer the column types from the data.
# NOTE(review): `spark` is not defined in this file -- presumably the
# SparkSession injected by the notebook runtime; confirm.
CompactDF_2018 = spark.read.csv("/FileStore/tables/WRegularSeasonCompactResults_2018.csv", header=True, inferSchema= True)

In [5]:
# Show the freshly loaded compact results for a visual sanity check.
# NOTE(review): `display` is not defined in this file -- presumably a notebook
# built-in; confirm.
display(CompactDF_2018)

In [6]:
#Converting to pandas
# The name is rebound: from here on CompactDF_2018 is a pandas DataFrame, which
# the row-level pre-processing below relies on.
CompactDF_2018=CompactDF_2018.toPandas()

In [7]:
# Seed weights: seed 1 receives the highest weight (16) and seed 16 the lowest (1).
weights_dict = {seed: 17 - seed for seed in range(1, 17)}

In [8]:
#PRE_PROCESSING THE DATAFRAME for 1998-2017- CompactDF_2018
#
# Derives, for every game row of the pandas frame CompactDF_2018, the per-row
# lists that are attached as columns afterwards:
#   temp_win   - 1 when the winning team has the lower team ID, otherwise 0
#   low_team   - the lower of the two team IDs
#   high_team  - the higher of the two team IDs
#   loc_col    - numeric weight of WLoc: 'H' -> 3, 'N' -> 2, 'A' -> 1
#   diff_score - absolute difference between WScore and LScore
#
# The computation is vectorised instead of looping with iterrows(). Every list
# therefore always has exactly one entry per row.
#
# Raises:
#   ValueError - if WLoc holds anything other than 'H', 'N' or 'A'. The former
#                row loop skipped such rows for loc_col only, leaving the lists
#                with different lengths.
#
# Seed-based features (seed difference) remain disabled, as before.

# Highest weight for a home game, lowest for an away game, medium for neutral.
loc_weights = {'H': 3, 'N': 2, 'A': 1}

unexpected_loc = CompactDF_2018.loc[~CompactDF_2018['WLoc'].isin(list(loc_weights)), 'WLoc']
if not unexpected_loc.empty:
  raise ValueError("Unexpected WLoc value(s): {}".format(unexpected_loc.unique().tolist()))

winner_ids = CompactDF_2018['WTeamID']
loser_ids = CompactDF_2018['LTeamID']
lower_id_won = winner_ids < loser_ids

# Label is 1 when the team with the lower team ID won the game.
temp_win = lower_id_won.astype(int).tolist()
low_team = winner_ids.where(lower_id_won, loser_ids).tolist()
high_team = loser_ids.where(lower_id_won, winner_ids).tolist()

loc_col = CompactDF_2018['WLoc'].map(loc_weights).tolist()

# Margin of the game, always non-negative.
diff_score = (CompactDF_2018['WScore'] - CompactDF_2018['LScore']).abs().tolist()

In [9]:
# Attach the per-row lists built by the pre-processing step as new columns of
# the pandas frame, in this order. Seed_Diff is not attached because its
# computation is disabled.
derived_columns = [
  ('WLProb', temp_win),
  ('Loc', loc_col),
  ('Score_Diff', diff_score),
  ('Low_team', low_team),
  ('High_team', high_team),
]
for column_name, column_values in derived_columns:
  CompactDF_2018[column_name] = column_values

In [10]:
# Move the enriched pandas frame back into Spark and expose the outcome column
# under the name "label", so the ML stages can pick it up without extra setup.
# NOTE(review): `sc` is not defined in this file -- presumably the notebook's
# SparkContext; confirm.
sqlCtx = SQLContext(sc)
sql_compactDF_2018 = (
  sqlCtx.createDataFrame(CompactDF_2018)
  .withColumnRenamed("WLProb", "label")
)

In [11]:
#FEATURE ENGINEERING
#
# Builds matchDF_2018: one row per team with the columns
#   teamID        - the team's ID
#   winPercentage - games won / (games won + games lost) over all rows of
#                   sql_compactDF_2018
#
# Fix: the win and loss counts used to be combined with a RIGHT join on two
# separate teamID columns. That dropped every team without a loss (so an
# undefeated team later ended up with a win percentage of 0 after na.fill) and
# produced a null teamID / WCount / totalMatches for every team without a win.
# A full outer join on the shared teamID column plus zero-filled counts keeps
# every team and yields 1.0 for undefeated and 0.0 for winless teams.
# NOTE(review): if the training CSVs were produced with the old join, consider
# regenerating them so training and scoring features stay consistent.

# Per-team win count. `won` is the sum of `label` over the games the team won.
wDF_2018 = (
  sql_compactDF_2018.groupBy('WTeamID')
  .agg(fn.sum('label').alias('won'), fn.count('Season').alias('WCount'))
  .withColumnRenamed('WTeamID', 'teamID')
)

# Per-team loss count.
lDF_2018 = (
  sql_compactDF_2018.groupBy('LTeamID')
  .agg(fn.count('Season').alias('LCount'))
  .withColumnRenamed('LTeamID', 'teamID')
)

# Joining on the column NAME keeps a single teamID column, so no positional
# pandas slicing is needed to get rid of a duplicate.
matchDF_2018 = (
  wDF_2018.join(lDF_2018, on='teamID', how='full')
  .na.fill(0, subset=['won', 'WCount', 'LCount'])
  .withColumn('totalMatches', fn.col('WCount') + fn.col('LCount'))
  .withColumn('winPercentage', fn.col('WCount') / fn.col('totalMatches'))
  .select('teamID', 'winPercentage')
)

# pandas copy of the same two columns, kept so that anything still referring
# to match_pd_DF_2018 continues to work.
match_pd_DF_2018 = matchDF_2018.toPandas()

In [12]:
# Enrich every game row with the win percentage of its winning team
# (W_win_percentage) and of its losing team (L_win_percentage) by left-joining
# the per-team table twice. High_team is not carried over.

# Pass 1: look up the winner's percentage.
winner_key = (matchDF_2018.teamID == sql_compactDF_2018.WTeamID)
winner_stage_columns = [
  'DayNum', sql_compactDF_2018.Season, 'WTeamID', 'WScore', 'LTeamID', "LScore",
  'NumOT', 'label', 'Loc', "Score_Diff",
  fn.col('winPercentage').alias('W_win_percentage'),
  'Low_team',
]
winPercentage_DF = (
  sql_compactDF_2018
  .join(matchDF_2018, winner_key, how='left')
  .select(*winner_stage_columns)
)

# Pass 2: look up the loser's percentage on the result of pass 1.
loser_key = (matchDF_2018.teamID == winPercentage_DF.LTeamID)
loser_stage_columns = [
  'DayNum', winPercentage_DF.Season, 'WTeamID', 'WScore', 'LTeamID', "LScore",
  'NumOT', 'label', 'Loc', "Score_Diff", 'W_win_percentage',
  fn.col('winPercentage').alias('L_win_percentage'),
  'Low_team',
]
winPercentage_DF = (
  winPercentage_DF
  .join(matchDF_2018, loser_key, how='left')
  .select(*loser_stage_columns)
)

In [13]:
#Filling NAs with zero
# Null values (e.g. a win percentage left null by the left joins when a team
# has no matching row in the per-team table) are replaced by 0.
winPercentage_DF=winPercentage_DF.na.fill(0)

In [14]:
# Load the training frame used by models 1 and 2 from a CSV file.
# NOTE(review): how 98_17DF.csv was produced is not shown here; the models
# below expect it to contain `label` plus the feature columns they list.
trainedData1998_2017 = spark.read.csv("/FileStore/tables/98_17DF.csv", header=True, inferSchema= True)

In [15]:
# Visual check of the loaded training frame.
display(trainedData1998_2017)

In [16]:
#Feature Definition and Vector Assembler creation

##############################################   MODEL - 1 #####################################################
# Baseline model: logistic regression over the plain per-game columns.
#   DayNum     - day number of the game
#   WTeamID    - ID of the winning team
#   Score_Diff - score margin of the game
#   Loc        - numeric location weight
#   NumOT      - number of overtimes
featureCols_1_2018 = ["DayNum", "WTeamID", "Score_Diff", "Loc", "NumOT"]

# Stage 1: pack the feature columns into a single vector column "features".
assembler_1_2018 = feature.VectorAssembler(outputCol="features", inputCols=featureCols_1_2018)

# Stage 2: logistic regression with its default parameters.
logisticReg_1_2018 = classification.LogisticRegression()

# Chain both stages and fit them on the 1998-2017 training frame.
pipeline_1_2018 = Pipeline().setStages([assembler_1_2018, logisticReg_1_2018])
model_1_2018 = pipeline_1_2018.fit(trainedData1998_2017)

In [17]:
# Score the 2018 frame with the fitted model-1 pipeline; the result carries the
# model's prediction columns next to the input columns.
test_predictions_1_2018 = model_1_2018.transform(winPercentage_DF)

In [18]:
# Inspect the model-1 predictions.
display(test_predictions_1_2018)

In [19]:
# Evaluator with default settings, used to obtain the areaUnderROC (AUC) score.
evaluator = BinaryClassificationEvaluator()

# Report the AUC of model 1 on the 2018 predictions.
auc_model_1 = evaluator.evaluate(test_predictions_1_2018)
print("The AUC metric for the testing dataset of model-1", auc_model_1)

In [20]:
############################### MODEL - 2 ##################################################

# Model 2 extends the model-1 feature set with the win percentages of the
# winning and the losing team.
featureCols_2_2018 = ["W_win_percentage","L_win_percentage","DayNum","WTeamID", "Score_Diff", "Loc", "NumOT"]

# Stage 1: assemble the feature columns into the vector column "features".
assembler_2_2018 = feature.VectorAssembler(outputCol="features", inputCols=featureCols_2_2018)

# Stage 2: logistic regression with its default parameters.
logisticReg_2_2018 = classification.LogisticRegression()

# Chain the assembler and the logistic regression, then fit on the 1998-2017
# training frame.
pipeline_2_2018 = Pipeline().setStages([assembler_2_2018, logisticReg_2_2018])
model_2_2018 = pipeline_2_2018.fit(trainedData1998_2017)

In [21]:
# Score the 2018 frame with the fitted model-2 pipeline.
test_predictions_2_2018 = model_2_2018.transform(winPercentage_DF)

In [22]:
# Report the AUC of model 2, using the evaluator created for model 1.
print("The AUC metric for the testing dataset of model-2", evaluator.evaluate(test_predictions_2_2018))

In [23]:
#####################################################################################################################
#################################### Using 2010-2017 as training data ###############################################


In [24]:
#Read the detailed dataset
# header=True takes the column names from the first row; inferSchema=True lets
# Spark infer the column types from the data.
DetailedDF_2018=spark.read.csv("/FileStore/tables/WRegularSeasonDetailedResults_2018.csv", header=True, inferSchema= True)

In [25]:
# Rebind the name to a pandas DataFrame; the row-level pre-processing below
# works on pandas.
DetailedDF_2018=DetailedDF_2018.toPandas()

In [26]:
#PRE_PROCESSING THE DATAFRAME for 2010-2017
#
# Derives, for every game row of the pandas frame DetailedDF_2018, the per-row
# lists that are attached as columns afterwards:
#   temp_win   - 1 when the winning team has the lower team ID, otherwise 0
#   low_team   - the lower of the two team IDs
#   high_team  - the higher of the two team IDs
#   loc_col    - numeric weight of WLoc: 'H' -> 3, 'N' -> 2, 'A' -> 1
#   diff_score - absolute difference between WScore and LScore
#
# The computation is vectorised instead of looping with iterrows(). Every list
# therefore always has exactly one entry per row.
#
# Raises:
#   ValueError - if WLoc holds anything other than 'H', 'N' or 'A'. The former
#                row loop skipped such rows for loc_col only, leaving the lists
#                with different lengths.
#
# Seed-based features (seed difference) remain disabled, as before.

# Highest weight for a home game, lowest for an away game, medium for neutral.
loc_weights = {'H': 3, 'N': 2, 'A': 1}

unexpected_loc = DetailedDF_2018.loc[~DetailedDF_2018['WLoc'].isin(list(loc_weights)), 'WLoc']
if not unexpected_loc.empty:
  raise ValueError("Unexpected WLoc value(s): {}".format(unexpected_loc.unique().tolist()))

winner_ids = DetailedDF_2018['WTeamID']
loser_ids = DetailedDF_2018['LTeamID']
lower_id_won = winner_ids < loser_ids

# Label is 1 when the team with the lower team ID won the game.
temp_win = lower_id_won.astype(int).tolist()
low_team = winner_ids.where(lower_id_won, loser_ids).tolist()
high_team = loser_ids.where(lower_id_won, winner_ids).tolist()

loc_col = DetailedDF_2018['WLoc'].map(loc_weights).tolist()

# Margin of the game, always non-negative.
diff_score = (DetailedDF_2018['WScore'] - DetailedDF_2018['LScore']).abs().tolist()

In [27]:
# Attach the per-row lists built by the pre-processing step as new columns of
# the pandas frame, in this order. Seed_Diff is not attached because its
# computation is disabled.
derived_columns = [
  ('WLProb', temp_win),
  ('Loc', loc_col),
  ('Score_Diff', diff_score),
  ('Low_team', low_team),
  ('High_team', high_team),
]
for column_name, column_values in derived_columns:
  DetailedDF_2018[column_name] = column_values

In [28]:
# Move the enriched pandas frame back into Spark and expose the outcome column
# under the name "label", so the ML stages can pick it up without extra setup.
# NOTE(review): `sc` is not defined in this file -- presumably the notebook's
# SparkContext; confirm.
sqlCtx = SQLContext(sc)
sql_detailedDF_2018 = (
  sqlCtx.createDataFrame(DetailedDF_2018)
  .withColumnRenamed("WLProb", "label")
)

In [29]:
# Inspect the detailed Spark frame after the rename to `label`.
display(sql_detailedDF_2018)

In [30]:
#FEATURE ENGINEERING
#
# Builds dtmatchDF_2018: one row per team with the columns
#   teamID        - the team's ID
#   winPercentage - games won / (games won + games lost) over all rows of
#                   sql_detailedDF_2018
#
# Fix: the win and loss counts used to be combined with a RIGHT join on two
# separate teamID columns. That dropped every team without a loss (so an
# undefeated team later ended up with a win percentage of 0 after na.fill) and
# produced a null teamID / WCount / totalMatches for every team without a win.
# A full outer join on the shared teamID column plus zero-filled counts keeps
# every team and yields 1.0 for undefeated and 0.0 for winless teams.
# NOTE(review): if the training CSVs were produced with the old join, consider
# regenerating them so training and scoring features stay consistent.

# Per-team win count. `won` is the sum of `label` over the games the team won.
dtwDF_2018 = (
  sql_detailedDF_2018.groupBy('WTeamID')
  .agg(fn.sum('label').alias('won'), fn.count('Season').alias('WCount'))
  .withColumnRenamed('WTeamID', 'teamID')
)

# Per-team loss count.
dtlDF_2018 = (
  sql_detailedDF_2018.groupBy('LTeamID')
  .agg(fn.count('Season').alias('LCount'))
  .withColumnRenamed('LTeamID', 'teamID')
)

# Joining on the column NAME keeps a single teamID column, so no positional
# pandas slicing is needed to get rid of a duplicate.
dtmatchDF_2018 = (
  dtwDF_2018.join(dtlDF_2018, on='teamID', how='full')
  .na.fill(0, subset=['won', 'WCount', 'LCount'])
  .withColumn('totalMatches', fn.col('WCount') + fn.col('LCount'))
  .withColumn('winPercentage', fn.col('WCount') / fn.col('totalMatches'))
  .select('teamID', 'winPercentage')
)

# pandas copy of the same two columns, kept so that anything still referring
# to dtmatch_pd_DF_2018 continues to work.
dtmatch_pd_DF_2018 = dtmatchDF_2018.toPandas()

In [31]:
#Display the DF to ensure the join has not missed any data rows and other details
display(dtmatchDF_2018)
#Expect NaN because of the null values introduced during the join
# (any NaN shown here originates from nulls produced by the join that built
# this per-team table)

In [32]:
# Enrich every detailed game row with the win percentage of its winning team
# (W_win_percentage) and of its losing team (L_win_percentage) by left-joining
# the per-team table twice. The box-score columns of both sides are carried
# through; High_team is not.

box_score_stats = ['FGM', 'FGA', 'FGM3', 'FGA3', 'FTM', 'FTA', 'OR', 'DR', 'Ast', 'TO', 'Stl', 'Blk', 'PF']
box_score_columns = ['W' + stat for stat in box_score_stats] + ['L' + stat for stat in box_score_stats]

# Pass 1: look up the winner's percentage.
winner_key = (dtmatchDF_2018.teamID == sql_detailedDF_2018.WTeamID)
winner_stage_columns = (
  ['DayNum', sql_detailedDF_2018.Season, 'WTeamID', 'WScore', 'LTeamID', "LScore",
   'NumOT', 'label', 'Loc', "Score_Diff",
   fn.col('winPercentage').alias('W_win_percentage')]
  + box_score_columns
  + ['Low_team']
)
dtwinPercentage_DF = (
  sql_detailedDF_2018
  .join(dtmatchDF_2018, winner_key, how='left')
  .select(*winner_stage_columns)
)

# Pass 2: look up the loser's percentage on the result of pass 1.
loser_key = (dtmatchDF_2018.teamID == dtwinPercentage_DF.LTeamID)
loser_stage_columns = (
  ['DayNum', dtwinPercentage_DF.Season, 'WTeamID', 'WScore', 'LTeamID', "LScore",
   'NumOT', 'label', 'Loc', "Score_Diff", 'W_win_percentage',
   fn.col('winPercentage').alias('L_win_percentage')]
  + box_score_columns
  + ['Low_team']
)
dtwinPercentage_DF = (
  dtwinPercentage_DF
  .join(dtmatchDF_2018, loser_key, how='left')
  .select(*loser_stage_columns)
)

In [33]:
# Replace null values with 0 (rows are kept, nothing is removed), e.g. a win
# percentage left null by the left joins when a team has no matching row.
dtwinPercentage_DF=dtwinPercentage_DF.na.fill(0)

In [34]:
# Replace the raw box-score columns by derived per-side features:
#   <side>Goals_ratio       = FGM  / FGA
#   <side>3pointers_ratio   = FGM3 / FGA3
#   <side>Freethrows_ratio  = FTM  / FTA
#   Win_accomplish / Lose_accomplish = OR + DR + Ast + TO + Stl + Blk - PF
# NOTE(review): turnovers (TO) are added, not subtracted, in the "accomplish"
# score -- confirm this is intended.

def _box_score_features(side, accomplish_name):
  # Build the four derived columns for one side ('W' or 'L').
  stat = lambda suffix: fn.col(side + suffix)
  return [
    (stat('FGM') / stat('FGA')).alias(side + 'Goals_ratio'),
    (stat('FGM3') / stat('FGA3')).alias(side + '3pointers_ratio'),
    (stat('FTM') / stat('FTA')).alias(side + 'Freethrows_ratio'),
    (stat('OR') + stat('DR') + stat('Ast') + stat('TO') + stat('Stl') + stat('Blk') - stat('PF')).alias(accomplish_name),
  ]

kept_columns = ['DayNum', 'WTeamID', 'WScore', 'LTeamID', "LScore", 'NumOT', 'label', 'Loc',
                "Score_Diff", 'W_win_percentage', 'L_win_percentage', 'Season', 'Low_team']

temp_DF = dtwinPercentage_DF.select(
  *(kept_columns
    + _box_score_features('W', 'Win_accomplish')
    + _box_score_features('L', 'Lose_accomplish'))
)

In [35]:
# From here on dtwinPercentage_DF refers to the frame with the derived ratio
# features instead of the raw box-score columns.
dtwinPercentage_DF=temp_DF
#Display final dataframe
display(dtwinPercentage_DF)

In [36]:
# Test set for models 3-6: the final detailed frame with remaining nulls
# replaced by 0.
testData_2018 = dtwinPercentage_DF.na.fill(0)

In [37]:
# Load the training frame used by models 3-6 from a CSV file.
# NOTE(review): how 2010_2017_traindata.csv was produced is not shown here; the
# models below expect it to contain `label` plus the feature columns they list.
trainedData2010_2017 = spark.read.csv("/FileStore/tables/2010_2017_traindata.csv", header=True, inferSchema= True)

In [38]:
# Visual check of the loaded training frame.
display(trainedData2010_2017)

In [39]:
#Feature Definition and Vector Assembler creation

##############################################   MODEL - 3 #####################################################
# Same plain per-game feature set as model 1, trained on the 2010-2017 frame.
#   DayNum     - day number of the game
#   WTeamID    - ID of the winning team
#   Score_Diff - score margin of the game
#   Loc        - numeric location weight
#   NumOT      - number of overtimes
# (LTeamID and a seed difference are not part of the feature list.)
featureCols_3_2018 = ["DayNum", "WTeamID", "Score_Diff", "Loc", "NumOT"]

# Stage 1: pack the feature columns into a single vector column "features".
assembler_3_2018 = feature.VectorAssembler(outputCol="features", inputCols=featureCols_3_2018)

# Stage 2: logistic regression with its default parameters.
logisticReg_3_2018 = classification.LogisticRegression()

# Chain both stages and fit them on the 2010-2017 training frame.
pipeline_3_2018 = Pipeline().setStages([assembler_3_2018, logisticReg_3_2018])
model_3_2018 = pipeline_3_2018.fit(trainedData2010_2017)

In [40]:
# Score the 2018 detailed test frame with the fitted model-3 pipeline.
test_predictions_3_2018 = model_3_2018.transform(testData_2018)

In [41]:
# Evaluator with default settings, used to obtain the areaUnderROC (AUC) score.
evaluator = BinaryClassificationEvaluator()

# Report the AUC of model 3 on the 2018 predictions.
auc_model_3 = evaluator.evaluate(test_predictions_3_2018)
print("The AUC metric for the testing dataset of model-3", auc_model_3)

In [42]:
#Checking the average prediction for the test data set ( A Balanced dataset)
# Prints the mean of the `prediction` column of the model-3 output.
test_predictions_3_2018.select(fn.avg('prediction')).show()

In [43]:
###############################MODEL - 4 ##################################################

# Model 4 extends the model-3 feature set with the win percentages of the
# winning and the losing team.
featureCols_4_2018 = ["W_win_percentage","L_win_percentage","DayNum","WTeamID", "Score_Diff", "Loc", "NumOT"]

# Stage 1: assemble the feature columns into the vector column "features".
assembler_4_2018 = feature.VectorAssembler(outputCol="features", inputCols=featureCols_4_2018)

# Stage 2: logistic regression with its default parameters.
logisticReg_4_2018 = classification.LogisticRegression()

# Chain the assembler and the logistic regression, then fit on the 2010-2017
# training frame.
pipeline_4_2018 = Pipeline().setStages([assembler_4_2018, logisticReg_4_2018])
model_4_2018 = pipeline_4_2018.fit(trainedData2010_2017)

In [44]:
# Score the 2018 detailed test frame with the fitted model-4 pipeline.
test_predictions_4_2018 = model_4_2018.transform(testData_2018)

In [45]:
# Report the AUC of model 4 on the 2018 test predictions.
print("The AUC Metric for test data set of model-4 ", evaluator.evaluate(test_predictions_4_2018))

In [46]:
# Prints the mean of the `prediction` column of the model-4 output.
test_predictions_4_2018.select(fn.avg('prediction')).show()

In [47]:
##### Model 5 ###########
# Minimal feature set: the winning team's ID plus both win percentages.
featureCols_5_2018 = ["WTeamID","W_win_percentage", "L_win_percentage"]

# Stage 1: assemble the feature columns into the vector column "features".
assembler_5_2018 = feature.VectorAssembler(outputCol="features", inputCols=featureCols_5_2018)

# Stage 2: logistic regression with its default parameters.
logisticReg_5_2018 = classification.LogisticRegression()

# Chain both stages and fit them on the 2010-2017 training frame.
pipeline_5_2018 = Pipeline().setStages([assembler_5_2018, logisticReg_5_2018])
model_5_2018 = pipeline_5_2018.fit(trainedData2010_2017)

In [48]:
# Score the 2018 detailed test frame with the fitted model-5 pipeline.
test_predictions_5_2018 = model_5_2018.transform(testData_2018)

In [49]:
# Report the AUC of model 5 on the 2018 test predictions.
print("The AUC Metric for test data set of model-5 ", evaluator.evaluate(test_predictions_5_2018))

In [50]:
######## Model 6 ################
# Full feature set: the per-game columns, both win percentages and the derived
# shooting-ratio / "accomplish" features of the winning and the losing side.
featureCols_6_2018 = ["WTeamID","DayNum", "Score_Diff", "Loc", "NumOT", "W_win_percentage", "L_win_percentage", "WGoals_ratio", 'W3pointers_ratio', 'WFreethrows_ratio', 'Win_accomplish', 'LGoals_ratio','L3pointers_ratio', 'LFreethrows_ratio', 'Lose_accomplish']

# Stage 1: assemble the feature columns into the vector column "features".
assembler_6_2018 = feature.VectorAssembler(outputCol="features", inputCols=featureCols_6_2018)

# Stage 2: logistic regression with its default parameters.
logisticReg_6_2018 = classification.LogisticRegression()

# Chain both stages and fit them on the 2010-2017 training frame.
pipeline_6_2018 = Pipeline().setStages([assembler_6_2018, logisticReg_6_2018])
model_6_2018 = pipeline_6_2018.fit(trainedData2010_2017)

In [51]:
# Score the 2018 detailed test frame with the fitted model-6 pipeline.
test_predictions_6_2018 = model_6_2018.transform(testData_2018)

In [52]:
# Report the AUC of model 6 on the 2018 test predictions.
print("The AUC Metric for test data set of model-6 ", evaluator.evaluate(test_predictions_6_2018))

In [53]:
#Inferences with pie chart representation

In [54]:
# Show the model-1 predictions again.
# NOTE(review): the charts below are built from the model-3 predictions, not
# from this frame.
display(test_predictions_1_2018)

In [55]:
# Pull the model-3 predictions into pandas; the charts below are built from
# this frame.
results = test_predictions_3_2018.toPandas()

In [56]:
# Per-row absolute difference between the predicted class and the true label:
# 0 means the prediction matches the label, 1 means it does not.
# Computed in one vectorised step; the earlier row-by-row loop also carried a
# bare `diff_result` expression statement that had no effect.
diff_result = (results['prediction'] - results['label']).abs().tolist()

In [57]:
# a = number of rows where prediction and label agree (difference 0),
# b = number of rows where they differ (difference 1).
a=diff_result.count(0)
b=diff_result.count(1)

In [58]:
import matplotlib.pyplot as plt

# Pie chart of correct vs. wrong predictions, with the "correct" slice pulled
# out of the pie.
plt.figure()

sizes = [a, b]
labels = ('correct prediction', 'wrong prediction')
colors = ['gold', 'yellowgreen']
explode = (0.1, 0)

plt.pie(
  sizes,
  labels=labels,
  colors=colors,
  explode=explode,
  startangle=140,
  shadow=True,
  autopct='%1.1f%%',
)

# Equal axis scaling so the pie is drawn as a circle.
plt.axis('equal')
display()

In [59]:
# Share of games by location weight in the model-3 result frame:
# 3 = home, 2 = neutral, 1 = away (from the winning team's WLoc).
# The values are taken straight from the column; the earlier row-by-row loop
# (with a bare `loc_result` expression statement that had no effect) is gone.
loc_result = results['Loc'].abs().tolist()

# NOTE: a and b are reused here and no longer hold the correct/wrong counts.
a = loc_result.count(3)
b = loc_result.count(2)
c = loc_result.count(1)

import matplotlib.pyplot as plt
plt.figure()
labels = 'Home', 'Neutral', 'Away'
sizes = [a, b, c]
colors = ['gold', 'yellowgreen', 'blue']
explode = (0, 0, 0)  # no slice is pulled out

# Plot
plt.pie(sizes, explode=explode, labels=labels, colors=colors,
        autopct='%1.1f%%', shadow=True, startangle=140)

# Equal axis scaling so the pie is drawn as a circle.
plt.axis('equal')
display()