In [None]:
# Install a headless Java 8 JDK quietly (-qq) and discard the command's stdout.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
!wget -q https://archive.apache.org/dist/spark/spark3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
# Unpack the Spark distribution into the current working directory.
# NOTE(review): the tarball is read from a Google Drive path, not from the file
# fetched by the wget cell — presumably Drive is mounted beforehand; confirm.
!tar xf /content/drive/MyDrive/DataGrokr_Project/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
import os

# Point the process environment at the Java 8 JDK and the unpacked Spark directory.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop2.7",
    }
)

In [None]:
# Install findspark quietly and import it.
!pip install -q findspark
import findspark
# findspark.init() is not called here; it is invoked in the following cell.

In [None]:
# Initialise findspark so the Spark installation can be used from this kernel
# (presumably it picks up the SPARK_HOME value set earlier — confirm).
findspark.init()

In [None]:
# Install the pyspark package with pip.
!pip install pyspark

In [None]:
from pyspark.sql import SparkSession

# Obtain the SparkSession for this notebook under the application name "DataGrokr"
# (getOrCreate returns an already-running session when there is one).
session_builder = SparkSession.builder.appName("DataGrokr")
spark = session_builder.getOrCreate()

In [None]:
# Load the three raw CSV files; the first row of each file supplies the column names.
def _read_csv_with_header(csv_path):
    """Read `csv_path` into a DataFrame, using its header row as column names."""
    return spark.read.option("header", True).csv(csv_path)


ball_by_ball_temp = _read_csv_with_header(
    "/content/drive/MyDrive/DataGrokr_Project/ipl_dataset_2/ipl_ball_by_ball.csv"
)
matches_temp = _read_csv_with_header(
    "/content/drive/MyDrive/DataGrokr_Project/ipl_dataset_2/ipl_matches.csv"
)
venue_temp = _read_csv_with_header(
    "/content/drive/MyDrive/DataGrokr_Project/ipl_dataset_2/ipl_venue.csv"
)

In [None]:
# Typed projection of the raw matches data, registered as the temp view "matches_table".
# Each entry is one Spark SQL select expression; note that result_margin is
# exposed under the name "resultmargin".
_matches_select_exprs = [
    "cast(match_id as int) match_id",
    "cast(date as date) date",
    "cast(player_of_match as string)",
    "cast(team1 as string) team1",
    "cast(team2 as string) team2",
    "cast(toss_winner as string) toss_winner",
    "cast(toss_decision as string) toss_decision",
    "cast(winner as string) winner",
    "cast(result as string) result",
    "cast(result_margin as int) resultmargin",
    "cast(eliminator as string) eliminator",
    "cast(method as string) method",
    "cast(umpire1 as string) umpire1",
    "cast(umpire2 as string) umpire2",
    "cast(venue_id as int) venue_id",
]
matches_df = matches_temp.selectExpr(*_matches_select_exprs)
matches_df.createOrReplaceTempView("matches_table")

In [None]:
# Typed projection of the raw ball-by-ball data, registered as the temp view
# "ball_by_ball_table". Every column keeps its name and is cast to the listed type.
_ball_by_ball_schema = [
    ("inning", "int"),
    ("overs", "int"),
    ("batsman", "string"),
    ("non_striker", "string"),
    ("bowler", "string"),
    ("batsman_runs", "int"),
    ("extra_runs", "int"),
    ("total_runs", "int"),
    ("non_boundary", "int"),
    ("is_wicket", "int"),
    ("dismissal_kind", "string"),
    ("player_dismissed", "string"),
    ("fielder", "string"),
    ("extras_type", "string"),
    ("batting_team", "string"),
    ("bowling_team", "string"),
    ("match_id", "int"),
]
ball_by_ball_df = ball_by_ball_temp.selectExpr(
    *[f"cast({column} as {sql_type}) {column}" for column, sql_type in _ball_by_ball_schema]
)
ball_by_ball_df.createOrReplaceTempView("ball_by_ball_table")

In [None]:
# Typed projection of the raw venue data, registered as the temp view "venue_table".
_venue_select_exprs = [
    "cast(venue_id as int) venue_id",
    "cast(venue as string) venue",
    "cast(city as string) city",
]
venue_df = venue_temp.selectExpr(*_venue_select_exprs)
venue_df.createOrReplaceTempView("venue_table")

In [None]:
# List the tables and temp views currently registered in the Spark catalog.
spark.catalog.listTables()

