## Working with RDDs (ch 3)

## Common Transformations and Actions

Element-wise transformations: **map**  (pg 34)

In [0]:
nums = sc.parallelize([1, 2, 3, 4])
nums.map(lambda x: x * x).collect()

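**flatMap** versus **map** (Example 3-29): `map` returns one list of words per line, whereas `flatMap` flattens those lists into a single collection of words.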

In [0]:
lines = sc.parallelize(["the quick red fox", "advanced in the woods"])

In [0]:
lines.map(lambda line: line.split(" ")).collect()

In [0]:
lines.flatMap(lambda line: line.split(" ")).collect()
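The difference between the two cells above can be sketched without a Spark cluster. The snippet below is only a plain-Python analogy (list comprehensions and `itertools.chain` stand in for the RDD operations), not how Spark evaluates them.

```python
from itertools import chain

lines = ["the quick red fox", "advanced in the woods"]

# map: one output element per input element, so each line becomes a list of words
mapped = [line.split(" ") for line in lines]

# flatMap: the per-line lists are flattened into one sequence of words
flat_mapped = list(chain.from_iterable(line.split(" ") for line in lines))

print(mapped)
print(flat_mapped)
```

`mapped` has two elements (one list per line), while `flat_mapped` has eight (one per word).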

In [0]:
inputRDD = sc.textFile('/FileStore/tables/sample_movielens_movies.txt')

In [0]:
thrillerRDD = inputRDD.filter(lambda x: "Thriller" in x)
comedyRDD = inputRDD.filter(lambda x: "Comedy" in x)

In [0]:
x = comedyRDD.map(lambda line: line.split("::"))

In [0]:
x.take(5)

In [0]:
# select the genres for each movie
comedyGenres = x.map(lambda l: l[2])

In [0]:
comedyGenres.take(10)

**distinct()**: extract the genres from the movie lines, then keep only the distinct values

In [0]:
# remove duplicates (just like in SQL)
comedyGenres.distinct().collect()

In [0]:
# putting it all together:
thrillerGenres = thrillerRDD.map(lambda line: line.split("::")).map(lambda l: l[2])
thrillerGenres.distinct().collect()

**intersection**

In [0]:
# genre strings that occur in both the thriller and the comedy RDD
thrillerAndComedyGenres = thrillerGenres.intersection(comedyGenres).collect()
thrillerAndComedyGenres

**set difference**

In [0]:
# thriller but not comedy
thrillerGenres.subtract(comedyGenres).distinct().collect()

**cartesian**

In [0]:
thrillerGenres.cartesian(comedyGenres).count()
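To make the semantics of the set-like operations above concrete, here is a minimal plain-Python sketch. The genre strings are hypothetical stand-ins for the RDD contents, and Python sets and `itertools.product` are used only as an analogy. One point worth noting is that `intersection` removes duplicates from its result while `subtract` does not, which is why the set-difference cell calls `distinct()` afterwards.

```python
from itertools import product

# Hypothetical genre strings, standing in for the contents of the two RDDs
thriller = ["Action|Thriller", "Comedy|Thriller", "Thriller", "Thriller"]
comedy = ["Comedy", "Comedy|Thriller", "Comedy|Romance", "Comedy"]

# distinct(): drop duplicates (Spark does not guarantee order; sorted here for readability)
distinct_thriller = sorted(set(thriller))

# intersection(): elements present in both inputs, with duplicates removed
both = sorted(set(thriller) & set(comedy))

# subtract(): elements of the first input that are absent from the second;
# duplicates in the first input survive
comedy_set = set(comedy)
only_thriller = [g for g in thriller if g not in comedy_set]

# cartesian(): every (thriller, comedy) pair, so the count is the product of the sizes
pairs = list(product(thriller, comedy))

print(distinct_thriller)
print(both)
print(only_thriller)
print(len(pairs))
```

Because the cartesian product grows with the product of the two input sizes, it is best applied to small RDDs.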

Tables 3-2, 3-3, and 3-4 from Chapter 3 of the Learning Spark book summarise the basic RDD transformations and actions. They are available at the links below.

https://github.com/mutazb999/CSC8101-lab-and-coursework/blob/main/01-labs/notebooks/Table3_3_2_3_3_ch3_LearningSpark.png

https://github.com/mutazb999/CSC8101-lab-and-coursework/blob/main/01-labs/notebooks/Table3_3_4_ch3_LearningSpark.png



## Exercise: Computing averages using map and reduce

In [0]:
nums = sc.parallelize([2,3,4,5])
nums

In [0]:
nums.collect()

In [0]:
pairs = nums.map(lambda x: (x,1))

The `reduce()` action, described in Table 3-4, repeatedly combines two elements of an RDD into one using the supplied function until a single value remains.

In [0]:
pairs.collect()

In [0]:
(sums, count) = pairs.reduce(lambda x,y: (x[0]+y[0],x[1]+y[1]) )

In [0]:
print("average: "+str(sums/float(count)))
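The same map-then-reduce pattern can be traced on a plain Python list. This is a sketch that uses `functools.reduce` as a stand-in for the RDD `reduce()` action; it runs sequentially, whereas Spark applies the function within and across partitions.

```python
from functools import reduce

nums = [2, 3, 4, 5]

# Pair each value with 1 so that the count can be summed alongside the values
pairs = [(x, 1) for x in nums]

# Combine two (sum, count) pairs element-wise. The function is associative and
# commutative, which is what allows Spark to apply it in any order.
total, count = reduce(lambda x, y: (x[0] + y[0], x[1] + y[1]), pairs)

average = total / count
print("average:", average)
```

Here `total` is 14 and `count` is 4, giving an average of 3.5.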

Computing average using the **aggregate()** function

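We start by illustrating how the aggregate function operates on RDD data partitions in two steps: the first function folds the elements of each partition into an accumulator, and the second function merges the per-partition accumulators into the final result.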

In [0]:
z = sc.parallelize([1,2,3,4,5,6], 2)

In [0]:
# first function: running max within each partition; second function: add the per-partition maxima
y = z.aggregate(0, lambda acc, value: max(acc, value), lambda acc1, acc2: acc1 + acc2)

In [0]:
y
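The two steps can be imitated in plain Python to see where the result comes from. The helper `simulate_aggregate` below is hypothetical (it is not part of PySpark), and the partition layout is an assumption: the six elements are taken to be split evenly, three per partition.

```python
from functools import reduce

def simulate_aggregate(partitions, zero, seq_op, comb_op):
    """Plain-Python sketch of aggregate(); `partitions` is a list of lists."""
    # Step 1: fold each partition separately, starting from the zero value
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    # Step 2: merge the per-partition results, again starting from the zero value
    return reduce(comb_op, per_partition, zero)

# Assumed layout: the six elements split evenly over the two partitions
partitions = [[1, 2, 3], [4, 5, 6]]

y = simulate_aggregate(
    partitions,
    0,
    lambda acc, value: max(acc, value),   # max within a partition
    lambda acc1, acc2: acc1 + acc2,       # sum of the per-partition maxima
)
print(y)  # 3 + 6 = 9 under the assumed layout
```

Because the two functions differ, the result depends on the partitioning: with a single partition holding all six elements the same call would give 6.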

Following this pattern, we can use the aggregate() function to parallelise computing averages: the accumulator is a (sum, count) pair, updated per element within each partition and merged across partitions.

In [0]:
sumCount = nums.aggregate((0, 0),
                          (lambda acc, value: (acc[0] + value, acc[1] + 1)),
                          (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
sumCount[0] / float(sumCount[1])
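A short plain-Python sketch shows why the accumulator carries a (sum, count) pair rather than a running average. The uneven partitioning below is hypothetical and chosen only to make the difference visible.

```python
# Hypothetical, deliberately uneven partitioning of the four numbers
partitions = [[2], [3, 4, 5]]

# Naive approach: average each partition, then average the averages
partition_means = [sum(p) / len(p) for p in partitions]
naive = sum(partition_means) / len(partition_means)

# (sum, count) accumulators per partition, merged element-wise
acc = [(sum(p), len(p)) for p in partitions]
total = sum(s for s, _ in acc)
count = sum(c for _, c in acc)
correct = total / count

print(naive, correct)
```

Averaging the per-partition averages gives 3.0, whereas merging sums and counts gives the true mean of 3.5, independent of how the data is partitioned.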