# Big Data Analysis using Pyspark

Exploring the dataset using pyspark:
1. Who are the winners of the D1 division in the Germany Football Association (Bundesliga) in the
last decade?
2. Which teams have been relegated in the past 10 years?
3. Which season of Bundesliga was the most competitive in the last decade?
4. What's the best month to watch Bundesliga?

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
import numpy as np

In [None]:
!pip install -U pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=eda2382701fc13ebc7703024f4424396b82a384745413fa17c55e6c88bff8053
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName('Football Association').getOrCreate()

In [None]:
matches = spark.read.format('csv').option('inferSchema',True).option('header',True).load('/content/drive/MyDrive/Football_Association/Matches.csv')

In [None]:
matches.show()

+--------+---+------+----------+------------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|          HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+------------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|        Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|       Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|     Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|     Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|             Ahlen|     Karlsruhe|   1|   3|  A|
|       6| D2|  2009|2010-04-03|      Union Berlin|     Karlsruhe|   1|   1|  D|
|       7| D2|  2009|2009-08-14|         Paderborn|     Karlsruhe|   2|   0|  H|
|       8| D2|  2009|2010-03-08|         Bielefeld|     Karlsruhe|   0|   1|  A|
|       9| D2|  2009|2009-09-26|    Kaiserslautern|     Karlsruhe|   2|   0|  H|
|      10| D2|  2009|2009-11

In [None]:
teams = spark.read.format('csv').option('inferSchema',True).option('header',True).load('/content/drive/MyDrive/Football_Association/Teams.csv')

In [None]:
teams.show()

+------+-------------+---------+----------+------------------+----------------------+------------------+---------------+
|Season|     TeamName|KaderHome|AvgAgeHome|ForeignPlayersHome|OverallMarketValueHome|AvgMarketValueHome|StadiumCapacity|
+------+-------------+---------+----------+------------------+----------------------+------------------+---------------+
|  2017|Bayern Munich|       27|        26|                15|             597950000|          22150000|          75000|
|  2017|     Dortmund|       33|        25|                18|             416730000|          12630000|          81359|
|  2017|   Leverkusen|       31|        24|                15|             222600000|           7180000|          30210|
|  2017|   RB Leipzig|       30|        23|                15|             180130000|           6000000|          42959|
|  2017|   Schalke 04|       29|        24|                17|             179550000|           6190000|          62271|
|  2017|   M'gladbach|       31|

In [None]:
teams_in_matches = spark.read.format('csv').option('inferSchema',True).option('header',True).load('/content/drive/MyDrive/Football_Association/Teams_in_Matches.csv')

In [None]:
teams_in_matches.show()

+--------+--------------+
|Match_ID|Unique_Team_ID|
+--------+--------------+
|       1|            26|
|       1|            46|
|       2|            26|
|       2|            42|
|       3|            26|
|       3|            44|
|       4|            29|
|       4|            44|
|       5|            29|
|       5|            49|
|       6|            29|
|       6|            35|
|       7|            21|
|       7|            29|
|       8|            29|
|       8|            30|
|       9|            26|
|       9|            29|
|      10|            29|
|      10|            33|
+--------+--------------+
only showing top 20 rows



In [None]:
unique_teams = spark.read.format('csv').option('inferSchema',True).option('header',True).load('/content/drive/MyDrive/Football_Association/Unique_Teams.csv')

In [None]:
unique_teams.show()

+-------------+--------------+
|     TeamName|Unique_Team_ID|
+-------------+--------------+
|Bayern Munich|             1|
|     Dortmund|             2|
|   Leverkusen|             3|
|   RB Leipzig|             4|
|   Schalke 04|             5|
|   M'gladbach|             6|
|    Wolfsburg|             7|
|      FC Koln|             8|
|   Hoffenheim|             9|
|       Hertha|            10|
|        Mainz|            11|
|      Hamburg|            12|
|Werder Bremen|            13|
|Ein Frankfurt|            14|
|     Augsburg|            15|
|     Freiburg|            16|
|    Stuttgart|            17|
|     Hannover|            18|
|   Ingolstadt|            19|
|    Darmstadt|            20|
+-------------+--------------+
only showing top 20 rows



# joining teams_in_matches and unique_Team ON Unique_Team_ID

In [None]:
team_1 = teams_in_matches.join(unique_teams,
                                     teams_in_matches.Unique_Team_ID == unique_teams.Unique_Team_ID,
                                     "leftouter")

