
# TDI 2025 Lab I: Big Data Processing

## Preliminaries (Imports, Starting Spark, & Defining MapReduce Framework)

Before getting started, please upload the files `shakespeare.txt` and
`social_network_edges.tsv` to the `content` directory in Colab.


In [None]:
from collections import defaultdict
import pyspark
from pyspark import SparkConf, SparkContext
import re
import random

In [None]:
# SparkConf configuration objects store the configuration needed to access the
# cluster
conf = SparkConf().setAppName("myapp").setMaster("local[*]")

# SparkContext objects are used to access the cluster
sc = SparkContext(conf=conf)

# This disables detailed log messages. Try also INFO (more detailed logging)
# or ERROR (less logging).
sc.setLogLevel("WARN")

### Implementation of MapReduce on top of Apache Spark

This implementation runs MapReduce-style programs on top of Apache Spark, so
installing Spark is sufficient and no separate (Hadoop) MapReduce installation
is needed. You can ignore it for now, but you may want to check it out later on.

In [None]:
class MapReduceProgram:
    def __init__(self):
        pass

    # -- User-defined methods (implemented in subclasses) -------------------------------------------------

    # User-defined Map function. Default outputs nothing.
    def map(self, key, value):
        pass

    # User-defined Combine function. Default passes every value through unchanged.
    def combine(self, key, values):
        for value in values:
            yield key, value

    # User-defined Reduce function. Default outputs nothing.
    def reduce(self, key, values):
        pass

    # -- Methods to run the MapReduce program (or parts of it) --------------------------------------------

    # Run this MapReduce program on the provided data.
    def run(self, data):
        return (
            self.run_map_combine(data).groupByKey().flatMap(lambda kv: self.reduce(*kv))
        )

    # Run only the Map part of this program.
    def run_map(self, data):
        return data.flatMap(lambda kv: self.map(*kv))

    # Run only the Map and Combine part of this program.
    def run_map_combine(self, data):
        return (
            self.run_map(data)
            .mapPartitions(self.combine_partitions)
            .flatMap(lambda kv: self.combine(*kv))
        )

    # Run only the Map and Reduce part of this program.
    def run_map_reduce(self, data):
        return self.run_map(data).groupByKey().flatMap(lambda kv: self.reduce(*kv))

    # -- internals ----------------------------------------------------------------------------------------

    def combine_partitions(self, iterator):
        # Group the values of this partition by key to build the combiner input.
        aggregator = defaultdict(list)
        for key, value in iterator:
            aggregator[key].append(value)
        return aggregator.items()
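
The dataflow that `run_map_reduce` sets up (map, group by key, reduce) can be mimicked without Spark. The following is a minimal sketch on an in-memory list, not the implementation used above: `simulate_map_reduce`, `emit_lengths`, and `sum_values` are hypothetical names introduced only for this illustration, and partitioning and the combiner are ignored.

```python
from collections import defaultdict


def simulate_map_reduce(records, map_fn, reduce_fn):
    """Plain-Python stand-in for the map -> group -> reduce dataflow (no Spark)."""
    # Map phase: every (key, value) record may emit any number of pairs.
    mapped = [pair for key, value in records for pair in map_fn(key, value)]

    # Shuffle phase: collect all values that share a key (similar to groupByKey).
    groups = defaultdict(list)
    for key, value in mapped:
        groups[key].append(value)

    # Reduce phase: every (key, values) group may again emit any number of pairs.
    return [pair for key, values in groups.items() for pair in reduce_fn(key, values)]


def emit_lengths(doc_id, text):
    # Emit (word length, 1) for every word of the text.
    for word in text.split(" "):
        yield len(word), 1


def sum_values(key, values):
    yield key, sum(values)


records = [(1, "to be or not to be"), (2, "that is the question")]
result = dict(simulate_map_reduce(records, emit_lengths, sum_values))
print(result)  # {2: 6, 3: 2, 4: 1, 8: 1}
```

As in the class above, the map and reduce functions are generators, so each call can output zero, one, or many key-value pairs.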

## Task 1: MapReduce

The goal of this task is to define several short MapReduce programs to become
familiar with MapReduce.

### Word Count Example

**Input:** A list of tuples `(int, str)` where the integer corresponds to a
document ID or a line number and the string to a piece of text.

**Output:** A list of tuples `(str, int)` where the string corresponds to a
word and the integer to the no. of occurrences of that word in all lines or
documents.

In [None]:
# Example program. We use the yield statement to output a key-value pair.
class WordCount(MapReduceProgram):
    def map(self, id, text):
        for word in text.split(" "):
            yield word, 1

    def reduce(self, word, values):
        yield word, sum(values)

#### Test run with example data

In [None]:
# Example data (of type RDD<Integer,String>). We first create an array and then
# copy it to the Spark cluster.
exampleData = sc.parallelize(
    [
        (1, "data science is all about data which means that data matters most"),
        (2, "data driven decisions require a lot of data to make sense of science data"),
        (3, "without data science is just a guess and more data makes data science better"),
        (4, "data pipelines process raw data into clean data that science can use to learn from data science"),
        (5, "while science fiction is called science it often contains little real science or data"),
    ]
)
exampleData.collect()

In [None]:
program = WordCount()
print("Map        : ", program.run_map(exampleData).collect(), end="\n\n")
print("Map+Reduce : ", program.run(exampleData).collect())

#### Test run with a Shakespeare text

In [None]:
# Load whole document into an RDD<Integer, String>.
shakespeare_with_index = (
    sc
    .textFile("shakespeare.txt")        # RDD<String>
    .zipWithIndex()                     # RDD<String, Int>
    .map(lambda t: tuple(reversed(t)))  # RDD<Int, String>
)

shakespeare_with_index.take(10)  # Note: Data is not cleaned.

In [None]:
program = WordCount()
print("Map        : ", program.run_map(shakespeare_with_index).take(10))
print("Map+Reduce : ", program.run(shakespeare_with_index).take(10))

# Top-10 words
print("Top-k words:")
top_words = (
    program
    .run(shakespeare_with_index)
    .sortBy(lambda t: t[1], ascending=False)  # Sort according to the tuple's 2nd element, i.e., the count.
)

top_words.take(10)

In [None]:
# Inspect the execution plan (the RDD lineage) of the program like so:
print(top_words.toDebugString().decode("utf-8"))

### Document Count Exercise

**Input:** A list of `(int, str)` tuples where the integer corresponds to a
document ID (or line number) and the string to the document (or line) content.

**Output:** A list of `(str, int)` tuples where the string corresponds to a
word and the integer to the no. of documents (or lines) that contain that
word.
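
The difference between a word count and a document count can be seen on a tiny example. The sketch below is plain Python on made-up documents, not a MapReduce solution: it assumes that a word counts at most once per document, which is what the expected outputs below suggest (for example `('of', 1)`). The variable names are chosen only for this illustration.

```python
docs = [
    (1, "data science is all about data"),
    (2, "a lot of data to make sense of science"),
    (3, "science fiction"),
]

word_counts = {}      # total occurrences across all documents
document_counts = {}  # number of documents that contain the word

for doc_id, text in docs:
    words = text.split(" ")
    for word in words:
        word_counts[word] = word_counts.get(word, 0) + 1
    for word in set(words):  # each distinct word counts once per document
        document_counts[word] = document_counts.get(word, 0) + 1

print(word_counts["data"], document_counts["data"])  # 3 2
print(word_counts["of"], document_counts["of"])      # 2 1
```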

In [None]:
class DocumentCount(MapReduceProgram):
    def map(self, key, value):
        # TODO: Your code here.
        ...

    def reduce(self, term, values):
        # TODO: Your code here.
        ...

#### Test run with example data

In [None]:
# run and output all terms
DocumentCount().run(exampleData).collect()

# Should give:
# [('most', 1),
#  ('require', 1),
#  ('of', 1),
#  ('decisions', 1),
#  ('more', 1),
#  ('and', 1),
#  ('pipelines', 1),
#  ('all', 1),
#  ('that', 2),
#  ...

#### Test run with a Shakespeare text (from above)

In [None]:
# run and output 10 terms
DocumentCount().run(shakespeare_with_index).take(10)

# Should give:
# [('KING', 1598),
#  ('', 49942),
#  ('\tDRAMATIS', 37),
#  ('HENRY\tthe', 4),
#  ('HENRY,', 14),
#  ('of', 15527),
#  ('Wales\t(PRINCE', 1),
#  ('\t\t|', 41),
#  ('JOHN', 102),
#  ('WALTER', 21)]

### N-Gram Count Exercise

An n-gram is a contiguous sequence of `n` words from a given sequence of text,
where `n > 0` is a parameter. Your implementation should meet the following
requirements:

- Before processing, remove all special characters at the beginning and end of
  a word. Special characters are all characters other than letters and digits.
  To do this, you can use the provided `trim()` function.
- Count all occurrences of n-grams of length `n` that occur at least `sigma`
  times, where `sigma > 0` is another parameter. Both `n` and `sigma` are
  specified via global variables.
- The output should be pairs of (n-gram, number of occurrences).

**Input:** A list of `(int, str)` tuples where the integer corresponds to a
line number (or document ID) and the string to a line of text or a document.

**Output:** A list of `(str, int)` tuples where the string corresponds to an
n-gram and the integer to the no. of occurrences of that n-gram. As described
above, the no. of occurrences should be at least `sigma`.
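
To make the definition concrete, here is a minimal plain-Python sketch of extracting n-grams with a sliding window and applying the `sigma` threshold. The helper `sliding_ngrams` is a hypothetical name used only here, the sample sentence is made up, and trimming of special characters is left out for brevity.

```python
from collections import Counter


def sliding_ngrams(words, n):
    """Return all contiguous sequences of n words, joined by a space."""
    # There are len(words) - n + 1 window positions (none if the text is too short).
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]


words = "to be or not to be".split(" ")
bigrams = sliding_ngrams(words, 2)
print(bigrams)  # ['to be', 'be or', 'or not', 'not to', 'to be']

# Keep only n-grams that occur at least sigma times.
sigma = 2
frequent = {gram: c for gram, c in Counter(bigrams).items() if c >= sigma}
print(frequent)  # {'to be': 2}
```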

In [None]:
# set parameters as global variables (use these in your solution)
n = 2  # length of n-grams
sigma = 2  # minimum frequency to output

In [None]:
# Trims a single word.
def trim(s):
    return re.search(r"^[^\w\d]*(.*?)[^\w\d]*$", s).group(1)


# example
trim("!test12,#")
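
A few more inputs show how the pattern behaves. The snippet repeats the `trim` definition so that it runs on its own, and the sample strings are made up. Note that Python's `\w` also matches the underscore, so underscores are not removed, and that characters inside a word (such as an apostrophe) are kept.

```python
import re


def trim(s):
    # Same pattern as above: strip non-word characters at both ends.
    return re.search(r"^[^\w\d]*(.*?)[^\w\d]*$", s).group(1)


for raw in ["!test12,#", "(PRINCE", "don't", "--", "_id_"]:
    print(repr(raw), "->", repr(trim(raw)))

# '!test12,#' -> 'test12'
# '(PRINCE' -> 'PRINCE'
# "don't" -> "don't"
# '--' -> ''
# '_id_' -> '_id_'
```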

In [None]:
# Define and implement class NGramCount.
class NGramCount(MapReduceProgram):
    def map(self, key, value):
        # TODO: Your code here.
        ...

    def reduce(self, ngram, values):
        # TODO: Your code here.
        ...

In [None]:
NGramCount().run(exampleData).take(10)

# Should give:
# [('data science', 4), ('science is', 2)]

In [None]:
# run and output 10 ngrams from the Shakespeare text used before.
NGramCount().run(shakespeare_with_index).take(10)

# Should give:
# [('1 KING', 48),
#  ('DRAMATIS PERSONAE', 37),
#  ('HENRY\tthe Fourth', 2),
#  ('Fourth KING', 2),
#  ('Prince of', 29),
#  ('SIR WALTER', 19),
#  ('EDMUND MORTIMER\tEarl', 2),
#  ('MORTIMER\tEarl of', 2),
#  ('March MORTIMER', 2),
#  ('of York', 111)]

In [None]:
# Result can be stored on disk using `saveAsTextFile`.
# ngram_result.saveAsTextFile(f"shakespeare-{n}grams-{sigma}")


## Task 2: Apache Spark instead of MapReduce

In this task, implement the previous MapReduce programs using Apache Spark.

For each task, we will provide a list of transformations and actions that
you can use to solve the given problem. Of course, feel free to use your
own approach.

### Word Count (Spark)

**Input:** (see above)

**Output:** (see above)

**Possible Transformations & Actions:**

- `flatMap`: transform each line into a list of words (split at spaces between words), then flatten
- `map`: trim each word using the provided `trim` function
- `map`: transform each word into a tuple `(word, 1)`
- `reduceByKey`: sum all ones together that belong to a single word
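
Unlike `groupByKey`, `reduceByKey` never materializes the full list of values per key. It merges values pairwise with a function that should be associative and commutative. The following plain-Python sketch only illustrates this idea on a made-up line of text: `reduce_by_key` is a hypothetical helper, not a Spark API, and it does not model partitions.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Plain-Python analogue of reduceByKey (illustration only, no Spark)."""
    merged = {}
    for key, value in pairs:
        # Merge the new value into the running result for this key.
        merged[key] = func(merged[key], value) if key in merged else value
    return merged


line = "the cat and the dog and the bird"
pairs = [(word, 1) for word in line.split(" ")]
counts = reduce_by_key(pairs, add)
print(counts)  # {'the': 3, 'cat': 1, 'and': 2, 'dog': 1, 'bird': 1}
```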

In [None]:
word_count_result = (
    sc
    .textFile("shakespeare.txt")
    # TODO: Your code here.
)

word_count_result.take(10)

# Should give:

# [('KING', 1774),
#  ('', 64849),
#  ('DRAMATIS', 37),
#  ('HENRY\tthe', 4),
#  ('Fourth', 45),
#  ('of', 16423),
#  ('Wales\t(PRINCE', 1),
#  ('JOHN', 121),
#  ('WALTER', 21),
#  ('BLUNT', 12)]

In [None]:
# Sorting is really easy in Spark (but would require another program in MapReduce).
word_count_result_sorted = (
    word_count_result
    .sortBy(lambda t: t[1], ascending=False)
)

word_count_result_sorted.take(10)

# Should give:
# [('', 64849),
#  ('the', 25542),
#  ('and', 19603),
#  ('I', 18419),
#  ('to', 16850),
#  ('of', 16423),
#  ('a', 13254),
#  ('you', 12507),
#  ('my', 11285),
#  ('in', 10545)]

### Document Count (Spark)

**Input:** (see above)

**Output:** (see above)

**Possible Transformations & Actions:**

- `flatMap`: Split a document (or line) into words and trim each word. Keep
  the index in the resulting tuples (in the second position).
- `groupByKey`: Group all tuples by their keys so that the values
  consist of the indices.
- `mapValues`: Convert the values into a set (using the `set` constructor).
- `mapValues`: Compute the no. of items per set.


In [None]:
document_count_result = (
    sc
    .textFile("shakespeare.txt")
    .zipWithIndex()
    # TODO: Your code here.
)

document_count_result.take(10)

# Should give:

# [('KING', 1766),
#  ('', 50119),
#  ('DRAMATIS', 37),
#  ('HENRY\tthe', 4),
#  ('Fourth', 45),
#  ('of', 15842),
#  ('Wales\t(PRINCE', 1),
#  ('JOHN', 121),
#  ('WALTER', 21),
#  ('BLUNT', 12)]

### N-Gram Count (Spark)

**Input:** (see above)

**Output:** (see above)

**Possible Transformations & Actions:**

- `flatMap`: Create n-grams using the `create_ngrams` helper function.
- `map`: Convert the n-grams into `(ngram, 1)` tuples.
- `reduceByKey`: Add the values per key together to produce the n-gram counts.
- `filter`: Keep only n-grams with at least `sigma` occurrences.

In [None]:
n = 3
sigma = 20


# Define helper function.
def create_ngrams(line):
    """Given a line of text, produce ngrams of length n."""
    # TODO: Your code here.

In [None]:
ngram_result = (
    sc
    .textFile("shakespeare.txt")
    .flatMap(create_ngrams)
    # TODO: Your code here.
)

ngram_result.take(10)

# Should give:

# [('1 KING HENRY', 48),
#  ('KING HENRY IV', 52),
#  ('to tell you', 21),
#  ('It is a', 28),
#  ('is to be', 34),
#  ('so much as', 24),
#  ('by and by', 41),
#  ('that thou art', 24),
#  ('as it is', 30),
#  ('I can tell', 38)]

## Task 3: A More Complex Spark Task

This task is a bit more complicated than the previous ones.

**Input:** A social network graph, given as a list of edges. Each edge
consists of two tab-separated names: `str \t str`. Each relationship occurs
only once.

**Output:** A list of mutual friends of pairs of network members, i.e., a list
of tuples `((str, str), set[str])`.

### Example

**Input:** 
```
Max       Stefanie
Max       Carla
Stefanie  Carla
Max       Bernhard
Stefanie  Bernhard
```
**Output:**
```
[
 (('Bernhard', 'Carla'), {'Max', 'Stefanie'}),
 (('Max', 'Stefanie'), {'Bernhard', 'Carla'}),
 (('Bernhard', 'Max'), {'Stefanie'}),
 (('Carla', 'Max'), {'Stefanie'}),
 (('Carla', 'Stefanie'), {'Max'}),
 (('Bernhard', 'Stefanie'), {'Max'})
]
```

**Possible Transformations & Actions:**

It may help to approach this problem in several steps.

- **Step 1:** Compute adjacency sets per person where each set contains all
  friends of that person.
  - `map`: Split each input line at the tab character `\t`.
  - `flatMap`: For each friends tuple, create a reversed tuple.
  - `groupByKey`: Group all friends per user.
  - `mapValues`: Apply the `set` constructor to obtain a set of friends per user.
- **Step 2:** Create tuples which contain a single mutual friend per pair of users.
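  - `flatMap`: For each pair of friends `(friendA, friendB)` of a user `user`,
    create a tuple `((friendA, friendB), user)`. Put each pair into a fixed
    order (e.g., sorted alphabetically) so that it always yields the same key.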
- **Step 3:** Group preceding results together to solve the problem.
  - `groupByKey`: Group pairs of users together so that the values consist of
    their mutual friends.
  - `mapValues`: Convert the values into a set.
  - (optional) `sortBy`: Sort the results based on the no. of mutual friends.
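
The three steps can be traced in plain Python on the example edges. This is only a sketch of the logic under the assumption that each pair is kept in alphabetical order, not the Spark solution asked for below. The variable names are chosen for this illustration.

```python
from collections import defaultdict
from itertools import combinations

edges = [
    ("Max", "Stefanie"),
    ("Max", "Carla"),
    ("Stefanie", "Carla"),
    ("Max", "Bernhard"),
    ("Stefanie", "Bernhard"),
]

# Step 1: adjacency sets. Each edge is stored in both directions.
adjacency = defaultdict(set)
for a, b in edges:
    adjacency[a].add(b)
    adjacency[b].add(a)

# Step 2: every pair of a user's friends has that user as a mutual friend.
# Sorting the friends first gives each pair one canonical order.
single_mutual = [
    (pair, user)
    for user, friends in adjacency.items()
    for pair in combinations(sorted(friends), 2)
]

# Step 3: group by pair to collect all mutual friends of that pair.
mutual = defaultdict(set)
for pair, user in single_mutual:
    mutual[pair].add(user)

for pair, friends in sorted(mutual.items()):
    print(pair, sorted(friends))
```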

In [None]:
example_friends = sc.parallelize([
    "Max\tStefanie",
    "Max\tCarla",
    "Stefanie\tCarla",
    "Max\tBernhard",
    "Stefanie\tBernhard",
])

adj_sets = (
    # sc.textFile("social_network_edges.tsv")  # actual data (see file)
    example_friends  # some example data (see above)
    # TODO: Your code here.
)

adj_sets.take(5)

# Should give:
# [('Stefanie', {'Bernhard', 'Carla', 'Max'}),
#  ('Bernhard', {'Max', 'Stefanie'}),
#  ('Max', {'Bernhard', 'Carla', 'Stefanie'}),
#  ('Carla', {'Max', 'Stefanie'})]

In [None]:
single_mutual_friend = (
    adj_sets
    # TODO: Your code here.
)

single_mutual_friend.take(5)

# Should give:
# [(('Carla', 'Max'), 'Stefanie'),
#  (('Bernhard', 'Carla'), 'Stefanie'),
#  (('Bernhard', 'Max'), 'Stefanie'),
#  (('Max', 'Stefanie'), 'Bernhard'),
#  (('Bernhard', 'Stefanie'), 'Max')]

In [None]:
mutual_friends = (
    single_mutual_friend
    # TODO: Your code here.
)

mutual_friends.take(10)

# Should give:

# [(('Bernhard', 'Carla'), {'Max', 'Stefanie'}),
#  (('Max', 'Stefanie'), {'Bernhard', 'Carla'}),
#  (('Bernhard', 'Max'), {'Stefanie'}),
#  (('Carla', 'Max'), {'Stefanie'}),
#  (('Carla', 'Stefanie'), {'Max'}),
#  (('Bernhard', 'Stefanie'), {'Max'})]



### Bonus: Mutual friends of friends

Filter the list of mutual friends so that it only contains pairs of users who are friends with each other.

**Example (continue from above):**
```
[
 (('Max', 'Stefanie'), {'Bernhard', 'Carla'}),
 (('Bernhard', 'Max'), {'Stefanie'}),
 (('Carla', 'Max'), {'Stefanie'}),
 (('Carla', 'Stefanie'), {'Max'}),
 (('Bernhard', 'Stefanie'), {'Max'})
]
```
This means that the tuple `(('Bernhard', 'Carla'), {'Max', 'Stefanie'})` is removed
because Bernhard and Carla are not friends.

**Possible Transformations & Actions:**

- **Step 1:** Compute a list of all friend pairs.
  - `map`: Split input at tab characters.
  - `flatMap`: Per tuple, additionally create the reversed tuple.
  - `collect`: Collect data from the Spark cluster to a local Python list.
- **Step 2:** Use the list of friend pairs to filter the previous result.
  - `filter`: Keep a result only if its pair of users is contained in `friend_pairs`.

**Note:** In practice, the friend pairs from Step 1 would be shared with the workers using a [Broadcast variable](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.Broadcast.html).
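
The filtering logic itself is a set lookup. Below is a minimal plain-Python sketch on a hand-written excerpt of the example results. It assumes that both directions of every edge are stored, so the lookup does not depend on the order within a pair. The names `mutual_excerpt` and `kept` are hypothetical.

```python
edges = [
    ("Max", "Stefanie"),
    ("Max", "Carla"),
    ("Stefanie", "Carla"),
    ("Max", "Bernhard"),
    ("Stefanie", "Bernhard"),
]

# Excerpt of the mutual-friends result from the example above.
mutual_excerpt = {
    ("Bernhard", "Carla"): {"Max", "Stefanie"},
    ("Max", "Stefanie"): {"Bernhard", "Carla"},
    ("Carla", "Max"): {"Stefanie"},
}

# Store every edge in both directions.
friend_pairs = {(a, b) for a, b in edges} | {(b, a) for a, b in edges}

# Keep only pairs of users who are friends with each other.
kept = {pair: friends for pair, friends in mutual_excerpt.items() if pair in friend_pairs}
print(sorted(kept))  # [('Carla', 'Max'), ('Max', 'Stefanie')]
```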

In [None]:
friend_pairs = set(
    # sc.textFile("social_network_edges.tsv")
    example_friends
    # TODO: Your code here.
)

friend_pairs

# Should give:
# {('Bernhard', 'Max'),
#  ('Bernhard', 'Stefanie'),
#  ('Carla', 'Max'),
#  ('Carla', 'Stefanie'),
#  ('Max', 'Bernhard'),
#  ('Max', 'Carla'),
#  ('Max', 'Stefanie'),
#  ('Stefanie', 'Bernhard'),
#  ('Stefanie', 'Carla'),
#  ('Stefanie', 'Max')}

In [None]:
mutual_friends_of_friends = (
    mutual_friends
    # TODO: Your code here.
)

mutual_friends_of_friends.take(10)

# Should give: 

# [(('Max', 'Stefanie'), {'Bernhard', 'Carla'}),
#  (('Bernhard', 'Max'), {'Stefanie'}),
#  (('Carla', 'Max'), {'Stefanie'}),
#  (('Carla', 'Stefanie'), {'Max'}),
#  (('Bernhard', 'Stefanie'), {'Max'})]

## Understanding RDDs

To use RDDs effectively, it is important to understand the underlying dataflow
concept so that (1) your Spark program outputs what it should and (2) your
Spark program does not compute more than it should (and than you may expect).

The program below will showcase these points. It has a number of flaws, which
we first identify and then fix.
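
As background, the following toy model sketches the dataflow idea in plain Python: transformations only record what should be done, and every action replays the recorded steps from the source data. `LazyDataset` and `make_counting_predicate` are hypothetical names invented for this illustration. Real RDDs are partitioned and distributed, which the sketch ignores.

```python
class LazyDataset:
    """Toy stand-in for an RDD (not part of Spark)."""

    def __init__(self, source, predicates=()):
        self.source = source                  # the original in-memory data
        self.predicates = tuple(predicates)   # recorded, not yet applied, filters

    def filter(self, predicate):
        # A transformation only extends the recipe; no data is touched.
        return LazyDataset(self.source, self.predicates + (predicate,))

    def count(self):
        # An action replays the whole recipe from the source data.
        return sum(1 for x in self.source if all(p(x) for p in self.predicates))


def make_counting_predicate():
    calls = []  # records every invocation of the predicate

    def is_even(x):
        calls.append(x)
        return x % 2 == 0

    return is_even, calls


is_even, calls = make_counting_predicate()
evens = LazyDataset(range(10)).filter(is_even)
calls_after_filter = len(calls)
first_count = evens.count()
second_count = evens.count()
calls_after_counts = len(calls)
print(calls_after_filter, first_count, second_count, calls_after_counts)  # 0 5 5 20
```

In this model, the predicate is not called when `filter` is applied, and it is called again for every element each time `count` runs.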


In [None]:
a = list(range(0, 128000))
sample = sc.parallelize(a)
while sample.count() > 1000:
    sample = sample.filter(lambda x: random.random() > 0.5)

### a)
Suppose we run the program five times (but do not execute it yet). Which
outputs do you expect from the `print(sample.count())` statement in the cell
under b), which forms line 5 of the program? How often do you think the random
number generator is called on average during the second iteration of the while
loop?


### b)
Execute the program five times in Spark. Which outputs do you get? Were your
expectations met?



In [None]:
print(sample.count())

### c)

The program can output a sample size larger than 1000. Why?


### d)

Assume that there are no failures in the cluster while running your program.
Suggest how to fix the above program using the cache. (The goal is to get an
RDD representing a sample of size at most 1000.)


In [None]:
# TODO: Your code here.

### e)

In your fix, how much data will be cached by Spark? (If in answering this
question you spot a problem in your fix, fix it.)


In [None]:
# TODO: Your code here.

### f)

Assume that there can be node failures. Is your fix still correct? If yes,
why? If not, why not? Modify your fix so that it works in case of up to one
node failure.

Hint: have a look at the [Storage Level
documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html).

In [None]:
# TODO: Your code here.