In [1]:
spark

In [2]:
nums = [1, 2, 3, 4, 5]

In [3]:
nums

[1, 2, 3, 4, 5]

In [8]:
sc = spark.sparkContext
rdd = sc.parallelize(nums)

In [9]:
rdd.collect()

[1, 2, 3, 4, 5]

In [7]:
rdd.count()

5

In [10]:
rdd.map(lambda x: x+1).collect()

[2, 3, 4, 5, 6]

In [11]:
#source RDD : rdd : RDD[Integer]
#target RDD : rdd4: RDD[Integer]
rdd4 = rdd.map(lambda x: x+1)

In [12]:
rdd4.collect()

[2, 3, 4, 5, 6]

In [13]:
# map is a 1-to-1 transformation


In [14]:
rdd5 = rdd.map(lambda x: (x+1, x+10))

In [15]:
rdd5.collect()

[(2, 11), (3, 12), (4, 13), (5, 14), (6, 15)]

In [17]:
rdd5.count()

5

In [18]:
rdd4.collect()

[2, 3, 4, 5, 6]

In [19]:
rdd5 = rdd4.filter(lambda x : x > 3)

In [20]:
rdd5.collect()

[4, 5, 6]

In [21]:
rdd6 = rdd4.filter(lambda x : x < 4)

In [22]:
rdd6.collect()

[2, 3]

In [23]:
# for filter(), you need a boolean predicate

In [24]:
rdd7 = rdd.map(lambda x: (x+1, x+10))

In [25]:
rdd7.collect()

[(2, 11), (3, 12), (4, 13), (5, 14), (6, 15)]

In [26]:
rdd8 = rdd7.filter(lambda x: x[1] > 12)

In [27]:
rdd8.collect()

[(4, 13), (5, 14), (6, 15)]

In [28]:
rdd8 = rdd7.filter(lambda x: x[1] > 12).map(lambda x: x[0])

In [29]:
rdd8.collect()

[4, 5, 6]

In [30]:
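# Spark's RDD API is a SUPERSET of classic MapReduce: map() covers the mapper,
# reduceByKey()/groupByKey() cover shuffle + reduce, and many more operations exist.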

In [31]:
pairs = [('alex', 2), ('alex', 20), ('bob', 2), ('bob', 30), ('bob', 50)]

pairs

In [33]:
rdd = spark.sparkContext.parallelize(pairs)

In [34]:
rdd.collect()

[('alex', 2), ('alex', 20), ('bob', 2), ('bob', 30), ('bob', 50)]

In [35]:
sum_per_key = rdd.reduceByKey(lambda x, y: x+y)

In [36]:
sum_per_key.collect()

[('bob', 82), ('alex', 22)]

In [37]:
rdd2 = rdd.map(lambda x: (x[0], (x[1],1)))

In [38]:
rdd2.collect()

[('alex', (2, 1)),
 ('alex', (20, 1)),
 ('bob', (2, 1)),
 ('bob', (30, 1)),
 ('bob', (50, 1))]

In [39]:
rdd3 = rdd.mapValues(lambda i: (i, 1))

In [40]:
rdd3.collect()

[('alex', (2, 1)),
 ('alex', (20, 1)),
 ('bob', (2, 1)),
 ('bob', (30, 1)),
 ('bob', (50, 1))]

In [41]:
# x =(sum1, count1)
# y =(sum2, count2)
# x+y => (sum1+sum2, count1+count2)
sum_and_count = rdd3.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1]))

In [42]:
sum_and_count.collect()

[('bob', (82, 3)), ('alex', (22, 2))]

In [43]:
avg_per_key = sum_and_count.mapValues(lambda tuple2: tuple2[0]/tuple2[1])

In [44]:
avg_per_key.collect()

[('bob', 27.333333333333332), ('alex', 11.0)]

In [45]:
rdd.collect()

[('alex', 2), ('alex', 20), ('bob', 2), ('bob', 30), ('bob', 50)]

In [46]:
# In classic MapReduce, the sort & shuffle phase would hand each reducer
# one key together with all of its values:
# ('alex', [2, 20])
# ('bob', [2, 30, 50])

In [47]:
# groupByKey(): groups the values of each key of an RDD[(K, V)] into a single
# iterable, producing an RDD[(K, Iterable[V])] -- the Spark analogue of the shuffle output above.

In [48]:
grouped = rdd.groupByKey()

In [49]:
grouped.collect()

[('bob', <pyspark.resultiterable.ResultIterable at 0x7fee2ed65be0>),
 ('alex', <pyspark.resultiterable.ResultIterable at 0x7fee2ed65048>)]

In [50]:
grouped.mapValues(lambda it: list(it)).collect()

[('bob', [2, 30, 50]), ('alex', [2, 20])]

In [51]:
avg_by_key = grouped.mapValues(lambda it: sum(it)/len(it)) 

In [52]:
avg_by_key.collect()

[('bob', 27.333333333333332), ('alex', 11.0)]

In [53]:
# Transformations (return a new RDD): map(), mapValues(), filter(), reduceByKey(), groupByKey()
# Actions (return a result to the driver): count(), collect()

In [54]:
strings = ['fox jumped', 'fox jumped', 'cat jumped', 'cat jumped', 'cat jumped']

In [55]:
strings

['fox jumped', 'fox jumped', 'cat jumped', 'cat jumped', 'cat jumped']

In [56]:
rdd = spark.sparkContext.parallelize(strings)

In [57]:
rdd.collect()

['fox jumped', 'fox jumped', 'cat jumped', 'cat jumped', 'cat jumped']

In [58]:
unique = rdd.distinct()

In [59]:
unique.collect()

['cat jumped', 'fox jumped']

In [None]:
# Question: assuming distinct() did not exist,
# how would you implement equivalent functionality?