In [43]:
from ipynb.fs.full.data_extraction import init_spark, read_file
from pyspark.sql.functions import count,when,isnan,dayofmonth, month, year,col,udf,struct,lit
from pyspark.sql.types import IntegerType,LongType
import datetime
from pyspark.ml.feature import StringIndexer, VectorAssembler,IndexToString,VectorIndexer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

In [2]:
spark = init_spark()

In [3]:
match = read_file("Match.csv")
player_attributes = read_file("Player_Attributes.csv")
players = read_file("Player.csv")

In [4]:
match.filter(match["home_player_1"].isNotNull()).select("home_player_1","date")
player_attributes.filter(player_attributes["player_api_id"] == 39890).select("player_api_id","overall_rating").show()

+-------------+--------------+
|player_api_id|overall_rating|
+-------------+--------------+
|        39890|            64|
|        39890|            66|
|        39890|            66|
|        39890|            68|
|        39890|            67|
|        39890|            67|
|        39890|            66|
|        39890|            64|
|        39890|            62|
|        39890|            62|
+-------------+--------------+



In [5]:
# print(match.columns)
# match.select("home_player_X1").filter(match["home_player_X1"].isNotNull()).count()
required_columns = ["match_api_id","home_team_api_id","away_team_api_id","date","home_team_goal","away_team_goal","home_player_1", "home_player_2", "home_player_3", "home_player_4", "home_player_5",
               "home_player_6", "home_player_7", "home_player_8", "home_player_9", "home_player_10",
               "home_player_11", "away_player_1", "away_player_2", "away_player_3", "away_player_4",
               "away_player_5", "away_player_6", "away_player_7", "away_player_8", "away_player_9",
               "away_player_10", "away_player_11","B365H","B365D","B365A","BWH","BWD","BWA"]

match = match.select(required_columns)

In [6]:
match = match.na.drop()
player_attributes = player_attributes.na.drop()

In [7]:
def get_overall_rating(row,match_df_columns):
    match_id = row[match_df_columns.index("match_api_id")]
    match_date = row[match_df_columns.index("date")]
    all_players = ['home_player_1', 'home_player_2', 'home_player_3', "home_player_4", "home_player_5",
               "home_player_6", "home_player_7", "home_player_8", "home_player_9", "home_player_10",
               "home_player_11", "away_player_1", "away_player_2", "away_player_3", "away_player_4",
               "away_player_5", "away_player_6", "away_player_7", "away_player_8", "away_player_9",
               "away_player_10", "away_player_11"]
    for player in players:
        player_id = row[match_df_columns.index(player)]
        attributes = player_attributes.filter(player_attributes["player_api_id"] == player_id)
        current_attributes = attributes.filter(attributes["date"]<match_date).orderBy("date",ascending=False)
        overall_rating = current_attributes.select("overall_rating").mean()
    return overall_rating

# overall_rating_udf = udf(get_overall_rating)

In [8]:
grouped_rating = player_attributes.groupBy("player_api_id").mean().select("player_api_id","avg(overall_rating)")\
.withColumnRenamed("avg(overall_rating)","mean_rating").orderBy("mean_rating",ascending=False)
# player.filter(player["player_api_id"] == 35724).show()

In [9]:
# match_new = match
all_players = ['home_player_1', 'home_player_2', 'home_player_3', "home_player_4", "home_player_5",
               "home_player_6", "home_player_7", "home_player_8", "home_player_9", "home_player_10",
               "home_player_11", "away_player_1", "away_player_2", "away_player_3", "away_player_4",
               "away_player_5", "away_player_6", "away_player_7", "away_player_8", "away_player_9",
               "away_player_10", "away_player_11"]
# for player in all_players:
#     player_df = match.select("match_api_id",player)
#     joined_df = player_df.alias("pdf").join(players.alias("p"), player_df[player] == players["player_api_id"])\
#     .select("match_api_id",player_df[player])
#     match_new = match_new.join(joined_df.alias("pdf"), on = "match_api_id").drop(joined_df[player])

    
grouped_rating.filter(grouped_rating["player_api_id"] == 38327).show()

+-------------+------------------+
|player_api_id|       mean_rating|
+-------------+------------------+
|        38327|63.214285714285715|
+-------------+------------------+



