## CS631 Data Intensive Distributed Computing
### Fall 2022 - Final Project
---

**Group member names and UW student ID numbers**
* _Yujia Zheng (20789867)_
* _Xiyao Wang (20704844)_
* _KaYat Liu (20730764)_

Install Spark since it is not installed in Colab.

In [1]:
!apt-get update -qq > /dev/null
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop2.7.tgz
!tar xf spark-3.2.3-bin-hadoop2.7.tgz
!pip install -q findspark

After installing Spark and Java in Colab, set the environment path which allows to run Pyspark in the Colab environment, and create SparkContext. 

Word embedding: TF-IDF and Word count

Classification: KNN and Naive Bayes 

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.3-bin-hadoop2.7"

import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
sc = SparkContext(appName="YourTest", master="local[*]")

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("ReadCSVFile") \
    .getOrCreate()

In [3]:
!wget -q https://student.cs.uwaterloo.ca/~cs451/content/cs431/simple_tokenize.py
!wget -q https://git.uwaterloo.ca/y326zhen/cs631_project/-/raw/main/SMS_training.txt
!wget -q https://git.uwaterloo.ca/y326zhen/cs631_project/-/raw/main/SMS_test.txt

!wget -q https://git.uwaterloo.ca/y326zhen/cs631_project/-/raw/main/dataset/SMS_test_str.txt
!wget -q https://git.uwaterloo.ca/y326zhen/cs631_project/-/raw/main/dataset/SMS_training_str.txt
!ls

sample_data	       SMS_test_str.txt      SMS_training.txt
simple_tokenize.py     SMS_test.txt	     spark-3.2.3-bin-hadoop2.7
SMSSpamCollection.txt  SMS_training_str.txt  spark-3.2.3-bin-hadoop2.7.tgz


In [4]:
# Load the training and test datasets.
SMS_spam_training = sc.textFile('SMS_training.txt')
SMS_spam_test = sc.textFile('SMS_test.txt')
SMS_training_str = sc.textFile('SMS_training_str.txt')
SMS_test_str = sc.textFile('SMS_test_str.txt')

In [None]:
SMS_training_small = SMS_training_str.take(50)
SMS_training_small = sc.parallelize(SMS_training_small)
SMS_test_small = SMS_test_str.take(10)
SMS_test_small = sc.parallelize(SMS_test_small)

## K-NN

### Bag of Words Embedding

In [6]:
# K-NN Bag of Words helper functions
def get_token_features(train_data):

    token_features = train_data.map(lambda line: line.split('\t'))\
                            .flatMap(lambda line: simple_tokenize(line[1]))\
                            .map(lambda token: (token, 1))\
                            .reduceByKey(lambda x,y: x+y)\
                            .sortBy(lambda x: x[1], ascending=False)\
                            .keys()
    return token_features

def get_token_count_emb(token_features, data):

    #token_features=get_token_features(train_data)
    id = data.zipWithIndex().map(lambda line: line[1])
    id_tf_cross = id.cartesian(token_features)
    id_tf_cross = id_tf_cross.map(lambda x: (x, 0))

    pairs = data.map(lambda line: line.split('\t'))\
                .map(lambda line: (line[0], simple_tokenize(line[1])))\
                .filter(lambda x: x[1] != [])\
                .zipWithIndex()\
                .flatMap(lambda line: [((line[1], i), 1) for i in line[0][1]])\
                .reduceByKey(lambda x, y: x + y)
    
    res = id_tf_cross.leftOuterJoin(pairs)\
            .map(lambda x: (x[0], x[1][1]) if x[1][1] is not None else (x[0], 0))\
            .sortByKey()\
            .map(lambda x: (x[0][0], x[1]))\
            .groupByKey()\
            .mapValues(list)

    return res

def euc_dist(train_emb, test_emb):
    total = 0
    for i in range(len(train_emb)):
        total += (train_emb[i]-test_emb[i])**2
    return total**0.5

def top_k_nearest_neighbors(test_neighbors, K):
    test_idx = test_neighbors[0]
    neighbors = test_neighbors[1]
    neighbors = sorted(neighbors, key=lambda item: item[1])
    top_k_idx = [nei[0] for nei in neighbors][:K]
    return test_idx, top_k_idx

### TF-IDF Embedding

In [5]:
from simple_tokenize import simple_tokenize
import math

