# Read parquet files for English Premier League

In [0]:
# Load every parquet file directly under the "League=English Premier League"
# folder of the `silver` container into one DataFrame.
epl_parquet_glob = "abfss://silver@efsa.dfs.core.windows.net/League=English Premier League/*.parquet"
epl_df = spark.read.parquet(epl_parquet_glob)

# Import functions

In [0]:
from pyspark.sql.functions import current_date, date_format, col, concat, when, month, lit, year, to_date, count, sum

# Drop rows with null values in required columns

In [0]:
# Discard any match row that is missing one of the fields the later
# aggregations read (date, both teams, both scores, and the result code).
required_match_columns = ['Date', 'HomeTeam', 'AwayTeam', 'HomeGoals', 'AwayGoals', 'Result']
epl_clean_df = epl_df.dropna(how='any', subset=required_match_columns)

# Drop duplicate records

In [0]:
# Two rows count as the same match when all of these columns agree;
# only one row per such combination is kept.
match_identity_columns = ['Date', 'HomeTeam', 'AwayTeam', 'HomeGoals', 'AwayGoals', 'Result']
epl_clean_df = epl_clean_df.dropDuplicates(match_identity_columns)

# Add a season column derived from the match date

In [0]:
# Derive the "season" label (formatted like "2018/2019") from each match date.
# A season is treated as starting in August: a match played in August or later
# belongs to the season that starts in that calendar year, while a match played
# in January-July belongs to the season that started the previous year.
#
# The column is referenced as "Date", the same casing used by the dropna /
# dropDuplicates cells, so the lookup does not depend on Spark resolving
# column names case-insensitively.
season_start_year = when(month("Date") >= 8, year("Date")).otherwise(year("Date") - 1)

epl_season_df = epl_clean_df.withColumn(
    "season",
    # The end year is always the start year + 1, so it is derived from the
    # start year instead of repeating the month test.
    concat(season_start_year, lit("/"), season_start_year + 1),
)

# Get every goal scored by each club in each season
1. First, get the home goals
2. Get the away goals
3. Get the total goals per season

In [0]:
# Goals scored at home: add up HomeGoals per (season, home side).
# `sum` is the pyspark.sql.functions aggregate imported earlier, not the builtin.
home_goal_totals = epl_season_df.groupBy('season', "HomeTeam").agg(
    sum("HomeGoals").alias("GoalsScored")
)
# Rename the team column to the neutral "Team" so it matches the away-side frame.
epl_home_goals_df = home_goal_totals.withColumnRenamed("HomeTeam", "Team")

In [0]:
# Goals scored away: add up AwayGoals per (season, away side).
away_goal_totals = epl_season_df.groupBy('season', "AwayTeam").agg(
    sum("AwayGoals").alias("GoalsScored")
)
# Rename the team column to the neutral "Team" so it matches the home-side frame.
epl_away_goals_df = away_goal_totals.withColumnRenamed("AwayTeam", "Team")

In [0]:
# Stack the home and away tallies (both are season, Team, GoalsScored in that
# order, which matters because union() matches columns by position) and add
# the two rows per club into one season total.
all_goals_scored = epl_home_goals_df.union(epl_away_goals_df)
total_goals_scored_df = (
    all_goals_scored
    .groupBy('season', 'Team')
    .agg(sum("GoalsScored").alias("TotalGoalsScored"))
)

# Get goals against (goals conceded) per club and season

In [0]:
# Goals conceded at home: the away side's goals, summed per (season, home side).
conceded_at_home = epl_season_df.groupBy('season', "HomeTeam").agg(
    sum("AwayGoals").alias("GoalsAgainst")
)
epl_goals_against_as_hometeam_df = conceded_at_home.withColumnRenamed("HomeTeam", "Team")

In [0]:
# Goals conceded away: the home side's goals, summed per (season, away side).
conceded_away = epl_season_df.groupBy('season', "AwayTeam").agg(
    sum("HomeGoals").alias("GoalsAgainst")
)
epl_goals_against_as_awayteam_df = conceded_away.withColumnRenamed("AwayTeam", "Team")

In [0]:
# Combine the home and away "goals against" tallies (same column order in both
# frames; union() is positional) and total them per club and season.
all_goals_against = epl_goals_against_as_hometeam_df.union(epl_goals_against_as_awayteam_df)
total_goals_against_df = (
    all_goals_against
    .groupBy('season', 'Team')
    .agg(sum("GoalsAgainst").alias("TotalGoalsAgainst"))
)

# Simple task

- Get the goal difference in a new column

