In [None]:
#INSY 5376 Big Data Analytics - Project - IPL Player Performance Analysis
#Team Members :
# Amuluru, Sriram Sai
# Grandhi, Anish
# Potukuchi, Sameer Kumar
# Thanikonda, Pruthvi Sai Kumar

#Import the required packages
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from collections import namedtuple

# Build the Spark configuration (application name and local[*] master), then
# create the Spark context and the SQL context on top of it.
conf = SparkConf().setAppName("IPL Data Analysis").setMaster("local[*]")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sparkContext=sc)

#Read the CSVs and parse the data
deliveriesFields = ('matchID','inning','battingTeam','bowlingTeam','over','ball','batsman','nonStriker','bowler','isSuperOver','wideRuns','byeRuns','legByeRuns','noballRuns','penaltyRuns','batsmanRuns','extraRuns','totalRuns','playerDismissed','dismissalKind','fielder')
deliveriesColumns = namedtuple('deliveries',deliveriesFields)
def parse(line):
    """Turn one data line of deliveries.csv into a ``deliveries`` record.

    The line is reduced to ASCII (non-ASCII characters are dropped) and then
    split with the ``csv`` module, so a comma inside a double-quoted value
    stays part of its field instead of shifting every later column.

    Args:
        line: One text line of the CSV file (not the header).

    Returns:
        A ``deliveriesColumns`` namedtuple. ``over``, ``ball``,
        ``batsmanRuns``, ``extraRuns`` and ``totalRuns`` are ints; every
        other field is kept as a string.

    Raises:
        IndexError: If the line has fewer fields than ``deliveriesFields``.
        ValueError: If one of the integer fields is not numeric.
    """
    import csv  # local import keeps this cell independent of other import lines

    ascii_line = line.encode('ascii','ignore')
    # On Python 3 ``encode`` returns bytes, which cannot be split on a str
    # separator; decode back so the parser works on Python 2 and 3 alike.
    if not isinstance(ascii_line, str):
        ascii_line = ascii_line.decode('ascii')

    fields = next(csv.reader([ascii_line]))
    expected = len(deliveriesFields)
    if len(fields) < expected:
        raise IndexError('expected %d fields, got %d' % (expected, len(fields)))

    record = list(fields[:expected])
    # Positions of over, ball, batsmanRuns, extraRuns and totalRuns.
    for position in (4, 5, 15, 16, 17):
        record[position] = int(record[position])
    return deliveriesColumns(*record)
    
    



# Load deliveries.csv, drop every line containing the text 'inning'
# (presumably only the header row -- verify), parse the remaining lines and
# build the deliveries DataFrame.
deliveriesRDD = (
    sc.textFile('deliveries.csv')
    .filter(lambda rowText: 'inning' not in rowText)
    .map(parse)
)
deliveriesDF = sqlContext.createDataFrame(deliveriesRDD)

In [None]:
matchesFields = ('id','season','city','date','team1','team2','tossWinner','tossDecision','result','dlApplied','winner','winByRuns','winByWickets','playerOfMatch','venue','umpire1','umpire2','umpire3')
matchesColumns = namedtuple('matches',matchesFields)
def parseMatches(line):
    """Turn one data line of matches.csv into a ``matches`` record.

    The line is reduced to ASCII (non-ASCII characters are dropped) and then
    split with the ``csv`` module. A plain ``split(",")`` would break a
    double-quoted value that itself contains a comma into two fields and
    push the umpire columns one position to the right.

    Args:
        line: One text line of the CSV file (not the header).

    Returns:
        A ``matchesColumns`` namedtuple; every field is kept as a string.

    Raises:
        IndexError: If the line has fewer fields than ``matchesFields``.
    """
    import csv  # local import keeps this cell independent of other import lines

    ascii_line = line.encode('ascii','ignore')
    # On Python 3 ``encode`` returns bytes, which cannot be split on a str
    # separator; decode back so the parser works on Python 2 and 3 alike.
    if not isinstance(ascii_line, str):
        ascii_line = ascii_line.decode('ascii')

    fields = next(csv.reader([ascii_line]))
    expected = len(matchesFields)
    if len(fields) < expected:
        raise IndexError('expected %d fields, got %d' % (expected, len(fields)))
    return matchesColumns(*fields[:expected])
    
    



# Load matches.csv, drop every line containing the text 'season'
# (presumably only the header row -- verify), parse the rest, build the
# matches DataFrame and discard its umpire3 column.
matchesRDD = (
    sc.textFile('matches.csv')
    .filter(lambda rowText: 'season' not in rowText)
    .map(parseMatches)
)
matchesDF = sqlContext.createDataFrame(matchesRDD)
matchesDF = matchesDF.drop(matchesDF.umpire3)

In [None]:
# Attach the match-level columns (season in particular) to every delivery so
# that later cells can filter by season.
batsmenDF = matchesDF.join(
    deliveriesDF,
    on=(matchesDF.id == deliveriesDF.matchID),
    how="inner",
)

In [None]:
# Count the distinct seasons each batsman appears in and derive the binary
# flag batsman_min_seasons: 1 when he featured in three or more seasons,
# 0 otherwise. The flag is later used as an input to the regression.
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import udf, count, col
seasonsDF = (
    batsmenDF.select('batsman','season')
    .distinct()
    .groupBy('batsman')
    .agg(count('season').alias('num_seasons'))
)
function = udf(lambda seasonsPlayed: 0 if seasonsPlayed < 3 else 1, IntegerType())
seasonsDF = seasonsDF.select(
    col('batsman').alias('batsman1'),
    'num_seasons',
    function(col('num_seasons')).alias('batsman_min_seasons')
)
batsmenDF = batsmenDF.join(seasonsDF, on=(batsmenDF.batsman == seasonsDF.batsman1), how="inner")


