## TASK: Using Pyspark to get Answer to the Question:

Who are the winners of the D1 division in the Germany Football Association (Bundesliga) between 2000–2010?

#### Importing Dataset from GitHub:

In [1]:
import pandas as pd
import requests

    
req = requests.get("https://raw.githubusercontent.com/henokyemam/Wrangling_PySpark/main/Data/Matches.csv")
url_content = req.content
csv_file = open('matches.csv', 'wb')

csv_file.write(url_content)
csv_file.close()

In [2]:
#Reading file in Pandas Dataframe
df = pd.read_csv("matches.csv")
df.head(5)

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,FTHG,FTAG,FTR
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A


#### Importing Spark Related Libraries

In [3]:
try:
    import pyspark
    from pyspark import SparkContext
    from pyspark.sql import SparkSession, Window, Row
    from pyspark.sql.functions import *
    from pyspark.sql.types import *
    import matplotlib.pyplot as plt
    print("all imported")
except:
    print("error in loading")

all imported


#### Building a Spark Session

In [4]:
spark = SparkSession \
        .builder \
        .appName("Spark-Matches-Project") \
        .getOrCreate()
spark

#### Reading File in Spark

In [5]:
sql_df = spark.read.csv("matches.csv",header=True,inferSchema=True)
type(sql_df)

pyspark.sql.dataframe.DataFrame

In [6]:
sql_df.printSchema()

root
 |-- Match_ID: integer (nullable = true)
 |-- Div: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- FTHG: integer (nullable = true)
 |-- FTAG: integer (nullable = true)
 |-- FTR: string (nullable = true)



In [7]:
sql_df.show(3)

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
+--------+---+------+----------+-------------+--------------+----+----+---+
only showing top 3 rows



In [8]:
sql_df.describe().show()

+-------+------------------+-----+----------------+----------+--------+--------+------------------+------------------+-----+
|summary|          Match_ID|  Div|          Season|      Date|HomeTeam|AwayTeam|              FTHG|              FTAG|  FTR|
+-------+------------------+-----+----------------+----------+--------+--------+------------------+------------------+-----+
|  count|             24625|24625|           24625|     24625|   24625|   24625|             24625|             24625|24625|
|   mean|27335.103228426397| null|2004.69876142132|      null|    null|    null|1.5170355329949239| 1.097502538071066| null|
| stddev| 16631.84606173585| null|7.16693132762853|      null|    null|    null|1.3594706607115317|1.1635103573988272| null|
|    min|                 1|   D1|            1993|1993-07-28|  Aachen|  Aachen|                -1|                -1|    A|
|    max|             46774|   E0|            2017|2018-05-13| Zwickau| Zwickau|                 9|                 9|    H|


#### Renaming Some columns

In [9]:
#Renaming last three columns 
cols_old = sql_df.columns[-3:]
cols_old

['FTHG', 'FTAG', 'FTR']

In [10]:
cols_old = sql_df.columns[6:]
cols_old

['FTHG', 'FTAG', 'FTR']

In [11]:
cols_new = ["HomeTeamGoals", "AwayTeamGoals", "FinalResult"]

##### Python's zip() function creates an iterator that will aggregate elements from two or more iterables. 

In [12]:
old_new_cols = [*zip(cols_old,cols_new)]
old_new_cols

[('FTHG', 'HomeTeamGoals'), ('FTAG', 'AwayTeamGoals'), ('FTR', 'FinalResult')]

In [13]:
for cols_old, cols_new in old_new_cols:
    sql_df = sql_df.withColumnRenamed(cols_old, cols_new)
    
sql_df.show(10)

+--------+---+------+----------+--------------+--------------+-------------+-------------+-----------+
|Match_ID|Div|Season|      Date|      HomeTeam|      AwayTeam|HomeTeamGoals|AwayTeamGoals|FinalResult|
+--------+---+------+----------+--------------+--------------+-------------+-------------+-----------+
|       1| D2|  2009|2010-04-04|    Oberhausen|Kaiserslautern|            2|            1|          H|
|       2| D2|  2009|2009-11-01|   Munich 1860|Kaiserslautern|            0|            1|          A|
|       3| D2|  2009|2009-10-04| Frankfurt FSV|Kaiserslautern|            1|            1|          D|
|       4| D2|  2009|2010-02-21| Frankfurt FSV|     Karlsruhe|            2|            1|          H|
|       5| D2|  2009|2009-12-06|         Ahlen|     Karlsruhe|            1|            3|          A|
|       6| D2|  2009|2010-04-03|  Union Berlin|     Karlsruhe|            1|            1|          D|
|       7| D2|  2009|2009-08-14|     Paderborn|     Karlsruhe|           

In [14]:
sql_df.printSchema()

root
 |-- Match_ID: integer (nullable = true)
 |-- Div: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- HomeTeamGoals: integer (nullable = true)
 |-- AwayTeamGoals: integer (nullable = true)
 |-- FinalResult: string (nullable = true)



