In [0]:
spark

In [0]:
#Importing Libraries

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, DecimalType, BooleanType, DateType
from pyspark.sql.functions import col, when, sum, avg, row_number
from pyspark.sql.window import Window


In [0]:
#creating SparkSession

spark = SparkSession.builder.appName('Ipl_Analysis').getOrCreate()


In [0]:
#importing Dataset Ball by ball
ball_by_ball_schema = StructType([
  StructField('MatcH_id', IntegerType(), True),
  StructField("over_id", IntegerType(), True),
  StructField("ball_id", IntegerType(), True),
  StructField("innings_no", IntegerType(), True),
  StructField("team_batting", StringType(), True),
  StructField("team_bowling", StringType(), True),
  StructField("striker_batting_position", IntegerType(), True),
  StructField("extra_type", StringType(), True),
  StructField("runs_scored", IntegerType(), True),
  StructField("extra_runs", IntegerType(), True),
  StructField("wides", IntegerType(), True),
  StructField("legbyes", IntegerType(), True),
  StructField("byes", IntegerType(), True),
  StructField("noballs", IntegerType(), True),
  StructField("penalty", IntegerType(), True),
  StructField("bowler_extras", IntegerType(), True),
  StructField("out_type", StringType(), True),
  StructField("caught", BooleanType()),
  StructField("bowled", BooleanType()),
  StructField("run_out", BooleanType()),
  StructField("lbw", BooleanType()),
  StructField("retired_hurt", BooleanType()),
  StructField("stumped", BooleanType(), True),
  StructField("caught_and_bowled", BooleanType(), True),
  StructField("hit_wicket", BooleanType(), True),
  StructField("obstructingfeild", BooleanType(), True),
  StructField("bowler_wicket", BooleanType(), True),
  StructField("match_date", DateType(), True),
  StructField("season", IntegerType(), True),
  StructField("striker", IntegerType(), True),
  StructField("non_striker", IntegerType(), True),
  StructField("bowler", IntegerType(), True),
  StructField("player_out", IntegerType(), True),
  StructField("fielders", IntegerType(), True),
  StructField("striker_match_sk", IntegerType(), True),
  StructField("strikersk", StringType(), True),
  StructField("nonstriker_match_sk", IntegerType(), True),
  StructField("nonstriker_sk", IntegerType(), True),
  StructField("fielder_match_sk", IntegerType(), True),
  StructField("fielder_sk", IntegerType(), True),
  StructField("bowler_match_sk", IntegerType(), True),
  StructField("bowler_sk", IntegerType(), True),
  StructField("playerout_match_sk", IntegerType(), True),
  StructField("battingteam_sk", IntegerType(), True),
  StructField("bowlingteam_sk", IntegerType(), True),
  StructField("keeper_catch", BooleanType(), True),
  StructField("player_out_sk", IntegerType(), True),
  StructField("matchdatesk", DateType(), True),
])


ball_by_ball_df = spark.read.schema(ball_by_ball_schema).format('csv').option('header','true').load('s3://ipl-data-analysis-project/Ball_By_Ball.csv')


In [0]:
#importing DataSet matches

match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])
match_df = spark.read.schema(match_schema).format("csv").option("header","true").load("/FileStore/tables/Match.csv")


In [0]:
#importing Dataset Players

Players_schema = StructType([
  StructField("player_sk", IntegerType(), True),
  StructField("player_id", IntegerType(), True),
  StructField("player_name", StringType(), True),
  StructField("dob", DateType(), True),
  StructField("batting_hand", StringType(), True),
  StructField("bowling_skill", StringType(), True),
  StructField("country_name", StringType(), True),
])

Players_df = spark.read.schema(Players_schema).format('csv').option('header','True').load("s3://ipl-data-analysis-project/Player.csv")

In [0]:
#importing dataset Teams

Teams_schema = StructType([
    StructField('team_sk', IntegerType(), True),
    StructField('team_id', IntegerType(), True),
    StructField('team_name', StringType(), True)
])

Teams_df = spark.read.schema(Teams_schema).format('csv').option('Header','True').load('s3://ipl-data-analysis-project/Team.csv')

In [0]:
#importing Dataset player_match

