In [0]:
import pyspark

#SECTION-1

In [0]:
# Ball-by-ball deliveries CSV and the reader format used to load it.
file_location = "/FileStore/tables/ipl_ball_by_ball.csv"
file_type = "csv"

# Reader settings; the later CSV loads in this notebook reuse these names.
infer_schema = "true"          # let Spark infer column types from the data
first_row_is_header = "true"   # take the column names from the first row
delimiter = ","

# These options apply to CSV sources; other file types ignore them.
df1 = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

To change the data types, I used the `withColumn()` function.

In [0]:
# Cast `inning` and `is_wicket` to integer, one column at a time.
for int_column in ("inning", "is_wicket"):
    df1 = df1.withColumn(int_column, df1[int_column].cast('integer'))

* All the other columns already have the correct names, because the header row was used for the column names
* Using the `withColumnRenamed()` function to change the remaining column names

In [0]:
# Original column name -> the name the SQL queries later in the notebook use.
ball_column_renames = {
    "ball": "ball_num",
    "batsman": "current_batsman",
    "fielder": "fielder_name",
    "bowler": "bowler_name",
    "overs": "current_over",
}

New_df1 = df1
for old_name, new_name in ball_column_renames.items():
    New_df1 = New_df1.withColumnRenamed(old_name, new_name)

New_df1.printSchema()

Creating a view or table from our New_df1 Dataframe

In [0]:
# NOTE: this variable is not passed to createOrReplaceTempView below; the view is
# registered under the literal name "per_ball_Table".
per_ball_Table = "ipl_ball_by_ball_csv"
New_df1.createOrReplaceTempView("per_ball_Table")

Now Lets work on second Dataset which is Summary of IPL Matches

In [0]:
# Match-summary CSV, loaded with the same reader settings as the first dataset.
file_location2 = "/FileStore/tables/ipl_matches.csv"

df2 = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location2)
)

In [0]:
from pyspark.sql.functions import regexp_replace

# Replace the text 'D/L' in the `method` column with '1' so later filters can
# refer to Duckworth-Lewis matches more easily.
dl_flag_column = regexp_replace('method', 'D/L', '1')
New_df2 = df2.withColumn('method', dl_flag_column)

To change the data types, I used the `withColumn()` function.

In [0]:
# Target type for each match-summary column that is cast explicitly.
# NOTE(review): the sample rows exported later in this notebook show an empty
# `date` value — confirm the source date format is one the timestamp cast can parse.
match_column_types = [
    ("date", 'TimeStamp'),
    ("neutral_venue", 'Boolean'),
    ("eliminator", 'Boolean'),
    ("result_margin", 'integer'),
    ("venue_id", 'integer'),
]

for column_name, spark_type in match_column_types:
    New_df2 = New_df2.withColumn(column_name, New_df2[column_name].cast(spark_type))

All the other names are already updated with correct names as we have used header value as column name

Using withColumnRenamed() function To change Column name

In [0]:
# Original column name -> the name used by the queries later in the notebook.
match_column_renames = {
    "result_margin": "winning_margin",
    "umpire1": "umpire1_name",
    "umpire2": "umpire2_name",
    "method": "Dl_method_used",
}

for old_name, new_name in match_column_renames.items():
    New_df2 = New_df2.withColumnRenamed(old_name, new_name)

New_df2.printSchema()

Lets work on The Third Dataframe

In [0]:
# Venue lookup CSV, loaded with the same reader settings as the other datasets.
file_location3 = "/FileStore/tables/ipl_venue.csv"

df3 = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location3)
)

Changing names for third dataframe

In [0]:
# Give the venue columns the names the venue-level queries refer to.
New_df3 = (
    df3
    .withColumnRenamed("venue", "stadium_name")
    .withColumnRenamed("city", "city_name")
)

New_df3.printSchema()

The table for third dataframe

In [0]:
# NOTE: this variable is not passed to createOrReplaceTempView below; the view is
# registered under the literal name "ipl_venue_table".
ipl_venue_table = "ipl_venue_csv"
New_df3.createOrReplaceTempView("ipl_venue_table")

Let's join the New_df2 and New_df3 dataframes on their common `venue_id` column to make the analysis easier

