In [0]:
from pyspark.sql import SparkSession

In [0]:
from pyspark.sql.types import StructField,StructType,IntegerType,StringType,BooleanType,DateType,DecimalType
from pyspark.sql.functions import col,when,sum,avg,row_number
from pyspark.sql.window import Window

In [0]:
spark=SparkSession.builder.appName("ipl_neeraj").getOrCreate()

In [0]:
team_schema = "team_sk integer, team_id integer, team_name string"

#or 
"""
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])
"""

team_df = spark.read.schema(team_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Team.csv")

In [0]:
ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True), #True means can have NULL values
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True)
])

In [0]:
ball_by_ball_df = spark.read \
.format("csv") \
.option("header","true") \
.schema(ball_by_ball_schema) \
.load("s3://ipl-data-analysis-project/Ball_By_Ball.csv")

In [0]:
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])
match_df = spark.read.schema(match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Match.csv")

     

In [0]:
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

player_df = spark.read.schema(player_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Player.csv")

     

In [0]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

player_match_df = spark.read.schema(player_match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Player_match.csv")


In [0]:
#Filter to include only valid deliveries (excluding extras like wides and no balls for specific anlayses)
ball_by_ball_df =  ball_by_ball_df.filter((col("wides")==0) & (col("noballs")==0))

# Aggregation: Calculate the total and average runs scored in each match and inning
total_and_avg_runs = ball_by_ball_df.groupBy("match_id","innings_no").agg(
    sum("runs_scored").alias("total_runs"),
    avg("runs_scored").alias("average_runs"))

total_and_avg_runs.show(5)


+--------+----------+----------+------------------+
|match_id|innings_no|total_runs|      average_runs|
+--------+----------+----------+------------------+
|  980940|         1|       138|              1.15|
|  419132|         1|       162|              1.35|
| 1082632|         2|       202|1.9238095238095239|
|  335993|         2|       131|1.6794871794871795|
| 1082617|         1|       123|1.0512820512820513|
+--------+----------+----------+------------------+
only showing top 5 rows



In [0]:
# Window Function: Calculate running total of runs in each match for each over
windowspec = Window.partitionBy("match_id","innings_no").orderBy("over_id")

ball_by_ball_df = ball_by_ball_df.withColumn(
    "running_total_runs",
    sum("runs_scored").over(windowspec)
)

In [0]:
ball_by_ball_df.select("match_id","innings_no","running_total_runs").show(4)

+--------+----------+------------------+
|match_id|innings_no|running_total_runs|
+--------+----------+------------------+
|  335987|         1|                 0|
|  335987|         1|                 0|
|  335987|         1|                 0|
|  335987|         1|                 0|
+--------+----------+------------------+
only showing top 4 rows



In [0]:
# Conditional Column: Flag for high impact balls (either a wicket or more than 6 runs including extras)
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high_impact_balls",
    when((col("runs_scored")+col("extra_runs") > 6) | (col("bowler_wicket")==True),True).otherwise(False) 
    #checks if either of condition true - then high impact ball, else false.
)

ball_by_ball_df.select("match_id","innings_no","running_total_runs","runs_scored","extra_runs","bowler_wicket","high_impact_balls").show(10)

+--------+----------+------------------+-----------+----------+-------------+-----------------+
|match_id|innings_no|running_total_runs|runs_scored|extra_runs|bowler_wicket|high_impact_balls|
+--------+----------+------------------+-----------+----------+-------------+-----------------+
|  335987|         1|                 0|          0|         1|         null|            false|
|  335987|         1|                 0|          0|         0|         null|            false|
|  335987|         1|                 0|          0|         0|         null|            false|
|  335987|         1|                 0|          0|         0|         null|            false|
|  335987|         1|                 0|          0|         0|         null|            false|
|  335987|         1|                 0|          0|         1|         null|            false|
|  335987|         1|                18|          0|         0|         null|            false|
|  335987|         1|                18|

In [0]:
from pyspark.sql.functions import dayofmonth,month,year,when

# Extracting year, month, and day from the match date for more detailed time-based analysis
match_df = match_df.withColumn("day",dayofmonth("match_date"))
match_df = match_df.withColumn("month",month("match_date"))
match_df = match_df.withColumn("year",year("match_date"))

# High margin win: categorizing win margins into 'high', 'medium', and 'low'
match_df = match_df.withColumn(
    "high_margin_win",
     when(col("win_margin")>100,"high")
    .when( (col("win_margin")>50) & (col("win_margin")<100),"medium")
    .otherwise("low")
)


# Analyze the impact of the toss: who wins the toss and the match
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner")==col("match_winner"),"Yes")
    .otherwise("No")
    )

# Show the enhanced match DataFrame
match_df.show(2)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+---------------+-----------------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id| day|month|year|high_margin_win|toss_match_winner|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+---------------+-----------------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      null|       2008|M Chinnaswamy Sta...| Bangalore|       India|Royal C

In [0]:
from pyspark.sql.functions import lower, regexp_replace

# Normalize and clean player names
player_df=player_df.withColumn(
    "player_name",
    regexp_replace("player_name","[^a-zA-Z0-9]","")
)

# Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'
player_df = player_df.na.fill({"batting_hand":"unknown","bowling_skill":"unknown"})
#or
player_df=player_df.na.fill("unknown",["batting_hand"]).fillna("unknonwn",["bowling_skill"])
#.fillna("unknonwn",col("bowling_skill")) => PySparkTypeError: [NOT_LIST_OR_TUPLE] Argument `subset` should be a list or tuple, got Column.

# Categorizing players based on batting hand
player_df=player_df.withColumn(
    "batting_hand",
     when(col("batting_hand")=="right","right_handed")
     .when(col("batting_hand").contains("left"),"left_handed")
     #.when(["batting_hand"]=="left","left_handed") => TypeError: condition should be a Column
)

# Show the modified player DataFrame
player_df.show(2)

+---------+---------+-----------+----+------------+----------------+------------+
|player_sk|player_id|player_name| dob|batting_hand|   bowling_skill|country_name|
+---------+---------+-----------+----+------------+----------------+------------+
|        0|        1|  SCGanguly|null|        null|Right-arm medium|       India|
|        1|        2| BBMcCullum|null|        null|Right-arm medium| New Zealand|
+---------+---------+-----------+----+------------+----------------+------------+
only showing top 2 rows



In [0]:
from pyspark.sql.functions import col, when, current_date, expr

#Add a 'veteran_status' column based on player age
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Veteran").otherwise("Non-Veteran")
)

