In [1]:
# Importing Libraries
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [2]:
spark = SparkSession.builder.master("local").appName('Ops').getOrCreate()

In [3]:
#path = "E:/Rutgers/Projects/MDSR/IPL-MSDR"
path = '/Users/nidhiharwani/Desktop/Most_Valuable_Player_Prediction_using_IPL_Dataset'

# Final Analysis

In [4]:
# Reading data
matches = spark.read.csv(path + '/dataset/clean_data/matches.csv',inferSchema=True,header=True)
deliveries = spark.read.csv(path + '/dataset/clean_data/deliveries.csv',inferSchema=True,header=True)

In [5]:
# Creating temporary tables of the data
matches.registerTempTable('matches_db')
deliveries.registerTempTable('deliveries_db')

In [6]:
# Merging both the tables
merged_db = spark.sql('select m.*,d.* from matches_db as m inner join deliveries_db as d on m.id=d.match_id')
merged_db.registerTempTable('analysis_db')

## Batting Metrics 

In [7]:
# nmba: no. of batsmen
# nm: no. of matches played by a batsman
# hha: hard hitting ability
# f: finisher
# fsa: fast scoring ability
# con: consistency
# rbw: running between wickets

In [8]:
# Number of Batsmen
nmba = spark.sql('select count(distinct(batsman)) as No_of_Batsman from analysis_db')
nmba.show()

+-------------+
|No_of_Batsman|
+-------------+
|          516|
+-------------+



In [9]:
# Number of Matches played by a batsmen
nm = spark.sql('select batsman, count(distinct(match_id)) as No_of_Matches \
                from analysis_db group by batsman')
nm.registerTempTable('no_of_matches_table')

### Hard Hitting Ability 

In [10]:
# Hard Hitting Ability = (4*Fours + 6*Sixes)/Balls Played by Batsman
hha = spark.sql('select nmt.batsman as Batsman, round(nvl(t4.hard_hitting_ability,0), 5) as \
                Hard_Hitting_Ability from \
                (select t1.batsman, (t1.fours*4 + t2.sixes*6)/t3.balls_played as hard_hitting_ability\
                from (select batsman,count(*) as fours from analysis_db where batsman_runs = 4 group by batsman) t1 \
                inner join  \
                (select batsman,count(*) as sixes from analysis_db where batsman_runs = 6 \
                group by batsman) t2 on t1.batsman=t2.batsman\
                inner join\
                (select batsman,count(*) as balls_played from analysis_db \
                group by batsman) t3 on t3.batsman=t1.batsman) t4 \
                right join \
                no_of_matches_table nmt on t4.batsman = nmt.batsman')
hha.registerTempTable('hard_hitting_ability')

In [11]:
hha = spark.sql('select rank() over (order by Hard_Hitting_Ability desc) as Rank, t1.* \
                from hard_hitting_ability t1 \
                inner join \
                no_of_matches_table t2\
                on t1.batsman = t2.batsman where no_of_matches>9')
hha.registerTempTable('hard_hitting_ability')

In [12]:
hha = spark.sql('select t1.*, round((240-rank)/240, 5) as Weights \
                from hard_hitting_ability t1')
hha.registerTempTable('hard_hitting_ability')
hha.show(10)

+----+-------------+--------------------+-------+
|Rank|      Batsman|Hard_Hitting_Ability|Weights|
+----+-------------+--------------------+-------+
|   1|   AD Russell|             1.37733|0.99583|
|   2|    SP Narine|             1.33056|0.99167|
|   3|        M Ali|             1.21311| 0.9875|
|   4|    KK Cooper|                 1.2|0.98333|
|   5|  BCJ Cutting|             1.19178|0.97917|
|   6|    K Gowtham|             1.16279|  0.975|
|   7|CR Brathwaite|             1.13333|0.97083|
|   8|     CH Gayle|             1.10699|0.96667|
|   9|  Rashid Khan|             1.10448| 0.9625|
|  10|   GJ Maxwell|             1.09313|0.95833|
+----+-------------+--------------------+-------+
only showing top 10 rows



