In [31]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  50.3M      0 --:--:-- --:--:-- --:--:-- 50.3M


In [32]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# Local Spark session that uses all available cores.
spark = SparkSession.builder.master("local[*]").appName("AG news").getOrCreate()

# `filtered` arrives from the CSV as a JSON-encoded string; parse it into an
# array<string> column so later cells can explode it into individual terms.
agnews = (
    spark.read.csv("agnews_clean.csv", inferSchema=True, header=True)
    .withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))
)

In [33]:
# Preview: document id (_c0) next to its (truncated) list of filtered words.
agnews.show(n=10, truncate=30)

+---+------------------------------+
|_c0|                      filtered|
+---+------------------------------+
|  0|[wall, st, bears, claw, bac...|
|  1|[carlyle, looks, toward, co...|
|  2|[oil, economy, cloud, stock...|
|  3|[iraq, halts, oil, exports,...|
|  4|[oil, prices, soar, time, r...|
|  5|[stocks, end, near, year, l...|
|  6|[money, funds, fell, latest...|
|  7|[fed, minutes, show, dissen...|
|  8|[safety, net, forbes, com, ...|
|  9|[wall, st, bears, claw, bac...|
+---+------------------------------+
only showing top 10 rows



In [34]:
def get_tf(agnews):
  """Compute per-document term frequencies.

  Parameters
  ----------
  agnews : pyspark.sql.DataFrame
      Needs a document-id column ``_c0`` and an array-of-strings column
      ``filtered`` holding each document's tokens.

  Returns
  -------
  pyspark.sql.DataFrame
      One row per (document, term) pair with columns ``_c0``, ``term``,
      ``term_count`` (occurrences of the term in that document), ``doc_len``
      (number of tokens in the document, repeats included) and
      ``tf = term_count / doc_len``.
  """
  from pyspark.sql.functions import explode, col, size, count

  # One row per token occurrence. explode emits nothing for empty/null arrays,
  # so such documents never reach the division below (no doc_len of 0).
  exploded = agnews.select("_c0", explode("filtered").alias("term"))
  # Document length = total number of tokens, duplicates included.
  doc_lengths = agnews.select("_c0", size("filtered").alias("doc_len"))
  # How many times each term occurs within each document.
  term_counts = exploded.groupBy("_c0", "term").agg(count("*").alias("term_count"))

  tf = term_counts.join(doc_lengths, on="_c0")
  return tf.withColumn("tf", col("term_count") / col("doc_len"))


In [35]:
def get_idf(agnews):
  """Compute the inverse document frequency of every term in the corpus.

  idf(term) = ln(N / df(term)), where N is the number of distinct document ids
  (``_c0``) in ``agnews`` and df(term) is the number of documents whose
  ``filtered`` token list contains the term.

  Note: computing N runs a Spark job (``count``) as soon as this is called.

  Returns
  -------
  pyspark.sql.DataFrame
      One row per term with columns ``term``, ``df`` and ``idf``.
  """
  from pyspark.sql.functions import explode, col, count, log, lit

  # Distinct (document, term) pairs: a word repeated inside one document counts once.
  distinct_terms = agnews.select("_c0", explode("filtered").alias("term")).dropDuplicates()
  # Rows are already unique per (document, term), so counting non-null ids per
  # term is the document frequency -- same value as countDistinct("_c0"),
  # without the extra distinct-aggregation step.
  df_counts = distinct_terms.groupBy("term").agg(count("_c0").alias("df"))
  N = agnews.select("_c0").distinct().count()  # number of documents

  # Single-argument log is the natural logarithm; `/` yields a double.
  return df_counts.withColumn("idf", log(lit(N) / col("df")))

In [36]:
from pyspark.sql.functions import col
def get_tf_idf(agnews):
  """Return tf-idf scores for every (document, term) pair in ``agnews``.

  Joins the per-document term frequencies from ``get_tf`` with the corpus-wide
  inverse document frequencies from ``get_idf`` on ``term`` (inner join), keeps
  every column of both, and appends ``tf_idf = tf * idf``.
  """
  term_freqs = get_tf(agnews)
  inverse_doc_freqs = get_idf(agnews)
  joined = term_freqs.join(inverse_doc_freqs, on="term", how="inner")
  return joined.withColumn("tf_idf", col("tf") * col("idf"))

In [37]:
# Build the tf-idf table and preview its first 10 rows.
tf_idf = get_tf_idf(agnews)
tf_idf.show(n=10)

+---------+-----+----------+-------+--------------------+----+------------------+-------------------+
|     term|  _c0|term_count|doc_len|                  tf|  df|               idf|             tf_idf|
+---------+-----+----------+-------+--------------------+----+------------------+-------------------+
|   online|42468|         1|     26|0.038461538461538464|2444|3.9552643296013406|0.15212555113851312|
|   online|45307|         1|     28| 0.03571428571428571|2444|3.9552643296013406|0.14125944034290502|
|   online|23364|         1|     13| 0.07692307692307693|2444|3.9552643296013406|0.30425110227702623|
|    still|36538|         1|     58|0.017241379310344827|2281|4.0242864276084385|0.06938424875186963|
|    still|26425|         1|     31| 0.03225806451612903|2281|4.0242864276084385|0.12981569121317543|
|    still| 1238|         1|     21|0.047619047619047616|2281|4.0242864276084385|0.19163268702897324|
|arguments|56741|         1|     19| 0.05263157894736842|  91| 7.245796143375976| 

In [38]:
from pyspark.sql.functions import struct, collect_list, sort_array

# Assemble one row per document: its token list plus a tf-idf "vector" made of
# (term, tf_idf) structs.

tf_idf = get_tf_idf(agnews)
# Pair each term with its score so both travel together through the aggregation.
tfidf_struct = tf_idf.select("_c0", struct("term", "tf_idf").alias("tfidf_term"))
# collect_list gives no ordering guarantee after a shuffle; sort_array orders the
# structs by their first field (term), so the vector is identical across runs.
tfidf_grouped = tfidf_struct.groupBy("_c0").agg(
    sort_array(collect_list("tfidf_term")).alias("tfidf_vector")
)
# Left join keeps documents that produced no terms; their tfidf_vector is null.
agnews_with_tfidf = agnews.join(tfidf_grouped, on="_c0", how="left")

agnews_with_tfidf.select("_c0", "filtered", "tfidf_vector").show(5, truncate=False)

+-----+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [39]:
import numpy as np  # needed by the functions below; previously only imported in a later cell


def innerterm(x_i, y_i, w, b):
    """Hinge loss of a single sample: max(0, 1 - y_i * (w^T x_i + b)).

    Parameters
    ----------
    x_i : array-like, shape (d,)
        Feature vector of one sample.
    y_i : number
        Label of the sample (expected to be -1 or 1).
    w : array-like, shape (d,)
        Weight vector.
    b : float
        Bias term.
    """
    dot = float(np.dot(w, x_i)) + b  # w^T x_i + b
    margin = y_i * dot  # y_i (w^T x_i + b)
    return max(0.0, 1.0 - margin)


def reduce_sum(values):
    """Sum an iterable of per-sample terms (the reduce step of the map/reduce loss)."""
    return sum(values)


def loss_SVM(w, b, X, y, lambda_=1):
    """Regularized SVM objective: lambda_ * ||w||^2 + (1/n) * sum_i hinge_i.

    Parameters
    ----------
    w : array-like, shape (d,)
        Weight vector (numpy array or plain list).
    b : float
        Bias term.
    X : sequence of n feature vectors, each of length d
    y : sequence of n labels (expected to be -1 or 1)
    lambda_ : float, default 1
        Regularization strength.

    Raises
    ------
    ValueError
        If X is empty, or X and y hold different numbers of samples (zip would
        otherwise silently drop the extra samples while still dividing by len(X)).
    """
    n = len(X)  # number of samples (rows of X)
    if n == 0:
        raise ValueError("loss_SVM needs at least one sample")
    if len(y) != n:
        raise ValueError(f"X has {n} samples but y has {len(y)}")
    hinge_terms = [innerterm(x_i, y_i, w, b) for x_i, y_i in zip(X, y)]  # map step
    avg_hinge = reduce_sum(hinge_terms) / n  # reduce step, then average
    reg = lambda_ * np.sum(np.asarray(w) ** 2)  # asarray so list weights work too
    return reg + avg_hinge


def predict_SVM(w, b, X):
    """Predict labels in {-1, 1} from the sign of w^T x_i + b (a score of exactly 0 maps to 1).

    Returns a numpy array with one prediction per row of X.
    """
    scores = np.asarray(X) @ np.asarray(w) + b  # w^T x_i + b for every row
    return np.where(scores >= 0, 1, -1)



In [40]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   7803      0 --:--:-- --:--:-- --:--:--  7814
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0    160      0 --:--:-- --:--:-- --:--:--   160
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  77.0M      0 --:--:-- --:--:-- --:--:-- 77.0M


In [41]:
# Example loader with pandas (reading these with pyspark directly is also an option).
# None of the three files has a header row -- header=None keeps the first line as data.
import pandas as pd

data_svm, w, bias = (
    pd.read_csv(csv_path, header=None)
    for csv_path in ('data_for_svm.csv', 'w.csv', 'bias.csv')
)

In [42]:
# Display the bias term: a single value held in a 1x1 DataFrame.
bias

Unnamed: 0,0
0,0.00015


In [43]:
# Display the SVM data (feature columns followed by the label column); pandas truncates the rows and columns shown.
data_svm

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,55,56,57,58,59,60,61,62,63,64
0,-1,-1,1,1,-1,1,1,1,1,-1,...,-1,-1,-1,-1,-1,-1,-1,-1,1,-1
1,1,1,-1,1,-1,-1,1,1,-1,1,...,-1,-1,-1,-1,-1,-1,1,1,-1,1
2,1,1,1,-1,1,1,-1,1,-1,1,...,-1,-1,-1,-1,-1,-1,1,1,1,1
3,1,-1,1,-1,-1,-1,-1,-1,-1,1,...,1,1,1,1,1,1,1,1,1,1
4,1,-1,1,-1,1,-1,-1,1,1,1,...,1,1,1,1,1,1,1,1,1,-1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
399995,-1,-1,1,-1,-1,-1,1,1,1,-1,...,1,1,1,1,1,-1,1,1,-1,1
399996,1,-1,1,1,-1,-1,1,1,1,1,...,1,1,1,1,1,-1,1,-1,-1,1
399997,1,-1,1,1,-1,1,1,-1,1,1,...,-1,-1,-1,-1,-1,1,1,-1,-1,1
399998,1,-1,1,-1,1,-1,1,1,1,1,...,1,1,1,1,1,1,-1,1,1,-1


In [44]:
import numpy as np


# Model parameters as plain numpy values.
w_vec = w.to_numpy().flatten()  # 1-D weight vector
b_val = float(bias.iat[0, 0])  # bias.csv holds a single number

# Every column except the last is a feature; the last column is the label.
# NOTE(review): accuracy comes out at ~0.50 (chance level) -- worth confirming the
# label really is the last column and that w.csv's ordering matches the features.
X = data_svm.iloc[:, :-1].to_numpy()
y = data_svm.iloc[:, -1].to_numpy()

y_pred = predict_SVM(w_vec, b_val, X)

# First ten predicted labels.
print(y_pred[:10])


[-1 -1 -1  1 -1  1 -1 -1  1 -1]


In [45]:
# Regularized hinge loss of the given (w, b) over all samples, with the default lambda_.
loss = loss_SVM(w_vec, b_val, X=X, y=y)
print("SVM loss:", loss)

SVM loss: 1.0029403834857364


In [46]:
# Fraction of samples whose predicted label equals the true label.
accuracy = (y_pred == y).mean()
print("Accuracy:", accuracy)


Accuracy: 0.5020625
