In [1]:
#Data Sampling with spark
dataset = sc.parallelize([1,2,3,4,5,6,7,8,9,10])

dataset.takeSample(False, 3)

[2, 8, 9]

In [2]:
#spark transformation functions - map(),flatmap() and filter()
lorem = sc.textFile('lorem.txt')

lorem.map(lambda x: x.upper()) \ # maps the strings to upper case and output is a list of strings in upper case
.flatMap(lambda x: x.split()) \ # result is flattened ie. output is single list with all uppercase letters
.filter(lambda x: len(x) > 4) \ # only considers the strings with more than 4 characters for processing
.collect()

['LOREM',
 'IPSUM',
 'DOLOR',
 'AMET,',
 'CONSECTETUR',
 'ADIPISCING',
 'ELIT,',
 'EIUSMOD',
 'TEMPOR',
 'INCIDIDUNT',
 'LABORE',
 'DOLORE',
 'MAGNA',
 'ALIQUA.',
 'MINIM',
 'VENIAM,',
 'NOSTRUD',
 'EXERCITATION',
 'ULLAMCO',
 'LABORIS',
 'ALIQUIP',
 'COMMODO',
 'CONSEQUAT.',
 'IRURE',
 'DOLOR',
 'REPREHENDERIT',
 'VOLUPTATE',
 'VELIT',
 'CILLUM',
 'DOLORE',
 'FUGIAT',
 'NULLA',
 'PARIATUR.',
 'EXCEPTEUR',
 'OCCAECAT',
 'CUPIDATAT',
 'PROIDENT,',
 'CULPA',
 'OFFICIA',
 'DESERUNT',
 'MOLLIT',
 'LABORUM.']

In [22]:
#Grouping, Sorting, and Distinct Functions
exmple = sc.parallelize([(1,'aaa'),(2,'bbb'),(3,'ccc'),(1,'lll')])


In [23]:
exmple.collect()

[(1, 'aaa'), (2, 'bbb'), (3, 'ccc'), (1, 'lll')]

In [24]:
distnctrecs=exmple.distinct()

In [29]:
distnctrecs

PythonRDD[26] at RDD at PythonRDD.scala:53

In [26]:
distnctrecs.collect()

[(3, 'ccc'), (1, 'lll'), (1, 'aaa'), (2, 'bbb')]

In [12]:
grouped = distnctrecs.groupBy(lambda x: x[0]) \
.map(lambda x: (x[0], list(x[1])))

In [13]:
grouped.collect()

[(1, [(1, 'lll'), (1, 'aaa')]), (2, [(2, 'bbb')]), (3, [(3, 'ccc')])]

In [30]:
sorted = distnctrecs.sortBy(lambda x: x[1]) \
.map(lambda x: (x[1], x[0]))

In [31]:
sorted.collect()

[('aaa', 1), ('bbb', 2), ('ccc', 3), ('lll', 1)]

In [14]:
#Set Operations
odds = sc.parallelize([1,3,5,7,9])

fibonacci = sc.parallelize([0,1,2,3,5,8])

odds.union(fibonacci).collect()

[1, 3, 5, 7, 9, 0, 1, 2, 3, 5, 8]

In [15]:
#Intersection
odds = sc.parallelize([1,3,5,7,9])

fibonacci = sc.parallelize([0,1,2,3,5,8])

odds.intersection(fibonacci).collect()

[1, 3, 5]

In [16]:
#Subtract
odds = sc.parallelize([1,3,5,7,9])

fibonacci = sc.parallelize([0,1,2,3,5,8])

odds.subtract(fibonacci).collect()


[9, 7]

In [17]:
#Spark Actions
#The count Action
lorem = sc.textFile('lorem.txt')

words = lorem.flatMap(lambda x: x.split())

words.count()

69

In [18]:
#The collect, take, top, and first Actions
lorem = sc.textFile('lorem.txt')

words = lorem.flatMap(lambda x: x.split())

words.collect()

