In [214]:
import findspark

# Locate the local Spark installation and make pyspark importable from this kernel.
findspark.init()
# Looks up the Spark installation path; the returned value is not used here.
findspark.find()

import numpy as np
import pandas as pd

In [215]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Get (or create) a Spark session named "sparktest" using the local master with 6 threads.
spark = SparkSession.builder.appName("sparktest").master("local[6]").getOrCreate()

# A SparkContext represents the connection to a Spark cluster and is used to
# create RDDs and broadcast variables on that cluster.
sc = spark.sparkContext

# Display the session summary as the cell output.
spark

ConnectionRefusedError: [Errno 61] Connection refused

In [None]:
# A CSV file can also be read as plain text lines using sc.textFile("C:/...")

In [None]:
import findspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt

In [None]:
# Obtain the Spark session for the analysis. getOrCreate() returns an already
# active session if one exists, otherwise it builds a new one with this app name.
spark = (
    SparkSession.builder
    .appName("firstSpark")
    .getOrCreate()
)

In [None]:
# Load the match data from CSV, using the first row as the header.
# Without inferSchema the CSV reader types every column as a string, so the
# goal columns summed later in the notebook would only work through implicit
# casts; inferring the schema gives numeric columns proper numeric types.
df = spark.read.csv('serieA-matches.csv', header=True, inferSchema=True)

# Print the first five rows as a quick sanity check of the load.
df.limit(5).show()

In [None]:
# Show the first five rows as a pandas DataFrame for rich notebook display.
df.limit(5).toPandas()

In [None]:
# Who are the winners of Serie A in the last 10 years?

In [None]:
# Add three 0/1 indicator columns derived from FinalResult:
# 'H' -> home win, 'A' -> away win, 'D' -> draw.
df_matches = (
    df
    .withColumn('HomeTeamWin', when(col('FinalResult') == 'H', 1).otherwise(0))
    .withColumn('AwayTeamWin', when(col('FinalResult') == 'A', 1).otherwise(0))
    .withColumn('GameDraw', when(col('FinalResult') == 'D', 1).otherwise(0))
)


In [None]:
# Discard columns that the rest of the analysis does not use.
df_matches = df_matches.drop('Attendance', 'Referee', 'Round')

# Display the resulting frame as a pandas DataFrame.
df_matches.toPandas()

In [None]:
# Per-season home record for every team: wins, losses, draws, goals scored
# and goals conceded in home matches. The 'Home' column is renamed to 'Team'.
# Note: `sum` here is pyspark.sql.functions.sum (star-imported), not the builtin.
home = (
    df_matches
    .groupBy('Season', 'Home')
    .agg(
        sum('HomeTeamWin').alias('TotalHomeWin'),
        sum('AwayTeamWin').alias('TotalHomeLoss'),  # an away win is a home loss
        sum('GameDraw').alias('TotalHomeDraw'),
        sum('HomeGoals').alias('HomeScoredGoals'),
        sum('AwayGoals').alias('HomeAgainstGoals'),
    )
    .withColumnRenamed('Home', 'Team')
)

In [None]:
# Per-season away record for every team: wins, losses, draws, goals scored
# and goals conceded in away matches. The 'Away' column is renamed to 'Team'.
# Note: `sum` here is pyspark.sql.functions.sum (star-imported), not the builtin.
away = (
    df_matches
    .groupBy('Season', 'Away')
    .agg(
        sum('AwayTeamWin').alias('TotalAwayWin'),
        sum('HomeTeamWin').alias('TotalAwayLoss'),  # a home win is an away loss
        sum('GameDraw').alias('TotalAwayDraw'),
        sum('AwayGoals').alias('AwayScoredGoals'),
        sum('HomeGoals').alias('AwayAgainstGoals'),
    )
    .withColumnRenamed('Away', 'Team')
)

In [None]:
# Inspect the per-season home aggregates as a pandas DataFrame.
home.toPandas()

In [None]:
# Build one league table per season by inner-joining the home and away
# aggregates on Team and Season. Within each season, teams are ranked by win
# percentage (descending), with goal difference (descending) as second priority.
window = Window.partitionBy(['Season']).orderBy(desc('WinPct'), desc('GoalDifferentials'))

table = (
    home.join(away, on=['Team', 'Season'], how='inner')
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals'))
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals'))
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst'))
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin'))
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss'))
    .withColumn('Tie', col('TotalHomeDraw') + col('TotalAwayDraw'))
    # Share of matches won, as a percentage rounded to two decimals.
    .withColumn('WinPct', round(100 * col('Win') / (col('Win') + col('Loss') + col('Tie')), 2))
    # The home/away partial columns are no longer needed once the totals exist.
    .drop(
        'HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals',
        'TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss',
        'TotalHomeDraw', 'TotalAwayDraw',
    )
    .withColumn('TeamPosition', rank().over(window))
)