In [0]:
# Attach the venue columns to every match; the inner join drops any match whose
# venue_id has no row in New_df3.
joined_summary_df = New_df2.join(
    New_df3,
    on=['venue_id'],
    how='inner',
)

Lets create a new dataframe with count of umpire1_name and umpire2_name columns

We will register this dataframe as a table so that the SQL query for the umpire counts can run against it

In [0]:
import pandas as pd

# One single-column frame per umpire slot.
umpire_column_1, umpire_column_2 = (
    joined_summary_df.select(slot) for slot in ('umpire1_name', 'umpire2_name')
)

# Stack the two frames into one column of umpire appearances.
umpire_name_union = umpire_column_1.union(umpire_column_2)

# Write the combined column to a local CSV file.
umpire_name_union.toPandas().to_csv('umpire_name_union.csv')

# This variable is not used for the view name; the view is registered under
# the literal name "umpire_name_union_table".
umpire_union_table = "umpire_name_union_csv"
umpire_name_union.createOrReplaceTempView("umpire_name_union_table")

Created a new CSV file for joined_summary_df to create its Table later

In [0]:
# Convert the joined match/venue frame to pandas and write it to mycsv.csv.
joined_summary_df.toPandas().to_csv('mycsv.csv')

Let's create a table for joined_summary_df, which we will use to run the venue-detail queries in a more optimized way

In [0]:
# NOTE: this variable is not passed to createOrReplaceTempView below; the view is
# registered under the literal name "venue_summary_table".
venue_summary_table =  "mycsv_csv"
joined_summary_df.createOrReplaceTempView("venue_summary_table")

Lets do the same above steps to join all the three datasets using join on match_id and create its table

In [0]:
# Attach the match and venue columns to every delivery via match_id.
Entire_dataset = New_df1.join(
    joined_summary_df,
    on=['match_id'],
    how='inner',
)

# Write the full joined dataset to all_data.csv.
Entire_dataset.toPandas().to_csv('all_data.csv')

In [0]:
# NOTE: this variable is not passed to createOrReplaceTempView below; the view is
# registered under the literal name "Entire_dataset_table", which the queries use.
Entire_dataset_table =  "all_data_csv"
Entire_dataset.createOrReplaceTempView("Entire_dataset_table")

#SECTION-2

***Lets Start with the Queries which we have to solve for our data analysis work***

1)  Write a query to find the highest extra runs given by a team in a match.

In [0]:
# Sum the extras per (match, bowling team, batting team) and keep the largest total.
highest_extras_sql = (
    'select batting_team,match_id,bowling_team,SUM(extra_runs) FROM Entire_dataset_table '
    'GROUP BY match_id,bowling_team,batting_team ORDER BY SUM(extra_runs) DESC Limit 1'
)
sqlContext.sql(highest_extras_sql).show()

2) Write a query to find the Leading wicket taker in the IPL?
Note: A wicket won't be accounted for the bowler, if the dismissal is run out,
obstructing the field.

In [0]:
# Run outs and obstructing-the-field dismissals are not credited to the bowler,
# so those deliveries are filtered out before the wickets are summed.
# The earlier predicate wrote the two dismissal kinds as side-by-side string
# literals, which does not exclude either of them; NOT IN lists them explicitly.
leading_wicket_taker_sql = (
    "SELECT bowler_name, SUM(is_wicket) AS Total_wickets "
    "FROM Entire_dataset_table "
    "WHERE dismissal_kind NOT IN ('run out', 'obstructing the field') "
    "GROUP BY bowler_name "
    "ORDER BY SUM(is_wicket) DESC LIMIT 1"
)
sqlContext.sql(leading_wicket_taker_sql).show()

3)    Write a query to return a report for highest run scorer in matches which were
affected by Duckworth-Lewis’s method (D/L method).

Here I have written a query that returns the top 3 batsmen with the most runs in matches affected by the D/L rule.


We can simply use `limit 1` to get the single batsman with the most runs