['Lorem',
 'ipsum',
 'dolor',
 'sit',
 'amet,',
 'consectetur',
 'adipiscing',
 'elit,',
 'sed',
 'do',
 'eiusmod',
 'tempor',
 'incididunt',
 'ut',
 'labore',
 'et',
 'dolore',
 'magna',
 'aliqua.',
 'Ut',
 'enim',
 'ad',
 'minim',
 'veniam,',
 'quis',
 'nostrud',
 'exercitation',
 'ullamco',
 'laboris',
 'nisi',
 'ut',
 'aliquip',
 'ex',
 'ea',
 'commodo',
 'consequat.',
 'Duis',
 'aute',
 'irure',
 'dolor',
 'in',
 'reprehenderit',
 'in',
 'voluptate',
 'velit',
 'esse',
 'cillum',
 'dolore',
 'eu',
 'fugiat',
 'nulla',
 'pariatur.',
 'Excepteur',
 'sint',
 'occaecat',
 'cupidatat',
 'non',
 'proident,',
 'sunt',
 'in',
 'culpa',
 'qui',
 'officia',
 'deserunt',
 'mollit',
 'anim',
 'id',
 'est',
 'laborum.']

In [20]:
words.take(5)

['Lorem', 'ipsum', 'dolor', 'sit', 'amet,']

In [21]:
words.top(3)

['voluptate', 'veniam,', 'velit']

In [24]:
words.first()

'Lorem'

In [26]:
#Reduce and fold functions
numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])

numbers.reduce(lambda x, y: x + y)

45

In [27]:
#fold functions 
#difference between fold and reduce is reduce will give exception on empty rdd but fold will give 0 as result
numbers = sc.parallelize([1,2,3,4,5,6,7,8,9])

numbers.fold(0, lambda x, y: x + y)

45

In [51]:
#foreach function ---not working since print not functioning with lambda
def printfunc(x): print(x)

lorem = sc.textFile('lorem.txt')

words = lorem.flatMap(lambda x: x.split())

words.foreach(lambda x: printfunc(x))


In [52]:
#KEY VALUE PAIR OPERATIONS
kvpairs = sc.parallelize([('city','Hayward')

                         ,('state','CA')

                         ,('zip',94541)

                         ,('country','USA')])

kvpairs.keys().collect()

['city', 'state', 'zip', 'country']

In [53]:
kvpairs = sc.parallelize([('city','Hayward')

                         ,('state','CA')

                         ,('zip',94541)

                         ,('country','USA')])

kvpairs.values().collect()

['Hayward', 'CA', 94541, 'USA']

In [55]:
#Functional Key Value Pair RDD Transformations
locations = sc.parallelize([('Hayward', 'USA', 1)

                           ,('Baumholder','Germany', 2)

                           ,('Alexandria','USA', 3)

                           ,('Melbourne','Australia', 4)])

bylocno = locations.keyBy(lambda x: x[2])
bylocno.collect()


[(1, ('Hayward', 'USA', 1)),
 (2, ('Baumholder', 'Germany', 2)),
 (3, ('Alexandria', 'USA', 3)),
 (4, ('Melbourne', 'Australia', 4))]

In [56]:
#Mapvalues and flatmapvalues
locwtemps = sc.parallelize(['Hayward,71|69|71|71|72',

                            'Baumholder,46|42|40|37|39',

                            'Alexandria,50|48|51|53|44',

                            'Melbourne,88|101|85|77|74'])

kvpairs = locwtemps.map(lambda x: x.split(','))

In [58]:
locwtemplist = kvpairs.mapValues(lambda x: x.split('|')) \
.mapValues(lambda x: [int(s) for s in x])

In [59]:
locwtemplist.collect()

[('Hayward', [71, 69, 71, 71, 72]),
 ('Baumholder', [46, 42, 40, 37, 39]),
 ('Alexandria', [50, 48, 51, 53, 44]),
 ('Melbourne', [88, 101, 85, 77, 74])]

In [60]:
locwtemps = kvpairs.flatMapValues(lambda x: x.split('|')) \
.map(lambda x: (x[0], int(x[1])))

In [61]:
locwtemps.collect()

[('Hayward', 71),
 ('Hayward', 69),
 ('Hayward', 71),
 ('Hayward', 71),
 ('Hayward', 72),
 ('Baumholder', 46),
 ('Baumholder', 42),
 ('Baumholder', 40),
 ('Baumholder', 37),
 ('Baumholder', 39),
 ('Alexandria', 50),
 ('Alexandria', 48),
 ('Alexandria', 51),
 ('Alexandria', 53),
 ('Alexandria', 44),
 ('Melbourne', 88),
 ('Melbourne', 101),
 ('Melbourne', 85),
 ('Melbourne', 77),
 ('Melbourne', 74)]

