In [None]:
# Echo the Spark session handle as the cell output.
# NOTE(review): `spark` is only assigned in the next cell, so this cell assumes the
# kernel pre-defines a session under that name — confirm, or run it after the setup cell.
spark

In [None]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DecimalType, DateType
from pyspark.sql.functions import col, when, sum, avg, row_number, year, month, dayofmonth, dayofweek, lower, regexp_replace, lit, current_date, expr
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

# Create the Spark session for this analysis, or reuse the one that already exists.
spark = (
    SparkSession.builder
    .appName("IPL Data Analysis")
    .getOrCreate()
)

In [None]:
# Explicit schema for Ball_By_Ball.csv: one row per delivery, every column nullable.
# Fields are listed as (column name, Spark type) pairs in file order.
# NOTE(review): the two DateType columns are parsed with Spark's default date pattern
# because the reader sets no "dateFormat" option — confirm the CSV matches it.
ballSchema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        # Delivery identification
        ("match_id", IntegerType()),
        ("over_id", IntegerType()),
        ("ball_id", IntegerType()),
        ("innings_no", IntegerType()),
        ("team_batting", StringType()),
        ("team_bowling", StringType()),
        ("striker_batting_position", IntegerType()),
        # Runs and extras
        ("extra_type", StringType()),
        ("runs_scored", IntegerType()),
        ("extra_runs", IntegerType()),
        ("wides", IntegerType()),
        ("legbyes", IntegerType()),
        ("byes", IntegerType()),
        ("noballs", IntegerType()),
        ("penalty", IntegerType()),
        ("bowler_extras", IntegerType()),
        # Dismissal flags
        ("out_type", StringType()),
        ("caught", BooleanType()),
        ("bowled", BooleanType()),
        ("run_out", BooleanType()),
        ("lbw", BooleanType()),
        ("retired_hurt", BooleanType()),
        ("stumped", BooleanType()),
        ("caught_and_bowled", BooleanType()),
        ("hit_wicket", BooleanType()),
        ("obstructingfeild", BooleanType()),
        ("bowler_wicket", BooleanType()),
        # Match context and participants
        ("match_date", DateType()),
        ("season", IntegerType()),
        ("striker", IntegerType()),
        ("non_striker", IntegerType()),
        ("bowler", IntegerType()),
        ("player_out", IntegerType()),
        ("fielders", IntegerType()),
        # Surrogate keys
        ("striker_match_sk", IntegerType()),
        ("strikersk", IntegerType()),
        ("nonstriker_match_sk", IntegerType()),
        ("nonstriker_sk", IntegerType()),
        ("fielder_match_sk", IntegerType()),
        ("fielder_sk", IntegerType()),
        ("bowler_match_sk", IntegerType()),
        ("bowler_sk", IntegerType()),
        ("playerout_match_sk", IntegerType()),
        ("battingteam_sk", IntegerType()),
        ("bowlingteam_sk", IntegerType()),
        ("keeper_catch", BooleanType()),
        ("player_out_sk", IntegerType()),
        ("matchdatesk", DateType()),
    ]
])

In [None]:
# Load the ball-by-ball CSV from S3 with the explicit schema; the first line is a header row.
ballDF = (
    spark.read
    .format("csv")
    .schema(ballSchema)
    .option("header", "true")
    .load("s3://ipl-data-analysis-project/Ball_By_Ball.csv")
)

In [None]:
# Explicit schema for Player.csv; every column is nullable.
playerSchema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

# "dob" is declared as DateType. Without an explicit pattern the CSV reader uses its
# default date format, and this notebook's own output shows dob as null on every
# displayed row, i.e. the values were not parsed. Supplying the pattern lets the
# column be populated.
# NOTE(review): assumes dates are stored as month/day/year (e.g. 7/8/1972) —
# confirm against Player.csv and adjust the pattern if the file differs.
playerDF = (
    spark.read
    .schema(playerSchema)
    .format("csv")
    .option("header", "true")
    .option("dateFormat", "M/d/yyyy")
    .load("s3://ipl-data-analysis-project/Player.csv")
)

In [None]:
# Explicit schema for Match.csv; every column is nullable.
matchSchema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])