### Finisher 

In [13]:
# Finisher = Not Out innings/Total Innings played
f = spark.sql('select t3.batsman as Batsman, round(t3.not_out_innings/t4.total_matches_played, 5) as Finisher from\
              (select t1.batsman, t1.matches_played-t2.number_of_times_out as not_out_innings from \
              (select batsman, count(distinct(match_id)) as matches_played from analysis_db group by batsman) t1\
              inner join \
              (select batsman, count(*) as number_of_times_out from analysis_db where player_dismissed = batsman \
              group by batsman) t2\
              on t1.batsman=t2.batsman) t3\
              inner join\
              (select batsman, count(distinct(match_id)) as total_matches_played \
              from analysis_db group by batsman) t4\
              on t3.batsman = t4.batsman')
f.registerTempTable('finisher')

In [14]:
f = spark.sql('select rank() over (order by finisher desc) as Rank, t1.* \
              from finisher t1 \
              inner join \
              no_of_matches_table t2 \
              on t1.batsman = t2.batsman \
              where no_of_matches>9')
f.registerTempTable('finisher')

In [15]:
f = spark.sql('select t1.*, round((240-rank)/240, 5) as Weights \
              from finisher t1')
f.registerTempTable('finisher')
f.show(10)

+----+--------------+--------+-------+
|Rank|       Batsman|Finisher|Weights|
+----+--------------+--------+-------+
|   1| Iqbal Abdulla| 0.92308|0.99583|
|   2|      A Kumble| 0.86667|0.99167|
|   3|Sandeep Sharma| 0.78571| 0.9875|
|   4|   S Sreesanth|    0.75|0.98333|
|   5|     S Aravind|     0.7|0.97917|
|   5|     JJ Bumrah|     0.7|0.97917|
|   5|      VR Aaron|     0.7|0.97917|
|   8|     YS Chahal| 0.66667|0.96667|
|   8|      I Sharma| 0.66667|0.96667|
|  10|  Bipul Sharma| 0.64706|0.95833|
+----+--------------+--------+-------+
only showing top 10 rows



### Fast Scoring Ability

In [16]:
# Fast Scoring Ability = Total Runs/Balls Played by Batsman
fsa = spark.sql('select batsman as Batsman, round(Total_Runs/balls_played, 5) as Fast_Scoring_Ability \
                  from (select batsman,sum(batsman_runs) as Total_Runs, count(*) as balls_played \
                  from analysis_db group by batsman)')
fsa.registerTempTable('fast_scoring_ability')

In [17]:
fsa = spark.sql('select rank() over (order by fast_scoring_ability desc) as Rank, t1.* \
                  from fast_scoring_ability t1 \
                  inner join \
                  no_of_matches_table t2 \
                  on t1.batsman = t2.batsman where no_of_matches>9')
fsa.registerTempTable('fast_scoring_ability')

In [18]:
fsa = spark.sql('select t1.*, round((240-rank)/240, 5) as Weights \
                from fast_scoring_ability t1')
fsa.registerTempTable('fast_scoring_ability')
fsa.show(10)

+----+-------------+--------------------+-------+
|Rank|      Batsman|Fast_Scoring_Ability|Weights|
+----+-------------+--------------------+-------+
|   1|   AD Russell|              1.7995|0.99583|
|   2|    K Gowtham|             1.72093|0.99167|
|   3|        M Ali|             1.69945| 0.9875|
|   4|    SP Narine|             1.66944|0.98333|
|   5|    KK Cooper|             1.65714|0.97917|
|   6|  BCJ Cutting|             1.64384|  0.975|
|   7|  Rashid Khan|             1.62687|0.97083|
|   8|      RR Pant|             1.62319|0.96667|
|   9|   J Bairstow|             1.59727| 0.9625|
|  10|CR Brathwaite|             1.56667|0.95833|
+----+-------------+--------------------+-------+
only showing top 10 rows



### Consistency