#### Dataset Manipulation for Staright Forward and Easy Forward Understanding 

In [15]:
#Creating new columns for result segregation

sql_df = sql_df \
    .withColumn('HomeTeamWin', when(col('FinalResult') == 'H', 1).otherwise(0)) \
    .withColumn('AwayTeamWin', when(col('FinalResult') == 'A', 1).otherwise(0)) \
    .withColumn('GameTie', when(col('FinalResult') == 'D', 1).otherwise(0))

sql_df.limit(5).toPandas()

Unnamed: 0,Match_ID,Div,Season,Date,HomeTeam,AwayTeam,HomeTeamGoals,AwayTeamGoals,FinalResult,HomeTeamWin,AwayTeamWin,GameTie
0,1,D2,2009,2010-04-04,Oberhausen,Kaiserslautern,2,1,H,1,0,0
1,2,D2,2009,2009-11-01,Munich 1860,Kaiserslautern,0,1,A,0,1,0
2,3,D2,2009,2009-10-04,Frankfurt FSV,Kaiserslautern,1,1,D,0,0,1
3,4,D2,2009,2010-02-21,Frankfurt FSV,Karlsruhe,2,1,H,1,0,0
4,5,D2,2009,2009-12-06,Ahlen,Karlsruhe,1,3,A,0,1,0


In [16]:
sql_df.columns

['Match_ID',
 'Div',
 'Season',
 'Date',
 'HomeTeam',
 'AwayTeam',
 'HomeTeamGoals',
 'AwayTeamGoals',
 'FinalResult',
 'HomeTeamWin',
 'AwayTeamWin',
 'GameTie']

### Question : Who are the winners of the D1 division in the Germany Football Association (Bundesliga) between 2000–2010?

In [17]:
print(sql_df.count())

#bundesliga is a D1 division and we are interested in season <= 2010 and >= 2000

bundesliga = sql_df \
            .filter(col("Div") == "D1") \
            .filter((col("Season") >= 2000) & (col("Season") <= 2010) )

type(bundesliga)
bundesliga.count()      

24625


3366

##### The design is to aggregate the home and away game results separately creating two dataframes: home and away.

#### Home Team Features

In [18]:
#Overall shape:
print("shape of bundesliga:",(bundesliga.count(),len(bundesliga.columns)))

shape of bundesliga: (3366, 12)


In [19]:
bundesliga.groupby("Season","HomeTeam").count().show(5)

+------+-------------+-----+
|Season|     HomeTeam|count|
+------+-------------+-----+
|  2005|Bayern Munich|   17|
|  2008|   M'gladbach|   17|
|  2006|      Cottbus|   17|
|  2010|     Hannover|   17|
|  2004|   Leverkusen|   17|
+------+-------------+-----+
only showing top 5 rows



In [20]:
home = bundesliga.groupby("Season","HomeTeam") \
        .agg(
        sum("HomeTeamWin").alias("TotalHomeWin"),
        sum("AwayTeamWin").alias("TotalHomeLoss"),
        sum("GameTie").alias("TotalHomeTie"),
        sum("HomeTeamGoals").alias("HomeScoredGoals"),
        sum('AwayTeamGoals').alias('HomeAgainstGoals')) \
        .withColumnRenamed("HomeTeam","Team")
    
print(home.show(7))
    
#Finding out shape of output:
print((home.count(),len(home.columns)))

+------+-------------+------------+-------------+------------+---------------+----------------+
|Season|         Team|TotalHomeWin|TotalHomeLoss|TotalHomeTie|HomeScoredGoals|HomeAgainstGoals|
+------+-------------+------------+-------------+------------+---------------+----------------+
|  2005|Bayern Munich|          14|            1|           2|             42|              14|
|  2008|   M'gladbach|           5|            8|           4|             23|              27|
|  2006|      Cottbus|           6|            6|           5|             21|              22|
|  2010|     Hannover|          12|            4|           1|             32|              17|
|  2004|   Leverkusen|          12|            2|           3|             42|              18|
|  2007|      Cottbus|           8|            7|           2|             25|              20|
|  2004|        Mainz|           9|            5|           3|             28|              21|
+------+-------------+------------+-----

In [21]:
#away game features 
away =  bundesliga.groupby('Season', 'AwayTeam') \
       .agg(sum('AwayTeamWin').alias('TotalAwayWin'),
            sum('HomeTeamWin').alias('TotalAwayLoss'),
            sum('GameTie').alias('TotalAwayTie'),
            sum('AwayTeamGoals').alias('AwayScoredGoals'),
            sum('HomeTeamGoals').alias('AwayAgainstGoals'))  \
       .withColumnRenamed('AwayTeam', 'Team')

print(away.show(7))
    
#Finding out shape of output:
print((away.count(),len(away.columns)))

type(away)

