In [86]:
from pyspark.sql import SparkSession,Window
import pyspark.sql.functions as f

In [2]:
spark = SparkSession.builder.appName("FootballAnalysis").master("local[3]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/29 08:42:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [63]:
df = spark.read\
        .format("csv")\
        .option("header","true")\
        .option("inferSchema","true")\
        .load("/Users/mohan/Desktop/PySpark/wrangling__pyspark/Data/Matches.csv")
#Rename Column
old_col = df.columns[-3:]
new_col = ['HomeTeamGoals','AwayTeamGoals','FinalResult']
col = zip(old_col,new_col)
for old,new in col:
    df = df.withColumnRenamed(old,new)
df = df.withColumn('HomeTeamWin', f.when(f.col('FinalResult') == 'H', 1).otherwise(0)) \
                        .withColumn('AwayTeamWin', f.when(f.col('FinalResult') == 'A', 1).otherwise(0)) \
                        .withColumn('GameTie', f.when(f.col('FinalResult') == 'D', 1).otherwise(0))

In [64]:
#checking null values in all Columns
null_values = [df.filter(f.col(column).isNull()).count() for column in df.columns]
null_values

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

In [99]:
# Q1 . Who are the winners of the D1 division in the Germany Football Association (Bundesliga) in the last decade?
bundesliga = df.filter(f.expr("Div='D1' and Season>=2000 and Season<=2010"))

home = bundesliga.groupBy("Season","HomeTeam")\
                .agg(f.sum("HomeTeamWin").alias("TotalHomeWin"),
                    f.sum("AwayTeamWin").alias("TotalHomeLoss"),
                    f.sum('GameTie').alias('TotalHomeTie'), 
                     f.sum('HomeTeamGoals').alias('HomeScoredGoals'),
                     f.sum('AwayTeamGoals').alias('HomeAgainstGoals')
                    ).withColumnRenamed('HomeTeam','Team')

away = bundesliga.groupBy("Season","AwayTeam")\
                .agg(f.sum("AwayTeamWin").alias("TotalAwayWin"),
                    f.sum("HomeTeamWin").alias("TotalAwayLoss"),
                    f.sum('GameTie').alias('TotalAwayTie'), 
                     f.sum('AwayTeamGoals').alias('AwayScoredGoals'),
                     f.sum('HomeTeamGoals').alias('AwayAgainstGoals')
                    ).withColumnRenamed('AwayTeam','Team')

window = Window.partitionBy('Season').orderBy(f.col('win_pct').desc(),f.col('GoalDifferentials').desc())

table = home.join(away,how='inner',on=['Season','Team'])\
            .withColumn('Win',f.col('TotalHomeWin') + f.col('TotalAwayWin'))\
            .withColumn('Loss',f.col('TotalHomeLoss') + f.col('TotalAwayLoss'))\
            .withColumn('Tie',f.col('TotalHomeTie') + f.col('TotalAwayTie'))\
            .withColumn('win_pct',f.round(f.col('Win')*100/(f.col('Win')+f.col('Loss')+f.col('Tie')),2))\
            .withColumn('GoalsScored',f.col('HomeScoredGoals') + f.col('AwayScoredGoals'))\
            .withColumn('GoalsAgainst',f.col('HomeAgainstGoals') + f.col('AwayAgainstGoals'))\
            .withColumn('GoalDifferentials',f.col('GoalsScored') - f.col('GoalsAgainst'))\
            .withColumn('position',f.rank().over(window))\
            .select('Season','Team','Win','Loss','Tie','win_pct','GoalsScored','GoalsAgainst','GoalDifferentials','position')
            
table_df = table.where(f.col('position')==1).toPandas()


In [100]:
table_df

Unnamed: 0,Season,Team,Win,Loss,Tie,win_pct,GoalsScored,GoalsAgainst,GoalDifferentials,position
0,2000,Bayern Munich,19,9,6,55.88,62,37,25,1
1,2001,Leverkusen,21,7,6,61.76,77,38,39,1
2,2002,Bayern Munich,23,5,6,67.65,70,25,45,1
3,2003,Werder Bremen,22,4,8,64.71,79,38,41,1
4,2004,Bayern Munich,24,5,5,70.59,75,33,42,1
5,2005,Bayern Munich,22,3,9,64.71,67,32,35,1
6,2006,Stuttgart,21,6,7,61.76,61,37,24,1
7,2007,Bayern Munich,22,2,10,64.71,68,21,47,1
8,2008,Wolfsburg,21,7,6,61.76,80,41,39,1
9,2009,Bayern Munich,20,4,10,58.82,72,31,41,1


In [113]:
# Q2. Which teams have been relegated in the past 10 years?
relegated_teams = table.where(f.expr('position=18 or position=17 or position=16')).toPandas()

In [114]:
relegated_teams

Unnamed: 0,Season,Team,Win,Loss,Tie,win_pct,GoalsScored,GoalsAgainst,GoalDifferentials,position
0,2000,Stuttgart,9,14,11,26.47,42,49,-7,16
1,2000,Unterhaching,8,15,11,23.53,35,59,-24,17
2,2000,Bochum,7,21,6,20.59,30,67,-37,18
3,2001,Freiburg,7,18,9,20.59,37,64,-27,16
4,2001,FC Koln,7,19,8,20.59,26,61,-35,17
5,2001,St Pauli,4,20,10,11.76,37,70,-33,18
6,2002,Bielefeld,8,14,12,23.53,35,46,-11,16
7,2002,Nurnberg,8,20,6,23.53,33,60,-27,17
8,2002,Cottbus,7,18,9,20.59,34,64,-30,18
9,2003,Munich 1860,8,18,8,23.53,32,55,-23,17


In [None]:
# Q3.Does Octoberfest affect the performance of Bundesliga?