In [None]:
team_1.show()

+--------+--------------+--------------+--------------+
|Match_ID|Unique_Team_ID|      TeamName|Unique_Team_ID|
+--------+--------------+--------------+--------------+
|       1|            26|Kaiserslautern|            26|
|       1|            46|    Oberhausen|            46|
|       2|            26|Kaiserslautern|            26|
|       2|            42|   Munich 1860|            42|
|       3|            26|Kaiserslautern|            26|
|       3|            44| Frankfurt FSV|            44|
|       4|            29|     Karlsruhe|            29|
|       4|            44| Frankfurt FSV|            44|
|       5|            29|     Karlsruhe|            29|
|       5|            49|         Ahlen|            49|
|       6|            29|     Karlsruhe|            29|
|       6|            35|  Union Berlin|            35|
|       7|            21|     Paderborn|            21|
|       7|            29|     Karlsruhe|            29|
|       8|            29|     Karlsruhe|        

In [None]:
columns_to_drop = [unique_teams["Unique_Team_ID"]]
team_details = team_1.drop(*columns_to_drop)

In [None]:
team_details.show()

+--------+--------------+--------------+
|Match_ID|Unique_Team_ID|      TeamName|
+--------+--------------+--------------+
|       1|            26|Kaiserslautern|
|       1|            46|    Oberhausen|
|       2|            26|Kaiserslautern|
|       2|            42|   Munich 1860|
|       3|            26|Kaiserslautern|
|       3|            44| Frankfurt FSV|
|       4|            29|     Karlsruhe|
|       4|            44| Frankfurt FSV|
|       5|            29|     Karlsruhe|
|       5|            49|         Ahlen|
|       6|            29|     Karlsruhe|
|       6|            35|  Union Berlin|
|       7|            21|     Paderborn|
|       7|            29|     Karlsruhe|
|       8|            29|     Karlsruhe|
|       8|            30|     Bielefeld|
|       9|            26|Kaiserslautern|
|       9|            29|     Karlsruhe|
|      10|            29|     Karlsruhe|
|      10|            33| Hansa Rostock|
+--------+--------------+--------------+
only showing top

# joining matches and team_details ON Match_ID

In [None]:
matches_team_details_1 = matches.join(team_details,
                                    matches.Match_ID == team_details.Match_ID, "leftouter")

In [None]:
matches_team_details_1.show()

+--------+---+------+----------+--------------+--------------+----+----+---+--------+--------------+--------------+
|Match_ID|Div|Season|      Date|      HomeTeam|      AwayTeam|FTHG|FTAG|FTR|Match_ID|Unique_Team_ID|      TeamName|
+--------+---+------+----------+--------------+--------------+----+----+---+--------+--------------+--------------+
|       1| D2|  2009|2010-04-04|    Oberhausen|Kaiserslautern|   2|   1|  H|       1|            26|Kaiserslautern|
|       1| D2|  2009|2010-04-04|    Oberhausen|Kaiserslautern|   2|   1|  H|       1|            46|    Oberhausen|
|       2| D2|  2009|2009-11-01|   Munich 1860|Kaiserslautern|   0|   1|  A|       2|            26|Kaiserslautern|
|       2| D2|  2009|2009-11-01|   Munich 1860|Kaiserslautern|   0|   1|  A|       2|            42|   Munich 1860|
|       3| D2|  2009|2009-10-04| Frankfurt FSV|Kaiserslautern|   1|   1|  D|       3|            26|Kaiserslautern|
|       3| D2|  2009|2009-10-04| Frankfurt FSV|Kaiserslautern|   1|   1|

In [None]:
columns_to_drop_1 = [team_details["Match_ID"]]
matches_team_details = matches_team_details_1.drop(*columns_to_drop_1)

In [None]:
matches_team_details.show()