+------+-------------+------------+-------------+------------+---------------+----------------+
|Season|         Team|TotalAwayWin|TotalAwayLoss|TotalAwayTie|AwayScoredGoals|AwayAgainstGoals|
+------+-------------+------------+-------------+------------+---------------+----------------+
|  2005|Bayern Munich|           8|            2|           7|             25|              18|
|  2008|   M'gladbach|           3|           11|           3|             16|              35|
|  2006|      Cottbus|           5|            9|           3|             17|              27|
|  2010|     Hannover|           7|            8|           2|             17|              28|
|  2004|   Leverkusen|           4|            7|           6|             23|              26|
|  2007|      Cottbus|           1|            9|           7|             10|              36|
|  2004|     Hannover|           5|            8|           4|             13|              17|
+------+-------------+------------+-----

pyspark.sql.dataframe.DataFrame

Then, inner join them on ‘Team’ and ‘Season’ fields to create a single dataframe containing game level aggregation: table. 

In [22]:
table = home.join(away,["Team","Season"],"inner") \
        .withColumn("GoalsScored",col("HomeScoredGoals")+col("AwayScoredGoals")) \
        .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals')) \
        .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst')) \
        .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin')) \
        .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss')) \
        .withColumn('Tie', col('TotalHomeTie') + col('TotalAwayTie')) \
        .drop('HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals') \
        .drop('TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss', 'TotalHomeTie', 'TotalAwayTie') \
        .withColumn('WinPct', round((100* col('Win')/(col('Win') + col('Loss') + col('Tie'))), 2)) 
#Defining Total Win (Out of Total Matches Played)
        

#Finding out shape of output:
print((table.count(),len(table.columns)))

table.limit(5).toPandas()

(198, 9)


Unnamed: 0,Team,Season,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct
0,Bayern Munich,2005,67,32,35,22,3,9,64.71
1,M'gladbach,2008,39,62,-23,8,19,7,23.53
2,Cottbus,2006,38,49,-11,11,15,8,32.35
3,Hannover,2010,49,45,4,19,12,3,55.88
4,Leverkusen,2004,65,44,21,16,9,9,47.06


### Using Windows Function of PySpark

I use a basic window function to further aggregate game statistics on season level and rank them based on winning percentage and goal differentials. The reason why I added goal differentials is because it’s used as a tie-breaker in soccer.

#### About Windows Function:
PySpark Window functions are used to calculate results such as the rank, row number e.t.c over a range of input rows.Using them with PySpark SQL and PySpark DataFrame API   

PySpark Window functions operate on a group of rows (like frame, partition) and return a single value for every input row. PySpark SQL supports three kinds of window functions:

- ranking functions
- analytic functions
- aggregate functions


To perform an operation on a group first, we need to partition the data using Window.partitionBy() , and for row number and rank function we need to additionally order by on partition data using orderBy clause.

In [23]:
window = Window.partitionBy("Season").orderBy(col("WinPct").desc(),col("GoalDifferentials").desc())
window

<pyspark.sql.window.WindowSpec at 0x185fca32100>

In [24]:
table = table.withColumn("TeamPosition",rank().over(window))
table.show(4)

+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|         Team|Season|GoalsScored|GoalsAgainst|GoalDifferentials|Win|Loss|Tie|WinPct|TeamPosition|
+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
|Werder Bremen|  2003|         79|          38|               41| 22|   4|  8| 64.71|           1|
|Bayern Munich|  2003|         70|          39|               31| 20|   6|  8| 58.82|           2|
|   Leverkusen|  2003|         73|          39|               34| 19|   7|  8| 55.88|           3|
|    Stuttgart|  2003|         52|          24|               28| 18|   6| 10| 52.94|           4|
+-------------+------+-----------+------------+-----------------+---+----+---+------+------------+
only showing top 4 rows



### Filtering Team That got 1st Position W.r.t each Season

In [25]:
table_1st = table.filter(col("TeamPosition")==1).orderBy(asc("Season"))
table_1st.toPandas()

Unnamed: 0,Team,Season,GoalsScored,GoalsAgainst,GoalDifferentials,Win,Loss,Tie,WinPct,TeamPosition
0,Bayern Munich,2000,62,37,25,19,9,6,55.88,1
1,Leverkusen,2001,77,38,39,21,7,6,61.76,1
2,Bayern Munich,2002,70,25,45,23,5,6,67.65,1
3,Werder Bremen,2003,79,38,41,22,4,8,64.71,1
4,Bayern Munich,2004,75,33,42,24,5,5,70.59,1
5,Bayern Munich,2005,67,32,35,22,3,9,64.71,1
6,Stuttgart,2006,61,37,24,21,6,7,61.76,1
7,Bayern Munich,2007,68,21,47,22,2,10,64.71,1
8,Wolfsburg,2008,80,41,39,21,7,6,61.76,1
9,Bayern Munich,2009,72,31,41,20,4,10,58.82,1
