# Homework 3: MapReduce and Spark

Seeley McGillis



---



# 1. tf-idf definition

In [1]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  51.8M      0 --:--:-- --:--:-- --:--:-- 51.8M


In [2]:
!pip install pyspark



In [3]:
from pyspark.sql import SparkSession

#start spark session
spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate())

agnews = spark.read.csv("agnews_clean.csv", inferSchema=True, header=True) #import agnews as a spark dataset, assuming the schema, and that there is a header

# turning the second column from a string to an arraya
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
agnews = agnews.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))

In [4]:
# each row contains the document id and a list of filtered words
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows





---



Design the MapReduce functions for calculating the tf-idf measure.

In [6]:
import math #import math functions

#Select the relevant columns from the DataFrame and convert to rdd
rdd = agnews.select("_c0", "filtered").rdd

#Map each row to a tuple
mapped_rdd = rdd.map(lambda row: (str(row["_c0"]), row["filtered"]))

#Collect results into a list
documents = mapped_rdd.collect()

total_documents = len(documents) #total number of documents

#computes the term frequency of each word in each document
def map_phase(documents):
    for doc_id, words in documents:
        total_terms = len(words) #total amount of words in the document
        tf_counts = {}
        for word in words: #iterate through words in documents
            tf_counts[word] = tf_counts.get(word, 0) + 1
        for word, count in tf_counts.items():
            tf = count / total_terms #term frequency = count of word in document / total number of words in document
            yield (word, (doc_id, tf)) #pairs where word is word in document, doc_id is document it came from, tf is term frequency

#Groups the mapped data by word to prepare it for the reduce phase
def shuffle_and_sort(mapped_data): #mapped_data is pairs from map phase
    term_dict = {} #empty dictionary
    for term, doc_tf in mapped_data:
        term_dict.setdefault(term, []).append(doc_tf)
    return term_dict #dictionary with key being a word, and value being a list of list of (doc_id, tf)

#Computes the TF-IDF score for each term in each document
def reduce_phase(shuffled_data, total_docs):
    tf_idf_results = {} #empty dictionary
    for term, doc_list in shuffled_data.items():
        doc_freq = len(doc_list)
        idf = math.log(total_docs / doc_freq) #compute inverse document frequency
        for doc_id, tf in doc_list:
            tf_idf_results.setdefault(doc_id, {})[term] = tf * idf
    return tf_idf_results #dictionary with key being the document id, and a dictionary of pairs for the document



---



Calculate tf-idf measure for each row in the agnews_clean.csv. Save the measures in a new column.

In [7]:
mapped = map_phase(documents)
shuffled = shuffle_and_sort(mapped)
tf_idf_matrix = reduce_phase(shuffled, total_documents)

In [8]:
from pyspark.sql import Row #import row

rows = [Row(doc_id=doc_id, tfidf=tfidf_dict) for doc_id, tfidf_dict in tf_idf_matrix.items()] #create list of row objects with document id and dictionary with tfidf

tfidf_df = spark.createDataFrame(rows) #convert list of rows into spark dataframe

from pyspark.sql.functions import col #import col

agnews = agnews.withColumnRenamed("_c0", "doc_id") #rename _c0 column with doc_id
agnews = agnews.withColumn("doc_id", col("doc_id").cast("string")) #ensures doc_id column is a string

agnews_with_tfidf = agnews.join(tfidf_df, on="doc_id", how="left") #perform a left join on doc_id



---



Print out the tf-idf measure for the first 5 documents.

In [9]:
for doc_id, term_scores in list(tf_idf_matrix.items())[:5]: #for first 5 documents
    print(f"\nDocument: {doc_id}") #print document id
    for term, score in sorted(term_scores.items(), key=lambda x: -x[1])[:5]:
        print(f"  {term}: {score:.4f}") #print tf-idf measure


Document: 0
  cynics: 0.5637
  wall: 0.5116
  claw: 0.4991
  dwindling: 0.4572
  sellers: 0.4468

Document: 9
  cynics: 0.5341
  wall: 0.4847
  claw: 0.4728
  dwindling: 0.4332
  sellers: 0.4233

Document: 215
  auction: 0.3942
  overstated: 0.2995
  face: 0.2839
  aspects: 0.2785
  playboy: 0.2724

Document: 806
  lowe: 0.4704
  earnings: 0.2776
  higher: 0.2735
  2q: 0.2506
  home: 0.2436

Document: 821
  changed: 0.4158
  little: 0.3211
  wall: 0.3175
  street: 0.3064
  liabilities: 0.3004




---



#2. SVM objective function

In [10]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   5907      0 --:--:-- --:--:-- --:--:--  5919
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0     78      0 --:--:-- --:--:-- --:--:--    78
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  65.1M      0 --:--:-- --:--:-- --:--:-- 65.1M


In [11]:
from pyspark.sql import SparkSession
import pandas as pd

# Start Spark session
spark = SparkSession.builder.appName("SVM_Predict").getOrCreate()

# Read CSV into Spark DataFrame
data_svm = spark.read.csv('data_for_svm.csv', header=False, inferSchema=True)