In [0]:
# Total batsman runs per player in matches flagged as Duckworth-Lewis, top three.
dl_top_scorers_sql = (
    ' select current_batsman As Player_name,SUM(batsman_runs) AS Total_runs FROM Entire_dataset_table '
    'where Dl_method_used = 1 GROUP BY current_batsman ORDER BY SUM(batsman_runs) DESC limit 3'
)
sqlContext.sql(dl_top_scorers_sql).show()

4) Write a query to return a report for highest strike rate by a batsman in
powerplay (1-6 overs)
Note: strike rate = (Total Runs scored/Total balls faced by player) *100, Make
sure that balls faced by player should be legal delivery (not wide balls or no
balls)

In [0]:
# Strike rate = runs / deliveries * 100 for overs 1-6, counting only deliveries
# whose extras_type is "NA". Extra runs (wide ball, no ball, byes, leg-byes and
# penalty) are not included in batsman runs.
# NOTE(review): this filter also drops byes and leg-byes deliveries, while the
# task note only excludes wides and no-balls — confirm which is intended.
powerplay_strike_rate_sql = (
    'select  current_batsman As Player_name ,Round((sum(batsman_runs)/count(batsman_runs))*100,2) as Strike_rate '
    'from Entire_dataset_table  where current_over between 1 and 6 AND  extras_type = "NA" '
    'GROUP BY current_batsman ORDER BY Strike_rate DESC Limit 1'
)
sqlContext.sql(powerplay_strike_rate_sql).show()

5) Write a query to return a report for highest extra runs in a venue (stadium,
city).

In [0]:
# Total extras conceded at each (stadium, city); keep the venue with the most.
venue_extras_sql = (
    ' select city_name,stadium_name,SUM(extra_runs) AS Total_Extra_runs FROM Entire_dataset_table '
    'GROUP BY stadium_name,city_name ORDER BY SUM(extra_runs) DESC Limit 1'
)
sqlContext.sql(venue_extras_sql).show()

6)  Write a query to return a report for the cricketers with the most number of
players of the match award in neutral venues.

In [0]:
# Player-of-the-match awards won at neutral venues, five most-awarded players.
neutral_venue_awards_sql = (
    'select player_of_match AS Player_name, COUNT(player_of_match) AS MOM_Awards from venue_summary_table '
    'where neutral_venue = true GROUP BY player_of_match  ORDER BY COUNT(player_of_match)  DESC Limit 5'
)
sqlContext.sql(neutral_venue_awards_sql).show()

7 )  Write a query to get a list of top 10 players with the highest batting average
Note: Batting average is the total number of runs scored divided by the
number of times they have been out (Make sure to include run outs (on nonstriker end) as valid out while calculating average).

In [0]:
# Batting average = runs scored / times dismissed.
# Dismissals are counted from `player_dismissed`, not from wickets that fell
# while the player was on strike: a run out at the non-striker's end belongs to
# the non-striker and would otherwise be charged to the striker.
# Players who were never dismissed have no average and drop out of the join.
batting_average_sql = (
    "SELECT runs.Player_name, "
    "ROUND(runs.Total_runs / outs.Times_out, 2) AS Batting_average "
    "FROM (SELECT current_batsman AS Player_name, SUM(batsman_runs) AS Total_runs "
    "      FROM Entire_dataset_table GROUP BY current_batsman) runs "
    "JOIN (SELECT player_dismissed AS Player_name, COUNT(*) AS Times_out "
    "      FROM Entire_dataset_table WHERE is_wicket = 1 "
    "      GROUP BY player_dismissed) outs "
    "ON runs.Player_name = outs.Player_name "
    "ORDER BY Batting_average DESC LIMIT 10"
)
sqlContext.sql(batting_average_sql).show()

8)  Write a query to find out who has officiated (as an umpire) the most number of
matches in IPL

In [0]:
# Count appearances per umpire in the combined umpire column; keep the top one.
most_matches_umpire_sql = (
    'select umpire1_name AS Umpire_name, Count(umpire1_name) as Total_matches from umpire_name_union_table '
    'GROUP BY umpire1_name ORDER BY COUNT(umpire1_name) DESC Limit 1'
)
sqlContext.sql(most_matches_umpire_sql).show()

9)   Find venue details of the match where V Kohli scored his highest individual runs
in IPL.