In [None]:
# Training split: remove the 2017 season (2008-2016 is the training set, the
# full 2008-2017 data is the test set) and recompute the season count and the
# batsman_min_seasons flag on the reduced data.
batsmenDF2016 = batsmenDF.where('season !=2017').drop('num_seasons', 'batsman_min_seasons')
seasonsDF1 = (
    batsmenDF2016.select('batsman','season')
    .distinct()
    .groupBy('batsman')
    .agg(count('season').alias('num_seasons'))
)
function = udf(lambda seasonsPlayed: 0 if seasonsPlayed < 3 else 1, IntegerType())
seasonsDF1 = seasonsDF1.select(
    col('batsman').alias('batsman1'),
    'num_seasons',
    function(col('num_seasons')).alias('batsman_min_seasons')
)
batsmenDF2016 = batsmenDF2016.join(
    seasonsDF1,
    on=(batsmenDF2016.batsman == seasonsDF1.batsman1),
    how="inner"
).drop('batsman1')


In [None]:
# Extract one record per dismissal (match, innings, dismissed player,
# dismissal kind, fielder); it is joined to the per-innings scores later to
# tell dismissed innings from not-out ones.
# The rows stay in Spark: collecting them to the driver and rebuilding a
# DataFrame from the Python list moved every dismissal row through driver
# memory for no benefit and failed outright when the filter matched nothing.
batsmanDF2016Dismissed = (
    batsmenDF2016
    .where("playerDismissed !=''")
    .select('matchID', 'inning', col('playerDismissed').alias('batsman'), 'dismissalKind', 'fielder')
)

In [None]:
# Total runs per batsman per innings: one row for each
# (matchID, inning, batsman) combination.
from pyspark.sql.functions import sum
batsmenDFGrouped2016 = (
    batsmenDF2016
    .groupBy('matchID','inning','batsman')
    .agg(sum('batsmanRuns').alias('batsmanRuns'))
)


In [None]:
# Left join keeps every innings; innings without a dismissal record get nulls.
batsmenDFGrouped2016 = batsmenDFGrouped2016.join(
    batsmanDF2016Dismissed,
    on=['matchID','inning','batsman'],
    how="left"
)

In [None]:
# Replace the gaps left by the join: a missing or empty fielder becomes "-"
# and a missing dismissal kind becomes "not-out".
replaceFunction = udf(lambda fielder: fielder if fielder not in (None, '') else "-")
replaceDismissal = udf(lambda dismissal: dismissal if dismissal is not None else "not-out")
batsmenDFGrouped2016 = (
    batsmenDFGrouped2016
    .withColumn('fielder', replaceFunction(col('fielder')))
    .withColumn('dismissalKind', replaceDismissal(col('dismissalKind')))
)

In [None]:
# Number of innings per batsman: count the rows per (inning, batsman) and
# add those counts up for each batsman.
num_of_innings = (
    batsmenDFGrouped2016
    .groupBy('inning','batsman').count()
    .groupBy('batsman').agg(sum('count').alias('num_of_innings'))
)

In [None]:
# Number of not-out innings per batsman: count rows per
# (batsman, dismissalKind), keep only the 'not-out' rows and rename the count.
batsmanDismissal2016DF = (
    batsmenDFGrouped2016
    .select('dismissalKind','batsman')
    .groupBy('batsman','dismissalKind').count()
    .where("dismissalKind=='not-out'")
    .drop('dismissalKind')
    .withColumnRenamed('count','num_not_outs')
)

In [None]:
# Batting average per batsman: total runs / (innings - not outs), or 0 when
# the batsman was never dismissed.
from pyspark.sql.types import DecimalType
import pyspark.sql.functions as func
batsmenDFGrouped2016 = batsmenDFGrouped2016.groupBy('batsman').agg(sum('batsmanRuns').alias('batsmanRuns'))
batsmenDFGrouped2016 = batsmenDFGrouped2016.join(num_of_innings, ['batsman'])
# batsmanDismissal2016DF only has rows for batsmen with at least one not-out
# innings. An inner join therefore removed every batsman who was dismissed in
# all of his innings; keep them with a left join and a not-out count of 0.
batsmenDFGrouped2016 = batsmenDFGrouped2016.join(batsmanDismissal2016DF, ['batsman'], how='left')
batsmenDFGrouped2016 = batsmenDFGrouped2016.na.fill({'num_not_outs': 0})
calculateBattingAverage = udf(lambda batsmanRuns, numInnings, numNotOuts: float(batsmanRuns) / float(numInnings-numNotOuts) if (numInnings-numNotOuts) != 0 else float(0))
batsmenDFGrouped2016 = batsmenDFGrouped2016.withColumn('battingAverage', calculateBattingAverage(col('batsmanRuns'), col('num_of_innings'), col('num_not_outs')))


In [None]:
# Round the batting average to two decimal places.
import pyspark.sql.functions as func
batsmenDFGrouped2016 = batsmenDFGrouped2016.withColumn(
    'battingAverage',
    func.round(col('battingAverage'), 2)
)

In [None]:
# Start the batsman stats frame: deliveries counted per batsman joined with
# his total runs.
runs2016 = batsmenDF2016.groupBy('batsman').agg(sum('batsmanRuns').alias('batsmanRuns'))
balls2016 = batsmenDF2016.groupBy('batsman').agg(count('ball').alias('balls'))
batsmenStats2016 = balls2016.join(runs2016, on=['batsman'])


In [None]:
# Number of deliveries on which each batsman scored exactly four runs.
fours = (
    batsmenDF2016
    .where('batsmanRuns == 4')
    .groupBy('batsman')
    .agg(count('batsmanRuns').alias('fours'))
)


In [None]:
# Add the fours; a batsman without any four has no row in `fours`, so the
# left join leaves a null that is replaced by 0.
batsmenStats2016 = (
    batsmenStats2016
    .join(fours, on=['batsman'], how='left')
    .fillna({'fours':0})
)
batsmenStats2016.show()