[Table(name='ball_by_ball_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='matches_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='venue_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
#1. Find the top 3 venues which hosted the most number of eliminator matches?
# Count eliminator matches per venue and keep the three largest counts.
# The previous query filtered on a cross-joined sub-query with LIMIT 1, which
# restricted the result to a single venue, and it had no ORDER BY / LIMIT 3.
# NOTE(review): the eliminator predicate is kept as `!= 'N'`; if the column can
# hold markers other than 'Y'/'N', tighten it to `= 'Y'`.
query = """
SELECT v.venue, COUNT(*) AS Number_of_matches
FROM matches_table m
JOIN venue_table v ON v.venue_id = m.venue_id
WHERE m.eliminator != 'N'
GROUP BY v.venue
ORDER BY Number_of_matches DESC
LIMIT 3
"""
# Keep the DataFrame itself: DataFrame.show() only prints and returns None.
top3venues = spark.sql(query)
top3venues.show(truncate=False)

+--------------------+-----------------+
|               venue|Number_of_matches|
+--------------------+-----------------+
|M.Chinnaswamy Sta...|                4|
+--------------------+-----------------+



In [None]:
#2. Return most number of catches taken by a player in IPL history?
# Fielders ranked by their number of 'caught' dismissals; only the top row is printed.
query= "SELECT fielder, COUNT(fielder) FROM ball_by_ball_table WHERE dismissal_kind= 'caught' AND is_wicket= 1 GROUP BY fielder ORDER BY COUNT(fielder) DESC"
catches_by_fielder = spark.sql(query)
catches_by_fielder.show(n=1)

+----------+--------------+
|   fielder|count(fielder)|
+----------+--------------+
|KD Karthik|           118|
+----------+--------------+
only showing top 1 row



In [None]:
#3. Write a query to return a report for highest wicket taker in matches which were affected by 
#Duckworth-Lewis’s method (D/L method).
# `method` is a column of matches_table, not ball_by_ball_table, so the deliveries
# are joined to their match before filtering on it. Only deliveries on which a
# wicket fell are counted (the previous query counted every ball bowled).
# Dismissals that are not credited to the bowler are excluded; this assumes the
# dataset labels them 'run out', 'retired hurt' and 'obstructing the field' — confirm.
query = """
SELECT b.bowler, COUNT(*) AS wickets
FROM ball_by_ball_table b
JOIN matches_table m ON m.match_id = b.match_id
WHERE m.method = 'D/L'
  AND b.is_wicket = 1
  AND b.dismissal_kind NOT IN ('run out', 'retired hurt', 'obstructing the field')
GROUP BY b.bowler
ORDER BY wickets DESC
"""
spark.sql(query).show(n=1)

In [None]:
#5. Write a query to return a report for highest extra runs in a venue (stadium, city).
# Extras are summed per venue: COUNT(extra_runs) only counted deliveries, so it
# ranked venues by balls bowled rather than by extra runs conceded.
# The city is reported alongside the stadium name, as the question asks.
query = """
SELECT v.venue, v.city, SUM(b.extra_runs) AS total_extra_runs
FROM ball_by_ball_table b
JOIN matches_table m ON b.match_id = m.match_id
JOIN venue_table v ON m.venue_id = v.venue_id
GROUP BY v.venue, v.city
ORDER BY total_extra_runs DESC
LIMIT 1
"""
spark.sql(query).show(truncate=False)

In [None]:
#6. Write a query to return a report for the cricketers with the most number of players of the
#match award in neutral venues.
# NOTE(review): this ranks player-of-the-match awards across ALL matches; no
# neutral-venue filter is applied, and matches_table as built in this notebook
# exposes no column to filter on.
query= "SELECT player_of_match, COUNT(player_of_match) FROM matches_table GROUP BY player_of_match ORDER BY COUNT(player_of_match) DESC"
awards_by_player = spark.sql(query)
awards_by_player.show(n=5)

+---------------+----------------------+
|player_of_match|count(player_of_match)|
+---------------+----------------------+
| AB de Villiers|                    23|
|       CH Gayle|                    22|
|      RG Sharma|                    18|
|      DA Warner|                    17|
|       MS Dhoni|                    17|
+---------------+----------------------+
only showing top 5 rows



In [None]:
#7. Write a query to get a list of top 10 players with the highest batting average
# Batting average = runs scored by the player / number of times the player was dismissed.
#  - runs: batsman_runs belong to the striker only, so they are summed per `batsman`
#    (the previous UNION ALL with non_striker also credited them to the non-striker).
#  - dismissals: deliveries with is_wicket = 1, grouped by `player_dismissed`, so a
#    player is only charged with his own dismissals.
# The inner join drops players who were never dismissed (their average is undefined).
query = """
WITH runs AS (
    SELECT batsman AS player, SUM(batsman_runs) AS total_runs
    FROM ball_by_ball_table
    GROUP BY batsman
),
outs AS (
    SELECT player_dismissed AS player, COUNT(*) AS dismissals
    FROM ball_by_ball_table
    WHERE is_wicket = 1
    GROUP BY player_dismissed
)
SELECT r.player AS Batsman_, r.total_runs / o.dismissals AS Average
FROM runs r
JOIN outs o ON o.player = r.player
ORDER BY Average DESC
LIMIT 10
"""
spark.sql(query).show()

In [None]:
#8 Write a query to find out who has officiated (as an umpire) the most number of matches in IPL
# umpire1 and umpire2 are stacked into one column so each appearance counts once,
# then the umpire with the highest count is kept.
query= """SELECT nipl.umpire, COUNT(nipl.umpire) as count FROM
                ((SELECT umpire1 as umpire FROM matches_table )
                union all
                (SELECT umpire2 as umpire FROM matches_table)) nipl
                GROUP BY nipl.umpire
                ORDER BY COUNT(nipl.umpire) DESC LIMIT 1"""
umpire_counts = spark.sql(query)
umpire_counts.show()

In [None]:
#9. Find venue details of the match where V Kohli scored his highest individual runs in IPL.
# An individual score is the SUM of batsman_runs over one match; MAX(batsman_runs)
# only gave the largest single-ball value. Runs are therefore totalled per match,
# and the best match is kept together with its venue and city. Every selected
# non-aggregate column is listed in GROUP BY (the previous query selected `venue`
# without grouping by it, which Spark rejects).
query = """
SELECT b.batsman, m.match_id, SUM(b.batsman_runs) AS runs, v.venue, v.city
FROM ball_by_ball_table b
JOIN matches_table m ON b.match_id = m.match_id
JOIN venue_table v ON m.venue_id = v.venue_id
WHERE b.batsman = 'V Kohli'
GROUP BY b.batsman, m.match_id, v.venue, v.city
ORDER BY runs DESC
LIMIT 1
"""
spark.sql(query).show(truncate=False)


In [None]:
import getpass
import os

import mysql.connector as msql


class Database:
    """Runs the IPL report queries against a MySQL database.

    Each ``q*`` method executes one query and returns its two-column result as a
    dict (first column -> second column), in the order the rows were returned.

    NOTE(review): the queries assume the MySQL tables ipl_ball_by_ball,
    ipl_matches and ipl_venue carry the same columns as the CSV files loaded
    earlier in this notebook — confirm against the actual schema.
    """

    def __init__(self, host="127.0.0.1", user="root", password=None, database="ipldb"):
        """Open the connection and a cursor.

        Args:
            host: MySQL server host.
            user: MySQL user name.
            password: MySQL password. When omitted it is read from the
                IPLDB_PASSWORD environment variable, and prompted for if that
                is not set, so no credential is stored in the notebook.
            database: Schema to connect to.
        """
        if password is None:
            password = os.environ.get("IPLDB_PASSWORD")
        if password is None:
            password = getpass.getpass("MySQL password: ")
        self.conn = msql.connect(host=host, user=user, password=password, database=database)
        self.cur = self.conn.cursor()

    def close(self):
        """Close the cursor and the connection."""
        self.cur.close()
        self.conn.close()

    def _fetch_dict(self, query):
        """Execute `query` (which must select exactly two columns) and return the rows as a dict."""
        self.cur.execute(query)
        return dict(self.cur.fetchall())

    def qone(self):
        """Top 3 venues by number of eliminator matches hosted -> {venue: matches}."""
        # The eliminator filter was missing, so every match was counted.
        # NOTE(review): predicate mirrors the Spark query (`<> 'N'`); tighten to = 'Y' if needed.
        query = """
            SELECT v.venue, COUNT(*) AS match_count
            FROM ipl_matches m
            JOIN ipl_venue v ON v.venue_id = m.venue_id
            WHERE m.eliminator <> 'N'
            GROUP BY v.venue
            ORDER BY match_count DESC
            LIMIT 3
        """
        return self._fetch_dict(query)

    def qtwo(self):
        """Fielder with the most 'caught' dismissals -> {fielder: catches}."""
        query = """
            SELECT fielder, COUNT(fielder) AS catches
            FROM ipl_ball_by_ball
            WHERE dismissal_kind = 'caught'
            GROUP BY fielder
            ORDER BY catches DESC
            LIMIT 1
        """
        return self._fetch_dict(query)

    def qthree(self):
        """Highest wicket taker in matches decided by the D/L method -> {bowler: wickets}."""
        # `method` belongs to ipl_matches, so deliveries are joined to their match.
        # Dismissals not credited to the bowler are excluded; assumes these
        # dismissal_kind labels match the dataset — confirm.
        query = """
            SELECT b.bowler, COUNT(*) AS wickets
            FROM ipl_ball_by_ball b
            JOIN ipl_matches m ON m.match_id = b.match_id
            WHERE m.method = 'D/L'
              AND b.is_wicket = 1
              AND b.dismissal_kind NOT IN ('run out', 'retired hurt', 'obstructing the field')
            GROUP BY b.bowler
            ORDER BY wickets DESC
            LIMIT 1
        """
        return self._fetch_dict(query)

    def qfour(self):
        """Highest strike rate by a batsman in non-powerplay overs (7-20).

        strike rate = (total runs scored / total legal balls faced) * 100, where
        wides and no-balls do not count as balls faced.

        TODO: not implemented yet; currently returns None.
        """
        return None

    def qfive(self):
        """Venue with the highest total extra runs -> {"venue, city": extra_runs}."""
        # SUM (not COUNT) of extras, sorted descending; columns are referenced
        # through the table aliases, which MySQL requires once an alias is declared.
        query = """
            SELECT CONCAT_WS(', ', v.venue, v.city) AS venue, SUM(b.extra_runs) AS total_extra_runs
            FROM ipl_ball_by_ball b
            JOIN ipl_matches m ON b.match_id = m.match_id
            JOIN ipl_venue v ON m.venue_id = v.venue_id
            GROUP BY v.venue, v.city
            ORDER BY total_extra_runs DESC
            LIMIT 1
        """
        return self._fetch_dict(query)

    def qsix(self):
        """Player with the most player-of-the-match awards -> {player: awards}.

        NOTE(review): the question asks for neutral venues only, but no
        neutral-venue filter is applied here; the count covers all matches.
        """
        query = """
            SELECT player_of_match, COUNT(player_of_match) AS awards
            FROM ipl_matches
            GROUP BY player_of_match
            ORDER BY awards DESC
            LIMIT 1
        """
        return self._fetch_dict(query)

    def qseven(self):
        """Top 10 batting averages -> {player: average}.

        average = runs scored as striker / number of own dismissals. Players who
        were never dismissed are left out because their average is undefined.
        """
        # Derived tables (rather than WITH) keep this usable on MySQL versions without CTEs.
        query = """
            SELECT r.player, r.total_runs / o.dismissals AS average
            FROM (
                SELECT batsman AS player, SUM(batsman_runs) AS total_runs
                FROM ipl_ball_by_ball
                GROUP BY batsman
            ) r
            JOIN (
                SELECT player_dismissed AS player, COUNT(*) AS dismissals
                FROM ipl_ball_by_ball
                WHERE is_wicket = 1
                GROUP BY player_dismissed
            ) o ON o.player = r.player
            ORDER BY average DESC
            LIMIT 10
        """
        return self._fetch_dict(query)

    def qeight(self):
        """The two umpires who officiated the most matches -> {umpire: matches}."""
        query = """SELECT nipl.umpire, COUNT(nipl.umpire) as count FROM
                ((SELECT umpire1 as umpire FROM ipl_matches )
                union all
                (SELECT umpire2 as umpire FROM ipl_matches)) nipl
                GROUP BY nipl.umpire
                ORDER BY COUNT(nipl.umpire) DESC LIMIT 2"""
        return self._fetch_dict(query)

    def qnine(self):
        """Venue of V Kohli's highest individual score -> {"venue, city": runs}."""
        # A score is the SUM of batsman_runs within one match (MAX gave a per-ball
        # value); exactly two columns are selected so the rows convert to a dict.
        query = """
            SELECT CONCAT_WS(', ', v.venue, v.city) AS venue, SUM(b.batsman_runs) AS runs
            FROM ipl_ball_by_ball b
            JOIN ipl_matches m ON b.match_id = m.match_id
            JOIN ipl_venue v ON m.venue_id = v.venue_id
            WHERE b.batsman = 'V Kohli'
            GROUP BY b.match_id, v.venue, v.city
            ORDER BY runs DESC
            LIMIT 1
        """
        return self._fetch_dict(query)