Player_match_schema = StructType([
  StructField("player_match_sk", IntegerType(), True),
  StructField("playermatch_key", DecimalType(10, 0), True),
  StructField("match_id", IntegerType(), True),
  StructField("player_id", IntegerType(), True),
  StructField("player_name", StringType(), True),
  StructField("dob", DateType(), True),
  StructField("batting_hand", StringType(), True),
  StructField("bowling_skill", StringType(), True),
  StructField("country_name", StringType(), True),
  StructField("role_desc", StringType(), True),
  StructField("player_team", StringType(), True),
  StructField("opposit_team", StringType(), True),
  StructField("season_year", IntegerType(), True),
  StructField("is_manofthematch", BooleanType(), True),
  StructField("age_as_on_match", IntegerType(), True),
  StructField("isplayers_team_won", BooleanType(), True),
  StructField("batting_status", StringType(), True),
  StructField("bowling_status", StringType(), True),
  StructField("player_captain", StringType(), True),
  StructField("opposit_captain", StringType(), True),
  StructField("player_keeper", StringType(), True),
  StructField("opposit_keeper", StringType(), True),
])

Player_match_df = spark.read.schema(Player_match_schema).format('csv').option('Header','True').load('s3://ipl-data-analysis-project/Player_match.csv')

In [0]:
#filter to include only valid deliveries (excluding extras like wide and no ball for specific analysis)

ball_by_ball_df = ball_by_ball_df.filter((col('wides') == 0) & (col('noballs') == 0))

# Aggregation: Calculate the total and average runs scored in each match and inning

Total_and_average_runs = ball_by_ball_df.groupBy("match_id", "innings_no").agg(
    sum('runs_scored').alias('Total_runs'),
    avg('runs_scored').alias('Average_runs'))

In [0]:

# Window Function: Calculate running total of runs in each match for each over

window_spec = Window.partitionBy('match_id','innings_no').orderBy('over_id')

ball_by_ball_df = ball_by_ball_df.withColumn('running_total_runs', sum('runs_scored').over(window_spec))


In [0]:
 # Conditional Column: Flag for high impact balls (either a wicket or more than 6 runs including extras)

 ball_by_ball_df = ball_by_ball_df.withColumn('High_Impact', when((col('runs_scored') + col('extra_runs') > 6) | (col('bowler_wicket') == True), True).otherwise(False))

In [0]:
from pyspark.sql.functions import dayofmonth, year, month, when

# Extracting year, month, and day from the match date for more detailed time-based analysis
match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn('month', month("match_date"))
match_df = match_df.withColumn('Day', dayofmonth("match_date"))

# High margin win: categorizing win margins into 'high', 'medium', and 'low'

match_df = match_df.withColumn("Win_margin_category", when((col("Win_Margin") < 50) & ((col("Win_Type") == 'run') | (col("Win_Type") == 'runs')),'Low') \
                               .when((col("Win_margin") >= 50) & (col("Win_margin") <= 100),'Medium').when((col("win_margin") >= 1) & (col("win_margin") <= 3) & (col("Win_type") == 'wickets'),'Low') \
                               .when((col("Win_margin") > 3) & (col("Win_margin") < 7) & (col("Win_Type") == 'wickets'),"Medium") \
                               .when(col("win_margin") > 100, 'High') \
                               .when((col("Win_margin") >= 7) & (col("Win_margin") <= 10) & (col("Win_Type") == 'wickets'),"High") \
                               .otherwise('NA'))
 


# Analyze the impact of the toss: who wins the toss and the match

