This notebook reviews the USF MSAN694 course, Distributed Computing (Spark), through a series of exercises.

In [135]:
sc

## Week 2:

### Creation - two ways
    1(a) loading an external dataset, e.g. with sc.textFile()
    1(b) parallelizing a collection in the driver program, such as a list, with sc.parallelize()

### Transformation: creates a new RDD from an existing one, e.g. map, filter, distinct. Transformations are lazy: nothing is computed until an action runs.

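    2(a) Difference between map and flatMap: map returns exactly one output element per input element, so a function that returns a list yields an RDD of lists; flatMap flattens the returned sequences into a one-level RDD.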
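To make the difference concrete without a cluster, here is a plain-Python sketch that models an RDD as an ordinary list. rdd_map and rdd_flat_map are hypothetical helpers, not PySpark APIs; the input strings are the same two used in the count() example later in Week 2.

```python
# Plain-Python model of RDD.map vs RDD.flatMap (an RDD is modeled as a list).
# rdd_map and rdd_flat_map are hypothetical helpers, not part of PySpark.

def rdd_map(elements, func):
    """Exactly one output element per input element, like RDD.map."""
    return [func(e) for e in elements]


def rdd_flat_map(elements, func):
    """func returns an iterable; its items are flattened, like RDD.flatMap."""
    return [item for e in elements for item in func(e)]


lines = ["MSAN694", "MSAN694 Distributed Computing"]
mapped = rdd_map(lines, lambda line: line.split())
flat = rdd_flat_map(lines, lambda line: line.split())
print(mapped)  # [['MSAN694'], ['MSAN694', 'Distributed', 'Computing']]
print(flat)    # ['MSAN694', 'MSAN694', 'Distributed', 'Computing']
```

The map result has 2 elements (one list per line) and the flatMap result has 4 (one per word).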

### Action: triggers a computation and returns a result to the driver program (or writes it to external storage), e.g. collect(), reduce(), fold(), aggregate(). Actions return non-RDD values.
    
    3(a) Compared to reduce, fold also takes an initial (zero) value.
    
    3(b) reduce and fold return a value of the same type as the RDD's elements; aggregate also takes an initial (zero) value but can return a different type.
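The sketch below models an RDD as a list of partitions to show where each action applies its function and its zero value. It follows the way PySpark combines the per-partition results on the driver (the zero value is used once per partition and once more in the final merge), but the helper names are hypothetical and this is not Spark's implementation.

```python
from functools import reduce


def rdd_reduce(partitions, op):
    # Reduce each non-empty partition, then reduce the partial results.
    partials = [reduce(op, p) for p in partitions if p]
    if not partials:
        raise ValueError("Can not reduce() empty RDD")
    return reduce(op, partials)


def rdd_fold(partitions, zero, op):
    # The zero value starts every partition and the final merge.
    partials = [reduce(op, p, zero) for p in partitions]
    return reduce(op, partials, zero)


def rdd_aggregate(partitions, zero, seq_op, comb_op):
    # seq_op folds elements into an accumulator of any type;
    # comb_op merges the accumulators of different partitions.
    partials = [reduce(seq_op, p, zero) for p in partitions]
    return reduce(comb_op, partials, zero)


parts = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
total = rdd_reduce(parts, lambda x, y: x + y)            # 136
folded = rdd_fold(parts, 0, lambda x, y: x + y)          # 136
folded_with_1 = rdd_fold(parts, 1, lambda x, y: x + y)   # 141
count_and_sum = rdd_aggregate(parts, (0, 0),
                              lambda acc, x: (acc[0] + 1, acc[1] + x),
                              lambda a, b: (a[0] + b[0], a[1] + b[1]))  # (16, 136)
empty_fold = rdd_fold([[], []], 0, lambda x, y: x + y)   # 0
```

With a zero value of 1 the result is 141, not 137: the four partitions and the final merge each add it once, which is why fold's zero value should be an identity element for the operation.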

Exercises
- RDD creation - method 1
- Transformation - map/flatMap, filter, distinct
- Function - split

In [2]:
lines = sc.textFile("Documents/Spark/ignatian_pedagogy.txt")

Exercise: split each line on whitespace. map returns one list of words per line (paragraph), while flatMap returns a single flat RDD of words.

In [3]:
words = lines.map(lambda line: line.split())

In [4]:
words2 = lines.flatMap(lambda line: line.split())

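Exercise: filter. On the map output, "USF" in line tests list membership, so it keeps whole paragraphs (as word lists) that contain the exact word "USF"; on the flatMap output it is a substring test, so it keeps individual words containing "USF", such as "USF's".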

In [13]:
output = words2.filter(lambda word: "USF" in word)

In [14]:
output.collect()

[u"USF's", u'USF', u"USF's", u"USF's", u'USF', u'USF', u'USF', u'USF']

exercise: distinct()

In [15]:
distinct_words = words2.distinct()

In [136]:
distinct_words.take(5)

[u'1981,', u'all', u'just', u'Father', u'actions']

Exercise: return count & sum for each RDD partition
- RDD creation - Method 2 & specifying # of partitions
- Transformation - mapPartitions
- Action - reduce, aggregate
- Function - create a function to pass in transformation

In [25]:
nums1 = sc.parallelize(range(1,17), 4)
nums1.glom().collect()

[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]

In [26]:
nums1.getNumPartitions()

4

In [27]:
def count_sum(nums):
    # nums is an iterator over one partition; mapPartitions expects an iterable back
    result = [0, 0]  # [count, sum]
    for num in nums:
        result[0] += 1
        result[1] += num
    return [result]

In [36]:
partition_count_sum = nums1.mapPartitions(count_sum)
partition_count_sum.collect()

[[4, 10], [4, 26], [4, 42], [4, 58]]

Then compute the overall average from the per-partition [count, sum] pairs.

In [37]:
total_count_sum = partition_count_sum.reduce(lambda x,y: [x[0]+y[0], x[1]+y[1]])
average = float(total_count_sum[1]) / total_count_sum[0]  # convert before dividing: in Python 2, int/int truncates
average

8.5
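A plain-Python check of the same computation, assuming the four partitions shown by glom() above: per-partition [count, sum] pairs as mapPartitions would produce them, a reduce over those pairs, and a floating-point division. The overall sum is 136 and the count is 16, so the average is 8.5; integer division would truncate it to 8.

```python
from functools import reduce


def count_sum(nums):
    # Same logic as the mapPartitions function above: one [count, sum] per partition.
    result = [0, 0]
    for num in nums:
        result[0] += 1
        result[1] += num
    return [result]


# The four partitions of nums1, as shown by glom().collect().
partitions = [list(range(start, start + 4)) for start in range(1, 17, 4)]

# mapPartitions: call count_sum once per partition and concatenate the outputs.
per_partition = [pair for part in partitions for pair in count_sum(iter(part))]

total_count, total_sum = reduce(lambda x, y: [x[0] + y[0], x[1] + y[1]], per_partition)
average = float(total_sum) / total_count   # 8.5
truncated = total_sum // total_count       # 8, what Python 2's int / int gives
```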

Exercise: a different way to calculate the average
- Action - reduce

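Note: an action such as reduce merges the results from all partitions into a single value returned to the driver, so the partition structure is not kept in the result.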

In [38]:
one_num = nums1.map(lambda x: [1,x])
one_num.glom().collect()

[[[1, 1], [1, 2], [1, 3], [1, 4]],
 [[1, 5], [1, 6], [1, 7], [1, 8]],
 [[1, 9], [1, 10], [1, 11], [1, 12]],
 [[1, 13], [1, 14], [1, 15], [1, 16]]]

In [39]:
count_and_sum = one_num.reduce(lambda x,y: [x[0]+y[0], x[1]+y[1]])  # new name, so the count_sum function is not overwritten
average = float(count_and_sum[1]) / count_and_sum[0] # float(a/b) would truncate to 8 in Python 2
average

8.5

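Calculating count and sum with the aggregate action, which is even simpler than reduce because no preliminary map is needed
- aggregate(zeroValue, seqOp, combOp): seqOp adds one element to a partition's accumulator, combOp merges the accumulators of different partitions (Scala syntax: aggregate(zeroValue)(seqOp, combOp))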

In [40]:
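odd_nums = sc.parallelize([1,2,3,4,5,6,7,8,9]).filter(lambda x: x%2 ==1)  # defined here so this cell does not depend on In [32]
odd_nums.aggregate((0,0),lambda x,y:(x[0]+1,x[1]+y), lambda x,y:(x[0]+y[0],x[1]+y[1]))  # seqOp: (count, sum) + one element; combOp: merge two partitions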

(5, 25)

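##### Differences between reduce, fold, and aggregate

fold applies its initial (zero) value once in each partition and once more when merging the partition results, so the zero value should be an identity for the operation (0 for +, 1 for *). When the RDD is empty, reduce raises an error (a ValueError in PySpark), whereas fold returns the zero value.

aggregate can return a result of a different type from the RDD's elements, e.g. a (count, sum) tuple from an RDD of integers.

##### Similarity
The function passed to reduce or fold takes two arguments of the element type and returns a value of that same type.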

In [32]:
nums = sc.parallelize([1,2,3,4,5,6,7,8,9])
odd_nums = nums.filter(lambda x: x%2 ==1)   # get the odd numbers
sum_odd = odd_nums.reduce(lambda x,y: x+y)
sum_odd

25

In [33]:
odd_nums.fold(0, lambda x,y: x+y)

25

In [26]:
nums2 = sc.parallelize([])
nums2.fold(0,lambda x,y:x+y) # fold will give us zero, but reduce will give us an error

0

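1. action: count() returns the number of elements in the RDD, summed over all partitions. After map(split), each element is one list of words, so count() gives the number of lines; after flatMap(split), it gives the number of words.
2. transformation: union(otherDataset), intersection(otherDataset)
3. action: countByValue() returns a dict mapping each distinct element to the number of times it occurs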
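Using the same x and y as the cell below, this plain-Python sketch models union as concatenating the two lists of partitions and uses collections.Counter as a stand-in for countByValue. The partition lists are written out by hand, assuming (as the glom() output below shows) one element per partition.

```python
from collections import Counter

# x = sc.parallelize([3,4,1,2], 4) and y = sc.parallelize(range(2,6), 4),
# written out as lists of partitions (one element per partition).
x_parts = [[3], [4], [1], [2]]
y_parts = [[2], [3], [4], [5]]

# union: no shuffle, just the partitions of x followed by those of y.
z_parts = x_parts + y_parts
z = [e for part in z_parts for e in part]

num_partitions = len(z_parts)  # 8
count = len(z)                 # count(): number of elements, here also 8

# count() counts elements, not partitions: 5 elements in 2 partitions.
other_parts = [[1, 2, 3], [4, 5]]
other_count = sum(len(part) for part in other_parts)  # 5

# intersection: distinct elements present in both datasets.
x_elems = [e for part in x_parts for e in part]
y_elems = [e for part in y_parts for e in part]
intersection = sorted(set(x_elems) & set(y_elems))  # [2, 3, 4]

# countByValue: element -> number of occurrences.
counts = dict(Counter(z))  # {1: 1, 2: 2, 3: 2, 4: 2, 5: 1}
```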

In [50]:
words = sc.parallelize(["MSAN694", "MSAN694 Distributed Computing"])

In [52]:
words.map(lambda x: x.split()).count()

2

In [13]:
words.flatMap(lambda x: x.split()).count()

4

In [53]:
# union does not merge or shuffle partitions: the result keeps all partitions of both RDDs (4 + 4 = 8 here).
# intersection keeps only the elements that appear in both RDDs (without duplicates).
x = sc.parallelize([3,4,1,2],4)
y = sc.parallelize(range(2,6),4)
z = x.union(y)
z.glom().collect()

[[3], [4], [1], [2], [2], [3], [4], [5]]

In [47]:
z.countByValue()

defaultdict(int, {1: 1, 2: 2, 3: 2, 4: 2, 5: 1})

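#### More RDD actions
    1. takeSample(withReplacement, num, seed): returns a list of num randomly sampled elements; without replacement it returns at most count() elements. The seed is optional.
       sample(withReplacement, fraction, seed) - transformation: returns a new RDD of random size, keeping each element with probability fraction (without replacement) or drawing it an expected fraction times (with replacement)
    2. top(n): returns the n largest elements in descending order
    3. take(n): returns the first n elements
    4. first(): returns the first element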
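A plain-Python sketch of how these actions behave on the elements of z used below. Spark's actual samplers work per partition and produce different random draws, so sample and take_sample here are hypothetical helpers that only model the size behavior: sample keeps each element independently, while takeSample without replacement returns a fixed number of elements, capped at the dataset size.

```python
import heapq
import random

z = [3, 4, 1, 2, 2, 3, 4, 5]  # the elements of z in the cells below


def sample(elements, fraction, seed=None):
    # Like sample(False, fraction): keep each element independently with
    # probability `fraction`, so the size of the result varies.
    rng = random.Random(seed)
    return [e for e in elements if rng.random() < fraction]


def take_sample(elements, num, seed=None):
    # Like takeSample(False, num): a fixed-size sample without replacement,
    # capped at the number of elements.
    rng = random.Random(seed)
    return rng.sample(elements, min(num, len(elements)))


top_6 = heapq.nlargest(6, z)  # top(6)   -> [5, 4, 4, 3, 3, 2]
first_2 = z[:2]               # take(2)  -> [3, 4]
first = z[0]                  # first()  -> 3
```

Asking take_sample for 20 elements returns all 8, matching z.takeSample(False,20) below.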

In [28]:
z.takeSample(False,20) # without replacement, at most z.count() = 8 elements are returned

[2, 1, 2, 3, 4, 4, 3, 5]

In [33]:
z.takeSample(True,20)

[5, 2, 2, 2, 3, 3, 1, 3, 2, 4, 2, 2, 3, 3, 1, 3, 3, 3, 4, 2]

In [32]:
z.takeSample(True,20,1) # a fixed seed makes the sample, including its order, reproducible

[3, 2, 4, 5, 5, 1, 3, 4, 4, 1, 1, 2, 4, 2, 2, 3, 5, 4, 1, 2]

In [66]:
# sample is a transformation and returns an RDD; the number of elements varies from run to run
z.sample(False, 0.5).collect()

[3, 2, 3]

In [5]:
z.top(6) # the 6 largest elements, in descending order

[5, 4, 4, 3, 3, 2]

In [8]:
z.take(2) # the first 2 elements

[3, 4]

In [61]:
z.first() # return the first element

3

#### Numeric RDD actions (the result does not depend on the partitioning)

count(), mean(), sum(), max(), min(), variance(), stdev()

In [67]:
z.count()

8

In [70]:
z.mean()

3.0

In [69]:
z.sum()

24

In [71]:
z.variance()

1.5

In [72]:
x.stdev() # standard deviation of x = [3, 4, 1, 2], not z

1.1180339887498949

## Week 3

### Pair RDD creation
    Use map to return a (key, value) tuple for each element. flatMap would flatten each returned tuple into separate elements, so no pairs would remain.
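A plain-Python illustration of why map, not flatMap, is needed to build pairs. The word list is a hypothetical sample; the list comprehensions stand in for the RDD transformations named in the comments.

```python
words = ["Apache", "Spark", "is", "fast"]  # hypothetical sample words

# Like words.map(lambda x: (len(x), x)): one (key, value) tuple per word.
pairs = [(len(w), w) for w in words]

# Like words.flatMap(lambda x: (len(x), x)): each tuple is flattened,
# so keys and values become separate, unrelated elements.
flattened = [item for w in words for item in (len(w), w)]
```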

### Pair RDD Operations - transformation
    (a) sortByKey()
    
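    (b) groupByKey(): group the values for each key into a single iterable
    
    (c) mapValues(func); flatMapValues(func): pass the value of each pair through a function without changing the keys (flatMapValues flattens the returned sequence, giving one pair per item)
    
    (d) reduceByKey(func): merge the values for each key with func, in parallel (values are combined within each partition before the shuffle)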
    
    (e) combineByKey(createCombiner, mergeValue, mergeCombiners)
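The single-dataset pair operations above can be modeled on a list of (key, value) tuples. The helpers are hypothetical plain-Python stand-ins (Spark distributes this work across partitions and does not promise the group order used here), and the sample pairs are made up in the (length, word) style of the next example.

```python
from collections import OrderedDict

pairs = [(5, "Spark"), (2, "is"), (1, "a"), (4, "fast"), (2, "it")]


def sort_by_key(kvs):
    # sortByKey(): ascending order of keys
    return sorted(kvs, key=lambda kv: kv[0])


def group_by_key(kvs):
    # groupByKey(): one (key, values) pair per distinct key
    groups = OrderedDict()
    for k, v in kvs:
        groups.setdefault(k, []).append(v)
    return list(groups.items())


def map_values(kvs, func):
    # mapValues(func): transform each value, keep the key
    return [(k, func(v)) for k, v in kvs]


def flat_map_values(kvs, func):
    # flatMapValues(func): one (key, item) pair per item that func returns
    return [(k, item) for k, v in kvs for item in func(v)]


grouped = group_by_key(pairs)                        # [(5, ['Spark']), (2, ['is', 'it']), (1, ['a']), (4, ['fast'])]
sizes = map_values(grouped, len)                     # number of words per length
ungrouped = flat_map_values(grouped, lambda vs: vs)  # back to one pair per word
```

flat_map_values(grouped, lambda vs: vs) undoes the grouping, which is the same trick as flatMapValues(lambda x: x) used below to display grouped values.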

In [56]:
# key-value pair example: (word length, word)
lines = sc.textFile("Documents/Spark/README.md")

In [71]:
words = lines.flatMap(lambda line: line.split())

In [89]:
len_word = words.map(lambda x: (len(x),x))

In [137]:
len_word.sortByKey().take(15)

[(1, u'#'),
 (1, u'a'),
 (1, u'a'),
 (1, u'a'),
 (1, u'a'),
 (1, u'a'),
 (1, u'a'),
 (1, u'N'),
 (1, u'a'),
 (1, u'A'),
 (1, u'a'),
 (2, u'is'),
 (2, u'It'),
 (2, u'in'),
 (2, u'R,')]

### Transformations whose values are ResultIterable objects:
 groupByKey() and cogroup(otherDataset)


### To display the contents of a ResultIterable in a pair RDD:
    1. convert it to a list with map(lambda x: (x[0], list(x[1]))); only the value x[1] is a ResultIterable, so mapValues(list) does the same
    2. or flatten it with flatMapValues(lambda x: x), which yields one (key, value) pair per value

Example: display the grouped (length, words) pairs

In [62]:
grouped_len_word = len_word.groupByKey()
#grouped_len_word.collect() #returns (key, resultiterable)
#grouped_len_word.map(lambda x:(x[0], list(x[1]))).collect() # visualize what's inside

In [143]:
# Similarly, flatMapValues flattens each group of values, so each key is paired with one value at a time
#grouped_len_word.flatMapValues(lambda x:x).glom().collect()

In [87]:
a = sc.parallelize([(1, (2,3,4)), (2, (3,4,5))])  # a list keeps the input order; a set has no defined order
a.flatMapValues(lambda x: x).collect()

[(1, 2), (1, 3), (1, 4), (2, 3), (2, 4), (2, 5)]

### Obtaining (word, occurrence) pairs - 2 methods
    1. groupByKey() then mapValues
    2. reduceByKey()
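A plain-Python model of the two methods, assuming a hypothetical layout of (word, 1) pairs across two partitions. Both give the same counts; the difference is that reduceByKey adds up values inside each partition first, so fewer records need to be shuffled, while groupByKey moves every pair before summing.

```python
from collections import defaultdict

# (word, 1) pairs split across two partitions (a hypothetical layout).
partitions = [
    [("spark", 1), ("is", 1), ("spark", 1)],
    [("is", 1), ("fast", 1), ("spark", 1)],
]


def group_then_sum(parts):
    # Method 1: groupByKey then mapValues(sum). Every (word, 1) pair is
    # moved to its key's group before anything is added.
    groups = defaultdict(list)
    for part in parts:
        for word, one in part:
            groups[word].append(one)
    return {word: sum(ones) for word, ones in groups.items()}


def reduce_by_key(parts, func):
    # Method 2: reduceByKey(func). Values are first combined inside each
    # partition, then the partial results for each key are merged.
    partials = []
    for part in parts:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        partials.append(local)
    merged = {}
    for local in partials:
        for k, v in local.items():
            merged[k] = func(merged[k], v) if k in merged else v
    return merged


def add(x, y):
    return x + y


counts = reduce_by_key(partitions, add)  # {'spark': 3, 'is': 2, 'fast': 1}
```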

In [147]:
# method 1
word_1 = words.map(lambda x: (x, 1))
word_occurrence = word_1.groupByKey()
output = word_occurrence.mapValues(sum)  # add up the 1s for each word
#output.glom().collect()

In [145]:
# method 2: a simpler way to get (word, occurrence) pairs using reduceByKey; unlike groupByKey, it sums within each partition before shuffling
output = word_1.reduceByKey(lambda x,y: x+y)
output.collect()

In [144]:
# foldByKey returns same result
output = word_1.foldByKey(0,lambda x,y: x+y)
output.collect() 

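### combineByKey
    createCombiner - called the first time a key is seen in a partition; turns that value into a combiner
    mergeValue - adds another value for the same key, within the same partition, to the existing combiner
    mergeCombiners - merges the combiners for the same key that come from different partitions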
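The three roles can be seen in a plain-Python model that processes each partition separately and then merges. combine_by_key is a hypothetical helper, not Spark's implementation, and the two partitions of (length, word) pairs are made up; the three functions are the same ones used in the len_pair cell below.

```python
def combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    # Step 1: inside each partition, build one combiner per key.
    per_partition = []
    for part in partitions:
        combiners = {}
        for k, v in part:
            if k not in combiners:
                combiners[k] = create_combiner(v)            # first value of k here
            else:
                combiners[k] = merge_value(combiners[k], v)  # later values of k here
        per_partition.append(combiners)
    # Step 2: merge the combiners of the same key across partitions.
    result = {}
    for combiners in per_partition:
        for k, c in combiners.items():
            result[k] = merge_combiners(result[k], c) if k in result else c
    return result


parts = [[(2, "is"), (1, "a"), (2, "it")], [(2, "in"), (1, "A")]]
len_pair = combine_by_key(parts,
                          lambda w: (1, w),
                          lambda acc, w: (acc[0] + 1, acc[1] + "," + w),
                          lambda a, b: (a[0] + b[0], a[1] + "," + b[1]))
# len_pair == {2: (3, 'is,it,in'), 1: (2, 'a,A')}
```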

In [129]:
# create (word length, (number of words, comma-separated words)); the keys do not change
len_pair = len_word.combineByKey(lambda x: (1, x),                                 # createCombiner: u'word1' -> (1, u'word1')
                                 lambda x, y: (x[0] + 1, x[1] + "," + y),          # mergeValue: (1, u'word1'), u'word2' -> (2, u'word1,word2')
                                 lambda x, y: (x[0] + y[0], x[1] + "," + y[1]))    # mergeCombiners: merge the results of two partitions
#len_pair.collect()

## Week 4
### Pair RDD Operations - Transformations on two pair RDDs
    (a) subtractByKey(otherDataset):
    return the pairs whose keys do not appear in the other dataset
    (b) join, rightOuterJoin, leftOuterJoin(otherDataset) - join the two RDDs by key
    (c) cogroup(otherDataset): return (key, (ResultIterable, ResultIterable))
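These operations can be modeled on plain lists of pairs. The helpers below are hypothetical, unoptimized stand-ins (Spark shuffles by key instead of scanning every combination), using the same first_num_pairs and second_num_pairs values as the examples below; their results match the outputs shown there.

```python
first = [(2, 3), (1, 2), (1, 3), (2, 4), (3, 6)]
second = [(1, 3), (2, 2)]


def subtract_by_key(a, b):
    # subtractByKey: keep pairs of a whose key never appears in b
    keys_b = set(k for k, _ in b)
    return [(k, v) for k, v in a if k not in keys_b]


def join(a, b):
    # join: one output pair per matching (value in a, value in b) combination
    return [(k, (va, vb)) for k, va in a for kb, vb in b if k == kb]


def left_outer_join(a, b):
    # leftOuterJoin: every pair of a; None when b has no value for the key
    out = []
    for k, va in a:
        matches = [vb for kb, vb in b if kb == k] or [None]
        out.extend((k, (va, vb)) for vb in matches)
    return out


def right_outer_join(a, b):
    # rightOuterJoin: the mirror image of leftOuterJoin
    return [(k, (va, vb)) for k, (vb, va) in left_outer_join(b, a)]


def cogroup(a, b):
    # cogroup: for every key in either RDD, (values from a, values from b)
    keys = sorted(set(k for k, _ in a) | set(k for k, _ in b))
    return [(k, ([v for ka, v in a if ka == k], [v for kb, v in b if kb == k]))
            for k in keys]
```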

In [75]:
business_lines = sc.textFile("Documents/Spark/filtered_registered_business_sf.csv")
business_words = business_lines.map(lambda line: line.split(','))
business = business_words.map(lambda x: (x[0], x[1])).distinct()
# business.collect()  # (zipcode, business name)

In [76]:
supervisor_line = sc.textFile("Documents/Spark/supervisor_sf.csv")
supervisor = supervisor_line.map(lambda line: line.split(','))
supervisor_pair = supervisor.map(lambda x: (x[0], x[1])).distinct()
# supervisor_pair.collect() #(zipcode, supervisor_id)

In [148]:
business_without_supervisor = business.subtractByKey(supervisor_pair).values().distinct()
#business_without_supervisor.collect()

#### Examples

In [115]:
first_num_pairs =sc.parallelize({(2,3),(1,2),(1,3),(2,4),(3,6)})
second_num_pairs = sc.parallelize({(1,3),(2,2)})

In [41]:
first_num_pairs.subtract(second_num_pairs).collect() # removes whole (key, value) pairs that also appear in second_num_pairs

[(2, 3), (3, 6), (1, 2), (2, 4)]

In [42]:
first_num_pairs.subtractByKey(second_num_pairs).collect() 

[(3, 6)]

In [43]:
first_num_pairs.join(second_num_pairs).collect() 
# inner join on the keys present in both RDDs: each result is (key, (value from first, value from second))

[(1, (2, 3)), (1, (3, 3)), (2, (3, 2)), (2, (4, 2))]

In [44]:
first_num_pairs.rightOuterJoin(second_num_pairs).collect() # keeps every key of the right RDD; a missing left value would be None

[(1, (2, 3)), (1, (3, 3)), (2, (3, 2)), (2, (4, 2))]

In [45]:
first_num_pairs.leftOuterJoin(second_num_pairs).collect() # keeps every key of the left RDD; key 3 has no match, so its right value is None

[(1, (2, 3)), (1, (3, 3)), (2, (3, 2)), (2, (4, 2)), (3, (6, None))]

In [128]:
first_num_pairs.cogroup(second_num_pairs).mapValues(lambda x: (list(x[0]), list(x[1]))).collect()

[(1, ([2, 3], [3])), (2, ([3, 4], [2])), (3, ([6], []))]

In [149]:
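# (zip, (business, supervisor id)) sorted by supervisor id (a string sort)
# note: the output below was produced with business redefined as (zip, (name, city)), as in In [80]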
business.join(supervisor_pair).sortBy(lambda x:x[1][1]).take(5)

[(u'94117', ((u'Slattery Joseph', u'San Francisco'), u'1')),
 (u'94117', ((u'Duffy Colette A', u'San Francisco'), u'1')),
 (u'94117', ((u'Watts Harrey David', u'San Francisco'), u'1')),
 (u'94117', ((u'Bigelow Jamie Marie', u'San Francisco'), u'1')),
 (u'94117', ((u'Cole Hardware Inc', u'San Francisco'), u'1'))]

In [155]:
output = business.cogroup(supervisor_pair).mapValues(lambda x:(list(x[0]), list(x[1])))
# output.take(5)

In [2]:
nums = sc.parallelize([1,2,3,4,5,6,7,8,9])

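Example 5: additional actions - countByKey() and lookup(key)

countByKey() returns a dict mapping each key to the number of pairs with that key; lookup(key) returns a list of all the values for that key.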
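Both actions have simple plain-Python equivalents on a list of pairs. count_by_key and lookup below are hypothetical helpers, applied to the first_num_pairs values from the earlier examples.

```python
from collections import Counter

first_num_pairs = [(2, 3), (1, 2), (1, 3), (2, 4), (3, 6)]


def count_by_key(kvs):
    # countByKey(): key -> number of pairs with that key
    return dict(Counter(k for k, _ in kvs))


def lookup(kvs, key):
    # lookup(key): list of every value stored under key
    return [v for k, v in kvs if k == key]


per_key = count_by_key(first_num_pairs)   # {2: 2, 1: 2, 3: 1}
values_of_1 = lookup(first_num_pairs, 1)  # [2, 3]
```

In the cell below, len(business.lookup("")) uses the same idea to count the records with an empty zip code.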

In [79]:
business_lines = sc.textFile("Documents/Spark/filtered_registered_business_sf.csv")
business_words = business_lines.map(lambda line: line.split(','))

In [80]:
business = business_words.map(lambda x: (x[0], (x[1], x[3]) )).distinct()
#business.collect() # (zip, (name, city) )

In [81]:
len(business.lookup("")) # number of records whose zip code (the key) is empty

70

In [156]:
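# line.split(',') breaks quoted names that contain commas, so some "cities" below are really other fields
business.filter(lambda x: "San Francisco" not in x[1][1]).take(5)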

[(u'94110', (u'"Rlm Partners', u'60 29th St Ste 537')),
 (u'95492', (u'Denbeste Transportation Inc', u'Windsor')),
 (u'94104', (u'Belal Korin', u'Daily+city')),
 (u'95112', (u'James Costa', u'San+jose')),
 (u'94111', (u'Rosewood Venture Associates Iv', u'San+francisco'))]

### Tuning Spark - persist in memory/disk

In [90]:
lines = sc.textFile("Documents/Spark/README.md")
lines_with_Spark = lines.filter(lambda x: "Spark" in x)

In [98]:
from pyspark.storagelevel import StorageLevel

In [111]:
lines_with_Spark.persist(StorageLevel.MEMORY_ONLY_2)

PythonRDD[213] at RDD at PythonRDD.scala:48

In [112]:
# Is the RDD cached?
#lines_with_Spark.is_cached

# StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
lines_with_Spark.getStorageLevel() 

StorageLevel(False, True, False, False, 2)

In [114]:
lines_with_Spark.unpersist()

PythonRDD[213] at RDD at PythonRDD.scala:48

In [157]:
# persist() is lazy: the RDD shows up in the Storage tab of the Spark UI only after an action computes it
lines_with_Spark.take(5)

[u'# Apache Spark',
 u'Spark is a fast and general cluster computing system for Big Data. It provides',
 u'rich set of higher-level tools including Spark SQL for SQL and DataFrames,',
 u'and Spark Streaming for stream processing.',
 u'You can find the latest Spark documentation, including a programming']

## Last week - Loading and Saving Data
Example 1: load every file in a directory with wholeTextFiles, giving (file path, file content) pairs

In [115]:
files = sc.wholeTextFiles('Documents/Spark/Assigned2/input_1')
for file in files.take(1):
    print file

(u'file:/Users/QIAN/Documents/Spark/Assigned2/input_1/input.txt', u'Are you using Apache Spark 2.0?\nYes, I am using Apache Spark 2.0! .25')


Example 2: save data
- saveAsTextFile

In [118]:
lines = sc.textFile('Documents/Spark/supervisor_sf.csv', 5)
filtered_lines = lines.filter(lambda line: "94103," in line)
filtered_lines.saveAsTextFile("supervisor_94103") # writes a directory with one part file per partition (at least 5 were requested)

Example 3: load a JSON file with one JSON object per line
- json.loads() parses a JSON string into a Python dict
- json.dumps() serializes a dict back into a JSON string
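The same load, filter, and save steps can be checked locally with the standard json module on the three lines of example.json printed by input.collect() in the next cell. The list comprehensions stand in for the RDD transformations; sort_keys=True is an assumption added only to make the output order predictable.

```python
import json

# The three lines of example.json (one JSON object per line).
lines = [
    '{"ID":"id_1","array":[1,2,3],"dict": {"key": "value1"}}',
    '{"ID":"id_2","array":[2,4,6],"dict": {"key": "value2"}}',
    '{"ID":"id_3","array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}',
]

records = [json.loads(line) for line in lines]                # like input.map(json.loads)
with_3 = [r for r in records if 3 in r["array"]]              # like data.filter(...)
out_lines = [json.dumps(r, sort_keys=True) for r in with_3]   # like .map(json.dumps)
```

Each string in out_lines is one line of the saved output; loading it again with json.loads gives back the same dict.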

In [120]:
import json
input = sc.textFile("Documents/Spark/example.json")
input.collect()

[u'{"ID":"id_1","array":[1,2,3],"dict": {"key": "value1"}}',
 u'{"ID":"id_2","array":[2,4,6],"dict": {"key": "value2"}}',
 u'{"ID":"id_3","array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}']

In [121]:
data = input.map(lambda x: json.loads(x))
json_with_3 = data.filter(lambda x: 3 in x['array']) # records with 3 in array
json_with_3.collect()

[{u'ID': u'id_1', u'array': [1, 2, 3], u'dict': {u'key': u'value1'}},
 {u'ID': u'id_3',
  u'array': [3, 6, 9],
  u'dict': {u'extra_key': u'extra_value3', u'key': u'value3'}}]

In [122]:
json_with_3.map(lambda x: json.dumps(x)).saveAsTextFile("json_with_3")
# saves a directory with one part file per partition (2 here)

['{"array": [1, 2, 3], "dict": {"key": "value1"}, "ID": "id_1"}',
 '{"array": [3, 6, 9], "dict": {"key": "value3", "extra_key": "extra_value3"}, "ID": "id_3"}']

Example 4: load a CSV file

In [123]:
import csv
import StringIO

In [124]:
# If the "fieldnames" parameter is omitted, the values in the first row of the CSV file are used as the field names.
# reader.next() returns the first parsed row as a dict (Python 2; use next(reader) in Python 3).
def csvLoader(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["Zip","Supervisor"]) 
    return reader.next()

In [125]:
input = sc.textFile("Documents/Spark/supervisor_sf.csv")
csv_data = input.map(csvLoader)
csv_data.take(5)

[{'Supervisor': '8', 'Zip': '94102'},
 {'Supervisor': '6', 'Zip': '94102'},
 {'Supervisor': '3', 'Zip': '94102'},
 {'Supervisor': '5', 'Zip': '94102'},
 {'Supervisor': '8', 'Zip': '94103'}]

In [127]:
sf_supervisor = csv_data.filter(lambda x :  (x['Zip']).startswith('94'))
#sf_supervisor.collect()

In [133]:
# csvWriter: format one record (a dict) as a "Supervisor,Zip" CSV line.
# csv.DictWriter : https://docs.python.org/2/library/csv.html#csv.DictWriter
# The values of the dict passed to .writerow() are written to "output".
def csvWriter(record):
    output = StringIO.StringIO() # start with an empty string buffer
    writer = csv.DictWriter(output, fieldnames=["Supervisor", "Zip"])
    writer.writerow(record)
    return output.getvalue().strip() # strip() removes the trailing \r\n
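The StringIO module and reader.next() used above exist only in Python 2. A minimal Python 3 sketch of the same loader and writer, assuming io.StringIO and next(reader) as the replacements, also shows why the csv module is safer than line.split(','): it keeps a quoted comma inside its field. The quoted record is hypothetical.

```python
import csv
import io


def csv_loader(line):
    # Parse one CSV line into a dict; next(reader) replaces reader.next().
    reader = csv.DictReader(io.StringIO(line), fieldnames=["Zip", "Supervisor"])
    return next(reader)


def csv_writer(record):
    # Format one dict as a "Supervisor,Zip" line.
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=["Supervisor", "Zip"])
    writer.writerow(record)
    return output.getvalue().strip()  # drop the trailing \r\n


row = csv_loader("94102,8")
line_out = csv_writer(row)  # '8,94102'

# Unlike str.split(","), the csv module keeps a quoted comma inside its field.
text = '94110,"Example Co, Inc"'  # hypothetical record
naive = text.split(",")                       # ['94110', '"Example Co', ' Inc"']
parsed = next(csv.reader(io.StringIO(text)))  # ['94110', 'Example Co, Inc']
```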

In [134]:
csv_data.map(csvWriter).take(5)
#csv_data.map(csvWriter).saveAsTextFile("CSV_folder")

['8,94102', '6,94102', '3,94102', '5,94102', '8,94103']