In [62]:
#Groupbykeys
grouped = locwtemps.groupByKey()


In [65]:
grouped.collect()

[('Hayward', <pyspark.resultiterable.ResultIterable at 0x12af3246c18>),
 ('Baumholder', <pyspark.resultiterable.ResultIterable at 0x12af32465c0>),
 ('Melbourne', <pyspark.resultiterable.ResultIterable at 0x12af32464a8>),
 ('Alexandria', <pyspark.resultiterable.ResultIterable at 0x12af3246b38>)]

In [63]:
avgtemps = grouped.mapValues(lambda x: sum(x)/len(x))


In [64]:
avgtemps.collect()

[('Hayward', 70.8),
 ('Baumholder', 40.8),
 ('Melbourne', 85.0),
 ('Alexandria', 49.2)]

In [66]:
#Reducebykey
temptups = locwtemps.mapValues(lambda x: (x, 1))
inputstoavg = temptups.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))
averages = inputstoavg.map(lambda x: (x[0], x[1][0]/x[1][1]))


In [69]:
averages.collect()

[('Hayward', 70.8),
 ('Baumholder', 40.8),
 ('Melbourne', 85.0),
 ('Alexandria', 49.2)]

In [68]:
#Foldbykey
maxbycity = locwtemps.foldByKey(0, lambda x, y: x if x > y else y)

maxbycity.collect()

[('Hayward', 72), ('Baumholder', 46), ('Melbourne', 101), ('Alexandria', 53)]

In [70]:
#SortbyKey
sortedbykey = locwtemps.sortByKey()
sortedbykey.collect()

[('Alexandria', 50),
 ('Alexandria', 48),
 ('Alexandria', 51),
 ('Alexandria', 53),
 ('Alexandria', 44),
 ('Baumholder', 46),
 ('Baumholder', 42),
 ('Baumholder', 40),
 ('Baumholder', 37),
 ('Baumholder', 39),
 ('Hayward', 71),
 ('Hayward', 69),
 ('Hayward', 71),
 ('Hayward', 71),
 ('Hayward', 72),
 ('Melbourne', 88),
 ('Melbourne', 101),
 ('Melbourne', 85),
 ('Melbourne', 77),
 ('Melbourne', 74)]

In [71]:
sortedbyval = locwtemps.map(lambda x: (x[1],x[0])) \
.sortByKey(ascending=False)
sortedbyval.collect()

[(101, 'Melbourne'),
 (88, 'Melbourne'),
 (85, 'Melbourne'),
 (77, 'Melbourne'),
 (74, 'Melbourne'),
 (72, 'Hayward'),
 (71, 'Hayward'),
 (71, 'Hayward'),
 (71, 'Hayward'),
 (69, 'Hayward'),
 (53, 'Alexandria'),
 (51, 'Alexandria'),
 (50, 'Alexandria'),
 (48, 'Alexandria'),
 (46, 'Baumholder'),
 (44, 'Alexandria'),
 (42, 'Baumholder'),
 (40, 'Baumholder'),
 (39, 'Baumholder'),
 (37, 'Baumholder')]

In [73]:
#Subtractkey
cities1 = sc.parallelize([('Hayward',(37.668819,-122.080795)),

                          ('Baumholder',(49.6489,7.3975)),

                          ('Alexandria',(38.820450,-77.050552)),

                          ('Melbourne', (37.663712,144.844788))])
cities2 = sc.parallelize([('Boulder Creek',(64.0708333,-148.2236111)),

                          ('Hayward',(37.668819,-122.080795)),

                          ('Alexandria',(38.820450,-77.050552)),

                          ('Arlington', (38.878337,-77.100703))])

cities1.subtractByKey(cities2).collect()

[('Baumholder', (49.6489, 7.3975)), ('Melbourne', (37.663712, 144.844788))]

In [74]:
cities2.subtractByKey(cities1).collect()

[('Boulder Creek', (64.0708333, -148.2236111)),
 ('Arlington', (38.878337, -77.100703))]