In [19]:
# Consistency = Total Runs/Number of Times Out
con = spark.sql('select t1.batsman as Batsman, round(t1.Total_runs/t2.no_of_times_dismissed, 5) as Consistency \
                from (select batsman,sum(batsman_runs) as Total_runs \
                from analysis_db group by batsman) t1 \
                inner join \
                (select batsman, count(*) as no_of_times_dismissed \
                from analysis_db where player_dismissed is not null \
                group by batsman) t2 on t1.batsman=t2.batsman')
con.registerTempTable('consistency')

In [20]:
con = spark.sql('select rank() over (order by consistency desc) as Rank, t1.* \
                  from consistency t1 \
                  inner join \
                  no_of_matches_table t2 \
                  on t1.batsman = t2.batsman where no_of_matches>9')
con.registerTempTable('consistency')

In [21]:
con = spark.sql('select t1.*, round((240-Rank)/240, 5) as Weights \
                from consistency t1')
con.registerTempTable('consistency')
con.show(10)

+----+-------------+-----------+-------+
|Rank|      Batsman|Consistency|Weights|
+----+-------------+-----------+-------+
|   1|   AD Russell|     1.7995|0.99583|
|   2|    K Gowtham|    1.72093|0.99167|
|   3|        M Ali|    1.69945| 0.9875|
|   4|    SP Narine|    1.66944|0.98333|
|   5|    KK Cooper|    1.65714|0.97917|
|   6|  BCJ Cutting|    1.64384|  0.975|
|   7|  Rashid Khan|    1.62687|0.97083|
|   8|      RR Pant|    1.62319|0.96667|
|   9|   J Bairstow|    1.59727| 0.9625|
|  10|CR Brathwaite|    1.56667|0.95833|
+----+-------------+-----------+-------+
only showing top 10 rows



###  Running Between Wickets

In [22]:
# Running Between Wickets = (Total Runs – (4*Fours + 6*Sixes))/(Total Balls Played – Boundary Balls)
rbw = spark.sql('select t9.batsman as Batsman, round(nvl(t8.running_between_wickets,0), 5) as Running_Between_Wickets \
                from (select t4.batsman, t4.first_bracket/t7.second_bracket as Running_Between_Wickets \
                from (select t1.batsman, t3.total_runs-(t1.fours*4 + t2.sixes*6) as first_bracket \
                from (select batsman,count(*) as fours from analysis_db where batsman_runs = 4 \
                group by batsman) t1 \
                inner join \
                (select batsman,count(*) as sixes from analysis_db where batsman_runs = 6 group by batsman) t2 \
                on t1.batsman=t2.batsman \
                inner join \
                (select batsman,sum(batsman_runs) as total_runs from analysis_db group by batsman) t3 \
                on t3.batsman=t1.batsman) t4 \
                inner join\
                (select t5.batsman, t5.total_balls_played-t6.boundry_balls as second_bracket from \
                (select batsman, count(*) as total_balls_played from analysis_db group by batsman) t5 \
                inner join \
                (select batsman, count(*) as boundry_balls from analysis_db where batsman_runs=4 or batsman_runs=6 \
                group by batsman) t6\
                on t5.batsman=t6.batsman) t7 \
                on t4.batsman=t7.batsman) t8 \
                right join \
                no_of_matches_table t9 \
                on t8.batsman = t9.batsman')
rbw.registerTempTable('running_between_wickets')

In [23]:
rbw = spark.sql('select rank() over (order by running_between_wickets desc) as Rank, t1.* \
                  from running_between_wickets t1 \
                  inner join \
                  no_of_matches_table t2\
                  on t1.batsman = t2.batsman where no_of_matches>9')
rbw.registerTempTable('running_between_wickets')

In [24]:
rbw = spark.sql('select t1.*, round((240-rank)/240, 5) as Weights \
                from running_between_wickets t1')
rbw.registerTempTable('running_between_wickets')
rbw.show(10)

