### In this notebook we will try to do some basics around Pyspark, we have taken very small dataset which consist of records of cricket captains around the world,by this data we will try to answer few questions

In [1]:
from pyspark.sql import SparkSession
ss = SparkSession.builder.appName("cricket").getOrCreate()

In [2]:
ss

In [3]:
mainDf = ss.read.text('captain_record.txt')

In [4]:
type(mainDf)

pyspark.sql.dataframe.DataFrame

difference between show() ,take() & head() 
- show will display result in tabular format & return type is None 
- Take will return list of values i.e. rows which can be further utilized
- head & take is extacly same

In [5]:
# take first 5 record & put it into another variable
new_rdd = mainDf.take(5)

In [6]:
new_1 = mainDf.show(5)

+--------------------+
|               value|
+--------------------+
|RT Ponting (AUS/I...|
|SP Fleming (NZ)	1...|
|MS Dhoni (INDIA)	...|
|A Ranatunga (SL)	...|
|AR Border (AUS)	1...|
+--------------------+
only showing top 5 rows



In [7]:
mainDf.head(5)

[Row(value='RT Ponting (AUS/ICC)\t2002-2012\t230\t165\t51\t2\t12\t76.14'),
 Row(value='SP Fleming (NZ)\t1997-2007\t218\t98\t106\t1\t13\t48.04'),
 Row(value='MS Dhoni (INDIA)\t2007-2018\t200\t110\t74\t5\t11\t59.52'),
 Row(value='A Ranatunga (SL)\t1988-1999\t193\t89\t95\t1\t8\t48.37'),
 Row(value='AR Border (AUS)\t1985-1994\t178\t107\t67\t1\t3\t61.42')]

In [8]:
type(new_rdd)

list

In [9]:
type(new_1)

NoneType

In [10]:
# First we will work with RDD
# row object has only one column which is value, its single string 
# we need to change that to table format rows & columns

In [12]:
mainRDD = mainDf.rdd

In [13]:
from collections import namedtuple
Record = namedtuple("record",["name","span","matches","win","lost","tie","no_result","strike_rate"])

In [14]:
def convert_record(s):
    fe = s.value.split("\t")
    return Record(fe[0],fe[1],int(fe[2]),int(fe[3]),int(fe[4]),int(fe[5]),int(fe[6]),float(fe[7]))

In [15]:
# data frame does not have map functionality so first converting it into RDD 
record_rdd = mainRDD.map(convert_record)

In [17]:
record_rdd.take(1)

[record(name='RT Ponting (AUS/ICC)', span='2002-2012', matches=230, win=165, lost=51, tie=2, no_result=12, strike_rate=76.14)]

#### Data analysis

##### Most decorated captain in the world highest percentage wins

In [18]:
decorated_rdd = record_rdd.map(lambda x: (x.name,x.win/x.matches))

In [19]:
decorated_rdd.sortBy(lambda x: x[1],False).collect()

[('CH Lloyd (WI)', 0.7619047619047619),
 ('V Kohli (INDIA)', 0.725),
 ('F du Plessis (SA)', 0.717948717948718),
 ('RT Ponting (AUS/ICC)', 0.717391304347826),
 ('WJ Cronje (SA)', 0.717391304347826),
 ('MW Gatting (ENG)', 0.7027027027027027),
 ('MJ Clarke (AUS)', 0.6756756756756757),
 ('IVA Richards (WI)', 0.638095238095238),
 ('SR Waugh (AUS)', 0.6320754716981132),
 ('SM Pollock (Afr/ICC/SA)', 0.6185567010309279),
 ('Saleem Malik (PAK)', 0.6176470588235294),
 ('GC Smith (Afr/SA)', 0.6133333333333333),
 ('EJG Morgan (ENG)', 0.6126126126126126),
 ('Shoaib Malik (PAK)', 0.6097560975609756),
 ('Wasim Akram (PAK)', 0.6055045871559633),
 ('AR Border (AUS)', 0.601123595505618),
 ('KC Sangakkara (SL)', 0.6),
 ('Waqar Younis (PAK)', 0.5967741935483871),
 ('Moin Khan (PAK)', 0.5882352941176471),
 ('BB McCullum (NZ)', 0.5806451612903226),
 ('Inzamam-ul-Haq (Asia/PAK)', 0.5777777777777777),
 ('AB de Villiers (SA)', 0.5728155339805825),
 ('Sarfraz Ahmed (PAK)', 0.56),
 ('ST Jayasuriya (SL)', 0.55932

##### Most decorated indian Captain

In [20]:
record_rdd.take(1)

[record(name='RT Ponting (AUS/ICC)', span='2002-2012', matches=230, win=165, lost=51, tie=2, no_result=12, strike_rate=76.14)]

In [21]:
decorated_rdd.filter(lambda x:"INDIA" in x[0]).collect()

[('MS Dhoni (INDIA)', 0.55),
 ('M Azharuddin (INDIA)', 0.5172413793103449),
 ('SC Ganguly (Asia/INDIA)', 0.5170068027210885),
 ('V Kohli (INDIA)', 0.725),
 ('R Dravid (INDIA)', 0.5316455696202531),
 ('N Kapil Dev (INDIA)', 0.527027027027027),
 ('SR Tendulkar (INDIA)', 0.3150684931506849),
 ('SM Gavaskar (INDIA)', 0.3783783783783784)]

##### Un-lucky capatin with max number of no results

In [22]:
record_rdd.sortBy(lambda x:x.no_result,False).take(1)

[record(name='SP Fleming (NZ)', span='1997-2007', matches=218, win=98, lost=106, tie=1, no_result=13, strike_rate=48.04)]

#### analysing RDD

In [29]:
# RDD dont have printschema function , so converting RDD to dataframe first
record_rdd.toDF().printSchema()

root
 |-- name: string (nullable = true)
 |-- span: string (nullable = true)
 |-- matches: long (nullable = true)
 |-- win: long (nullable = true)
 |-- lost: long (nullable = true)
 |-- tie: long (nullable = true)
 |-- no_result: long (nullable = true)
 |-- strike_rate: double (nullable = true)



In [30]:
# Dataframe dont have function to get number of partitions so converting into RDD first
record_rdd.getNumPartitions()

1

In [31]:
# size of data
record_rdd.count()

88

##### Repartition vs coalesce

In [32]:
# repartition will shuffle all data & make data distribution even where as coalesce is 
# optimized versin of repartition where data movemet is minimum

# normally we do repartitioning after we filter large dataset which could lead to imbalance data in the 
#nodes

In [33]:
record_rdd = record_rdd.repartition(2)

In [34]:
record_rdd.getNumPartitions()

2

#### We will solve same problem with dataframe

In [54]:
# create schema 
from pyspark.sql.types import StructType,IntegerType,StringType,DecimalType,StructField

schema = StructType([
    StructField("Name",StringType(),False),
    StructField("span",StringType(),False),
    StructField("Played",IntegerType(),False),
    StructField("Won",IntegerType(),False),
    StructField("lost",IntegerType(),False),
    StructField("Draw",IntegerType(),False),
    StructField("No Results",IntegerType(),False),
    StructField("Rate",DecimalType(scale=2),False),
    
])

In [55]:
mainDf = ss.read.csv('captain_record.txt',sep="\t",schema=schema)

In [56]:
mainDf.show()

+--------------------+---------+------+---+----+----+----------+-----+
|                Name|     span|Played|Won|lost|Draw|No Results| Rate|
+--------------------+---------+------+---+----+----+----------+-----+
|RT Ponting (AUS/ICC)|2002-2012|   230|165|  51|   2|        12|76.14|
|     SP Fleming (NZ)|1997-2007|   218| 98| 106|   1|        13|48.04|
|    MS Dhoni (INDIA)|2007-2018|   200|110|  74|   5|        11|59.52|
|    A Ranatunga (SL)|1988-1999|   193| 89|  95|   1|         8|48.37|
|     AR Border (AUS)|1985-1994|   178|107|  67|   1|         3|61.42|
|M Azharuddin (INDIA)|1990-1999|   174| 90|  76|   2|         6|54.16|
|   GC Smith (Afr/SA)|2003-2011|   150| 92|  51|   1|         6|64.23|
|SC Ganguly (Asia/...|1999-2005|   147| 76|  66|   0|         5|53.52|
|    Imran Khan (PAK)|1982-1992|   139| 75|  59|   1|         4|55.92|
|      WJ Cronje (SA)|1994-2000|   138| 99|  35|   1|         3|73.70|
|DPMD Jayawardene ...|2004-2013|   129| 71|  49|   1|         8|59.09|
|     

In [57]:
mainDf.printSchema()

root
 |-- Name: string (nullable = true)
 |-- span: string (nullable = true)
 |-- Played: integer (nullable = true)
 |-- Won: integer (nullable = true)
 |-- lost: integer (nullable = true)
 |-- Draw: integer (nullable = true)
 |-- No Results: integer (nullable = true)
 |-- Rate: decimal(10,2) (nullable = true)



In [58]:
mainDf.rdd.getNumPartitions()

1

In [62]:
mainDf = mainDf.withColumn("per_win",mainDf['Won']/mainDf['Played'])

In [66]:
mainDf.sort("per_win",ascending=False).head(1)

[Row(Name='CH Lloyd (WI)', span='1975-1985', Played=84, Won=64, lost=18, Draw=1, No Results=1, Rate=Decimal('77.71'), per_win=0.7619047619047619)]

In [76]:
import pyspark.sql.functions as sf
indian_captain = mainDf.sort("per_win",ascending=False).filter(sf.col("Name").contains("INDIA")).select(['Name','per_win']).collect()

In [78]:
type(indian_captain[0])

pyspark.sql.types.Row

In [81]:
# creating data frame from Row type
ss.createDataFrame(indian_captain).show()

+--------------------+------------------+
|                Name|           per_win|
+--------------------+------------------+
|     V Kohli (INDIA)|             0.725|
|    MS Dhoni (INDIA)|              0.55|
|    R Dravid (INDIA)|0.5316455696202531|
| N Kapil Dev (INDIA)| 0.527027027027027|
|M Azharuddin (INDIA)|0.5172413793103449|
|SC Ganguly (Asia/...|0.5170068027210885|
| SM Gavaskar (INDIA)|0.3783783783783784|
|SR Tendulkar (INDIA)|0.3150684931506849|
+--------------------+------------------+



##### which country is the  best in terms of percentage win

In [94]:
from pyspark.sql.functions import regexp_extract,col
mainDf = mainDf.withColumn("country_name",regexp_extract(col("Name"),r'\(.*\)',0))

In [119]:
mainDf.select("country_name").distinct().show()

+------------+
|country_name|
+------------+
|       (ENG)|
|        (SL)|
|        (NL)|
|       (PAK)|
|     (BDESH)|
|   (AUS/ICC)|
|     (KENYA)|
|       (IRE)|
|(Afr/ICC/SA)|
|     (BMUDA)|
|       (ZIM)|
|  (Asia/PAK)|
|(Asia/INDIA)|
|        (SA)|
|   (Asia/SL)|
|    (Afr/SA)|
|     (INDIA)|
|        (WI)|
|        (NZ)|
|       (AUS)|
+------------+
only showing top 20 rows



In [129]:
lookup_dict = {"(AUS/ICC)":"(AUS)",
               "(Afr/ICC/SA)":"(SA)",
               "(Afr/SA)":"(SA)",
               "(Asia/SL)":"(SL)",
               "(Asia/INDIA)":"(INDIA)",
               "(Asia/PAK)":"(PAK)"}

In [130]:
from pyspark.sql.functions import udf
unified_country = udf(lambda x: lookup_dict[x] if x in list(lookup_dict.keys()) else x,StringType())

In [131]:
# clean country name data
mainDf = mainDf.withColumn("clean_country",unified_country("country_name"))

In [137]:
from pyspark.sql import functions as F
mainDf.select("clean_country","per_win").groupBy("clean_country").agg(F.mean('per_win').alias("avg_per_win")).sort("avg_per_win",ascending=False).collect()

[Row(clean_country='(SA)', avg_per_win=0.6041101625427954),
 Row(clean_country='(AUS)', avg_per_win=0.5638648019546605),
 Row(clean_country='(AFG)', avg_per_win=0.5535714285714286),
 Row(clean_country='(PAK)', avg_per_win=0.5432463622101326),
 Row(clean_country='(ENG)', avg_per_win=0.5129823986286141),
 Row(clean_country='(INDIA)', avg_per_win=0.5076709562759721),
 Row(clean_country='(WI)', avg_per_win=0.48135842577529886),
 Row(clean_country='(SL)', avg_per_win=0.48128530036028533),
 Row(clean_country='(NZ)', avg_per_win=0.4733916824624134),
 Row(clean_country='(IRE)', avg_per_win=0.43998893805309736),
 Row(clean_country='(KENYA)', avg_per_win=0.3698630136986301),
 Row(clean_country='(BDESH)', avg_per_win=0.34573132966052605),
 Row(clean_country='(NL)', avg_per_win=0.3225806451612903),
 Row(clean_country='(ZIM)', avg_per_win=0.2909022250483666),
 Row(clean_country='(BMUDA)', avg_per_win=0.1875)]