In [76]:
#JOIN TRANSFORMATIONS
stores = sc.parallelize(['100\tBoca Raton', '101\tColumbia',

                         '102\tCambridge', '103\tNaperville']) \
.map(lambda x: x.split('\t')) \
.keyBy(lambda x: x[0])

salespeople = sc.parallelize(['1\tHenry\t100', '2\tKaren\t100',

                              '3\tPaul\t101', '4\tJimmy\t102',

                              '5\tJanice\t']) \
.map(lambda x: x.split('\t')) \
.keyBy(lambda x: x[2])

salespeople.join(stores).collect()

[('102', (['4', 'Jimmy', '102'], ['102', 'Cambridge'])),
 ('100', (['1', 'Henry', '100'], ['100', 'Boca Raton'])),
 ('100', (['2', 'Karen', '100'], ['100', 'Boca Raton'])),
 ('101', (['3', 'Paul', '101'], ['101', 'Columbia']))]

In [77]:
#Left outer join
salespeople.leftOuterJoin(stores) \
.filter(lambda x: x[1][1] is None) \
.map(lambda x: "salesperson " + x[1][0][1] + " has no store") \
.collect()

['salesperson Janice has no store']

In [78]:
salespeople.leftOuterJoin(stores).collect()

[('102', (['4', 'Jimmy', '102'], ['102', 'Cambridge'])),
 ('', (['5', 'Janice', ''], None)),
 ('100', (['1', 'Henry', '100'], ['100', 'Boca Raton'])),
 ('100', (['2', 'Karen', '100'], ['100', 'Boca Raton'])),
 ('101', (['3', 'Paul', '101'], ['101', 'Columbia']))]

In [79]:
#Right outer join
salespeople.rightOuterJoin(stores).collect()

[('102', (['4', 'Jimmy', '102'], ['102', 'Cambridge'])),
 ('100', (['1', 'Henry', '100'], ['100', 'Boca Raton'])),
 ('100', (['2', 'Karen', '100'], ['100', 'Boca Raton'])),
 ('101', (['3', 'Paul', '101'], ['101', 'Columbia'])),
 ('103', (None, ['103', 'Naperville']))]

In [80]:
salespeople.rightOuterJoin(stores) \
.filter(lambda x: x[1][0] is None) \
.map(lambda x: x[1][1][1] + " store has no salespeople") \
.collect()


['Naperville store has no salespeople']

In [81]:
salespeople.fullOuterJoin(stores).collect()

[('102', (['4', 'Jimmy', '102'], ['102', 'Cambridge'])),
 ('', (['5', 'Janice', ''], None)),
 ('100', (['1', 'Henry', '100'], ['100', 'Boca Raton'])),
 ('100', (['2', 'Karen', '100'], ['100', 'Boca Raton'])),
 ('101', (['3', 'Paul', '101'], ['101', 'Columbia'])),
 ('103', (None, ['103', 'Naperville']))]

In [82]:
salespeople.fullOuterJoin(stores) \
.filter(lambda x: x[1][0] is None or x[1][1] is None) \
.collect()

[('', (['5', 'Janice', ''], None)), ('103', (None, ['103', 'Naperville']))]

In [83]:
#Cogroup
salespeople.cogroup(stores).collect()


[('102',
  (<pyspark.resultiterable.ResultIterable at 0x12af332ef60>,
   <pyspark.resultiterable.ResultIterable at 0x12af332e780>)),
 ('',
  (<pyspark.resultiterable.ResultIterable at 0x12af332e2e8>,
   <pyspark.resultiterable.ResultIterable at 0x12af332e470>)),
 ('100',
  (<pyspark.resultiterable.ResultIterable at 0x12af332efd0>,
   <pyspark.resultiterable.ResultIterable at 0x12af313ffd0>)),
 ('101',
  (<pyspark.resultiterable.ResultIterable at 0x12af313f3c8>,
   <pyspark.resultiterable.ResultIterable at 0x12af328dac8>)),
 ('103',
  (<pyspark.resultiterable.ResultIterable at 0x12af328dbe0>,
   <pyspark.resultiterable.ResultIterable at 0x12af32cdf60>))]