In [0]:
# Put goals scored and goals conceded side by side for each (season, Team)...
scored_and_conceded = total_goals_scored_df.join(
    total_goals_against_df,
    on=['season', 'Team'],
    how='inner',
)
# ...and store scored minus conceded as GoalDifference.
epl_goal_difference = scored_and_conceded.withColumn(
    'GoalDifference',
    col('TotalGoalsScored') - col('TotalGoalsAgainst'),
)

# Next task: get the total number of wins, losses and draws (used later to compute points)

In [0]:
# Flag each match from the home side's point of view: Result 'H' counts as a
# win, 'A' as a loss and 'D' as a draw.
# NOTE(review): the flags are the string literals '1'/'0', so sum() below relies
# on Spark implicitly casting strings to numbers - confirm the resulting column
# type is what the downstream table expects before switching to integer flags.
home_outcome_flags = (
    epl_season_df
    .withColumn('Wins', when(col('Result') == 'H', '1').otherwise('0'))
    .withColumn('Loses', when(col('Result') == 'A', '1').otherwise('0'))
    .withColumn('Draw', when(col('Result') == 'D', '1').otherwise('0'))
)

# Total the flags per (season, home side) and expose the club as "Team".
epl_home_wins_loses = (
    home_outcome_flags
    .groupBy('season', 'HomeTeam')
    .agg(
        sum('Wins').alias('Wins'),
        sum('Loses').alias('Loses'),
        sum('Draw').alias('Draw'),
    )
    .withColumnRenamed('HomeTeam', 'Team')
)

In [0]:
# Flag each match from the away side's point of view: Result 'A' counts as a
# win, 'H' as a loss and 'D' as a draw.
# NOTE(review): the flags are the string literals '1'/'0', so sum() below relies
# on Spark implicitly casting strings to numbers - confirm the resulting column
# type is what the downstream table expects before switching to integer flags.
away_outcome_flags = (
    epl_season_df
    .withColumn('Wins', when(col('Result') == 'A', '1').otherwise('0'))
    .withColumn('Loses', when(col('Result') == 'H', '1').otherwise('0'))
    .withColumn('Draw', when(col('Result') == 'D', '1').otherwise('0'))
)

# Total the flags per (season, away side) and expose the club as "Team".
epl_away_wins_loses = (
    away_outcome_flags
    .groupBy('season', 'AwayTeam')
    .agg(
        sum('Wins').alias('Wins'),
        sum('Loses').alias('Loses'),
        sum('Draw').alias('Draw'),
    )
    .withColumnRenamed('AwayTeam', 'Team')
)

In [0]:
# Stack the home and away records (both are season, Team, Wins, Loses, Draw in
# that order; union() is positional) and add them into one row per club/season.
home_and_away_records = epl_home_wins_loses.union(epl_away_wins_loses)
epl_total_wins_loses = (
    home_and_away_records
    .groupBy('season', 'Team')
    .agg(
        sum('Wins').alias('Wins'),
        sum('Loses').alias('Loses'),
        sum('Draw').alias('Draw'),
    )
)

# Join the goal difference table and the total wins table

In [0]:
# One row per (season, Team) carrying both the goal figures and the
# win/loss/draw counts; clubs missing from either side are dropped (inner join).
epl_table_df = epl_goal_difference.join(
    epl_total_wins_loses,
    on=['season', 'Team'],
    how='inner',
)

# Get the total points for the season

In [0]:
# Points = three per win plus one per draw.
points_expr = col('Wins') * 3 + col('Draw')
epl_points_df = epl_table_df.withColumn('Points', points_expr)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank, row_number, lit

# Rank the clubs inside each season. Ordering is by points, then goal
# difference, then goals scored: without the third key, clubs level on points
# and goal difference would share a position, and because dense_rank() leaves
# no gaps every club below them would be moved up one place.
# Clubs that are still level on all three keys keep sharing a position.
w = Window.partitionBy('season').orderBy(
    col('Points').desc(),
    col('GoalDifference').desc(),
    col('TotalGoalsScored').desc(),
)

# Add the league position and a constant League tag (used as a partition
# column when the table is written).
epl_table_df = (
    epl_points_df
    .withColumn('Position', dense_rank().over(w))
    .withColumn('League', lit('epl'))
)

In [0]:
# Persist the league table as a Delta table partitioned by League and season,
# replacing whatever the table currently holds.
epl_table_writer = (
    epl_table_df.write
    .format('delta')
    .partitionBy('League', 'season')
    .mode('overwrite')
)
epl_table_writer.saveAsTable('football.europeanfootball.epl')

In [0]:
%sql
-- Preview the saved league table for the 2018/2019 season.
SELECT *
FROM football.europeanfootball.epl
WHERE season = '2018/2019'