In [1]:
import camber

In [2]:
spark = camber.spark.connect(worker_size="XSMALL")


**Resilient Distributed Dataset (RDD)**: The RDD is the fundamental data structure of Spark: a fault-tolerant (resilient), immutable, distributed collection of objects of any type.

In [3]:
# Get the SparkContext, the entry point for the RDD API
sc = spark.sparkContext


# Part 1: Create RDDs and Basic Operations

In [4]:
# Generate random data:
import random
# Sample 10 distinct random integers from 0 to 39 (the range end is exclusive)
randomlist = random.sample(range(0, 40), 10)
print(randomlist)

[14, 22, 7, 20, 38, 11, 2, 8, 16, 24]


In [5]:
# Create RDD:
rdd1 = sc.parallelize(randomlist, 4)
rdd1.collect()

[14, 22, 7, 20, 38, 11, 2, 8, 16, 24]

In [6]:
# Data distribution in partitions:
print(rdd1.getNumPartitions())
print(rdd1.glom().collect())
print("Two partitions: ", rdd1.glom().take(2))

4
[[14, 22], [7, 20], [38, 11], [2, 8, 16, 24]]
Two partitions:  [[14, 22], [7, 20]]
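Why do 10 elements in 4 partitions come out as 2, 2, 2, 4 rather than a more even split? The plain-Python model below is a sketch that reproduces the layouts printed in this notebook under an assumption about PySpark internals: elements are first grouped into batches of `max(1, min(len(data) // numSlices, 1024))`, and the batches (not the individual elements) are then split into `numSlices` contiguous index ranges. `model_parallelize` is a hypothetical helper, and this is an implementation detail that may differ between Spark versions, not a guaranteed API behavior.

```python
def model_parallelize(data, num_slices, max_batch=1024):
    """Hypothetical model of how sc.parallelize(data, num_slices) lays out partitions.

    Assumption: elements are grouped into batches first, and the list of
    batches is then cut into num_slices contiguous index ranges.
    """
    batch_size = max(1, min(len(data) // num_slices, max_batch))
    # Group consecutive elements into batches of batch_size
    batches = [data[i:i + batch_size] for i in range(0, len(data), batch_size)]
    n = len(batches)
    partitions = []
    for i in range(num_slices):
        # Partition i receives batches [i*n // num_slices, (i+1)*n // num_slices)
        start, end = (i * n) // num_slices, ((i + 1) * n) // num_slices
        partitions.append([x for batch in batches[start:end] for x in batch])
    return partitions

print(model_parallelize([14, 22, 7, 20, 38, 11, 2, 8, 16, 24], 4))
# [[14, 22], [7, 20], [38, 11], [2, 8, 16, 24]]
```

The same model also matches the later two-partition `rdd2` layout (4 + 4 elements) and the empty first partition of the six-partition books RDD.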


In [7]:
# Print last partition
for item in rdd1.glom().collect()[3]:
  print(item)

2
8
16
24


In [8]:
# count():
rdd1.count()

10

In [9]:
# first():
rdd1.first()

14

In [10]:
rdd1.glom().collect()

[[14, 22], [7, 20], [38, 11], [2, 8, 16, 24]]

In [11]:
# map():
rdd_map = rdd1.map(lambda x:(x+1)*3)
rdd_map.collect()

[45, 69, 24, 63, 117, 36, 9, 27, 51, 75]

In [12]:
# filter(): 
rdd_filter = rdd1.filter(lambda x : x%3==0)
rdd_filter.collect()

[24]

In [13]:
# flatMap():
rdd_flatmap=rdd1.flatMap(lambda x: [x+2,x+5])
print(rdd_flatmap.collect())
print("The summation of elements =", rdd_flatmap.reduce(lambda a,b : a + b))

[16, 19, 24, 27, 9, 12, 22, 25, 40, 43, 13, 16, 4, 7, 10, 13, 18, 21, 26, 29]
The summation of elements = 394


In [14]:
# Descriptive statistics:
print([rdd1.max(), rdd1.min(), rdd1.mean(), rdd1.sum(), round(rdd1.stdev(),2)])

[38, 2, 16.2, 162, 9.85]
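`rdd1.stdev()` returns the population standard deviation (dividing by N), which is why it prints 9.85; PySpark also provides `rdd1.sampleStdev()`, which divides by N - 1. As a quick cross-check outside Spark, the standard-library `statistics` module reproduces both values for the same ten numbers shown in the first cell.

```python
import statistics

data = [14, 22, 7, 20, 38, 11, 2, 8, 16, 24]  # the sample printed above

population = round(statistics.pstdev(data), 2)  # divides by N, like rdd.stdev()
sample = round(statistics.stdev(data), 2)       # divides by N - 1, like rdd.sampleStdev()
print(population, sample)  # 9.85 10.38
```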


In [15]:
rdd1.glom().collect()

[[14, 22], [7, 20], [38, 11], [2, 8, 16, 24]]

In [16]:
# mapPartitions():

def myfunc(partition):
  # Called once per partition; `partition` is an iterator over its elements
  total = 0  # avoid shadowing the built-in sum()
  for item in partition:
    total = total + item
  yield total  # `yield` makes this a generator; it emits one total per partition

rdd_mapPartition = rdd1.mapPartitions(myfunc)
rdd_mapPartition.collect()

[36, 27, 49, 50]
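The difference between `map()` and `mapPartitions()` is how often your function runs: once per element versus once per partition (which helps when each call has a setup cost, such as opening a connection). The plain-Python sketch below models both over the partition layout printed by `glom()` above; `model_map` and `model_map_partitions` are hypothetical stand-ins, not Spark APIs.

```python
def partition_sums(partition):
    # Receives an iterator over ONE partition and yields a single total for it
    total = 0
    for item in partition:
        total += item
    yield total

def model_map(partitions, func):
    # map(): func is called once per element
    return [func(x) for part in partitions for x in part]

def model_map_partitions(partitions, func):
    # mapPartitions(): func is called once per partition, and
    # everything it yields is concatenated into the output
    return [out for part in partitions for out in func(iter(part))]

parts = [[14, 22], [7, 20], [38, 11], [2, 8, 16, 24]]  # layout printed by glom() above
sums = model_map_partitions(parts, partition_sums)
print(sums)  # [36, 27, 49, 50]

calls = []
def counting_sums(partition):
    calls.append(1)  # record one entry per invocation
    yield sum(partition)

model_map_partitions(parts, counting_sums)
print(len(calls))                            # 4: one call per partition
print(len(model_map(parts, lambda x: x)))    # 10: one call per element with map()
```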

# Code Demo 1: Filter and Transform Words

Write a Python program that takes a list of sentences as input and performs the following operations:

* Filter out sentences that contain the word "spam".
* Split each remaining sentence into a list of words.
* Transform each word to uppercase.
* Flatten the list of words into a single list.

In [17]:
# Input sentences
sentences = [
    "This is not spam.",
    "I love spam and eggs.",
    "No spam here.",
    "Spam, spam, spam!"
]

# Create RDD
rdd1 = sc.parallelize(sentences, 4)

print(rdd1.glom().collect())

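# Step 1: filter out sentences that contain the word "spam" (case-insensitive)
filtered_sentences = rdd1.filter(lambda s: "spam" not in s.lower())

# Steps 2-4: strip punctuation, split into words, uppercase each word,
# and let flatMap() flatten the per-sentence lists into one RDD of words
def to_upper_words(sentence):
    for ch in '".,!':
        sentence = sentence.replace(ch, '')
    return [word.upper() for word in sentence.split()]

words = filtered_sentences.flatMap(to_upper_words)

# Print the result (every sample sentence contains "spam", so none survive the filter)
print(words.collect())


[['This is not spam.'], ['I love spam and eggs.'], ['No spam here.'], ['Spam, spam, spam!']]
[]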


In Apache Spark, `flatMap()` is a transformation on an RDD that applies a function to each element and then flattens the results. It produces a new RDD whose elements are the concatenation of everything the function returned for every input element.

Here's how `flatMap()` works:

1. The function is applied to each element of the RDD.
2. For each input element, the function returns an iterable (for example, a list or a generator) of zero or more elements.
3. These iterables are flattened: their items are concatenated into one sequence.
4. That flattened sequence becomes the new RDD. As with other transformations, nothing is computed until an action such as `collect()` runs.

The key difference between `flatMap()` and `map()` is that `flatMap()` allows each input element to map to zero or more output elements, while `map()` maps each input element to a single output element.
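To make the one-to-one versus one-to-many distinction concrete, here is a plain-Python model of both transformations (the helper names and input strings are illustrative, not Spark APIs). Note that an empty sentence produces zero outputs under `flatMap()`, and that a string is itself iterable, so returning it unchanged splits it into characters.

```python
def model_map(elements, func):
    # map(): exactly one output per input element
    return [func(x) for x in elements]

def model_flat_map(elements, func):
    # flatMap(): func returns an iterable; its items are concatenated,
    # so one input can yield zero, one, or many outputs
    return [y for x in elements for y in func(x)]

lines = ["No spam here.", "", "I love eggs"]  # illustrative input
mapped = model_map(lines, str.split)
flat = model_flat_map(lines, str.split)
print(mapped)  # [['No', 'spam', 'here.'], [], ['I', 'love', 'eggs']]
print(flat)    # ['No', 'spam', 'here.', 'I', 'love', 'eggs']

# Pitfall: flatMap(lambda x: x) on strings yields individual characters
chars = model_flat_map(["10", "3"], lambda x: x)
print(chars)   # ['1', '0', '3']
```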

# Code Demo 2: Medical Dataset Analysis and Storytelling

Assume you have a custom medical dataset that contains records of patients and their medical conditions. Each record is a string with the following format: "<patient_id>:<condition>". The exercise involves analyzing the dataset, filtering specific conditions, transforming data, and generating a story based on the results.

Here are the steps to follow:

1. Create an RDD from a list of medical records.
2. Implement a filter operation to select records of patients with a specific condition (e.g., "diabetes").
3. Transform the filtered records by mapping each record to the patient ID only.
4. Implement a flatMap operation to obtain a single list of patient IDs from the transformed records.
5. Generate a story by using the patient IDs to gather additional information (e.g., demographic data) from another dataset.

Note: For this exercise, assume you have a separate dataset or API that provides additional patient information based on their ID.

In [18]:
# Input medical records
medical_records = [
    "1:diabetes",
    "2:asthma",
    "3:diabetes",
    "4:hypertension",
    "5:diabetes",
    "6:arthritis",
    "7:diabetes",
    "8:asthma",
    "9:diabetes"
]

# Create RDD from medical records
rdd_medical = sc.parallelize(medical_records)

# Filter for patients with diabetes
filtered_diabetes = rdd_medical.filter(lambda record: record.split(":")[1] == "diabetes")

# Transform to patient IDs
patient_ids = filtered_diabetes.map(lambda record: record.split(":")[0])

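# Obtain a single list of patient IDs. map() already yields one ID per element;
# flatMap(lambda x: x) would iterate over each ID string and split multi-digit
# IDs such as "10" into "1" and "0", so wrap each ID in a one-element list instead.
flat_patient_ids = patient_ids.flatMap(lambda x: [x])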

# Generate a story based on patient IDs
story = "The following patients have been diagnosed with diabetes: " + ", ".join(flat_patient_ids.collect())

# Print the story
print(story)


The following patients have been diagnosed with diabetes: 1, 3, 5, 7, 9
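The cell above prints the IDs but does not perform the last step, looking up extra information from another dataset. In Spark this is typically a pair-RDD `join()`: key both datasets by patient ID, e.g. `patient_ids.map(lambda pid: (pid, "diabetes")).join(demographics_rdd)`, where `demographics_rdd` is a hypothetical RDD of `(patient_id, age)` pairs. The sketch below models that inner join in plain Python; the demographic records are invented for illustration and are not part of the original dataset.

```python
def model_join(left, right):
    """Plain-Python model of pair-RDD join(): an inner join on key that
    yields (key, (left_value, right_value)) for every matching pair."""
    right_index = {}
    for key, value in right:
        right_index.setdefault(key, []).append(value)
    return [(k, (v, w)) for k, v in left for w in right_index.get(k, [])]

diabetic = [(pid, "diabetes") for pid in ["1", "3", "5", "7", "9"]]
# Hypothetical demographic records (patient_id, age); NOT from the source data
demographics = [("1", 54), ("3", 61), ("5", 47), ("8", 33)]

joined = sorted(model_join(diabetic, demographics))
for pid, (condition, age) in joined:
    print(f"Patient {pid} ({condition}) is {age} years old.")
```

Patients 7 and 9 have no demographic record, so an inner join drops them; `leftOuterJoin()` would keep them with `None`.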


# Part 2: Advanced RDD Transformations and Actions

In [19]:
# union():
rdd1 = sc.parallelize(random.sample(range(0, 40), 10), 4)
print(rdd1.collect())
rdd2 = sc.parallelize([1, 14, 20, 20, 29, 10, 13, 3],2)
print(rdd2.collect())

rdd_union = rdd1.union(rdd2)
print(rdd_union.getNumPartitions())
print(rdd_union.collect())

[21, 36, 1, 5, 19, 33, 25, 28, 10, 12]
[1, 14, 20, 20, 29, 10, 13, 3]
6
[21, 36, 1, 5, 19, 33, 25, 28, 10, 12, 1, 14, 20, 20, 29, 10, 13, 3]


In [20]:
rdd_union.glom().collect()

[[21, 36],
 [1, 5],
 [19, 33],
 [25, 28, 10, 12],
 [1, 14, 20, 20],
 [29, 10, 13, 3]]
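`union()` does not shuffle or deduplicate: the result simply keeps the partitions of both inputs side by side, which is why there are 4 + 2 = 6 partitions and values such as 1, 10, and 20 appear twice. A plain-Python sketch of that behavior, using the partition layouts printed above (`model_union` is a hypothetical helper):

```python
def model_union(parts_a, parts_b):
    # union(): partitions of both inputs are placed side by side;
    # no data moves between partitions and duplicates are kept
    return parts_a + parts_b

rdd1_parts = [[21, 36], [1, 5], [19, 33], [25, 28, 10, 12]]  # rdd1 layout above
rdd2_parts = [[1, 14, 20, 20], [29, 10, 13, 3]]              # rdd2 layout above

union_parts = model_union(rdd1_parts, rdd2_parts)
elements = [x for part in union_parts for x in part]
print(len(union_parts), len(elements))  # 6 18
print(len(set(elements)))               # 15 distinct values
```

Removing the duplicates in Spark requires `rdd_union.distinct()`, which, unlike `union()`, involves a shuffle.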

In [21]:
# takeOrdered(num, key=None):
# Use this only when the result is expected to be small, because all of it is loaded into the driver's memory.
print(rdd1.collect())
print(rdd1.takeOrdered(5))
print(rdd1.takeOrdered(5, key=lambda x: -x))

[21, 36, 1, 5, 19, 33, 25, 28, 10, 12]
[1, 5, 10, 12, 19]
[36, 33, 28, 25, 21]


In [22]:
# reduce():
# reduce(f): f must be a commutative and associative binary operator.
rdd1.reduce(lambda x,y: x+y)

190
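Why must the operator be commutative and associative? PySpark's `reduce()` first reduces each partition locally and then reduces the per-partition results, so the grouping of operations depends on the partitioning. The plain-Python model below (`model_rdd_reduce` is a hypothetical helper) shows that addition gives 190 for any layout, while subtraction gives a different answer for each layout.

```python
import operator
from functools import reduce

def model_rdd_reduce(partitions, f):
    """Model of RDD.reduce: reduce each non-empty partition locally,
    then reduce the per-partition results on the driver."""
    partials = [reduce(f, part) for part in partitions if part]
    return reduce(f, partials)

data = [21, 36, 1, 5, 19, 33, 25, 28, 10, 12]
two_parts = [data[:5], data[5:]]
four_parts = [[21, 36], [1, 5], [19, 33], [25, 28, 10, 12]]  # rdd1's layout above

print(model_rdd_reduce(two_parts, operator.add), model_rdd_reduce(four_parts, operator.add))  # 190 190
print(model_rdd_reduce(two_parts, operator.sub), model_rdd_reduce(four_parts, operator.sub))  # 2 28
print(reduce(operator.sub, data))  # -148 when applied strictly left to right
```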

In [23]:
# reduceByKey():
rdd_Rbk = sc.parallelize([(1,4),(7,10),(5,7),(1,12),(7,12),(7,1),(9,1),(7,4)], 2)
print(rdd_Rbk.collect())

[(1, 4), (7, 10), (5, 7), (1, 12), (7, 12), (7, 1), (9, 1), (7, 4)]


In [24]:
rdd_Rbk.reduceByKey(lambda x,y: x+y).collect()

[(1, 16), (5, 7), (7, 27), (9, 1)]
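`reduceByKey()` pre-aggregates values for each key inside every partition before the shuffle, so less data crosses the network than with `groupByKey()`. The plain-Python sketch below models those two phases, assuming the 8 pairs of `rdd_Rbk` are split 4 and 4 across its 2 partitions; `model_reduce_by_key` is a hypothetical helper, not Spark code.

```python
def model_reduce_by_key(partitions, f):
    """Model of reduceByKey: combine per key within each partition, then merge."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            # map-side combine: fold values of the same key inside this partition
            local[key] = f(local[key], value) if key in local else value
        partials.append(local)
    shuffled = sum(len(local) for local in partials)  # partial results sent to the shuffle
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = f(merged[key], value) if key in merged else value
    return merged, shuffled

# Assumed layout: 4 pairs per partition
parts = [[(1, 4), (7, 10), (5, 7), (1, 12)], [(7, 12), (7, 1), (9, 1), (7, 4)]]
result, shuffled = model_reduce_by_key(parts, lambda x, y: x + y)
print(sorted(result.items()))  # [(1, 16), (5, 7), (7, 27), (9, 1)]
print(shuffled)                # 5 partial sums, whereas groupByKey() must move all 8 values
```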

In [25]:
# sortByKey():
rdd_Rbk.reduceByKey(lambda x,y: x+y).sortByKey().collect()


[(1, 16), (5, 7), (7, 27), (9, 1)]

In [26]:
# countByKey(): returns a dictionary of {key: count} to the driver
counts = rdd_Rbk.countByKey()
sorted(counts.items())

[(1, 2), (5, 1), (7, 4), (9, 1)]

In [27]:
# groupByKey(): groups all values that share a key into one iterable
rdd_group = rdd_Rbk.groupByKey()

# collect() brings the entire RDD into the driver's memory; avoid it on large datasets
for key, values in rdd_group.collect():
  print(key, list(values))

1 [4, 12]
5 [7]
7 [10, 12, 1, 4]
9 [1]


In [28]:
# lookup(key):
rdd_Rbk.lookup(7)

[10, 12, 1, 4]

# Code Demo 3: Books collection

Assume you have a dataset representing a collection of books. Each record in the dataset contains information about a book, such as its title, author, publication year, and genre. Your task is to perform various analyses on the books using RDD methods in Spark.

In [90]:
bookRecords = [
    ("Book A", "Author 1", 2005, "Fiction"),
    ("Book B", "Author 2", 2010, "Mystery"),
    ("Book C", "Author 1", 2015, "Science Fiction"),
    ("Book D", "Author 3", 2018, "Fantasy"),
    ("Book E", "Author 3", 2018, "Fantasy"),
    # Add more book records as needed
]

booksRDD = sc.parallelize(bookRecords, 6)


In [91]:
# Calculate the number of books in each genre.
booksByGenre = booksRDD.map(lambda book: (book[3], 1)) \
                      .reduceByKey(lambda a, b: a + b)
booksByGenre.collect()

[('Fiction', 1), ('Mystery', 1), ('Fantasy', 2), ('Science Fiction', 1)]

In [92]:
# Find the top 5 authors who have written the most books.
topAuthors = booksRDD.map(lambda book: (book[1], 1)) \
                    .reduceByKey(lambda a, b: a + b) \
                    .takeOrdered(5, key=lambda x: -x[1])
topAuthors

[('Author 3', 2), ('Author 1', 2), ('Author 2', 1)]
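Author 3 and Author 1 are tied at two books, and `key=lambda x: -x[1]` says nothing about how ties are ordered, so their relative order is not something to rely on. Adding the author name as a secondary key makes the ranking deterministic; in Spark that would be `.takeOrdered(5, key=lambda x: (-x[1], x[0]))`. The sketch below checks that key in plain Python with `heapq.nsmallest`, which returns the n smallest items by key, the same contract as `takeOrdered`.

```python
import heapq

author_counts = [("Author 2", 1), ("Author 3", 2), ("Author 1", 2)]  # as collected above

# Primary key: descending count; secondary key: ascending author name
top = heapq.nsmallest(5, author_counts, key=lambda x: (-x[1], x[0]))
print(top)  # [('Author 1', 2), ('Author 3', 2), ('Author 2', 1)]
```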

In [93]:
# Find the book with the earliest publication year.
oldestBook = booksRDD.min(key=lambda book: book[2])
oldestBook

('Book A', 'Author 1', 2005, 'Fiction')

In [94]:
# Count the number of books written by each author.
booksByAuthor = booksRDD.map(lambda book: (book[1], 1)) \
                        .reduceByKey(lambda a, b: a + b)


In [95]:
booksByAuthor.collect()

[('Author 2', 1), ('Author 3', 2), ('Author 1', 2)]

In [96]:
# Build (author, publication year) pairs for per-author aggregation
authorYearRDD = booksRDD.map(lambda x: (x[1], x[2]))


In [101]:
authorYearRDD.glom().collect()

[[],
 [('Author 1', 2005)],
 [('Author 2', 2010)],
 [('Author 1', 2015)],
 [('Author 3', 2018)],
 [('Author 3', 2018)]]

In [102]:
sumCountRDD = authorYearRDD.combineByKey(
    lambda value: (value, 1),
    lambda x, value: (x[0] + value, x[1] + 1),
    lambda x, y: (x[0] + y[0], x[1] + y[1])
)

In [105]:
sumCountRDD.collect()

[('Author 2', (2010, 1)), ('Author 3', (4036, 2)), ('Author 1', (4020, 2))]

In [46]:
meanYearsRDD = sumCountRDD.mapValues(lambda x: x[0] / x[1])

In [47]:
meanYearsRDD.collect()

[('Author 2', 2010.0), ('Author 3', 2018.0), ('Author 1', 2010.0)]



The three functions passed to `combineByKey()` work as follows:

1. `lambda value: (value, 1)`: This is the `createCombiner` function. It runs the first time a key is encountered within a partition. It takes the `value` and returns the tuple `(value, 1)`: the initial intermediate value, consisting of a running sum that starts at `value` and a count of 1.

2. `lambda x, value: (x[0] + value, x[1] + 1)`: This is the `mergeValue` function. It is called for each subsequent value of the same key within the same partition. It takes two arguments: `x`, the current intermediate value, and `value`, the new value to merge. Here it adds `value` to the running sum (`x[0] + value`) and increments the count by 1 (`x[1] + 1`).

3. `lambda x, y: (x[0] + y[0], x[1] + y[1])`: This is the `mergeCombiners` function. It merges intermediate values for the same key that were built in different partitions. It takes two intermediate values, `x` and `y`, and adds their sums (`x[0] + y[0]`) and their counts (`x[1] + y[1]`).

Now, let's see how these functions are applied in the `combineByKey()` transformation:

1. When a key is encountered for the first time in a partition, `createCombiner` is called to create the initial intermediate value `(value, 1)`.

2. For subsequent values with the same key in that partition, `mergeValue` is called. It adds the new value to the existing sum and increments the count by 1.

3. When the same key appears in more than one partition, `mergeCombiners` combines the per-partition sums and counts into the final result for that key.

The resulting RDD `sumCountRDD` will contain key-value pairs, where the key is the unique author, and the value is a tuple representing the sum of the years and the count of books for that author.

By using `combineByKey()`, you can perform custom aggregations on each key in a distributed and parallel manner, making it a powerful tool for data aggregation in Spark.
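The three phases can be traced in plain Python. The sketch below is a hypothetical model (`model_combine_by_key` is not a Spark API) applied to the partition layout that `glom()` printed for `authorYearRDD`. In that layout every author's books sit in different partitions, so `mergeValue` is never called and `mergeCombiners` does all the merging; a layout with two records of the same author in one partition would exercise `mergeValue` instead.

```python
def model_combine_by_key(partitions, create_combiner, merge_value, merge_combiners):
    """Model of combineByKey: per-partition combining, then a cross-partition merge."""
    per_partition = []
    for part in partitions:
        combiners = {}
        for key, value in part:
            if key not in combiners:
                combiners[key] = create_combiner(value)  # first time key is seen in THIS partition
            else:
                combiners[key] = merge_value(combiners[key], value)
        per_partition.append(combiners)
    result = {}
    for combiners in per_partition:  # stands in for the shuffle + merge step
        for key, comb in combiners.items():
            result[key] = merge_combiners(result[key], comb) if key in result else comb
    return result

merge_value_calls = []
def merge_value(x, value):
    merge_value_calls.append(value)  # track how often mergeValue runs
    return (x[0] + value, x[1] + 1)

# Partition layout of authorYearRDD as printed by glom() above
parts = [[], [("Author 1", 2005)], [("Author 2", 2010)],
         [("Author 1", 2015)], [("Author 3", 2018)], [("Author 3", 2018)]]

sum_count = model_combine_by_key(
    parts,
    lambda value: (value, 1),
    merge_value,
    lambda x, y: (x[0] + y[0], x[1] + y[1]),
)
means = {author: s / c for author, (s, c) in sum_count.items()}  # like mapValues(...)
print(sum_count)               # {'Author 1': (4020, 2), 'Author 2': (2010, 1), 'Author 3': (4036, 2)}
print(means)                   # {'Author 1': 2010.0, 'Author 2': 2010.0, 'Author 3': 2018.0}
print(len(merge_value_calls))  # 0
```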