# Bundesliga analysis and match result prediction 
- [Bundesliga analysis and match result prediction](#toc1_)    
  - [Loading of the dfs and extraction of some information](#toc1_1_)    
  - [Questions answers](#toc1_2_)    
  - [Random Forest model to predict result](#toc1_3_)    
    - [Prediction of matches with wins, losses and draws](#toc1_3_1_)    
    - [Prediction of matches with no Draws](#toc1_3_2_)    
  - [Conclusions](#toc1_4_)    

<!-- vscode-jupyter-toc-config
	numbering=false
	anchor=true
	flat=false
	minLevel=1
	maxLevel=6
	/vscode-jupyter-toc-config -->
<!-- THIS CELL WILL BE REPLACED ON TOC UPDATE. DO NOT WRITE YOUR TEXT IN THIS CELL -->




In this notebook we **analyse data from the first football division in Germany using PySpark**, extracting information about each season: the league champion of each year, the relegated teams of each season and the season with the most goals. We'll also look at the teams with the most economic power and the stadiums with the largest capacity. Finally, we build a **simple model, also with PySpark, using a Random Forest** that tries to predict the results of football matches from a few independent variables (this is very difficult, as we'll see later).

**To be clear, the main goals of this work are to draw some conclusions from the data through a series of transformations and to build a simple ML pipeline, doing both with the PySpark library.**

## <a id='toc1_1_'></a>[Loading of the dfs and extraction of some information](#toc0_)

As data we have the following dataframes, which have already gone through some transformations (ETL process): they are properly formatted, contain no NaNs and are stored in CSV format:
* A dataframe called Matches.csv with the results of the matches from several seasons.
* A dataframe called Teams.csv with information about the different teams.
* A dataframe called Unique_Teams.csv with the IDs of the teams. In this df we have teams from Germany and England for the first and the second division.


The first step is to load the necessary libraries and start the spark session.

In [2]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName("bundesliga_analysis").getOrCreate()

23/03/30 16:04:20 WARN Utils: Your hostname, javier-VivoBook resolves to a loopback address: 127.0.1.1; using 192.168.1.136 instead (on interface wlo1)
23/03/30 16:04:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/30 16:04:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


**Load of Unique_Teams df**

We start by loading this df because we want to make sure that we don't keep any team from outside Germany.

In [3]:
unique_teams = spark.read.option("header",True).option("inferSchema" , "true").csv("./data/Unique_Teams.csv")
unique_teams.show(2)
unique_teams.dtypes

+-------------+--------------+
|     TeamName|Unique_Team_ID|
+-------------+--------------+
|Bayern Munich|             1|
|     Dortmund|             2|
+-------------+--------------+
only showing top 2 rows



[('TeamName', 'string'), ('Unique_Team_ID', 'int')]

We only have two columns, both with the correct data type. Looking directly at the raw csv, we have realised that the German teams have a Unique_Team_ID value <= 80, while the English teams have a greater ID. Knowing this, we can filter the df by this ID.

In [4]:
unique_teams.filter(
    F.col('Unique_Team_ID') > 75
).show(10)

+------------+--------------+
|    TeamName|Unique_Team_ID|
+------------+--------------+
|Wattenscheid|            76|
| Wuppertaler|            77|
|     Zwickau|            78|
|    RW Essen|            79|
|  M'Gladbach|            80|
|     Arsenal|            81|
| Aston Villa|            82|
|     Chelsea|            83|
|   Liverpool|            84|
|    Man City|            85|
+------------+--------------+
only showing top 10 rows



This could have been done by web scraping the teams of both leagues so that every team had a Country variable (or by directly downloading the csvs from the Internet, where they're available), joining these dfs and filtering to keep the German ones. Nevertheless, the objective of this project is to use PySpark, so we'll do it the faster way.

In [5]:
unique_german_teams = unique_teams.filter(
    F.col('Unique_Team_ID') <= 80
)
unique_german_teams.show(2)

+-------------+--------------+
|     TeamName|Unique_Team_ID|
+-------------+--------------+
|Bayern Munich|             1|
|     Dortmund|             2|
+-------------+--------------+
only showing top 2 rows



**Load of the teams df**

Now we are going to load and take a look at the teams df. 

In [6]:
teams = spark.read.option("header",True).option("inferSchema" , "true").csv("./data/Teams.csv")
teams.show(5)

+------+-------------+----------+------------------+---------------+
|Season|     TeamName|AvgAgeHome|OverallMarketValue|StadiumCapacity|
+------+-------------+----------+------------------+---------------+
|  2017|Bayern Munich|        26|         597950000|          75000|
|  2017|     Dortmund|        25|         416730000|          81359|
|  2017|   Leverkusen|        24|         222600000|          30210|
|  2017|   RB Leipzig|        23|         180130000|          42959|
|  2017|   Schalke 04|        24|         179550000|          62271|
+------+-------------+----------+------------------+---------------+
only showing top 5 rows



Now let's take a look at the number of different teams in our csv:

In [7]:
teams.select('TeamName').distinct().count()

57

We have 57 different teams. Let's do an inner join with the unique_german_teams df on TeamName to make sure we only keep German teams.

In [8]:
teams = teams.join(
    unique_german_teams,on='TeamName',how='inner'
).drop('Unique_Team_ID')
teams.show()

+-------------+------+----------+------------------+---------------+
|     TeamName|Season|AvgAgeHome|OverallMarketValue|StadiumCapacity|
+-------------+------+----------+------------------+---------------+
|Bayern Munich|  2017|        26|         597950000|          75000|
|     Dortmund|  2017|        25|         416730000|          81359|
|   Leverkusen|  2017|        24|         222600000|          30210|
|   RB Leipzig|  2017|        23|         180130000|          42959|
|   Schalke 04|  2017|        24|         179550000|          62271|
|   M'gladbach|  2017|        25|         154400000|          54014|
|    Wolfsburg|  2017|        24|         124430000|          30000|
|      FC Koln|  2017|        26|         118550000|          49968|
|   Hoffenheim|  2017|        24|         107330000|          30164|
|       Hertha|  2017|        26|          86800000|          74475|
|        Mainz|  2017|        25|          71950000|          34000|
|      Hamburg|  2017|        25| 

Now let's show the two teams with the highest market value in each season:

In [9]:
from pyspark.sql.window import Window

w=Window().partitionBy("Season")

teams.withColumn(
        "rank", F.row_number().over(w.orderBy(F.col("OverallMarketValue").desc()))
).filter(
        (F.col("rank")==1) | (F.col("rank")==2)
).drop("rank").select('Season','TeamName','OverallMarketValue').show(30)

+------+-------------+------------------+
|Season|     TeamName|OverallMarketValue|
+------+-------------+------------------+
|  2005|Bayern Munich|         171500000|
|  2005|Werder Bremen|          93800000|
|  2006|Bayern Munich|         166980000|
|  2006|Werder Bremen|         128680000|
|  2007|Bayern Munich|         222930000|
|  2007|Werder Bremen|         129030000|
|  2008|Bayern Munich|         238900000|
|  2008|Werder Bremen|         142450000|
|  2009|Bayern Munich|         283250000|
|  2009|    Wolfsburg|         155880000|
|  2010|Bayern Munich|         284500000|
|  2010|    Wolfsburg|         210730000|
|  2011|Bayern Munich|         335600000|
|  2011|     Dortmund|         158200000|
|  2012|Bayern Munich|         407300000|
|  2012|     Dortmund|         227680000|
|  2013|Bayern Munich|         483980000|
|  2013|     Dortmund|         285150000|
|  2014|Bayern Munich|         564180000|
|  2014|     Dortmund|         329800000|
|  2015|Bayern Munich|         608

As we can see, Bayern Munich is the Bundesliga team with the most economic power: in every season shown it has the highest market value in the league. The second most valuable team from 2011 onwards is Borussia Dortmund, while from 2005 to 2008 it was Werder Bremen and in 2009 and 2010 it was Wolfsburg. It's also notable that Bayern's value has grown in almost every season shown (it dipped slightly in 2006). This reflects the inflation of the football market, which has grown a lot over the past 20 years.
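The "top two per season" selection relies on a window partitioned by Season and ordered by market value. As a rough illustration of that logic, here is a plain-Python sketch that needs no Spark session; the rows and the helper `top_n_per_season` are hypothetical and only mimic what `row_number()` over the window does.

```python
from collections import defaultdict

# Hypothetical sample rows: (season, team, market_value). Values are made up.
rows = [
    (2011, "Team A", 300),
    (2011, "Team B", 150),
    (2011, "Team C", 120),
    (2012, "Team A", 400),
    (2012, "Team C", 220),
    (2012, "Team B", 210),
]

def top_n_per_season(rows, n=2):
    """Return the n rows with the highest value within each season."""
    by_season = defaultdict(list)
    for season, team, value in rows:
        by_season[season].append((team, value))  # plays the role of partitionBy("Season")
    result = []
    for season in sorted(by_season):
        # plays the role of orderBy(desc) followed by keeping rank <= n
        ranked = sorted(by_season[season], key=lambda tv: tv[1], reverse=True)
        result.extend((season, team, value) for team, value in ranked[:n])
    return result

top2 = top_n_per_season(rows)
print(top2)
```

In the Spark version, the same "rank <= n" idea can be written as a single comparison on the rank column instead of chaining equality checks.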

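Now let's rank the market values of the different teams across the seasons. Since we group by both TeamName and Season, each group should hold a single row, so `F.max` simply returns that team's value for that season.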

In [10]:
teams.groupBy('TeamName','Season').agg(
    F.max('OverallMarketValue').alias('MaxMarketValue')
).orderBy(F.col('MaxMarketValue').desc()).show()

+-------------+------+--------------+
|     TeamName|Season|MaxMarketValue|
+-------------+------+--------------+
|Bayern Munich|  2015|     608500000|
|Bayern Munich|  2017|     597950000|
|Bayern Munich|  2016|     595400000|
|Bayern Munich|  2014|     564180000|
|Bayern Munich|  2013|     483980000|
|     Dortmund|  2017|     416730000|
|Bayern Munich|  2012|     407300000|
|Bayern Munich|  2011|     335600000|
|     Dortmund|  2014|     329800000|
|     Dortmund|  2016|     321550000|
|     Dortmund|  2015|     311600000|
|     Dortmund|  2013|     285150000|
|Bayern Munich|  2010|     284500000|
|Bayern Munich|  2009|     283250000|
|    Wolfsburg|  2015|     265430000|
|   Leverkusen|  2016|     244580000|
|Bayern Munich|  2008|     238900000|
|   Schalke 04|  2016|     238750000|
|     Dortmund|  2012|     227680000|
|    Wolfsburg|  2016|     225350000|
+-------------+------+--------------+
only showing top 20 rows



The first team in the list other than Bayern Munich is Borussia Dortmund. These two teams are the ones that appear most often at the top of the market value ranking. In the lower spots of the list we also find Wolfsburg, Bayer Leverkusen and Schalke 04.

Now let's check the 10 stadiums with the largest capacity.

In [11]:
from pyspark.sql.window import Window

w=Window().partitionBy("TeamName")

teams.withColumn(
        "rank", F.row_number().over(w.orderBy(F.col("StadiumCapacity").desc()))
).filter(
        (F.col("rank")==1)
).drop("rank").select('TeamName','StadiumCapacity').orderBy(
        F.col('StadiumCapacity').desc()
).show(10)

+------------------+---------------+
|          TeamName|StadiumCapacity|
+------------------+---------------+
|          Dortmund|          81359|
|     Bayern Munich|          75000|
|            Hertha|          74475|
|        Schalke 04|          62271|
|         Stuttgart|          60449|
|           Hamburg|          57376|
|Fortuna Dusseldorf|          54600|
|        M'gladbach|          54014|
|     Ein Frankfurt|          51500|
|          Nurnberg|          50000|
+------------------+---------------+
only showing top 10 rows



As we can see, Borussia Dortmund has the stadium with the largest capacity, followed by Bayern Munich and Hertha Berlin. Now we are going to see which squads are the youngest and the oldest in our data. Starting with the oldest, these are the oldest squads in our data across the years:

In [12]:
teams.groupBy('TeamName','Season').agg(
    F.max('AvgAgeHome').alias('AvgAgeHome')
).orderBy(F.col('AvgAgeHome').desc()).show(10)

+--------------+------+----------+
|      TeamName|Season|AvgAgeHome|
+--------------+------+----------+
|         Ahlen|  2005|        28|
|     Offenbach|  2006|        27|
|      Hannover|  2007|        27|
| Bayern Munich|  2005|        27|
|      Augsburg|  2006|        27|
|        Aachen|  2005|        27|
|      Duisburg|  2005|        27|
|     Wolfsburg|  2005|        27|
|Erzgebirge Aue|  2006|        27|
| Frankfurt FSV|  2008|        27|
+--------------+------+----------+
only showing top 10 rows



Now let's check the youngest teams:

In [13]:
teams.groupBy('TeamName','Season').agg(
    F.min('AvgAgeHome').alias('AvgAgeHome')
).orderBy(F.col('AvgAgeHome')).show(10)

+--------------+------+----------+
|      TeamName|Season|AvgAgeHome|
+--------------+------+----------+
|    RB Leipzig|  2015|        22|
|    Hoffenheim|  2009|        22|
|   Munich 1860|  2010|        22|
|        Bochum|  2016|        22|
|    Leverkusen|  2007|        23|
|Kaiserslautern|  2011|        23|
|    RB Leipzig|  2016|        23|
|      Dortmund|  2006|        23|
| Werder Bremen|  2014|        23|
|       FC Koln|  2012|        23|
+--------------+------+----------+
only showing top 10 rows



It's worth noting that the oldest squads in our data all come from the earliest seasons (2005 to 2008), while the youngest squads are spread over later seasons, up to 2016. A possible explanation is that in recent years footballers have turned professional earlier, lowering the teams' mean age.

**Load of the matches df**

Now we are going to load and take a look at the matches df. 

In [14]:
matches = spark.read.option("header",True).option("inferSchema" , "true").csv("./data/Matches.csv")
matches.count()

24625

We are going to make sure that our matches df doesn't contain any foreign club. Such a row could be a match from another league or a match from an international club competition in which a German club faces a foreign one, so we require that both teams are German.

In [15]:
matches = matches.join(
    unique_german_teams,
    on = [(unique_german_teams.TeamName == matches.HomeTeam)],
    how='inner').drop(
'Unique_Team_ID','TeamName').dropDuplicates().join(
    unique_german_teams,
    on = [(unique_german_teams.TeamName == matches.AwayTeam)],
    how='inner').drop(
    'Unique_Team_ID','TeamName').dropDuplicates()

matches.count()

15239
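The two inner joins above act as a filter: a match survives only if both its home team and its away team appear in the list of German teams. A minimal plain-Python sketch of that condition follows; the team names, the sample matches and the helper `only_domestic` are hypothetical and only illustrate the idea.

```python
# Hypothetical set of allowed teams and a few sample matches.
german_teams = {"Bayern Munich", "Dortmund", "Hamburg"}
sample_matches = [
    {"HomeTeam": "Bayern Munich", "AwayTeam": "Dortmund"},
    {"HomeTeam": "Arsenal", "AwayTeam": "Chelsea"},
    {"HomeTeam": "Hamburg", "AwayTeam": "Arsenal"},
]

def only_domestic(matches, teams):
    """Keep the matches in which both the home and the away team are in `teams`."""
    return [m for m in matches if m["HomeTeam"] in teams and m["AwayTeam"] in teams]

domestic = only_domestic(sample_matches, german_teams)
print(domestic)
```

In Spark, a `left_semi` join would be another way to express the same filter without having to drop the joined columns afterwards.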

Now we only have German teams playing each other. Some variables are not very well named. We are going to rename the columns 'FTHG' and 'FTAG', which are the goals scored by the home team and by the away team, respectively. The final result, a categorical variable named FTR, is not very descriptive either, so we rename it too.

In [16]:
matches = matches.withColumnRenamed(
    'FTHG','HomeGoals'
).withColumnRenamed(
    'FTAG','AwayGoals'
).withColumnRenamed(
    'FTR','Result'
)
matches.show(5)

+--------+---+------+-------------------+------------------+------------+---------+---------+------+
|Match_ID|Div|Season|               Date|          HomeTeam|    AwayTeam|HomeGoals|AwayGoals|Result|
+--------+---+------+-------------------+------------------+------------+---------+---------+------+
|       7| D2|  2009|2009-08-14 00:00:00|         Paderborn|   Karlsruhe|        2|        0|     H|
|      23| D1|  2009|2010-05-08 00:00:00|        M'gladbach|  Leverkusen|        1|        1|     D|
|     366| D2|  2009|2010-01-22 00:00:00|Fortuna Dusseldorf|Union Berlin|        1|        0|     H|
|     413| D2|  2009|2010-03-12 00:00:00|             Ahlen|     Koblenz|        0|        2|     A|
|     861| D2|  2010|2011-01-22 00:00:00|           Cottbus|   Bielefeld|        2|        1|     H|
+--------+---+------+-------------------+------------------+------------+---------+---------+------+
only showing top 5 rows



We can see that we have a variable named Div, which indicates the division of the football teams that are playing the match. We want to analyse the Bundesliga, which corresponds to the first division (D1), so we are going to filter the df in order to just keep the matches of this division.

In [17]:
matches = matches.filter(F.col('Div') == 'D1')
matches.count()

7616

Now we are going to check the effect of playing at home or away on the result.

In [18]:
matches.groupBy('Result').agg(F.count('Result')).show()

+------+-------------+
|Result|count(Result)|
+------+-------------+
|     D|         1872|
|     A|         2009|
|     H|         3735|
+------+-------------+



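The home team wins far more often than the away team. Let's see which teams win the most as visitors. Only matches with `Result == 'A'` are away wins, so we filter on that before counting per `AwayTeam`; the table printed below comes from a run that grouped by `Result` and `AwayTeam` without this filter, so each of its rows counts one (result, team) pair rather than strictly away wins:

In [19]:
# keep only away wins, then count them per visiting team
matches.filter(
    F.col('Result') == 'A'
).groupBy(
    'AwayTeam'
).agg(
    F.count('*').alias('AwayVictories')
).orderBy(F.col('AwayVictories').desc()).show(5)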

+-------------+-------------+
|     AwayTeam|AwayVictories|
+-------------+-------------+
|Bayern Munich|          216|
|   M'gladbach|          203|
|      Hamburg|          203|
|Werder Bremen|          192|
|Ein Frankfurt|          186|
+-------------+-------------+
only showing top 5 rows
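Counting away victories only makes sense on rows whose Result is 'A'. The plain-Python sketch below, with hypothetical (AwayTeam, Result) pairs, contrasts a filtered count with a count per (team, result) pair, where the largest group may well be a set of away losses.

```python
from collections import Counter

# Hypothetical (AwayTeam, Result) pairs: 'A' = away win, 'H' = home win, 'D' = draw.
results = [
    ("Team A", "A"), ("Team A", "A"), ("Team A", "H"),
    ("Team B", "H"), ("Team B", "H"), ("Team B", "H"), ("Team B", "A"),
]

# Away wins per team: filter on the result first, then count.
away_wins = Counter(team for team, result in results if result == "A")

# Counting every (team, result) pair without a filter mixes wins, draws and losses.
per_pair = Counter(results)

print(away_wins.most_common())
print(per_pair.most_common(1))
```

Here the most frequent pair is ("Team B", "H"), which is an away loss, even though Team A has more away wins.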



## <a id='toc1_2_'></a>[Questions answers](#toc0_)

**Champions and relegated teams of the Bundesliga year by year**

Now let's check who were the Bundesliga winners and the three relegated teams in the seasons between 2000 and 2015. First we filter the matches of the seasons we are interested in, and then we create three new columns holding a 0 or a 1 depending on the result, indicating whether the home team has won, drawn or lost.

In [20]:
matches_00_15 = matches.filter((F.col('Season') >= 2000) & (F.col('Season') <= 2015))

matches_00_15 = matches_00_15.withColumn(
    'HomeTeamWin', F.when(F.col('Result') == 'H', 1).otherwise(0)
).withColumn('AwayTeamWin', F.when(F.col('Result') == 'A', 1).otherwise(0)
).withColumn(
    'GameTie', F.when(F.col('Result') == 'D', 1).otherwise(0)
)


Now we create two dfs: one with the stats of each team when playing at home and another with the stats of each team when playing away. We then join both dfs and do some calculations to get the champions and relegated teams of each year.

In [21]:
# build the home team stats for each season
hometeam = matches_00_15.groupBy(
    'Season','HomeTeam'
).agg(
    F.sum(F.col('HomeTeamWin')).alias('TotalHomeWin'),
    F.sum(F.col('GameTie')).alias('TotalHomeDraw'),
    F.sum(F.col('AwayTeamWin')).alias('TotalHomeLoss'),
    F.sum(F.col('HomeGoals')).alias('TotalHomeGoalsScored'),
    F.sum(F.col('AwayGoals')).alias('TotalHomeGoalsConceded'),
).withColumnRenamed('HomeTeam', 'Team')

# build the away team stats for each season
awayteam = matches_00_15.groupBy(
    'Season','AwayTeam'
).agg(
    F.sum(F.col('HomeTeamWin')).alias('TotalAwayLoss'),
    F.sum(F.col('GameTie')).alias('TotalAwayDraw'),
    F.sum(F.col('AwayTeamWin')).alias('TotalAwayWin'),
    F.sum(F.col('AwayGoals')).alias('TotalAwayGoalsScored'),
    F.sum(F.col('HomeGoals')).alias('TotalAwayGoalsConceded'),
).withColumnRenamed('AwayTeam', 'Team')

# combine the previous dfs and get general season stats
total_results_season = hometeam.join(awayteam, on=['Season','Team'],how='inner')
total_results_season = total_results_season.withColumn(
    'TotalVictories',F.col('TotalHomeWin') + F.col('TotalAwayWin')
).withColumn(
    'TotalDraws',F.col('TotalHomeDraw') + F.col('TotalAwayDraw')
).withColumn(
    'TotalLosses',F.col('TotalHomeLoss') + F.col('TotalAwayLoss')
).withColumn(
    'TotalTeamGoalsScored',F.col('TotalHomeGoalsScored') + F.col('TotalAwayGoalsScored')
).withColumn(
    'TotalTeamGoalsConceded',F.col('TotalHomeGoalsConceded') + F.col('TotalAwayGoalsConceded')
).withColumn(
    '+-',F.col('TotalTeamGoalsScored') - F.col('TotalTeamGoalsConceded')
).withColumn(
    'Points', (F.col('TotalHomeWin') + F.col('TotalAwayWin'))*3 + (F.col('TotalHomeDraw') + F.col('TotalAwayDraw'))*1
).drop('TotalHomeWin', 'TotalHomeDraw', 'TotalHomeLoss', 'TotalHomeGoalsScored','TotalHomeGoalsConceded', 'TotalAwayLoss', 'TotalAwayDraw', 'TotalAwayWin','TotalAwayGoalsScored','TotalAwayGoalsConceded')

total_results_season.show(2)

+------+-------------+--------------+----------+-----------+--------------------+----------------------+---+------+
|Season|         Team|TotalVictories|TotalDraws|TotalLosses|TotalTeamGoalsScored|TotalTeamGoalsConceded| +-|Points|
+------+-------------+--------------+----------+-----------+--------------------+----------------------+---+------+
|  2005|Bayern Munich|            22|         9|          3|                  67|                    32| 35|    75|
|  2008|   M'gladbach|             8|         7|         19|                  39|                    62|-23|    31|
+------+-------------+--------------+----------+-----------+--------------------+----------------------+---+------+
only showing top 2 rows
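The Points and +- columns follow the usual rule of 3 points per win and 1 per draw, plus goals scored minus goals conceded. As a quick check, the sketch below recomputes the two rows printed above; the helper name `season_summary` is just an illustration.

```python
def season_summary(wins, draws, goals_for, goals_against):
    """Return (points, goal_difference) with 3 points per win and 1 per draw."""
    points = 3 * wins + draws
    goal_diff = goals_for - goals_against
    return points, goal_diff

# Inputs taken from the two rows printed above.
bayern_2005 = season_summary(wins=22, draws=9, goals_for=67, goals_against=32)
gladbach_2008 = season_summary(wins=8, draws=7, goals_for=39, goals_against=62)

print(bayern_2005)    # (75, 35)
print(gladbach_2008)  # (31, -23)
```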



With the df built, getting the champions and relegated teams of each year is only a matter of filtering it. First we get the champion of each year.

In [22]:
w=Window().partitionBy('Season')

total_results_season.withColumn(
        "rank", F.row_number().over(w.orderBy(F.col("Points").desc()))
).filter(
        (F.col("rank")==1)
).drop("rank").select('Season','Team','Points').orderBy(
        F.col('Season')
).show()


+------+-------------+------+
|Season|         Team|Points|
+------+-------------+------+
|  2000|Bayern Munich|    63|
|  2001|     Dortmund|    70|
|  2002|Bayern Munich|    75|
|  2003|Werder Bremen|    74|
|  2004|Bayern Munich|    77|
|  2005|Bayern Munich|    75|
|  2006|    Stuttgart|    70|
|  2007|Bayern Munich|    76|
|  2008|    Wolfsburg|    69|
|  2009|Bayern Munich|    70|
|  2010|     Dortmund|    75|
|  2011|     Dortmund|    81|
|  2012|Bayern Munich|    91|
|  2013|Bayern Munich|    90|
|  2014|Bayern Munich|    79|
|  2015|Bayern Munich|    88|
+------+-------------+------+
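One caveat of ranking by Points alone: if two teams finish level on points, `row_number()` picks one of them arbitrarily. A possible refinement, assuming goal difference is the first tie-breaker, is to sort by points and then by the +- column. The plain-Python sketch below uses a hypothetical table to show the idea.

```python
# Hypothetical final table: (team, points, goal_difference). Values are made up.
table = [
    ("Team A", 70, 25),
    ("Team B", 70, 31),
    ("Team C", 64, 10),
]

# Sort by points first and by goal difference second, both descending.
ranked = sorted(table, key=lambda row: (row[1], row[2]), reverse=True)
champion = ranked[0][0]

print(ranked)
print(champion)
```

In the Spark window, the equivalent would be to add the goal-difference column as a second descending key in `orderBy`.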



The teams that have won the league most often in recent years are the ones that, as we saw before, have the most money: Bayern Munich and Dortmund. The presence of Stuttgart in 2006 is a surprise. Now let's check the bottom three teams of each year:

In [23]:
w=Window().partitionBy('Season')

total_results_season.withColumn(
        "rank", F.row_number().over(w.orderBy(F.col("Points")))
).filter(
        (F.col("rank")==1) | (F.col("rank")==2) | (F.col("rank")==3)
).drop("rank").select('Season','Team','Points').orderBy(
        F.col('Season')
).show()

+------+--------------+------+
|Season|          Team|Points|
+------+--------------+------+
|  2000|  Unterhaching|    35|
|  2000| Ein Frankfurt|    35|
|  2000|        Bochum|    27|
|  2001|      St Pauli|    22|
|  2001|      Freiburg|    30|
|  2001|       FC Koln|    29|
|  2002|      Nurnberg|    30|
|  2002|     Bielefeld|    36|
|  2002|       Cottbus|    30|
|  2003|   Munich 1860|    32|
|  2003| Ein Frankfurt|    32|
|  2003|       FC Koln|    23|
|  2004| Hansa Rostock|    30|
|  2004|        Bochum|    35|
|  2004|      Freiburg|    18|
|  2005|       FC Koln|    30|
|  2005|Kaiserslautern|    33|
|  2005|      Duisburg|    27|
|  2006|    M'gladbach|    26|
|  2006|         Mainz|    34|
+------+--------------+------+
only showing top 20 rows



**Season with the most goals**

Now let's see which of the seasons under study had the most goals scored:

In [24]:
more_goals_season = matches_00_15.withColumn(
    'TotalGameGoals', F.col('HomeGoals') + F.col('AwayGoals')
).drop(
    'Match_ID','Div','Date','HomeTeam','AwayTeam','HomeGoals','AwayGoals'
).groupBy(
    'Season'
).agg(
    F.sum('TotalGameGoals').alias('TotalSeasonGoals')
).orderBy(F.col('TotalSeasonGoals').desc())

more_goals_season.show()

+------+----------------+
|Season|TotalSeasonGoals|
+------+----------------+
|  2013|             967|
|  2003|             909|
|  2012|             898|
|  2000|             897|
|  2010|             894|
|  2008|             894|
|  2001|             893|
|  2004|             890|
|  2011|             875|
|  2009|             866|
|  2015|             866|
|  2005|             861|
|  2007|             860|
|  2014|             843|
|  2006|             837|
|  2002|             821|
+------+----------------+
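Season totals are easier to compare as goals per match. The sketch below assumes 18 teams each playing 17 home matches (306 matches per season), which is an assumption not checked against the data here, and uses three of the totals printed above.

```python
# Totals taken from the table above (highest, second highest and lowest season).
season_goals = {2013: 967, 2003: 909, 2002: 821}

# Assumption: 18 teams, each playing 17 home matches, i.e. 306 matches per season.
MATCHES_PER_SEASON = 18 * 17

goals_per_match = {
    season: goals / MATCHES_PER_SEASON for season, goals in season_goals.items()
}

for season, avg in sorted(goals_per_match.items()):
    print(season, round(avg, 2))
```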



## <a id='toc1_3_'></a>[Random Forest model to predict result](#toc0_)
We are going to modify the df in order to predict the result of a match from a few features. We've chosen a Random Forest (RF) because it lets us inspect feature importances and because it's a well-known and widely used model (remember that, as we said before, we want a simple model built as a pipeline with PySpark). It will be difficult to obtain good results with it, because the result of a football match depends on many factors that we don't have stored (injured players in each team and how much the team depends on them, the weather that day, the attendance at the match...). Even if we had most of them, the result would still be very hard to predict, because it can depend on other important factors that can't be monitored, such as the emotional state of each player. It's also worth noting that, as we saw previously, we have 1872 draws, 2009 away victories and 3735 home victories, so the classes are only moderately imbalanced: home wins are roughly twice as frequent as either of the other two results, but no class is rare.

With the data we have, we want to extend the matches df with the market value of each team, the home team's stadium capacity and the average age of each team (age can matter because experience is an important factor in handling pressure in football). We just want to predict the result, not the score, so we drop the information about the goals (keeping it among the features would also leak the answer, since the result follows directly from the score); we don't care about the date of the match, so we delete it as well. Once all the joins that need them are done, we will also delete the Season and TeamName columns, because we don't want them to be used as predictors.
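To put the class balance in numbers, the sketch below uses the result counts printed earlier for all first-division matches (3735 home wins, 2009 away wins, 1872 draws). It also computes the accuracy of always predicting a home win, which is a natural baseline for any model; note that the subset of seasons used for training may have slightly different shares.

```python
# Result counts printed earlier for all D1 matches.
counts = {"H": 3735, "A": 2009, "D": 1872}

total = sum(counts.values())
shares = {label: n / total for label, n in counts.items()}

# Accuracy of a trivial model that always predicts the most frequent class.
baseline_accuracy = max(shares.values())

for label, share in shares.items():
    print(label, round(share, 3))
print("baseline accuracy:", round(baseline_accuracy, 3))
```

A classifier is only useful here if it clearly beats this majority-class baseline.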

### <a id='toc1_3_1_'></a>[Prediction of matches with wins, losses and draws](#toc0_)

We'll start by trying to predict the result of the match as it is in reality: a win, a draw or a loss for the home team. Having the class "Draw" adds difficulty to the problem, because it's more likely that a more powerful team draws with a smaller team than loses to it. Nevertheless, we are going to try to predict the result with it.

In [25]:
'''
We split the joins into several steps to explain the sequence one step at a time. First we filter the matches df we want and drop the variables we don't need. 
Then we do an inner join with the teams df on TeamName and Season to get the information of each home team for every season (mean age, market value and 
stadium capacity), renaming the variables properly and dropping the duplicate columns that result from the join.
'''
study_df = matches.filter(
    (F.col('Season') >= 2000) & (F.col('Season') <= 2015)
).drop(
    'Match_ID','Date','AwayGoals','HomeGoals','Div'
).join(
    teams,
    on = [(teams.TeamName == matches.HomeTeam) & (teams.Season == matches.Season)],
    how='inner'
).withColumnRenamed(
    'StadiumCapacity','HomeStadiumCapacity'
).withColumnRenamed(
    'OverallMarketValue','MarketValueHomeTeam'
).withColumnRenamed(
    'AvgAgeHome','AvgAgeHomeTeam'
).drop(
    teams.Season
).drop(
    teams.TeamName
)

'''
After that we do an inner join of the previous df with the teams df on TeamName and Season to get the information of each away team for every season 
(avg age and market value), renaming the variables properly and dropping the duplicate columns that result from the join.
'''

study_df = study_df.join(
    teams,
    on = [(teams.TeamName == matches.AwayTeam) & (teams.Season == matches.Season)],
    how='inner'
).withColumnRenamed(
    'OverallMarketValue','MarketValueAwayTeam'
).withColumnRenamed(
    'AvgAgeHome','AvgAgeAwayTeam'
).drop(
    teams.Season
).drop(
    teams.StadiumCapacity
).drop(
    teams.TeamName
).drop(
    matches.Season
).drop(
    'TeamName','StadiumCapacity','Season'
)

''' 
Finally we drop the HomeTeam and AwayTeam name.
'''

study_df = study_df.drop(
    'HomeTeam','AwayTeam'
)

study_df.show(5)

+------+--------------+-------------------+-------------------+--------------+-------------------+
|Result|AvgAgeHomeTeam|MarketValueHomeTeam|HomeStadiumCapacity|AvgAgeAwayTeam|MarketValueAwayTeam|
+------+--------------+-------------------+-------------------+--------------+-------------------+
|     D|            25|           45800000|              54014|            24|          109050000|
|     H|            25|          122000000|              30000|            23|           43180000|
|     D|            23|           43180000|              50000|            23|          125350000|
|     A|            23|           43180000|              50000|            23|           93600000|
|     D|            23|           45450000|              24000|            23|          115900000|
+------+--------------+-------------------+-------------------+--------------+-------------------+
only showing top 5 rows
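The two joins amount to looking up each match's home and away team in a table keyed by (team, season). The plain-Python sketch below mimics that with hypothetical data and a hypothetical helper `build_features`; as with an inner join, a match is dropped when either team has no stats for that season.

```python
# Hypothetical per-season team stats keyed by (team, season). Values are made up.
team_stats = {
    ("Team A", 2005): {"age": 26, "value": 100, "capacity": 50000},
    ("Team B", 2005): {"age": 24, "value": 40, "capacity": 30000},
}

# Hypothetical matches; no team stats exist for the 2004 season.
sample_matches = [
    {"Season": 2005, "HomeTeam": "Team A", "AwayTeam": "Team B", "Result": "H"},
    {"Season": 2004, "HomeTeam": "Team A", "AwayTeam": "Team B", "Result": "D"},
]

def build_features(matches, stats):
    """Attach home and away team stats to each match (inner-join semantics)."""
    rows = []
    for m in matches:
        home = stats.get((m["HomeTeam"], m["Season"]))
        away = stats.get((m["AwayTeam"], m["Season"]))
        if home is None or away is None:
            continue  # like an inner join, unmatched matches are dropped
        rows.append({
            "Result": m["Result"],
            "AvgAgeHomeTeam": home["age"],
            "MarketValueHomeTeam": home["value"],
            "HomeStadiumCapacity": home["capacity"],
            "AvgAgeAwayTeam": away["age"],
            "MarketValueAwayTeam": away["value"],
        })
    return rows

feature_rows = build_features(sample_matches, team_stats)
print(feature_rows)
```

This is also why the number of rows after the joins can be smaller than the number of matches in the filtered seasons.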



Now we are going to use `VectorAssembler` (from `pyspark.ml.feature`) to combine all the features we want into a single feature vector for training. These columns are all the independent columns (every column except Result). We also have to encode the Result column, and we do so using the StringIndexer.

In [26]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

# define an instance of assembler
assembler = VectorAssembler(inputCols= ['AvgAgeHomeTeam', 'MarketValueHomeTeam', 'HomeStadiumCapacity', 'AvgAgeAwayTeam', 'MarketValueAwayTeam'],
                            outputCol='features')

# StringIndexer encodes a string column of labels to a column of label indices. The indices are in [0, numLabels), ordered by label frequencies, so
# the most frequent label gets index 0. In our case, the label column (Result) will be encoded to the label indices 0, 1 and 2; the most frequent label
# (H) will be indexed as 0.
label_stringIdx = StringIndexer(inputCol = 'Result', outputCol = 'label')
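The frequency-based encoding can be sketched in plain Python as follows. The helper `fit_label_index` is hypothetical and only mirrors the idea of assigning index 0 to the most frequent label; breaking ties alphabetically is an assumption of this sketch.

```python
from collections import Counter

def fit_label_index(labels):
    """Map each label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Descending frequency; ties broken alphabetically (assumption of this sketch).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical label column with H most frequent, then A, then D.
labels = ["H"] * 5 + ["A"] * 3 + ["D"] * 2
mapping = fit_label_index(labels)

print(mapping)
print([mapping[label] for label in ["D", "H"]])
```

With three distinct results, the indices are 0.0, 1.0 and 2.0.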

Now we define a pipeline with these two stages and create a df with them.

In [27]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = [assembler, label_stringIdx])

pipelineModel = pipeline.fit(study_df)
df = pipelineModel.transform(study_df)
df.show(2)

#we just want the columns features and label, which are the ones we'll use for the model
df = df.select('features','label')
df.show(2)

+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
|Result|AvgAgeHomeTeam|MarketValueHomeTeam|HomeStadiumCapacity|AvgAgeAwayTeam|MarketValueAwayTeam|            features|label|
+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
|     D|            25|           45800000|              54014|            24|          109050000|[25.0,4.58E7,5401...|  2.0|
|     H|            25|          122000000|              30000|            23|           43180000|[25.0,1.22E8,3000...|  0.0|
+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
only showing top 2 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[25.0,4.58E7,5401...|  2.0|
|[25.0,1.22E8,3000...|  0.0|
+--------------------+-----+
only showing top 2 rows



This could have been done without the ML Pipeline but, as we can see below, it's much longer and messier.

In [28]:
from pyspark.ml.feature import VectorAssembler, StringIndexer

# define an instance of assembler
assembler = VectorAssembler(inputCols= ['AvgAgeHomeTeam', 'MarketValueHomeTeam', 'HomeStadiumCapacity', 'AvgAgeAwayTeam', 'MarketValueAwayTeam'],
                            outputCol='features')

# transform our df
study_df_assembled = assembler.transform(study_df)

# we now have all the features in just one variable
study_df_assembled.select('features','Result').show(1,truncate=False)

# Now we use the StringIndexer with the column  result for that previously transformed df
label_stringIdx = StringIndexer(inputCol = 'Result', outputCol = 'label')

model_df = label_stringIdx.fit(study_df_assembled).transform(study_df_assembled)
model_df.show(1)
model_df = model_df.select('features','label')
model_df.show(1)

+-----------------------------------+------+
|features                           |Result|
+-----------------------------------+------+
|[25.0,4.58E7,54014.0,24.0,1.0905E8]|D     |
+-----------------------------------+------+
only showing top 1 row

+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
|Result|AvgAgeHomeTeam|MarketValueHomeTeam|HomeStadiumCapacity|AvgAgeAwayTeam|MarketValueAwayTeam|            features|label|
+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
|     D|            25|           45800000|              54014|            24|          109050000|[25.0,4.58E7,5401...|  2.0|
+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
only showing top 1 row

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[25.0,4.5

Now we split the df into training and test dfs (70 % / 30 %) using randomSplit:

In [29]:
training_df, test_df = df.randomSplit([0.7,0.3])
print(f'Original df count: {df.count()}, \n Training df count: {training_df.count()}, \n Test df count: {test_df.count()}')

Original df count: 3366, 
 Training df count: 2377, 
 Test df count: 989
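
The training df holds 2377 of the 3366 rows (about 70.6 %), not exactly 70 %. This is expected, because the weights are probabilities and not exact proportions. The following pure-Python sketch illustrates the idea under a simplifying assumption: each row is assigned to a split by one independent uniform draw. The helper `random_split` is hypothetical and is not Spark's implementation.

```python
import random

def random_split(rows, weights, seed):
    """Hypothetical helper: assign each row to a split with one uniform draw."""
    total = sum(weights)
    # Cumulative, normalised upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds = []
    acc = 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    bounds[-1] = 1.0  # guard against floating-point rounding
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()  # uniform value in [0, 1)
        for i, upper in enumerate(bounds):
            if x < upper:
                splits[i].append(row)
                break
    return splits

rows = list(range(3366))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Because no seed is passed in the cell above, the split (and therefore every metric below) changes from run to run. In PySpark the split can be made repeatable with the seed argument, for example df.randomSplit([0.7, 0.3], seed=42).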


Now we import and use the RandomForest model. We first fit the model on the training df and then predict on the test df:

In [30]:
from pyspark.ml.classification import RandomForestClassifier

rf_classifier = RandomForestClassifier(
    labelCol='label', numTrees= 50
).fit(
    training_df
)

labels_pred = rf_classifier.transform(test_df)
labels_pred.show(2)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,9.063E7,301...|  2.0|[25.7653589578414...|[0.51530717915682...|       0.0|
|[22.0,9.063E7,301...|  0.0|[25.6622916233822...|[0.51324583246764...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 2 rows



Let's get some classification metrics of our results:

In [31]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy')
print(f'Accuracy: {evaluator.evaluate(labels_pred)}')

#precision
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedPrecision')
print(f'weightedPrecision: {evaluator.evaluate(labels_pred)}')

#recall
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedRecall')
print(f'weightedRecall: {evaluator.evaluate(labels_pred)}')

#f1
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='f1')
print(f'f1: {evaluator.evaluate(labels_pred)}')

Accuracy: 0.47219413549039435
weightedPrecision: 0.39083118581167287
weightedRecall: 0.47219413549039435
f1: 0.3988346356865378


As we can see, we have obtained very poor results: an accuracy below 50 % is disappointing, and so are the weighted precision (0.39) and the weighted recall (0.47). The f1 score, which Spark computes as the support-weighted average of the per-class F1 scores (each one the harmonic mean of that class's precision and recall), has a value of 0.40 and is another sign that the results are very bad.
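
Note that the weighted recall printed above is identical to the accuracy. This is not a coincidence: when each class's recall is weighted by its share of the true labels, the sum collapses to the fraction of correct predictions. The sketch below recomputes the four metrics in plain Python on a small made-up example (the labels and the helper `weighted_metrics` are hypothetical, chosen only for illustration).

```python
from collections import Counter

def weighted_metrics(y_true, y_pred):
    """Hypothetical helper: accuracy and support-weighted precision/recall/F1."""
    n = len(y_true)
    support = Counter(y_true)  # number of true examples per class
    accuracy = sum(t == p for t, p in zip(y_true, y_pred)) / n
    w_prec = w_rec = w_f1 = 0.0
    for c in sorted(set(y_true) | set(y_pred)):
        tp = sum(t == c and p == c for t, p in zip(y_true, y_pred))
        predicted_c = sum(p == c for p in y_pred)
        prec = tp / predicted_c if predicted_c else 0.0
        rec = tp / support[c] if support[c] else 0.0
        f1 = 2 * prec * rec / (prec + rec) if prec + rec else 0.0
        weight = support[c] / n  # share of the true labels in class c
        w_prec += weight * prec
        w_rec += weight * rec
        w_f1 += weight * f1
    return accuracy, w_prec, w_rec, w_f1

# Toy labels: class 2 is never predicted, like a minority class that is ignored
y_true = [0, 0, 0, 0, 1, 1, 1, 2, 2, 2]
y_pred = [0, 0, 0, 1, 0, 1, 1, 0, 0, 1]
accuracy, w_prec, w_rec, w_f1 = weighted_metrics(y_true, y_pred)
print(accuracy, w_prec, w_rec, w_f1)
```

In this toy example the class that is never predicted contributes zero precision, recall and F1, which pulls the weighted precision and the weighted F1 below the accuracy. The same pattern is visible in the metrics above.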

Lastly we are going to analyse the feature importances of our model. Feature importance refers to techniques that assign a score to input features based on how useful they are at predicting a target variable. 

In [32]:
print(study_df.drop('Result').columns)
rf_classifier.featureImportances

['AvgAgeHomeTeam', 'MarketValueHomeTeam', 'HomeStadiumCapacity', 'AvgAgeAwayTeam', 'MarketValueAwayTeam']


SparseVector(5, {0: 0.0458, 1: 0.3699, 2: 0.1623, 3: 0.0599, 4: 0.3621})

As we can see, the most important variables, by a wide margin, are the market values of both teams. The third most important variable is the capacity of the home stadium, and the least relevant features are the ones related to the age of the teams.
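
The SparseVector is easier to read when each index is paired with its feature name and the result is sorted. The sketch below does this in plain Python with the values copied from the output above. In the notebook the same pairing could presumably be done on rf_classifier.featureImportances.toArray() instead of the hard-coded dictionary.

```python
feature_names = ['AvgAgeHomeTeam', 'MarketValueHomeTeam', 'HomeStadiumCapacity',
                 'AvgAgeAwayTeam', 'MarketValueAwayTeam']
# Values copied from the SparseVector printed above (index -> importance)
importances = {0: 0.0458, 1: 0.3699, 2: 0.1623, 3: 0.0599, 4: 0.3621}

# Pair every importance with its feature name and sort from most to least important
ranked = sorted(
    ((feature_names[i], score) for i, score in importances.items()),
    key=lambda pair: pair[1],
    reverse=True,
)
for name, score in ranked:
    print(f'{name:<22}{score:.4f}')

# Combined weight of the two market-value features
market_share = sum(score for name, score in ranked if name.startswith('MarketValue'))
print(f'Market value features together: {market_share:.3f}')
```

Together, the two market-value features account for roughly 73 % of the total importance in this run.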

### <a id='toc1_3_2_'></a>[Prediction of matches with no Draws](#toc0_)

As we have said before, the draw class adds difficulty to the problem. As we have obtained very bad results, let's check whether removing this class helps the model improve its performance.

In [33]:
study_df = study_df.filter(F.col('Result')!= 'D')
study_df.count()

2516

In [63]:
study_df.groupBy('Result').agg(F.count('Result')).show()

+------+-------------+
|Result|count(Result)|
+------+-------------+
|     A|         1001|
|     H|         1515|
+------+-------------+



The two remaining classes are still reasonably balanced (roughly 60 % home wins and 40 % away wins). As before, we apply the previously created Pipeline to the new df.

In [34]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages = [assembler, label_stringIdx])

pipelineModel = pipeline.fit(study_df)
df = pipelineModel.transform(study_df)
df.show(2)

#we just want the columns features and label, which are the ones we'll use for the model
df = df.select('features','label')
df.show(2)

+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
|Result|AvgAgeHomeTeam|MarketValueHomeTeam|HomeStadiumCapacity|AvgAgeAwayTeam|MarketValueAwayTeam|            features|label|
+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
|     H|            25|          122000000|              30000|            23|           43180000|[25.0,1.22E8,3000...|  0.0|
|     A|            23|           43180000|              50000|            23|           93600000|[23.0,4.318E7,500...|  1.0|
+------+--------------+-------------------+-------------------+--------------+-------------------+--------------------+-----+
only showing top 2 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[25.0,1.22E8,3000...|  0.0|
|[23.0,4.318E7,500...|  1.0|
+--------------------+-----+
only showing top 2 rows



Now we split the df into training and test dfs (70 % / 30 %) using randomSplit:

In [35]:
training_df, test_df = df.randomSplit([0.7,0.3])
print(f'Original df count: {df.count()}, \n Training df count: {training_df.count()}, \n Test df count: {test_df.count()}')

Original df count: 2516, 
 Training df count: 1758, 
 Test df count: 758


Now we import and use the RandomForest model. We first fit the model on the training df and then predict on the test df:

In [36]:
from pyspark.ml.classification import RandomForestClassifier

rf_classifier = RandomForestClassifier(
    labelCol='label', numTrees= 50
).fit(
    training_df
)

labels_pred = rf_classifier.transform(test_df)
labels_pred.show(2)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|[22.0,9.063E7,301...|  0.0|[31.3620352742704...|[0.62724070548540...|       0.0|
|[22.0,9.063E7,301...|  1.0|[30.0291010894482...|[0.60058202178896...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 2 rows



Let's get some classification metrics of our results:

In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy')
print(f'Accuracy: {evaluator.evaluate(labels_pred)}')

#precision
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedPrecision')
print(f'weightedPrecision: {evaluator.evaluate(labels_pred)}')

#recall
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedRecall')
print(f'weightedRecall: {evaluator.evaluate(labels_pred)}')

#f1
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='f1')
print(f'f1: {evaluator.evaluate(labels_pred)}')

Accuracy: 0.6503957783641161
weightedPrecision: 0.6450378278797431
weightedRecall: 0.6503957783641161
f1: 0.6168232579120898


As we can see, the results are better than the previous ones, which included the draw class (as we expected), but they still aren't very good, with an accuracy of 0.65 and an f1 score of 0.62.
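
A useful reference point for these accuracies is the trivial model that always predicts a home win, the most frequent class. The sketch below computes that baseline from the class counts reported in this notebook. It is only an approximation of the baseline on the test dfs, since it uses the full datasets rather than the random test splits, and the helper `majority_baseline` is hypothetical.

```python
# Class counts reported in this notebook
home_wins, away_wins = 1515, 1001
total_matches = 3366  # all matches, draws included
draws = total_matches - home_wins - away_wins

def majority_baseline(counts):
    """Hypothetical helper: accuracy of always predicting the most frequent class."""
    return max(counts) / sum(counts)

baseline_3 = majority_baseline([home_wins, away_wins, draws])
baseline_2 = majority_baseline([home_wins, away_wins])
print(f'3 classes: {baseline_3:.3f}')  # 0.450
print(f'2 classes: {baseline_2:.3f}')  # 0.602
```

Seen this way, the Random Forest beats the trivial baseline by only a few percentage points in both settings (0.47 against 0.45 with draws, 0.65 against 0.60 without them).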

As in the previous section, we finish by analysing the feature importances of the model.

In [38]:
print(study_df.drop('Result').columns)
rf_classifier.featureImportances

['AvgAgeHomeTeam', 'MarketValueHomeTeam', 'HomeStadiumCapacity', 'AvgAgeAwayTeam', 'MarketValueAwayTeam']


SparseVector(5, {0: 0.0404, 1: 0.3505, 2: 0.1792, 3: 0.0461, 4: 0.3837})

As before, the most important variables are the market values of both teams, the third most important variable is the capacity of the home stadium, and the least relevant features are the ones related to the age of the teams.

**Grid Search trying to improve the results**

Finally, we run a grid search with cross-validation to find the best hyperparameters of the model, hoping to improve the results obtained. We optimise for accuracy because the classes are reasonably balanced.

In [60]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

rf = RandomForestClassifier(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[rf])
paramGrid = ParamGridBuilder().addGrid(
    rf.numTrees, [20, 30,50,80]
).addGrid(
    rf.maxDepth, [3, 5, 7]
).build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator= MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy'),
                          numFolds=4) 

cvModel = crossval.fit(training_df)

                                                                                



                                                                                

In [62]:
# print the best hyperparameters values
import numpy as np
print(cvModel.getEstimatorParamMaps()[ np.argmax(cvModel.avgMetrics) ])

# now we predict the labels using the best model
bestModel = cvModel.bestModel
labels_pred = bestModel.transform(test_df)

#accuracy
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='accuracy')
print(f'Accuracy: {evaluator.evaluate(labels_pred)}')

#precision
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedPrecision')
print(f'weightedPrecision: {evaluator.evaluate(labels_pred)}')

#recall
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='weightedRecall')
print(f'weightedRecall: {evaluator.evaluate(labels_pred)}')

#f1
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction", metricName='f1')
print(f'f1: {evaluator.evaluate(labels_pred)}')

{Param(parent='RandomForestClassifier_53e00e6b96f8', name='numTrees', doc='Number of trees to train (>= 1).'): 50, Param(parent='RandomForestClassifier_53e00e6b96f8', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes. Must be in range [0, 30].'): 3}
Accuracy: 0.6543535620052771
weightedPrecision: 0.6521333069767574
weightedRecall: 0.6543535620052771
f1: 0.6185221027428494


We can see that optimising the hyperparameters with a grid search has not produced much better results; in fact, we get almost the same result as before.
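
The grid search is also considerably more expensive than a single fit. The sketch below counts the models trained for the grid and the number of folds used above, assuming that every combination is fitted once per fold and that the best combination is then refitted once on the whole training df.

```python
from itertools import product

# Hyperparameter values and folds used in the grid search above
num_trees = [20, 30, 50, 80]
max_depth = [3, 5, 7]
num_folds = 4

combinations = list(product(num_trees, max_depth))
fits_in_cv = len(combinations) * num_folds
total_fits = fits_in_cv + 1  # final refit of the best combination (assumption)
print(f'{len(combinations)} combinations, {total_fits} model fits in total')
```

Training 49 forests to gain about 0.4 percentage points of accuracy suggests that the limiting factor is the information in the features and not the hyperparameters.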

## <a id='toc1_4_'></a>[Conclusions](#toc0_)

We have obtained very bad results with our model but, as we commented before, this was expected: we are trying to predict something as complex as the result of a match using only the market value of the teams, the capacity of the stadium where the match is played and the average age of the teams. Football matches are much more complex than that, and many factors influence them: the injured players of each team and how much the team depends on them, the weather that day, the attendance at the match, the proximity of international competition games that might lead coaches to rest their best players or even, as we have commented before, the emotional situation of each player.

All in all, we don't have enough data to obtain a good result with our model and, even if we had it, it wouldn't be enough to predict the result with much confidence. Football is a sport and, as a result, even if we had all the data listed above, we could not account for another crucial factor: luck.

Leaving all this aside, we have accomplished the goal of this work, which was to work with Pyspark, transforming dfs and building ML pipelines with this tool.