In [0]:
# Venue of the single match in which V Kohli scored the most runs.
# match_id is part of the grouping so runs are totalled per match; grouping by
# venue alone would instead return the ground with his highest career total.
kohli_best_match_sql = (
    "SELECT stadium_name, city_name, SUM(batsman_runs) "
    "FROM Entire_dataset_table "
    "WHERE current_batsman = 'V Kohli' "
    "GROUP BY match_id, stadium_name, city_name "
    "ORDER BY SUM(batsman_runs) DESC LIMIT 1"
)
sqlContext.sql(kohli_best_match_sql).show()

#Section - 3#

To create a SQLite database, import all the required Python libraries

In [0]:
import sqlite3 as sqlt

import pandas as pd

# Read the CSV export of the fully joined dataset written earlier in the notebook.
Entire_IPL_Data = pd.read_csv('all_data.csv')

# Let a connection failure surface here instead of being swallowed: every
# statement below needs `conn`, so hiding the error only moves the crash.
conn = sqlt.connect('Entire_dataset_table.db')

# if_exists='replace' keeps this cell re-runnable; the default ('fail') raises
# ValueError as soon as the table exists from a previous run.
Entire_IPL_Data.to_sql('Entire_dataset_table', conn, if_exists='replace')

# Read the table back to confirm the round trip through SQLite.
Entire_IPL_Data1 = pd.read_sql('SELECT * FROM Entire_dataset_table', conn)

Unnamed: 0,match_id,inning,current_over,ball_num,current_batsman,non_striker,bowler_name,batsman_runs,extra_runs,total_runs,non_boundary,is_wicket,dismissal_kind,player_dismissed,fielder_name,extras_type,batting_team,bowling_team,venue_id,date,player_of_match,neutral_venue,team1,team2,toss_winner,toss_decision,winner,result,winning_margin,eliminator,Dl_method_used,umpire1_name,umpire2_name,stadium_name,city_name
0,419157,2,16,4,V Kohli,R Vinay Kumar,CRD Fernando,0,0,0,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
1,419157,2,16,5,V Kohli,R Vinay Kumar,CRD Fernando,1,0,1,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
2,419157,2,16,6,R Vinay Kumar,V Kohli,CRD Fernando,1,0,1,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
3,419157,2,17,1,R Vinay Kumar,V Kohli,R McLaren,1,0,1,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
4,419157,2,17,2,V Kohli,R Vinay Kumar,R McLaren,1,0,1,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
5,419157,2,17,3,R Vinay Kumar,V Kohli,R McLaren,0,0,0,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
6,419157,2,17,4,R Vinay Kumar,V Kohli,R McLaren,0,0,0,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
7,419157,2,17,5,R Vinay Kumar,V Kohli,R McLaren,1,0,1,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
8,419157,2,17,6,V Kohli,R Vinay Kumar,R McLaren,1,0,1,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru
9,419157,2,18,1,V Kohli,R Vinay Kumar,KA Pollard,4,0,4,0,0,,,,,Royal Challengers Bangalore,Mumbai Indians,35,,R McLaren,False,Royal Challengers Bangalore,Mumbai Indians,Royal Challengers Bangalore,field,Mumbai Indians,runs,57.0,False,,HDPK Dharmasena,SJA Taufel,M.Chinnaswamy Stadium,Bengaluru


Creating class Database with methods to execute our queries from section-2

