In [0]:
spark

In [0]:
import matplotlib.pyplot as plt

In [0]:
sc = spark.sparkContext

In [0]:
captains_odis = sc.textFile( "/FileStore/tables/captains_ODI.csv" )
captains_odis.take(5)

Out[163]: ['Ponting  R T,Australia,1995-2012,230,165,51,14,124',
 'Fleming  S P,New Zealand,1994-2007,218,98,106,14,105',
 'Ranatunga  A,Sri Lanka,1982-1999,193,89,95,9,102',
 'Dhoni  M S*,India,2004-,186,103,68,15,88',
 'Border  A R,Australia,1979-1994,178,107,67,4,86']

#### Fields: "name", "country", "career", "matches", "won", "lost", "ties", "toss"

In [0]:
captains_test = sc.textFile( "/FileStore/tables/captains_Test.csv" )
captains_test.take(5)

Out[164]: ['Smith  G C,South Africa,2002-2014,109,53,29,27,58',
 'Border  A R,Australia,1978-1994,93,32,22,38,47',
 'Fleming  S P,New Zealand,1994-2008,80,28,27,25,38',
 'Ponting  R T,Australia,1995-2012,77,48,16,13,37',
 'Lloyd  C H,West Indies,1966-1984,74,36,12,26,35']

In [0]:
def getWinningPerc(line):
  
  tokens = line.split(',')
  winPerc = round(float(tokens[4])/float(tokens[3]), 2) # win Percentage = Won/Total Matches
  
  return (tokens[0], winPerc) # return (key,value). eg.('Ponting  R T', 0.72)

In [0]:
captains_odi_winPerc = captains_odis.map( lambda line: getWinningPerc(line) ) #finding winning percentage of each captain
captains_odi_winPerc.take(10)

Out[166]: [('Ponting  R T', 0.72),
 ('Fleming  S P', 0.45),
 ('Ranatunga  A', 0.46),
 ('Dhoni  M S*', 0.55),
 ('Border  A R', 0.6),
 ('Azharuddin  M', 0.51),
 ('Smith  G C', 0.61),
 ('Ganguly  S C', 0.52),
 ('Cronje  W J', 0.71),
 ('Imran Khan', 0.54)]

In [0]:
captains_test_winPerc = captains_test.map( lambda line: getWinningPerc(line) ) #finding winning percentage of each captain
captains_test_winPerc.take(10)

Out[167]: [('Smith  G C', 0.49),
 ('Border  A R', 0.34),
 ('Fleming  S P', 0.35),
 ('Ponting  R T', 0.62),
 ('Lloyd  C H', 0.49),
 ('Dhoni  M S*', 0.45),
 ('Waugh  S R', 0.72),
 ('Ranatunga  A', 0.21),
 ('Atherton  M A', 0.24),
 ('Cronje  W J', 0.51)]

In [0]:
#join two results

# returns ODI win percentage and Test win Percentage
def getWins(vals):
  odiWins, testWins = list(vals[0]), list(vals[1])
  
  return (odiWins, testWins)

In [0]:
# Display the first 10 records after grouping
grouped_data=captains_test_winPerc.groupWith( captains_odi_winPerc ).mapValues(getWins)
grouped_data.take(10)

Out[169]: [('Smith  G C', ([0.49], [0.61])),
 ('Ganguly  S C', ([0.43], [0.52])),
 ('Simpson  R B', ([0.31], [])),
 ('Jayasuriya  S T', ([0.47], [0.56])),
 ('Brearley  J M', ([0.58], [0.6])),
 ('Sammy  D J G*', ([0.27], [0.36])),
 ('Hughes  K J', ([0.14], [0.43])),
 ('Cowdrey  M C', ([0.3], [])),
 ('Tendulkar  S R', ([0.16], [0.32])),
 ('Goddard  J D C', ([0.36], []))]

In [0]:
# Transform the RDD to the desired format
transformed_rdd = grouped_data.map(lambda x: (x[0], x[1][0], x[1][1])     )
transformed_rdd.take(10)                 

Out[170]: [('Smith  G C', [0.49], [0.61]),
 ('Ganguly  S C', [0.43], [0.52]),
 ('Simpson  R B', [0.31], []),
 ('Jayasuriya  S T', [0.47], [0.56]),
 ('Brearley  J M', [0.58], [0.6]),
 ('Sammy  D J G*', [0.27], [0.36]),
 ('Hughes  K J', [0.14], [0.43]),
 ('Cowdrey  M C', [0.3], []),
 ('Tendulkar  S R', [0.16], [0.32]),
 ('Goddard  J D C', [0.36], [])]

In [0]:
# Transform the RDD
transformed_rdd = transformed_rdd.map(lambda x: (x[0], x[1][0], x[2][0] if x[2] else 0))
transformed_rdd.take(5)

Out[171]: [('Smith  G C', 0.49, 0.61),
 ('Ganguly  S C', 0.43, 0.52),
 ('Simpson  R B', 0.31, 0),
 ('Jayasuriya  S T', 0.47, 0.56),
 ('Brearley  J M', 0.58, 0.6)]

In [0]:
captain_stats_all = transformed_rdd.map(lambda rec: (rec[0],
                                               {"Test_wins": rec[1],
                                               "ODI_wins": rec[2]}))
                                                #data is formatted to two categories as "Test_wins" and "ODI_wins"


captain_stats_all.take(13)
                                