+----+--------------+-----------------------+-------+
|Rank|       Batsman|Running_Between_Wickets|Weights|
+----+--------------+-----------------------+-------+
|   1|  Bipul Sharma|                0.85577|0.99583|
|   2|       TM Head|                0.83206|0.99167|
|   3|    WPUJC Vaas|                0.80882| 0.9875|
|   4|     V Shankar|                0.79245|0.98333|
|   5|      M Kartik|                0.78218|0.97917|
|   6| Mohammad Nabi|                0.77922|  0.975|
|   7|     BA Stokes|                0.76471|0.97083|
|   8|A Ashish Reddy|                0.76364|0.96667|
|   8|     CH Morris|                0.76364|0.96667|
|  10|        S Gill|                0.76235|0.95833|
+----+--------------+-----------------------+-------+
only showing top 10 rows



In [25]:
# Table Name for each Metric
# Hard Hitting Ability: hard_hitting_ability
# Finisher: finisher
# Fast Scoring Ability: fast_scoring_ability
# Consistency: consistency
# Running Between Wickets: running_between_wickets

##  Total Batting Weights

In [26]:
total_batting_weight = spark.sql('select hht.Batsman, round((hht.Weights+f.Weights+fsa.Weights+c.Weights+rbw.Weights), 5) as Total_Batting_Weights \
                                 from hard_hitting_ability hht \
                                 inner join finisher f \
                                 on hht.Batsman = f.Batsman \
                                 inner join fast_scoring_ability fsa \
                                 on hht.Batsman = fsa.Batsman \
                                 inner join consistency c \
                                 on hht.Batsman = c.Batsman \
                                 inner join running_between_wickets rbw \
                                 on hht.Batsman = rbw.Batsman \
                                 order by Total_Batting_Weights desc')
total_batting_weight.registerTempTable('total_batting_weight')
total_batting_weight.show(100)

+-----------------+---------------------+
|          Batsman|Total_Batting_Weights|
+-----------------+---------------------+
|        CH Morris|              4.60416|
|     Bipul Sharma|              4.55833|
|        K Gowtham|              4.40834|
|Washington Sundar|              4.38332|
|        HH Pandya|              4.35417|
|         HV Patel|              4.24999|
|     Ankit Sharma|              4.19582|
|      Rashid Khan|              4.14583|
|      BCJ Cutting|              4.14167|
|    Mohammad Nabi|              4.12916|
|   A Ashish Reddy|                4.125|
|          SN Khan|              4.12084|
|   AB de Villiers|              4.11667|
|            M Ali|              4.10417|
|       J Bairstow|                  4.1|
|        KH Pandya|              4.06667|
|         M Morkel|              4.04165|
|      MF Maharoof|              4.02916|
|        JA Morkel|              3.99583|
|          RR Pant|              3.97084|
|         MS Dhoni|               

In [27]:
# Dropping intermediate tables
table_names = ['no_of_matches_table', 'hard_hitting_ability', 'finisher', 'fast_scoring_ability', 'consistency', 'running_between_wickets']
for table in table_names:
    cmd = 'drop table if exists {}'.format(table)
    drop = spark.sql(cmd)
check = spark.sql('show tables')
check.show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|        |         analysis_db|       true|
|        |       deliveries_db|       true|
|        |          matches_db|       true|
|        |total_batting_weight|       true|
+--------+--------------------+-----------+



## Bowling Metrics 

In [28]:
# nmbo: no. of bowlers
# nmb: no. of matches played by a bowler
# eco: economy
# wta: wicket taking ability
# cons: consistency
# cwta: crucial wicket taking ability
# spi: short performance index

In [29]:
# Number of Bowlers
nmbo = spark.sql('Select count(distinct(bowler)) as No_of_Bowlers from analysis_db')
nmbo.show()

+-------------+
|No_of_Bowlers|
+-------------+
|          405|
+-------------+



In [30]:
# Number of matches played by a bowler
nmb = spark.sql('select bowler as Bowler, count(distinct(match_id)) as No_of_Matches from analysis_db group by bowler')
nmb.registerTempTable('no_of_matches_bowlers')

### Economy 

In [31]:
# Economy = Runs Scored/(Number of balls bowled by bowler/6)
eco = spark.sql('Select bowler as Bowler, round(runs/overs, 5) as Economy \
                from (Select bowler,round(count(*)/6) \
                as overs,sum(total_runs) as runs \
                from analysis_db \
                group by bowler)')