#filter to include only players who played the match (excluding bench players)
#player_match_df = player_match_df.filter(col("batting_status")!="Did Not Bat")

#Dynamic column to calculate years since debut
player_match_df = player_match_df.withColumn(
    "years_since_debut",
    (year(current_date()) - col("season_year"))
)

# Show the enriched DataFrame
player_match_df.show(2)



+---------------+---------------+--------+---------+-----------+----+--------------+------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+--------------+-----------------+
|player_match_sk|playermatch_key|match_id|player_id|player_name| dob|  batting_hand|     bowling_skill|country_name|role_desc|         player_team|        opposit_team|season_year|is_manofthematch|age_as_on_match|isplayers_team_won|batting_status|bowling_status|player_captain|opposit_captain|player_keeper|opposit_keeper|veteran_status|years_since_debut|
+---------------+---------------+--------+---------+-----------+----+--------------+------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+-----------

In [0]:
#SQL Transformations:

ball_by_ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("match")
player_df.createOrReplaceTempView("player")
player_match_df.createOrReplaceTempView("player_match")
team_df.createOrReplaceTempView("team")

In [0]:
ball_by_ball_df.columns

Out[111]: ['match_id',
 'over_id',
 'ball_id',
 'innings_no',
 'team_batting',
 'team_bowling',
 'striker_batting_position',
 'extra_type',
 'runs_scored',
 'extra_runs',
 'wides',
 'legbyes',
 'byes',
 'noballs',
 'penalty',
 'bowler_extras',
 'out_type',
 'caught',
 'bowled',
 'run_out',
 'lbw',
 'retired_hurt',
 'stumped',
 'caught_and_bowled',
 'hit_wicket',
 'obstructingfeild',
 'bowler_wicket',
 'match_date',
 'season',
 'striker',
 'non_striker',
 'bowler',
 'player_out',
 'fielders',
 'striker_match_sk',
 'strikersk',
 'nonstriker_match_sk',
 'nonstriker_sk',
 'fielder_match_sk',
 'fielder_sk',
 'bowler_match_sk',
 'bowler_sk',
 'playerout_match_sk',
 'battingteam_sk',
 'bowlingteam_sk',
 'keeper_catch',
 'player_out_sk',
 'matchdatesk',
 'running_total_runs',
 'high_impact_balls']