In [None]:
# Number of deliveries on which each batsman scored exactly six runs.
sixes = (
    batsmenDF2016
    .where('batsmanRuns == 6')
    .groupBy('batsman')
    .agg(count('batsmanRuns').alias('sixes'))
)


In [None]:
# Add the sixes; a batsman without any six has no row in `sixes`, so the
# left join leaves a null that is replaced by 0.
batsmenStats2016 = (
    batsmenStats2016
    .join(sixes, on=['batsman'], how='left')
    .fillna({'sixes':0})
)
batsmenStats2016.show()

In [None]:
# Runs per batsman per match, then the number of matches with 50-99 runs
# (fifties) and with 100 or more runs (hundreds).
from pyspark.sql.functions import max
highestScore2016 = (
    batsmenDF2016
    .groupBy('batsman', 'matchID')
    .agg(sum('batsmanRuns').alias('highestScore'))
    .drop('matchID')
)
hundredsDF2016 = (
    highestScore2016
    .where('highestScore >= 100')
    .groupBy('batsman')
    .agg(count('highestScore').alias('num_hundreds'))
)
fiftiesDF2016 = (
    highestScore2016
    .where('highestScore >= 50 AND highestScore <100')
    .groupBy('batsman')
    .agg(count('highestScore').alias('num_fifties'))
)

In [None]:
# Reduce the per-match totals to each batsman's single best match score.
highestScore2016 = (
    highestScore2016
    .groupBy('batsman')
    .agg(max('highestScore').alias('highestScore'))
)

In [None]:
# Strike rate = runs / balls * 100, rounded to two decimal places.
calculateStrikeRate = udf(lambda runs, balls : float(runs) / float(balls) * 100.0)
batsmenStats2016 = batsmenStats2016.withColumn(
    'strikeRate',
    calculateStrikeRate(col('batsmanRuns'), col('balls'))
)
batsmenStats2016 = batsmenStats2016.withColumn(
    'strikeRate',
    func.round(col('strikeRate'), 2)
)


In [None]:
# Attach each batsman's highest score to the stats frame.
batsmenStats2016 = batsmenStats2016.join(highestScore2016, on=['batsman'])


In [None]:
# Keep only the average-related columns for merging into the stats frame.
batsmanSelectStats2016 = batsmenDFGrouped2016.select(
    'batsman',
    'battingAverage',
    'num_of_innings',
    'num_not_outs'
)

In [None]:
# Merge batting average, innings and not-outs into the main stats frame.
batsmenStats2016 = batsmenStats2016.join(batsmanSelectStats2016, on=['batsman'])


In [None]:
# One row per batsman with the batsman_min_seasons flag computed earlier.
minBatsmanSeasons = (
    batsmenDF2016
    .select('batsman','batsman_min_seasons')
    .distinct()
)

In [None]:
# Merge the seasons flag, fifties and hundreds. Batsmen with no fifty or
# hundred have no row in those frames, so the resulting nulls become 0.
batsmenStats2016 = (
    batsmenStats2016
    .join(minBatsmanSeasons, on=['batsman'])
    .join(fiftiesDF2016, on=['batsman'], how='left')
    .join(hundredsDF2016, on=['batsman'], how='left')
    .fillna({'num_fifties':0})
    .fillna({'num_hundreds':0})
)

In [None]:
#View the final dataframe and verify that it has all the required values.
# show() prints only the first rows of the frame, not the whole table.
batsmenStats2016.show()

In [None]:
#Convert the Spark dataframe to pandas; the next cell writes it to CSV as the batsmen training data.
# Note: the name is rebound from a Spark DataFrame to a pandas DataFrame, so
# this cell cannot be run a second time without re-running the cells above.
batsmenStats2016 = batsmenStats2016.toPandas()

In [None]:
# Write the batsmen training data to CSV in the working directory.
batsmenStats2016.to_csv('batsmen_training_data.csv')

In [None]:
# Bowlers training data: join matches with deliveries, drop the 2017 season
# and keep the bowling-related columns. Deliveries that are neither a wide
# nor a no-ball are then counted per bowler and ball number.
from pyspark.sql.functions import count
bowlersDF = matchesDF.join(deliveriesDF, on=(matchesDF.id == deliveriesDF.matchID), how="inner")
bowlersDF2016 = (
    bowlersDF
    .where('season !=2017')
    .select('matchID','season','bowler','ball','extraRuns','wideRuns','noballRuns','byeRuns','legbyeRuns','totalRuns','playerDismissed', 'dismissalKind','fielder')
)
bowlersDFLegal2016 = bowlersDF2016.where('noballRuns == 0 AND wideRuns ==0')
bowlersGroupedDF2016 = (
    bowlersDFLegal2016
    .groupBy('bowler','ball')
    .agg(count('ball').alias('num_each_ball'))
)

In [None]:
# Total legal balls per bowler: add up the per-ball-number counts.
from pyspark.sql.functions import sum
bowlersGroupedDF2016 = (
    bowlersGroupedDF2016
    .groupBy('bowler')
    .agg(sum('num_each_ball').alias('totalBalls'))
)
bowlersGroupedDF2016.show()

In [None]:
# Runs conceded per bowler. Deliveries with byes or leg byes are left out
# because those runs are not charged to the bowler. The deliveries are
# grouped by runs scored; the zero-run group gives the number of dot balls.
bowlersLegalRunsDF2016 = bowlersDF2016.where('byeRuns == 0 and legByeRuns ==0')
runsConceded2016 = (
    bowlersLegalRunsDF2016
    .groupBy('bowler','totalRuns')
    .agg(count('totalRuns').alias('runs_in_different_ways'))
)
num_dot_balls = (
    runsConceded2016
    .where("totalRuns ==0")
    .drop('totalRuns')
    .withColumnRenamed('runs_in_different_ways','dot_balls')
)
num_dot_balls.show()