In [0]:
class Database:
    """Opens the SQLite copy of the IPL data and exposes the nine report queries.

    The constructor connects to ``Entire_dataset_table.db`` and tries to load
    ``Entire_IPL_Data`` into it. The ``Query_*`` methods do not use that
    connection: they run through the notebook's ``sqlContext`` against the temp
    views registered earlier and return plain Python dictionaries.
    """

    def __init__(self, db_name='Entire_IPL_Data',):
        """Connect to the SQLite file and load the dataset table if it is missing.

        ``db_name`` is only stored on the instance; the database file name is fixed.
        """
        self.name = db_name
        self.conn = sqlt.connect('Entire_dataset_table.db')
        self.entire = None

        try:
            # Write through this instance's own connection, not a notebook-level global.
            self.entire = Entire_IPL_Data.to_sql('Entire_dataset_table', self.conn)
        except ValueError:
            # pandas raises ValueError when the table already exists, which is
            # the expected case once the notebook has loaded it; other errors
            # are real failures and are allowed to propagate.
            pass

    @staticmethod
    def _run(query):
        """Execute `query` through sqlContext and return the result as a pandas frame."""
        return sqlContext.sql(query).toPandas()

    @staticmethod
    def _columns_as_lists(frame):
        """Map each column name of `frame` to the list of its values."""
        return {column: frame[column].values.tolist() for column in frame.columns}

    @staticmethod
    def _rows_as_pairs(frame):
        """Turn a two-column frame into {first column value: second column value}."""
        return dict(frame.values)

    def get_status(self): # To get the status of Sqlite database connection
        """Return the SQLite connection object held by this instance."""
        return self.conn

    def Query_A(self):
        """Highest extra runs conceded in one match, as {column: [values]}."""
        return self._columns_as_lists(self._run(
            'select batting_team,match_id,bowling_team,SUM(extra_runs) AS Total_Extra_Runs '
            '      FROM Entire_dataset_table GROUP BY match_id,bowling_team,batting_team '
            '      ORDER BY SUM(extra_runs) DESC Limit 1'
        ))

    def Query_B(self):
        """Leading wicket taker, as {column: [values]}.

        Run outs and obstructing-the-field dismissals are excluded because they
        are not credited to the bowler. NOT IN lists both kinds explicitly; two
        side-by-side string literals do not exclude either of them.
        """
        return self._columns_as_lists(self._run(
            "SELECT bowler_name, SUM(is_wicket) AS Total_wickets "
            "FROM Entire_dataset_table "
            "WHERE dismissal_kind NOT IN ('run out', 'obstructing the field') "
            "GROUP BY bowler_name "
            "ORDER BY SUM(is_wicket) DESC LIMIT 1"
        ))

    def Query_C(self):
        """Top three run scorers in D/L-affected matches, as {column: [values]}."""
        return self._columns_as_lists(self._run(
            ' select current_batsman As Player_name,SUM(batsman_runs) AS Total_runs FROM Entire_dataset_table '
            '      where Dl_method_used = 1 GROUP BY current_batsman ORDER BY SUM(batsman_runs) DESC limit 3'
        ))

    def Query_D(self):
        """Highest strike rate for overs 1-6 on deliveries with extras_type "NA"."""
        return self._columns_as_lists(self._run(
            'select  current_batsman AS Player_name , Round((sum(batsman_runs)/count(batsman_runs))*100,2) as Strike_rate '
            '      from Entire_dataset_table where current_over between 1 and 6 AND  extras_type = "NA" '
            '      GROUP BY current_batsman ORDER BY Strike_rate DESC Limit 1'
        ))

    def Query_E(self):
        """Venue (stadium, city) with the most extra runs, as {column: [values]}."""
        return self._columns_as_lists(self._run(
            ' select city_name,stadium_name,SUM(extra_runs) AS Total_Extra_runs '
            '      FROM Entire_dataset_table GROUP BY stadium_name,city_name ORDER BY SUM(extra_runs) DESC Limit 1'
        ))

    def Query_F(self):
        """Five players with the most awards at neutral venues, as {player: count}."""
        return self._rows_as_pairs(self._run(
            'select player_of_match AS Player_name, COUNT(player_of_match) AS MOM_Awards from venue_summary_table '
            '      where neutral_venue = true GROUP BY player_of_match ORDER BY COUNT(player_of_match)  DESC Limit 5'
        ))

    def Query_G(self):
        """Top ten batting averages, as {player: average}.

        Dismissals are counted from ``player_dismissed`` so that a run out at
        the non-striker's end is charged to the player who was actually out.
        Players who were never dismissed have no average and are left out.
        """
        return self._rows_as_pairs(self._run(
            "SELECT runs.Player_name, "
            "ROUND(runs.Total_runs / outs.Times_out, 2) AS Batting_average "
            "FROM (SELECT current_batsman AS Player_name, SUM(batsman_runs) AS Total_runs "
            "      FROM Entire_dataset_table GROUP BY current_batsman) runs "
            "JOIN (SELECT player_dismissed AS Player_name, COUNT(*) AS Times_out "
            "      FROM Entire_dataset_table WHERE is_wicket = 1 "
            "      GROUP BY player_dismissed) outs "
            "ON runs.Player_name = outs.Player_name "
            "ORDER BY Batting_average DESC LIMIT 10"
        ))

    def Query_H(self):
        """Umpire with the most appearances, as {column: [values]}."""
        return self._columns_as_lists(self._run(
            'select umpire1_name, Count(umpire1_name) as Total_matches from umpire_name_union_table GROUP BY umpire1_name '
            '      ORDER BY COUNT(umpire1_name) DESC Limit 1'
        ))

    def Query_I(self):
        """Venue of the match with V Kohli's highest score, as {column: [values]}.

        match_id is part of the grouping so runs are totalled per match rather
        than per venue across all matches.
        """
        return self._columns_as_lists(self._run(
            "SELECT stadium_name, city_name, SUM(batsman_runs) AS Total_Runs "
            "FROM Entire_dataset_table "
            "WHERE current_batsman = 'V Kohli' "
            "GROUP BY match_id, stadium_name, city_name "
            "ORDER BY SUM(batsman_runs) DESC LIMIT 1"
        ))

    def __del__(self):
        """Close the SQLite connection, if one was ever opened."""
        # __init__ may have failed before self.conn was assigned.
        connection = getattr(self, 'conn', None)
        if connection is not None:
            connection.close()
        
