In [1]:
spark.version


'3.3.2'

In [2]:
spark

# Demo of map()

In [3]:
# Four short phrases used as the input records for the map() demo.
some_strings = [
    'fox jumped',
    'fox jumped again',
    'red fox jumped',
    'blue fox jumped',
]
print(some_strings)

['fox jumped', 'fox jumped again', 'red fox jumped', 'blue fox jumped']


In [5]:
# Turn the local list of strings into an RDD (one element per string).
rdd = (
    spark.sparkContext
    .parallelize(some_strings)
)

In [6]:
rdd.collect()

                                                                                

['fox jumped', 'fox jumped again', 'red fox jumped', 'blue fox jumped']

In [7]:
rdd.count()

                                                                                

4

In [8]:
# rdd: RDD[String]

In [10]:
# Pair every string with its character count,
# e.g. 'fox jumped' -> ('fox jumped', 10).
rdd2 = rdd.map(lambda text: (text, len(text)))


In [11]:
rdd2.collect()

[('fox jumped', 10),
 ('fox jumped again', 16),
 ('red fox jumped', 14),
 ('blue fox jumped', 15)]

In [12]:
rdd2.count()


4

In [13]:
# map(): 1-to-1 transformation — every source element produces exactly one target element, so the count stays at 4.


In [14]:
# Five inner lists, two of them empty, used to contrast map() with flatMap().
some_lists = [
    ['A', 'B', 'C'],
    [],
    ['E', 'F', 'G'],
    ['X', 'Y'],
    [],
]
len(some_lists)

5

In [15]:
# Rebind rdd to an RDD whose elements are the inner lists of some_lists.
rdd = (
    spark.sparkContext
    .parallelize(some_lists)
)

In [16]:
rdd.collect()


[['A', 'B', 'C'], [], ['E', 'F', 'G'], ['X', 'Y'], []]

In [17]:
rdd.count()

5

In [18]:
# Identity map: each inner list is passed through unchanged, so the
# nesting and the element count of 5 are both preserved.
rdd2 = rdd.map(lambda inner: inner)


In [19]:
rdd2.collect()

[['A', 'B', 'C'], [], ['E', 'F', 'G'], ['X', 'Y'], []]

In [20]:
rdd2.count()

5

In [21]:
# flatMap() on rdd (source RDD) produces rdd3 (target RDD): each inner
# list is unpacked into its items, so empty lists contribute nothing.
rdd3 = rdd.flatMap(lambda inner: inner)


In [22]:
rdd3.collect()

['A', 'B', 'C', 'E', 'F', 'G', 'X', 'Y']

In [23]:
rdd3.count()

8

In [24]:
# RDD.flatMap(): Return a new RDD by first applying a function 
# to all elements of this RDD, and then flattening the results.

# map(): 1-to-1 Transformation
# flatMap(): 1-to-many transformation — each source element may produce 0, 1, 2, 3, ... target elements



In [25]:
# filter() Transformation
# filter(): Return a new RDD containing only the elements that satisfy a predicate.

In [26]:
rdd.collect()

[['A', 'B', 'C'], [], ['E', 'F', 'G'], ['X', 'Y'], []]

In [27]:
# Keep only the non-empty lists; the lambda is the Boolean predicate.
rdd4 = rdd.filter(lambda inner: len(inner) > 0)

In [None]:
# An empty list has length 0, so it fails a `len(x) > 0` predicate.
len([])

In [30]:
# rdd4 (source RDD) -> rdd5 (target RDD): keep the lists that hold
# more than two items.
rdd5 = rdd4.filter(lambda inner: len(inner) > 2)

In [31]:
rdd5.collect()

[['A', 'B', 'C'], ['E', 'F', 'G']]

In [33]:
# (KEY, VALUE) pairs used as input for the per-key reduction demo.
key_value = [('A', 2), ('A', 3), ('B', 2), ('B', 6), ('B', 9), ('C', 5)]
# Only the pairs are printed here; the length is displayed by the next cell.
print(key_value)

[('A', 2), ('A', 3), ('B', 2), ('B', 6), ('B', 9), ('C', 5)]


In [34]:
len(key_value)

6

# Find the sum of values per key
# Reduction Transformations

## groupByKey()
## reduceByKey()
## aggregateByKey()
## combineByKey()

In [35]:
# Rebind rdd to an RDD of (key, value) pairs built from key_value.
rdd = (
    spark.sparkContext
    .parallelize(key_value)
)

In [36]:
rdd.collect()

[('A', 2), ('A', 3), ('B', 2), ('B', 6), ('B', 9), ('C', 5)]

In [37]:
rdd.count()

6

In [38]:
# Gather the values of each key together: the result holds one
# (key, values) pair per distinct key.
grouped_by_key = rdd.groupByKey()

In [39]:
# Collected as-is, the grouped values display as ResultIterable objects
# rather than as plain lists.
grouped_by_key.collect()

                                                                                

[('B', <pyspark.resultiterable.ResultIterable at 0x10e53d4e0>),
 ('C', <pyspark.resultiterable.ResultIterable at 0x10e34a800>),
 ('A', <pyspark.resultiterable.ResultIterable at 0x10e34a380>)]

In [40]:
# Convert each key's grouped values into a plain list so they print readably.
grouped_by_key.mapValues(list).collect()

[('B', [2, 6, 9]), ('C', [5]), ('A', [2, 3])]

In [41]:
# Each element of grouped_by_key is a (key, values) pair, conceptually:
#   ('B', [2, 6, 9])
#   ('C', [5])
#   ('A', [2, 3])
# pair[0] is the key and pair[1] holds that key's values, so summing
# pair[1] gives the total per key.
sum_per_key = grouped_by_key.map(lambda pair: (pair[0], sum(pair[1])))


In [42]:
sum_per_key.collect()

[('B', 17), ('C', 5), ('A', 5)]

In [43]:
# mapValues(): Pass each value in the (key, value) pair RDD 
# through a map function without changing the keys

In [44]:
# mapValues() hands the function only the value part of each
# (key, value) pair, conceptually:
#   [2, 6, 9]
#   [5]
#   [2, 3]
# so the built-in sum can be applied to it directly.
sum_per_key = grouped_by_key.mapValues(sum)


In [45]:
sum_per_key.collect()

[('B', 17), ('C', 5), ('A', 5)]

In [46]:
# Each element is a (key, total) pair; keep the pairs whose total
# (pair[1]) is greater than 10.
gt10 = sum_per_key.filter(lambda pair: pair[1] > 10)

In [47]:
gt10.collect()

[('B', 17)]