In [None]:
# Total runs conceded per bowler: multiply each run value by the number of
# deliveries that produced it, then sum the products per bowler.
from pyspark.sql.functions import udf, col,sum
from pyspark.sql.types import IntegerType
calculateTotalRuns = udf(lambda runValue, occurrences: runValue*occurrences, IntegerType())
runsConceded2016 = (
    runsConceded2016
    .withColumn('total_runs_conceded', calculateTotalRuns(col('totalRuns'), col('runs_in_different_ways')))
    .drop('totalRuns','runs_in_different_ways')
    .groupBy('bowler')
    .agg(sum('total_runs_conceded').alias('total_runs_conceded'))
)
runsConceded2016.show()

                                        

In [None]:
# Extras per bowler: sum of wide runs and of no-ball runs over the
# deliveries that had either, cast to integers.
extras = (
    bowlersDF2016
    .where("wideRuns > 0 OR noballRuns >0")
    .groupBy('bowler')
    .agg(sum('wideRuns').alias('num_wides'), sum('noballRuns').alias('num_noballs'))
)
extras = (
    extras
    .withColumn('num_wides', col('num_wides').cast(IntegerType()))
    .withColumn('num_noballs', col('num_noballs').cast(IntegerType()))
)
extras.show()


In [None]:
# Wickets per bowler in the training data; run outs are not credited to the
# bowler. Also count the wickets per bowler per match for the best-bowling
# figure.
# NOTE(review): only 'run out' is excluded. If the data contains other
# dismissal kinds that are not credited to the bowler (e.g. retired hurt),
# they are still counted here -- confirm against the data.
wickets2016 = (
    bowlersDF2016
    .where("dismissalKind!='' AND dismissalKind!='run out'" )
    .select('bowler','dismissalKind','matchID')
)
totalBowlerWickets2016 = (
    wickets2016
    .groupBy('bowler').count()
    .withColumnRenamed('count', 'totalWickets')
)
bestBowling2016 = (
    wickets2016
    .groupBy('bowler','matchID').count()
    .drop('matchID')
)

In [None]:
# Best bowling for each bowler: the most wickets taken in a single match.
from pyspark.sql.functions import max
bestBowling2016 = (
    bestBowling2016
    .groupBy('bowler')
    .agg(max('count').alias('bestBowlingWickets'))
)
bestBowling2016.show()

In [None]:
# Left join keeps bowlers who conceded runs but took no wickets.
bowlingAverage2016 = runsConceded2016.join(
    totalBowlerWickets2016,
    on=['bowler'],
    how='left'
)

In [None]:
# A null wicket count means the bowler took no wickets; store it as 0.
bowlingAverage2016 = bowlingAverage2016.fillna({'totalWickets':0})


In [None]:
# Bowling average = runs conceded / wickets (0 for a bowler without
# wickets), rounded to two decimal places.
import pyspark.sql.functions as func
calculateBowlingAverage = udf(lambda runsConceded, totalWickets: float(runsConceded)/float(totalWickets) if totalWickets != 0 else 0)
bowlingAverage2016 = bowlingAverage2016.withColumn(
    'bowlingAverage',
    calculateBowlingAverage(col('total_runs_conceded'), col('totalWickets'))
)
bowlingAverage2016 = bowlingAverage2016.withColumn(
    'bowlingAverage',
    func.round(col('bowlingAverage'), 2)
)

In [None]:
# Preview the bowling averages.
bowlingAverage2016.show()

In [None]:
#Calculate the economy rate (runs conceded per over) for each bowler.
bowlerEconomy2016 = runsConceded2016.join(bowlersGroupedDF2016, ['bowler'], how='left')
# float(...) / 6.0 forces true division: on Python 2 `totalBalls/6` floors to
# whole overs and inflates the economy rate. A bowler who conceded runs but
# has no legal delivery gets a null totalBalls from the left join, so None is
# passed through instead of raising inside the UDF.
calculateOvers = udf(lambda totalBalls: None if totalBalls is None else float(totalBalls) / 6.0)
# A UDF without a declared return type yields a string column; cast it so
# num_overs is numeric when it reaches the economy UDF.
bowlerEconomy2016 = bowlerEconomy2016.withColumn('num_overs', calculateOvers(col('totalBalls')).cast('double'))
# Missing or zero overs give an economy of 0 rather than a division error.
calculateEconomy = udf(lambda totalRuns, numOvers: 0 if not numOvers else float(totalRuns)/float(numOvers))
# The economy is computed from the unrounded overs so that rounding num_overs
# to one decimal does not distort it; num_overs is rounded last.
bowlerEconomy2016 = bowlerEconomy2016.withColumn('bowlerEconomy', calculateEconomy(col('total_runs_conceded'),col('num_overs')))
bowlerEconomy2016 = bowlerEconomy2016.withColumn('bowlerEconomy', func.round(col('bowlerEconomy'),2))
bowlerEconomy2016 = bowlerEconomy2016.withColumn('num_overs', func.round(col('num_overs'),1))
bowlerEconomy2016.show()

In [None]:
# Balls bowled next to wickets taken; bowlers without wickets are kept.
bowlerStrikeRate2016 = bowlersGroupedDF2016.join(
    totalBowlerWickets2016,
    on=['bowler'],
    how='left'
)
bowlerStrikeRate2016.show()

In [None]:
# Bowling strike rate = balls bowled / wickets (0 for a bowler without
# wickets), rounded to two decimal places.
bowlerStrikeRate2016 = bowlerStrikeRate2016.fillna({'totalWickets':0})
calculateBowlerStrikeRate = udf(lambda totalBalls, totalWickets: float(totalBalls)/float(totalWickets) if totalWickets != 0 else 0)
bowlerStrikeRate2016 = bowlerStrikeRate2016.withColumn(
    'bowlingStrikeRate',
    calculateBowlerStrikeRate(col('totalBalls'), col('totalWickets'))
)
bowlerStrikeRate2016 = bowlerStrikeRate2016.withColumn(
    'bowlingStrikeRate',
    func.round(col('bowlingStrikeRate'), 2)
)
bowlerStrikeRate2016.show()