In [0]:
#top_scoring_batsmen_per_season
top_scoring_batsmen_per_season = spark.sql(
    """
    select p.player_name,m.season_year,sum(b.runs_scored) as total_runs
    from ball_by_ball b
    join match m on m.match_id=b.match_id
    join player_match pm on pm.match_id=m.match_id 
    and b.striker=pm.player_id 
    --this player was facing ball that particular time
    join player p on p.player_id=pm.player_id
    group by p.player_name, m.season_year
    order by m.season_year, total_runs desc
    """
)

In [0]:
top_scoring_batsmen_per_season.show(5)

+------------+-----------+----------+
| player_name|season_year|total_runs|
+------------+-----------+----------+
|     SEMarsh|       2008|       614|
|    GGambhir|       2008|       532|
|STJayasuriya|       2008|       508|
|    SRWatson|       2008|       463|
|     GCSmith|       2008|       437|
+------------+-----------+----------+
only showing top 5 rows



In [0]:
economical_bowlers_powerplay = spark.sql("""
SELECT 
p.player_name, 
AVG(b.runs_scored) AS avg_runs_per_ball, 
COUNT(b.bowler_wicket) AS total_wickets
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.bowler = pm.player_id
JOIN player p ON pm.player_id = p.player_id
WHERE b.over_id <= 6
GROUP BY p.player_name
HAVING COUNT(*) >= 1  --i.e. the no. of b.bowler_wicket >=1
ORDER BY avg_runs_per_ball, total_wickets DESC
""")
economical_bowlers_powerplay.show()

+-------------+------------------+-------------+
|  player_name| avg_runs_per_ball|total_wickets|
+-------------+------------------+-------------+
|    SMHarwood|0.3333333333333333|            0|
|    AnkitSoni|               0.5|            0|
|     GRNapier|               0.5|            0|
|      AJFinch|               0.5|            0|
|       AZampa|               0.5|            0|
|    AveshKhan|               0.5|            0|
|      NBSingh|0.5833333333333334|            0|
|    AGMurtaza|0.6538461538461539|            0|
|     SBBangar|0.6666666666666666|            0|
|     DduPreez|0.6666666666666666|            0|
|       SGopal|0.6666666666666666|            0|
|    FHEdwards|0.6923076923076923|            0|
|      AKumble|0.7685185185185185|            0|
|JSyedMohammad|0.7777777777777778|            0|
|      UmarGul|0.7777777777777778|            0|
|  KPPietersen|0.7777777777777778|            0|
|     SSMundhe|0.8333333333333334|            0|
| LACarseldine|0.833

In [0]:
toss_impact_individual_matches = spark.sql("""
SELECT m.match_id, m.toss_winner, m.toss_name, m.match_winner,
       CASE WHEN m.toss_winner = m.match_winner THEN 'Won' ELSE 'Lost' END AS match_outcome
FROM match m
WHERE m.toss_name IS NOT NULL
ORDER BY m.match_id
""")
toss_impact_individual_matches.show()


+--------+--------------------+---------+--------------------+-------------+
|match_id|         toss_winner|toss_name|        match_winner|match_outcome|
+--------+--------------------+---------+--------------------+-------------+
|  335987|Royal Challengers...|    field|Kolkata Knight Ri...|         Lost|
|  335988| Chennai Super Kings|      bat| Chennai Super Kings|          Won|
|  335989|    Rajasthan Royals|      bat|    Delhi Daredevils|         Lost|
|  335990|      Mumbai Indians|      bat|Royal Challengers...|         Lost|
|  335991|     Deccan Chargers|      bat|Kolkata Knight Ri...|         Lost|
|  335992|     Kings XI Punjab|      bat|    Rajasthan Royals|         Lost|
|  335993|     Deccan Chargers|      bat|    Delhi Daredevils|         Lost|
|  335994|      Mumbai Indians|    field| Chennai Super Kings|         Lost|
|  335995|    Rajasthan Royals|    field|    Rajasthan Royals|          Won|
|  335996|      Mumbai Indians|    field|     Kings XI Punjab|         Lost|

In [0]:
average_runs_in_wins = spark.sql("""
SELECT p.player_name, AVG(b.runs_scored) AS avg_runs_in_wins, COUNT(*) AS innings_played
FROM ball_by_ball b
JOIN player_match pm ON b.match_id = pm.match_id AND b.striker = pm.player_id
JOIN player p ON pm.player_id = p.player_id
JOIN match m ON pm.match_id = m.match_id
WHERE m.match_winner = pm.player_team
GROUP BY p.player_name
ORDER BY avg_runs_in_wins ASC
""")
average_runs_in_wins.show()