In [84]:
salespeople.cogroup(stores) \
.mapValues(lambda x: [item for sublist in x for item in sublist]) \
.collect()

[('102', [['4', 'Jimmy', '102'], ['102', 'Cambridge']]),
 ('', [['5', 'Janice', '']]),
 ('100',
  [['1', 'Henry', '100'], ['2', 'Karen', '100'], ['100', 'Boca Raton']]),
 ('101', [['3', 'Paul', '101'], ['101', 'Columbia']]),
 ('103', [['103', 'Naperville']])]

In [85]:
#cartesian
salespeople.cartesian(stores).collect()

[(('100', ['1', 'Henry', '100']), ('100', ['100', 'Boca Raton'])),
 (('100', ['1', 'Henry', '100']), ('101', ['101', 'Columbia'])),
 (('100', ['1', 'Henry', '100']), ('102', ['102', 'Cambridge'])),
 (('100', ['1', 'Henry', '100']), ('103', ['103', 'Naperville'])),
 (('100', ['2', 'Karen', '100']), ('100', ['100', 'Boca Raton'])),
 (('100', ['2', 'Karen', '100']), ('101', ['101', 'Columbia'])),
 (('100', ['2', 'Karen', '100']), ('102', ['102', 'Cambridge'])),
 (('100', ['2', 'Karen', '100']), ('103', ['103', 'Naperville'])),
 (('101', ['3', 'Paul', '101']), ('100', ['100', 'Boca Raton'])),
 (('101', ['3', 'Paul', '101']), ('101', ['101', 'Columbia'])),
 (('101', ['3', 'Paul', '101']), ('102', ['102', 'Cambridge'])),
 (('101', ['3', 'Paul', '101']), ('103', ['103', 'Naperville'])),
 (('102', ['4', 'Jimmy', '102']), ('100', ['100', 'Boca Raton'])),
 (('', ['5', 'Janice', '']), ('100', ['100', 'Boca Raton'])),
 (('102', ['4', 'Jimmy', '102']), ('101', ['101', 'Columbia'])),
 (('', ['5', 'J

In [4]:
#Try it urself
stations = sc.textFile('stations.csv')
status = sc.textFile('status.csv')

In [5]:
#Split the status data into discrete fields projecting only the fields necessary.
status2 = status.map(lambda x: x.split(',')) \
.map(lambda x: (x[0], x[1], x[2], x[3].replace('"',''))) \
.map(lambda x: (x[0], x[1], x[2], x[3].split(' '))) \
.map(lambda x: (x[0], x[1], x[2], x[3][0].split('-'), x[3][1].split(':'))) \
.map(lambda x: (int(x[0]), int(x[1]), int(x[3][0]), int(x[3][1]), int(x[3][2]),int(x[4][0])))

In [89]:
status2.collect()

[(10, 7, 2014, 12, 30, 15),
 (10, 7, 2014, 12, 30, 15),
 (10, 7, 2014, 12, 30, 15),
 (10, 7, 2014, 12, 30, 15),
 (10, 7, 2014, 12, 30, 15),
 (10, 7, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 30, 15),
 (10, 6, 2014, 12, 3

In [6]:
#Because status.csv is the biggest of your datasets (over 36M records), restrict the dataset to only the dates required, and then drop the date fields because they are no longer necessary:
status3 = status2.filter(lambda x: x[2]==2015 and \
                         x[3]==2 and \
                         x[4]>=22) \
.map(lambda x: (x[0], x[1], x[5]))

In [91]:
status3.collect()

[(10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 9, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 23),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 8, 22),
 (10, 

In [7]:
#Filter the stations dataset to only include stations where the landmark='San Jose'
stations2 = stations.map(lambda x: x.split(',')) \
.filter(lambda x: x[5] == 'San Jose') \
.map(lambda x: (int(x[0]), x[1]))


In [8]:
#Convert both RDDs to key value pair RDDs to prepare for a join operation:
status_kv = status3.keyBy(lambda x: x[0])


In [94]:
status_kv.collect()

[(10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 8, 23)),
 (10, (10, 9, 23)),
 (10, (10, 9, 23)),


In [9]:
stations_kv = stations2.keyBy(lambda x: x[0])
stations_kv.collect()

[(2, (2, 'San Jose Diridon Caltrain Station')),
 (3, (3, 'San Jose Civic Center')),
 (4, (4, 'Santa Clara at Almaden')),
 (5, (5, 'Adobe on Almaden')),
 (6, (6, 'San Pedro Square')),
 (7, (7, 'Paseo de San Antonio')),
 (8, (8, 'San Salvador at 1st')),
 (9, (9, 'Japantown')),
 (10, (10, 'San Jose City Hall')),
 (11, (11, 'MLK Library')),
 (12, (12, 'SJSU 4th at San Carlos')),
 (13, (13, 'St James Park')),
 (14, (14, 'Arena Green / SAP Center')),
 (16, (16, 'SJSU - San Salvador at 9th')),
 (80, (80, 'Santa Clara County Civic Center')),
 (84, (84, 'Ryland Park'))]

In [10]:
#Join the filtered status key value pair RDD to the filtered stations key value pair RDD by their keys (station_id):
joined = status_kv.join(stations_kv)


In [12]:
# Clean the joined RDD:
cleaned = joined.map(lambda x: (x[0], x[1][0][1], x[1][0][2], x[1][1][1]))

In [13]:
#Create a key value pair with the key being a tuple consisting of the station name and the hour, and then compute the averages by each hour for each station:
avgbyhour = cleaned.keyBy(lambda x: (x[3],x[2])) \
.mapValues(lambda x: (x[1], 1)) \
.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) \
.mapValues(lambda x: (x[0]/x[1]))

In [36]:
#Find the top 10 averages by station and hour using the sortBy function:
topavail = avgbyhour.keyBy(lambda x: x[1]) \
            .sortByKey(ascending=False) \
.map(lambda x: (x[1][0][0], x[1][0][1], x[0]))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 29.0 failed 1 times, most recent failure: Lost task 3.0 in stage 29.0 (TID 80, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 352, in func
    return f(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 1945, in combine
    merger.mergeValues(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 164, in _read_with_length
    length = read_int(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 715, in read_int
    length = stream.read(4)
  File "C:\Users\Vanitha\Anaconda3\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 352, in func
    return f(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 1945, in combine
    merger.mergeValues(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 164, in _read_with_length
    length = read_int(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 715, in read_int
    length = stream.read(4)
  File "C:\Users\Vanitha\Anaconda3\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [17]:
topavail.take(5)

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 4.0 failed 1 times, most recent failure: Lost task 1.0 in stage 4.0 (TID 16, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 352, in func
    return f(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 1945, in combine
    merger.mergeValues(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 164, in _read_with_length
    length = read_int(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 715, in read_int
    length = stream.read(4)
  File "C:\Users\Vanitha\Anaconda3\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 377, in main
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 372, in process
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 2499, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 352, in func
    return f(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\pyspark\rdd.py", line 1945, in combine
    merger.mergeValues(iterator)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\shuffle.py", line 238, in mergeValues
    for k, v in iterator:
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 147, in load_stream
    yield self._read_with_length(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 164, in _read_with_length
    length = read_int(stream)
  File "C:\spark-2.4.4-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 715, in read_int
    length = stream.read(4)
  File "C:\Users\Vanitha\Anaconda3\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1124)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [None]:
#min Function
numbers = sc.parallelize([0,1,1,2,3,5,8,13,21,34])
numbers.min()

In [104]:
#max function
numbers = sc.parallelize([0,1,1,2,3,5,8,13,21,34])
numbers.max()

34

In [105]:
#mean function
numbers = sc.parallelize([0,1,1,2,3,5,8,13,21,34])
numbers.mean()

8.8

In [106]:
#sum function
numbers = sc.parallelize([0,1,1,2,3,5,8,13,21,34])
numbers.sum()

88

In [107]:
#stdev()
numbers = sc.parallelize([0,1,1,2,3,5,8,13,21,34])
numbers.stdev()

10.467091286503619

In [108]:
#variance
numbers = sc.parallelize([0,1,1,2,3,5,8,13,21,34])
numbers.variance()

109.55999999999999

In [109]:
#stats
numbers = sc.parallelize([0,1,1,2,3,5,8,13,21,34])
numbers.stats()

(count: 10, mean: 8.8, stdev: 10.467091286503619, max: 34.0, min: 0.0)