match_df = match_df.withColumn("Toss_match_winner", when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No"))


In [0]:
from pyspark.sql.functions import regexp_replace, lower

# Normalize and clean player names
Players_df = Players_df.withColumn("player_name", lower(regexp_replace("player_name","[^a-zA-Z0-9 ]", "")))

## Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
Players_df = Players_df.na.fill({"batting_hand":"unknown", "bowling_skill":"unknown"})


# Categorizing players based on batting hand
Players_df = Players_df.withColumn("Batting_style", when(col("batting_hand").contains("Left"),'Left-Handed').otherwise("Right-Handed"))




In [0]:
from pyspark.sql.functions import col, when, expr, current_date

# Add a 'veteran_status' column based on player age
Player_match_df = Player_match_df.withColumn("Veterran_Status", when(col("Age_as_on_match") >= 35,"Veteran").otherwise("Non-Veteran"))

## Dynamic column to calculate years since debut

Player_match_df = Player_match_df.withColumn("Years_since_debut", (2017 - col("season_year")))


In [0]:
#creating tempviews from dfs:

ball_by_ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("matches")
Players_df.createOrReplaceTempView("player")
Player_match_df.createOrReplaceTempView("player_match")
Teams_df.createOrReplaceTempView("team")

In [0]:
display(Player_match_df.limit(20))

In [0]:


Top_scoring_batsmen_per_season = spark.sql("""
select t.season_year,t.player_name, t.Total_runs as Highest_total_Runs from
(select pm.player_name, m.season_year, sum(b.runs_scored) as Total_runs,dense_rank() over(partition by m.season_year order by sum(b.runs_scored)  desc) as rnk from ball_by_ball b
join matches m on b.match_id = m.match_id
join player_match pm on m.match_id = pm.match_id and b.striker = pm.player_id
group by pm.player_name, m.season_year
order by m.season_year, Total_runs desc) t
where rnk = 1""")

Top_scoring_batsmen_per_season.show(10)


+-----------+------------+------------------+
|season_year| player_name|Highest_total_Runs|
+-----------+------------+------------------+
|       2008|    SE Marsh|               614|
|       2009|   ML Hayden|               571|
|       2010|SR Tendulkar|               615|
|       2011|    CH Gayle|               598|
|       2012|    CH Gayle|               720|
|       2013|  MEK Hussey|               732|
|       2014|  RV Uthappa|               659|
|       2015|   DA Warner|               554|
|       2016|     V Kohli|               962|
|       2017|   DA Warner|               634|
+-----------+------------+------------------+



In [0]:
display(Top_scoring_batsmen_per_season)

season_year,player_name,Highest_total_Runs
2008,SE Marsh,614
2009,ML Hayden,571
2010,SR Tendulkar,615
2011,CH Gayle,598
2012,CH Gayle,720
2013,MEK Hussey,732
2014,RV Uthappa,659
2015,DA Warner,554
2016,V Kohli,962
2017,DA Warner,634


Databricks visualization. Run in Databricks to view.

In [0]:
economical_bowlers_powerplay = spark.sql("""
select pm.player_name, AVG(b.runs_scored) as Avg_runs_per_ball, count(b.player_out) as Total_wickets
from ball_by_ball b 
join player_match pm on b.match_id = pm.match_id and b.bowler = pm.player_id
where b.over_id < 6
group by pm.player_name
order by Avg_runs_per_ball, Total_wickets desc""")






player_name,Avg_runs_per_ball,Total_wickets
SM Harwood,0.1666666666666666,0
AJ Tye,0.3333333333333333,2
Ankit Soni,0.5,1
Avesh Khan,0.5,1
GR Napier,0.5,0
AJ Finch,0.5,0
A Zampa,0.5,0
Mohammad Nabi,0.5833333333333334,1
NB Singh,0.5833333333333334,1
Rashid Khan,0.6666666666666666,2


In [0]:
display(economical_bowlers_powerplay.limit(10))

player_name,Avg_runs_per_ball,Total_wickets
SM Harwood,0.1666666666666666,0
AJ Tye,0.3333333333333333,2
Ankit Soni,0.5,1
Avesh Khan,0.5,1
GR Napier,0.5,0
AJ Finch,0.5,0
A Zampa,0.5,0
Mohammad Nabi,0.5833333333333334,1
NB Singh,0.5833333333333334,1
Rashid Khan,0.6666666666666666,2


Databricks visualization. Run in Databricks to view.

In [0]:

toss_impact_individual_matches = spark.sql("""
SELECT m.match_id, m.toss_winner, m.toss_name, m.match_winner,
       CASE WHEN m.toss_winner = m.match_winner THEN 'Won' ELSE 'Lost' END AS match_outcome
FROM match m
WHERE m.toss_name IS NOT NULL and m.match_winner not in ('NULL','tied','abandoned')
ORDER BY m.match_id
""")


In [0]:
display(toss_impact_individual_matches)

match_id,toss_winner,toss_name,match_winner,match_outcome
335987,Royal Challengers Bangalore,field,Kolkata Knight Riders,Lost
335988,Chennai Super Kings,bat,Chennai Super Kings,Won
335989,Rajasthan Royals,bat,Delhi Daredevils,Lost
335990,Mumbai Indians,bat,Royal Challengers Bangalore,Lost
335991,Deccan Chargers,bat,Kolkata Knight Riders,Lost
335992,Kings XI Punjab,bat,Rajasthan Royals,Lost
335993,Deccan Chargers,bat,Delhi Daredevils,Lost
335994,Mumbai Indians,field,Chennai Super Kings,Lost
335995,Rajasthan Royals,field,Rajasthan Royals,Won
335996,Mumbai Indians,field,Kings XI Punjab,Lost


Databricks visualization. Run in Databricks to view.

In [0]:
average_runs_in_wins = spark.sql("""
SELECT pm.player_name, AVG(b.runs_scored) AS avg_runs_in_wins, COUNT(*) AS innings_played
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.striker = pm.player_id
JOIN match m ON pm.match_id = m.match_id
WHERE m.match_winner = pm.player_team
GROUP BY pm.player_name
ORDER BY avg_runs_in_wins desc
""")

In [0]:
display(average_runs_in_wins.limit(10))

player_name,avg_runs_in_wins,innings_played
Rashid Khan,6.0,1
Shahid Afridi,3.333333333333333,6
AN Ahmed,3.0,3
BA Bhatt,3.0,2
SN Khan,2.5789473684210527,19
SP Jackson,2.4,5
Harmeet Singh,2.333333333333333,6
Umar Gul,2.3076923076923075,13
SM Pollock,2.2,15
MS Gony,2.1379310344827585,29


Databricks visualization. Run in Databricks to view.

In [0]:
scores_by_venues = spark.sql("""
select t.venue_name,
round(avg(t.Total_runs),3) as Average_Runs,max(t.total_runs) as Highest_Total
from (
select b.match_id,m.venue_name,sum(b.runs_scored) as Total_runs
from ball_by_ball b
join matches m on m.match_id = b.match_id
group by b.match_id,m.venue_name
order by 1,2) t

group by 1
order by 2 desc""")


In [0]:
display(scores_by_venues)

venue_name,Average_Runs,Highest_Total
Brabourne Stadium,325.818,399
Saurashtra Cricket Association Stadium,319.1,393
"Punjab Cricket Association IS Bindra Stadium, Mohali",317.429,341
Barabati Stadium,311.571,404
Green Park,308.0,378
Maharashtra Cricket Association Stadium,302.067,377
Rajiv Gandhi International Stadium Uppal,302.0,365
Himachal Pradesh Cricket Association Stadium,301.111,371
"MA Chidambaram Stadium, Chepauk",299.729,447
"Sardar Patel Stadium, Motera",298.0,377


Databricks visualization. Run in Databricks to view.

In [0]:
dismissal_types = spark.sql("""
select out_type, count(1) as Frequency
from ball_by_ball
where out_type not in ('Not Applicable')
group by out_type""")


In [0]:
display(dismissal_types)

out_type,Frequency
stumped,222
hit wicket,9
bowled,1382
lbw,455
caught and bowled,211
Keeper Catch,695
retired hurt,9
caught,3678
run out,744
obstructing the field,1


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
select team1 as Team, SUM(case when toss_winner = match_winner then 1 else 0 end) as win_after_toss, count(*) as matches_won, round((win_after_toss/matches_won),3) as Win_prob_after_tosswin
from matches 
where toss_winner = team1
group by Team
order by Team;


Team,win_after_toss,matches_won,Win_prob_after_tosswin
Chennai Super Kings,27,37,0.73
Deccan Chargers,6,22,0.273
Delhi Daredevils,17,36,0.472
Gujarat Lions,3,7,0.429
Kings XI Punjab,14,32,0.438
Kochi Tuskers Kerala,2,4,0.5
Kolkata Knight Riders,24,36,0.667
Mumbai Indians,26,42,0.619
Pune Warriors,2,10,0.2
Rajasthan Royals,16,25,0.64


Databricks visualization. Run in Databricks to view.

In [0]:
%sql
select team1,count(team1) from matches
group by team1
order by 2 desc;

team1,count(team1)
Royal Challengers Bangalore,85
Delhi Daredevils,76
Chennai Super Kings,74
Kings XI Punjab,72
Mumbai Indians,72
Kolkata Knight Riders,69
Rajasthan Royals,52
Deccan Chargers,39
Sunrisers Hyderabad,38
Pune Warriors,23


In [0]:
most_wicket_each_season = spark.sql("""
select t.player_name,t.season_year,t.total_wickets as No_of_wicket from
(select p.player_name,p.season_year,sum(case when b.out_type = 'Not Applicable' then 0 else 1 end) as total_wickets,dense_rank() over(partition by p.season_year order by sum(case when b.out_type = 'Not Applicable' then 0 else 1 end) desc) as rnk
from ball_by_ball b 
join player_match p on p.player_id = b.bowler and p.match_id = b.match_id
group by 1,2
order by 2,total_wickets desc) t
where rnk = 1
order by 2""")

In [0]:
display(most_wicket_each_season)


player_name,season_year,No_of_wicket
Sohail Tanvir,2008,24
RP Singh,2009,26
PP Ojha,2010,22
SL Malinga,2011,30
M Morkel,2012,30
DJ Bravo,2013,34
MM Sharma,2014,26
DJ Bravo,2015,27
B Kumar,2016,24
B Kumar,2017,28


Databricks visualization. Run in Databricks to view.