# "match_date" is declared as DateType. Without an explicit pattern the CSV reader
# uses its default date format, and this notebook's own output shows match_date as
# null, which also leaves any year/month/day derived from it null. Supplying the
# pattern lets the column be populated.
# NOTE(review): assumes dates are stored as month/day/year (e.g. 4/18/2008) —
# confirm against Match.csv and adjust the pattern if the file differs.
matchDF = (
    spark.read
    .schema(matchSchema)
    .format("csv")
    .option("header", "true")
    .option("dateFormat", "M/d/yyyy")
    .load("s3://ipl-data-analysis-project/Match.csv")
)

In [None]:
# Explicit schema for Player_match.csv (one row per player per match); every column is nullable.
# NOTE(review): DecimalType() is used with its default precision/scale — confirm that
# "playermatch_key" values fit. "dob" is parsed with Spark's default date pattern because
# the reader sets no "dateFormat" option — confirm the CSV matches it.
playerMatchSchema = (
    StructType()
    .add("player_match_sk", IntegerType(), True)
    .add("playermatch_key", DecimalType(), True)
    .add("match_id", IntegerType(), True)
    .add("player_id", IntegerType(), True)
    .add("player_name", StringType(), True)
    .add("dob", DateType(), True)
    .add("batting_hand", StringType(), True)
    .add("bowling_skill", StringType(), True)
    .add("country_name", StringType(), True)
    .add("role_desc", StringType(), True)
    .add("player_team", StringType(), True)
    .add("opposit_team", StringType(), True)
    .add("season_year", IntegerType(), True)
    .add("is_manofthematch", BooleanType(), True)
    .add("age_as_on_match", IntegerType(), True)
    .add("isplayers_team_won", BooleanType(), True)
    .add("batting_status", StringType(), True)
    .add("bowling_status", StringType(), True)
    .add("player_captain", StringType(), True)
    .add("opposit_captain", StringType(), True)
    .add("player_keeper", StringType(), True)
    .add("opposit_keeper", StringType(), True)
)

# Load the player-per-match CSV from S3; the first line is a header row.
playerMatchDF = (
    spark.read
    .format("csv")
    .schema(playerMatchSchema)
    .option("header", "true")
    .load("s3://ipl-data-analysis-project/Player_match.csv")
)

In [None]:
# Explicit schema for Team.csv; every column is nullable.
teamSchema = (
    StructType()
    .add("team_sk", IntegerType(), True)
    .add("team_id", IntegerType(), True)
    .add("team_name", StringType(), True)
)

# Load the team lookup CSV from S3; the first line is a header row.
teamDF = (
    spark.read
    .format("csv")
    .schema(teamSchema)
    .option("header", "true")
    .load("s3://ipl-data-analysis-project/Team.csv")
)
     

In [None]:
# Keep only legal deliveries: rows where neither a wide nor a no-ball was recorded.
# Rows with a null in either column are dropped as well, because the comparison is not true.
ballDF = ballDF.where((col("wides") == 0) & (col("noballs") == 0))

# Per match and innings: total runs scored and average runs per remaining delivery.
# `sum` / `avg` here are the pyspark.sql.functions aggregates imported at the top.
total_average_runs = (
    ballDF
    .groupBy("match_id", "innings_no")
    .agg(
        sum(col("runs_scored")).alias("total_runs"),
        avg(col("runs_scored")).alias("average_runs"),
    )
)

In [None]:
# Running total of runs within each (match, innings), accumulated in over order.
# The window is ordered by over_id only and sets no explicit frame, so Spark's default
# frame applies: every delivery in the same over receives the cumulative total up to
# and including that whole over.
windowFunc = (
    Window
    .partitionBy(col("match_id"), col("innings_no"))
    .orderBy(col("over_id"))
)

ballDF = ballDF.withColumn("running_total_runs", sum(col("runs_scored")).over(windowFunc))

In [None]:
# Boolean flag "high_impact": True when the delivery produced a bowler wicket or more
# than 6 runs in total (batsman runs plus extras); False otherwise, including when the
# condition cannot be evaluated because of nulls.
ballDF = ballDF.withColumn(
    "high_impact",
    when(
        (col("bowler_wicket") == True) | ((col("runs_scored") + col("extra_runs")) > 6),
        lit(True),
    ).otherwise(lit(False)),
)

In [None]:
# Preview the first 5 rows of the filtered and enriched ball-by-ball frame.
ballDF.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+------------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|