# Helper function to compute the tfidf weights for words in sentences
def tfidf(dataset_name):
  # tokenize the sentences, filter out empty lists after tokenize and add index
  lines = dataset_name
  lines_with_index = lines.map(lambda line: line.split('\t'))\
                .map(lambda line: (line[0], simple_tokenize(line[1])))\
                .filter(lambda x: x[1] != [])\
                .zipWithIndex()          
  line_counts = lines_with_index.count()

  # get term frequency in a sentence for each word
  pair = lines_with_index.flatMap(lambda line: [((line[1], i), 1) for i in line[0][1]])\
                .reduceByKey(lambda x, y: x + y)         
  tf = pair.map(lambda x: (x[0][1], (x[0][0], x[1])))

  # get inverse document frequency for each word in the corpus
  idf_prep = pair.map(lambda x: (x[0][1],(x[0][0],x[1],1)))\
            .map(lambda x:(x[0],x[1][2]))\
            .reduceByKey(lambda x,y:x+y)
  idf = idf_prep.map(lambda x: (x[0],math.log(line_counts/x[1])))

  # extract tf and idf for each word and sentence index pair and compute tfidf
  prep = tf.join(idf)
  output = prep.map(lambda x: ((x[0], x[1][0][0]),x[1][0][1]*x[1][1])).sortByKey()

  # output format: ((word, index), tfidf)                
  return output


# for tfidf embedding
def get_token_features_tfidf(tf_train_data):

  token_features = tf_train_data.map(lambda x: (x[0][0], x[1]))\
                    .reduceByKey(lambda x, y: x + y)\
                    .sortBy(lambda x: x[1], ascending=False)\
                    .keys()                     
  # (word, freq), sorted by tfidf desc => rdd of words                       
  return token_features


def get_tfidf_emb(token_features, data, tfidf):
    #token_features=get_token_features(train_data)
    # index rdd => cartesian words (index, word), map with 0 get ((index, word), 0)
    id = data.zipWithIndex().map(lambda line: line[1])
    id_tf_cross = id.cartesian(token_features)
    id_tf_cross = id_tf_cross.map(lambda x: (x, 0))

    pairs = tfidf            
    
    # ((index, word), (0, None or count)) => (index, count) => (index, list of count)
    res = id_tf_cross.leftOuterJoin(pairs)\
            .map(lambda x: (x[0], x[1][1]) if x[1][1] is not None else (x[0], 0))\
            .sortByKey()\
            .map(lambda x: (x[0][0], x[1]))\
            .groupByKey()\
            .mapValues(list)

    return res

### Models

In [7]:
def knn(train_data, test_data, K, emb):

    num_test = test_data.count()
    training_labels = train_data.map(lambda line: line.split('\t'))\
                                          .map(lambda line: line[0])\
                                          .zipWithIndex().map(lambda x: (x[1], x[0]))
    test_labels = test_data.map(lambda line: line.split('\t'))\
                                          .map(lambda line: line[0])\
                                          .zipWithIndex().map(lambda x: (x[1], x[0]))

    if emb == 'BoW':                                      
      # all words sorted by freq                                      
        token_features = get_token_features(train_data)
        train_emb = get_token_count_emb(token_features, data=train_data)
        test_emb = get_token_count_emb(token_features, data=test_data)
    elif emb == 'TF-IDF':
        # all words sorted by tfidf 
        tf_idf_train =  tfidf(train_data)  
        tf_idf_test =  tfidf(test_data)                                
        token_features=get_token_features_tfidf(tf_idf_train)
        train_emb = get_tfidf_emb(token_features, data=train_data, tfidf = tf_idf_train)
        test_emb = get_tfidf_emb(token_features, data=test_data, tfidf = tf_idf_test)


    train_test_cross = train_emb.cartesian(test_emb) # train index, emb, test index, emb
    train_test_cross = train_test_cross.map(lambda x: (x[1][0], (x[0][0], euc_dist(x[0][1], x[1][1])))) # test index, (train index, euc dist)
    train_test_cross = train_test_cross.groupByKey()\
                                      .mapValues(list)\
                                      .map(lambda x: top_k_nearest_neighbors(x, K))\
                                      .flatMap(lambda x: [(idx, x[0]) for idx in x[1]]) # selected train index, test index
    # 1st map: => test index, train neighbors labels
    # 2nd last map: test index, # of spams
    preds = train_test_cross.leftOuterJoin(training_labels).map(lambda x: x[1]).groupByKey()\
                                      .mapValues(list)\
                                      .map(lambda x: (x[0], x[1].count('spam')))\
                                      .map(lambda x: (x[0], 'spam' if x[1]>=K/2 else 'ham'))
    output = preds.join(test_labels)

    acc = sc.accumulator(0)
    error = sc.accumulator(0)
    def calcutateError(x):
        index = x[0]
        pred_label = x[1][0]
        true_label = x[1][1]
        if pred_label == true_label:
            acc.add(1)
        else:
            error.add(1)
    output.foreach(calcutateError)

    print(f'Test accuracy: {acc.value*100/num_test:.2f}%', 
          f'\nTotal number of prediction errors: {error.value}')

In [9]:
%%time
knn(train_data=SMS_training_str, test_data=SMS_test_str, K=10, emb = 'BoW')

