# 1. tf-idf definition


In [1]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/agnews_clean.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 33.2M  100 33.2M    0     0  53.4M      0 --:--:-- --:--:-- --:--:-- 53.3M


In [2]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType

# `curl -O` saved the file into the current working directory, so read it from there
# (on Colab that is /content; elsewhere a hard-coded /content path would not exist).
AGNEWS_PATH = "agnews_clean.csv"

spark = (SparkSession.builder
         .master("local[*]")
         .appName("AG news")
         .getOrCreate()
        )

agnews_raw = spark.read.csv(AGNEWS_PATH, inferSchema=True, header=True)

# The `filtered` column is stored as a JSON-encoded list of tokens; parse it into array<string>.
agnews = agnews_raw.withColumn('filtered', F.from_json('filtered', ArrayType(StringType())))
agnews.show()

+---+--------------------+
|_c0|            filtered|
+---+--------------------+
|  0|[wall, st, bears,...|
|  1|[carlyle, looks, ...|
|  2|[oil, economy, cl...|
|  3|[iraq, halts, oil...|
|  4|[oil, prices, soa...|
|  5|[stocks, end, nea...|
|  6|[money, funds, fe...|
|  7|[fed, minutes, sh...|
|  8|[safety, net, for...|
|  9|[wall, st, bears,...|
| 10|[oil, economy, cl...|
| 11|[need, opec, pump...|
| 12|[non, opec, natio...|
| 13|[google, ipo, auc...|
| 14|[dollar, falls, b...|
| 15|[rescuing, old, s...|
| 16|[kids, rule, back...|
| 17|[market, head, to...|
| 18|[us, trade, defic...|
| 19|[shell, target, t...|
+---+--------------------+
only showing top 20 rows



In [3]:
# Converting to an RDD of Row(_c0, filtered) and peeking at a few records.
# take(5) avoids pulling the whole corpus to the driver just to display it.
rdd = agnews.select("_c0", "filtered").rdd
rdd.take(5)

[Row(_c0=0, filtered=['wall', 'st', 'bears', 'claw', 'back', 'black', 'reuters', 'reuters', 'short', 'sellers', 'wall', 'street', 'dwindling', 'band', 'ultra', 'cynics', 'seeing', 'green']),
 Row(_c0=1, filtered=['carlyle', 'looks', 'toward', 'commercial', 'aerospace', 'reuters', 'reuters', 'private', 'investment', 'firm', 'carlyle', 'group', 'reputation', 'making', 'well', 'timed', 'occasionally', 'controversial', 'plays', 'defense', 'industry', 'quietly', 'placed', 'bets', 'another', 'part', 'market']),
 Row(_c0=2, filtered=['oil', 'economy', 'cloud', 'stocks', 'outlook', 'reuters', 'reuters', 'soaring', 'crude', 'prices', 'plus', 'worries', 'economy', 'outlook', 'earnings', 'expected', 'hang', 'stock', 'market', 'next', 'week', 'depth', 'summer', 'doldrums']),
 Row(_c0=3, filtered=['iraq', 'halts', 'oil', 'exports', 'main', 'southern', 'pipeline', 'reuters', 'reuters', 'authorities', 'halted', 'oil', 'export', 'flows', 'main', 'pipeline', 'southern', 'iraq', 'intelligence', 'showed'

In [49]:
# 1. Designing the MapReduce functions for calculating the tf-idf measure
# Each row is a single document, d
# All rows together form the collection of documents, D
# t is a word (term)
#
# tf(t, d)  = (# occurrences of t in d) / (# terms in d)
# idf(t, D) = log((# docs in D) / (# docs in D containing t))

# Map Phase
# Emits every count required to compute tf-idf:
# # occurrences of t in d, # terms in d, # docs in D, and # docs containing t
def map_phase(doc):
    """Emit intermediate (key, value) pairs for tf-idf from an iterable of documents.

    Args:
      doc: iterable of rows, each indexable by "_c0" (document id) and
        "filtered" (list of tokens, or None when the JSON could not be parsed).

    Yields:
      ("total_docs", 1)             once per document            -> # docs in D
      ((doc_id, word), 1)           once per token occurrence    -> # occurrences of t in d
      (("doc_length", doc_id), 1)   once per token occurrence    -> # terms in d
      (("df", word), doc_id)        once per distinct word per d -> # docs containing t
    """
    # `doc` is the whole collection; use a distinct loop name instead of shadowing it.
    for row in doc:
        doc_id = row["_c0"]
        # from_json produces null for malformed input; treat that as an empty document.
        words = row["filtered"] or []
        # Words of this document already emitted for the df count.
        counted = set()

        yield ("total_docs", 1)

        for word in words:
            yield ((doc_id, word), 1)
            yield (("doc_length", doc_id), 1)
            # Only the first occurrence per document contributes to document frequency.
            if word not in counted:
                yield (("df", word), doc_id)
                counted.add(word)

In [50]:
# Shuffle/sort phase: gather the mapped (key, value) pairs under their key so the
# reduce phase sees every value belonging to a key together.
def shuffle_and_sort(mapped_data):
    """Group an iterable of (key, value) pairs into a dict {key: [values, ...]}.

    Keys appear in first-seen order; each value list preserves emission order.
    """
    grouped = {}
    for key, value in mapped_data:
        grouped.setdefault(key, []).append(value)
    return grouped

In [51]:
import math

# Reduce Phase
# Aggregates the grouped counts and computes the final tf-idf scores.
def reduce_phase(shuffled_data):
    """Turn grouped map output into per-document tf-idf scores.

    Args:
      shuffled_data: dict produced by shuffle_and_sort(map_phase(...)).

    Returns:
      {doc_id: [(word, tfidf), ...]} with
      tf = count(word in d) / (# terms in d) and
      idf = log((# docs in D) / (# docs containing word)).
    """
    total_docs = 0        # # docs in D
    doc_lengths = {}      # doc_id -> # terms in d
    term_counts = {}      # (doc_id, word) -> # occurrences of word in d
    docs_with_term = {}   # word -> set of doc ids containing the word

    for key, values in shuffled_data.items():
        if key == "total_docs":
            total_docs = sum(values)
            continue
        tag = key[0]
        if tag == "df":
            # A set so each document is counted once per word.
            docs_with_term.setdefault(key[1], set()).update(values)
        elif tag == "doc_length":
            doc_lengths[key[1]] = sum(values)
        else:
            # Remaining keys are (doc_id, word) term-occurrence counts.
            term_counts[key] = sum(values)

    scores_by_doc = {}
    for (doc_id, word), count in term_counts.items():
        tf = count / doc_lengths[doc_id]
        idf = math.log(total_docs / len(docs_with_term[word]))
        scores_by_doc.setdefault(doc_id, []).append((word, tf * idf))
    return scores_by_doc

In [91]:
from pyspark.sql import Row

rows = agnews.select("_c0", "filtered").collect()

# Local MapReduce on the driver: map -> shuffle/sort -> reduce.
mapped = list(map_phase(rows))
shuffled = shuffle_and_sort(mapped)
result = reduce_phase(shuffled)

# Flatten {doc_id: [(word, score), ...]} into one Row per (document, term).
tfidf_rows = [
    Row(doc_id=doc_id, word=word, tfidf_score=score)
    for doc_id, word_scores in result.items()
    for word, score in word_scores
]

# Creating Spark DataFrame
tfidf_df = spark.createDataFrame(tfidf_rows)
tfidf_df.show(5)

+------+-----+------------------+
|doc_id| word|       tfidf_score|
+------+-----+------------------+
|     0| wall|0.5115985326511431|
|     0|   st|0.2584728642725166|
|     0|bears|0.3372044607529448|
|     0| claw| 0.499114829314058|
|     0| back|0.1892216338539946|
+------+-----+------------------+
only showing top 5 rows



In [92]:
# Print the complete tf-idf vector for the five smallest document ids
# (labels are shown 1-based, i.e. doc_id 0 prints as "Document 1").
first_five_ids = sorted(result)[:5]
for doc_id in first_five_ids:
    print(f"\nDocument {doc_id + 1}:")
    doc_scores = result[doc_id]
    for word, score in doc_scores:
        print(f"  {word}: {score}")


Document 1:
  wall: 0.5115985326511431
  st: 0.2584728642725166
  bears: 0.3372044607529448
  claw: 0.499114829314058
  back: 0.1892216338539946
  black: 0.2953171727366614
  reuters: 0.24754017186645658
  short: 0.2773120373951269
  sellers: 0.4468379768438066
  street: 0.24678348986493034
  dwindling: 0.4572386180709258
  band: 0.3643421454792778
  ultra: 0.4125512394225831
  cynics: 0.563734318747707
  seeing: 0.37743394553516213
  green: 0.2877107940095433

Document 2:
  carlyle: 0.7168306746824437
  looks: 0.1973537176743789
  toward: 0.1898997183872362
  commercial: 0.2057832028092643
  aerospace: 0.2581171817448437
  reuters: 0.1650267812443044
  private: 0.1929050573011279
  investment: 0.1890771769001148
  firm: 0.15969712503706046
  group: 0.12468100563149095
  reputation: 0.2578098186776328
  making: 0.1698717076460444
  well: 0.17053284421704767
  timed: 0.324478643568105
  occasionally: 0.33274321954270536
  controversial: 0.20949395177306526
  plays: 0.22418048797172685


# 2. SVM Objective Function

In [55]:
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/w.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/bias.csv -O
!curl https://raw.githubusercontent.com/mosesyhc/de300-2025sp-class/refs/heads/main/data_for_svm.csv -O

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1391  100  1391    0     0   7468      0 --:--:-- --:--:-- --:--:--  7478
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100    22  100    22    0     0    122      0 --:--:-- --:--:-- --:--:--   122
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 61.9M  100 61.9M    0     0  65.0M      0 --:--:-- --:--:-- --:--:-- 64.9M


In [60]:
import numpy as np
from pyspark.sql import SparkSession

# Number of feature columns in data_for_svm.csv; the column right after them holds the label y.
N_FEATURES = 64

# getOrCreate reuses an already-running session if there is one.
spark = SparkSession.builder.appName("SVM Loss").getOrCreate()

# Loading the data
data_df = spark.read.csv("data_for_svm.csv", header=False, inferSchema=True)
w_df = spark.read.csv("w.csv", header=False, inferSchema=True)
bias_df = spark.read.csv("bias.csv", header=False, inferSchema=True)

# Collect the rows once so X and y come from the same pass over the same rows
# (two separate collect() jobs would each rescan the large data file).
svm_rows = data_df.collect()
X = [np.array(row[:N_FEATURES], dtype=float) for row in svm_rows]
y = [int(row[N_FEATURES]) for row in svm_rows]

# w is read from the first row of w.csv; b is the first cell of bias.csv.
w = np.array(w_df.first(), dtype=float)
b = float(bias_df.first()[0])

assert w.shape == (N_FEATURES,), f"expected {N_FEATURES} weights, got shape {w.shape}"
assert len(X) == len(y)


In [87]:
# Map Phase
# Emits one ("hinge_loss", value) pair per training example.
def map_phase_svm(X, y, w, b):
    """Yield ("hinge_loss", max(0, 1 - y_i * (w . x_i + b))) for every example.

    Args:
      X: sequence of feature vectors, each compatible with np.dot(w, x_i).
      y: sequence of labels aligned with X (presumably +1/-1 -- confirm against the data).
      w: weight vector.
      b: bias term.

    Raises:
      ValueError: if X and y have different lengths (raised when iteration starts).
    """
    # Previously a shorter y raised an opaque IndexError and a longer y was silently truncated.
    if len(X) != len(y):
        raise ValueError(f"X and y must have the same length, got {len(X)} and {len(y)}")
    for xi, yi in zip(X, y):
        margin = yi * (np.dot(w, xi) + b)
        yield ("hinge_loss", max(0.0, 1 - margin))

In [88]:
# Shuffle/sort phase: group the mapped data by key
def shuffle_and_sort_svm(mapped_data):
    """Group (key, value) pairs into a dict of key -> list of values, in arrival order."""
    grouped = {}
    for key, value in mapped_data:
        bucket = grouped.get(key)
        if bucket is None:
            bucket = grouped[key] = []
        bucket.append(value)
    return grouped

In [90]:
# Reduce Phase
# Combines the mapped hinge losses into the SVM objective:
#   L(w, b) = lambda * ||w||^2 + (1/n) * sum_i max(0, 1 - y_i * (w . x_i + b))
# The per-example hinge losses (which include the margin) come from the map phase.
def reduce_phase_svm(shuffled_data, w, l):
    """Compute the regularized SVM objective from grouped hinge losses.

    Args:
      shuffled_data: dict whose "hinge_loss" key maps to a list of per-example losses.
      w: weight vector.
      l: regularization strength (lambda).

    Returns:
      l * ||w||^2 + mean(hinge losses).

    Raises:
      ValueError: if there are no hinge losses to average (empty dataset).
    """
    hinge_losses = shuffled_data.get("hinge_loss", [])
    # Without this guard an empty dataset surfaces as a KeyError or ZeroDivisionError.
    if len(hinge_losses) == 0:
        raise ValueError("no hinge losses to reduce: the dataset is empty")
    regularization = l * np.sum(np.square(w))
    hinge_avg = sum(hinge_losses) / len(hinge_losses)
    return regularization + hinge_avg

def loss_SVM(w, b, X, y, l=0.01):
    """Regularized SVM objective for (w, b) on (X, y), computed as map -> shuffle -> reduce.

    l is the regularization strength (lambda).
    """
    return reduce_phase_svm(shuffle_and_sort_svm(map_phase_svm(X, y, w, b)), w, l)

# Evaluate the objective with regularization strength lambda = 0.01.
svm_loss = loss_SVM(w=w, b=b, X=X, y=y, l=0.01)
print("SVM Loss:", svm_loss)

SVM Loss: 0.9997559286225003


Generative AI was used in this assignment in the following ways:
1. I was getting type errors in the first problem, so I copied and pasted the error message into the AI tool. Based on its explanation, I made changes to the reduce_phase function.
2. I couldn't figure out how to import the data using Spark for the second problem, so I asked "How should I load data using Spark" and followed the instructions.
3. I asked for help understanding the SVM objective function conceptually and what each of the variables represented.
4. I asked "How should I save the tf-idf measures as a new column?" and followed the instructions using Row and spark.createDataFrame.
5. Other minor coding errors were pasted for debugging, but specific prompts weren't recorded.