In [1]:
!pip install pyspark==3.4.0

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317122 sha256=5fd1621047c9e59429a56a26418fa823e07a9fbbcbfc762983410f8134a71ce0
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [2]:
import pyspark

In [3]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Multiple Datasets Analysis") \
    .getOrCreate()


In [5]:
# Load datasets
df1 = spark.read.csv("Matches.csv", header=True, inferSchema=True)
df2 = spark.read.csv("Teams.csv", header=True, inferSchema=True)
df3 = spark.read.csv("Teams_in_Matches.csv", header=True, inferSchema=True)
df4 = spark.read.csv("Unique_Teams.csv", header=True, inferSchema=True)


In [6]:
# Show schema and first few rows of each dataset
print("Schema and Sample Data of Dataset 1")
df1.printSchema()
df1.show(5)

print("Schema and Sample Data of Dataset 2")
df2.printSchema()
df2.show(5)

print("Schema and Sample Data of Dataset 3")
df3.printSchema()
df3.show(5)

print("Schema and Sample Data of Dataset 4")
df4.printSchema()
df4.show(5)


Schema and Sample Data of Dataset 1
root
 |-- Match_ID: integer (nullable = true)
 |-- Div: string (nullable = true)
 |-- Season: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- HomeTeam: string (nullable = true)
 |-- AwayTeam: string (nullable = true)
 |-- FTHG: integer (nullable = true)
 |-- FTAG: integer (nullable = true)
 |-- FTR: string (nullable = true)

+--------+---+------+----------+-------------+--------------+----+----+---+
|Match_ID|Div|Season|      Date|     HomeTeam|      AwayTeam|FTHG|FTAG|FTR|
+--------+---+------+----------+-------------+--------------+----+----+---+
|       1| D2|  2009|2010-04-04|   Oberhausen|Kaiserslautern|   2|   1|  H|
|       2| D2|  2009|2009-11-01|  Munich 1860|Kaiserslautern|   0|   1|  A|
|       3| D2|  2009|2009-10-04|Frankfurt FSV|Kaiserslautern|   1|   1|  D|
|       4| D2|  2009|2010-02-21|Frankfurt FSV|     Karlsruhe|   2|   1|  H|
|       5| D2|  2009|2009-12-06|        Ahlen|     Karlsruhe|   1|   3|  A|
+--------+--

In [10]:
# Assuming the 'Season' column indicates the year
recent_d1_matches = df1.filter((df1.Div == 'D1') & (df1.Season >= 2013))

In [11]:
from pyspark.sql.functions import count, desc

winners_by_season = recent_d1_matches \
    .groupBy('Season', 'HomeTeam') \
    .agg(count('*').alias('wins')) \
    .orderBy('Season', desc('wins')) \
    .dropDuplicates(['Season'])

# Show the winners for each season
winners_by_season.show()

+------+----------+----+
|Season|  HomeTeam|wins|
+------+----------+----+
|  2013|Hoffenheim|  17|
|  2014|  Freiburg|  17|
|  2015| Wolfsburg|  17|
|  2016| Wolfsburg|  17|
|  2017|  Augsburg|  17|
+------+----------+----+



In [12]:
from pyspark.sql.functions import count, desc

winners_by_division = df1 \
    .groupBy('Season', 'Div', 'HomeTeam') \
    .agg(count('*').alias('wins')) \
    .orderBy('Season', 'Div', desc('wins')) \
    .dropDuplicates(['Season', 'Div'])

# Show the winners for each season and division
winners_by_division.show()

+------+---+--------------+----+
|Season|Div|      HomeTeam|wins|
+------+---+--------------+----+
|  1993| D1|      Dortmund|  17|
|  1993| D2|     Wolfsburg|  19|
|  1993| E0|       Arsenal|  21|
|  1994| D1|        Bochum|  17|
|  1994| D2| Frankfurt FSV|  17|
|  1994| E0|         Leeds|  21|
|  1995| D1|      Freiburg|  17|
|  1995| D2|      Hannover|  17|
|  1995| E0|     Blackburn|  19|
|  1996| D1|      St Pauli|  17|
|  1996| D2|      Mannheim|  17|
|  1996| E0|         Leeds|  19|
|  1997| D1|        Hertha|  17|
|  1997| D2|Greuther Furth|  17|
|  1997| E0|      West Ham|  19|
|  1998| D1|    Leverkusen|  17|
|  1998| D2|Greuther Furth|  17|
|  1998| E0|         Derby|  19|
|  1999| D1|    Schalke 04|  17|
|  1999| D2|      Hannover|  17|
+------+---+--------------+----+
only showing top 20 rows



In [15]:
from pyspark.sql.functions import count, desc

# Filter for D1 division and seasons in the last decade (2013 to 2023)
d1_winners = df1 \
    .filter((df1.Div == 'D1') & (df1.Season >= 2013) & (df1.Season <= 2023)) \
    .groupBy('Season', 'HomeTeam') \
    .agg(count('*').alias('wins')) \
    .orderBy('Season', desc('wins')) \
    .dropDuplicates(['Season'])

# Show the winners
d1_winners.show()