+--------+---+------+----------+--------------+--------------+----+----+---+--------------+--------------+
|Match_ID|Div|Season|      Date|      HomeTeam|      AwayTeam|FTHG|FTAG|FTR|Unique_Team_ID|      TeamName|
+--------+---+------+----------+--------------+--------------+----+----+---+--------------+--------------+
|       1| D2|  2009|2010-04-04|    Oberhausen|Kaiserslautern|   2|   1|  H|            26|Kaiserslautern|
|       1| D2|  2009|2010-04-04|    Oberhausen|Kaiserslautern|   2|   1|  H|            46|    Oberhausen|
|       2| D2|  2009|2009-11-01|   Munich 1860|Kaiserslautern|   0|   1|  A|            26|Kaiserslautern|
|       2| D2|  2009|2009-11-01|   Munich 1860|Kaiserslautern|   0|   1|  A|            42|   Munich 1860|
|       3| D2|  2009|2009-10-04| Frankfurt FSV|Kaiserslautern|   1|   1|  D|            26|Kaiserslautern|
|       3| D2|  2009|2009-10-04| Frankfurt FSV|Kaiserslautern|   1|   1|  D|            44| Frankfurt FSV|
|       4| D2|  2009|2010-02-21| Fran

# 1. Who are the winners of the D1 division in the Germany Football Association (Bundesliga) in the last decade?

In [None]:
division_D1 = matches_team_details.filter(matches_team_details.Div == 'D1')

In [None]:
division_D1.show()

+--------+---+------+----------+-------------+----------+----+----+---+--------------+-------------+
|Match_ID|Div|Season|      Date|     HomeTeam|  AwayTeam|FTHG|FTAG|FTR|Unique_Team_ID|     TeamName|
+--------+---+------+----------+-------------+----------+----+----+---+--------------+-------------+
|      21| D1|  2009|2010-02-06|       Bochum|Leverkusen|   1|   1|  D|             3|   Leverkusen|
|      21| D1|  2009|2010-02-06|       Bochum|Leverkusen|   1|   1|  D|            28|       Bochum|
|      22| D1|  2009|2009-11-22|Bayern Munich|Leverkusen|   1|   1|  D|             1|Bayern Munich|
|      22| D1|  2009|2009-11-22|Bayern Munich|Leverkusen|   1|   1|  D|             3|   Leverkusen|
|      23| D1|  2009|2010-05-08|   M'gladbach|Leverkusen|   1|   1|  D|             3|   Leverkusen|
|      23| D1|  2009|2010-05-08|   M'gladbach|Leverkusen|   1|   1|  D|             6|   M'gladbach|
|      24| D1|  2009|2009-08-08|        Mainz|Leverkusen|   2|   2|  D|             3|   Le

In [None]:
from pyspark.sql.functions import current_date
from pyspark.sql.functions import date_add
from pyspark.sql.functions import date_trunc


In [None]:
start_date = spark.sql("""
    SELECT date_format(
        date_add(date_trunc('year', date_add(current_date(), -(365*20))), 0),
        'yyyy-MM-dd'
    ) as start
""")


In [None]:
end_date = spark.sql("""
    SELECT date_format(
        date_add(date_trunc('year', date_add(current_date(), -(365*10))), 0),
        'yyyy-MM-dd'
    ) as start
""")

d1 dataset includes only last decade records of D1 division

In [None]:
d1 = division_D1.filter( (division_D1.Date >= start_date.first()[0]) & (division_D1.Date <= end_date.first()[0]) )


In [None]:
d1.show()

+--------+---+------+----------+-------------+----------+----+----+---+--------------+-------------+
|Match_ID|Div|Season|      Date|     HomeTeam|  AwayTeam|FTHG|FTAG|FTR|Unique_Team_ID|     TeamName|
+--------+---+------+----------+-------------+----------+----+----+---+--------------+-------------+
|      21| D1|  2009|2010-02-06|       Bochum|Leverkusen|   1|   1|  D|             3|   Leverkusen|
|      21| D1|  2009|2010-02-06|       Bochum|Leverkusen|   1|   1|  D|            28|       Bochum|
|      22| D1|  2009|2009-11-22|Bayern Munich|Leverkusen|   1|   1|  D|             1|Bayern Munich|
|      22| D1|  2009|2009-11-22|Bayern Munich|Leverkusen|   1|   1|  D|             3|   Leverkusen|
|      23| D1|  2009|2010-05-08|   M'gladbach|Leverkusen|   1|   1|  D|             3|   Leverkusen|
|      23| D1|  2009|2010-05-08|   M'gladbach|Leverkusen|   1|   1|  D|             6|   M'gladbach|
|      24| D1|  2009|2009-08-08|        Mainz|Leverkusen|   2|   2|  D|             3|   Le

In [None]:
from pyspark.sql.functions import when, col, sum, rank
from pyspark.sql.window import Window
from pyspark.sql import functions as F