eco.registerTempTable('economy')

In [32]:
eco = spark.sql('select row_number() over (order by e.Economy asc) as Rank, e.*,n.No_of_Matches \
                from economy e \
                inner join \
                no_of_matches_bowlers n \
                on e.Bowler = n.Bowler where n.No_of_Matches>9')
eco.registerTempTable('economy')

In [33]:
eco = spark.sql('select *, round((212 - Rank)/212, 5) as Weights from economy')
eco.registerTempTable('economy')
eco.show(10)

+----+----------------+-------+-------------+-------+
|Rank|          Bowler|Economy|No_of_Matches|Weights|
+----+----------------+-------+-------------+-------+
|   1|   Sohail Tanvir|   6.25|           11|0.99528|
|   2|      A Chandila|6.28205|           12|0.99057|
|   3|         J Yadav|6.52632|           12|0.98585|
|   4|      SM Pollock|6.53191|           13|0.98113|
|   5|        A Kumble|6.64024|           42|0.97642|
|   6|      GD McGrath|6.65455|           14| 0.9717|
|   7|        DW Steyn|6.66848|           92|0.96698|
|   8|  M Muralitharan|6.68561|           66|0.96226|
|   9|RN ten Doeschate|6.71429|           10|0.95755|
|  10|       RD Chahar|6.71429|           15|0.95283|
+----+----------------+-------+-------------+-------+
only showing top 10 rows



### Wicket Taking Ability

In [34]:
# Wicket Taking Ability = Number of balls bowled/Wickets Taken
wta = spark.sql('(Select t1.bowler as Bowler, round(t2.balls/t1.wickets, 5) as Wicket_Taking_Ability from \
                (Select bowler,count(*) as wickets from analysis_db where player_dismissed is not null \
                and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                or  dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                or  dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                group by bowler) t1 \
                inner join \
                (select count(*) as balls,bowler from analysis_db group by bowler)t2 on \
                t1.bowler = t2.bowler)')
wta.registerTempTable('wicket_taking_ability')

In [35]:
wta = spark.sql('select row_number() over (order by w.Wicket_Taking_Ability asc) as Rank, w.*, n.No_of_Matches \
                from wicket_taking_ability w \
                inner join \
                no_of_matches_bowlers n on \
                w.Bowler = n.Bowler where n.No_of_Matches>9')
wta.registerTempTable('wicket_taking_ability')

In [36]:
wta = spark.sql('select *,round((212-Rank)/212, 5) as Weights \
                from wicket_taking_ability')
wta.registerTempTable('wicket_taking_ability')
wta.show(10)

+----+--------------+---------------------+-------------+-------+
|Rank|        Bowler|Wicket_Taking_Ability|No_of_Matches|Weights|
+----+--------------+---------------------+-------------+-------+
|   1|       A Zampa|             11.84211|           11|0.99528|
|   2| Sohail Tanvir|             12.04545|           11|0.99057|
|   3|       K Ahmed|             12.68421|           10|0.98585|
|   4|        N Rana|             13.42857|           12|0.98113|
|   5|      K Rabada|                 14.0|           18|0.97642|
|   6|      BJ Hodge|                 14.0|           20| 0.9717|
|   7|  CRD Fernando|             14.64706|           10|0.96698|
|   8|    YA Abdulla|                 14.8|           11|0.96226|
|   9|A Ashish Reddy|                 15.0|           20|0.95755|
|  10|       S Gopal|             15.60526|           30|0.95283|
+----+--------------+---------------------+-------------+-------+
only showing top 10 rows



### Consistency 

In [37]:
# Consistency = Runs Conceded/Wickets Taken
cons = spark.sql('select t1.bowler as Bowler, round(t1.runs/t2.wickets, 5) as Consistency \
                 from (select sum(total_runs) as runs,bowler from analysis_db group by bowler) t1 \
                 inner join \
                 (Select bowler,count(*) as wickets from analysis_db where player_dismissed is not null \
                 and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                 or dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                 or dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                 group by bowler)t2 on t1.bowler = t2.bowler')