+------+----------+----+
|Season|  HomeTeam|wins|
+------+----------+----+
|  2013|Hoffenheim|  17|
|  2014|  Freiburg|  17|
|  2015| Wolfsburg|  17|
|  2016| Wolfsburg|  17|
|  2017|  Augsburg|  17|
+------+----------+----+



In [16]:
from pyspark.sql.functions import col, lag, when
from pyspark.sql.window import Window

# Assuming 'Div' indicates the division and a lower letter means a higher division
window_spec = Window.orderBy('Season')

# Calculate the previous division for each team
df_with_prev_div = df1.withColumn(
    'PrevDiv',
    lag('Div').over(window_spec.partitionBy('HomeTeam'))
)

# Filter for teams that moved to a lower division
relegated_teams = df_with_prev_div.filter(
    (df_with_prev_div.Div > df_with_prev_div.PrevDiv) &
    (df_with_prev_div.Season >= 2013)
).select('HomeTeam', 'Season', 'Div', 'PrevDiv')

relegated_teams.show()

+------------------+------+---+-------+
|          HomeTeam|Season|Div|PrevDiv|
+------------------+------+---+-------+
|      Braunschweig|  2014| D2|     D1|
|         Darmstadt|  2017| D2|     D1|
|Fortuna Dusseldorf|  2013| D2|     D1|
|          Freiburg|  2015| D2|     D1|
|    Greuther Furth|  2013| D2|     D1|
|          Hannover|  2016| D2|     D1|
|        Ingolstadt|  2017| D2|     D1|
|          Nurnberg|  2014| D2|     D1|
|         Paderborn|  2015| D2|     D1|
|         Stuttgart|  2016| D2|     D1|
+------------------+------+---+-------+



In [19]:
from pyspark.sql.functions import stddev, col

# Filter for D1 (Bundesliga) and last decade
bundesliga_last_decade = df1.filter((df1.Div == 'D1') & (df1.Season >= 2013))

# Calculate points for each team in each season
# (Assuming 3 points for a win, 1 for a draw, 0 for a loss)
bundesliga_with_points = bundesliga_last_decade.withColumn(
    'HomePoints',
    when(col('FTR') == 'H', 3).when(col('FTR') == 'D', 1).otherwise(0)
).withColumn(
    'AwayPoints',
    when(col('FTR') == 'A', 3).when(col('FTR') == 'D', 1).otherwise(0)
)

# Aggregate points for each team in each season
team_points_per_season = bundesliga_with_points.groupBy('Season', 'HomeTeam').sum('HomePoints').union(
    bundesliga_with_points.groupBy('Season', 'AwayTeam').sum('AwayPoints')
).groupBy('Season', 'HomeTeam').sum('sum(HomePoints)')

# Calculate standard deviation of points for each season
stddev_points_per_season = team_points_per_season.groupBy('Season').agg(stddev('sum(sum(HomePoints))').alias('stddev_points')).orderBy(desc('stddev_points'))

# Show the most competitive season
stddev_points_per_season.show(4)

+------+------------------+
|Season|     stddev_points|
+------+------------------+
|  2013|17.365355036841475|
|  2015|15.656129122007371|
|  2016|13.940583534452417|
|  2014|13.587287407167308|
+------+------------------+
only showing top 4 rows



In [20]:
from pyspark.sql.functions import month, sum


bundesliga_data = df1.filter(df1.Div == 'D1')
bundesliga_with_month = bundesliga_data.withColumn('Month', month(bundesliga_data['Date']))

# Calculate total goals scored per month
goals_per_month = bundesliga_with_month.groupBy('Month').agg(sum('FTHG').alias('HomeGoals'), sum('FTAG').alias('AwayGoals')) \
    .withColumn('TotalGoals', col('HomeGoals') + col('AwayGoals')) \
    .orderBy(desc('TotalGoals'))

# Show the month with the most goals
goals_per_month.show(1)

+-----+---------+---------+----------+
|Month|HomeGoals|AwayGoals|TotalGoals|
+-----+---------+---------+----------+
|    4|     1580|     1082|      2662|
+-----+---------+---------+----------+
only showing top 1 row



In [21]:
from pyspark.sql.functions import month, mean, col

# Assuming 'Date' column exists and is in a format that can be parsed
bundesliga_data = df1.filter(df1.Div == 'D1')
bundesliga_with_month = bundesliga_data.withColumn('Month', month(bundesliga_data['Date']))

# Calculate total goals per match
bundesliga_with_total_goals = bundesliga_with_month.withColumn('TotalGoals', col('FTHG') + col('FTAG'))

# Calculate average goals per match for each month
monthly_performance = bundesliga_with_total_goals.groupBy('Month').agg(mean('TotalGoals').alias('AvgGoalsPerMatch')).orderBy('Month')

# Display the performance of each month
monthly_performance.show()

+-----+------------------+
|Month|  AvgGoalsPerMatch|
+-----+------------------+
|    1| 2.271698113207547|
|    2| 2.475796930342385|
|    3|  2.67579408543264|
|    4|2.5645472061657033|
|    5|3.0875370919881306|
|    6|3.7777777777777777|
|    7|              1.75|
|    8|2.8117469879518073|
|    9|2.7309941520467835|
|   10|2.7037037037037037|
|   11| 2.803673938002296|
|   12|2.5015151515151515|
+-----+------------------+