Out[172]: [('Smith  G C', {'Test_wins': 0.49, 'ODI_wins': 0.61}),
 ('Ganguly  S C', {'Test_wins': 0.43, 'ODI_wins': 0.52}),
 ('Simpson  R B', {'Test_wins': 0.31, 'ODI_wins': 0}),
 ('Jayasuriya  S T', {'Test_wins': 0.47, 'ODI_wins': 0.56}),
 ('Brearley  J M', {'Test_wins': 0.58, 'ODI_wins': 0.6}),
 ('Sammy  D J G*', {'Test_wins': 0.27, 'ODI_wins': 0.36}),
 ('Hughes  K J', {'Test_wins': 0.14, 'ODI_wins': 0.43}),
 ('Cowdrey  M C', {'Test_wins': 0.3, 'ODI_wins': 0}),
 ('Tendulkar  S R', {'Test_wins': 0.16, 'ODI_wins': 0.32}),
 ('Goddard  J D C', {'Test_wins': 0.36, 'ODI_wins': 0}),
 ('MacLaren  A C', {'Test_wins': 0.18, 'ODI_wins': 0}),
 ('Darling  J', {'Test_wins': 0.33, 'ODI_wins': 0}),
 ('Hammond  W R', {'Test_wins': 0.2, 'ODI_wins': 0})]

### Joins

In [0]:
# Splitting each line in the RDDs by comma and keying each element by country
captains_odis_rdd = captains_odis.map(lambda x: x.split(",")).keyBy(lambda x: x[1])
captains_test_rdd = captains_test.map(lambda x: x.split(",")).keyBy(lambda x: x[1])

In [0]:
captains_odis_rdd.take(5)

Out[174]: [('Australia',
  ['Ponting  R T', 'Australia', '1995-2012', '230', '165', '51', '14', '124']),
 ('New Zealand',
  ['Fleming  S P',
   'New Zealand',
   '1994-2007',
   '218',
   '98',
   '106',
   '14',
   '105']),
 ('Sri Lanka',
  ['Ranatunga  A', 'Sri Lanka', '1982-1999', '193', '89', '95', '9', '102']),
 ('India', ['Dhoni  M S*', 'India', '2004-', '186', '103', '68', '15', '88']),
 ('Australia',
  ['Border  A R', 'Australia', '1979-1994', '178', '107', '67', '4', '86'])]

In [0]:
captains_test_rdd.take(5)

Out[175]: [('South Africa',
  ['Smith  G C', 'South Africa', '2002-2014', '109', '53', '29', '27', '58']),
 ('Australia',
  ['Border  A R', 'Australia', '1978-1994', '93', '32', '22', '38', '47']),
 ('New Zealand',
  ['Fleming  S P', 'New Zealand', '1994-2008', '80', '28', '27', '25', '38']),
 ('Australia',
  ['Ponting  R T', 'Australia', '1995-2012', '77', '48', '16', '13', '37']),
 ('West Indies',
  ['Lloyd  C H', 'West Indies', '1966-1984', '74', '36', '12', '26', '35'])]

#### 1. RDD.leftOuterJoin:

In [0]:
# Performing a left outer join between captains_odis_rdd and captains_test_rdd
left_outer_join = captains_odis_rdd.leftOuterJoin(captains_test_rdd)
left_outer_join.take(2)

Out[176]: [('South Africa',
  (['Smith  G C', 'South Africa', '2002-2013', '149', '91', '51', '7', '74'],
   ['Smith  G C',
    'South Africa',
    '2002-2014',
    '109',
    '53',
    '29',
    '27',
    '58'])),
 ('South Africa',
  (['Smith  G C', 'South Africa', '2002-2013', '149', '91', '51', '7', '74'],
   ['Cronje  W J',
    'South Africa',
    '1992-2000',
    '53',
    '27',
    '11',
    '15',
    '22']))]

#### 2. RDD.rightOuterJoin:

In [0]:
# Performing a right outer join between captains_odis_rdd and captains_test_rdd
right_outer_join = captains_test_rdd.rightOuterJoin(captains_odis_rdd)
right_outer_join.take(2)

Out[177]: [('South Africa',
  (['Smith  G C', 'South Africa', '2002-2014', '109', '53', '29', '27', '58'],
   ['Smith  G C', 'South Africa', '2002-2013', '149', '91', '51', '7', '74'])),
 ('South Africa',
  (['Smith  G C', 'South Africa', '2002-2014', '109', '53', '29', '27', '58'],
   ['Cronje  W J',
    'South Africa',
    '1992-2000',
    '140',
    '99',
    '37',
    '4',
    '74']))]

#### 3. RDD.fullOuterJoin:

In [0]:
# Performing a full outer join between captains_odis_rdd and captains_test_rdd
full_outer_join = captains_odis_rdd.fullOuterJoin(captains_test_rdd)
full_outer_join.take(2)

Out[178]: [('South Africa',
  (['Smith  G C', 'South Africa', '2002-2013', '149', '91', '51', '7', '74'],
   ['Smith  G C',
    'South Africa',
    '2002-2014',
    '109',
    '53',
    '29',
    '27',
    '58'])),
 ('South Africa',
  (['Smith  G C', 'South Africa', '2002-2013', '149', '91', '51', '7', '74'],
   ['Cronje  W J',
    'South Africa',
    '1992-2000',
    '53',
    '27',
    '11',
    '15',
    '22']))]