cons.registerTempTable('consistency')

In [38]:
cons = spark.sql('select row_number() over (order by c.Consistency asc) as Rank, c.*,n.No_of_Matches \
                 from consistency c \
                 inner join \
                 no_of_matches_bowlers n on \
                 c.Bowler = n.Bowler where n.No_of_Matches>9')
cons.registerTempTable('consistency')

In [39]:
cons = spark.sql('select *, round((212-Rank)/212, 5) as Weights from consistency')
cons.registerTempTable('consistency')
cons.show(10)

+----+--------------+-----------+-------------+-------+
|Rank|        Bowler|Consistency|No_of_Matches|Weights|
+----+--------------+-----------+-------------+-------+
|   1| Sohail Tanvir|       12.5|           11|0.99528|
|   2|       A Zampa|   14.78947|           11|0.99057|
|   3|        N Rana|   17.71429|           12|0.98585|
|   4|  CRD Fernando|       18.0|           10|0.98113|
|   5|      BJ Hodge|   18.23529|           20|0.97642|
|   6|       K Ahmed|   18.47368|           10| 0.9717|
|   7|AD Mascarenhas|   19.21053|           13|0.96698|
|   8|      K Rabada|   19.32258|           18|0.96226|
|   9|  DE Bollinger|   19.35135|           27|0.95755|
|  10|   MF Maharoof|    19.7037|           20|0.95283|
+----+--------------+-----------+-------------+-------+
only showing top 10 rows



### Crucial Wicket Taking Ability

In [40]:
# Crucial Wicket Taking Ability = Number of times Four or Five Wickets Taken/Number of Innings Played
cwta = spark.sql('select t2.bowler as Bowler, round(nvl(t1.no_of_4wickets/t2.innings,0), 5) as Crucial_Wicket_Taking_Ablity \
                 from (select bowler,count(*) as no_of_4wickets from (select * from \
                 (select match_id,bowler,count(*) as wickets from analysis_db where player_dismissed \
                 is not null \
                 and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                 or  dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                 or  dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                 group by bowler,match_id ) \
                 where wickets > 3) group by bowler)t1 \
                 right join \
                 (select bowler,count(match_id) as \
                 innings from (select distinct(match_id),bowler from analysis_db) \
                 group by bowler)t2 \
                 on t1.bowler = t2.bowler order by Crucial_Wicket_Taking_Ablity desc')
cwta.registerTempTable('crucial_wicket_taking_ablity')

In [41]:
cwta = spark.sql('select rank() over (order by cw.Crucial_Wicket_Taking_Ablity desc) as Rank, cw.*,n.No_of_Matches \
                 from crucial_wicket_taking_ablity cw \
                 inner join no_of_matches_bowlers n on \
                 cw.Bowler = n.Bowler where n.No_of_Matches>9')
cwta.registerTempTable('crucial_wicket_taking_ablity')

In [42]:
cwta = spark.sql('select *, round((212-Rank)/212, 5) as Weights \
                 from crucial_wicket_taking_ablity')
cwta.registerTempTable('crucial_wicket_taking_ablity')
cwta.show(10)

+----+-------------+----------------------------+-------------+-------+
|Rank|       Bowler|Crucial_Wicket_Taking_Ablity|No_of_Matches|Weights|
+----+-------------+----------------------------+-------------+-------+
|   1|Sohail Tanvir|                     0.18182|           11|0.99528|
|   1|   YA Abdulla|                     0.18182|           11|0.99528|
|   3|       AJ Tye|                     0.15385|           26|0.98585|
|   4|     K Rabada|                     0.11111|           18|0.98113|
|   5|     J Theron|                         0.1|           10|0.97642|
|   5|  PC Valthaty|                         0.1|           10|0.97642|
|   5| CRD Fernando|                         0.1|           10|0.97642|
|   8|      A Zampa|                     0.09091|           11|0.96226|
|   8|    CJ Jordan|                     0.09091|           11|0.96226|
|  10|   A Chandila|                     0.08333|           12|0.95283|
+----+-------------+----------------------------+-------------+-