In [None]:
# Count the distinct seasons each bowler featured in and derive the flag
# bowler_min_seasons: 1 for three or more seasons, 0 otherwise.
seasonsDF2 = (
    bowlersDF2016.select('bowler','season')
    .distinct()
    .groupBy('bowler')
    .agg(count('season').alias('num_seasons'))
)
seasonsFunction = udf(lambda seasonsPlayed: 0 if seasonsPlayed < 3 else 1, IntegerType())
seasonsDF2 = seasonsDF2.select(
    'bowler',
    'num_seasons',
    seasonsFunction(col('num_seasons')).alias('bowler_min_seasons')
)


In [None]:
# Only the flag is needed downstream, so drop the raw season count. The
# following cells merge all bowler metrics into one frame.
seasonsDF2 = seasonsDF2.drop(*['num_seasons'])
seasonsDF2.show()

In [None]:
# Start the bowler stats frame: balls bowled plus runs conceded.
bowlerStats2016DF = bowlersGroupedDF2016.join(
    runsConceded2016,
    on=['bowler'],
    how='left'
)


In [None]:
# Add total wickets and display the column list as a sanity check.
bowlerStats2016DF = bowlerStats2016DF.join(
    totalBowlerWickets2016,
    on=['bowler'],
    how='left'
)
bowlerStats2016DF.columns

In [None]:
# Bowlers without a wicket row get a wicket count of 0.
bowlerStats2016DF = bowlerStats2016DF.fillna({'totalWickets':0})

In [None]:
# Add the economy rate.
bowlerEconomy2016DF = bowlerEconomy2016.select(*['bowler','bowlerEconomy'])
bowlerStats2016DF = bowlerStats2016DF.join(
    bowlerEconomy2016DF,
    on=['bowler'],
    how='left'
)
bowlerStats2016DF.columns

In [None]:
# Add the bowling average.
bowlingAverage2016DF = bowlingAverage2016.select(*['bowler','bowlingAverage'])
bowlerStats2016DF = bowlerStats2016DF.join(
    bowlingAverage2016DF,
    on=['bowler'],
    how='left'
)
bowlerStats2016DF.columns

In [None]:
# Add the bowling strike rate.
bowlerStrikeRate2016DF = bowlerStrikeRate2016.select(*['bowler','bowlingStrikeRate'])
bowlerStats2016DF = bowlerStats2016DF.join(
    bowlerStrikeRate2016DF,
    on=['bowler'],
    how='left'
)
bowlerStats2016DF.columns

In [None]:
# Add the best single-match wicket haul.
bowlerStats2016DF = bowlerStats2016DF.join(
    bestBowling2016,
    on=['bowler'],
    how='left'
)



In [None]:
# Bowlers without wickets have no best-bowling row: use 0. Then add the
# dot-ball counts.
bowlerStats2016DF = (
    bowlerStats2016DF
    .fillna({'bestBowlingWickets':0})
    .join(num_dot_balls, on=['bowler'], how='left')
)


In [None]:
# Bowlers without a dot-ball row get 0. Then add wides and no-balls.
bowlerStats2016DF = (
    bowlerStats2016DF
    .fillna({'dot_balls':0})
    .join(extras, on=['bowler'], how='left')
)


In [None]:
# Bowlers without an extras row get 0 wides and 0 no-balls. Finally add the
# seasons flag and preview the completed frame.
bowlerStats2016DF = (
    bowlerStats2016DF
    .fillna({'num_wides':0})
    .fillna({'num_noballs':0})
    .join(seasonsDF2, on=['bowler'], how='left')
)
bowlerStats2016DF.show()

In [None]:
# Bring the bowler stats to the driver as a pandas DataFrame and write them
# to CSV: this file is the bowlers training data.
bowlerStats2016DFPandas = bowlerStats2016DF.toPandas()
bowlerStats2016DFPandas.to_csv(path_or_buf='bowlers_training_data.csv')

In [None]:
#Batsmen Test Data Extraction, logic remains same as above except the fact that 2017 data is included in the test

# Extract one record per dismissal (match, innings, dismissed player,
# dismissal kind, fielder) from the full 2008-2017 data.
# The rows stay in Spark: collecting them to the driver and rebuilding a
# DataFrame from the Python list moved every dismissal row through driver
# memory for no benefit and failed outright when the filter matched nothing.
batsmanDFDismissed = (
    batsmenDF
    .where("playerDismissed !=''")
    .select('matchID', 'inning', col('playerDismissed').alias('batsman'), 'dismissalKind', 'fielder')
)

In [None]:
# Total runs per batsman per innings over the full data set.
from pyspark.sql.functions import sum
batsmenDFGrouped = (
    batsmenDF
    .groupBy('matchID','inning','batsman')
    .agg(sum('batsmanRuns').alias('batsmanRuns'))
)

In [None]:
# Left join keeps every innings; innings without a dismissal record get nulls.
batsmenDFGrouped = batsmenDFGrouped.join(
    batsmanDFDismissed,
    on=['matchID','inning','batsman'],
    how="left"
)

In [None]:
# Replace the gaps left by the join: a missing or empty fielder becomes "-"
# and a missing dismissal kind becomes "not-out".
replaceFunction = udf(lambda fielder: fielder if fielder not in (None, '') else "-")
replaceDismissal = udf(lambda dismissal: dismissal if dismissal is not None else "not-out")
batsmenDFGrouped = (
    batsmenDFGrouped
    .withColumn('fielder', replaceFunction(col('fielder')))
    .withColumn('dismissalKind', replaceDismissal(col('dismissalKind')))
)

In [None]:
# Number of innings per batsman: count the rows per (inning, batsman) and
# add those counts up for each batsman.
num_of_innings = (
    batsmenDFGrouped
    .groupBy('inning','batsman').count()
    .groupBy('batsman').agg(sum('count').alias('num_of_innings'))
)

