# **Homework 3: MapReduce and Spark**

# 1. tf-idf definition

In [2]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  58.2M      0 --:--:-- --:--:-- --:--:-- 58.1M


In [3]:
!mkdir dataset
!mv agnews_clean.csv dataset/

In [4]:
# Imports used by this cell
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# Start (or reuse) a local Spark session named "AG news"
spark = SparkSession.builder.master("local[*]").appName("AG news").getOrCreate()

# Read the CSV using its header row and let Spark infer the column types
agnews = spark.read.csv("dataset/agnews_clean.csv", header=True, inferSchema=True)

# The 'filtered' column arrives as a string; parse it into an array of strings
agnews = agnews.withColumn(
    'filtered',
    F.from_json(F.col('filtered'), ArrayType(StringType())),
)

In [5]:
# each row contains the document id and a list of filtered words
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



# 1. tf-idf definition (MapReduce implementation)

In [6]:
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
from collections import defaultdict

# Build an RDD of (doc_id, terms) pairs: zipWithIndex supplies the id and the
# single selected column, "filtered", supplies the list of terms.
docs_rdd = (
    agnews.select("filtered")
    .rdd
    .zipWithIndex()
    .map(lambda row_and_idx: (row_and_idx[1], row_and_idx[0]["filtered"]))
)

In [7]:
### mapping functions

def map_phase_tf(document):
    """Map step for term frequency.

    Takes a (doc_id, terms) pair and yields ((doc_id, term), 1) once for every
    occurrence of a term, so a repeated term produces repeated pairs.
    """
    doc_id, terms = document
    yield from (((doc_id, term), 1) for term in terms)

def map_phase_doc_length(document):
    """Map step for document length: yield one (doc_id, number_of_terms) pair."""
    identifier, tokens = document
    token_count = len(tokens)
    yield (identifier, token_count)

def map_phase_df(document):
    """Map step for document frequency.

    Yields (term, 1) exactly once per distinct term in the document; the
    doc_id is ignored. Pairs follow set iteration order.
    """
    distinct_terms = set(document[1]) if len(document) == 2 else _unpack_terms(document)
    yield from ((term, 1) for term in distinct_terms)


def _unpack_terms(document):
    """Unpack a (doc_id, terms) pair, raising the usual unpacking error if malformed."""
    _, terms = document
    return set(terms)

In [8]:
def shuffle_and_sort(pairs):
    """Shuffle step: bucket the values of (key, value) pairs by key.

    Returns a defaultdict(list) mapping each key to its values in the order
    they were encountered.
    """
    buckets = defaultdict(list)
    for pair in pairs:
        key, value = pair
        buckets[key].append(value)
    return buckets

In [9]:
def reduce_phase(shuffled_data):
    """Reduce step: yield (key, total), where total is the sum of that key's values."""
    for key in shuffled_data:
        yield (key, sum(shuffled_data[key]))

In [10]:
# term frequency: map -> shuffle -> reduce gives the raw count of every (doc_id, term) pair
mapped_tf = docs_rdd.flatMap(map_phase_tf).collect()
shuffled_tf = shuffle_and_sort(mapped_tf)
reduced_tf = {pair: count for pair, count in reduce_phase(shuffled_tf)}

# document length: the same pipeline gives the number of terms in each document
mapped_len = docs_rdd.flatMap(map_phase_doc_length).collect()
shuffled_len = shuffle_and_sort(mapped_len)
reduced_len = {doc: length for doc, length in reduce_phase(shuffled_len)}

In [11]:
import math

# normalizing term frequency: raw count divided by the length of its document
tf_norm = {
    (d, t): count / reduced_len[d]
    for (d, t), count in reduced_tf.items()
}

# df phase: number of documents that contain each term
mapped_df = docs_rdd.flatMap(map_phase_df).collect()
shuffled_df = shuffle_and_sort(mapped_df)
reduced_df = dict(reduce_phase(shuffled_df))

# compute idf for each term: log(N / df), where N is the number of documents
# (reduced_len has exactly one entry per doc_id)
N = len(reduced_len)
idf = {t: math.log(N / df) for t, df in reduced_df.items()}

In [12]:
### tf-idf

# same terms grouped together for TF and IDF
def map_phase_tf_join(doc_tf):
    """Re-key tf values by term so they can meet their idf in the shuffle.

    doc_tf maps (doc_id, term) -> tf; each entry is yielded as
    (term, ("TF", doc_id, tf)).
    """
    for key, tf_value in doc_tf.items():
        doc_id, term = key
        yield (term, ("TF", doc_id, tf_value))

