# RDD Practice

In [12]:
from pyspark.sql import SparkSession
from py4j.protocol import Py4JJavaError

import time

In [13]:
# Build a session (or reuse one that already exists) with a "local" master,
# so everything runs on this machine.
spark = (
    SparkSession.builder
    .master("local")
    .getOrCreate()
)

Simple `map()` example with lambda function:

In [14]:
# Readings in degrees Fahrenheit that we want in Celsius.
temps_f = [64.0, 63.3, 61.2, 61.0, 58.7, 59.2, 59.0]

# Turn the list into an RDD, then describe the per-element conversion with map().
temps_f_rdd = spark.sparkContext.parallelize(temps_f)
temps_c_rdd = temps_f_rdd.map(
    lambda fahrenheit: round((fahrenheit - 32) * 5 / 9, 1)
)

# collect() returns the converted values as a regular Python list.
temps_c = temps_c_rdd.collect()
print(temps_c)

[17.8, 17.4, 16.2, 16.1, 14.8, 15.1, 15.0]


You don't need a lambda; regular named functions work as well:

In [15]:
# map() accepts any callable, so a regular (non-lambda) function works as well:
def fahr_to_cels(fahr):
    """Convert a temperature from Fahrenheit to Celsius.

    Args:
        fahr: Temperature in degrees Fahrenheit.

    Returns:
        The temperature in degrees Celsius, rounded to one decimal place.
    """
    cels = (fahr - 32) * 5 / 9
    return round(cels, 1)


# The function used to be called `cels_to_fahr`, which described the opposite
# conversion. The old name is kept as an alias so existing references still work.
cels_to_fahr = fahr_to_cels

# Use an RDD-specific name instead of rebinding `temps_f`, which is a plain list
# in the earlier cell.
temps_f_rdd = spark.sparkContext.parallelize([64.0, 63.3, 61.2, 61.0, 58.7, 59.2, 59.0])
print(temps_f_rdd.map(fahr_to_cels).collect())

[17.8, 17.4, 16.2, 16.1, 14.8, 15.1, 15.0]


PySpark evaluates RDDs lazily: transformations such as `map()` only describe the work to be done, and nothing is actually computed until an action such as `collect()` is called.

In [16]:
rdd = spark.sparkContext.parallelize([1, 2, 3])

# Dividing by zero and then adding a str can never succeed...
rdd_messed_up = rdd.map(lambda value: value / 0 + "bad")
print("Defined illegal transformation (fine)")

# ...yet defining it raised nothing. The failure only shows up once collect()
# forces the map to be evaluated:
try:
    print("Calling collect() on the result (errors)")
    rdd_messed_up.collect()
except Py4JJavaError as err:
    print(f"Error:\n  {err!r}")

Defined illegal transformation (fine)
Calling collect() on the result (errors)
Error:
  Py4JJavaError('An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.\n', JavaObject id=o159)


Another example of lazy evaluation. Here the "expensive" computation never actually happens.

In [17]:
def expensive_operation(input):
    """Stand-in for a very slow computation: sleep, then return ``input + 1``.

    Note: the parameter name shadows the built-in ``input`` inside this function.
    """
    sleep_seconds = 5000000  # many, many seconds
    time.sleep(sleep_seconds)
    return input + 1


# Only the map is declared here; no action is called on the RDD in this cell,
# so expensive_operation is never invoked.
rdd = (
    spark.sparkContext
    .parallelize([1, 2, 3])
    .map(expensive_operation)
)
print("This RDD exists despite never being evaluated:")
print(f"  {rdd!r}")

This RDD exists despite never being evaluated:
  PythonRDD[18] at RDD at PythonRDD.scala:56


Calculating averages with `reduce()` and `count()`:

In [None]:
scores = [72, 87, 92, 76, 83, 91, 63, 88, 99, 85]
scores_rdd = spark.sparkContext.parallelize(scores)

# Two separate actions on the same RDD: count() gives the number of scores,
# reduce() folds them pairwise into their sum.
count = scores_rdd.count()
total = scores_rdd.reduce(lambda running, score: running + score)
print(f"Average score is {total / count}")

Average score is 83.6


TODO: `broadcast()` and `accumulator`

More examples (none of these are important imo):

`countByKey()` and `countByValue()` count how often each key or value occurs and return the result as a dict (a `defaultdict`), not as an RDD.

In [50]:
names = [
    "Alice",
    "Bob", "Bob", "Bob",
    "Charlie",
    "Eve", "Eve",
    "Eggplant",
]
rdd = spark.sparkContext.parallelize(names)

# countByValue(): how many times each distinct element occurs.
print("count by value: ", rdd.countByValue())

# countByKey(): here each element is a plain string, so its first character
# acts as the 'key'. Usually this is called on (key, value) tuples instead.
print("count by key: ", rdd.countByKey())

count by value:  defaultdict(<class 'int'>, {'Alice': 1, 'Bob': 3, 'Charlie': 1, 'Eve': 2, 'Eggplant': 1})
count by key:  defaultdict(<class 'int'>, {'A': 1, 'B': 3, 'C': 1, 'E': 3})


Perform aggregates on paired data with `reduceByKey()`:

In [51]:
# (student, score) pairs; each student appears several times.
student_scores = [
    ("Jerry", 72), ("Jerry", 87), ("Jerry", 92),
    ("Jerry", 76), ("Jerry", 83), ("Jerry", 91),
    ("Jimbob", 63), ("Jimbob", 88), ("Jimbob", 69), ("Jimbob", 85),
]
score_rdd = spark.sparkContext.parallelize(student_scores)

# Sum of the scores per student.
total_rdd = score_rdd.reduceByKey(lambda left, right: left + right)

# Number of scores per student. NOTE: despite the `_rdd` suffix, countByKey()
# returns a dict-like object rather than an RDD.
count_rdd = score_rdd.countByKey()

# Average = per-student total / per-student count.
scores_rdd = total_rdd.map(
    lambda name_total: (name_total[0], name_total[1] / count_rdd[name_total[0]])
)
print(scores_rdd.collect())

[('Jerry', 83.5), ('Jimbob', 76.25)]


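`flatMap()` sounds simple but it's kinda weird imo. It works like `map()`, except the function returns a sequence for each row and those sequences are flattened into the result, so you can think of it as turning every row into multiple rows.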

In [None]:
data_2d = [[1, 2, 3], [4, 5], [6, 7, 8, 9]]
data_phrases = [["the quick brown fox"], ["the lazy brown dog"]]

# Each row is an inner list; returning it unchanged makes flatMap() emit its
# items one by one.
rdd_2d = spark.sparkContext.parallelize(data_2d)
flat_2d = rdd_2d.flatMap(lambda inner: inner)

# Each row wraps a single phrase; splitting it on whitespace yields one output
# row per word.
rdd_phrases = spark.sparkContext.parallelize(data_phrases)
flat_phrases = rdd_phrases.flatMap(lambda phrase_row: phrase_row[0].split())

print("2D array flattened:", flat_2d.collect())
print("Phrases flattened:", flat_phrases.collect())

2D array flattened: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Phrases flattened: ['the', 'quick', 'brown', 'fox', 'the', 'lazy', 'brown', 'dog']
