In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext

In [2]:
# All this setup required to access GCS bucket, is only for local/VM machines only, not for the GCP dataproc clusters
credentials_location = '/home/jagadish/.gc/finaldatazoomcamp.json'
conf = SparkConf() \
        .setMaster("local[*]") \
        .setAppName('generate-stats-local') \
        .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar,./lib/spark-bigquery-latest_2.12.jar") \
        .set("spark.hadoop.google.cloud.auth.service.account.enable", "true") \
        .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)

In [3]:
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set("fs.AbstractFileSystem.gs.impl",  "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
hadoop_conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
hadoop_conf.set("fs.gs.auth.service.account.json.keyfile", credentials_location)
hadoop_conf.set("fs.gs.auth.service.account.enable", "true")

23/04/08 07:19:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/08 07:19:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
spark = SparkSession.builder \
            .config(conf=sc.getConf()) \
            .getOrCreate()

In [97]:
ipl_matches_data_gcs_path = "gs://jagadish_data_lake_datazoomcamp-jagadish-final/IPL_Matches_2008_2022.parquet"
ipl_matches = spark.read.parquet(ipl_matches_data_gcs_path)

                                                                                

In [86]:
ipl_matches.printSchema()

root
 |-- id: long (nullable = true)
 |-- city: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- season: long (nullable = true)
 |-- match_number: string (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- toss_winner: string (nullable = true)
 |-- toss_decision: string (nullable = true)
 |-- superover: string (nullable = true)
 |-- winning_team: string (nullable = true)
 |-- won_by: string (nullable = true)
 |-- margin: long (nullable = true)
 |-- method: string (nullable = true)
 |-- player_of_the_match: string (nullable = true)
 |-- team1_players: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- team2_players: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- umpire1: string (nullable = true)
 |-- umpire2: string (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [87]:
ipl_ball_by_ball_data_gcs_path = "gs://jagadish_data_lake_datazoomcamp-jagadish-final/IPL_Ball_by_Ball_2008_2022.parquet"
ipl_ball_by_ball = spark.read.parquet(ipl_ball_by_ball_data_gcs_path)


                                                                                

In [88]:
ipl_ball_by_ball.printSchema()

root
 |-- id: long (nullable = true)
 |-- innings: long (nullable = true)
 |-- overs: long (nullable = true)
 |-- ball_number: long (nullable = true)
 |-- batter: string (nullable = true)
 |-- bowler: string (nullable = true)
 |-- non_striker: string (nullable = true)
 |-- extra_type: string (nullable = true)
 |-- batsman_run: long (nullable = true)
 |-- extras_run: long (nullable = true)
 |-- total_run: long (nullable = true)
 |-- non_boundary: long (nullable = true)
 |-- is_wicket_delivery: long (nullable = true)
 |-- player_out: string (nullable = true)
 |-- dismissal_type: string (nullable = true)
 |-- fielders_involved: string (nullable = true)
 |-- batting_team: string (nullable = true)



In [98]:
# store matches data into BigQuery table
# ipl_matches.write \
#     .format("bigquery") \
#     .option("project", "datazoomcamp-jagadish-final") \
#     .option("writeMethod", "direct") \
#     .mode("overwrite") \
#     .save("datazoomcamp-jagadish-final.ipl_data.matches")
ipl_matches.write
.format("bigquery").mode('Overwrite') \
.option("parentProject", "datazoomcamp-jagadish-final") \
.option("temporaryGcsBucket","jagadish_data_lake_datazoomcamp-jagadish-final").option('table', 'ipl_data.matches').save()

                                                                                

In [90]:
# store scores data into BigQuery table
# ipl_ball_by_ball.write \
#     .format("bigquery") \
#     .option("project", "datazoomcamp-jagadish-final") \
#     .option("writeMethod", "direct") \
#     .mode("overwrite") \
#     .save("ipl_data.scores")
ipl_matches.write.format("bigquery").mode('Overwrite') \
.option("parentProject", "datazoomcamp-jagadish-final") \
.option("temporaryGcsBucket","jagadish_data_lake_datazoomcamp-jagadish-final").option('table', 'ipl_data.scores').save()

                                                                                

In [91]:
ipl_matches.registerTempTable('matches')



In [110]:
first_innings_stats = spark.sql("""
select team1 as team, season, count(1) as matches_played, 
sum(CASE
    WHEN winning_team == team1 THEN 1
    ELSE 0
END ) AS matches_won
from matches m
group by team1, season
""")

first_innings_stats.show()


[Stage 312:>                                                        (0 + 1) / 1]

+---------+------+--------------+-----------+
|     team|season|matches_played|matches_won|
+---------+------+--------------+-----------+
|Rajasthan|  2021|             7|          2|
|   Punjab|  2021|             9|          3|
|   Punjab|  2010|             7|          1|
|  Gujarat|  2022|             7|          4|
|Hyderabad|  2017|             8|          6|
|   Punjab|  2015|             7|          1|
|    Delhi|  2013|             8|          3|
| Banglore|  2009|            16|          9|
|    Delhi|  2008|             7|          4|
|Rajasthan|  2011|             7|          4|
|Rajasthan|  2022|            13|          7|
|  Chennai|  2019|             6|          3|
|Hyderabad|  2013|             8|          7|
|  Kolkata|  2020|            10|          6|
|Hyderabad|  2022|             3|          1|
|  Kolkata|  2012|             8|          4|
|Hyderabad|  2012|             8|          3|
|Hyderabad|  2008|             7|          0|
|   Mumbai|  2019|            10| 

                                                                                

In [111]:
second_innings_stats = spark.sql("""
select team2 as team, season, count(1) as matches_played,
sum(CASE
    WHEN winning_team == team2 THEN 1
    ELSE 0
END ) AS matches_won
from matches m
group by team2, season
""")

second_innings_stats.show()

[Stage 315:>                                                        (0 + 1) / 1]

+---------+------+--------------+-----------+
|     team|season|matches_played|matches_won|
+---------+------+--------------+-----------+
|Rajasthan|  2021|             7|          3|
|   Punjab|  2021|             5|          3|
|   Punjab|  2010|             7|          3|
|  Gujarat|  2022|             9|          8|
|Hyderabad|  2017|             6|          2|
|   Punjab|  2015|             7|          2|
|    Delhi|  2013|             8|          0|
|    Delhi|  2008|             7|          3|
|Rajasthan|  2011|             6|          2|
|Rajasthan|  2022|             4|          3|
|  Chennai|  2019|            11|          7|
|Hyderabad|  2013|             9|          3|
|  Kolkata|  2020|             4|          1|
|Hyderabad|  2022|            11|          5|
|  Kolkata|  2012|             9|          8|
|Hyderabad|  2012|             7|          1|
|Hyderabad|  2008|             7|          2|
|   Mumbai|  2019|             6|          4|
|  Chennai|  2009|             3| 

                                                                                

In [122]:
teams_stats = first_innings_stats.unionAll(second_innings_stats)
teams_stats = teams_stats.groupBy('team', 'season').sum('matches_played', 'matches_won').sort('season', ascending=True)
teams_stats = teams_stats.withColumnRenamed('sum(matches_played)', 'matches_played').withColumnRenamed('sum(matches_won)', 'matches_won')
teams_stats.show()


[Stage 390:>                (0 + 1) / 1][Stage 391:>                (0 + 1) / 1]

+---------+------+--------------+-----------+
|     team|season|matches_played|matches_won|
+---------+------+--------------+-----------+
|    Delhi|  2008|            14|          7|
|  Chennai|  2008|            16|          9|
|  Kolkata|  2008|            13|          6|
|   Punjab|  2008|            15|         10|
|   Mumbai|  2008|            14|          7|
|Hyderabad|  2008|            14|          2|
| Banglore|  2008|            14|          4|
|Rajasthan|  2008|            16|         13|
|   Punjab|  2009|            14|          7|
|Hyderabad|  2009|            16|          9|
|    Delhi|  2009|            15|         10|
|Rajasthan|  2009|            13|          6|
| Banglore|  2009|            16|          9|
|   Mumbai|  2009|            13|          5|
|  Chennai|  2009|            14|          8|
|  Kolkata|  2009|            13|          3|
|   Mumbai|  2010|            16|         11|
|  Chennai|  2010|            16|          9|
|  Kolkata|  2010|            14| 

                                                                                

In [123]:
# writing teams statistics yearwise
teams_stats.write.format("bigquery").mode('Overwrite') \
.option("parentProject", "datazoomcamp-jagadish-final") \
.option("temporaryGcsBucket","jagadish_data_lake_datazoomcamp-jagadish-final").option('table', 'ipl_data.teams_stats').save()

                                                                                