### Short Performance Index

In [43]:
# Short Performance Index = (Wickets Taken – 4* Number of Times Four Wickets Taken – 5* Number of Times Five Wickets Taken)/(Innings Played – Number of Times Four Wickets or Five Wickets Taken)
spi = spark.sql('select n.bowler as Bowler, round(nvl(t5.Short_Performance_Index,0), 5) as Short_Performance_Index \
                from (select t1.bowler,(t3.wickets - 4*t1.no_of_4wickets - 5*t2.no_of_4wickets)/ \
                (t4.innings - t1.no_of_4wickets - t2.no_of_4wickets) as Short_Performance_Index \
                from (select bowler,count(*) as no_of_4wickets \
                from (select * from (select match_id,bowler,count(*) as wickets from analysis_db where player_dismissed \
                is not null \
                and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                or  dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                or  dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\')\
                group by bowler, match_id ) \
                where wickets = 4) group by bowler) t1 \
                inner join \
                (select bowler,count(*) as no_of_4wickets from (select * from \
                (select match_id,bowler,count(*) as wickets from analysis_db where player_dismissed \
                is not null \
                and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                or dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                or dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\')\
                group by bowler,match_id ) \
                where wickets = 5) group by bowler) t2 \
                inner join \
                (select bowler,count(*) as wickets from analysis_db where player_dismissed is not null \
                and (dismissal_kind = \'bowled\' or  dismissal_kind = \'hit wicket\' \
                or  dismissal_kind = \'stumped\' or  dismissal_kind = \'lbw\' \
                or  dismissal_kind = \'caught and bowled\' or  dismissal_kind = \'caught\') \
                group by bowler) t3 \
                inner join \
                (select bowler,count(match_id) as \
                innings from (select distinct(match_id),bowler from analysis_db) group by bowler) t4 \
                on t1.bowler = t2.bowler and t1.bowler = t3.bowler and t1.bowler = t4.bowler) t5 \
                right join \
                no_of_matches_bowlers n on t5.Bowler = n.Bowler order by Short_Performance_Index desc')
spi.registerTempTable('short_performance_index')

In [44]:
spi = spark.sql('select rank() over (order by sp.Short_Performance_Index desc) as Rank, sp.*, n.No_of_Matches \
                from short_performance_index sp \
                inner join \
                no_of_matches_bowlers n on \
                sp.Bowler = n.Bowler where n.No_of_Matches>9')
spi.registerTempTable('short_performance_index')

In [45]:
spi = spark.sql('select *, round((212-Rank)/212, 5) as Weights \
                from short_performance_index')
spi.registerTempTable('short_performance_index')
spi.show(10)

+----+---------------+-----------------------+-------------+-------+
|Rank|         Bowler|Short_Performance_Index|No_of_Matches|Weights|
+----+---------------+-----------------------+-------------+-------+
|   1|     SL Malinga|                1.22609|          122|0.99528|
|   2|        B Kumar|                1.05263|          117|0.99057|
|   3|       MM Patel|                1.01667|           63|0.98585|
|   4|         AJ Tye|                    1.0|           26|0.98113|
|   5|       A Mishra|                0.97203|          147|0.97642|
|   6|      SP Narine|                0.91176|          109| 0.9717|
|   7|Harbhajan Singh|                0.90968|          157|0.96698|
|   8|       L Balaji|                0.85507|           73|0.96226|
|   9|    JP Faulkner|                0.82456|           60|0.95755|
|  10|       A Kumble|                0.82051|           42|0.95283|
+----+---------------+-----------------------+-------------+-------+
only showing top 10 rows



In [46]:
# Table Name for each Metric
# Economy: economy
# Wicket Taking Ability: wicket_taking_ability
# Consistency: consistency
# Crucial Wicket Taking Ability: crucial_wicket_taking_ablity
# Short Performance Index: short_performance_index

## Total Bowling Weights