In [None]:
# Number of not-out innings per batsman: count rows per
# (batsman, dismissalKind), keep only the 'not-out' rows and rename the count.
batsmanDismissalDF = (
    batsmenDFGrouped
    .select('dismissalKind','batsman')
    .groupBy('batsman','dismissalKind').count()
    .where("dismissalKind=='not-out'")
    .drop('dismissalKind')
    .withColumnRenamed('count','num_not_outs')
)

In [None]:
# Batting average per batsman: total runs / (innings - not outs), or 0 when
# the batsman was never dismissed.
from pyspark.sql.types import DecimalType
import pyspark.sql.functions as func
batsmenDFGrouped = batsmenDFGrouped.groupBy('batsman').agg(sum('batsmanRuns').alias('batsmanRuns'))
batsmenDFGrouped = batsmenDFGrouped.join(num_of_innings, ['batsman'])
# batsmanDismissalDF only has rows for batsmen with at least one not-out
# innings. An inner join therefore removed every batsman who was dismissed in
# all of his innings; keep them with a left join and a not-out count of 0.
batsmenDFGrouped = batsmenDFGrouped.join(batsmanDismissalDF, ['batsman'], how='left')
batsmenDFGrouped = batsmenDFGrouped.na.fill({'num_not_outs': 0})
calculateBattingAverage = udf(lambda batsmanRuns, numInnings, numNotOuts: float(batsmanRuns) / float(numInnings-numNotOuts) if (numInnings-numNotOuts) != 0 else float(0))
batsmenDFGrouped = batsmenDFGrouped.withColumn('battingAverage', calculateBattingAverage(col('batsmanRuns'), col('num_of_innings'), col('num_not_outs')))



In [None]:
# Round the batting average to two decimal places.
import pyspark.sql.functions as func
batsmenDFGrouped = batsmenDFGrouped.withColumn(
    'battingAverage',
    func.round(col('battingAverage'), 2)
)

In [None]:
# Start the test-set batsman stats: deliveries counted per batsman joined
# with his total runs.
runs = batsmenDF.groupBy('batsman').agg(sum('batsmanRuns').alias('batsmanRuns'))
balls = batsmenDF.groupBy('batsman').agg(count('ball').alias('balls'))
batsmenStats = balls.join(runs, on=['batsman'])

In [None]:
# Number of deliveries on which each batsman scored exactly four runs.
fours = (
    batsmenDF
    .where('batsmanRuns == 4')
    .groupBy('batsman')
    .agg(count('batsmanRuns').alias('fours'))
)

In [None]:
# Add the fours; batsmen without any four get 0 instead of null.
batsmenStats = (
    batsmenStats
    .join(fours, on=['batsman'], how='left')
    .fillna({'fours':0})
)
batsmenStats.show()

In [None]:
# Number of deliveries on which each batsman scored exactly six runs.
sixes = (
    batsmenDF
    .where('batsmanRuns == 6')
    .groupBy('batsman')
    .agg(count('batsmanRuns').alias('sixes'))
)

In [None]:
# Add the sixes; batsmen without any six get 0 instead of null.
batsmenStats = (
    batsmenStats
    .join(sixes, on=['batsman'], how='left')
    .fillna({'sixes':0})
)


In [None]:
# Spot check: display the stats row of a single batsman.
batsmenStats.filter("batsman == 'CH Gayle'").show()

In [None]:
# Runs per batsman per match, then the number of matches with 50-99 runs
# (fifties) and with 100 or more runs (hundreds).
from pyspark.sql.functions import max
highestScore = (
    batsmenDF
    .groupBy('batsman', 'matchID')
    .agg(sum('batsmanRuns').alias('highestScore'))
    .drop('matchID')
)
hundredsDF = (
    highestScore
    .where('highestScore >= 100')
    .groupBy('batsman')
    .agg(count('highestScore').alias('num_hundreds'))
)
fiftiesDF = (
    highestScore
    .where('highestScore >= 50 AND highestScore <100')
    .groupBy('batsman')
    .agg(count('highestScore').alias('num_fifties'))
)

In [None]:
# Reduce the per-match totals to each batsman's single best match score.
highestScore = (
    highestScore
    .groupBy('batsman')
    .agg(max('highestScore').alias('highestScore'))
)

In [None]:
# Strike rate = runs / balls * 100, rounded to two decimal places.
calculateStrikeRate = udf(lambda runs, balls : float(runs) / float(balls) * 100.0)
batsmenStats = batsmenStats.withColumn(
    'strikeRate',
    calculateStrikeRate(col('batsmanRuns'), col('balls'))
)
batsmenStats = batsmenStats.withColumn(
    'strikeRate',
    func.round(col('strikeRate'), 2)
)



In [None]:
# Attach each batsman's highest score to the stats frame.
batsmenStats = batsmenStats.join(highestScore, on=['batsman'])



In [None]:
# Keep only the average-related columns for merging into the stats frame.
batsmanSelectStats = batsmenDFGrouped.select(
    'batsman',
    'battingAverage',
    'num_of_innings',
    'num_not_outs'
)

In [None]:
# Merge batting average, innings and not-outs into the main stats frame.
batsmenStats = batsmenStats.join(batsmanSelectStats, on=['batsman'])

In [None]:
# One row per batsman with the batsman_min_seasons flag (full data set).
minBatsmanSeasons = (
    batsmenDF
    .select('batsman','batsman_min_seasons')
    .distinct()
)

In [None]:
# Merge the seasons flag, fifties and hundreds. Batsmen with no fifty or
# hundred have no row in those frames, so the resulting nulls become 0.
batsmenStats = (
    batsmenStats
    .join(minBatsmanSeasons, on=['batsman'])
    .join(fiftiesDF, on=['batsman'], how='left')
    .join(hundredsDF, on=['batsman'], how='left')
    .fillna({'num_fifties':0})
    .fillna({'num_hundreds':0})
)