# Keep the top-ranked team of every season, ordered by season.
table_df = table.where(col('TeamPosition') == 1).orderBy('Season').toPandas()
table_df

In [None]:
# Same league table as before, but ranked by total points
# (3 per win, 1 per draw) instead of win percentage; goal difference
# (descending) remains the second ordering key.
window = ['Season']
window_totalpoints = Window.partitionBy(window).orderBy(desc('TotalPoints'), desc('GoalDifferentials'))

table_totalpoints = (
    home.join(away, on=['Team', 'Season'], how='inner')
    .withColumn('GoalsScored', col('HomeScoredGoals') + col('AwayScoredGoals'))
    .withColumn('GoalsAgainst', col('HomeAgainstGoals') + col('AwayAgainstGoals'))
    .withColumn('GoalDifferentials', col('GoalsScored') - col('GoalsAgainst'))
    .withColumn('Win', col('TotalHomeWin') + col('TotalAwayWin'))
    .withColumn('Loss', col('TotalHomeLoss') + col('TotalAwayLoss'))
    .withColumn('Tie', col('TotalHomeDraw') + col('TotalAwayDraw'))
    .withColumn('TotalPoints', col('Win') * 3 + col('Tie'))
    # The home/away partial columns are no longer needed once the totals exist.
    .drop(
        'HomeScoredGoals', 'AwayScoredGoals', 'HomeAgainstGoals', 'AwayAgainstGoals',
        'TotalHomeWin', 'TotalAwayWin', 'TotalHomeLoss', 'TotalAwayLoss',
        'TotalHomeDraw', 'TotalAwayDraw',
    )
    .withColumn('TeamPosition', rank().over(window_totalpoints))
)

# Keep the top-ranked team of every season, ordered by season.
totalpoints_df = table_totalpoints.where(col('TeamPosition') == 1).orderBy('Season').toPandas()
totalpoints_df

In [None]:
# Which teams have been relegated in the last 3 years?
# Teams ranked 18th or lower on points are selected. NOTE: no season filter is
# applied here, so every season present in the data is listed.
relegated = (
    table_totalpoints
    .where(col('TeamPosition') >= 18)
    .orderBy('Season')
    .toPandas()
)
relegated

In [None]:
# Does the winter affect performance in the Bundesliga?
# Compare each team's overall win percentage with its average win percentage over the winter, and look at charts.
# Generate a chart of win percentage fluctuating over the season, i.e. a cumulative sum of home wins and away wins.

In [None]:
# Which season of Serie A was the most competitive in the last decade?
# Measured as the points gap between the 1st- and 6th-placed team of each season.
competitive_df = (
    table_totalpoints
    .filter(col('TeamPosition').isin(1, 6))
    .orderBy(asc('Season'))
    .drop('Team')
)

# Order each season's rows by league position so that lead() evaluated on the
# 1st-place row always picks up the 6th-place row. Ordering by 'Season' inside a
# partition that is already keyed on 'Season' makes every row tie, leaving the
# row order (and therefore the sign of the difference) undefined.
window_spec = Window.partitionBy('Season').orderBy('TeamPosition')

# Difference = points of 1st place minus points of 6th place. The last row of
# each season has no following row, so its Difference is null.
df_with_diff = (
    competitive_df
    .withColumn('Difference', col('TotalPoints') - lead(col('TotalPoints')).over(window_spec))
    .drop('GoalsScored', 'GoalsAgainst', 'GoalDifferentials', 'Win', 'Loss', 'Tie', 'TeamPosition', 'TotalPoints')
    # The window shuffle does not preserve the earlier sort, so sort the final result.
    .orderBy('Season')
)

# Drop the null rows so that one row per season remains.
df_with_diff.toPandas().dropna()

In [None]:
# What's the best month to watch the Bundesliga?
# Find which month of the season has the highest average goals scored in the league.
# Check the type of the date column, include date and month in the groupby transformation, and total the goals scored per season and monthly window.

In [None]:
# Total goals scored per season (summed over all teams), ordered by season.
x = (
    table_totalpoints
    .groupBy('Season')
    .sum('GoalsScored')
    .orderBy('Season')
)
x.toPandas()

In [None]:
# Show the first five rows of the raw match data as a pandas DataFrame.
df.limit(5).toPandas()