Databricks notebook source

In [None]:
spark

COMMAND ----------

In [None]:
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType
from pyspark.sql.functions import col, when, sum, avg, row_number 
from pyspark.sql.window import Window
     

COMMAND ----------

In [None]:
from pyspark.sql import SparkSession

COMMAND ----------

In [None]:
spark = SparkSession.builder.appName("IPL_DE").getOrCreate()

COMMAND ----------

In [None]:
spark

COMMAND ----------

In [None]:
df=spark.read.csv("s3://ipl-data-analysis-project/Ball_By_Ball.csv",header=True,inferSchema=True)

COMMAND ----------

In [None]:
df.show()

COMMAND ----------

In [None]:
df.printSchema()

COMMAND ----------

In [None]:
df.display()

COMMAND ----------

In [None]:
ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True)
])
     

In [None]:
ball_by_ball_df = spark.read.schema(ball_by_ball_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Ball_By_Ball.csv")
     

In [None]:
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])
match_df = spark.read.schema(match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Match.csv")
     

In [None]:
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

In [None]:
player_df = spark.read.schema(player_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Player.csv")
     

In [None]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

In [None]:
player_match_df = spark.read.schema(player_match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Player_match.csv")
     

In [None]:
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])

In [None]:
team_df = spark.read.schema(team_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Team.csv")
     

COMMAND ----------

Without Extra

In [None]:
df_new = ball_by_ball_df.filter((col("wides")==0) & (col("noballs")==0))

COMMAND ----------

Total Runs & Average Runs Scored by each team in each innings

In [None]:
df_new.groupBy("match_id","innings_no").agg(sum("runs_scored").alias("Total runs"),avg("runs_scored").alias("Avg Runs")).orderBy("match_id").show()

COMMAND ----------

In [None]:
df_new.display(5)

COMMAND ----------

Window Function: Calculate running total of runs in each match for each over

In [None]:
windowSpec = Window.partitionBy("match_id","innings_no").orderBy("over_id")

In [None]:
ball_by_ball_df = ball_by_ball_df.withColumn(
    "running_total_runs",
    sum("runs_scored").over(windowSpec)
)

In [None]:
ball_by_ball_df.display()

COMMAND ----------

Impact Balls

In [None]:
ball_by_ball_df = ball_by_ball_df.withColumn(
    "Impactful Over",when((col("runs_scored")+col("extra_runs")>=6) | (col("bowler_wicket")==True),True).otherwise(False)
)
ball_by_ball_df.display()

COMMAND ----------

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, when

Extracting year, month, and day from the match date for more detailed time-based analysis

In [None]:
match_df = match_df.withColumn("year", year("match_date"))
match_df = match_df.withColumn("month", month("match_date"))
match_df = match_df.withColumn("day", dayofmonth("match_date"))

High margin win: categorizing win margins into 'high', 'medium', and 'low'

In [None]:
match_df = match_df.withColumn(
    "win_margin_category",
    when(col("win_margin") >= 100, "High")
    .when((col("win_margin") >= 50) & (col("win_margin") < 100), "Medium")
    .otherwise("Low")
)

Analyze the impact of the toss: who wins the toss and the match

In [None]:
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"), "Yes").otherwise("No")
)

COMMAND ----------

In [None]:
from pyspark.sql.functions import lower, regexp_replace

Normalize and clean player names

In [None]:
player_df = player_df.withColumn("player_name", lower(regexp_replace("player_name", "[^a-zA-Z0-9 ]", "")))

Handle missing values in 'batting_hand' and 'bowling_skill' with a default 'unknown'

In [None]:
player_df = player_df.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

Categorizing players based on batting hand

In [None]:
player_df = player_df.withColumn(
    "batting_style",
    when(col("batting_hand").contains("left"), "Left-Handed").otherwise("Right-Handed")
)

Show the modified player DataFrame

In [None]:
player_df.show(2)

COMMAND ----------

In [None]:
from pyspark.sql.functions import col, when, current_date, expr

Add a 'veteran_status' column based on player age

In [None]:
player_match_df = player_match_df.withColumn(
    "veteran_status",
    when(col("age_as_on_match") >= 35, "Veteran").otherwise("Non-Veteran")
)

Dynamic column to calculate years since debut

In [None]:
player_match_df = player_match_df.withColumn(
    "years_since_debut",
    (year(current_date()) - col("season_year"))
)

Show the enriched DataFrame

In [None]:
player_match_df.show()

COMMAND ----------

In [None]:
ball_by_ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("match")
player_df.createOrReplaceTempView("player")
player_match_df.createOrReplaceTempView("player_match")
team_df.createOrReplaceTempView("team")

COMMAND ----------

In [None]:
player_match_df.display(5)
match_df.display(5)
ball_by_ball_df.display(5)
player_df.display(5)

COMMAND ----------

In [None]:
match_df.display(5)

COMMAND ----------

In [None]:
player_match_df.display(5)

COMMAND ----------

Top scorers of seasons

top_scorer = spark.sql(
<br>
select pm.player_name,m.season_year,sum(runs_scored) as runs from ball_by_ball b join match m on b.match_id=m.match_id<br>
                       join player_match pm on b.striker = pm.player_id and b.match_id=pm.match_id<br>
                       group by pm.player_name , m.season_year order by runs desc<br>
                     
)

COMMAND ----------

In [None]:
top_scorer.display(5)

COMMAND ----------

economical_bowlers_powerplay = spark.sql(
<br>
SELECT <br>
    p.player_name, <br>
    AVG(b.runs_scored) AS avg_runs_per_ball, <br>
    COUNT(b.bowler_wicket) AS total_wickets<br>
FROM ball_by_ball b<br>
JOIN player_match pm ON b.match_id = pm.match_id AND b.bowler = pm.player_id<br>
JOIN player p ON pm.player_id = p.player_id<br>
WHERE b.over_id <= 6<br>
GROUP BY p.player_name<br>
HAVING COUNT(*) >= 1<br>
ORDER BY avg_runs_per_ball ASC, total_wickets DESC<br>


In [None]:
economical_bowlers_powerplay.display()

COMMAND ----------

toss_impact_individual_matches = spark.sql(
<br>
SELECT m.match_id, m.toss_winner, m.toss_name, m.match_winner,<br>
       CASE WHEN m.toss_winner = m.match_winner THEN 'Won' ELSE 'Lost' END AS match_outcome<br>
FROM match m<br>
WHERE m.toss_name IS NOT NULL<br>
ORDER BY m.match_id<br>


In [None]:
toss_impact_individual_matches.show()

COMMAND ----------

In [None]:
import matplotlib.pyplot as plt

COMMAND ----------

Assuming 'economical_bowlers_powerplay' is already executed and available as a Spark DataFrame

In [None]:
economical_bowlers_pd = economical_bowlers_powerplay.toPandas()

Visualizing using Matplotlib

In [None]:
plt.figure(figsize=(12, 8))
# Limiting to top 10 for clarity in the plot
top_economical_bowlers = economical_bowlers_pd.nsmallest(10, 'avg_runs_per_ball')
plt.bar(top_economical_bowlers['player_name'], top_economical_bowlers['avg_runs_per_ball'], color='skyblue')
plt.xlabel('Bowler Name')
plt.ylabel('Average Runs per Ball')
plt.title('Most Economical Bowlers in Powerplay Overs (Top 10)')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

COMMAND ----------

In [None]:
import seaborn as sns
     

In [None]:
toss_impact_pd = toss_impact_individual_matches.toPandas()

Creating a countplot to show win/loss after winning toss

In [None]:
plt.figure(figsize=(10, 6))
sns.countplot(x='toss_winner', hue='match_outcome', data=toss_impact_pd)
plt.title('Impact of Winning Toss on Match Outcomes')
plt.xlabel('Toss Winner')
plt.ylabel('Number of Matches')
plt.legend(title='Match Outcome')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
     

COMMAND ----------

Execute SQL Query

scores_by_venue = spark.sql(
<br>
SELECT venue_name, AVG(total_runs) AS average_score, MAX(total_runs) AS highest_score<br>
FROM (<br>
    SELECT ball_by_ball.match_id, match.venue_name, SUM(runs_scored) AS total_runs<br>
    FROM ball_by_ball<br>
    JOIN match ON ball_by_ball.match_id = match.match_id<br>
    GROUP BY ball_by_ball.match_id, match.venue_name<br>
)<br>
GROUP BY venue_name<br>
ORDER BY average_score DESC<br>


Convert to Pandas DataFrame

In [None]:
scores_by_venue_pd = scores_by_venue.toPandas()

Plot

In [None]:
plt.figure(figsize=(14, 8))
sns.barplot(x='average_score', y='venue_name', data=scores_by_venue_pd)
plt.title('Distribution of Scores by Venue')
plt.xlabel('Average Score')
plt.ylabel('Venue')
plt.show()

COMMAND ----------

Execute SQL Query

dismissal_types = spark.sql(
<br>
SELECT out_type, COUNT(*) AS frequency<br>
FROM ball_by_ball<br>
WHERE out_type IS NOT NULL AND out_type != 'Not Applicable'<br>
GROUP BY out_type<br>
ORDER BY frequency DESC<br>


Convert to Pandas DataFrame

In [None]:
dismissal_types_pd = dismissal_types.toPandas()

Plot

In [None]:
plt.figure(figsize=(12, 6))
sns.barplot(x='frequency', y='out_type', data=dismissal_types_pd, palette='pastel')
plt.title('Most Frequent Dismissal Types')
plt.xlabel('Frequency')
plt.ylabel('Dismissal Type')
plt.show()

COMMAND ----------

team_toss_win_performance = spark.sql(
<br>
SELECT team1, COUNT(*) AS matches_played, SUM(CASE WHEN toss_winner = match_winner THEN 1 ELSE 0 END) AS wins_after_toss<br>
FROM match<br>
WHERE toss_winner = team1<br>
GROUP BY team1<br>
ORDER BY wins_after_toss DESC<br>


Convert to Pandas DataFrame

In [None]:
team_toss_win_pd = team_toss_win_performance.toPandas()

Plot