In [0]:
spark


In [0]:
from pyspark.sql import SparkSession

# Create an custmom Spark session
spark = SparkSession.builder.appName('IPL_Data_Analysis').getOrCreate()

In [0]:
spark

In [0]:
## Reading data from AWS S3 Bucket
ball_by_ball_df = spark.read.format('csv').option("header","true").load("s3://ipl-data-analysis-project/Ball_By_Ball.csv")

In [0]:
ball_by_ball_df.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+---+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|MatcH_id|Over_id|Ball_id|Innings_No|Team_Batting|Team_Bowling|Striker_Batting_Position|Extra_Type|Runs_Scored|Extra_runs|Wides|Legbyes|Byes|Noballs|Penalty|Bowler_Extras|      Out_type|Caught|Bowled|Run_out|LBW|Retired_hurt|Stumped|caught_and_bowled|hit_wicket|ObstructingFeild|Bowler_Wicket|Match_Date|Season|Striker|Non_Striker|Bowler|Player_Out|Fielders|Striker_match_SK|StrikerSK|NonStriker_match_SK|NONStriker_SK|Fielder_match_SK|Fi

In [0]:
## Doing above reads all the columns in string format
## Defining own schema during loading of data into spark_dataframe
from pyspark.sql.types import StructField, StructType, IntegerType, StringType, BooleanType, DateType, DecimalType

ball_by_ball_schema = StructType([
    StructField("match_id", IntegerType(), True),
    StructField("over_id", IntegerType(), True),
    StructField("ball_id", IntegerType(), True),
    StructField("innings_no", IntegerType(), True),
    StructField("team_batting", StringType(), True),
    StructField("team_bowling", StringType(), True),
    StructField("striker_batting_position", IntegerType(), True),
    StructField("extra_type", StringType(), True),
    StructField("runs_scored", IntegerType(), True),
    StructField("extra_runs", IntegerType(), True),
    StructField("wides", IntegerType(), True),
    StructField("legbyes", IntegerType(), True),
    StructField("byes", IntegerType(), True),
    StructField("noballs", IntegerType(), True),
    StructField("penalty", IntegerType(), True),
    StructField("bowler_extras", IntegerType(), True),
    StructField("out_type", StringType(), True),
    StructField("caught", BooleanType(), True),
    StructField("bowled", BooleanType(), True),
    StructField("run_out", BooleanType(), True),
    StructField("lbw", BooleanType(), True),
    StructField("retired_hurt", BooleanType(), True),
    StructField("stumped", BooleanType(), True),
    StructField("caught_and_bowled", BooleanType(), True),
    StructField("hit_wicket", BooleanType(), True),
    StructField("obstructingfeild", BooleanType(), True),
    StructField("bowler_wicket", BooleanType(), True),
    StructField("match_date", DateType(), True),
    StructField("season", IntegerType(), True),
    StructField("striker", IntegerType(), True),
    StructField("non_striker", IntegerType(), True),
    StructField("bowler", IntegerType(), True),
    StructField("player_out", IntegerType(), True),
    StructField("fielders", IntegerType(), True),
    StructField("striker_match_sk", IntegerType(), True),
    StructField("strikersk", IntegerType(), True),
    StructField("nonstriker_match_sk", IntegerType(), True),
    StructField("nonstriker_sk", IntegerType(), True),
    StructField("fielder_match_sk", IntegerType(), True),
    StructField("fielder_sk", IntegerType(), True),
    StructField("bowler_match_sk", IntegerType(), True),
    StructField("bowler_sk", IntegerType(), True),
    StructField("playerout_match_sk", IntegerType(), True),
    StructField("battingteam_sk", IntegerType(), True),
    StructField("bowlingteam_sk", IntegerType(), True),
    StructField("keeper_catch", BooleanType(), True),
    StructField("player_out_sk", IntegerType(), True),
    StructField("matchdatesk", DateType(), True)
])

# Using own schema to define the dataframe
ball_by_ball_df = spark.read.schema(ball_by_ball_schema).format('csv').option('header','true').load('s3://ipl-data-analysis-project/Ball_By_Ball.csv')