Calculate points for home and away teams based on match results

In [None]:
d1 = d1.withColumn("HomePoints", when(col("FTR") == "H", 3).otherwise(when(col("FTR") == "D", 1).otherwise(0)))
d1 = d1.withColumn("AwayPoints", when(col("FTR") == "A", 3).otherwise(when(col("FTR") == "D", 1).otherwise(0)))


Convert the points columns to integers

In [None]:
d1 = d1.withColumn("HomePoints", d1["HomePoints"].cast("int"))
d1 = d1.withColumn("AwayPoints", d1["AwayPoints"].cast("int"))

Calculate total points for home and away teams

In [None]:
d1 = d1.withColumn("TotalHomePoints", sum("HomePoints").over(Window.partitionBy("Season", "HomeTeam")))
d1 = d1.withColumn("TotalAwayPoints", sum("AwayPoints").over(Window.partitionBy("Season", "AwayTeam")))


In [None]:
d1 = d1.withColumn("TotalPoints", d1["TotalHomePoints"] + d1["TotalAwayPoints"])


Find the winner for each season

In [None]:
windowSpec = Window.partitionBy("Season").orderBy(F.desc("TotalPoints"))
winners = d1.withColumn("Rank",
                        F.row_number().over(windowSpec)).filter(col("Rank") == 1).select("Season", "HomeTeam", "AwayTeam", "TotalPoints")

Q1_ANS : the winners for each season

In [None]:
winners.show()

+------+-------------+-------------+-----------+
|Season|     HomeTeam|     AwayTeam|TotalPoints|
+------+-------------+-------------+-----------+
|  2002|       Hertha|Bayern Munich|         78|
|  2003|Bayern Munich|Werder Bremen|        158|
|  2004|   Leverkusen|Bayern Munich|        144|
|  2005|Bayern Munich|      Hamburg|        160|
|  2006|   Schalke 04|Werder Bremen|        146|
|  2007|Werder Bremen|Bayern Munich|        148|
|  2008|    Wolfsburg|Bayern Munich|        156|
|  2009|Bayern Munich|Werder Bremen|        142|
|  2010|Bayern Munich|   Leverkusen|        154|
|  2011|Bayern Munich|     Dortmund|        160|
|  2012|   Schalke 04|Bayern Munich|         78|
+------+-------------+-------------+-----------+



#2. Which teams have been relegated in the past 10 years?

In [None]:
start_date_2 = spark.sql("""
    SELECT date_format(
        date_add(date_trunc('year', date_add(current_date(), -(365*10))), 0),
        'yyyy-MM-dd'
    ) as start
""")

In [None]:
end_date_2 = spark.sql("""
    SELECT date_format(
        date_add(date_trunc('year', date_add(current_date(), 0)), 0),
        'yyyy-MM-dd'
    ) as start
""")

In [None]:
d2 = matches_team_details.filter( (matches_team_details.Date >= start_date_2.first()[0]) & (matches_team_details.Date <= end_date_2.first()[0]) )


In [None]:
d2 = d2.withColumn("HomePoints", when(col("FTR") == "H", 3).otherwise(when(col("FTR") == "D", 1).otherwise(0)))
d2 = d2.withColumn("AwayPoints", when(col("FTR") == "A", 3).otherwise(when(col("FTR") == "D", 1).otherwise(0)))


In [None]:
d2 = d2.withColumn("HomePoints", d2["HomePoints"].cast("int"))
d2 = d2.withColumn("AwayPoints", d2["AwayPoints"].cast("int"))

In [None]:
d2 = d2.withColumn("TotalHomePoints", sum("HomePoints").over(Window.partitionBy("Season", "HomeTeam")))
d2 = d2.withColumn("TotalAwayPoints", sum("AwayPoints").over(Window.partitionBy("Season", "AwayTeam")))


In [None]:
d2 = d2.withColumn("TotalPoints", d2["TotalHomePoints"] + d2["TotalAwayPoints"])


Create a DataFrame with the total points for each team in each season

In [None]:
points_d2 = d2.select('Season', 'HomeTeam', 'AwayTeam', 'TotalPoints')

Find the teams with the lowest points in each season

In [None]:
lowest_points = points_d2.groupBy("Season", "HomeTeam").min("TotalPoints").withColumnRenamed("min(TotalPoints)", "TotalPoints")