In [None]:
# Bring the test-set batsman stats to the driver as a pandas DataFrame.
batsmenStatsPandas =  batsmenStats.toPandas()

In [None]:
# Write the batsmen test data to CSV in the working directory.
batsmenStatsPandas.to_csv('batsmen_test_data.csv')

In [None]:
# Bowlers test data: same logic as the training extraction, but the 2017
# season is kept. Deliveries that are neither a wide nor a no-ball are
# counted per bowler and ball number.
from pyspark.sql.functions import count
bowlersDF = (
    matchesDF
    .join(deliveriesDF, on=(matchesDF.id == deliveriesDF.matchID), how="inner")
    .select('matchID','season','bowler','ball','extraRuns','wideRuns','noballRuns','byeRuns','legbyeRuns','totalRuns','playerDismissed', 'dismissalKind','fielder')
)
bowlersDFLegal = bowlersDF.where('noballRuns == 0 AND wideRuns ==0')
bowlersGroupedDF = (
    bowlersDFLegal
    .groupBy('bowler','ball')
    .agg(count('ball').alias('num_each_ball'))
)


In [None]:
# Total legal balls per bowler: add up the per-ball-number counts.
from pyspark.sql.functions import sum
bowlersGroupedDF = (
    bowlersGroupedDF
    .groupBy('bowler')
    .agg(sum('num_each_ball').alias('totalBalls'))
)
bowlersGroupedDF.show()

In [None]:
# Deliveries without byes or leg byes, grouped by bowler and runs scored;
# the zero-run group gives the number of dot balls.
bowlersLegalRunsDF = bowlersDF.where('byeRuns == 0 and legByeRuns ==0')
runsConceded = (
    bowlersLegalRunsDF
    .groupBy('bowler','totalRuns')
    .agg(count('totalRuns').alias('runs_in_different_ways'))
)
num_dot_balls = (
    runsConceded
    .where("totalRuns ==0")
    .drop('totalRuns')
    .withColumnRenamed('runs_in_different_ways','dot_balls')
)
num_dot_balls.show()

In [None]:
# Total runs conceded per bowler: multiply each run value by the number of
# deliveries that produced it, then sum the products per bowler.
from pyspark.sql.functions import udf, col,sum
from pyspark.sql.types import IntegerType
calculateTotalRuns = udf(lambda runValue, occurrences: runValue*occurrences, IntegerType())
runsConceded = (
    runsConceded
    .withColumn('total_runs_conceded', calculateTotalRuns(col('totalRuns'), col('runs_in_different_ways')))
    .drop('totalRuns','runs_in_different_ways')
    .groupBy('bowler')
    .agg(sum('total_runs_conceded').alias('total_runs_conceded'))
)
runsConceded.show()

In [None]:
# Extras per bowler: sum of wide runs and of no-ball runs over the
# deliveries that had either, cast to integers.
extras = (
    bowlersDF
    .where("wideRuns > 0 OR noballRuns >0")
    .groupBy('bowler')
    .agg(sum('wideRuns').alias('num_wides'), sum('noballRuns').alias('num_noballs'))
)
extras = (
    extras
    .withColumn('num_wides', col('num_wides').cast(IntegerType()))
    .withColumn('num_noballs', col('num_noballs').cast(IntegerType()))
)
extras.show()



In [None]:
# Wickets per bowler over the full data set; run outs are not credited to
# the bowler. Also count the wickets per bowler per match for the
# best-bowling figure.
wickets = bowlersDF.where("dismissalKind!='' AND dismissalKind!='run out'" )
# The projection used to be assigned to a misspelled name (`wicket`), so it
# was never used and the aggregations ran on the unprojected frame. Rebind
# `wickets`, as the training-data cell does with wickets2016. The counts are
# unchanged; the name `wicket` is no longer defined.
wickets = wickets.select('bowler','dismissalKind','matchID')
totalBowlerWickets = wickets.groupBy('bowler').count()
totalBowlerWickets = totalBowlerWickets.withColumnRenamed('count', 'totalWickets')
bestBowling = wickets.groupBy('bowler','matchID').count()
bestBowling = bestBowling.drop('matchID')

In [None]:
# Best bowling for each bowler: the most wickets taken in a single match.
from pyspark.sql.functions import max
bestBowling = (
    bestBowling
    .groupBy('bowler')
    .agg(max('count').alias('bestBowlingWickets'))
)
bestBowling.show()

In [None]:
# Left join keeps bowlers who conceded runs but took no wickets.
bowlingAverage = runsConceded.join(
    totalBowlerWickets,
    on=['bowler'],
    how='left'
)

In [None]:
# A null wicket count means the bowler took no wickets; store it as 0.
bowlingAverage = bowlingAverage.fillna({'totalWickets':0})

In [None]:
# Bowling average = runs conceded / wickets (0 for a bowler without
# wickets), rounded to two decimal places.
import pyspark.sql.functions as func
calculateBowlingAverage = udf(lambda runsConceded, totalWickets: float(runsConceded)/float(totalWickets) if totalWickets != 0 else 0)
bowlingAverage = bowlingAverage.withColumn(
    'bowlingAverage',
    calculateBowlingAverage(col('total_runs_conceded'), col('totalWickets'))
)
bowlingAverage = bowlingAverage.withColumn(
    'bowlingAverage',
    func.round(col('bowlingAverage'), 2)
)

In [None]:
# Preview the bowling averages.
bowlingAverage.show()