Test accuracy: 77.26% 
Total number of prediction errors: 68
CPU times: user 3.37 s, sys: 402 ms, total: 3.77 s
Wall time: 9min 47s


In [14]:
%%time
knn(train_data=SMS_training_str, test_data=SMS_test_str, K=10, emb = 'TF-IDF')

Test accuracy: 50.84% 
Total number of prediction errors: 147
CPU times: user 3.2 s, sys: 445 ms, total: 3.64 s
Wall time: 9min 5s


In [None]:
# Incurred incredibly long runtime
%%time
knn(train_data=SMS_spam_training, test_data=SMS_spam_test, K=10, emb = 'BoW')

In [None]:
# Incurred incredibly long runtime
%%time
knn(train_data=SMS_spam_training, test_data=SMS_spam_test, K=10, emb = 'TF-IDF')

## Naive Bayes

### Bag of Words Embedding

In [None]:
# Naive Bayes
from simple_tokenize import simple_tokenize
import math

# Helper function to compute the bag of words embedding
def bag_of_words(dataset_name):
  # tokenize the sentences, filter out empty lists after tokenize and add index
  lines_with_index = dataset_name.map(lambda line: line.split('\t'))\
                .map(lambda line: (line[0], simple_tokenize(line[1])))\
                .filter(lambda x: x[1] != [])\
                .zipWithIndex()          
  line_counts = lines_with_index.count()

  # get word count in a sentence for each word
  pair = lines_with_index.flatMap(lambda line: [((line[1], i), 1) for i in line[0][1]])\
                .reduceByKey(lambda x, y: x + y)         
  word_count = pair.map(lambda x: ((x[0][1], x[0][0]), x[1]))
  
  return word_count

### TF-IDF Embedding

In [None]:
# Helper function to compute the tfidf weights for words in sentences
def tfidf(dataset_name):
  # tokenize the sentences, filter out empty lists after tokenize and add index
  lines = dataset_name
  lines_with_index = lines.map(lambda line: line.split('\t'))\
                .map(lambda line: (line[0], simple_tokenize(line[1])))\
                .filter(lambda x: x[1] != [])\
                .zipWithIndex()          
  line_counts = lines_with_index.count()

  # get term frequency in a sentence for each word
  pair = lines_with_index.flatMap(lambda line: [((line[1], i), 1) for i in line[0][1]])\
                .reduceByKey(lambda x, y: x + y)         
  tf = pair.map(lambda x: (x[0][1], (x[0][0], x[1])))

  # get inverse document frequency for each word in the corpus
  idf_prep = pair.map(lambda x: (x[0][1],(x[0][0],x[1],1)))\
            .map(lambda x:(x[0],x[1][2]))\
            .reduceByKey(lambda x,y:x+y)
  idf = idf_prep.map(lambda x: (x[0],math.log(line_counts/x[1])))

  # extract tf and idf for each word and sentence index pair and compute tfidf
  prep = tf.join(idf)
  output = prep.map(lambda x: ((x[0], x[1][0][0]),x[1][0][1]*x[1][1])).sortByKey()

  # output format: ((word, index), tfidf)                
  return output


### Models

In [None]:
# this function builds the Naive Bayes classifier with tfidf scores
def train(train_dataset, emb_func):
  # tokenize the sentences, filter out empty lists after tokenize and add index
  lines = train_dataset
  lines_with_index = lines.map(lambda line: line.split('\t'))\
                .map(lambda line: (line[0], simple_tokenize(line[1])))\
                .filter(lambda x: x[1] != [])\
                .zipWithIndex()         
  line_counts = lines_with_index.count()

  # get prior for each class
  label = lines_with_index.map(lambda x: (x[1], x[0][0]))
  ham_counts = label.filter(lambda x: x[1] == 'ham').count()
  spam_counts = label.filter(lambda x: x[1] == 'spam').count()
  ham_prob = ham_counts/line_counts
  spam_prob = spam_counts/line_counts
  distinct_label = label.map(lambda x: x[1]).distinct()

  # setup for computing conditional probabilities
  words = lines.map(lambda line: line.split('\t')).flatMap(lambda line: simple_tokenize(line[1])).distinct()
  word_counts = words.count()
  all_words = words.map(lambda x: (x, 0))
  cross_prep = distinct_label.cartesian(all_words).map(lambda x: ((x[0], x[1][0]), x[1][1]))
  dataset = emb_func(train_dataset)
  
  # get the label for the sentence and sum of tfidf for each (label, word) combination
  dataset_by_label = dataset.map(lambda x: (x[0][1], (x[0][0], x[1])))\
                            .join(label)\
                            .map(lambda x: ((x[1][1], x[1][0][0]), x[1][0][1]))\
                            .reduceByKey(lambda x, y: x + y)
  # sum of tfidf for each class
  total_by_label = dataset_by_label.map(lambda x: (x[0][0], x[1])).reduceByKey(lambda x, y: x + y)
  total_by_label = total_by_label.collectAsMap() # small table
  broadcast_tbl = sc.broadcast(total_by_label)

  # map the cross product of (label, word) combination with the existing (label, word) table
  # and assign tfidf from existing table or 0 otherwise
  # then calculate probability 
  raw_prob_dataset = cross_prep.leftOuterJoin(dataset_by_label)
  def getProb(x):
    label = x[0][0]
    word = x[0][1]
    default_zero = x[1][0]
    value_to_check = x[1][1]
    if value_to_check == None:
      emb = 0
    else:
      emb = value_to_check
    prob = (emb + 1)/(broadcast_tbl.value[label] + word_counts)
    return ((label, word), prob)
  prob_dataset = raw_prob_dataset.map(getProb)

  # output format for dataset: ((label, word), prob)
  return prob_dataset, ham_prob, spam_prob, distinct_label