Identify the relegated teams based on a threshold (adjust as needed)

In [None]:
relegated_teams = lowest_points.filter(col("TotalPoints") <= 30).select("HomeTeam").rdd.flatMap(lambda x: x).collect()

Remove duplicates from the list of relegated teams

In [None]:
relegated_teams = list(set(relegated_teams))

Display the list of relegated teams

In [None]:
print("Relegated Teams in the Past 10 Years:")
for team in relegated_teams:
    print(team)

Relegated Teams in the Past 10 Years:
Werder Bremen
Fortuna Dusseldorf
Braunschweig
Aston Villa
QPR
Sandhausen
Wigan
West Brom
Munich 1860
Reading
Stuttgart
Bochum
Hamburg
Greuther Furth
Fulham
Freiburg
Erzgebirge Aue
Stoke
Mainz
Wolfsburg
Hoffenheim
Ingolstadt
Ein Frankfurt
Regensburg


#3. Which season of Bundesliga was the most competitive in the last decade?

In [None]:
start_date_3 = spark.sql("""
    SELECT date_format(
        date_add(date_trunc('year', date_add(current_date(), -(365*20))), 0),
        'yyyy-MM-dd'
    ) as start
""")

In [None]:
end_date_3 = spark.sql("""
    SELECT date_format(
        date_add(date_trunc('year', date_add(current_date(), -(365*10))), 0),
        'yyyy-MM-dd'
    ) as start
""")

In [None]:
matches_total_score = matches_team_details

In [None]:
matches_total_score.show()

In [None]:
matches_total_score = matches_total_score.withColumn("HomePoints", when(col("FTR") == "H", 3).otherwise(when(col("FTR") == "D", 1).otherwise(0)))
matches_total_score = matches_total_score.withColumn("AwayPoints", when(col("FTR") == "A", 3).otherwise(when(col("FTR") == "D", 1).otherwise(0)))


In [None]:
matches_total_score = matches_total_score.withColumn("HomePoints", matches_total_score["HomePoints"].cast("int"))
matches_total_score = matches_total_score.withColumn("AwayPoints", matches_total_score["AwayPoints"].cast("int"))

In [None]:
matches_total_score = matches_total_score.withColumn("TotalHomePoints", sum("HomePoints").over(Window.partitionBy("Season", "HomeTeam")))
matches_total_score = matches_total_score.withColumn("TotalAwayPoints", sum("AwayPoints").over(Window.partitionBy("Season", "AwayTeam")))


In [None]:
matches_total_score = matches_total_score.withColumn("TotalPoints", matches_total_score["TotalHomePoints"] + matches_total_score["TotalAwayPoints"])


In [None]:
matches_total_score.show()

+--------+---+------+----------+----------------+--------+----+----+---+--------------+----------------+----------+----------+---------------+---------------+-----------+
|Match_ID|Div|Season|      Date|        HomeTeam|AwayTeam|FTHG|FTAG|FTR|Unique_Team_ID|        TeamName|HomePoints|AwayPoints|TotalHomePoints|TotalAwayPoints|TotalPoints|
+--------+---+------+----------+----------------+--------+----+----+---+--------------+----------------+----------+----------+---------------+---------------+-----------+
|   37908| E0|  1993|1994-04-23|     Aston Villa| Arsenal|   1|   2|  A|            82|     Aston Villa|         0|         3|             58|             66|        124|
|   37908| E0|  1993|1994-04-23|     Aston Villa| Arsenal|   1|   2|  A|            81|         Arsenal|         0|         3|             58|             66|        124|
|   37647| E0|  1993|1993-11-20|         Chelsea| Arsenal|   0|   2|  A|            83|         Chelsea|         0|         3|             76|   

In [None]:
d4 = matches_total_score.filter( (matches_total_score.Date >= start_date_3.first()[0]) & (matches_total_score.Date <= end_date_3.first()[0]) )

In [None]:
d4.show(3)

