In [0]:
## Initiate Spark Session to access the clusters and Spark operations
from pyspark.sql import SparkSession
## pyspark SQL is the place where we can access all the neccassary Dataframe related functions
spark = SparkSession.builder.getOrCreate()

## DataFrames - Create , Read , Load - Operations

In [0]:
## Create Dataframe using List of Dictionaries

from pyspark.sql.types import IntegerType, StringType, DoubleType, BooleanType
from pyspark.sql.types import StructType, StructField

#Generate toy data using a dictionary list
data = [{"Product": 'Laptop', "Brand": "HP", "Price": 99000, "Units": 2},
        {"Product": 'Smartband', "Brand": "MI", "Price": 4000, "Units": 3},
        {"Product": 'Headset', "Brand": "Boat", "Price": 3000, "Units": 2},
        {"Product": 'Mobilephone', "Brand": "Apple", "Price": 120000, "Units": 2}
        ]

# Declare the schema for the output of our function

#structfield has 3 arguments to be passed : 1. column name , 2. data type , 3. Nullable

outSchema = StructType([StructField('Product',StringType(),True),
                        StructField('Brand',StringType(),True),
                        StructField('Price',IntegerType(),True),
                        StructField('Units',IntegerType(),True),
                       ])

## Check the function createDataFrame function to conver the list of dictinoaries using 
#Create a DataFrame from the data list
df = spark.createDataFrame(data ,schema= outSchema)

## show method of dataframe class will display the created dataframe
df.show()

+-----------+-----+------+-----+
|    Product|Brand| Price|Units|
+-----------+-----+------+-----+
|     Laptop|   HP| 99000|    2|
|  Smartband|   MI|  4000|    3|
|    Headset| Boat|  3000|    2|
|Mobilephone|Apple|120000|    2|
+-----------+-----+------+-----+



In [0]:
#Print the schema and view the DataFrame in table format
df.printSchema()
df.show()

root
 |-- Product: string (nullable = true)
 |-- Brand: string (nullable = true)
 |-- Price: integer (nullable = true)
 |-- Units: integer (nullable = true)

+-----------+-----+------+-----+
|    Product|Brand| Price|Units|
+-----------+-----+------+-----+
|     Laptop|   HP| 99000|    2|
|  Smartband|   MI|  4000|    3|
|    Headset| Boat|  3000|    2|
|Mobilephone|Apple|120000|    2|
+-----------+-----+------+-----+



In [0]:
## Create Dataframes using RDDs by defining the Schema on top of it
## Create RDD using parallelize function()
rdd = sc.parallelize([('C',85,76,87,91), ('B',85,76,87,91), ("A", 85,78,96,92), ("A", 92,76,89,96)], 4)
print("Type of the datastructure :" ,type(rdd))
## Define the schema substitute
sub = ['Division','English','Mathematics','Physics','Chemistry']
## convert rdd into dataframe using createDataFrame function by defining schema parameter
marks_df = spark.createDataFrame(rdd, schema=sub)
print("Type of the datastructure :" ,type(marks_df))
marks_df.printSchema()
marks_df.show()

Type of the datastructure : <class 'pyspark.rdd.RDD'>
Type of the datastructure : <class 'pyspark.sql.dataframe.DataFrame'>
root
 |-- Division: string (nullable = true)
 |-- English: long (nullable = true)
 |-- Mathematics: long (nullable = true)
 |-- Physics: long (nullable = true)
 |-- Chemistry: long (nullable = true)

+--------+-------+-----------+-------+---------+
|Division|English|Mathematics|Physics|Chemistry|
+--------+-------+-----------+-------+---------+
|       C|     85|         76|     87|       91|
|       B|     85|         76|     87|       91|
|       A|     85|         78|     96|       92|
|       A|     92|         76|     89|       96|
+--------+-------+-----------+-------+---------+



In [0]:
### Reading file and converting into Dataframe
# # File location and type
# dbutils.fs.ls("/FileStore/tables/") 
# # let's consider IPL matches dataset
# file_location = "/FileStore/tables/IPL_Matches_2008_2020.csv"
# file_type = "csv"
# ##read csv files using spark.read.csv function 
# ipl_dataframe = spark.read.csv(file_location, inferSchema = True, header = True) 
# ipl_dataframe.show()

# Dataset link
# https://www.kaggle.com/datasets/patrickb1912/ipl-complete-dataset-20082020

ipl_dataframe = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/darshan.ingle@thadomal.org/IPL_Matches_2008_2020.csv", inferSchema = True, header = True)
ipl_dataframe.show()

+------+----------+----------+---------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+----------+------+-----------+--------------+
|    id|      city|      date|player_of_match|               venue|neutral_venue|               team1|               team2|         toss_winner|toss_decision|              winner| result|result_margin|eliminator|method|    umpire1|       umpire2|
+------+----------+----------+---------------+--------------------+-------------+--------------------+--------------------+--------------------+-------------+--------------------+-------+-------------+----------+------+-----------+--------------+
|335982| Bangalore|2008-04-18|    BB McCullum|M Chinnaswamy Sta...|            0|Royal Challengers...|Kolkata Knight Ri...|Royal Challengers...|        field|Kolkata Knight Ri...|   runs|          140|         N|    NA|  Asad Rauf|   RE Koertzen|
|335983|Chan