+-------------+-------------------+--------------+
|  player_name|   avg_runs_in_wins|innings_played|
+-------------+-------------------+--------------+
|    KPAppanna|                0.0|             1|
|     JJBumrah|                0.0|             2|
|      ISharma|                0.0|             1|
|       ANehra|                0.0|             2|
|      JTheron|                0.0|             1|
|      TSMills|                0.0|             3|
|     SNThakur|                0.0|             2|
|      VRAaron|                0.0|             5|
| AnirudhSingh|                0.0|             1|
|    TThushara|                0.2|             5|
|     SAAbbott|               0.25|             4|
| YashpalSingh| 0.3157894736842105|            19|
|   KCCariappa| 0.3333333333333333|             3|
|   SSreesanth| 0.3333333333333333|             3|
|    JDUnadkat|                0.4|             5|
|    SMHarwood|0.42857142857142855|             7|
|      RShukla|                

In [0]:

scores_by_venue = spark.sql("""
SELECT venue_name, AVG(total_runs) AS average_score, MAX(total_runs) AS highest_score
FROM (
    SELECT ball_by_ball.match_id, match.venue_name, SUM(runs_scored) AS total_runs
    FROM ball_by_ball
    JOIN match ON ball_by_ball.match_id = match.match_id
    GROUP BY ball_by_ball.match_id, match.venue_name
)
GROUP BY venue_name
ORDER BY average_score DESC
""")

scores_by_venue.show()

+--------------------+------------------+-------------+
|          venue_name|     average_score|highest_score|
+--------------------+------------------+-------------+
|   Brabourne Stadium| 325.8181818181818|          399|
|Saurashtra Cricke...|             319.1|          393|
|Punjab Cricket As...|317.42857142857144|          341|
|    Barabati Stadium|311.57142857142856|          404|
|          Green Park|             308.0|          378|
|Maharashtra Crick...|302.06666666666666|          377|
|Rajiv Gandhi Inte...|             302.0|          365|
|Himachal Pradesh ...| 301.1111111111111|          371|
|MA Chidambaram St...| 299.7291666666667|          447|
|Sardar Patel Stad...|             298.0|          377|
|Punjab Cricket As...|296.77142857142854|          426|
|    Wankhede Stadium| 296.4736842105263|          419|
|Holkar Cricket St...|             296.2|          371|
|Punjab Cricket As...|             296.0|          380|
|M Chinnaswamy Sta...| 292.7878787878788|       

In [0]:
dismissal_types = spark.sql("""
SELECT out_type, COUNT(*) AS frequency
FROM ball_by_ball
WHERE out_type IS NOT NULL
GROUP BY out_type
ORDER BY frequency DESC
""")

dismissal_types.show()

+--------------------+---------+
|            out_type|frequency|
+--------------------+---------+
|      Not Applicable|   137852|
|              caught|     3678|
|              bowled|     1382|
|             run out|      744|
|        Keeper Catch|      695|
|                 lbw|      455|
|             stumped|      222|
|   caught and bowled|      211|
|          hit wicket|        9|
|        retired hurt|        9|
|obstructing the f...|        1|
+--------------------+---------+



In [0]:
team_toss_win_performance = spark.sql("""
SELECT team1, COUNT(*) AS matches_played, SUM(CASE WHEN toss_winner = match_winner THEN 1 ELSE 0 END) AS wins_after_toss
FROM match
WHERE toss_winner = team1
GROUP BY team1
ORDER BY wins_after_toss DESC
""")

team_toss_win_performance.show()

+--------------------+--------------+---------------+
|               team1|matches_played|wins_after_toss|
+--------------------+--------------+---------------+
| Chennai Super Kings|            37|             27|
|      Mumbai Indians|            42|             26|
|Kolkata Knight Ri...|            36|             24|
|Royal Challengers...|            34|             19|
|    Delhi Daredevils|            36|             17|
|    Rajasthan Royals|            25|             16|
|     Kings XI Punjab|            32|             14|
| Sunrisers Hyderabad|            16|              8|
|     Deccan Chargers|            22|              6|
|Rising Pune Super...|             6|              4|
|       Gujarat Lions|             7|              3|
|Kochi Tuskers Kerala|             4|              2|
|       Pune Warriors|            10|              2|
+--------------------+--------------+---------------+

