# Who is the best ODI captain?

# Who is the best Test captain?

Using the RDD API, we explore the following operations:
1. Read text files
2. Parse records and associate them with a schema
3. Filter the records
4. Group the records
5. Sort by keys and by values
6. Apply map and reduce functions
7. Cache RDDs
8. Apply the groupByKey() and mapValues() functions
9. Collect data into the driver
10. Join multiple RDDs
11. Save RDDs into files
12. Use broadcast variables
13. Use accumulators

In [9]:
from pyspark import SparkContext
from pyspark import StorageLevel
from pyspark.sql.session import SparkSession

In [10]:
# Create the SparkContext (entry point for the RDD API) and a SparkSession on top of it
sc = SparkContext()
spark = SparkSession(sc)

Check the Spark version

In [12]:
sc.version

'3.2.1'

Check Spark Master Deployment Mode

In [13]:
sc.master

'local[*]'

Spark Application Name and ID

In [14]:
sc.appName

'pyspark-shell'

In [15]:
sc.applicationId

'local-1646204446999'

Spark Python version

In [16]:
sc.pythonVer

'3.8'

Read the ODI captains data

In [17]:
odiData = sc.textFile( "ODIData.csv" )

Check the first record

In [18]:
odiData.first()

'Ponting  R T,Australia,1995-2012,230,165,51,14,124'

Display the first 5 records

In [19]:
odiData.take(5) 

['Ponting  R T,Australia,1995-2012,230,165,51,14,124',
 'Fleming  S P,New Zealand,1994-2007,218,98,106,14,105',
 'Ranatunga  A,Sri Lanka,1982-1999,193,89,95,9,102',
 'Dhoni  M S*,India,2004-,186,103,68,15,88',
 'Border  A R,Australia,1979-1994,178,107,67,4,86']

Check the RDD type

In [20]:
#odiData should be of RDD type
type( odiData )

pyspark.rdd.RDD

In [21]:
# PySpark always keeps RDD data serialized (pickled), so it has no MEMORY_ONLY_SER level;
# MEMORY_ONLY already stores the data in serialized form
odiData.persist( StorageLevel.MEMORY_ONLY )

Count the number of records

In [22]:
odiData.count()

98

Parse the lines to create records

So far, each line from the file is taken as a single record, and the records are not associated with a schema.

Now we define a schema and associate the fields in each line with it. One way to do this is to define a named tuple and convert each line into an instance of it.

In [23]:
fields = ("name", "country", "career", "matches", "won", "lost", "ties", "toss" )

In [24]:
from collections import namedtuple

Let us define a record type for the lines. We will iterate through the lines and convert each one into a record. The record type is a named tuple called Captain, and its field names are taken from fields.

In [25]:
Captain = namedtuple( 'Captain', fields )
print(Captain)

<class 'collections.Captain'>


Parse lines into named tuples

Create a function that splits a line on commas and builds a Captain named tuple, converting the numeric columns to integers. Then map this function over the RDD to convert every line into a record.

In [26]:
# Function to parse each line and convert them into records
def parseRecs( line ):
  fields = line.split(",")
  return Captain( fields[0], fields[1], fields[2], int( fields[3] ),
                 int( fields[4] ), int(fields[5]), int(fields[6]), int(fields[7] ) )
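Because parseRecs is a plain Python function, it can be checked on a single line before Spark maps it over the RDD. The sketch below re-declares Captain and the parser so it runs on its own; the .strip() call is an assumption added for lines that may end in stray whitespace or a Windows "\r", and it is not part of the original function.

```python
from collections import namedtuple

fields = ("name", "country", "career", "matches", "won", "lost", "ties", "toss")
Captain = namedtuple("Captain", fields)

def parseRecs(line):
    # Variant of the notebook's parser: strip() is an added assumption
    cols = line.strip().split(",")  # assumes no quoted commas inside fields
    # The first three columns stay strings; the five counts become ints
    return Captain(cols[0], cols[1], cols[2], *(int(c) for c in cols[3:8]))

rec = parseRecs("Ponting  R T,Australia,1995-2012,230,165,51,14,124")
print(rec.name, rec.matches, rec.won)  # Ponting  R T 230 165
```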

In [31]:
captains = odiData.map( lambda rec: parseRecs( rec) ) 
captains.collect()[:5]

[Captain(name='Ponting  R T', country='Australia', career='1995-2012', matches=230, won=165, lost=51, ties=14, toss=124),
 Captain(name='Fleming  S P', country='New Zealand', career='1994-2007', matches=218, won=98, lost=106, ties=14, toss=105),
 Captain(name='Ranatunga  A', country='Sri Lanka', career='1982-1999', matches=193, won=89, lost=95, ties=9, toss=102),
 Captain(name='Dhoni  M S*', country='India', career='2004-', matches=186, won=103, lost=68, ties=15, toss=88),
 Captain(name='Border  A R', country='Australia', career='1979-1994', matches=178, won=107, lost=67, ties=4, toss=86)]


Cache the RDD

Since we will reuse this RDD for many analyses, let's cache it.

In [None]:
captains.cache()

PythonRDD[5] at RDD at PythonRDD.scala:53

Now captains refers to all the parsed records. Let's display the first 5 records.

In [None]:
captains.take(5)

[Captain(name='Ponting  R T', country='Australia', career='1995-2012', matches=230, won=165, lost=51, ties=14, toss=124),
 Captain(name='Fleming  S P', country='New Zealand', career='1994-2007', matches=218, won=98, lost=106, ties=14, toss=105),
 Captain(name='Ranatunga  A', country='Sri Lanka', career='1982-1999', matches=193, won=89, lost=95, ties=9, toss=102),
 Captain(name='Dhoni  M S*', country='India', career='2004-', matches=186, won=103, lost=68, ties=15, toss=88),
 Captain(name='Border  A R', country='Australia', career='1979-1994', matches=178, won=107, lost=67, ties=4, toss=86)]

What is the type of the captains RDD?

In [None]:
type( captains )


pyspark.rdd.PipelinedRDD

Exploratory data analysis

Filter the records: captains who captained in more than 100 ODIs

Keep only those captains who have captained in more than 100 ODI matches, so that we can compare their statistics.

In [None]:
captains_100 = captains.filter( lambda rec: rec.matches > 100 ) 

How many captains have captained their country for more than 100 ODIs?

In [None]:
captains_100.count()

16

Who are these captains?

In [None]:
captains_100.take( 10 )

[Captain(name='Ponting  R T', country='Australia', career='1995-2012', matches=230, won=165, lost=51, ties=14, toss=124),
 Captain(name='Fleming  S P', country='New Zealand', career='1994-2007', matches=218, won=98, lost=106, ties=14, toss=105),
 Captain(name='Ranatunga  A', country='Sri Lanka', career='1982-1999', matches=193, won=89, lost=95, ties=9, toss=102),
 Captain(name='Dhoni  M S*', country='India', career='2004-', matches=186, won=103, lost=68, ties=15, toss=88),
 Captain(name='Border  A R', country='Australia', career='1979-1994', matches=178, won=107, lost=67, ties=4, toss=86),
 Captain(name='Azharuddin  M', country='India', career='1985-2000', matches=174, won=89, lost=77, ties=8, toss=96),
 Captain(name='Smith  G C', country='South Africa', career='2002-2013', matches=149, won=91, lost=51, ties=7, toss=74),
 Captain(name='Ganguly  S C', country='India', career='1992-2007', matches=147, won=76, lost=66, ties=5, toss=74),
 Captain(name='Cronje  W J', country='South Africa',

Write a utility function that returns the number of captains who captained in more than a given number of matches

In [None]:
def getNumCaptainsByMinMatches( anRDD, num_matches ):
  return anRDD.map( lambda rec: parseRecs( rec) ).filter( lambda rec: rec.matches > num_matches ).count()

In [None]:
getNumCaptainsByMinMatches(odiData, 100 )

16
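The filter in this helper is strict (>), so a captain with exactly num_matches matches is not counted. A plain-Python mirror of the same parse, filter, and count steps on the five lines from the take(5) output makes the boundary easy to check. The names parse, sample, and num_captains_by_min_matches are hypothetical, introduced only for this sketch.

```python
from collections import namedtuple

Captain = namedtuple("Captain", "name country career matches won lost ties toss")

def parse(line):
    cols = line.split(",")
    return Captain(cols[0], cols[1], cols[2], *map(int, cols[3:8]))

# The five lines shown in the odiData.take(5) output
sample = [
    "Ponting  R T,Australia,1995-2012,230,165,51,14,124",
    "Fleming  S P,New Zealand,1994-2007,218,98,106,14,105",
    "Ranatunga  A,Sri Lanka,1982-1999,193,89,95,9,102",
    "Dhoni  M S*,India,2004-,186,103,68,15,88",
    "Border  A R,Australia,1979-1994,178,107,67,4,86",
]

def num_captains_by_min_matches(lines, num_matches):
    # Parse, keep captains with strictly more than num_matches matches, count them
    return sum(1 for rec in map(parse, lines) if rec.matches > num_matches)

print(num_captains_by_min_matches(sample, 186))  # 3: Dhoni's 186 is not counted
```

The helper also re-parses odiData on every call; passing the cached captains RDD and dropping the map( parseRecs ) step would presumably avoid that repeated work.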

Captains with more wins than losses

In [None]:
captains_more_wins = captains_100.filter( lambda rec: rec.won > rec.lost )

Collect the captain names in the driver

In [None]:
# Captains with more wins than losses
captains_more_wins.map( lambda rec: rec.name ).collect()

['Ponting  R T',
 'Dhoni  M S*',
 'Border  A R',
 'Azharuddin  M',
 'Smith  G C',
 'Ganguly  S C',
 'Cronje  W J',
 'Imran Khan',
 'Jayawardene  D P M',
 'Jayasuriya  S T',
 'Wasim Akram',
 'Waugh  S R',
 'Richards  I V A']

In [None]:
# Captains with no more wins than losses
captains_more_losses = captains_100.filter( lambda rec: rec.won <= rec.lost )
captains_more_losses.map( lambda rec: rec.name ).collect()

['Fleming  S P', 'Ranatunga  A', 'Lara  B C']

Create a subset of the data by selecting columns

In [None]:
# (country, matches) pair for each captain
countries = captains.map( lambda rec: ( rec.country , rec.matches) )

In [None]:
countries.take( 10 )

[('Australia', 230),
 ('New Zealand', 218),
 ('Sri Lanka', 193),
 ('India', 186),
 ('Australia', 178),
 ('India', 174),
 ('South Africa', 149),
 ('India', 147),
 ('South Africa', 140),
 ('Pakistan', 139)]

# Aggregate values by keys

In [None]:
# Aggregate by countries
matches_countries = countries.reduceByKey( lambda a, b: a + b )

In [None]:
matches_countries.take( 20 )

[('Australia', 832),
 ('India', 770),
 ('South Africa', 463),
 ('Pakistan', 781),
 ('West Indies', 658),
 ('Kenya', 114),
 ('Ireland', 93),
 ('Netherlands', 31),
 ('Bermuda', 31),
 ('New Zealand', 608),
 ('Sri Lanka', 710),
 ('Zimbabwe', 394),
 ('England', 554),
 ('Bangladesh', 251),
 ('Afghanistan', 50),
 ('Canada', 27)]
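reduceByKey merges all values that share a key using the supplied function. A rough plain-Python model of the result (ignoring partitions and the shuffle) groups the pairs by key and folds each group. Because Spark first combines values inside each partition and then merges the partial results, the function should be associative and commutative; addition satisfies both. reduce_by_key is a hypothetical helper, and the input pairs are the ten shown in the countries.take( 10 ) output.

```python
from functools import reduce

def reduce_by_key(pairs, func):
    """Plain-Python model of RDD.reduceByKey (no partitions, no shuffle)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # Fold every key's values with the same binary function
    return {key: reduce(func, values) for key, values in groups.items()}

# (country, matches) pairs from the countries.take( 10 ) output
pairs = [("Australia", 230), ("New Zealand", 218), ("Sri Lanka", 193),
         ("India", 186), ("Australia", 178), ("India", 174),
         ("South Africa", 149), ("India", 147), ("South Africa", 140),
         ("Pakistan", 139)]

totals = reduce_by_key(pairs, lambda a, b: a + b)
print(totals["Australia"], totals["India"])  # 408 507
```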

In [None]:
# Sort the countries by name (sort by key)
matches_countries.sortByKey().collect()

[('Afghanistan', 50),
 ('Australia', 832),
 ('Bangladesh', 251),
 ('Bermuda', 31),
 ('Canada', 27),
 ('England', 554),
 ('India', 770),
 ('Ireland', 93),
 ('Kenya', 114),
 ('Netherlands', 31),
 ('New Zealand', 608),
 ('Pakistan', 781),
 ('South Africa', 463),
 ('Sri Lanka', 710),
 ('West Indies', 658),
 ('Zimbabwe', 394)]

Sort by key in descending order

In [None]:
matches_countries.sortByKey( ascending = False ).collect()

[('Zimbabwe', 394),
 ('West Indies', 658),
 ('Sri Lanka', 710),
 ('South Africa', 463),
 ('Pakistan', 781),
 ('New Zealand', 608),
 ('Netherlands', 31),
 ('Kenya', 114),
 ('Ireland', 93),
 ('India', 770),
 ('England', 554),
 ('Canada', 27),
 ('Bermuda', 31),
 ('Bangladesh', 251),
 ('Australia', 832),
 ('Afghanistan', 50)]

In [None]:
# Sort by value (number of matches); the default order is ascending
matches_countries.sortBy( lambda rec: rec[1] ).collect()

[('Canada', 27),
 ('Netherlands', 31),
 ('Bermuda', 31),
 ('Afghanistan', 50),
 ('Ireland', 93),
 ('Kenya', 114),
 ('Bangladesh', 251),
 ('Zimbabwe', 394),
 ('South Africa', 463),
 ('England', 554),
 ('New Zealand', 608),
 ('West Indies', 658),
 ('Sri Lanka', 710),
 ('India', 770),
 ('Pakistan', 781),
 ('Australia', 832)]

In [None]:
# Sort by value in descending order
matches_countries.sortBy( lambda rec: rec[1], ascending = False ).collect()

[('Australia', 832),
 ('Pakistan', 781),
 ('India', 770),
 ('Sri Lanka', 710),
 ('West Indies', 658),
 ('New Zealand', 608),
 ('England', 554),
 ('South Africa', 463),
 ('Zimbabwe', 394),
 ('Bangladesh', 251),
 ('Kenya', 114),
 ('Ireland', 93),
 ('Afghanistan', 50),
 ('Netherlands', 31),
 ('Bermuda', 31),
 ('Canada', 27)]
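In plain Python, sortByKey() corresponds to sorting on the first tuple element, and sortBy( f, ascending = False ) to sorted( ..., key=f, reverse=True ). The sketch uses a subset of the matches_countries output. Countries with equal totals, such as Netherlands and Bermuda (31 each), have no guaranteed relative order in Spark's output, so a secondary key on the name makes the ranking deterministic; the same key function, lambda rec: ( -rec[1], rec[0] ), could presumably be passed to sortBy as well.

```python
# A subset of the (country, total matches) pairs from the output above
matches_countries = [("Australia", 832), ("Netherlands", 31), ("India", 770),
                     ("Bermuda", 31), ("Canada", 27), ("Pakistan", 781)]

# sortByKey(): order by country name
by_key = sorted(matches_countries, key=lambda rec: rec[0])

# sortBy(lambda rec: rec[1], ascending=False): most matches first
by_value_desc = sorted(matches_countries, key=lambda rec: rec[1], reverse=True)

# Deterministic variant: matches descending, then country name for ties
by_value_then_name = sorted(matches_countries, key=lambda rec: (-rec[1], rec[0]))
print(by_value_then_name[-3:])  # [('Bermuda', 31), ('Netherlands', 31), ('Canada', 27)]
```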

Write a data function that returns the countries and their total matches played, sorted in descending order

In [None]:
def getNumMatchesPerCountry( anRDD ):
  return anRDD.map( lambda rec: parseRecs( rec) )          \
  .map( lambda rec: ( rec.country , rec.matches) )         \
  .reduceByKey( lambda a, b: a + b )                       \
  .sortBy( lambda rec: rec[1], ascending = False )

Test the function getNumMatchesPerCountry

In [None]:
getNumMatchesPerCountry( odiData ).take( 10 )

[('Australia', 832),
 ('Pakistan', 781),
 ('India', 770),
 ('Sri Lanka', 710),
 ('West Indies', 658),
 ('New Zealand', 608),
 ('England', 554),
 ('South Africa', 463),
 ('Zimbabwe', 394),
 ('Bangladesh', 251)]

Top Captains by Percentage Wins

In [None]:
# Captains by percentage of wins
captains_100_percent_wins = captains_100.map(
  lambda rec: ( rec.name, round( rec.won/rec.matches, 2 ) ) )

# Sort by percentage wins
captains_100_percent_wins.sortBy(
  lambda rec: rec[1], ascending = False ).collect()

[('Ponting  R T', 0.72),
 ('Cronje  W J', 0.71),
 ('Richards  I V A', 0.64),
 ('Waugh  S R', 0.63),
 ('Smith  G C', 0.61),
 ('Wasim Akram', 0.61),
 ('Border  A R', 0.6),
 ('Jayasuriya  S T', 0.56),
 ('Dhoni  M S*', 0.55),
 ('Jayawardene  D P M', 0.55),
 ('Imran Khan', 0.54),
 ('Ganguly  S C', 0.52),
 ('Azharuddin  M', 0.51),
 ('Lara  B C', 0.47),
 ('Ranatunga  A', 0.46),
 ('Fleming  S P', 0.45)]
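The win percentage is won / matches rounded to two decimal places, so tied and no-result matches count in the denominator but never as wins. For Ponting, 165 / 230 ≈ 0.717, which rounds to 0.72. The check below reproduces the figures for the five captains in the earlier take(5) output; records and win_pct are illustrative names.

```python
# (name, matches, won) for the five captains in the take(5) output
records = [("Ponting  R T", 230, 165), ("Fleming  S P", 218, 98),
           ("Ranatunga  A", 193, 89), ("Dhoni  M S*", 186, 103),
           ("Border  A R", 178, 107)]

# Same formula as the RDD code: round(won / matches, 2)
win_pct = {name: round(won / matches, 2) for name, matches, won in records}
print(win_pct["Ponting  R T"])  # 0.72
```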

Lucky captains: who won the toss most often?

In [None]:
# Captains by percentage of tosses won
lucky_captains = captains_100.map(
  lambda rec: ( rec.name, round( rec.toss / rec.matches, 2 ) ) )

In [None]:
lucky_captains.sortBy( lambda rec: rec[1], ascending = False ).take( 5 )

[('Azharuddin  M', 0.55),
 ('Ponting  R T', 0.54),
 ('Ranatunga  A', 0.53),
 ('Cronje  W J', 0.53),
 ('Wasim Akram', 0.53)]
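The toss ratio is rounded before sorting, which creates ties: Ranatunga, Cronje and Wasim Akram all show 0.53, so their order in take( 5 ) is decided by how the sort handles equal values rather than by the data. A sketch of one alternative, ranking on the unrounded ratio and rounding only for display, uses five captains whose toss and match counts appear in the captains_100 output. In PySpark the same idea would be to map to ( rec.name, rec.toss / rec.matches ), sort, and round afterwards.

```python
# (name, matches, toss) for captains listed in the captains_100 output
records = [("Ponting  R T", 230, 124), ("Ranatunga  A", 193, 102),
           ("Azharuddin  M", 174, 96), ("Smith  G C", 149, 74),
           ("Ganguly  S C", 147, 74)]

# Rank on the exact ratio; round only when presenting the result
ranked = sorted(records, key=lambda rec: rec[2] / rec[1], reverse=True)
lucky = [(name, round(toss / matches, 2)) for name, matches, toss in ranked]
print(lucky)
```

Ganguly (74 / 147) and Smith (74 / 149) both display as 0.5, yet the exact ratios still place Ganguly first.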

# Now let's look at the captains' Test match performances

Load the dataset

In [None]:
testsData = sc.textFile( "hdfs://hadoopns/user/narayana_greatlearning/TestsData.csv" )

In [None]:
# Parse the records
testsData_recs = testsData.map( lambda rec: parseRecs( rec ) )

In [None]:
# Display the first 10 records
testsData_recs.take( 10 )

In [None]:
# Filter the captains who have captained for more than 100 tests
testsData_100 = testsData_recs.filter( lambda rec: rec.matches > 100 )

In [None]:
## Which captains are these?
testsData_100.take( 10 )

Observe that:

Only one captain has captained in more than 100 Tests, so we must lower the threshold to compare more captains' performances. We will keep only the captains who have captained in more than 50 Tests.

In [None]:
# Filter the captains who have captained for more than 50 tests
testsData_50 = testsData_recs.filter( lambda rec: rec.matches > 50 )

In [None]:
testsData_50.take( 5 )

Calculate the percentage of Test wins

In [None]:
# Sort the captains by percentage of wins
captain_top = testsData_50.map(
  lambda rec: ( rec.name,
               round( rec.won/rec.matches,
                     2 ) ) ).sortBy( lambda rec: rec[1], ascending = False )

In [None]:
captain_top.collect()

Join multiple data sets

In [None]:
# Let's join the ODI and Test captaincy results by captain name.
# join() performs an inner join: only captains present in both RDDs are kept
all_time_best_captains = captains_100_percent_wins.join( captain_top )
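join pairs every value of a key in one RDD with every value of the same key in the other, keeping only keys present on both sides. The plain-Python model below (inner_join is a hypothetical name, and the toy keys are placeholders rather than captain data) shows that behaviour, including how duplicate keys multiply. Since the key here is the captain's name string, a captain appears in all_time_best_captains only if his name is spelled identically in both files, character for character, including double spaces such as those in 'Ponting  R T'. leftOuterJoin would instead keep ODI captains who have no Test match.

```python
def inner_join(left, right):
    """Plain-Python model of RDD.join: (k, v) and (k, w) become (k, (v, w))."""
    right_by_key = {}
    for key, value in right:
        right_by_key.setdefault(key, []).append(value)
    # Keys missing from either side produce no output (inner join)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]

# Toy keys and values, not captain data
print(inner_join([("A", 1), ("B", 2)], [("B", 3), ("C", 4)]))  # [('B', (2, 3))]
```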

In [None]:
all_time_best_captains.collect()

In [None]:
## Best by Test win percentage
all_time_best_captains.sortBy( lambda rec: rec[1][1],
                            ascending = False ).collect()

In [None]:
## Best by ODI win percentage
all_time_best_captains.sortBy( lambda rec: rec[1][0],
                            ascending = False ).collect()

Flattening the tuples

In [None]:
## Now let's flatten the structure and store the results into a file...
best_captains = all_time_best_captains.map( lambda rec:
                                         ( rec[0],
                                          rec[1][0],
                                          rec[1][1] ) )
best_captains.take( 10 )

Save the results to the file system

In [None]:
best_captains.saveAsTextFile( "hdfs://hadoopns/user/narayana_greatlearning/captains")
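saveAsTextFile writes each element as one line using its str() representation, so the flattened tuples land in the output as Python tuple literals like the ones printed by best_captains.take( 10 ). If the results should be readable as CSV, one option is to format each tuple first, for example best_captains.map( to_csv_line ).saveAsTextFile( output_path ), where to_csv_line and output_path are hypothetical names. saveAsTextFile writes a directory of part files and fails if that directory already exists.

```python
import csv
import io

def to_csv_line(rec):
    """Render one tuple as a single CSV line, quoting fields that contain commas."""
    buf = io.StringIO()
    csv.writer(buf).writerow(rec)
    # writerow appends "\r\n"; saveAsTextFile adds its own newline per element
    return buf.getvalue().rstrip("\r\n")

print(str(("A", 0.72, 0.5)))          # ('A', 0.72, 0.5)  what str() would write
print(to_csv_line(("A", 0.72, 0.5)))  # A,0.72,0.5
```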

Generate key-value pairs: (country, (won, matches))

In [None]:
country_matches_list = captains.map( lambda rec: ( rec.country , ( rec.won, rec.matches ) ) )

In [None]:
country_matches_list.take( 5 )

Group the values

In [None]:
country_matches_list.groupByKey().mapValues( lambda rec: list( rec ) ).take( 2 )

Apply sum() to each element of the value tuples
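A minimal sketch of this step, assuming the goal is per-country totals of wins and matches: zip(*values) lines up all the won counts and all the matches counts of a group so that sum() can be applied to each position. In PySpark this would presumably read country_matches_list.groupByKey().mapValues( lambda vals: tuple( map( sum, zip( *vals ) ) ) ); reduceByKey( lambda a, b: ( a[0] + b[0], a[1] + b[1] ) ) gives the same totals while combining values before the shuffle instead of collecting whole groups. The pairs come from the five records in the take(5) output; sum_per_country is a hypothetical name.

```python
from collections import defaultdict

def sum_per_country(pairs):
    """Group (country, (won, matches)) pairs, then sum each position of the tuples."""
    grouped = defaultdict(list)
    for country, value in pairs:
        grouped[country].append(value)
    # zip(*values) yields all 'won' counts, then all 'matches' counts
    return {country: tuple(map(sum, zip(*values))) for country, values in grouped.items()}

pairs = [("Australia", (165, 230)), ("New Zealand", (98, 218)),
         ("Sri Lanka", (89, 193)), ("India", (103, 186)),
         ("Australia", (107, 178))]

totals = sum_per_country(pairs)
print(totals["Australia"])  # (272, 408)
```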

Broadcast variables

In [None]:
def getNumCaptainsByMinimumMatches( anRDD, num_matches ):
  # Ship the threshold to the executors once as a read-only broadcast variable
  min_matches = sc.broadcast( num_matches )
  return anRDD.map( lambda rec: parseRecs( rec) ).filter( lambda rec: rec.matches > min_matches.value ).count()

In [None]:
getNumCaptainsByMinimumMatches( odiData, 100 )

In [None]:
getNumCaptainsByMinimumMatches( odiData, 50 )

Accumulators

In [None]:
def parRecsNew( line ):
  # Parse the line as usual, adding 1 to the accumulator if it contains "*"
  global playing_caps
  if "*" in line:
    playing_caps.add( 1 )
  return parseRecs( line )

In [None]:
playing_caps = sc.accumulator(0)

In [None]:
captains_odis_new = odiData.map( lambda line: parRecsNew( line ) )

In [None]:
captains_odis_new.count()

In [None]:
playing_caps.value
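Spark applies accumulator updates made inside a transformation such as map only when an action runs, and may apply them more than once if a task or stage is re-executed. Because captains_odis_new is not cached, each further action on it would rerun parRecsNew and add to playing_caps again. The toy model below (ToyAccumulator and run_job are hypothetical stand-ins, not Spark classes) reproduces that double counting. The "*" appears to mark captains who were still active, such as 'Dhoni  M S*' with career '2004-'. A side-effect-free way to get the same number in Spark is odiData.filter( lambda line: "*" in line ).count().

```python
class ToyAccumulator:
    """Hypothetical stand-in for a Spark accumulator: tasks can only add to it."""
    def __init__(self, value=0):
        self.value = value

    def add(self, amount):
        self.value += amount

def run_job(lines, acc):
    """Model one action on the uncached RDD: the map, and its side effect, runs again."""
    for line in lines:
        if "*" in line:
            acc.add(1)
    return len(lines)  # what count() would return

# The five lines shown in the odiData.take(5) output
sample = [
    "Ponting  R T,Australia,1995-2012,230,165,51,14,124",
    "Fleming  S P,New Zealand,1994-2007,218,98,106,14,105",
    "Ranatunga  A,Sri Lanka,1982-1999,193,89,95,9,102",
    "Dhoni  M S*,India,2004-,186,103,68,15,88",
    "Border  A R,Australia,1979-1994,178,107,67,4,86",
]

playing_caps = ToyAccumulator()
run_job(sample, playing_caps)  # first count()
run_job(sample, playing_caps)  # a second count() on the same uncached RDD
print(playing_caps.value)      # 2, although only one line contains "*"

# Side-effect-free alternative: filter, then count
active = sum(1 for line in sample if "*" in line)
print(active)  # 1
```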