In [None]:
# this function takes a test dataset and a training dataset and returns the 
# accuracy of the test set after training using training set
def test(train_dataset, dataset_name, emb_func):
  # tokenize the sentences, filter out empty lists after tokenize and add index, extract true label for comparision
  lines = dataset_name
  lines_with_index = lines.map(lambda line: line.split('\t'))\
                .map(lambda line: (line[0], simple_tokenize(line[1])))\
                .filter(lambda x: x[1] != [])\
                .zipWithIndex()            
  line_counts = lines_with_index.count()
  label = lines_with_index.map(lambda x: (x[1], x[0][0]))

  # get prior from training set
  ham_prob = train(train_dataset, emb_func)[1]
  spam_prob = train(train_dataset, emb_func)[2]
  distinct_label = train(train_dataset, emb_func)[3]

  # get (label, word) combination for each sentence and for all classes
  # and extract probability from training set
  # if word does not exist in training set, assign a small probability to it
  # then multiply the conditional probability as well as the prior for each sentence
  def createIndexWord(x):
    label = x[0][0]
    list_of_words = x[0][1]
    index = x[1]
    output = []
    for word in list_of_words:
      output.append((word, index))
    return output
  prep_for_label = lines_with_index.flatMap(createIndexWord)
  cross_prep = distinct_label.cartesian(prep_for_label).map(lambda x: ((x[0], x[1][0]), x[1][1]))
  prob_dataset_dict = train(train_dataset, emb_func)[0]
  full_table = cross_prep.leftOuterJoin(prob_dataset_dict)

  def getProbTrain(x):
    label = x[0][0]
    word = x[0][1]
    index = x[1][0]
    prob_to_check = x[1][1]
    if prob_to_check == None:
      return ((index, label), 0.0001) # if does not exist in training, assign 0.0001
    else:
      return ((index, label), prob_to_check)
  test_dataset = full_table.map(getProbTrain).reduceByKey(lambda x, y: x * y)

  def get_prior(x):
    index = x[0][0]
    label = x[0][1]
    prob = x[1]
    if label == 'ham':
      prior = ham_prob
    else:
      prior = spam_prob
    return ((index, label), prob * prior)
  test_dataset = test_dataset.map(get_prior)

  # get predicting label for each sentence and compare with true label to calculate error and accuracy
  def max_label(a, b):
    prob_a = a[1]
    prob_b = b[1]
    if prob_a < prob_b:
      return b  
    else:
      return a
  test_dataset = test_dataset.map(lambda x: (x[0][0], (x[0][1], x[1])))
  test_dataset = test_dataset.reduceByKey(max_label).map(lambda x: (x[0], x[1][0])) 
  output = test_dataset.join(label)

  acc = sc.accumulator(0)
  error = sc.accumulator(0)
  def calcutateError(x):
    index = x[0]
    pred_label = x[1][0]
    true_label = x[1][1]
    if pred_label == true_label:
      acc.add(1)
    else:
      error.add(1)
  output.foreach(calcutateError)

  print(f'Test accuracy: {acc.value*100/line_counts:.2f}%', 
        f'\nTotal number of prediction errors: {error.value}')
  return

In [None]:
test(SMS_spam_training, SMS_spam_test, emb_func=bag_of_words)

Test accuracy: 98.39% 
Total number of prediction errors: 18


In [None]:
test(SMS_training_str, SMS_test_str, emb_func=bag_of_words)

Test accuracy: 95.32% 
Total number of prediction errors: 14


In [None]:
test(SMS_spam_training, SMS_spam_test, emb_func=tfidf)

Test accuracy: 98.57% 
Total number of prediction errors: 16


In [None]:
test(SMS_training_str, SMS_test_str, emb_func=tfidf)

Test accuracy: 95.99% 
Total number of prediction errors: 12