In [47]:
total_bowling_weight = spark.sql('select e.bowler as Bowler, round((e.Weights+wta.Weights+c.Weights+cwta.Weights+spi.Weights), 5) as Total_Bowling_Weights \
                                 from economy e \
                                 inner join wicket_taking_ability wta \
                                 on e.Bowler = wta.Bowler \
                                 inner join consistency c \
                                 on e.Bowler = c.Bowler \
                                 inner join crucial_wicket_taking_ablity cwta \
                                 on e.Bowler = cwta.Bowler \
                                 inner join short_performance_index spi \
                                 on e.Bowler = spi.Bowler \
                                 order by Total_Bowling_Weights desc')
total_bowling_weight.registerTempTable('total_bowling_weight')
total_bowling_weight.show(10)

+---------------+---------------------+
|         Bowler|Total_Bowling_Weights|
+---------------+---------------------+
|  Sohail Tanvir|              4.91981|
|   CRD Fernando|              4.68397|
| AD Mascarenhas|              4.67925|
|        A Zampa|              4.67925|
|     SL Malinga|              4.59906|
|   DE Bollinger|              4.51416|
|     A Chandila|              4.45755|
|       MA Starc|              4.42454|
|NM Coulter-Nile|              4.34435|
|      SP Narine|              4.33491|
+---------------+---------------------+
only showing top 10 rows



In [48]:
# Dropping intermediate Tables
table_names = ['no_of_matches_bowlers', 'economy', 'wicket_taking_ability', 'consistency', 'crucial_wicket_taking_ablity', 'short_performance_index']
for table in table_names:
    cmd = 'drop table if exists {}'.format(table)
    drop = spark.sql(cmd)
check = spark.sql('show tables')
check.show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
|        |         analysis_db|       true|
|        |       deliveries_db|       true|
|        |          matches_db|       true|
|        |total_batting_weight|       true|
|        |total_bowling_weight|       true|
+--------+--------------------+-----------+



## Total Weights per Player

In [49]:
total_weight = spark.sql('select *, round((coalesce(total_batting_weight, 0) + coalesce(total_bowling_weight, 0)),5) as Total_Weight \
                          from (select t1.batsman as Player, round(nvl(t1.Total_Batting_Weights,0),5) as Total_Batting_Weight, \
                          round(nvl(t2.Total_Bowling_Weights,0),5) as Total_Bowling_Weight \
                          from total_batting_weight t1 \
                          full outer join total_bowling_weight t2 \
                          on t1.batsman = t2.bowler) \
                          order by Total_Weight desc')
total_weight.registerTempTable('total_weight')
total_weight.show(10)

+-----------------+--------------------+--------------------+------------+
|           Player|Total_Batting_Weight|Total_Bowling_Weight|Total_Weight|
+-----------------+--------------------+--------------------+------------+
|        CH Morris|             4.60416|             3.74057|     8.34473|
|      MF Maharoof|             4.02916|             4.26415|     8.29331|
|      Rashid Khan|             4.14583|             4.12265|     8.26848|
|    Mohammad Nabi|             4.12916|             3.77831|     7.90747|
|        KK Cooper|             3.88334|              3.9953|     7.87864|
|        SP Narine|             3.53749|             4.33491|      7.8724|
|           AJ Tye|             3.69167|             4.08491|     7.77658|
|   A Ashish Reddy|               4.125|             3.51887|     7.64387|
|         M Morkel|             4.04165|             3.58491|     7.62656|
|Washington Sundar|             4.38332|              3.2217|     7.60502|
+-----------------+------

In [50]:
# Dropping intermediate tables
table_names = ['total_batting_weight', 'total_bowling_weight']
for table in table_names:
    cmd = 'drop table if exists {}'.format(table)
    drop = spark.sql(cmd)
check = spark.sql('show tables')
check.show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|        |  analysis_db|       true|
|        |deliveries_db|       true|
|        |   matches_db|       true|
|        | total_weight|       true|
+--------+-------------+-----------+



In [51]:
# Saving the player weight data
total_weight.toPandas().to_csv(path + '/dataset/weights_data/player_weights.csv')