In [0]:
ball_by_ball_df.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|nonstriker_sk|fielder_match_sk|

In [0]:
match_schema = StructType([
    StructField("match_sk", IntegerType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("team1", StringType(), True),
    StructField("team2", StringType(), True),
    StructField("match_date", DateType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("venue_name", StringType(), True),
    StructField("city_name", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("toss_winner", StringType(), True),
    StructField("match_winner", StringType(), True),
    StructField("toss_name", StringType(), True),
    StructField("win_type", StringType(), True),
    StructField("outcome_type", StringType(), True),
    StructField("manofmach", StringType(), True),
    StructField("win_margin", IntegerType(), True),
    StructField("country_id", IntegerType(), True)
])
match_df = spark.read.schema(match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Match.csv")


In [0]:
player_schema = StructType([
    StructField("player_sk", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True)
])

player_df = spark.read.schema(player_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Player.csv")


In [0]:
player_match_schema = StructType([
    StructField("player_match_sk", IntegerType(), True),
    StructField("playermatch_key", DecimalType(), True),
    StructField("match_id", IntegerType(), True),
    StructField("player_id", IntegerType(), True),
    StructField("player_name", StringType(), True),
    StructField("dob", DateType(), True),
    StructField("batting_hand", StringType(), True),
    StructField("bowling_skill", StringType(), True),
    StructField("country_name", StringType(), True),
    StructField("role_desc", StringType(), True),
    StructField("player_team", StringType(), True),
    StructField("opposit_team", StringType(), True),
    StructField("season_year", IntegerType(), True),
    StructField("is_manofthematch", BooleanType(), True),
    StructField("age_as_on_match", IntegerType(), True),
    StructField("isplayers_team_won", BooleanType(), True),
    StructField("batting_status", StringType(), True),
    StructField("bowling_status", StringType(), True),
    StructField("player_captain", StringType(), True),
    StructField("opposit_captain", StringType(), True),
    StructField("player_keeper", StringType(), True),
    StructField("opposit_keeper", StringType(), True)
])

player_match_df = spark.read.schema(player_match_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Player_match.csv")

     

In [0]:
team_schema = StructType([
    StructField("team_sk", IntegerType(), True),
    StructField("team_id", IntegerType(), True),
    StructField("team_name", StringType(), True)
])

team_df = spark.read.schema(team_schema).format("csv").option("header","true").load("s3://ipl-data-analysis-project/Team.csv")


In [0]:
from pyspark.sql.functions import col, when, sum, avg, row_number
# Filter extra deliveries from the dataframe. i.e This updated dataframe only consists of valid deliveries.

ball_by_ball_df = ball_by_ball_df.filter((col("wides") == 0) & (col("noballs") == 0))
## This won't apply transformation. In spark, generally all the transformations are stacked one after another at first, and after we apply some action, then only those transformation will be applied. Here, no any action is applied, only transformation logic is written. So, no any transformation occures.

In [0]:
# Apply aggregation function: Calculate the total runs scored in each match and inning
total_and_avg_run = ball_by_ball_df.groupBy("match_id", "innings_no").agg(
    sum("runs_scored").alias("total_runs"),
    avg("runs_scored").alias("average runs")
)

In [0]:
total_and_avg_run.show(5)

+--------+----------+----------+------------------+
|match_id|innings_no|total_runs|      average runs|
+--------+----------+----------+------------------+
|  980940|         1|       138|              1.15|
|  419132|         1|       162|              1.35|
| 1082632|         2|       202|1.9238095238095239|
|  335993|         2|       131|1.6794871794871795|
| 1082617|         1|       123|1.0512820512820513|
+--------+----------+----------+------------------+
only showing top 5 rows



In [0]:
# Using window function to calculate running total of runs in each match for each over

from pyspark.sql.window import Window
windowSpec = Window.partitionBy("match_id","innings_no").orderBy('over_id')

ball_by_ball_df = ball_by_ball_df.withColumn(
    "running_total_runs",
    sum("runs_scored").over(windowSpec)
)



In [0]:
# Flag a ball which have wicket or more than 6 runs is scored. Adding new flag column for this
# Conditional Column
ball_by_ball_df = ball_by_ball_df.withColumn(
    "high impact", 
    when((col("runs_scored") + col("extra_runs") > 6) | (col('bowler_wicket') == True), True).otherwise(False)
)

In [0]:
ball_by_ball_df.show(5)

+--------+-------+-------+----------+------------+------------+------------------------+----------+-----------+----------+-----+-------+----+-------+-------+-------------+--------------+------+------+-------+----+------------+-------+-----------------+----------+----------------+-------------+----------+------+-------+-----------+------+----------+--------+----------------+---------+-------------------+-------------+----------------+----------+---------------+---------+------------------+--------------+--------------+------------+-------------+-----------+------------------+-----------+
|match_id|over_id|ball_id|innings_no|team_batting|team_bowling|striker_batting_position|extra_type|runs_scored|extra_runs|wides|legbyes|byes|noballs|penalty|bowler_extras|      out_type|caught|bowled|run_out| lbw|retired_hurt|stumped|caught_and_bowled|hit_wicket|obstructingfeild|bowler_wicket|match_date|season|striker|non_striker|bowler|player_out|fielders|striker_match_sk|strikersk|nonstriker_match_sk|

In [0]:
from pyspark.sql.functions import year, month, dayofmonth
match_df = match_df.withColumn("Year",year("match_date"))
match_df = match_df.withColumn("Month",month("match_date"))
match_df = match_df.withColumn("Day", dayofmonth("match_date"))

## Creating flag field that highlightes the team which have won the toss and match both
match_df = match_df.withColumn(
    "toss_match_winner",
    when(col("toss_winner") == col("match_winner"),"Yes").otherwise("No")
)

match_df.show(5)

+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-----------------+
|match_sk|match_id|               team1|               team2|match_date|season_year|          venue_name| city_name|country_name|         toss_winner|        match_winner|toss_name|win_type|outcome_type|  manofmach|win_margin|country_id|Year|Month| Day|toss_match_winner|
+--------+--------+--------------------+--------------------+----------+-----------+--------------------+----------+------------+--------------------+--------------------+---------+--------+------------+-----------+----------+----------+----+-----+----+-----------------+
|       0|  335987|Royal Challengers...|Kolkata Knight Ri...|      null|       2008|M Chinnaswamy Sta...| Bangalore|       India|Royal Challengers...|Kolkata Knight Ri...|    field|   

In [0]:
from pyspark.sql.functions import lower, regexp_replace

# Clean player name
player_df = player_df.withColumn("player_name",lower(regexp_replace("player_name","[^a-zA-Z0-9]","")))
#Handle missing entries
player_df = player_df.na.fill({"batting_hand": "unknown", "bowling_skill": "unknown"})

# Categorizing players based on batting hand
player_df = player_df.withColumn(
    "batting_style",
    when(col("batting_hand").contains("left"), "Left-Handed").otherwise("Right-Handed")
)


In [0]:
player_df.show()

+---------+---------+--------------+----+--------------+--------------------+------------+-------------+
|player_sk|player_id|   player_name| dob|  batting_hand|       bowling_skill|country_name|batting_style|
+---------+---------+--------------+----+--------------+--------------------+------------+-------------+
|        0|        1|     scganguly|null| Left-hand bat|    Right-arm medium|       India| Right-Handed|
|        1|        2|    bbmccullum|null|Right-hand bat|    Right-arm medium| New Zealand| Right-Handed|
|        2|        3|     rtponting|null|Right-hand bat|    Right-arm medium|   Australia| Right-Handed|
|        3|        4|      djhussey|null|Right-hand bat|  Right-arm offbreak|   Australia| Right-Handed|
|        4|        5|mohammadhafeez|null|Right-hand bat|  Right-arm offbreak|    Pakistan| Right-Handed|
|        5|        6|       rdravid|null|Right-hand bat|  Right-arm offbreak|       India| Right-Handed|
|        6|        7|       wjaffer|null|Right-hand bat

In [0]:
from pyspark.sql.functions import current_date, expr

# Add a column that highlights player under age 20.
player_match_df = player_match_df.withColumn(
    "under_20",
    when(col("age_as_on_match") < 20, 1).otherwise(0)
)

player_match_df.show(5)

+---------------+---------------+--------+---------+-----------+----+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+--------+
|player_match_sk|playermatch_key|match_id|player_id|player_name| dob|  batting_hand|       bowling_skill|country_name|role_desc|         player_team|        opposit_team|season_year|is_manofthematch|age_as_on_match|isplayers_team_won|batting_status|bowling_status|player_captain|opposit_captain|player_keeper|opposit_keeper|under_20|
+---------------+---------------+--------+---------+-----------+----+--------------+--------------------+------------+---------+--------------------+--------------------+-----------+----------------+---------------+------------------+--------------+--------------+--------------+---------------+-------------+--------------+--------

In [0]:
## Converting all these sparkdataframe to sql tables
ball_by_ball_df.createOrReplaceTempView("ball_by_ball")
match_df.createOrReplaceTempView("match_")
player_df.createOrReplaceTempView("player")
player_match_df.createOrReplaceTempView("player_match")
team_df.createOrReplaceTempView("team")

In [0]:
ball_by_ball_df.columns

Out[23]: ['match_id',
 'over_id',
 'ball_id',
 'innings_no',
 'team_batting',
 'team_bowling',
 'striker_batting_position',
 'extra_type',
 'runs_scored',
 'extra_runs',
 'wides',
 'legbyes',
 'byes',
 'noballs',
 'penalty',
 'bowler_extras',
 'out_type',
 'caught',
 'bowled',
 'run_out',
 'lbw',
 'retired_hurt',
 'stumped',
 'caught_and_bowled',
 'hit_wicket',
 'obstructingfeild',
 'bowler_wicket',
 'match_date',
 'season',
 'striker',
 'non_striker',
 'bowler',
 'player_out',
 'fielders',
 'striker_match_sk',
 'strikersk',
 'nonstriker_match_sk',
 'nonstriker_sk',
 'fielder_match_sk',
 'fielder_sk',
 'bowler_match_sk',
 'bowler_sk',
 'playerout_match_sk',
 'battingteam_sk',
 'bowlingteam_sk',
 'keeper_catch',
 'player_out_sk',
 'matchdatesk',
 'running_total_runs',
 'high impact']

In [0]:
match_df.columns

Out[24]: ['match_sk',
 'match_id',
 'team1',
 'team2',
 'match_date',
 'season_year',
 'venue_name',
 'city_name',
 'country_name',
 'toss_winner',
 'match_winner',
 'toss_name',
 'win_type',
 'outcome_type',
 'manofmach',
 'win_margin',
 'country_id',
 'Year',
 'Month',
 'Day',
 'toss_match_winner']

In [0]:
orange_cap_winner = spark.sql("""
    SELECT season_year, player_name, runs_scored FROM
        (SELECT m.season_year
        ,p.player_name
        ,sum(b.runs_scored) as runs_scored
        ,rank()over(partition by m.season_year order by sum(b.runs_scored) desc) as rank
        FROM ball_by_ball b
        JOIN match_ m on b.match_id = m.match_id
        join player_match pm on pm.match_id = m.match_id and b.striker = pm.player_id  
        join player p on pm.player_id = p.player_id
        GROUP BY 1, 2
        order by 1,3 desc)
    WHERE rank = 1
                              """)

In [0]:
orange_cap_winner.show()

+-----------+-----------+-----------+
|season_year|player_name|runs_scored|
+-----------+-----------+-----------+
|       2008|    semarsh|        614|
|       2009|   mlhayden|        571|
|       2010|srtendulkar|        615|
|       2011|    chgayle|        598|
|       2012|    chgayle|        720|
|       2013|  mekhussey|        732|
|       2014|  rvuthappa|        659|
|       2015|   dawarner|        554|
|       2016|     vkohli|        962|
|       2017|   dawarner|        634|
+-----------+-----------+-----------+



In [0]:
# Highest run scored by young player
highest_run_by_young_player = spark.sql(
    """
        (SELECT m.season_year
        ,p.player_name
        ,pm.age_as_on_match
        ,sum(b.runs_scored) as runs_scored
        FROM ball_by_ball b
        JOIN match_ m on b.match_id = m.match_id
        join player_match pm on pm.match_id = m.match_id and b.striker = pm.player_id  
        join player p on pm.player_id = p.player_id
        where pm.under_20 = 1
        GROUP BY 1, 2,3
        order by 3 desc)
    """
)

In [0]:
highest_run_by_young_player.show(30)

+-----------+----------------+---------------+-----------+
|season_year|     player_name|age_as_on_match|runs_scored|
+-----------+----------------+---------------+-----------+
|       2017|     ishankishan|             19|        277|
|       2013|        svsamson|             19|        206|
|       2010|    mandeepsingh|             19|          4|
|       2008|        mkpandey|             19|          3|
|       2008|        sstiwary|             19|         26|
|       2009|     shoaibahmed|             19|          1|
|       2016|          rrpant|             19|        198|
|       2008|  pmsarveshkumar|             19|          1|
|       2008|    iqbalabdulla|             19|          1|
|       2008|    tmsrivastava|             19|          8|
|       2017|      rashidkhan|             19|         11|
|       2008|       spgoswami|             19|         82|
|       2012|           pnegi|             19|         36|
|       2009|          ynagar|             19|         2

In [0]:
top_5_indian_batsman = spark.sql("""
    SELECT season_year, player_name, runs_scored FROM
        (SELECT m.season_year
        ,p.player_name
        ,sum(b.runs_scored) as runs_scored
        ,rank()over(partition by m.season_year order by sum(b.runs_scored) desc) as rank
        FROM ball_by_ball b
        JOIN match_ m on b.match_id = m.match_id
        join player_match pm on pm.match_id = m.match_id and b.striker = pm.player_id  
        join player p on pm.player_id = p.player_id
        where p.country_name = 'India'
        GROUP BY 1, 2
        order by 1,3 desc)
    WHERE rank <= 5
                              """)

In [0]:
top_5_indian_batsman.show(100)

+-----------+-----------+-----------+
|season_year|player_name|runs_scored|
+-----------+-----------+-----------+
|       2008|   ggambhir|        532|
|       2008|   ykpathan|        430|
|       2008|    skraina|        420|
|       2008|    msdhoni|        414|
|       2008|    vsehwag|        399|
|       2008|   rgsharma|        399|
|       2009|    skraina|        433|
|       2009|srtendulkar|        363|
|       2009|   rgsharma|        354|
|       2009|yuvrajsingh|        340|
|       2009|    msdhoni|        332|
|       2010|srtendulkar|        615|
|       2010|    skraina|        520|
|       2010|  scganguly|        493|
|       2010|     mvijay|        458|
|       2010|   sstiwary|        417|
|       2011|     vkohli|        557|
|       2011|srtendulkar|        553|
|       2011| pcvalthaty|        453|
|       2011|    skraina|        437|
|       2011|     mvijay|        434|
|       2012|   ggambhir|        590|
|       2012|    sdhawan|        568|
|       2012

In [0]:
## How many teams does won the match after wining toss

toss_match_winner = spark.sql(
    """
    SELECT m.match_winner, count(*)
    from match_ m
    where m.toss_match_winner = 'Yes'
    GROUP BY 1
    order by 2 DESC
    """
)


In [0]:
toss_match_winner.show(30)

+--------------------+--------+
|        match_winner|count(1)|
+--------------------+--------+
|      Mumbai Indians|      48|
|Kolkata Knight Ri...|      44|
| Chennai Super Kings|      42|
|Royal Challengers...|      35|
|    Rajasthan Royals|      34|
|    Delhi Daredevils|      33|
|     Kings XI Punjab|      28|
|     Deccan Chargers|      19|
| Sunrisers Hyderabad|      17|
|       Gujarat Lions|      10|
|Rising Pune Super...|       7|
|Kochi Tuskers Kerala|       4|
|       Pune Warriors|       3|
+--------------------+--------+



In [0]:
## Most economical bowler in powerplay who bowls at least 5 overs in a powerplay in a season
economical_bowler_in_pp = spark.sql(
    """
    WITH CTE AS (
        SELECT p.player_name, pm.season_year, avg(b.runs_scored) as AVG_RUN
        , dense_rank()over(partition by pm.season_year order by avg(b.runs_scored)) as ranki
        FROM ball_by_ball b
        JOIN player p  on b.bowler = p.player_id
        JOIN player_match pm on pm.player_id = b.bowler_sk and pm.match_id = b.match_id
        JOIN match_ m on m.match_id = b.match_id
        where b.over_id < 6
        group by 2, 1
        having count(*) > 30
        order by 2,3,1
    )
    SELECT
    player_name, season_year, AVG_RUN
    FROM CTE WHERE ranki <= 5
    """
)


In [0]:
economical_bowler_in_pp.show(50)

+-------------+-----------+------------------+
|  player_name|season_year|           AVG_RUN|
+-------------+-----------+------------------+
|      isharma|       2008|0.7222222222222222|
| sohailtanvir|       2008|0.7407407407407407|
|      dwsteyn|       2008|0.8571428571428571|
|   ssreesanth|       2008|0.9444444444444444|
|       mntini|       2008|0.9722222222222222|
|         blee|       2009|0.9166666666666666|
|      isharma|       2009|0.9473684210526315|
|     jhkallis|       2009|               1.0|
|     ikpathan|       2009|1.2647058823529411|
|       msgony|       2009|            1.5625|
|     rmclaren|       2010|1.2380952380952381|
|     srwatson|       2010| 1.271186440677966|
|      isharma|       2010|1.3636363636363635|
|   mfmaharoof|       2010|1.4166666666666667|
|      rpsingh|       2010|1.4583333333333333|
| sjsrivastava|       2010|1.4583333333333333|
|      dwsteyn|       2011|0.9696969696969697|
|     acthomas|       2011| 1.037037037037037|
|      dwstey

In [0]:
ball_by_ball_df = ball_by_ball_df.withColumn(
    "bower_wicket_num", 
    when((col('bowler_wicket') == 'true'), 1).otherwise(0)
)
ball_by_ball_df.createOrReplaceTempView("ball_by_ball")

In [0]:
# purple cap holder
purple_cap = spark.sql(
    """
    WITH CTE AS (
        SELECT p.player_name, pm.season_year, avg(b.runs_scored) AS AVG_RUN 
        ,sum(bower_wicket_num) AS total_wicket
        , dense_rank()over(partition by pm.season_year order by avg(b.runs_scored)) as ranki
        FROM ball_by_ball b
        JOIN player p  on b.bowler = p.player_id
        JOIN player_match pm on pm.player_id = b.bowler_sk and pm.match_id = b.match_id
        JOIN match_ m on m.match_id = b.match_id
        group by 2, 1
        order by 2,4 desc,3,1
    )
    SELECT
    player_name, season_year, TOTAL_WICKET, AVG_RUN
    FROM CTE WHERE ranki = 1
    """
)

In [0]:
# Since we have data issue from the source. There is no data in bowler_wicket column, so we are getting 0 wickets
purple_cap.show()

+-----------+-----------+------------+------------------+
|player_name|season_year|TOTAL_WICKET|           AVG_RUN|
+-----------+-----------+------------+------------------+
|  pjsangwan|       2008|           0|0.7916666666666666|
|     cnanda|       2009|           0|0.6666666666666666|
|    vsehwag|       2010|           0|0.6764705882352942|
|    rvgomez|       2011|           0|0.6666666666666666|
|dtchristian|       2012|           0|0.6666666666666666|
| dskulkarni|       2013|           0|0.5833333333333334|
|   srwatson|       2014|           0|0.8333333333333334|
|mchenriques|       2015|           0|0.3333333333333333|
|     azampa|       2016|           0|0.7916666666666666|
|      ajtye|       2017|           0|               0.5|
+-----------+-----------+------------+------------------+