In [0]:
ipl_dataframe.collect()

Out[43]: [Row(id=335982, city='Bangalore', date=datetime.date(2008, 4, 18), player_of_match='BB McCullum', venue='M Chinnaswamy Stadium', neutral_venue=0, team1='Royal Challengers Bangalore', team2='Kolkata Knight Riders', toss_winner='Royal Challengers Bangalore', toss_decision='field', winner='Kolkata Knight Riders', result='runs', result_margin='140', eliminator='N', method='NA', umpire1='Asad Rauf', umpire2='RE Koertzen'),
 Row(id=335983, city='Chandigarh', date=datetime.date(2008, 4, 19), player_of_match='MEK Hussey', venue='Punjab Cricket Association Stadium, Mohali', neutral_venue=0, team1='Kings XI Punjab', team2='Chennai Super Kings', toss_winner='Chennai Super Kings', toss_decision='bat', winner='Chennai Super Kings', result='runs', result_margin='33', eliminator='N', method='NA', umpire1='MR Benson', umpire2='SL Shastri'),
 Row(id=335984, city='Delhi', date=datetime.date(2008, 4, 19), player_of_match='MF Maharoof', venue='Feroz Shah Kotla', neutral_venue=0, team1='Delhi Dare

In [0]:
## view first 2 rows of ipl_dataframe
ipl_dataframe.take(2)

Out[44]: [Row(id=335982, city='Bangalore', date=datetime.date(2008, 4, 18), player_of_match='BB McCullum', venue='M Chinnaswamy Stadium', neutral_venue=0, team1='Royal Challengers Bangalore', team2='Kolkata Knight Riders', toss_winner='Royal Challengers Bangalore', toss_decision='field', winner='Kolkata Knight Riders', result='runs', result_margin='140', eliminator='N', method='NA', umpire1='Asad Rauf', umpire2='RE Koertzen'),
 Row(id=335983, city='Chandigarh', date=datetime.date(2008, 4, 19), player_of_match='MEK Hussey', venue='Punjab Cricket Association Stadium, Mohali', neutral_venue=0, team1='Kings XI Punjab', team2='Chennai Super Kings', toss_winner='Chennai Super Kings', toss_decision='bat', winner='Chennai Super Kings', result='runs', result_margin='33', eliminator='N', method='NA', umpire1='MR Benson', umpire2='SL Shastri')]

In [0]:
## view last 2 rows of ipl_dataframe
ipl_dataframe.tail(2)

Out[45]: [Row(id=1237180, city='Abu Dhabi', date=datetime.date(2020, 11, 8), player_of_match='MP Stoinis', venue='Sheikh Zayed Stadium', neutral_venue=0, team1='Delhi Capitals', team2='Sunrisers Hyderabad', toss_winner='Delhi Capitals', toss_decision='bat', winner='Delhi Capitals', result='runs', result_margin='17', eliminator='N', method='NA', umpire1='PR Reiffel', umpire2='S Ravi'),
 Row(id=1237181, city='Dubai', date=datetime.date(2020, 11, 10), player_of_match='TA Boult', venue='Dubai International Cricket Stadium', neutral_venue=0, team1='Delhi Capitals', team2='Mumbai Indians', toss_winner='Delhi Capitals', toss_decision='bat', winner='Mumbai Indians', result='wickets', result_margin='5', eliminator='N', method='NA', umpire1='CB Gaffaney', umpire2='Nitin Menon')]

In [0]:
ipl_dataframe.head(2)

Out[46]: [Row(id=335982, city='Bangalore', date=datetime.date(2008, 4, 18), player_of_match='BB McCullum', venue='M Chinnaswamy Stadium', neutral_venue=0, team1='Royal Challengers Bangalore', team2='Kolkata Knight Riders', toss_winner='Royal Challengers Bangalore', toss_decision='field', winner='Kolkata Knight Riders', result='runs', result_margin='140', eliminator='N', method='NA', umpire1='Asad Rauf', umpire2='RE Koertzen'),
 Row(id=335983, city='Chandigarh', date=datetime.date(2008, 4, 19), player_of_match='MEK Hussey', venue='Punjab Cricket Association Stadium, Mohali', neutral_venue=0, team1='Kings XI Punjab', team2='Chennai Super Kings', toss_winner='Chennai Super Kings', toss_decision='bat', winner='Chennai Super Kings', result='runs', result_margin='33', eliminator='N', method='NA', umpire1='MR Benson', umpire2='SL Shastri')]

In [0]:
# #different file reading ways

# dataframe_json = spark.read.json('data\json_data.json') 
# dataframe_txt = spark.read.text('data\text_data.txt') 
# dataframe_csv = spark.read.csv('data\csv_data.csv') 
# dataframe_parquet = spark.read.load('data\parquet_data.parquet')

In [0]:
ipl_dataframe.columns

Out[48]: ['id',
 'city',
 'date',
 'player_of_match',
 'venue',
 'neutral_venue',
 'team1',
 'team2',
 'toss_winner',
 'toss_decision',
 'winner',
 'result',
 'result_margin',
 'eliminator',
 'method',
 'umpire1',
 'umpire2']

In [0]:
print("List of Columns :" , ipl_dataframe.columns)
print("Row Count :",ipl_dataframe.count()) 
print("Column Count :",len(ipl_dataframe.columns))

List of Columns : ['id', 'city', 'date', 'player_of_match', 'venue', 'neutral_venue', 'team1', 'team2', 'toss_winner', 'toss_decision', 'winner', 'result', 'result_margin', 'eliminator', 'method', 'umpire1', 'umpire2']
Row Count : 816
Column Count : 17


In [0]:
## select few columns only / subset the dataframe using select function 

subset_ipl_data = ipl_dataframe.select("id","city","date","player_of_match","venue")

subset_ipl_data.show()

+------+----------+----------+---------------+--------------------+
|    id|      city|      date|player_of_match|               venue|
+------+----------+----------+---------------+--------------------+
|335982| Bangalore|2008-04-18|    BB McCullum|M Chinnaswamy Sta...|
|335983|Chandigarh|2008-04-19|     MEK Hussey|Punjab Cricket As...|
|335984|     Delhi|2008-04-19|    MF Maharoof|    Feroz Shah Kotla|
|335985|    Mumbai|2008-04-20|     MV Boucher|    Wankhede Stadium|
|335986|   Kolkata|2008-04-20|      DJ Hussey|        Eden Gardens|
|335987|    Jaipur|2008-04-21|      SR Watson|Sawai Mansingh St...|
|335988| Hyderabad|2008-04-22|       V Sehwag|Rajiv Gandhi Inte...|
|335989|   Chennai|2008-04-23|      ML Hayden|MA Chidambaram St...|
|335990| Hyderabad|2008-04-24|      YK Pathan|Rajiv Gandhi Inte...|
|335991|Chandigarh|2008-04-25|  KC Sangakkara|Punjab Cricket As...|
|335992| Bangalore|2008-04-26|      SR Watson|M Chinnaswamy Sta...|
|335993|   Chennai|2008-04-26|       JDP Oram|MA

In [0]:
ipl_dataframe.printSchema()

root
 |-- id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- date: date (nullable = true)
 |-- player_of_match: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- neutral_venue: integer (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- toss_winner: string (nullable = true)
 |-- toss_decision: string (nullable = true)
 |-- winner: string (nullable = true)
 |-- result: string (nullable = true)
 |-- result_margin: string (nullable = true)
 |-- eliminator: string (nullable = true)
 |-- method: string (nullable = true)
 |-- umpire1: string (nullable = true)
 |-- umpire2: string (nullable = true)



In [0]:
## select and creating new columns / modifying them using column expressions
from pyspark.sql.functions import weekofyear

#here we use weekofyear function from sql.functions 
## we are creating a new column "Weekofyear" by extracting the week of year from date column 
## alias is used to rename the column

subset_ipl_data = ipl_dataframe.select("id","city","date","player_of_match","venue",(weekofyear(ipl_dataframe.date) + 10).alias('WeekOfYear'))

subset_ipl_data.show()

+------+----------+----------+---------------+--------------------+----------+
|    id|      city|      date|player_of_match|               venue|WeekOfYear|
+------+----------+----------+---------------+--------------------+----------+
|335982| Bangalore|2008-04-18|    BB McCullum|M Chinnaswamy Sta...|        26|
|335983|Chandigarh|2008-04-19|     MEK Hussey|Punjab Cricket As...|        26|
|335984|     Delhi|2008-04-19|    MF Maharoof|    Feroz Shah Kotla|        26|
|335985|    Mumbai|2008-04-20|     MV Boucher|    Wankhede Stadium|        26|
|335986|   Kolkata|2008-04-20|      DJ Hussey|        Eden Gardens|        26|
|335987|    Jaipur|2008-04-21|      SR Watson|Sawai Mansingh St...|        27|
|335988| Hyderabad|2008-04-22|       V Sehwag|Rajiv Gandhi Inte...|        27|
|335989|   Chennai|2008-04-23|      ML Hayden|MA Chidambaram St...|        27|
|335990| Hyderabad|2008-04-24|      YK Pathan|Rajiv Gandhi Inte...|        27|
|335991|Chandigarh|2008-04-25|  KC Sangakkara|Punjab

In [0]:
## selecting columns based on regex - colRegex function

regex_subset = ipl_dataframe.select(ipl_dataframe.colRegex("`player.+$`"))
regex_subset.show(5)

+---------------+
|player_of_match|
+---------------+
|    BB McCullum|
|     MEK Hussey|
|    MF Maharoof|
|     MV Boucher|
|      DJ Hussey|
+---------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import lit

## Create New column using withColumn method
## lit function is being used to create a constant value across all the rows 
ipl_dataframe.select(ipl_dataframe.columns[2:4]).withColumn("Country", lit("India")).show(5)

+----------+---------------+-------+
|      date|player_of_match|Country|
+----------+---------------+-------+
|2008-04-18|    BB McCullum|  India|
|2008-04-19|     MEK Hussey|  India|
|2008-04-19|    MF Maharoof|  India|
|2008-04-20|     MV Boucher|  India|
|2008-04-20|      DJ Hussey|  India|
+----------+---------------+-------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import lit , weekofyear , dayofyear

## Create multiple columns , conversions using chaining operations withColumn function

## the popular way of writing multiple chain operations

ipl_dataframe.select(ipl_dataframe.columns[1:5]).withColumn("WeekOfYear", weekofyear(ipl_dataframe.date)).withColumn("DayofYear",dayofyear(ipl_dataframe.date)).withColumn("Country",lit("india")).show(5)


+----------+----------+---------------+--------------------+----------+---------+-------+
|      city|      date|player_of_match|               venue|WeekOfYear|DayofYear|Country|
+----------+----------+---------------+--------------------+----------+---------+-------+
| Bangalore|2008-04-18|    BB McCullum|M Chinnaswamy Sta...|        16|      109|  india|
|Chandigarh|2008-04-19|     MEK Hussey|Punjab Cricket As...|        16|      110|  india|
|     Delhi|2008-04-19|    MF Maharoof|    Feroz Shah Kotla|        16|      110|  india|
|    Mumbai|2008-04-20|     MV Boucher|    Wankhede Stadium|        16|      111|  india|
|   Kolkata|2008-04-20|      DJ Hussey|        Eden Gardens|        16|      111|  india|
+----------+----------+---------------+--------------------+----------+---------+-------+
only showing top 5 rows



In [0]:
## creating a custom user defined function to get the shortform of the given colum ( winner column )
def customudf(column):
  values = column.split(" ")
  if len(values) <=2:
    short_form = values[0][0]+values[1][0]
  elif len(values)>2:
    short_form = values[0][0]+values[1][0]+values[2][0]
  return short_form
      
## we use udf function to convert our custom udf into pyspark udf       
from pyspark.sql.functions import udf,col
from pyspark.sql.types import StringType

## conversion of udf to pyspark udf
udf_spark= udf(lambda x:customudf(x),StringType())

## applying our function on top of winner column
ipl_dataframe.withColumn("Winner_ShortForm",udf_spark(col("winner"))).select("id","winner","Winner_ShortForm").show(5)



+------+--------------------+----------------+
|    id|              winner|Winner_ShortForm|
+------+--------------------+----------------+
|335982|Kolkata Knight Ri...|             KKR|
|335983| Chennai Super Kings|             CSK|
|335984|    Delhi Daredevils|              DD|
|335985|Royal Challengers...|             RCB|
|335986|Kolkata Knight Ri...|             KKR|
+------+--------------------+----------------+
only showing top 5 rows



In [0]:
## distinct function 

ipl_dataframe.withColumn("Winner_ShortForm",udf_spark(col("winner"))).select("winner","Winner_ShortForm").distinct().show(9)

+--------------------+----------------+
|              winner|Winner_ShortForm|
+--------------------+----------------+
|Kolkata Knight Ri...|             KKR|
|Royal Challengers...|             RCB|
|      Mumbai Indians|              MI|
|Kochi Tuskers Kerala|             KTK|
|    Rajasthan Royals|              RR|
|     Deccan Chargers|              DC|
| Chennai Super Kings|             CSK|
|     Kings XI Punjab|             KXP|
|    Delhi Daredevils|              DD|
+--------------------+----------------+
only showing top 9 rows



In [0]:
### Droping columns 

#lets create a copy of the ipl_dataframe

ipl_dataframe_copy = ipl_dataframe

ipl_dataframe_copy.select(ipl_dataframe_copy.columns[1:5]).drop("winner").show(5)

+----------+----------+---------------+--------------------+
|      city|      date|player_of_match|               venue|
+----------+----------+---------------+--------------------+
| Bangalore|2008-04-18|    BB McCullum|M Chinnaswamy Sta...|
|Chandigarh|2008-04-19|     MEK Hussey|Punjab Cricket As...|
|     Delhi|2008-04-19|    MF Maharoof|    Feroz Shah Kotla|
|    Mumbai|2008-04-20|     MV Boucher|    Wankhede Stadium|
|   Kolkata|2008-04-20|      DJ Hussey|        Eden Gardens|
+----------+----------+---------------+--------------------+
only showing top 5 rows



In [0]:
## Describe function 

ipl_dataframe.select("player_of_match").describe().show()

+-------+---------------+
|summary|player_of_match|
+-------+---------------+
|  count|            816|
|   mean|           null|
| stddev|           null|
|    min|     A Chandila|
|    max|         Z Khan|
+-------+---------------+



In [0]:
## converting into Pandas dataframe - to_pandas_on_spark() 

# ipl_dataframe.to_pandas_on_spark()

In [0]:
## Rename column names using withColumnRenamed 

ipl_dataframe = ipl_dataframe.withColumnRenamed("winner","Winning_team_name")

ipl_dataframe.columns

Out[61]: ['id',
 'city',
 'date',
 'player_of_match',
 'venue',
 'neutral_venue',
 'team1',
 'team2',
 'toss_winner',
 'toss_decision',
 'Winning_team_name',
 'result',
 'result_margin',
 'eliminator',
 'method',
 'umpire1',
 'umpire2']

In [0]:
ipl_dataframe

Out[62]: DataFrame[id: int, city: string, date: date, player_of_match: string, venue: string, neutral_venue: int, team1: string, team2: string, toss_winner: string, toss_decision: string, Winning_team_name: string, result: string, result_margin: string, eliminator: string, method: string, umpire1: string, umpire2: string]

In [0]:
ipl_dataframe.printSchema()

root
 |-- id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- date: date (nullable = true)
 |-- player_of_match: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- neutral_venue: integer (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- toss_winner: string (nullable = true)
 |-- toss_decision: string (nullable = true)
 |-- Winning_team_name: string (nullable = true)
 |-- result: string (nullable = true)
 |-- result_margin: string (nullable = true)
 |-- eliminator: string (nullable = true)
 |-- method: string (nullable = true)
 |-- umpire1: string (nullable = true)
 |-- umpire2: string (nullable = true)



In [0]:
# ipl_dataframe.select("result_margin").show()

In [0]:
## Converting String variable : result margin into integer type
from pyspark.sql.types import DoubleType, IntegerType, StringType

ipl_dataframe = ipl_dataframe.withColumn('result_margin', col('result_margin').cast(IntegerType()))
# cases = cases.withColumn('city', F.col('city').cast(StringType()))
ipl_dataframe

Out[65]: DataFrame[id: int, city: string, date: date, player_of_match: string, venue: string, neutral_venue: int, team1: string, team2: string, toss_winner: string, toss_decision: string, Winning_team_name: string, result: string, result_margin: int, eliminator: string, method: string, umpire1: string, umpire2: string]

In [0]:
ipl_dataframe.printSchema()

root
 |-- id: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- date: date (nullable = true)
 |-- player_of_match: string (nullable = true)
 |-- venue: string (nullable = true)
 |-- neutral_venue: integer (nullable = true)
 |-- team1: string (nullable = true)
 |-- team2: string (nullable = true)
 |-- toss_winner: string (nullable = true)
 |-- toss_decision: string (nullable = true)
 |-- Winning_team_name: string (nullable = true)
 |-- result: string (nullable = true)
 |-- result_margin: integer (nullable = true)
 |-- eliminator: string (nullable = true)
 |-- method: string (nullable = true)
 |-- umpire1: string (nullable = true)
 |-- umpire2: string (nullable = true)



In [0]:
ipl_dataframe.select("result").distinct().show()

+-------+
| result|
+-------+
|wickets|
|     NA|
|   runs|
|    tie|
+-------+



In [0]:
mi_home_wins = ipl_dataframe.filter((ipl_dataframe.Winning_team_name=="Mumbai Indians") &             (ipl_dataframe.city=='Mumbai')).select("id").distinct().count()

print("Number of Matches won by MI in their home ground : ",mi_home_wins)

Number of Matches won by MI in their home ground :  53


In [0]:
## Maximum and Minimum Run Margin scored by Chennai Super Kings
from pyspark.sql.functions import mean, min, max

csk_max_result_margin_runs = ipl_dataframe.filter((ipl_dataframe.Winning_team_name=="Chennai Super Kings") &             (ipl_dataframe.result=='runs')).select([max("result_margin"),min("result_margin")])

## Result shows that maxium run margin and minimum margin by CSK winning matchs
csk_max_result_margin_runs.show()

+------------------+------------------+
|max(result_margin)|min(result_margin)|
+------------------+------------------+
|                97|                 1|
+------------------+------------------+



In [0]:
#Sorting dataframe
from pyspark.sql import functions as F

csk_max_result_margin_runs = ipl_dataframe.filter((ipl_dataframe.Winning_team_name=="Chennai Super Kings") &             (ipl_dataframe.result=='runs'))

csk_max_result_margin_runs.select("id","city","team1","team2","Winning_team_name","result_margin").sort(F.desc("result_margin")).show()

+-------+--------------+--------------------+--------------------+-------------------+-------------+
|     id|          city|               team1|               team2|  Winning_team_name|result_margin|
+-------+--------------+--------------------+--------------------+-------------------+-------------+
| 829753|       Chennai| Chennai Super Kings|     Kings XI Punjab|Chennai Super Kings|           97|
| 729293|     Abu Dhabi| Chennai Super Kings|    Delhi Daredevils|Chennai Super Kings|           93|
| 392185|Port Elizabeth|Royal Challengers...| Chennai Super Kings|Chennai Super Kings|           92|
| 548380|       Chennai|    Delhi Daredevils| Chennai Super Kings|Chennai Super Kings|           86|
| 598020|         Delhi|    Delhi Daredevils| Chennai Super Kings|Chennai Super Kings|           86|
|1178425|       Chennai| Chennai Super Kings|      Delhi Capitals|Chennai Super Kings|           80|
| 392209|   East London| Chennai Super Kings|     Deccan Chargers|Chennai Super Kings|     

In [0]:
## Lets use groupby function to find out team wise most awarded player of the match

## we use groupBy function and passing Winning team and Player of the match columns , so that it will give us the team , player wise number of awards 

## agg function is to consolidate the group by results 
ipl_dataframe.groupBy(["Winning_team_name","player_of_match"]).agg(F.count("id").alias("Number_of_POM")).show()

+--------------------+---------------+-------------+
|   Winning_team_name|player_of_match|Number_of_POM|
+--------------------+---------------+-------------+
| Sunrisers Hyderabad|       A Mishra|            4|
|    Delhi Daredevils|         Z Khan|            1|
|Kolkata Knight Ri...|       CH Gayle|            1|
|    Delhi Daredevils|      CH Morris|            1|
|     Deccan Chargers|       A Mishra|            2|
| Chennai Super Kings|         P Negi|            1|
|      Mumbai Indians|     MEK Hussey|            2|
|      Mumbai Indians|        A Nehra|            1|
|     Deccan Chargers|       HH Gibbs|            1|
|    Rajasthan Royals|        KK Nair|            1|
|    Rajasthan Royals|       DJ Hooda|            1|
|      Delhi Capitals|       A Mishra|            1|
|Royal Challengers...|   KP Pietersen|            2|
|    Delhi Daredevils|     MA Agarwal|            1|
|    Delhi Daredevils|       A Mishra|            4|
|      Mumbai Indians|      KV Sharma|        

In [0]:
# ## For Mathematical operations , Join , lets use a new dataset which is ball by ball record 
# ### Reading file and converting into Dataframe
# # File location and type
# dbutils.fs.ls("/FileStore/tables/") 
# # let's consider IPL matches dataset
# file_location = "/FileStore/tables/IPL_Ball_by_Ball_2008_2020.csv"
# file_type = "csv"
# ##read csv files using spark.read.csv function 
# ipl_ball_df = spark.read.csv(file_location, inferSchema = True, header = True) 
# ipl_ball_df.show(5)

ipl_ball_df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/darshan.ingle@thadomal.org/IPL_Ball_by_Ball_2008_2020-3.csv", inferSchema = True, header = True) 
ipl_ball_df.show(5)


+------+------+----+----+-----------+-----------+---------+------------+----------+----------+------------+---------+--------------+----------------+-------+-----------+--------------------+--------------------+
|    id|inning|over|ball|    batsman|non_striker|   bowler|batsman_runs|extra_runs|total_runs|non_boundary|is_wicket|dismissal_kind|player_dismissed|fielder|extras_type|        batting_team|        bowling_team|
+------+------+----+----+-----------+-----------+---------+------------+----------+----------+------------+---------+--------------+----------------+-------+-----------+--------------------+--------------------+
|335982|     1|   6|   5| RT Ponting|BB McCullum|AA Noffke|           1|         0|         1|           0|        0|            NA|              NA|     NA|         NA|Kolkata Knight Ri...|Royal Challengers...|
|335982|     1|   6|   6|BB McCullum| RT Ponting|AA Noffke|           1|         0|         1|           0|        0|            NA|              NA|   

In [0]:
## Joining Match dataset and balls dataset 

# join function is used for merging two datasets
# we need to pass on condition for the matching columns ie ipl_dataframe.id==ipl_ball_df.id
# how parameter defines the type of join
# even after we merged the datasets , we can access the columns with the orgin dataframe name

combined_dataset = ipl_dataframe.join(ipl_ball_df,ipl_dataframe.id==ipl_ball_df.id,how='inner')

combined_dataset.select(ipl_dataframe.id,"team1","team2","city","player_of_match","inning","over","ball","batsman","bowler","total_runs","is_wicket","batting_team","bowling_team")

Out[76]: DataFrame[id: int, team1: string, team2: string, city: string, player_of_match: string, inning: int, over: int, ball: int, batsman: string, bowler: string, total_runs: int, is_wicket: int, batting_team: string, bowling_team: string]

In [0]:
### Let's find the highest run scored batsman and highest wickets taken bowler in Chennai Ground match 

## Combine the dataset
combined_dataset = ipl_dataframe.join(ipl_ball_df,ipl_dataframe.id==ipl_ball_df.id,how='inner')

## select the required dataset
adsDf=combined_dataset.select(ipl_dataframe.id,"team1","team2","city","player_of_match","inning","over","ball","batsman","bowler","total_runs","is_wicket","batting_team","bowling_team")

adsDf.filter(adsDf.city=="Chennai").groupBy("id","batsman").agg(F.sum("total_runs").alias("Batsman_runs")).sort(F.desc("Batsman_runs")).show()
adsDf.filter(adsDf.city=="Chennai").groupBy("id","bowler").agg(F.sum("is_wicket").alias("bowler_wickets")).sort(F.desc("bowler_wickets")).show()

+-------+-----------+------------+
|     id|    batsman|Batsman_runs|
+-------+-----------+------------+
| 419137|    M Vijay|         128|
| 548380|    M Vijay|         119|
| 598034|   MS Bisla|         108|
| 829711|BB McCullum|         107|
| 598026|  SR Watson|         106|
| 598041|   SK Raina|         102|
| 419137|    NV Ojha|          99|
|1178416|  SR Watson|          98|
| 501271|    M Vijay|          97|
| 501270|   CH Gayle|          96|
| 598034| MEK Hussey|          95|
| 336033|   GC Smith|          93|
| 548381|   MS Bisla|          93|
| 598026| MEK Hussey|          91|
| 335989|  ML Hayden|          89|
|1136565| AD Russell|          89|
| 829797|BB McCullum|          88|
| 501211| MEK Hussey|          86|
| 419153|   SK Raina|          85|
|1178416|  MK Pandey|          84|
+-------+-----------+------------+
only showing top 20 rows

+-------+---------------+--------------+
|     id|         bowler|bowler_wickets|
+-------+---------------+--------------+
| 336013|  

In [0]:
## Using SQL to Query the Dataframes 

## createOrReplaceTempView function used to register the dataframe as temperory view / t

ipl_dataframe.createOrReplaceTempView("ipl_data")

spark.sql("SELECT city , Winning_team_name , count(id) as wins from ipl_data group by city , Winning_team_name Order by city , wins ").show()


+---------+--------------------+----+
|     city|   Winning_team_name|wins|
+---------+--------------------+----+
|Abu Dhabi|      Delhi Capitals|   2|
|Abu Dhabi|     Kings XI Punjab|   2|
|Abu Dhabi| Sunrisers Hyderabad|   2|
|Abu Dhabi|Royal Challengers...|   2|
|Abu Dhabi| Chennai Super Kings|   3|
|Abu Dhabi|Kolkata Knight Ri...|   6|
|Abu Dhabi|    Rajasthan Royals|   6|
|Abu Dhabi|      Mumbai Indians|   6|
|Ahmedabad|Royal Challengers...|   1|
|Ahmedabad|    Delhi Daredevils|   1|
|Ahmedabad| Sunrisers Hyderabad|   1|
|Ahmedabad|      Mumbai Indians|   1|
|Ahmedabad|     Kings XI Punjab|   1|
|Ahmedabad|    Rajasthan Royals|   7|
|Bangalore|       Gujarat Lions|   1|
|Bangalore|     Deccan Chargers|   1|
|Bangalore|Rising Pune Super...|   1|
|Bangalore|                  NA|   2|
|Bangalore| Sunrisers Hyderabad|   2|
|Bangalore|    Delhi Daredevils|   3|
+---------+--------------------+----+
only showing top 20 rows



## Window Functions

In [0]:
# lets define a demonstration DataFrame to work on window functions
import pandas as pd 

df_data = {'partition': ['a','a', 'a', 'a', 'b', 'b', 'b', 'c', 'c',],
           'col_1': [1,1,1,1,2,2,2,3,3,], 
           'aggregation': [1,2,3,4,5,6,7,8,9,],
           'ranking': [4,3,2,1,1,1,3,1,5,],
           'lagging': [9,8,7,6,5,4,3,2,1,],
           'cumulative': [1,2,4,6,1,1,1,20,30,],
          }
df_pandas = pd.DataFrame.from_dict(df_data)
# create spark dataframe
df = spark.createDataFrame(df_pandas)

df.show()



+---------+-----+-----------+-------+-------+----------+
|partition|col_1|aggregation|ranking|lagging|cumulative|
+---------+-----+-----------+-------+-------+----------+
|        a|    1|          1|      4|      9|         1|
|        a|    1|          2|      3|      8|         2|
|        a|    1|          3|      2|      7|         4|
|        a|    1|          4|      1|      6|         6|
|        b|    2|          5|      1|      5|         1|
|        b|    2|          6|      1|      4|         1|
|        b|    2|          7|      3|      3|         1|
|        c|    3|          8|      1|      2|        20|
|        c|    3|          9|      5|      1|        30|
+---------+-----+-----------+-------+-------+----------+



In [0]:
# aggregation functions use the simplest form of window 
# which just defines grouping
from pyspark.sql import Window
from pyspark.sql import functions as F
# create partition windows on the column partition
aggregation_window = Window.partitionBy('partition')
# then we can use this window function for our aggregations on each partitions
df_aggregations = df.select(
    'partition', 'aggregation'
).withColumn(
    'aggregation_sum', F.sum('aggregation').over(aggregation_window),
).withColumn(
    'aggregation_avg', F.avg('aggregation').over(aggregation_window),
).withColumn(
    'aggregation_min', F.min('aggregation').over(aggregation_window),
).withColumn(
    'aggregation_max', F.max('aggregation').over(aggregation_window),
)
df_aggregations.show()
# note that after this operation the row order of display within the dataframe may have changed

+---------+-----------+---------------+---------------+---------------+---------------+
|partition|aggregation|aggregation_sum|aggregation_avg|aggregation_min|aggregation_max|
+---------+-----------+---------------+---------------+---------------+---------------+
|        a|          1|             10|            2.5|              1|              4|
|        a|          2|             10|            2.5|              1|              4|
|        a|          3|             10|            2.5|              1|              4|
|        a|          4|             10|            2.5|              1|              4|
|        b|          5|             18|            6.0|              5|              7|
|        b|          6|             18|            6.0|              5|              7|
|        b|          7|             18|            6.0|              5|              7|
|        c|          8|             17|            8.5|              8|              9|
|        c|          9|         

In [0]:
# lets define a ranking window
ranking_window = Window.partitionBy('partition').orderBy('ranking')

df_ranks = df.select(
  'partition', 'ranking'
).withColumn(
  # note that fn.row_number() does not take any arguments
  'ranking_row_number', F.row_number().over(ranking_window) 
).withColumn(
  # rank will leave spaces in ranking to account for preceding rows receiving equal ranks
  'ranking_rank', F.rank().over(ranking_window)
).withColumn(
  # dense rank does not account for previous equal rankings
  'ranking_dense_rank', F.dense_rank().over(ranking_window)
).withColumn(
  # percent rank ranges between 0-1 not 0-100
  'ranking_percent_rank', F.percent_rank().over(ranking_window)
).withColumn(
  # fn.ntile takes a parameter for now many 'buckets' to divide rows into when ranking
  'ranking_ntile_rank', F.ntile(2).over(ranking_window)
)

df_ranks.show()

+---------+-------+------------------+------------+------------------+--------------------+------------------+
|partition|ranking|ranking_row_number|ranking_rank|ranking_dense_rank|ranking_percent_rank|ranking_ntile_rank|
+---------+-------+------------------+------------+------------------+--------------------+------------------+
|        a|      1|                 1|           1|                 1|                 0.0|                 1|
|        a|      2|                 2|           2|                 2|  0.3333333333333333|                 1|
|        a|      3|                 3|           3|                 3|  0.6666666666666666|                 2|
|        a|      4|                 4|           4|                 4|                 1.0|                 2|
|        b|      1|                 1|           1|                 1|                 0.0|                 1|
|        b|      1|                 2|           1|                 1|                 0.0|                 1|
|

In [0]:
lag_window = Window.partitionBy('partition').orderBy('lagging')

df_lagged = df.select(
  'partition', 'lagging'
).withColumn(
  # note that lag requires both column and lag amount to be specified
  # It is possible to lag a column which was not the orderBy column
  'lagging_lag_1', F.lag('lagging', 1).over(lag_window)
).withColumn(
  'lagging_lag_2', F.lag('lagging', 2).over(lag_window)
).withColumn(
  'lagging_lead_1', F.lead('lagging', 1).over(lag_window)
).withColumn(
  # note how 'lagging_lag_1' == 'lagging_lead_minus_1'
  'lagging_lead_minus_1', F.lead('lagging', -1).over(lag_window)
).withColumn(
  # we can also perform calculations between lagged and unlagged columns of course
  'difference_between', F.col('lagging') - F.lag('lagging', 1).over(lag_window)
)

df_lagged.show()

+---------+-------+-------------+-------------+--------------+--------------------+------------------+
|partition|lagging|lagging_lag_1|lagging_lag_2|lagging_lead_1|lagging_lead_minus_1|difference_between|
+---------+-------+-------------+-------------+--------------+--------------------+------------------+
|        a|      6|         null|         null|             7|                null|              null|
|        a|      7|            6|         null|             8|                   6|                 1|
|        a|      8|            7|            6|             9|                   7|                 1|
|        a|      9|            8|            7|          null|                   8|                 1|
|        b|      3|         null|         null|             4|                null|              null|
|        b|      4|            3|         null|             5|                   3|                 1|
|        b|      5|            4|            3|          null|           

In [0]:

pandas_ipl = ipl_dataframe.to_pandas_on_spark()
pandas_ipl.head(2)



Unnamed: 0,id,city,date,player_of_match,venue,neutral_venue,team1,team2,toss_winner,toss_decision,Winning_team_name,result,result_margin,eliminator,method,umpire1,umpire2
0,335982,Bangalore,2008-04-18,BB McCullum,M Chinnaswamy Stadium,0,Royal Challengers Bangalore,Kolkata Knight Riders,Royal Challengers Bangalore,field,Kolkata Knight Riders,runs,140,N,,Asad Rauf,RE Koertzen
1,335983,Chandigarh,2008-04-19,MEK Hussey,"Punjab Cricket Association Stadium, Mohali",0,Kings XI Punjab,Chennai Super Kings,Chennai Super Kings,bat,Chennai Super Kings,runs,33,N,,MR Benson,SL Shastri


In [0]:
# Happy Learning