In [10]:
# match_new.cache()
# grouped_rating.cache()
# print(datetime.datetime.now())
# for player in all_players:
#     player_df = match_new.select("match_api_id",player)
#     joined_df = player_df.join(grouped_rating, grouped_rating["player_api_id"] == player_df[player]).drop("player_api_id")\
#     .withColumnRenamed("mean_rating",player+"_mean_rating")
#     match_new = match_new.join(joined_df, on = "match_api_id").drop(joined_df[player])
#     match_new.select("match_api_id",player,player+"_mean_rating").show(1)
#     print(datetime.datetime.now())
match_new = match    
match.cache()
grouped_rating.cache()
print(datetime.datetime.now())
for player in all_players:
    player_df = match.select("match_api_id",player)
    joined_df = player_df.join(grouped_rating, player_df[player] == grouped_rating["player_api_id"]).drop("player_api_id")\
    .withColumnRenamed("mean_rating",player+"_mean_rating")
    match_new = match_new.join(joined_df, on = "match_api_id").drop(match_new[player])
    match_new.select("match_api_id",player,player+"_mean_rating").show(1)
    print(datetime.datetime.now())
match_new.cache()

2019-04-04 17:07:08.473113
+------------+-------------+-------------------------+
|match_api_id|home_player_1|home_player_1_mean_rating|
+------------+-------------+-------------------------+
|      493017|        38327|       63.214285714285715|
+------------+-------------+-------------------------+
only showing top 1 row

2019-04-04 17:07:13.252481
+------------+-------------+-------------------------+
|match_api_id|home_player_2|home_player_2_mean_rating|
+------------+-------------+-------------------------+
|      489204|        46403|        77.79166666666667|
+------------+-------------+-------------------------+
only showing top 1 row

2019-04-04 17:07:14.315895
+------------+-------------+-------------------------+
|match_api_id|home_player_3|home_player_3_mean_rating|
+------------+-------------+-------------------------+
|      489204|        24531|                    78.16|
+------------+-------------+-------------------------+
only showing top 1 row

2019-04-04 17:07:15.35