# Read weights and bias from CSV
w = pd.read_csv('w.csv', header=None)[0].tolist()
bias = pd.read_csv('bias.csv', header=None).iloc[0, 0]

# Convert Spark DataFrame to pandas DataFrame
#data_pd = data_svm.toPandas()
data_pd = pd.read_csv('data_for_svm.csv', header=None)

In [None]:
# This is an example to read the files. But you should consider using pyspark directly.
# *Make sure you are not assuming a header*!!
#import pandas as pd
#data_svm = pd.read_csv('data_for_svm.csv', header=None)
#w = pd.read_csv('w.csv', header=None)
#bias = pd.read_csv('bias.csv', header=None)

In [None]:
#def loss_SVM(w, b, X, y):
    #pass



---



Design the MapReduce functions required to calculate the loss function.

In [13]:
#Computes hinge loss for each data point
def map_phase(documents, w, bias):
    for x_i, y_i in documents:
        dot_product = sum(w_j * x_j for w_j, x_j in zip(w, x_i)) + bias
        hinge_loss = max(0, 1 - y_i * dot_product)
        yield ("hinge_loss", hinge_loss) #emits key value pair for hinge_loss of data point

#Groups all values with the same key into a list, builds a dictionary
def shuffle_and_sort(mapped_data):
    grouped = {}
    for key, value in mapped_data:
        grouped.setdefault(key, []).append(value)
    return grouped

#Computes final SVM loss
def reduce_phase(shuffled_data, total_docs, w, λ):
    hinge_losses = shuffled_data.get("hinge_loss", [])
    average_hinge_loss = sum(hinge_losses) / total_docs
    reg_term = λ * sum(w_j ** 2 for w_j in w)
    total_loss = reg_term + average_hinge_loss
    return total_loss #Final total loss




---



Using these functions, create a function loss_SVM(w, b, X, y) to calculate the SVM objective for a given choice of w, b with data stored in X, y.

In [14]:
def loss_SVM(w, bias, X, y, λ):
    documents = list(zip(X, y)) #combines features and labels
    total_documents = len(documents) #total amount of documents

    #use map reduce functions
    mapped = list(map_phase(documents, w, bias))
    shuffled = shuffle_and_sort(mapped)
    return reduce_phase(shuffled, total_documents, w, λ)

    pass



---



You are given the following dataset data_for_svm.csv, where the first 64 columns contain X and the last column contains y. Using the weights and bias provided in w.csv and bias.csv, calculate the objective value.

In [18]:
λ = 0.01 #set a random lambda value
X = data_pd.iloc[:, :-1].values #first 64 columns
y = data_pd.iloc[:, -1].values #last column

print("SVM Objective Value:", loss_SVM(w, bias, X, y, λ))

SVM Objective Value: 0.9999963377570842




---



Design the MapReduce function required to make prediction. Predict for all of the data using the provided weights and bias.

In [19]:
#converts spark data frame into an rdd
#adds index to each row
#reorders pair to (index, row)
rdd = data_svm.rdd.zipWithIndex().map(lambda row: (row[1], row[0]))

#applied to each row of rdd
def map_predict(row):
    index, features = row
    x = list(features)[:-1]  # convert row to list and drop label (assuming last column is label)
    dot_product = sum(w_j * x_j for w_j, x_j in zip(w, x)) #computes dot product between feature and weight vectors
    score = dot_product + bias #add bias to score
    prediction = 1 if score >= 0 else -1
    return (index, prediction) #return index and predicted label

# Apply map and sort by index
predictions_rdd = rdd.map(map_predict).sortByKey()

# Extract predictions
predictions = predictions_rdd.map(lambda x: x[1]).collect()

print("Predictions:", predictions)

Predictions: [1, -1, -1, -1, -1, -1, -1, 1, -1, -1, 1, 1, -1, -1, 1, -1, -1, 1, -1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1, 1, 1, -1, 1, 1, -1, 1, -1, 1, -1, 1, 1, -1, -1, 1, -1, 1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, -1, 1, 1, 1, 1, -1, -1, 1, 1, 1, -1, -1, -1, 1, 1, -1, -1, 1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, 1, 1, -1, 1, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, -1, -1, -1, -1, -1, 1, -1, -1, -1, -1, 1, 1, 1, -1, -1, 1, 1, -1, 1, -1, 1, -1, 1, -1, 1, 1, -1, 1, -1, -1, -1, -1, 1, -1, -1, -1, -1, 1, 1, -1, 1, 1, 1, 1, -1, 1, 1, -1, -1, -1, -1, 1, 1, 1, 1, 1, -1, 1, -1, -1, -1, -1, -1, -1, -1, -1, 1, -1, 1, -1, 1, -1, -1, 1, -1, -1, 1, -1, -1, 1, -1, -1, -1, -1, -1, -1, -1, 1, -1, -1, -1, 1, 1, 1, 1, 1, 1, -1, -1, -1, 1, 1, 1, 1, -1, -1, -1, 1, -1, 1, -1, 1, 1, 1, 1, 1, 1, 1, -1, 1, -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, -1, 1, 1, 1, -1, -1, 1, -1, 1, 1, -1, -1, -1, -1, 1, -1, -1, -1, 1, -1, -1, 1, -1, 1, 1,



---

