# CMU auto-graded notebook

Before you turn these assignments in, make sure everything runs as expected. First, **restart the kernel** (in the menubar, select Kernel$\rightarrow$Restart) and then **run all cells** (in the menubar, select Cell$\rightarrow$Run All).

Make sure you fill in any place that says `YOUR CODE HERE`, `<FILL IN>`, or "YOUR ANSWER HERE."

# CMU Machine Learning with Large Datasets
## Homework 1 - Coding 3: Naive Bayes with Spark

In [0]:
# Who did you collaborate with on this assignment?
# if no one, collaborators should contain an empty string,
# else list your collaborators below

collaborators = [""]
# YOUR CODE HERE
# raise NotImplementedError()

In [0]:
try:
    collaborators
except:
    raise AssertionError("you did not list your collaborators, if any")

In [0]:
# # YOU CAN MOST LIKELY IGNORE THIS CELL. This is only of use for running this notebook locally.
#
# # THIS CELL DOES NOT NEED TO BE RUN ON DATABRICKS.
# # Note that Databricks already creates a SparkContext for you, so this cell can be skipped.
#
import pyspark
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder \
    .appName("hw") \
    .config("spark.ui.showConsoleProgress", "False") \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("OFF")
sqlContext = SQLContext(sc)

print("spark context started")

spark context started




In [0]:
import re
from pyspark import SparkFiles
from collections import Counter
import numpy as np

## 1. Environment Setup

### (1a) Pick your data sample
**IMPORTANT NOTE**: All local test cases are written for the `tiny` data sample. If you select any other data sample, the local assertions **will not pass**.

In [0]:
data_sample = 'tiny'  # 'tiny', 'smaller', 'small', 'medium', 'large', 'full'

def get_urls(data_sample):
    if data_sample == 'tiny':
        return ["6e7jq8igkgcsjlxjs16r9q1s5cn6rhbo",
                "rys1y8u5q9xvsnehiblx39xxd9peldbf"]

    elif data_sample == 'smaller':
        return ["tzneue2l2trpxa7eba30fc8gqy851wjw",
                "yc9hq3qg12c456t1b9utpjkhy1mg8wy5"]

    elif data_sample == 'small':
        return ["axx40czn6t9yx4m09y5el9xe5ke3tc4m",
                "wwopvxqhhkqpa020rwarpofr5a1gs3xg"]

    elif data_sample == 'medium':
        return ["5n8z4a9zgx3c0mfeb2si03gh30nliflj",
                "qvklaetsx8jczya65h3kp6975q5a6s7y"]

    elif data_sample == 'large':
        return ["2j5jx2u6tmd0nvkpw9r6d73b1u0k0vhm",
                "ekue74d3ek402liye9apuwv3rymy68dm"]

    elif data_sample == 'full':
        return ["y3oxd7k0qybfrbqxozz1fbmdeguyfetf",
                "ejaylm39bf20arnudovzc6u268qqzw53"]

    else:
        raise ValueError(f"Unknown data sample: {data_sample}")

all_labels = ['Agent', 'other', 'Event', 'ChemicalSubstance', 'Location', 'TimePeriod',
              'Device', 'Person','Place', 'Work', 'SportsSeason', 'Organisation',
              'Species', 'Activity', 'MeanOfTransportation', 'CelestialBody', 'Biomolecule']

urls = get_urls(data_sample)

### (1b) Parsing the raw data

In this part, you'll load the training dataset and preprocess it into a format suitable for training a Naive Bayes classifier. Each line contains a document ID, its labels, and its content, separated by tabs. Use the `parse_line` function to convert each line into a `(doc_id, labels, tokens)` tuple, where `labels` is a list of labels and `tokens` is a list of words from the document. The document ID is then dropped for training, which leaves `(labels, tokens)` pairs.

Your task is to integrate `parse_line` into the provided Spark code to prepare the data for model training.
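As a quick illustration, the snippet below shows what `parse_line` is expected to produce for one tab-separated line. The sample line is made up and is not taken from the dataset. The helpers `sketch_tokenize` and `sketch_parse_line` are hypothetical standalone restatements of `tokenizeDoc` and `parse_line`, so the snippet runs without Spark.

```python
import re

def sketch_tokenize(curr_doc):
    # Same rule as tokenizeDoc: every run of word characters becomes a token
    return re.findall(r'\w+', curr_doc)

def sketch_parse_line(line):
    # Expected layout: <doc_id>\t<comma-separated labels>\t<document content>
    doc_id, label_str, doc_words = line.split('\t')[:3]
    return (doc_id, label_str.split(','), sketch_tokenize(doc_words))

# Hypothetical input line, not taken from the real dataset
sample_line = "Some_Doc\tPerson,other\tthe quick brown fox"
parsed = sketch_parse_line(sample_line)
print(parsed)  # ('Some_Doc', ['Person', 'other'], ['the', 'quick', 'brown', 'fox'])

# For training, the document ID is dropped, as train_rdd.map(lambda x: x[1:]) does
labels, tokens = parsed[1:]
```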

In [0]:
def tokenizeDoc(curr_doc: str):
    """
    Tokenizes the given document string into a list of words.

    Args:
        curr_doc (str): The input document as a string.

    Returns:
        list: A list of words (tokens) extracted from the document.
    """
    return re.findall(r'\w+', curr_doc)

# TODO: Implement this function
def parse_line(line: str):
    """
    Parses a single line of the dataset into labels and tokens.

    The input line is expected to be tab-separated, with the format:
    <doc_id>\t<labels>\t<document_content>

    Args:
        line (str): A single line from the dataset.

    Returns:
        tuple: A tuple containing:
            - doc_id (str)        : The document ID
            - labels (list of str): List of labels for the document.
            - tokens (list of str): List of word tokens in the document.
    """
    # Hint 1: Use line.split('\t') to separate the document ID, labels, and content.
    parts = line.split('\t')
    doc_id = parts[0]
    # Hint 2: Use labels.split(",") to turn the label string into a list of labels.
    labels = parts[1].split(',')
    doc_words = parts[2]

    # Hint 3: Use tokenizeDoc(doc_words) to tokenize the document content.
    tokens = tokenizeDoc(doc_words)

    return (doc_id, labels, tokens)

# Load the training data
train_url = "https://cmu.box.com/shared/static/" + urls[1]
train_filename = urls[1]
sc.addFile(train_url)
train_rdd = sc.textFile("file://" + SparkFiles.get(train_filename))

# Process each line of the dataset using the `parse_line` function
train_rdd = train_rdd.map(parse_line)

# Remove the document ID for training
train_rdd = train_rdd.map(lambda x: x[1:])

In [0]:
# Check if you correctly implemented the data parsing
# Note: This is meant to be tested on the 'tiny' data sample
first_sample = train_rdd.take(1)[0]
assert(first_sample[0][0] == 'Person')
assert(first_sample[1][:5] == ['abd', 'allah', 'ibn', 'amr', 'ibn'])
assert(len(first_sample[1]) == 169)

## 2. Training the Naive Bayes Classifier

### (2a) Compute vocabulary size

In this step, you will compute the vocabulary size $|V|$, which is the number of unique tokens across the entire training set. The Naive Bayes model needs it for its conditional probabilities: with Laplace smoothing, $|V|$ appears in the denominator of every $P(w \mid y)$.

As a reminder, the data in `train_rdd` has the following schema for each line:
- Each line is a tuple containing:
    - `labels`: A list of class labels (e.g., `['label1', 'label2']`)
    - `tokens`: A list of words (e.g., `['word1', 'word2', 'word3']`)

To compute the vocabulary size, you need to flatten the tokens from all documents into a single RDD, then count the distinct tokens using `distinct()` and `count()`.
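The same flatten-then-deduplicate logic can be sketched in plain Python on a couple of made-up `(labels, tokens)` records. The list comprehension plays the role of `flatMap(lambda x: x[1])`, and `len(set(...))` plays the role of `distinct().count()`. The toy data and variable names are illustrative only.

```python
# Toy (labels, tokens) records standing in for train_rdd (made-up data)
train_records = [
    (['Person'], ['the', 'poet', 'was', 'born', 'in', 'the', 'city']),
    (['Place', 'other'], ['the', 'city', 'is', 'in', 'a', 'district']),
]

# Like flatMap(lambda x: x[1]): one flat list of tokens across all documents
all_tokens = [token for labels, tokens in train_records for token in tokens]

# Like distinct().count(): the number of unique tokens
toy_vocabulary_size = len(set(all_tokens))
print(toy_vocabulary_size)  # 9
```

Repeated tokens ("the", "in", "city") are counted once, which is why the result is 9 rather than the 13 raw tokens.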

In [0]:
# Flatten the tokens from all documents into a single RDD, this can be done in a single line
vocabulary = train_rdd.flatMap(lambda x: x[1])

# Compute the distinct token count (vocabulary size), can also be done in a single line
vocabulary_size = vocabulary.distinct().count()

In [0]:
assert(vocabulary_size == 28762)

### (2b) Compute the remainder of your model

Now that you have computed the vocabulary size, you will re-implement the remainder of your Naive Bayes model in a scalable way using Spark.
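For reference, the counts stored in the model feed the following estimates. They are written to match `calculate_label_log_probabilities` in Section 3, where $\alpha$ is the Laplace smoothing parameter and $|V|$ is the vocabulary size:

$$\hat{P}(y) = \frac{c(y)}{\sum_{y'} c(y')}, \qquad \hat{P}(w \mid y) = \frac{c(y, w) + \alpha}{\sum_{w'} c(y, w') + \alpha\,|V|}$$

$$\hat{y}(d) = \arg\max_{y} \left[ \log \hat{P}(y) + \sum_{i=1}^{n} \log \hat{P}(w_i \mid y) \right], \qquad d = (w_1, \dots, w_n)$$

In these formulas:

- $c(y)$ corresponds to `y[label]`.
- $\sum_{y'} c(y')$ corresponds to `ys`.
- $c(y, w)$ corresponds to `y_w[label][token]`.
- $\sum_{w'} c(y, w')$ corresponds to `y_ws[label]`.

Because `ys` counts label occurrences, a document with several labels contributes once to each of them.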

In [0]:
# TODO: Implement Naive Bayes using Spark
def train_naive_bayes_spark(train_rdd, vocabulary_size, alpha=1):
    """
    Train a Naive Bayes model in a scalable way using Spark.

    Args:
        train_rdd: RDD containing training data in the format (labels, tokens).
        vocabulary_size: Total number of unique tokens (vocabulary size).
        alpha: Laplace smoothing parameter (default is 1). Not used during training; smoothing is applied when scoring.

    Returns:
        model: A dictionary representing the trained Naive Bayes model.
    """
    # Step 1: Count label occurrences; P(y) = label_counts[y] / total_labels
    label_counts_rdd = train_rdd.flatMap(lambda x: [(label, 1) for label in x[0]])
    label_counts = label_counts_rdd.reduceByKey(lambda x, y: x + y).collectAsMap()

    total_labels = sum(label_counts.values())  # Total number of label occurrences

    # Step 2: Count (label, token) pairs; smoothed P(w|y) is computed from these at prediction time
    token_counts_rdd = train_rdd.flatMap(
        lambda x: [((label, token), 1) for label in x[0] for token in x[1]]
    )
    token_counts = token_counts_rdd.reduceByKey(lambda x, y: x + y)  # Count of each (label, token)
    token_counts.cache()  # Reused for the per-label totals and the final collect

    # Total token counts per label, summed from the already-reduced pair counts
    total_tokens_rdd = token_counts.map(lambda x: (x[0][0], x[1]))  # (label, count)
    total_tokens_per_label = total_tokens_rdd.reduceByKey(lambda x, y: x + y).collectAsMap()

    # Step 3: Build the model dictionary
    model = {
        'y': label_counts,  # Counts of labels
        'ys': total_labels,  # Total count of all labels
        'y_w': {},  # Token counts per label
        'y_ws': total_tokens_per_label,  # Total token counts per label
        'vocabulary': vocabulary_size  # Vocabulary size
    }

    # Populate token counts into the model
    for ((label, token), count) in token_counts.collect():
        if label not in model['y_w']:
            model['y_w'][label] = {}
        model['y_w'][label][token] = count

    return model


# Train the Naive Bayes model using the training data
sunny_model = train_naive_bayes_spark(train_rdd, vocabulary_size)

# Check the model structure
print("Trained model:")
print(f"P(y): {sunny_model['y']}")
print(f"Total labels (ys): {sunny_model['ys']}")
print(f"Total tokens per label (y_ws): {sunny_model['y_ws']}")
print(f"Vocabulary size: {sunny_model['vocabulary']}")

Trained model:
P(y): {'Event': 34, 'other': 289, 'Agent': 135, 'ChemicalSubstance': 6, 'Location': 11, 'Device': 1, 'Person': 849, 'SportsSeason': 16, 'Organisation': 138, 'Place': 540, 'Work': 274, 'Species': 226, 'MeanOfTransportation': 30, 'CelestialBody': 23, 'Biomolecule': 17}
Total labels (ys): 2589
Total tokens per label (y_ws): {'Event': 3083, 'other': 34615, 'Agent': 16609, 'ChemicalSubstance': 453, 'Location': 824, 'Device': 262, 'Person': 77993, 'SportsSeason': 1090, 'Organisation': 12478, 'Place': 35742, 'Work': 24024, 'Species': 11943, 'MeanOfTransportation': 3142, 'CelestialBody': 1656, 'Biomolecule': 1733}
Vocabulary size: 28762
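When debugging, it can help to cross-check the Spark model against a single-machine reference. The reference below builds the same dictionary layout (`y`, `ys`, `y_w`, `y_ws`, `vocabulary`) with `collections.Counter`. `train_naive_bayes_local` is a hypothetical helper, not part of the assignment, and the toy records are made up.

```python
from collections import Counter

def train_naive_bayes_local(records):
    """Single-machine reference that mirrors the counts of train_naive_bayes_spark."""
    y, y_ws = Counter(), Counter()
    y_w = {}
    vocab = set()
    for labels, tokens in records:
        vocab.update(tokens)
        token_counts = Counter(tokens)
        for label in labels:
            y[label] += 1                                   # one occurrence per label
            y_w.setdefault(label, Counter()).update(token_counts)  # adds counts
            y_ws[label] += len(tokens)                      # all tokens, repeats included
    return {
        'y': dict(y),
        'ys': sum(y.values()),
        'y_w': {label: dict(counts) for label, counts in y_w.items()},
        'y_ws': dict(y_ws),
        'vocabulary': len(vocab),
    }

toy_records = [
    (['Person'], ['the', 'poet', 'was', 'born', 'in', 'the', 'city']),
    (['Place', 'other'], ['the', 'city', 'is', 'in', 'a', 'district']),
]
toy_model = train_naive_bayes_local(toy_records)
print(toy_model['y'])     # {'Person': 1, 'Place': 1, 'other': 1}
print(toy_model['ys'])    # 3
print(toy_model['y_ws'])  # {'Person': 7, 'Place': 6, 'other': 6}
```

On the `tiny` sample, `train_naive_bayes_local(train_rdd.collect())` should reproduce the counts in `sunny_model` if both implementations are correct. Collecting the larger samples to the driver this way is not advisable.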


To test your model, please extract the following variables from your model and use the cell below to ensure that your model is correct.

In [0]:
# Count of the label "Event" in the tiny dataset
freq_event = sunny_model['y']['Event']
# Count of the label "Work" in the tiny dataset
freq_work = sunny_model['y']['Work']
# Count of the label "Person" in the tiny dataset
freq_person = sunny_model['y']['Person']
# Total number of label occurrences in the dataset (a document can carry several labels)
labels = sunny_model['ys']
# Count of the token "amr" in documents labeled "Person"
freq_amr_given_person = sunny_model['y_w']['Person'].get('amr', 0)
# Count of the token "the" in documents labeled "Person"
freq_the_given_person = sunny_model['y_w']['Person'].get('the', 0)
# Count of the token "british" in documents labeled "Organisation"
freq_british_given_org = sunny_model['y_w']['Organisation'].get('british', 0)
# Count of the token "district" in documents labeled "Place"
freq_district_given_place = sunny_model['y_w']['Place'].get('district', 0)
# Total number of tokens in documents labeled "Event"
event_tokens = sunny_model['y_ws']['Event']
# Total number of tokens in documents labeled "Person"
person_tokens = sunny_model['y_ws']['Person']
# Total number of tokens in documents labeled "Work"
work_tokens = sunny_model['y_ws']['Work']

In [0]:
## Test the count of each label in the dataset
assert(freq_event == 34)
assert(freq_work == 274)
assert(freq_person == 849)
## Test the total number of labels in the dataset
assert(labels == 2589)
## Test the count of each token given a label
assert(freq_amr_given_person == 7)
assert(freq_the_given_person == 4544)
assert(freq_british_given_org == 11)
assert(freq_district_given_place == 260)
## Test the count of tokens for each label
assert(event_tokens == 3083)
assert(person_tokens == 77993)
assert(work_tokens == 24024)

## 3. Testing the model
Run the following cell to load the test data. If you correctly implemented the `parse_line` function, this should work just fine.

In [0]:
test_url = "https://cmu.box.com/shared/static/" + urls[0]
test_filename = urls[0]
sc.addFile(test_url)
test_rdd = sc.textFile("file://" + SparkFiles.get(test_filename))
test_rdd = test_rdd.map(parse_line)

In [0]:
first_sample = test_rdd.filter(lambda x: x[0] == "2005_UEFA_Women's_Cup_Final").collect()[0][1:]
assert(first_sample[0][0] == 'Event')
assert(first_sample[1][:5] == ['the','2005','uefa','womens','cup',])
assert(len(first_sample[1]) == 36)

### (3a) Generating predictions

In this step, you will generate predictions on the test dataset.

Make sure you:
- Implement the loop that iterates over all labels and tokens.
- Use Laplace smoothing with $\alpha = 1$ when calculating `P(w|y)`, as you did in the previous notebook.

You should output an RDD `output` where each element looks like `[doc_ID, true_labels, predicted_label, max_log_prob]`.
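The per-document scoring can be sketched in plain Python. Working in log space matters here: a product of hundreds of small probabilities underflows to `0.0` in double precision, while the sum of their logs stays finite. `predict_label` is a hypothetical helper that uses the same model keys as the notebook (`y`, `ys`, `y_w`, `y_ws`, `vocabulary`). The toy model is made up.

```python
import math

def predict_label(tokens, model, alpha=1):
    """Return (best_label, best_log_prob) for one tokenized document."""
    best_label, best_log_prob = None, float('-inf')
    for label, label_count in model['y'].items():
        log_prob = math.log(label_count / model['ys'])            # log P(y)
        # Denominator of smoothed P(w|y) depends only on the label, so compute it once
        denom = model['y_ws'][label] + alpha * model['vocabulary']
        word_counts = model['y_w'].get(label, {})
        for token in tokens:
            # Unseen tokens get alpha / denom instead of zero probability
            log_prob += math.log((word_counts.get(token, 0) + alpha) / denom)
        if log_prob > best_log_prob:
            best_label, best_log_prob = label, log_prob
    return best_label, best_log_prob

# Made-up model with the same layout as sunny_model
toy_model = {
    'y': {'Person': 2, 'Place': 1},
    'ys': 3,
    'y_w': {'Person': {'born': 2, 'poet': 1, 'the': 1},
            'Place': {'city': 1, 'the': 1, 'district': 1}},
    'y_ws': {'Person': 4, 'Place': 3},
    'vocabulary': 5,
}
best_label, best_log_prob = predict_label(['the', 'district'], toy_model)
print(best_label)                # Place
print(0.01 ** 400)               # 0.0, the raw product underflows
print(400 * math.log(0.01))      # about -1842.07, still representable
```

Here "Place" wins with $\log(1/3 \cdot 2/8 \cdot 2/8) = \log(1/48)$, against $\log(2/3 \cdot 2/9 \cdot 1/9)$ for "Person".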

In [0]:
import math
def calculate_label_log_probabilities(tokens, label, model, vocabulary_size, alpha=1):
    """
    Calculate the log probability for a given label using the tokens in the document.
    Args:
        tokens: List of tokens from the document.
        label: The label for which to calculate the log probability.
        model: The Naive Bayes model.
        vocabulary_size: Size of the vocabulary.
        alpha: Laplace smoothing parameter.
    Returns:
        Log probability of the label for the given document.
    """
    # Prior probability: log(P(y))
    label_count = model['y'].get(label, 0)
    total_labels = model['ys']
    log_prob_y = math.log(label_count / total_labels)

    # Conditional probabilities: log(P(w|y))
    total_tokens_label = model['y_ws'].get(label, 0)
    log_prob_tokens = 0
    for token in tokens:
        token_count = model['y_w'].get(label, {}).get(token, 0)
        prob_w_given_y = (token_count + alpha) / (total_tokens_label + alpha * vocabulary_size)
        log_prob_tokens += math.log(prob_w_given_y)

    return log_prob_y + log_prob_tokens

def generate_predictions(test_rdd, model, vocabulary_size, alpha=1):
    """
    Generate predictions on the test dataset using the trained Naive Bayes model.
    Args:
        test_rdd: RDD containing the test data.
        model: The trained Naive Bayes model.
        vocabulary_size: Size of the vocabulary.
        alpha: Laplace smoothing parameter.
    Returns:
        RDD where each element is [doc_ID, true_labels, predicted_label, max_log_prob].
    """
    def predict(doc):
        doc_id, true_labels, tokens = doc
        max_log_prob = float('-inf')
        predicted_label = None

        # Iterate over all labels in the model
        for label in model['y']:
            log_prob = calculate_label_log_probabilities(tokens, label, model, vocabulary_size, alpha)
            if log_prob > max_log_prob:
                max_log_prob = log_prob
                predicted_label = label

        return [doc_id, true_labels, predicted_label, max_log_prob]

    # Apply the prediction function to each document in the test RDD
    return test_rdd.map(predict)


# Generate predictions on the test dataset
output = generate_predictions(test_rdd, model=sunny_model, vocabulary_size=sunny_model['vocabulary'])

In [0]:
# Get first two predictions by filtering for their document ID, then extracting their true labels, predicted label, and max log probability.
first_pred = output.filter(lambda x: x[0] == "2005_UEFA_Women's_Cup_Final").collect()[0][1:]
second_pred = output.filter(lambda x: x[0] == "Armand_L%C3%A9on_de_Baudry_d'Asson").collect()[0][1:]

epsilon = 1e-3

assert(first_pred[:2] == [['Event'], 'Person'])
assert(abs(first_pred[2] + 275.095) < epsilon)

assert(second_pred[:2] == [['Person', 'other'], 'Person'])
assert(abs(second_pred[2] + 228.391) < epsilon)

### (3b) Checking accuracy

Now, you will implement the `check_prediction` function to calculate the accuracy of the Naive Bayes model. This function checks whether the predicted label for a document matches any of the true labels.

The `check_prediction` function takes one row of `output`, which contains:

- `doc_id`: The document ID, which you don't need in this function.
- `labels`: The true labels for the document.
- `prediction`: The predicted label for the document.
- `log_prob`: The log probability of the prediction, which you also don't need in this function.

If the predicted label matches any of the true labels, return 1. Otherwise, return 0.

After implementing this function, use `map` to apply it to each row of `output`.

Next, compute:
- `total_test_samples`: The total number of test samples (the number of rows in the output).
- `total_correct_samples`: The number of correct predictions (the sum of the values returned by `check_prediction`).

Note: The RDD `output` containing the predictions should have been computed in the previous part.
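The accuracy computation can be sketched on a few made-up rows shaped like `output`. A multi-label document counts as correct when the prediction matches any one of its true labels. In the sketch, the list comprehension stands in for `map(check_prediction)`, `len` for `count()`, and `sum` for the RDD's `sum()`. The rows and `toy_` names are hypothetical.

```python
def check_prediction(row):
    # row = [doc_id, true_labels, predicted_label, max_log_prob]
    _doc_id, labels, prediction, _log_prob = row
    return 1 if prediction in labels else 0

# Made-up prediction rows in the same shape as `output`
toy_rows = [
    ["doc_a", ["Event"], "Person", -275.1],            # wrong
    ["doc_b", ["Person", "other"], "Person", -228.4],  # right: matches one of two labels
    ["doc_c", ["Place"], "Place", -128.6],             # right
]
toy_correct = [check_prediction(row) for row in toy_rows]  # like output.map(check_prediction)
toy_total = len(toy_correct)                                # like .count()
toy_total_correct = sum(toy_correct)                        # like .sum()
toy_accuracy = toy_total_correct / toy_total * 100
print(f"Accuracy: {toy_accuracy:.3f}%")  # Accuracy: 66.667%
```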

In [0]:
# Rebuild the predictions from (3a) so this cell can be run on its own
output = generate_predictions(test_rdd, sunny_model, vocabulary_size)

def check_prediction(row):
    """
    Checks if the predicted label matches any of the true labels.

    Args:
        row (list): [doc_id, true_labels, predicted_label, log_prob], as produced by generate_predictions.

    Returns:
        int: 1 if the prediction is correct, 0 otherwise.
    """
    doc_id, labels, prediction, log_prob = row
    return 1 if prediction in labels else 0

# Apply check_prediction to each row; a new name keeps `output` holding the predictions
correct_rdd = output.map(check_prediction).cache()  # cached because two actions use it
total_test_samples = correct_rdd.count()
total_correct_samples = correct_rdd.sum()
accuracy = total_correct_samples / total_test_samples * 100
print(f"Accuracy: {accuracy:.3f}%")

Accuracy: 82.288%


In [0]:
epsilon = 1e-3
assert(abs(accuracy - 82.288) < epsilon)

## 4. Top 10 words per label

In this task, you will write code that identifies the top 10 most frequent tokens for each label. This is useful for analyzing which words are most characteristic of each class.

Your result must be a dictionary where the keys are labels and the values are lists of `(frequency, token)` tuples, sorted so that the first element is the most frequent token, the second is the next most frequent, and so on.

Store your result in the `result` variable.

Hint: Think about how you calculated `y_w`, which is the frequency of each token given a label. You might want to reuse some code from that section. What additional map and reduce operations could you add to that computation to get the 10 most frequent words for each label?
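One variation on "group, sort, slice" keeps only the top k per label with `heapq.nsmallest`. Using the key `(-count, token)` returns the highest counts first and breaks ties alphabetically, the same order as a full `sorted(...)[:k]` with that key. The function `top_k_per_label` and the pairs below are hypothetical, in the `((label, token), count)` shape that `reduceByKey` produces.

```python
import heapq
from collections import defaultdict

# Made-up ((label, token), count) pairs, shaped like the reduced y_w counts
label_token_counts = [
    (('Person', 'the'), 50), (('Person', 'in'), 30), (('Person', 'of'), 30),
    (('Person', 'a'), 10), (('Place', 'the'), 40), (('Place', 'district'), 12),
]

def top_k_per_label(pairs, k=10):
    by_label = defaultdict(list)
    for (label, token), count in pairs:
        by_label[label].append((count, token))     # (frequency, token), as the tests expect
    # nsmallest on (-count, token): highest count first, ties broken alphabetically
    return {
        label: heapq.nsmallest(k, items, key=lambda ct: (-ct[0], ct[1]))
        for label, items in by_label.items()
    }

top = top_k_per_label(label_token_counts, k=3)
print(top['Person'])  # [(50, 'the'), (30, 'in'), (30, 'of')]
print(top['Place'])   # [(40, 'the'), (12, 'district')]
```

In Spark, the same key function could be passed to `heapq.nsmallest` inside `mapValues` after `groupByKey`. This avoids sorting every token of a label, though all of that label's pairs are still shuffled to one place.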

In [0]:
# Flatten the train_rdd to generate (label, token) pairs
label_token_counts_rdd = train_rdd.flatMap(
    lambda x: [((label, token), 1) for label in x[0] for token in x[1]]
)

# Aggregate counts for each (label, token) pair
label_token_counts = label_token_counts_rdd.reduceByKey(lambda x, y: x + y)

# Map to (label, (count, token)) for grouping by label
label_to_token_counts_rdd = label_token_counts.map(
    lambda x: (x[0][0], (x[1], x[0][1]))  # (label, (count, token))
)

# Group by label and extract top 10 tokens for each label
top_tokens_per_label_rdd = label_to_token_counts_rdd.groupByKey().mapValues(
    lambda tokens: sorted(tokens, key=lambda x: (-x[0], x[1]))[:10]  # Sort by count descending, token ascending
)

# Convert to a dictionary
result = dict(top_tokens_per_label_rdd.collect())

In [0]:
assert(result['Organisation'][:5] == [(872, 'the'), (459, 'in'), (433, 'and'), (410, 'of'), (247, 'a')])
assert(result['Person'][:5] == [(4544, 'the'), (2838, 'in'), (2716, 'of'), (2504, 'and'), (1862, 'a')])

## 5. Train and test on large dataset
If you have passed all local tests, you are now ready to train and test your model on the `large` dataset. Run the following cell to load the `large` training and test RDDs.

In [0]:
data_sample = 'large'
urls = get_urls(data_sample)

# Load the training data
train_url = "https://cmu.box.com/shared/static/" + urls[1]
train_filename = urls[1]
sc.addFile(train_url)
train_rdd = sc.textFile("file://" + SparkFiles.get(train_filename))
train_rdd = train_rdd.map(parse_line)
train_rdd = train_rdd.map(lambda x: x[1:])

# Load the test data
test_url = "https://cmu.box.com/shared/static/" + urls[0]
test_filename = urls[0]
sc.addFile(test_url)
test_rdd = sc.textFile("file://" + SparkFiles.get(test_filename))
test_rdd = test_rdd.map(parse_line)

Here, you will implement your training and testing functionality. Your final result should be an RDD `output` where each line has the form `[doc_ID, true_labels, predicted_label, max_log_prob]`.

In [0]:
# Step 1: Train the Naive Bayes model
# Flatten the tokens from all documents into a single RDD, this can be done in a single line
vocabulary = train_rdd.flatMap(lambda x: x[1])

# Compute the distinct token count (vocabulary size), can also be done in a single line
vocabulary_size = vocabulary.distinct().count()
large_model = train_naive_bayes_spark(train_rdd, vocabulary_size)

# Step 2: Generate predictions
output = generate_predictions(test_rdd, model=large_model, vocabulary_size=large_model['vocabulary'])

After training and creating the `output` RDD from the test data, we convert `output` from an RDD to a Spark DataFrame and call `display` to see the entries. Once this cell runs, download the DataFrame as a `.csv` file: click the dropdown next to Table in the top left and select 'Download all rows'. This re-runs the cell and then downloads a file `export.csv` to your local machine. This is the file you will upload to Gradescope, so store it somewhere accessible.

In [0]:
# Ensure max_log_probability is a float
result = output.map(lambda x: (x[0], x[1], x[2], float(x[3])))

# Convert to DF
result_df = result.toDF()

# Display the DataFrame (display() is a Databricks notebook built-in)
display(result_df)

_1,_2,_3,_4
%C3%80_cause_de_l'automne,List(Work),Work,-720.611917952317
%C3%89lisabeth_Marguerite_d'Orl%C3%A9ans,List(Person),Person,-457.5147967979956
%C3%89tienne_N'tsunda,List(Person),Person,-251.8373855091699
'74%E2%80%93'75,List(Work),Work,-241.13436901364457
'74_Jailbreak,List(Work),Work,-709.0104328896405
'Allo_'Allo!,List(Work),Work,-903.875511805592
'Alqama_ibn_'Abada,"List(Agent, Person, other)",other,-1323.3642698474791
'Bout_It,List(Work),Work,-583.0112895803089
'Moteng,List(Place),Place,-128.62376193602535
'Ot_'n'_Sweaty,List(Work),Work,-1314.3953021229363