DataFrame[match_api_id: int, home_team_api_id: int, away_team_api_id: int, date: timestamp, home_team_goal: int, away_team_goal: int, B365H: double, B365D: double, B365A: double, BWH: double, BWD: double, BWA: double, home_player_1: int, home_player_1_mean_rating: double, home_player_2: int, home_player_2_mean_rating: double, home_player_3: int, home_player_3_mean_rating: double, home_player_4: int, home_player_4_mean_rating: double, home_player_5: int, home_player_5_mean_rating: double, home_player_6: int, home_player_6_mean_rating: double, home_player_7: int, home_player_7_mean_rating: double, home_player_8: int, home_player_8_mean_rating: double, home_player_9: int, home_player_9_mean_rating: double, home_player_10: int, home_player_10_mean_rating: double, home_player_11: int, home_player_11_mean_rating: double, away_player_1: int, away_player_1_mean_rating: double, away_player_2: int, away_player_2_mean_rating: double, away_player_3: int, away_player_3_mean_rating: double, away_pla

In [11]:
# goals scored at home
home_goals = match_new.select("home_team_api_id","home_team_goal","away_team_goal").groupBy("home_team_api_id").sum()\
.withColumnRenamed("sum(home_team_goal)","home_team_home_goals_scored")\
.withColumnRenamed("sum(away_team_goal)","home_team_home_goals_conceded")\
.withColumnRenamed("home_team_api_id","team_id")\
.select("team_id","home_team_home_goals_scored","home_team_home_goals_conceded")

# goals scored away
away_goals = match_new.select("away_team_api_id","home_team_goal","away_team_goal").groupBy("away_team_api_id").sum()\
.withColumnRenamed("sum(home_team_goal)","away_team_away_goals_conceded")\
.withColumnRenamed("sum(away_team_goal)","away_team_away_goals_scored")\
.withColumnRenamed("away_team_api_id","team_id")\
.select("team_id","away_team_away_goals_conceded","away_team_away_goals_scored")


In [12]:
# goal difference
goal_difference = home_goals.join(away_goals, on="team_id")
goal_difference = goal_difference.withColumn("goal_diff",(goal_difference["home_team_home_goals_scored"]+goal_difference["away_team_away_goals_scored"])
                                    - (goal_difference["home_team_home_goals_conceded"]+goal_difference["away_team_away_goals_conceded"]))\
.withColumnRenamed("home_team_api_id","team_id")
# home_goals.show()
# goal_difference.show()
# update home team goal difference to original df
match_with_goals_scored = match_new.join(goal_difference, match_new["home_team_api_id"] == goal_difference["team_id"])\
.withColumnRenamed("goal_diff","home_team_goal_diff").drop("team_id","home_team_home_goals_scored","home_team_home_goals_conceded"
                                                          ,"away_team_away_goals_conceded","away_team_away_goals_scored")
# update away team goal difference to original df
match_with_goals_scored = match_with_goals_scored.join(goal_difference, match_with_goals_scored["away_team_api_id"] == goal_difference["team_id"])\
.withColumnRenamed("goal_diff","away_team_goal_diff").drop("team_id","home_team_home_goals_scored","home_team_home_goals_conceded"
                                                          ,"away_team_away_goals_conceded","away_team_away_goals_scored")
# match_with_goals_scored.select("match_api_id","home_team_api_id","away_team_api_id","home_team_goal_diff","away_team_goal_diff").show()
match_with_goals_scored.cache()

DataFrame[match_api_id: int, home_team_api_id: int, away_team_api_id: int, date: timestamp, home_team_goal: int, away_team_goal: int, B365H: double, B365D: double, B365A: double, BWH: double, BWD: double, BWA: double, home_player_1: int, home_player_1_mean_rating: double, home_player_2: int, home_player_2_mean_rating: double, home_player_3: int, home_player_3_mean_rating: double, home_player_4: int, home_player_4_mean_rating: double, home_player_5: int, home_player_5_mean_rating: double, home_player_6: int, home_player_6_mean_rating: double, home_player_7: int, home_player_7_mean_rating: double, home_player_8: int, home_player_8_mean_rating: double, home_player_9: int, home_player_9_mean_rating: double, home_player_10: int, home_player_10_mean_rating: double, home_player_11: int, home_player_11_mean_rating: double, away_player_1: int, away_player_1_mean_rating: double, away_player_2: int, away_player_2_mean_rating: double, away_player_3: int, away_player_3_mean_rating: double, away_pla

In [13]:
# condition =.withColumn("home_result",
#                     when(match_new["home_team_goal"]>match_new["away_team_goal"],"win").otherwise(
#                         when(match_new["home_team_goal"]<match_new["away_team_goal"],"lost").otherwise("draw")))

# get home team results - win, loss or draw
home_results = match_new.select("home_team_api_id","home_team_goal","away_team_goal").withColumnRenamed("home_team_api_id","team_id")\
.withColumn("home_result",
                    when(match_new["home_team_goal"]>match_new["away_team_goal"],"win").otherwise(
                        when(match_new["home_team_goal"]<match_new["away_team_goal"],"lost").otherwise("draw")))

# get away team results - win, loss or draw
away_results = match_new.select("away_team_api_id","home_team_goal","away_team_goal").withColumnRenamed("away_team_api_id","team_id")\
.withColumn("away_result",
                    when(match_new["home_team_goal"]>match_new["away_team_goal"],"lost").otherwise(
                        when(match_new["home_team_goal"]<match_new["away_team_goal"],"win").otherwise("draw")))

# # home_wins.groupBy("team_id","home_result").count()\
# # .filter((home_wins["home_result"] == "win") | (home_wins["home_result"] == "lost"))\
# # .filter(home_wins["team_id"] == 9987).show()
# #
# team_results = home_results.join(away_results,on="team_id")

In [14]:
# get number of home wins
home_wins = home_results.select("team_id","home_result")
home_wins = home_wins.filter(home_wins["home_result"] == "win").groupBy("team_id").count().withColumnRenamed("count","home_wins")
# get number of away wins
away_wins = away_results.select("team_id","away_result")
away_wins = away_wins.filter(away_wins["away_result"] == "win").groupBy("team_id").count().withColumnRenamed("count","away_wins")

# calculate total wins of team
total_wins = home_wins.join(away_wins,on="team_id")
total_wins = total_wins.withColumn("wins", total_wins["home_wins"]+ total_wins["away_wins"])

# update home team wins to original match df
match_with_wins = match_with_goals_scored.join(total_wins, match_with_goals_scored["home_team_api_id"] == total_wins["team_id"])\
.drop("team_id","home_wins","away_wins").withColumnRenamed("wins","home_team_wins")

# update away team wins to original match df
match_with_wins = match_with_wins.join(total_wins, match_with_wins["away_team_api_id"] == total_wins["team_id"])\
.drop("team_id","home_wins","away_wins").withColumnRenamed("wins","away_team_wins")
match_with_wins.cache()

DataFrame[match_api_id: int, home_team_api_id: int, away_team_api_id: int, date: timestamp, home_team_goal: int, away_team_goal: int, B365H: double, B365D: double, B365A: double, BWH: double, BWD: double, BWA: double, home_player_1: int, home_player_1_mean_rating: double, home_player_2: int, home_player_2_mean_rating: double, home_player_3: int, home_player_3_mean_rating: double, home_player_4: int, home_player_4_mean_rating: double, home_player_5: int, home_player_5_mean_rating: double, home_player_6: int, home_player_6_mean_rating: double, home_player_7: int, home_player_7_mean_rating: double, home_player_8: int, home_player_8_mean_rating: double, home_player_9: int, home_player_9_mean_rating: double, home_player_10: int, home_player_10_mean_rating: double, home_player_11: int, home_player_11_mean_rating: double, away_player_1: int, away_player_1_mean_rating: double, away_player_2: int, away_player_2_mean_rating: double, away_player_3: int, away_player_3_mean_rating: double, away_pla

In [15]:
match_with_label = match_with_wins.withColumn("match_label", when(match_with_wins["home_team_goal"]> match_with_wins["away_team_goal"],"win")
                                             .otherwise(when(match_with_wins["home_team_goal"]<match_with_wins["away_team_goal"],"loss").otherwise("draw"))
                                             )
match_with_label.cache()

DataFrame[match_api_id: int, home_team_api_id: int, away_team_api_id: int, date: timestamp, home_team_goal: int, away_team_goal: int, B365H: double, B365D: double, B365A: double, BWH: double, BWD: double, BWA: double, home_player_1: int, home_player_1_mean_rating: double, home_player_2: int, home_player_2_mean_rating: double, home_player_3: int, home_player_3_mean_rating: double, home_player_4: int, home_player_4_mean_rating: double, home_player_5: int, home_player_5_mean_rating: double, home_player_6: int, home_player_6_mean_rating: double, home_player_7: int, home_player_7_mean_rating: double, home_player_8: int, home_player_8_mean_rating: double, home_player_9: int, home_player_9_mean_rating: double, home_player_10: int, home_player_10_mean_rating: double, home_player_11: int, home_player_11_mean_rating: double, away_player_1: int, away_player_1_mean_rating: double, away_player_2: int, away_player_2_mean_rating: double, away_player_3: int, away_player_3_mean_rating: double, away_pla

In [16]:
book_keeper = ["B365H","B365D","B365A","BWH","BWD","BWA"]
match_with_probs = match_with_label
for bk in book_keeper:
    match_with_probs = match_with_probs.withColumn(bk, 1/match_with_probs[bk])


In [41]:
match_data = match_with_probs
indexer = StringIndexer(inputCol="match_label",outputCol= "label")
match_data = indexer.fit(match_data).transform(match_data)
match_data.columns

features = [x for x in match_data.columns if x not in ["date","label","match_label","match_api_id"
                                                       ,"home_team_api_id","away_team_api_id"]+book_keeper]

assembler = VectorAssembler(inputCols = features, outputCol = "features")
match_data = assembler.transform(match_data)
final_data = match_data.select("features","label")
train,test = final_data.randomSplit([0.7,0.3])
train.cache()
test.cache()

DataFrame[features: vector, label: double]

In [38]:
# logistic regression
lr = LogisticRegression(maxIter=7, regParam=0.1, elasticNetParam=0.8,family="multinomial")
model = lr.fit(train)
result = model.evaluate(test)
multi_class = MulticlassClassificationEvaluator()
multi_class.evaluate(result.predictions)

0.7859793433095731

In [61]:
# random forest
match_data = match_with_probs
labelIndexer = StringIndexer(inputCol="match_label", outputCol="label").fit(match_data)
features = [x for x in match_data.columns if x not in ["date","label","match_label","match_api_id"
                                                       ,"home_team_api_id","away_team_api_id"]+book_keeper]

assembler = VectorAssembler(inputCols = features, outputCol = "features")
print("Vector assembler done "+str(datetime.datetime.now()))
match_data = assembler.transform(match_data)
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(match_data)
print("Feature indexer done "+str(datetime.datetime.now()))
final_data = match_data.select("match_api_id","features","label","match_label")
(trainingData, testData) = final_data.randomSplit([0.7, 0.3])
rf = RandomForestClassifier(labelCol="label", featuresCol="indexedFeatures", numTrees=100)
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=labelIndexer.labels)
print("label converter done "+str(datetime.datetime.now()))
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
model = pipeline.fit(trainingData)
print("model fit "+str(datetime.datetime.now()))
predictions = model.transform(testData)
# predictions.columns
predictions.select("match_api_id","predictedLabel", "label", "features").filter(predictions["matc"])
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(accuracy)

Vector assembler done 2019-04-04 17:53:29.939010
Feature indexer done 2019-04-04 17:53:32.633763
label converter done 2019-04-04 17:53:32.727052
model fit 2019-04-04 17:54:10.016257
+------------+--------------+-----+--------------------+
|match_api_id|predictedLabel|label|            features|
+------------+--------------+-----+--------------------+
|      653930|          draw|  2.0|[1.0,1.0,26523.0,...|
|      654241|          draw|  2.0|[0.0,0.0,26343.0,...|
|      654476|          draw|  2.0|[1.0,1.0,26126.0,...|
|      829967|          loss|  1.0|[1.0,3.0,30742.0,...|
|      830080|          loss|  1.0|[0.0,1.0,26117.0,...|
+------------+--------------+-----+--------------------+
only showing top 5 rows

0.8376451415766959