In [None]:
# Economy rate (runs conceded per over) for each bowler, full data set.
bowlerEconomy = runsConceded.join(bowlersGroupedDF, ['bowler'], how='left')
# float(...) / 6.0 forces true division: on Python 2 `totalBalls/6` floors to
# whole overs and inflates the economy rate. A bowler who conceded runs but
# has no legal delivery gets a null totalBalls from the left join, so None is
# passed through instead of raising inside the UDF.
calculateOvers = udf(lambda totalBalls: None if totalBalls is None else float(totalBalls) / 6.0)
# A UDF without a declared return type yields a string column; cast it so
# num_overs is numeric when it reaches the economy UDF.
bowlerEconomy = bowlerEconomy.withColumn('num_overs', calculateOvers(col('totalBalls')).cast('double'))
# Missing or zero overs give an economy of 0 rather than a division error.
calculateEconomy = udf(lambda totalRuns, numOvers: 0 if not numOvers else float(totalRuns)/float(numOvers))
# The economy is computed from the unrounded overs so that rounding num_overs
# to one decimal does not distort it; num_overs is rounded last.
bowlerEconomy = bowlerEconomy.withColumn('bowlerEconomy', calculateEconomy(col('total_runs_conceded'),col('num_overs')))
bowlerEconomy = bowlerEconomy.withColumn('bowlerEconomy', func.round(col('bowlerEconomy'),2))
bowlerEconomy = bowlerEconomy.withColumn('num_overs', func.round(col('num_overs'),1))
bowlerEconomy.show()

In [None]:
# Balls bowled next to wickets taken; bowlers without wickets are kept.
bowlerStrikeRate = bowlersGroupedDF.join(
    totalBowlerWickets,
    on=['bowler'],
    how='left'
)
bowlerStrikeRate.show()

In [None]:
# Bowling strike rate = balls bowled / wickets (0 for a bowler without
# wickets), rounded to two decimal places.
bowlerStrikeRate = bowlerStrikeRate.fillna({'totalWickets':0})
calculateBowlerStrikeRate = udf(lambda totalBalls, totalWickets: float(totalBalls)/float(totalWickets) if totalWickets != 0 else 0)
bowlerStrikeRate = bowlerStrikeRate.withColumn(
    'bowlingStrikeRate',
    calculateBowlerStrikeRate(col('totalBalls'), col('totalWickets'))
)
bowlerStrikeRate = bowlerStrikeRate.withColumn(
    'bowlingStrikeRate',
    func.round(col('bowlingStrikeRate'), 2)
)
bowlerStrikeRate.show()

In [None]:
# Count the distinct seasons each bowler featured in and derive the flag
# bowler_min_seasons: 1 for three or more seasons, 0 otherwise.
seasonsDF3 = (
    bowlersDF.select('bowler','season')
    .distinct()
    .groupBy('bowler')
    .agg(count('season').alias('num_seasons'))
)
seasonsFunction = udf(lambda seasonsPlayed: 0 if seasonsPlayed < 3 else 1, IntegerType())
seasonsDF3 = seasonsDF3.select(
    'bowler',
    'num_seasons',
    seasonsFunction(col('num_seasons')).alias('bowler_min_seasons')
)



In [None]:
# Only the flag is needed downstream, so drop the raw season count.
seasonsDF3 = seasonsDF3.drop(*['num_seasons'])
seasonsDF3.show()

In [None]:
# Start the test-set bowler stats frame: balls bowled plus runs conceded.
bowlerStatsDF = bowlersGroupedDF.join(
    runsConceded,
    on=['bowler'],
    how='left'
)



In [None]:
# Add total wickets and display the column list as a sanity check.
bowlerStatsDF = bowlerStatsDF.join(
    totalBowlerWickets,
    on=['bowler'],
    how='left'
)
bowlerStatsDF.columns

In [None]:
# Bowlers without a wicket row get a wicket count of 0.
bowlerStatsDF = bowlerStatsDF.fillna({'totalWickets':0})

In [None]:
# Add the economy rate.
bowlerEconomyDF = bowlerEconomy.select(*['bowler','bowlerEconomy'])
bowlerStatsDF = bowlerStatsDF.join(
    bowlerEconomyDF,
    on=['bowler'],
    how='left'
)
bowlerStatsDF.columns

In [None]:
# Add the bowling average.
bowlingAverageDF = bowlingAverage.select(*['bowler','bowlingAverage'])
bowlerStatsDF = bowlerStatsDF.join(
    bowlingAverageDF,
    on=['bowler'],
    how='left'
)
bowlerStatsDF.columns

In [None]:
# Add the bowling strike rate.
bowlerStrikeRateDF = bowlerStrikeRate.select(*['bowler','bowlingStrikeRate'])
bowlerStatsDF = bowlerStatsDF.join(
    bowlerStrikeRateDF,
    on=['bowler'],
    how='left'
)
bowlerStatsDF.columns

In [None]:
# Add the best single-match wicket haul.
bowlerStatsDF = bowlerStatsDF.join(
    bestBowling,
    on=['bowler'],
    how='left'
)

In [None]:
# Bowlers without wickets have no best-bowling row: use 0. Then add the
# dot-ball counts.
bowlerStatsDF = (
    bowlerStatsDF
    .fillna({'bestBowlingWickets':0})
    .join(num_dot_balls, on=['bowler'], how='left')
)

In [None]:
# Bowlers without a dot-ball row get 0. Then add wides and no-balls.
bowlerStatsDF = (
    bowlerStatsDF
    .fillna({'dot_balls':0})
    .join(extras, on=['bowler'], how='left')
)

In [None]:
# Bowlers without an extras row get 0 wides and 0 no-balls. Finally add the
# seasons flag and preview the completed frame.
bowlerStatsDF = (
    bowlerStatsDF
    .fillna({'num_wides':0})
    .fillna({'num_noballs':0})
    .join(seasonsDF3, on=['bowler'], how='left')
)
bowlerStatsDF.show()

In [None]:
# Bring the test-set bowler stats to the driver as a pandas DataFrame and
# write them to CSV: this file is the bowlers test data.
bowlerStatsDFPandas = bowlerStatsDF.toPandas()
bowlerStatsDFPandas.to_csv(path_or_buf='bowlers_test_data.csv')

In [None]:
# Print the column names and types of the matches DataFrame.
matchesDF.printSchema()