+--------+---+------+----------+-----------+--------+----+----+---+--------------+-----------+----------+----------+---------------+---------------+-----------+
|Match_ID|Div|Season|      Date|   HomeTeam|AwayTeam|FTHG|FTAG|FTR|Unique_Team_ID|   TeamName|HomePoints|AwayPoints|TotalHomePoints|TotalAwayPoints|TotalPoints|
+--------+---+------+----------+-----------+--------+----+----+---+--------------+-----------+----------+----------+---------------+---------------+-----------+
|   41385| E0|  2002|2003-04-05|Aston Villa| Arsenal|   1|   1|  D|            82|Aston Villa|         1|         1|             70|             62|        132|
|   41385| E0|  2002|2003-04-05|Aston Villa| Arsenal|   1|   1|  D|            81|    Arsenal|         1|         1|             70|             62|        132|
|   41300| E0|  2002|2003-01-12| Birmingham| Arsenal|   0|   4|  A|           115| Birmingham|         0|         3|             58|             62|        120|
+--------+---+------+----------+--

In [None]:
from pyspark.sql.functions import min, max

 Calculate the total points for each team in each season

In [None]:
team_season_points = d4.groupBy("Season", "HomeTeam").sum("HomePoints").withColumnRenamed("sum(HomePoints)", "TotalPoints")

Define a window specification to rank teams within each season based on total points

In [None]:
windowSpec_4 = Window.partitionBy("Season").orderBy(col("TotalPoints").desc())

Rank teams within each season

In [None]:
ranked_teams = team_season_points.withColumn("Rank", rank().over(windowSpec))


Filter for top teams

In [None]:
top_teams_per_season = ranked_teams.filter(col("Rank") <= 5)  # Consider the top 5 teams

In [None]:
top_teams_per_season.show()

+------+--------------+-----------+----+
|Season|      HomeTeam|TotalPoints|Rank|
+------+--------------+-----------+----+
|  2002|       Hamburg|         50|   1|
|  2002|    Man United|         44|   2|
|  2002|        Hertha|         44|   2|
|  2002|        Lubeck|         40|   4|
|  2002|    M'gladbach|         40|   4|
|  2002|Greuther Furth|         40|   4|
|  2002|      Dortmund|         40|   4|
|  2002|       Chelsea|         40|   4|
|  2002|      Freiburg|         40|   4|
|  2002|     Newcastle|         40|   4|
|  2003|       Arsenal|         98|   1|
|  2003| Bayern Munich|         84|   2|
|  2003|    Man United|         80|   3|
|  2003|       Chelsea|         80|   3|
|  2003|      Dortmund|         78|   5|
|  2003|      Nurnberg|         78|   5|
|  2004|       Chelsea|         94|   1|
|  2004|       Arsenal|         88|   2|
|  2004| Bayern Munich|         88|   2|
|  2004|    Man United|         84|   4|
+------+--------------+-----------+----+
only showing top

In [None]:
# Calculate the points difference among the top teams in each season
points_difference_per_season = top_teams_per_season.groupBy('Season').agg( max(col('TotalPoints')) - min(col('TotalPoints'))).alias("PointsDifference")

In [None]:
points_difference_per_season.show()

+------+-------------------------------------+
|Season|(max(TotalPoints) - min(TotalPoints))|
+------+-------------------------------------+
|  2002|                                   10|
|  2003|                                   20|
|  2004|                                   10|
|  2005|                                   22|
|  2006|                                   12|
|  2007|                                   22|
|  2008|                                   22|
|  2009|                                   18|
|  2010|                                   30|
|  2011|                                   24|
|  2012|                                   14|
+------+-------------------------------------+



In [None]:
# Find the season with the smallest points difference among the top teams
most_competitive_season = points_difference_per_season.orderBy('(max(TotalPoints) - min(TotalPoints))')

In [None]:
print("The most competitive season in the last decade was:")
most_competitive_season.first()[0]

The most competitive season in the last decade was:


2002


#4. What's the best month to watch Bundesliga?

In [None]:
from pyspark.sql.functions import month, avg

In [None]:
matches_total_score = matches_total_score.withColumn("Month", month(matches_total_score["Date"]))

In [None]:
# Calculate the average number of goals scored per match for each month
avg_goals_per_month = matches_total_score.groupBy("Month").agg(avg((matches_total_score["FTHG"] + matches_total_score["FTAG"])).alias("AvgGoalsPerMatch"))

In [None]:
# Find the month with the highest average number of goals scored per match
best_month = avg_goals_per_month.orderBy("AvgGoalsPerMatch", ascending=False).first()


In [None]:
best_month

Row(Month=6, AvgGoalsPerMatch=3.4007936507936507)

In [None]:
print("The best month to watch the Bundesliga is month", best_month["Month"])

The best month to watch the Bundesliga is month 6
