### Spark Basics

#### Introduction to RDDs (Resilient Distributed Datasets)

This document is my personal consolidation of Frank Kane's PySpark course on Udemy.

In [2]:
# Setup PySpark
import findspark
findspark.init('/opt/spark')
from pyspark import SparkConf, SparkContext
import collections

# Run Spark locally with a single worker thread
conf = SparkConf().setMaster("local").setAppName("Examples")
sc = SparkContext(conf=conf)

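RDDs are the datasets Spark uses to handle data in a distributed, fault-tolerant way. You work with them by applying two kinds of functions: transformations, which lazily describe a new RDD derived from an existing one, and actions, which trigger the actual computation and return a result to the driver program.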

Examples of transformations:
- map
- flatMap
- filter
- distinct
- sample
- union, intersection, subtract, cartesian

Examples of actions:
- collect
- count
- countByValue
- take
- top
- reduce
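Because transformations are lazy, nothing is computed until an action asks for a result. The toy class below is a hypothetical, single-machine model of that behavior: it is not Spark, runs without it, and ignores partitioning entirely. `ToyRDD` only borrows the method names `map`, `filter`, `collect`, `count` and `take` from the real RDD API.

```python
class ToyRDD:
    """Hypothetical plain-Python model of an RDD's lazy evaluation (not Spark)."""

    def __init__(self, source, steps=()):
        self._source = source        # the original data
        self._steps = tuple(steps)   # recorded transformations, not yet executed

    # Transformations: return a new ToyRDD and compute nothing
    def map(self, f):
        return ToyRDD(self._source, self._steps + (("map", f),))

    def filter(self, pred):
        return ToyRDD(self._source, self._steps + (("filter", pred),))

    # Actions: replay every recorded step, then return a plain Python value
    def _compute(self):
        data = list(self._source)
        for kind, f in self._steps:
            if kind == "map":
                data = [f(x) for x in data]
            else:
                data = [x for x in data if f(x)]
        return data

    def collect(self):
        return self._compute()

    def count(self):
        return len(self._compute())

    def take(self, n):
        return self._compute()[:n]


calls = []

def square(x):
    calls.append(x)  # record every time the function really runs
    return x * x

rdd = ToyRDD(range(6)).map(square).filter(lambda x: x % 2 == 0)
print(len(calls))     # 0: defining transformations did no work
print(rdd.collect())  # [0, 4, 16]
print(len(calls))     # 6: the action ran the map over all six elements
```

In real Spark the same principle lets the engine plan the whole chain of transformations before running any of it.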

We are going to perform some operations on a movie rating dataset (100k rows) provided by MovieLens.
Available at: https://files.grouplens.org/datasets/movielens/ml-100k.zip

In [3]:
# Load a text file into an RDD with one element per line
lines = sc.textFile("u.data")
lines.take(10)

# Data structure: [user_id \t movie_id \t rating_value \t timestamp]

['196\t242\t3\t881250949',
 '186\t302\t3\t891717742',
 '22\t377\t1\t878887116',
 '244\t51\t2\t880606923',
 '166\t346\t1\t886397596',
 '298\t474\t4\t884182806',
 '115\t265\t2\t881171488',
 '253\t465\t5\t891628467',
 '305\t451\t3\t886324817',
 '6\t86\t3\t883603013']
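Each line holds four tab-separated fields. A minimal parser, assuming every line has exactly those four integer fields; `parse_rating_line` is a hypothetical helper name, tested here on plain strings copied from the output above rather than on an RDD.

```python
def parse_rating_line(line):
    """Split one u.data line into (user_id, movie_id, rating, timestamp) as ints."""
    user_id, movie_id, rating, timestamp = line.split("\t")
    return int(user_id), int(movie_id), int(rating), int(timestamp)


sample = ['196\t242\t3\t881250949', '186\t302\t3\t891717742', '22\t377\t1\t878887116']
parsed = [parse_rating_line(line) for line in sample]
print(parsed[0])  # (196, 242, 3, 881250949)
```

With Spark, `lines.map(parse_rating_line)` would give an RDD of typed tuples instead of raw strings.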

In [4]:
# Keep only the rating (third field) of each line
ratings = lines.map(lambda x: x.split()[2])
ratings.take(10)

['3', '3', '1', '2', '1', '4', '2', '5', '3', '3']

In [5]:
# Call an action to count how many times each rating value occurs
result = ratings.countByValue()
sortedResults = collections.OrderedDict(sorted(result.items()))

print("rate \t count")
for key, value in sortedResults.items():
    print("{} \t {}".format(key, value))

rate 	 count
1 	 6110
2 	 11370
3 	 27145
4 	 34174
5 	 21201
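`countByValue()` is an action that returns a Python dictionary of value → count to the driver, so it suits cases with few distinct values (five rating levels here). Its result can be modeled in plain Python with `collections.Counter`, applied below to the ten ratings printed by `ratings.take(10)`:

```python
import collections

# The first ten ratings returned by ratings.take(10) above
sample_ratings = ['3', '3', '1', '2', '1', '4', '2', '5', '3', '3']

counts = collections.Counter(sample_ratings)  # value -> number of occurrences
for rating, count in sorted(counts.items()):
    print(rating, count)
# 1 2
# 2 2
# 3 4
# 4 1
# 5 1
```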


---

#### Key/Value RDDs

Examples of functions specific to key/value RDDs:
- reduceByKey
- groupByKey
- sortByKey
- keys, values
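On a key/value RDD, `reduceByKey(f)` merges all values sharing a key with a binary function, while `groupByKey()` collects each key's values into an iterable. The plain-Python model below (hypothetical helpers `group_by_key` and `reduce_by_key`, not Spark) shows what each returns. In Spark, `reduceByKey` also combines values inside each partition before shuffling, which is why it is usually preferred over `groupByKey` followed by a reduction.

```python
from collections import defaultdict
from functools import reduce


def group_by_key(pairs):
    """Model of groupByKey: key -> list of all its values."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def reduce_by_key(pairs, f):
    """Model of reduceByKey: fold each key's values with the binary function f.

    Spark may apply f in any order, so f should be associative and commutative.
    """
    return {key: reduce(f, values) for key, values in group_by_key(pairs).items()}


pairs = [("a", 1), ("b", 4), ("a", 2), ("a", 3)]
print(group_by_key(pairs))                       # {'a': [1, 2, 3], 'b': [4]}
print(reduce_by_key(pairs, lambda x, y: x + y))  # {'a': 6, 'b': 4}
print(sorted(pairs, key=lambda kv: kv[0]))       # sortByKey analogue: [('a', 1), ('a', 2), ('a', 3), ('b', 4)]
```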

You can also run SQL-style operations between two key/value RDDs: join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey.
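The join family matches pairs by key. A plain-Python sketch of the semantics (hypothetical helpers, not Spark, and deliberately quadratic for readability): `join` keeps only keys present on both sides and yields `(key, (left_value, right_value))` for every matching pair, while `leftOuterJoin` keeps every left pair and puts `None` on the right when there is no match. The names and ages come from the `fakefriends.csv` sample further down; the cities are invented.

```python
def inner_join(left, right):
    """Model of rdd.join: (k, (v, w)) for every pair of values sharing key k."""
    return [(k, (v, w)) for k, v in left for k2, w in right if k == k2]


def left_outer_join(left, right):
    """Model of rdd.leftOuterJoin: unmatched left keys get None on the right."""
    result = []
    for k, v in left:
        matches = [w for k2, w in right if k2 == k]
        if matches:
            result.extend((k, (v, w)) for w in matches)
        else:
            result.append((k, (v, None)))
    return result


ages = [("Will", 33), ("Jean-Luc", 26), ("Hugh", 55)]
cities = [("Will", "Paris"), ("Hugh", "Rome")]  # hypothetical data
# Spark does not guarantee the output order; this model keeps the left side's order
print(inner_join(ages, cities))       # [('Will', (33, 'Paris')), ('Hugh', (55, 'Rome'))]
print(left_outer_join(ages, cities))  # [('Will', (33, 'Paris')), ('Jean-Luc', (26, None)), ('Hugh', (55, 'Rome'))]
```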

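Performance tip:
If you are only going to transform the values, use mapValues()/flatMapValues() instead of map()/flatMap(). Because the keys cannot change, Spark keeps the RDD's existing partitioning, which can spare a later shuffle.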
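`mapValues(f)` yields the same pairs as `map(lambda kv: (kv[0], f(kv[1])))`; only Spark's partitioning bookkeeping differs, and that part cannot be shown outside Spark. The equivalence of the results can be checked with a plain-Python model (`map_values` and `flat_map_values` are hypothetical helpers):

```python
def map_values(pairs, f):
    """Model of mapValues: apply f to each value, leave each key untouched."""
    return [(k, f(v)) for k, v in pairs]


def flat_map_values(pairs, f):
    """Model of flatMapValues: one output pair per element of f(value)."""
    return [(k, w) for k, v in pairs for w in f(v)]


pairs = [(33, 385), (26, 2), (55, 221)]
via_map = [(kv[0], (kv[1], 1)) for kv in pairs]      # what map() would compute
via_map_values = map_values(pairs, lambda x: (x, 1))
print(via_map_values)             # [(33, (385, 1)), (26, (2, 1)), (55, (221, 1))]
print(via_map == via_map_values)  # True
print(flat_map_values([("a", "x y")], str.split))  # [('a', 'x'), ('a', 'y')]
```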

In [6]:
# Read a CSV file and turn it into a key/value RDD of (age, number_of_friends)

def parseLine(line):
    friend = line.split(",")
    return (int(friend[2]), int(friend[3]))

lines = sc.textFile("fakefriends.csv")
rdd = lines.map(parseLine)

In [7]:
lines.take(5)
# Data structure of the raw RDD: [id, name, age, number_of_friends]

['0,Will,33,385',
 '1,Jean-Luc,26,2',
 '2,Hugh,55,221',
 '3,Deanna,40,465',
 '4,Quark,68,21']

In [8]:
rdd.take(5)
# Data structure of the mapped RDD: [(age, number_of_friends)]

[(33, 385), (26, 2), (55, 221), (40, 465), (68, 21)]

In [9]:
# Pair each friend count with 1 so friend totals and entry counts can be summed per age
total_by_age = rdd.mapValues(lambda x: (x, 1))
total_by_age.take(5)

[(33, (385, 1)), (26, (2, 1)), (55, (221, 1)), (40, (465, 1)), (68, (21, 1))]

In [10]:
# Add the friend totals and the entry counts element-wise for each age
total_by_age = total_by_age.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
total_by_age.take(10)

In [11]:
# Average number of friends = total friends / number of entries
avg_by_age = total_by_age.mapValues(lambda x: x[0] / x[1])

In [12]:
avg_by_age.take(5)
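The (sum, count) accumulator pattern can be checked with a plain-Python sketch of the same three steps. `average_by_key` is a hypothetical helper; the rows are the five printed by `rdd.take(5)` above plus one invented row for age 33, so that one key has more than one value. Note that the reducer adds the two tuples position by position, `(x[0] + y[0], x[1] + y[1])`.

```python
def average_by_key(pairs):
    """Plain-Python model of mapValues -> reduceByKey -> mapValues for per-key means."""
    # Step 1: mapValues(lambda x: (x, 1)) turns each value into a (sum, count) pair
    with_counts = [(k, (v, 1)) for k, v in pairs]

    # Step 2: reduceByKey adds sums to sums and counts to counts
    totals = {}
    for k, y in with_counts:
        if k in totals:
            x = totals[k]
            totals[k] = (x[0] + y[0], x[1] + y[1])
        else:
            totals[k] = y

    # Step 3: mapValues(lambda x: x[0] / x[1]) divides each sum by its count
    return {k: s / c for k, (s, c) in totals.items()}


rows = [(33, 385), (26, 2), (55, 221), (40, 465), (68, 21), (33, 15)]  # last row is invented
print(average_by_key(rows))  # {33: 200.0, 26: 2.0, 55: 221.0, 40: 465.0, 68: 21.0}
```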

#### Filtering an RDD

Get the minimum temperature recorded at each location (weather station).

In [38]:
rdd = sc.textFile("1800.csv")

In [39]:
rdd.take(5)

['ITE00100554,18000101,TMAX,-75,,,E,',
 'ITE00100554,18000101,TMIN,-148,,,E,',
 'GM000010962,18000101,PRCP,0,,,E,',
 'EZE00100082,18000101,TMAX,-86,,,E,',
 'EZE00100082,18000101,TMIN,-135,,,E,']

In [40]:
min_temps = rdd.filter(lambda x: 'TMIN' in x)

In [41]:
min_temps.take(5)

['ITE00100554,18000101,TMIN,-148,,,E,',
 'EZE00100082,18000101,TMIN,-135,,,E,',
 'ITE00100554,18000102,TMIN,-125,,,E,',
 'EZE00100082,18000102,TMIN,-130,,,E,',
 'ITE00100554,18000103,TMIN,-46,,I,E,']
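`'TMIN' in x` matches the substring anywhere in the line. A stricter variant compares the third comma-separated field (the observation type) directly, so a station ID that happened to contain `TMIN` could not slip through. A plain-Python sketch on the sample lines above; `is_min_temperature` is a hypothetical helper that could be passed to `rdd.filter(...)` the same way, and the `tricky` station ID is invented.

```python
def is_min_temperature(line):
    """True when the observation-type field (index 2) is exactly 'TMIN'."""
    fields = line.split(",")
    return len(fields) > 2 and fields[2] == "TMIN"


sample = [
    'ITE00100554,18000101,TMAX,-75,,,E,',
    'ITE00100554,18000101,TMIN,-148,,,E,',
    'GM000010962,18000101,PRCP,0,,,E,',
    'EZE00100082,18000101,TMAX,-86,,,E,',
    'EZE00100082,18000101,TMIN,-135,,,E,',
]
tmin_only = [line for line in sample if is_min_temperature(line)]
print(tmin_only)  # the two TMIN lines

tricky = 'XTMIN0001,18000101,TMAX,12,,,E,'          # hypothetical station ID
print('TMIN' in tricky, is_min_temperature(tricky))  # True False
```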

In [98]:
def parseLine(line):
    line_split = line.split(",")
    location = line_split[0]
    temperature = int(line_split[3])
    return (location, temperature)

In [99]:
rdd = min_temps.map(parseLine)

In [100]:
rdd.take(5)

[('ITE00100554', -148),
 ('EZE00100082', -135),
 ('ITE00100554', -125),
 ('EZE00100082', -130),
 ('ITE00100554', -46)]

In [102]:
min_temps_reduced = rdd.reduceByKey(lambda x, y: min(x, y))

In [104]:
# rdd holds only TMIN records, so this gives the highest minimum temperature per station
max_temps_reduced = rdd.reduceByKey(lambda x, y: max(x, y))

In [105]:
min_temps_reduced.take(2)

[('ITE00100554', -148), ('EZE00100082', -135)]
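The filter → map → reduceByKey pipeline can be reproduced in plain Python on the five TMIN lines printed earlier, which gives the same result as above. `min_by_station` is a hypothetical helper. In PySpark, passing the built-in directly, `rdd.reduceByKey(min)`, would be equivalent to the lambda.

```python
def min_by_station(lines):
    """Model of filter('TMIN') -> map(parseLine) -> reduceByKey(min)."""
    result = {}
    for line in lines:
        fields = line.split(",")
        if fields[2] != "TMIN":
            continue
        station, temperature = fields[0], int(fields[3])
        # Keep the smaller of the value seen so far and the new one
        result[station] = min(result.get(station, temperature), temperature)
    return result


tmin_lines = [
    'ITE00100554,18000101,TMIN,-148,,,E,',
    'EZE00100082,18000101,TMIN,-135,,,E,',
    'ITE00100554,18000102,TMIN,-125,,,E,',
    'EZE00100082,18000102,TMIN,-130,,,E,',
    'ITE00100554,18000103,TMIN,-46,,I,E,',
]
print(min_by_station(tmin_lines))  # {'ITE00100554': -148, 'EZE00100082': -135}
```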

#### map vs. flatMap

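Explore the differences between these two functions: map returns exactly one output element for each input element, while flatMap can return zero or more elements per input and flattens them into a single RDD.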

In [110]:
rdd = sc.parallelize(['Perform some tests with different map functions'])

In [111]:
map_default = rdd.map(lambda x: x.upper())

In [113]:
map_default.take(10)

['PERFORM SOME TESTS WITH DIFFERENT MAP FUNCTIONS']

In [114]:
# flatMap iterates over the string that upper() returns, so each character becomes its own element
flat_map = rdd.flatMap(lambda x: x.upper())

In [115]:
flat_map.take(10)

['P', 'E', 'R', 'F', 'O', 'R', 'M', ' ', 'S', 'O']
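In plain Python (a model, not Spark), `map` is a list comprehension with one output per input, while `flatMap` adds a second loop over each function result and flattens it. Because a Python string is iterable, flattening `x.upper()` yields single characters, which is what the output above shows. `map_model` and `flat_map_model` are hypothetical helper names.

```python
def map_model(items, f):
    """One output element per input element."""
    return [f(x) for x in items]


def flat_map_model(items, f):
    """Iterate over each result of f and emit its elements individually."""
    return [y for x in items for y in f(x)]


sentence = ['Perform some tests']
print(map_model(sentence, str.upper))             # ['PERFORM SOME TESTS']
print(flat_map_model(sentence, str.upper)[:10])  # ['P', 'E', 'R', 'F', 'O', 'R', 'M', ' ', 'S', 'O']
print(flat_map_model(sentence, str.split))       # ['Perform', 'some', 'tests']
```

Returning a list of words instead of a string (as `str.split` does) is the usual way to make flatMap emit words rather than characters.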

---

A more complex example: counting the words in a book

In [116]:
rdd = sc.textFile("10.2 Book.txt")

In [117]:
rdd_mapped = rdd.map(lambda x: x.split())

In [126]:
rdd_mapped.take(1)

[['Self-Employment:', 'Building', 'an', 'Internet', 'Business', 'of', 'One']]

In [121]:
rdd_flat_mapped = rdd.flatMap(lambda x: x.split())

In [122]:
rdd_flat_mapped.take(5)

['Self-Employment:', 'Building', 'an', 'Internet', 'Business']

In [151]:
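import re
WORD_SPLITTER = re.compile(r'\W+', re.UNICODE)  # runs of non-word characters
def normalizeWords(text):
    # Lowercase the line, then split it on punctuation and whitespace
    return WORD_SPLITTER.split(text.lower())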

In [152]:
rdd_flat_mapped = rdd.flatMap(normalizeWords)

In [164]:
rdd_flat_mapped.take(5)

['self', 'employment', 'building', 'an', 'internet']
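Splitting on `\W+` also produces an empty string whenever a line starts or ends with a non-word character, or is blank, which explains the `''` entry that shows up in the word counts. A plain-Python check with the same regex; the sample strings other than the book's first words are invented. Dropping empty tokens, for example with `rdd_flat_mapped.filter(lambda w: w)`, would remove them.

```python
import re

WORD_SPLITTER = re.compile(r'\W+', re.UNICODE)


def normalize_words(text):
    """Same logic as normalizeWords: lowercase, then split on non-word runs."""
    return WORD_SPLITTER.split(text.lower())


print(normalize_words("Self-Employment: Building"))  # ['self', 'employment', 'building']
print(normalize_words("Hello, world!"))              # ['hello', 'world', '']
print(normalize_words(""))                           # ['']
print([w for w in normalize_words("Hello, world!") if w])  # ['hello', 'world']
```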

In [233]:
# Pair each word with 1, sum the 1s per word, then sort alphabetically by word
words_count = rdd_flat_mapped.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y).sortByKey()

In [236]:
words_count.take(5)

[('', 772), ('0', 1), ('000', 24), ('05', 1), ('07', 1)]

In [239]:
# Swap to (count, word) so sortByKey orders by count, largest first
words_count_1 = words_count.map(lambda x: (x[1], x[0])).sortByKey(ascending=False)

In [241]:
words_count_1.take(5)

[(1878, 'you'), (1828, 'to'), (1420, 'your'), (1292, 'the'), (1191, 'a')]
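The whole word-count pipeline can be modeled in plain Python with `collections.Counter`, dropping the empty tokens on the way. As an alternative to swapping each pair and calling `sortByKey`, PySpark's `RDD.sortBy` takes a key function, e.g. `words_count.sortBy(lambda kv: kv[1], ascending=False)`, which keeps the `(word, count)` order. The sample lines below are invented stand-ins for the book text.

```python
import collections
import re

WORD_SPLITTER = re.compile(r'\W+', re.UNICODE)

# Hypothetical sample lines standing in for the book text
lines = ["You and your business.", "Your time, your rules!", "To you."]

# flatMap(normalizeWords) followed by filtering out '' tokens
words = [w for line in lines for w in WORD_SPLITTER.split(line.lower()) if w]
counts = collections.Counter(words)  # map(lambda w: (w, 1)).reduceByKey(add)

# Sort by count (descending), then by word so ties come out deterministically
by_frequency = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
print(by_frequency[:3])  # [('your', 3), ('you', 2), ('and', 1)]
```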