In [None]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  1824k      0  0:00:18  0:00:18 --:--:-- 10.1M
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   5952      0 --:--:-- --:--:-- --:--:--  5969
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0     96      0 --:--:-- --:--:-- --:--:--    96
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  17.7M      0  0:00:03  0:00:03 --:--:-- 17.7M


Gen AI: I used Google Colab to write the source code for this assignment before transferring to JupyterLab. Thus, I used the Colab autocomplete AI to assist with Spark syntax for aggregations and other dataframe functions. I also used ChatGPT to understand how to use user-defined functions better in Spark which helped with step 1 of part 2.

## 1. TF-IDF

In [None]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )

agnews = spark.read.csv("agnews_clean.csv", inferSchema=True, header=True)

# turning the second column from a string to an array
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
agnews = agnews.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))

# each row contains the document id and a list of filtered words
agnews.show(5, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
+---+------------------------------+
only showing top 5 rows



In [None]:
import pyspark.sql.dataframe as d

# Step 1: Map Phase
def map(agnews: d.DataFrame):
  agnews = agnews.select(F.col("_c0").alias("doc"), F.explode("filtered").alias("word"))
  agnews = agnews.groupBy(F.col("doc"), F.col("word"))
  return agnews.count().orderBy(F.col("doc"))

# Step 2: Reduce Phase
def reduce(mapped: d.DataFrame):
  n_docs = mapped.select(F.countDistinct("doc")).collect()[0][0]
  doclen = mapped.groupBy("doc").agg(F.sum("count").alias("doc_length")) # number of words in each doc

  # Calculating TF on word-doc pairs
  tf = mapped.join(doclen, on="doc")
  tf = tf.withColumn("tf", F.col("count") / F.col("doc_length"))

  # Calculating IDF on words
  idf = mapped.select("word", "doc").distinct().groupBy("word") # rows of docs for each word
  idf = idf.agg(F.count("*").alias("df")) # number of docs for each word
  idf = idf.withColumn("idf", F.log(F.lit(n_docs) / F.col("df")))

  # Final calculation
  tfidf = tf.join(idf, on="word", how="inner").withColumn("tfidf", F.col("tf") * F.col("idf"))

  return tfidf.select("doc", "word", "tf", "idf", "tfidf").orderBy(F.col("tf"))


In [None]:
mapped = map(agnews)
reduced = reduce(mapped)
docs = range(0,5)
for doc in docs:
  reduced.filter(F.col("doc") == doc).show(truncate=False)

+---+---------+-------------------+------------------+-------------------+
|doc|word     |tf                 |idf               |tfidf              |
+---+---------+-------------------+------------------+-------------------+
|0  |cynics   |0.05555555555555555|10.147217737458726|0.563734318747707  |
|0  |green    |0.05555555555555555|5.17879429217178  |0.2877107940095433 |
|0  |ultra    |0.05555555555555555|7.425922309606496 |0.4125512394225831 |
|0  |claw     |0.05555555555555555|8.984066927653044 |0.499114829314058  |
|0  |back     |0.05555555555555555|3.405989409371903 |0.1892216338539946 |
|0  |st       |0.05555555555555555|4.652511556905299 |0.2584728642725166 |
|0  |sellers  |0.05555555555555555|8.043083583188519 |0.4468379768438066 |
|0  |dwindling|0.05555555555555555|8.230295125276665 |0.4572386180709258 |
|0  |band     |0.05555555555555555|6.558158618627001 |0.3643421454792778 |
|0  |bears    |0.05555555555555555|6.069680293553007 |0.3372044607529448 |
|0  |black    |0.05555555

In [None]:
spark.stop()

## 2. SVM Objective Function

In [None]:
spark = (SparkSession.builder
         .master("local[*]")
         .appName("SVM")
         .getOrCreate()
        )

data_svm = spark.read.csv('data_for_svm.csv', header=None)
w = spark.read.csv('w.csv', header=None)
bias = spark.read.csv('bias.csv', header=None)

w = list(w.first())
w = [float(i) for i in w]
bias = float(bias.collect()[0][0])

In [32]:
from pyspark.sql.types import FloatType

# get them all out of spark
def xy_terms(data: d.DataFrame, w, b):
  # Row-wise calculation of the inside of max() term
  def xy_term(*row):
    x = [float(x) for x in row[:-1]]
    y = float(row[-1])
    dot = sum(w[i] * x[i] for i in range(features - 1))
    return float(y * (dot + b))

  features = len(w)

  row_xy = F.udf(xy_term, FloatType())
  columns = [F.col(f"_c{i}") for i in range(features - 1)]
  xy = data.withColumn("xy", row_xy(*columns))

  data = data.select(*columns)

  # Gen AI: Correct code to extract into a list
  return xy.select("xy").rdd.flatMap(lambda x: x).collect()

In [33]:
def loss_SVM(w, b, data, lam=1):
  xy = xy_terms(data, w, b)
  return lam*sum(i**2 for i in w) + sum(max(0, 1-i) for i in xy)

In [34]:
pred = loss_SVM(w, bias, data_svm)

import pandas as pd

pd.DataFrame(pred, columns=["pred_y"])
pd.to_csv("predictions.csv")

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "<ipython-input-32-7fd8aa495a70>", line 9, in xy_term
  File "<ipython-input-32-7fd8aa495a70>", line 9, in <genexpr>
IndexError: list index out of range


In [35]:
spark.stop()