# Single shared instance; the cells below call its Query_* methods.
Db = Database() # Object of class Database to implement SQL queries using class methods

Running the status method of class Database to check database connectivity

In [0]:
# Displays the sqlite3 connection object held by Db.
Db.get_status()

1) QUERY-A : Object to run SQl query to find the highest extra runs given by a team in a match.

In [0]:
# Run Query_A and show both the result and its Python type.
Highest_Extras = Db.Query_A()
print(Highest_Extras)
print(type(Highest_Extras)) # To confirm the output is in Dictionary format or not

2) QUERY-B : Object to find the Leading wicket taker in the IPL?

In [0]:
# Run Query_B (leading wicket taker) and print the returned dictionary.
Highest_Wicketaker = Db.Query_B()
print(Highest_Wicketaker)

3) QUERY-C : Object to return a report for highest run scorer in matches which were
affected by Duckworth-Lewis’s method (D/L method).

This query returns the top 3 players with the most runs in D/L-affected matches

In [0]:
# Run Query_C (top scorers in D/L-affected matches) and print the returned dictionary.
Highest_run_DL = Db.Query_C()
print(Highest_run_DL)

4) QUERY-D :  Object to return a report for highest strike rate by a batsman in
powerplay (1-6 overs) excluding leg byes

In [0]:
# Run Query_D (best powerplay strike rate) and print the returned dictionary.
Highest_strike_rate = Db.Query_D()
print(Highest_strike_rate)

5) QUERY-E : Object to to return a report for highest extra runs in a venue (stadium,
city)

In [0]:
# Run Query_E (venue with the most extras) and print the returned dictionary.
Highest_extras_venue = Db.Query_E()
print(Highest_extras_venue)

6) QUERY-F : Object to run SQl query to  return a report for the cricketers with the most number of
players of the match award in neutral venues.

In [0]:
# Run Query_F (awards at neutral venues) and print the {player: count} dictionary.
Most_MOM_awards = Db.Query_F()
print(Most_MOM_awards)

7) QUERY-G : Object to get a list of top 10 players with the highest batting average

In [0]:
# Run Query_G (top batting averages) and print the {player: average} dictionary.
highest_batting_avg = Db.Query_G()
print(highest_batting_avg)

8) QUERY-H : Object to find out who has officiated (as an umpire) the most number of
matches in IPL

In [0]:
# Run Query_H (umpire with the most matches) and print the returned dictionary.
Most_Umpired_official = Db.Query_H()
print(Most_Umpired_official)

9) QUERY-I : Object to Find venue details of the match where V Kohli scored his highest individual runs
in IPL

In [0]:
# Run Query_I (venue for V Kohli's runs query) and print the returned dictionary.
Kohli_best_score = Db.Query_I()
print(Kohli_best_score)