## 1. tf-idf

What do we need to calculate?

For each document $d$, the count of each term $t$ (refer to the word-count example).
For each $d$, the total number of words.
For each $t$, the number of documents $d$ that contain $t$.
What should be returned if we only want to know whether the document contains $t$ or not?
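
The three quantities above can be sketched on a tiny in-memory corpus before moving to Spark. This is only an illustration: the toy documents and the helper name `tf_idf_table` are made up here, and it assumes the common definitions tf$(t, d)$ = (count of $t$ in $d$) / (number of words in $d$) and idf$(t) = \ln(N / \mathrm{df}(t))$, where $N$ is the number of documents and df$(t)$ is the number of documents containing $t$. For the last question, one reasonable answer is to emit a 1 for each distinct term of a document, which is what iterating over `set(doc)` does.

```python
import math
from collections import Counter

# Hypothetical toy corpus: each document is a list of already-cleaned tokens.
docs = [
    ["wall", "street", "bears", "wall"],
    ["oil", "prices", "street"],
    ["oil", "oil", "exports"],
]

def tf_idf_table(docs):
    """Return one {term: tf-idf} dict per document."""
    n_docs = len(docs)
    # Per-document term counts (the word-count step).
    term_counts = [Counter(doc) for doc in docs]
    # Document frequency: a document contributes at most 1 per term,
    # so we iterate over the set of distinct terms in each document.
    doc_freq = Counter(term for doc in docs for term in set(doc))
    table = []
    for doc, counts in zip(docs, term_counts):
        n_words = len(doc)
        table.append({
            term: (count / n_words) * math.log(n_docs / doc_freq[term])
            for term, count in counts.items()
        })
    return table

scores = tf_idf_table(docs)
print(scores[0]["wall"])    # tf = 2/4, idf = ln(3/1)
print(scores[1]["street"])  # tf = 1/3, idf = ln(3/2)
```

A term that appears in every document gets idf $= \ln(1) = 0$, so its tf-idf is 0 regardless of how often it occurs.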

In [1]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O


  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  28.5M      0  0:00:01  0:00:01 --:--:-- 28.6M


In [2]:
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )

agnews = spark.read.csv("agnews_clean.csv", inferSchema=True, header=True)

# turning the second column from a string to an array
import pyspark.sql.functions as F
from pyspark.sql.types import ArrayType, StringType
agnews = agnews.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))

                                                                                

In [3]:
from pyspark.sql import Row
from pyspark.sql.functions import size, col, lit
import numpy as np


# count term occurrences for each row
def count_term_occurrences(row, term):
    # count the words in the row's token list that exactly match the term
    # (a substring test such as `term in word` would also count e.g. "trumpet" for "trump")
    count = sum(1 for word in row.filtered if word == term)

    return Row(**row.asDict(),  # unpack the existing fields into the new row
               specific_word_count=count)  # append the count as a new field

def tf_idf(t, D=agnews):
    # map step: attach the number of occurrences of t to every row
    agnews_with_count = D.rdd.map(lambda row: count_term_occurrences(row, t)).toDF()

    # add the document length and the term frequency as new columns
    agnews_with_count = agnews_with_count.withColumn("word_count", size(col("filtered")))  # total word count per row
    agnews_with_count = agnews_with_count.withColumn("tf", col("specific_word_count") / col("word_count"))  # term frequency

    num_rows_with_word = agnews_with_count.filter(col("tf") > 0).count()  # number of rows containing the term
    num_rows = agnews_with_count.count()  # total number of rows
    # idf = log(total rows / rows containing the term);
    # raises ZeroDivisionError if the term appears in no row
    idf = float(np.log(num_rows / num_rows_with_word))

    # new column: term frequency multiplied by idf
    return agnews_with_count.withColumn("tf_idf", col("tf") * lit(idf))

In [4]:
tf_idf("trump", agnews).show(5, truncate=False)

25/05/24 12:18:20 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: , filtered
 Schema: _c0, filtered
Expected: _c0 but found: 
CSV file: file:///Users/kyan/Desktop/textbooks/DSE_300/HW3/agnews_clean.csv


+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------+---+------+
|_c0|filtered                                                                                                                                                                                                                                          |specific_word_count|word_count|tf |tf_idf|
+---+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------+---+------+
|0  |[wall, st, bears, claw, back, black, reuters, reuters, short, sellers, wall, street, dwindling, band, ultra, cynics, seein

## 2. SVM Classification
The **soft-margin support vector machine classification** model minimizes the following objective function (loss).

Given data ${\{\mathbf{x}_i, y_i\}_{i=1}^n}$ with ${\mathbf{x}_i \in \mathbb{R}^d}$, ${y_i \in \{-1, 1\}}$, weight vector ${\mathbf{w}}$, and bias ${b}$:

$$
L(\mathbf{w}, b) = \lambda \|\mathbf{w}\|^2 + \frac{1}{n}\sum_{i=1}^{n} \max\left\{0, 1 - y_i \left(\mathbf{w}^\mathsf{T} \mathbf{x}_i + b\right)\right\},
$$

where ${\|\mathbf{w}\|}$ is the ${L_2}$ norm of ${\mathbf{w}}$.

The SVM classifier predicts:

$$
\widehat{y}_i = \mathrm{sgn}\left(\mathbf{w}^\mathsf{T} \mathbf{x}_i + b\right),
$$

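where ${\mathrm{sgn}(z)}$ returns the sign of ${z}$, i.e., ${+1}$ if ${z > 0}$, and ${-1}$ if ${z < 0}$. The definition leaves ${z = 0}$ open; the prediction code in this section assigns ${+1}$ in that case.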
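
To make the two formulas concrete, here is a minimal plain-Python sketch that evaluates the loss and the prediction rule on three hand-made points. The data, weights, bias, and the value of $\lambda$ are all hypothetical and chosen only so the arithmetic is easy to follow; the function names are illustrative and not part of the assignment.

```python
# Hypothetical toy data: two features per point, labels in {-1, +1}.
X = [[2.0, 1.0], [-1.0, -1.0], [0.5, -0.5]]
y = [1, -1, 1]
w = [1.0, 0.5]
b = -0.5
lam = 0.1  # regularization strength lambda, chosen arbitrarily

def score(w, b, x):
    """Decision value w^T x + b."""
    return sum(wj * xj for wj, xj in zip(w, x)) + b

def svm_loss(w, b, X, y, lam):
    """lambda * ||w||^2 plus the mean hinge loss over all points."""
    reg = lam * sum(wj ** 2 for wj in w)
    hinge = sum(max(0.0, 1.0 - yi * score(w, b, xi)) for xi, yi in zip(X, y))
    return reg + hinge / len(X)

def predict(w, b, x):
    """+1 if the score is >= 0, else -1 (a score of exactly 0 is mapped to +1 here)."""
    return 1 if score(w, b, x) >= 0 else -1

print(svm_loss(w, b, X, y, lam))
print([predict(w, b, xi) for xi in X])
```

The scores of the three points are $2.0$, $-2.0$, and $-0.25$. The first two points have margin $y_i(\mathbf{w}^\mathsf{T}\mathbf{x}_i + b) = 2 \ge 1$ and contribute no hinge loss; the third has margin $-0.25$, contributes $1.25$, and is misclassified. The loss is therefore $0.1 \cdot 1.25 + 1.25 / 3$.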


In [None]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   5804      0 --:--:-- --:--:-- --:--:--  5795
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0     69      0 --:--:-- --:--:-- --:--:--    69
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0 61.9M    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0

100 61.9M  100 61.9M    0     0  9547k      0  0:00:06  0:00:06 --:--:-- 12.2M

In [5]:
# This is an example to read the files. But you should consider using pyspark directly. 
# *Make sure you are not assuming a header*!!
import pandas as pd
data_svm = pd.read_csv('data_for_svm.csv', header=None)
w = pd.read_csv('w.csv', header=None)
bias = pd.read_csv('bias.csv', header=None)

In [6]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("ReadSVMData") \
    .master("local[*]") \
    .getOrCreate()

# Read CSV files with no header and infer schema
data_svm = spark.read.csv('data_for_svm.csv', header=False, inferSchema=True)
w = spark.read.csv('w.csv', header=False, inferSchema=True)
bias = spark.read.csv('bias.csv', header=False, inferSchema=True)

25/05/24 12:18:33 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

In [None]:
from pyspark.sql import functions as F

def compute_dot_product_expr(w_values):
    """Returns the Spark expression for the dot product w^T x."""
    return sum(F.col(f"_c{i}") * F.lit(w_values[f"_c{i}"]) for i in range(64))

def add_margin_column(data_df, dot_product_expr, b_value):
    """Adds margin column to data: y * (w^T x + b)."""
    return data_df.withColumn("y", F.col("_c64")).withColumn(
        "margin", F.col("y") * (dot_product_expr + F.lit(b_value))
    )

def compute_hinge_loss(data_df):
    """Computes the hinge loss: max(0, 1 - margin)."""
    return data_df.withColumn(
        "hinge_loss", F.greatest(F.lit(0), 1 - F.col("margin"))
    )

def compute_loss(w_df, b_df, data_df, lambda_reg=1.0):
    """
    Computes the soft-margin SVM loss.
    - w_df: DataFrame with 1 row, 64 columns of weights
    - b_df: DataFrame with 1 row, 1 column of bias
    - data_df: DataFrame with 64 feature columns + 1 label column (_c64)
    - lambda_reg: Regularization parameter (default = 1.0)

    Returns loss (float): Soft-margin SVM loss
    """
    
    # get parameters from spark df
    w_values = w_df.first()
    b_value = b_df.first()['_c0']

    # build the Spark column expression for the dot product w^T x
    dot_product_expr = compute_dot_product_expr(w_values)

    # map functions to get margin and hinge loss
    df_with_margin = add_margin_column(data_df, dot_product_expr, b_value)
    df_with_hinge_loss = compute_hinge_loss(df_with_margin)

    # reduce function to get sum of hingeloss
    hinge_loss_sum = df_with_hinge_loss.agg(F.sum("hinge_loss")).first()["sum(hinge_loss)"]

    # norm squared of weights w
    w_squared = sum((w_values[f"_c{i}"])**2 for i in range(64))
   
    n = data_df.count()  # n of samples

    #  final loss
    return lambda_reg * w_squared + hinge_loss_sum / n


In [12]:
compute_loss(w, bias, data_svm)


                                                                                

1.002940383485753

In [None]:
from pyspark.sql import functions as F

def predict_SVM(w_df, b_df, data_df):
    """
    Predict labels using the linear SVM decision rule.

    Parameters:
    w_df     : Spark DataFrame with 1 row and 64 columns (weights)
    b_df     : Spark DataFrame with 1 value (bias)
    data_df  : Spark DataFrame with 64 feature columns (_c0 to _c63)

    Returns:
    predictions_df : Spark DataFrame with original features + raw score + predicted label
    """
    # Extract weight vector and bias
    w_values = w_df.first()
    b_value = b_df.first()['_c0']

    # dot product w^T x
    dot_product_expr = compute_dot_product_expr(w_values)

    # decision function w^T x + b as "score" column
    df_with_score = data_df.withColumn("score", dot_product_expr + F.lit(b_value))

    # predicted label: sign(score) (either 1 or -1)
    predictions_df = df_with_score.withColumn(
        "prediction", F.when(F.col("score") >= 0, 1).otherwise(-1)
    )

    return predictions_df


predictions_df = predict_SVM(w, bias, data_svm)
predictions_df.show(5)

+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---+--------------------+----------+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|_c24|_c25|_c26|_c27|_c28|_c29|_c30|_c31|_c32|_c33|_c34|_c35|_c36|_c37|_c38|_c39|_c40|_c41|_c42|_c43|_c44|_c45|_c46|_c47|_c48|_c49|_c50|_c51|_c52|_c53|_c54|_c55|_c56|_c57|_c58|_c59|_c60|_c61|_c62|_c63|_c64|  y|               score|prediction|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+---

In [None]:
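## Here I evaluate prediction accuracy against the true labels. The result (about 0.50) is
## roughly what random guessing would give if the two classes are balanced.

correct_predictions = predictions_df.withColumn(
    # compare with the label column _c64; predict_SVM itself does not add a "y" column
    "is_correct", F.when(F.col("prediction") == F.col("_c64"), 1).otherwise(0)
)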

# Aggregate total correct and total examples
accuracy_stats = correct_predictions.agg(
    F.sum("is_correct").alias("correct"),
    F.count("is_correct").alias("total")
).first()

# Calculate accuracy
correct = accuracy_stats["correct"]
total = accuracy_stats["total"]
accuracy = correct / total

print(f"Correct Predictions: {correct}")
print(f"Total Predictions: {total}")
print(f"Accuracy: {accuracy:.4f}")


Correct Predictions: 200825
Total Predictions: 400000
Accuracy: 0.5021


                                                                                

## Gen AI Disclosure Statement

Purpose of Use:
I used a limited amount of GAI to help me understand SVM and to read the garbled Markdown in the homework HTML file.

Tool Used: ChatGPT by OpenAI

Prompts Used: "Explain SVM", "SVM loss vs. prediction", "Fix and render this markdown LaTeX"