def map_phase_idf_join(idf_dict):
    """Tag each term's idf value as (term, ("IDF", idf)) for the join shuffle."""
    yield from ((term, ("IDF", value)) for term, value in idf_dict.items())

def reduce_phase_tfidf(shuffled):
    """Reduce step of the join: multiply each document's tf by the term's idf.

    Parameters
    ----------
    shuffled : mapping of term -> list of tagged records, each record being
        either ("IDF", idf) or ("TF", doc_id, tf).

    Returns
    -------
    dict mapping doc_id -> {term: tf * idf}. A term with an IDF record but no
    TF records contributes nothing.

    Raises
    ------
    ValueError
        If a term has no IDF record. (This used to escape as a bare
        StopIteration from next(), which is easy to misread and is silently
        swallowed if the call ever happens inside a generator or for-loop
        protocol.)
    """
    out = {}
    for term, recs in shuffled.items():
        # Split the tagged records in a single pass instead of scanning twice.
        idfv = None
        tf_records = []
        for tag, *rest in recs:
            if tag == "IDF":
                if idfv is None:  # keep the first IDF record, matching next()
                    idfv = rest[0]
            elif tag == "TF":
                tf_records.append(rest)
        if idfv is None:
            raise ValueError(f"no IDF record found for term {term!r}")
        for d, tfv in tf_records:
            out.setdefault(d, {})[term] = tfv * idfv
    return out

# Tag the tf and idf records, shuffle them together by term, then reduce to tf-idf
mapped_tf_j = [*map_phase_tf_join(tf_norm)]
mapped_idf_j = [*map_phase_idf_join(idf)]
shuffled_all = shuffle_and_sort([*mapped_tf_j, *mapped_idf_j])
doc_tfidf = reduce_phase_tfidf(shuffled_all)

Saving the measures in a new column:

In [13]:
# Prepend a 0-based doc_id (the zipWithIndex position) to every agnews row
agnews_indexed = (
    agnews.rdd
    .zipWithIndex()
    .map(lambda pair: (pair[1], *pair[0]))
    .toDF(["doc_id", *agnews.columns])
)

In [14]:
from pyspark.sql.types import MapType, StringType, DoubleType # Import MapType and DoubleType

# Turn doc_tfidf into a Spark DataFrame: one (doc_id, {term: score}) row per
# document, in ascending doc_id order
tfidf_rows = [(doc_id, doc_tfidf[doc_id].copy()) for doc_id in sorted(doc_tfidf)]

# Cast the tfidf column to an explicit map<string, double> type
tfidf_df = spark.createDataFrame(tfidf_rows, ["doc_id", "tfidf"]).withColumn(
    "tfidf",
    F.col("tfidf").cast(MapType(StringType(), DoubleType())),
)

In [15]:
# Left-join the tf-idf maps onto the indexed rows, then drop the helper key
agnews_with_tfidf = agnews_indexed.join(tfidf_df, "doc_id", "left").drop("doc_id")

# Show the original columns followed by the new tfidf column
agnews_with_tfidf.select([*agnews.columns, "tfidf"]).show(n=5, truncate=False)

+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [16]:
# For the five lowest doc_ids, list the ten highest-scoring terms
for d in sorted(doc_tfidf)[:5]:
    print(f"Document {d}:")
    # Negating the score sorts in descending order
    ranked = sorted(doc_tfidf[d].items(), key=lambda x: -x[1])
    for term, score in ranked[:10]:
        print(f"{term:15s}{score: }")
    print()

Document 0:
cynics          0.563734318747707
wall            0.5115985326511431
claw            0.499114829314058
dwindling       0.4572386180709258
sellers         0.4468379768438066
ultra           0.4125512394225831
seeing          0.37743394553516213
band            0.3643421454792778
bears           0.3372044607529448
black           0.2953171727366614

Document 1:
carlyle         0.7168306746824437
occasionally    0.33274321954270536
timed           0.324478643568105
bets            0.27861293130724324
aerospace       0.2581171817448437
reputation      0.2578098186776328
quietly         0.25188254045524316
placed          0.2284965552404658
plays           0.22418048797172685
controversial   0.20949395177306526

Document 2:
outlook         0.4265073217271922
doldrums        0.3770252270329423
economy         0.3721400726458204
depth           0.31343954772064864
hang            0.30475018305843793
cloud           0.295159450642955
soaring         0.2596334462817101
plus         

# 2. SVM objective function