In [None]:
# Enrich the match frame in a single chain; the new columns are appended in this order:
#   year / month / day      - calendar parts of match_date for time-based analysis
#   win_margin_category     - "High" (>= 100), "Medium" (50-99), otherwise "Low"
#                             (a null win_margin also falls through to "Low")
#   toss_and_match_winner   - "Yes" when the toss winner also won the match, else "No"
matchDF = (
    matchDF
    .withColumn("year", year(col("match_date")))
    .withColumn("month", month(col("match_date")))
    .withColumn("day", dayofmonth(col("match_date")))
    .withColumn(
        "win_margin_category",
        # Branches are evaluated in order, so the second one only sees margins below 100.
        when(col("win_margin") >= 100, "High")
        .when(col("win_margin") >= 50, "Medium")
        .otherwise("Low"),
    )
    .withColumn(
        "toss_and_match_winner",
        when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No"),
    )
)

matchDF.show(5)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-------------------+---------------------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id|year|month| day|win_margin_category|toss_and_match_winner|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-------------------+---------------------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      null|       2008|M Chinnaswamy Sta...| Bangal

In [None]:
# Preview the first 5 rows of the raw player frame before any cleaning.
playerDF.show(5)

+---------+---------+---------------+----+--------------+------------------+------------+
|player_sk|player_id|    player_name| dob|  batting_hand|     bowling_skill|country_name|
+---------+---------+---------------+----+--------------+------------------+------------+
|        0|        1|     SC Ganguly|null| Left-hand bat|  Right-arm medium|       India|
|        1|        2|    BB McCullum|null|Right-hand bat|  Right-arm medium| New Zealand|
|        2|        3|     RT Ponting|null|Right-hand bat|  Right-arm medium|   Australia|
|        3|        4|      DJ Hussey|null|Right-hand bat|Right-arm offbreak|   Australia|
|        4|        5|Mohammad Hafeez|null|Right-hand bat|Right-arm offbreak|    Pakistan|
+---------+---------+---------------+----+--------------+------------------+------------+
only showing top 5 rows



In [None]:
# Normalize player names: remove every character that is not an ASCII letter or digit,
# then lowercase the result.
# NOTE(review): the pattern also strips spaces, so "SC Ganguly" becomes "scganguly";
# add a space to the character class if word boundaries should be preserved.
playerDF = playerDF.withColumn(
    "player_name",
    lower(regexp_replace("player_name", "[^a-zA-Z0-9]", "")),
)

# Replace missing "batting_hand" / "bowling_skill" values with the default "unknown".
# (Fixes the previous misspelled fill value "unkown" for bowling_skill.)
playerDF = playerDF.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

# Clean-up from earlier experimentation: remove a stray "dateOfBirth" column
# (the preview above shows no such column, so this is normally a no-op), then
# show "unknown" wherever dob is null. Mixing the string default with the date
# column means "dob" holds text after this step.
playerDF = playerDF.drop(col("dateOfBirth"))
playerDF = playerDF.withColumn(
    "dob",
    when(col("dob").isNull(), lit("unknown")).otherwise(col("dob"))
)

# Categorize players by batting hand. Values mentioning neither "Left" nor "Right"
# (including the "unknown" default filled in above) are labelled "Unknown" instead
# of being silently counted as right-handed.
playerDF = playerDF.withColumn(
    "batting_style",
    when(col("batting_hand").contains("Left"), "Lefty")
    .when(col("batting_hand").contains("Right"), "Righty")
    .otherwise("Unknown"),
)
playerDF.show(5)

+---------+---------+--------------+-------+--------------+------------------+------------+-------------+
|player_sk|player_id|   player_name|    dob|  batting_hand|     bowling_skill|country_name|batting_style|
+---------+---------+--------------+-------+--------------+------------------+------------+-------------+
|        0|        1|     scganguly|unknown| Left-hand bat|  Right-arm medium|       India|        Lefty|
|        1|        2|    bbmccullum|unknown|Right-hand bat|  Right-arm medium| New Zealand|       Righty|
|        2|        3|     rtponting|unknown|Right-hand bat|  Right-arm medium|   Australia|       Righty|
|        3|        4|      djhussey|unknown|Right-hand bat|Right-arm offbreak|   Australia|       Righty|
|        4|        5|mohammadhafeez|unknown|Right-hand bat|Right-arm offbreak|    Pakistan|       Righty|
+---------+---------+--------------+-------+--------------+------------------+------------+-------------+
only showing top 5 rows