In [17]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   9361      0 --:--:-- --:--:-- --:--:--  9398
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0    173      0 --:--:-- --:--:-- --:--:--   174
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  76.2M      0 --:--:-- --:--:-- --:--:-- 76.1M


In [18]:
from pyspark.sql import SparkSession

# Get the Spark session for the SVM part and keep a handle on its SparkContext
spark = SparkSession.builder.master("local[*]").appName("SVM_Loss_Calc").getOrCreate()
sc = spark.sparkContext

In [19]:
# Read the headerless CSV: columns 0-63 become the float feature vector,
# column 64 becomes the integer label
data_df = spark.read.csv("data_for_svm.csv", inferSchema=True, header=False)
X = data_df.rdd.map(lambda row: [float(row[col]) for col in range(64)])
y = data_df.rdd.map(lambda row: int(row[64]))

In [20]:
# w.csv: one weight per row, taken from the first column
w = [
    float(row[0])
    for row in spark.read.csv("w.csv", inferSchema=True, header=False).collect()
]
# bias.csv: the bias is the first cell of the first row
b = float(
    spark.read.csv("bias.csv", inferSchema=True, header=False).collect()[0][0]
)

In [21]:
def compute_max(x_vec, y_lbl, w, b):
    """Hinge-loss term for one sample: max(0, 1 - y_lbl * (w . x_vec + b))."""
    score = sum(weight * feature for weight, feature in zip(w, x_vec)) + b
    hinge = 1 - y_lbl * score
    # Clip at zero: samples beyond the margin contribute no loss
    return hinge if hinge > 0.0 else 0.0

In [22]:
def loss_SVM(w, b, X, y, lmb=1.0):
    """Regularised hinge-loss objective of a linear SVM.

    Computes lmb * ||w||^2 + (1/n) * sum_i max(0, 1 - y_i * (w . x_i + b)).

    Parameters
    ----------
    w : sequence of float weights.
    b : float bias.
    X : RDD of feature vectors.
    y : RDD of labels; it is paired with X through RDD.zip, so it must line up
        with X element for element.
    lmb : weight applied to the squared L2 norm of w (default 1.0).

    Returns
    -------
    float, the objective value.

    Raises
    ------
    ValueError
        If the data set is empty (previously a ZeroDivisionError).
    """
    # pair features & labels
    xy_rdd = X.zip(y)
    # map to individual losses
    margin_loss_rdd = xy_rdd.map(lambda xy: compute_max(xy[0], xy[1], w, b))
    # A single aggregate yields both the loss total and the sample count, so the
    # data is scanned once instead of once for sum() and again for count().
    # Summation order may differ from RDD.sum() in the last floating-point digits.
    total_max, n = margin_loss_rdd.aggregate(
        (0.0, 0),
        lambda acc, loss: (acc[0] + loss, acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]),
    )
    if n == 0:
        raise ValueError("loss_SVM needs at least one sample, but X is empty")
    # compute L2-norm squared of weight vector
    w_norm_sq = sum(w_i**2 for w_i in w)
    return lmb*w_norm_sq + (1.0/n)*total_max

In [23]:
# Evaluate the SVM objective on the loaded data with lmb = 1
svm_obj = loss_SVM(w=w, b=b, X=X, y=y, lmb=1.0)
print(f"SVM objective ({svm_obj:})")

SVM objective (1.0000598582575095)


In [24]:
def predict(w, b, X):
    """Label every feature vector in X as 1 or -1 by the sign of w . x + b.

    A score of exactly zero is labelled 1. Returns whatever X.map returns.
    """
    def classify(x_vec):
        score = sum(w_i * x_i for w_i, x_i in zip(w, x_vec)) + b
        if score >= 0:
            return 1
        return -1

    return X.map(classify)

In [25]:
# apply function and predict
predictions_rdd = predict(w, b, X)
predictions = predictions_rdd.collect()
# The collected list already holds every prediction, so its length is the total;
# predictions_rdd.count() would launch a second Spark job over the same data.
print(f"Total predictions: {len(predictions)}")
print("First 20 predictions:", predictions[:20])

Total predictions: 400000
First 20 predictions: [1, -1, -1, -1, -1, -1, -1, 1, -1, -1, 1, 1, -1, -1, 1, -1, -1, 1, -1, -1]


Generative AI Disclosure:

1) I used GAI to help resolve the errors I ran into throughout the homework

2) I used ChatGPT

3) I copied and pasted my error message into ChatGPT and asked "how do I fix this error"