In [1]:

# from __future__ import print_function

import sys,re
import math
import string
try:
    from sets import Set
except ImportError:
    Set = set
import numpy # pip install numpy


from pyspark import SparkContext # spark 1.x
from pyspark.sql import SQLContext
from pyspark.mllib import *
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.evaluation import BinaryClassificationMetrics


## Environment Setup

Change the following setting according to your machine environment.

In [2]:
hdfs_nn = "127.0.0.1" # TODO: change it if it differs from your machine environment
hdfs_nn = "local" # Remove this 


## Improving SVM classifier with stopwords removal and N-Gram 


Checking through the data, we find that all the data are already in lower case. However there are still some punctuations. Let’s remove them. You may refer to the following code snippets to help you remove punctuation when building your Spark model.

In [3]:
def remove_punct(tweet):
    return re.sub('[\"\'.,!#@%\[\]{}*^]','',tweet) # OLD ONE WAS GIVING POOR RESULTS


There are a lot of stop words such as "I", "me", "to", "in". Let’s remove them. You may refer to the following code snippets.

In [4]:
stop_words = Set(['and', 'is', 'it', 'are', 'in', 'rt', 'what',\
   'from', 'her', 'to', 'their', 'you', 'me', 'his', 'http', 'that',\
   'they', 'by', 'he', 'a', 'on', 'for', 'i', 'of', 'this', 'she', 'the', 'my', 'at'])


def remove_stop_words(words):
    return [w for w in words if (w not in stop_words)]


The following are some other helper functions that we defined in practical 7

In [5]:

vector_fixed_size = 30 # fixed the size of each vector.
# if vectors have different sizes, the gradient descent algorithm will fail
# cut off if it exceeds, pad zeros if it has less than 30 elements


def hash(str):
    return reduce(lambda h,c:numpy.int32(31*h+ord(c)), str, 2147483647)

def to_words(tweet):
    return remove_stop_words(filter(None, remove_punct(tweet).split(" "))) # Added a filter here


### NGrams 

One observation is that some special korean idol names such as "ahn jae hyun" and "yoo in na", it is hard to use them individually. We may want to consider 2-ngrams or 3-grams as the terms of the language model. 

Define a `two_grams` function and a `three_grams` function, which later can be used to replace the `to_words` function.

In [6]:
from pyspark.sql import SparkSession

spark = (
            SparkSession.builder.master(hdfs_nn)
            .config("spark.io.compression.codec", "snappy")
            .config("spark.ui.enabled", "false")
            .getOrCreate()
        )

sc = spark.sparkContext


In [1]:


s = "The virus that causes COVID-19 is mainly  transmitted through droplets"

def two_grams(str):
    words = to_words(str)
    return [" ".join([words[i], words[i+1]]) for i in range(len(words)-1)] # TODO: fixme 


def three_grams(str):
    words = to_words(str)
    return [" ".join([words[i], words[i+1], words[i+2]]) for i in range(len(words)-2)] # TODO: fixme


### Test cases for `two_grams` and `three_grams`

In [8]:
s = "The virus that causes COVID-19 is mainly  transmitted through droplets"

print(two_grams(s))
print(three_grams(s))

['The virus', 'virus causes', 'causes COVID-19', 'COVID-19 mainly', 'mainly transmitted', 'transmitted through', 'through droplets']
['The virus causes', 'virus causes COVID-19', 'causes COVID-19 mainly', 'COVID-19 mainly transmitted', 'mainly transmitted through', 'transmitted through droplets']



Expected output

```text
['The virus', 'virus causes', 'causes COVID-19', 'COVID-19 mainly', 'mainly transmitted', 'transmitted through', 'through droplets']
['The virus causes', 'virus causes COVID-19', 'causes COVID-19 mainly', 'COVID-19 mainly transmitted', 'mainly transmitted through', 'transmitted through droplets']
```


### Training the model

Let's incorporate the `two_grams` and `three_grams` functions into our SVM model. First we need to load the data. 


In [9]:
posTXT = sc.textFile("Tweet_data/Kpop/*.txt")
negTXT = sc.textFile("Tweet_data/othertweet/*.txt")

We apply two_grams to construct the positive and negative labelled points and build a SVM model.

In [10]:
from functools import reduce

def pad_cap(xs,size):
    return xs[0:size] + [ 0.0 for x in range(0, size-len(xs))]

    
posTerms = posTXT.map(lambda line: two_grams(line))
negTerms = negTXT.map(lambda line: two_grams(line))
terms = posTerms + negTerms


posLP = posTerms.map(lambda terms:  LabeledPoint(1.0, Vectors.dense(pad_cap(list(map(lambda w:hash(w), terms)),vector_fixed_size))))
negLP = negTerms.map(lambda terms:  LabeledPoint(0.0, Vectors.dense(pad_cap(list(map(lambda w:hash(w), terms)),vector_fixed_size))))
data = negLP + posLP


We split the data into training and testing, feed the training into the model.

In [11]:

splits = data.randomSplit([0.6,0.4],seed = 11)
training = splits[0].cache()
test = splits[1]

# Run training algorithm to build the model
num_iteration = 20
num_iteration = 10 # REMOVE THIS
model = SVMWithSGD.train(training,num_iteration)

### Testing the model

We apply the model to the testing data.

In [12]:
# Clear the default threshold
model.clearThreshold()
# Compute raw scores on the test set
score_and_labels = test.map( lambda point: (float(model.predict(point.features)), point.label) )


We evaluate the performance of this model.

In [13]:
# Get the evaluation metrics
metrics = BinaryClassificationMetrics(score_and_labels)
au_roc = metrics.areaUnderROC

print("Area under ROC = %s" % str(au_roc))

Area under ROC = 0.49537914375700626


You should have auROC is around 0.49 to 0.62.  You may try to use `three_grams()` or improve the stop word set and the punctuation removal. 


## Fixing the performance bugs

Something is still not quite right. 

We've tried a few attempts but the improvement of our SVM model is puny. We gain less than 0.1 in terms of area under ROC. To understand why it is the case, we have to think about how SVM works. SVM expects each training sample is a vector. It plots all the positive and negative training samples in the vector space and try to derive a polynomal border to segment the positive and the negatives. However the way we built the vectors from the tweets in the earlier section is volatile under the orders of words and easily affected by noise. For instance, "I love yoo jae suk" and "yoo jae suk is in Singapore" are two possible KPOP related tweets. However, they result in two different vectors using the `hash` function.

In [14]:

print(two_grams("I love yoo jae suk"))
print(list(map(hash, two_grams("I love yoo jae suk"))))

['I love', 'love yoo', 'yoo jae', 'jae suk']
[-912178776, 2106175338, -595126104, -1432813480]


In [15]:
print(two_grams("yoo jae suk is in Singapore"))
print(list(map(hash, two_grams("yoo jae suk is in Singapore"))))


['yoo jae', 'jae suk', 'suk Singapore']
[-595126104, -1432813480, 700126286]


The common values `-595126104`, `-1432813480`, (corespondent to "yoo jae" and "jae suk") are found in different positions within the two vectors. As a result they are in different dimension of the vector space. It is harder to derive a good polynomial border that seperate these two vectors with the non-Kpop training sample. It would be a lot easier if these two vectors are "nearer" to each other. It seems to be making sense to "align" the vector dimensions based on all the words or grams. One possibility is to sort all the words arising in the training data. However this approach is not practical because that would cause an explosion in size in terms of the vector space.

Fortunately, there is a well known technique called *TF-IDF* to find out the "important" terms appearing on the corpus. 

### TF-IDF saves the day

TF-IDF stands for Term frequency - Inverse document frequency. It is a classic and powerful technique to search for interesting terms from a set
of documents.

*  TF is actually the word count. For instance, consider the following text data.
```text
apple smart phone
android smart phone
```
We assume that each line is a document, hence there are two documents here.

* The term frequency is
```text
apple, 1
android, 1
phone, 2
smart, 2
```

* IDF is is the total number of documents/records divided by the total number of the documents/records containing the words. We apply logarithmic to the quotient. The IDF for the above example is
```text
apple, log(2/1)
android, log(2/1)
phone, log(2/2)
smart, log(2/2)
```
that is
```text
apple, 0.693
android, 0.693
phone, 0
smart, 0
```

* TF-IDF is obtained by multiplying the TF with the IDF.
```text
apple, 0.693
android, 0.693
phone, 0
smart, 0
```


### Define the term frequency function `tf` 

Hint: `tf` is actually the word count.

In [16]:

def tf(terms):
    word_count = terms.flatMap(list).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
    return word_count # TODO: fixme

In [17]:

def one_grams(s):
    return s.split(" ")

test_terms = [one_grams("apple smart phone"), one_grams("android smart phone")]
# print(test_terms)
test_tf = tf(sc.parallelize(test_terms))
test_tf.collect()

[('apple', 1), ('smart', 2), ('phone', 2), ('android', 1)]


Expected output

```text
[('phone', 2), ('android', 1), ('apple', 1), ('smart', 2)]
```


### Define the document frequency function `df`

Hint: `df` is almost the same as `tf` except that duplications within a record (a document) are disregarded.

In [18]:

def df(terms):
    word_count = terms.flatMap(lambda x: list(set(x))).map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)
    return word_count # TODO: fixme


In [19]:

test_df = df(sc.parallelize(test_terms))
test_df.collect()

[('smart', 2), ('phone', 2), ('apple', 1), ('android', 1)]

## Combining `tf` and `df`

We now can define `tfidf` in terms of `tf` and `df`.

In [20]:

def tfidf(terms): 
    dCount = terms.count()
    tfreq = tf(terms)
    dfreq = df(terms)
    return tfreq.join(dfreq).map(lambda p :(p[0], p[1][0] * math.log(dCount/p[1][1]))).sortBy( lambda p : - p[1])


Let's incorporate that into our Spark machine learning model. For instance, we can run tf-idf, sort the terms according to tf-idf score in descending order, and collect the top 150
terms, and use them as the vector dimensions. (This implies that we will have a 150-dimension space.)

In [21]:

topTerms = list(map(lambda p:p[0], tfidf(terms).collect()[:150]))
print(topTerms)


['jae suk', 'yoo jae', 'jae lee', 'kim jae', 'jae millz', 'ahn jae', 'jae hyun', 'jae joong', 'running man', 'yoon jae', 'with jae', 'song jae', 'jeremiah jae', 'jae crowder', 'young jae', 'jae wook', 'liked video', 'jae kyung', 'lee jae', 'jae hee', 'jae rim', 'jae lees', 'jae hyeon', 'hyuk jae', 'cover jae', 'jae kim', 'check out', 'jae jung', 'jung jae', 'happy bihday', 'jae hoon', 'jae jin', 'park jae', 'will be', 'like jae', 'jae park', 'lee jong', 'jae suks', 'jae joon', 'thanks jae', 'jae min', 'lee hyuk', 'south korea', 'jae shin', 'lee hwi', 'looks like', 'jaehoon ha', 'joong 1st', 'jae yoo', 'lee jungjae', 'jae jae', 'has been', 'yoo jaesuk', 'jae bum', 'jung joon', 'ho jae', 'millz troy', 'jong kook', 'photo jae', 'right now', 'joon young', 'jae ari', 'jae hyung', 'lee young', 'asia tour', 'new favorite', 'jae rhim', 'jae won', 'jae yeong', 'new music', 'so eun', 'c hong', 'yong jae', 'tour conce', 'jae yong', 'jae wan', 'sketch cover', 'video playlist', 'greg pak', 'added v


### Expected output

We should see something similar to the following 

```text
[u'jae millz', u'jae suk', u'jae lee', u'yoo jae', u'jae crowder', u'jeremiah jae', u'kim jae', u'ahn jae', u'jae hyun', u'with jae', u'jae joong', u'liked video', u'check out', u'happy bihday', u'thanks jae', u'running man', u'jae jae', u'yoon jae', u'jae synth', u'new music', u'ho jae', u'directed jae', u'song jae', u'jae tips', u'will be', u'young jae', u'jae hood', u'jae hee', u'jae wook', u'lee jae', u'millz og', u'millzy johnson', u'og millzy', u'new favorite', u'like jae', u'name jae', u'jae kyung', u'jae ga', u'hi jae', u'cover jae', u'love jae', u'thank jae', u'young money', u'jae jin', u'jae rim', u'murda mook', u'jae kim', u'aesop rock', u'jae lees', u'prod jeremiah', u'music jae', u'danny brown', u'ego death', u'jasmine jae', u'rock danny', u'ft jae', u'out jae', u'jaesean tate', u'video jae', u'bihday jae', u'brown prod', u'busdriver ego', u'jung jae', u'hyuk jae', u'millz troy', u'jae mills', u'im jae', u'gudda gudda', u'right now', u'park jae', u'jae hoon', u'jae im', u'has been', u'looks like', u'jae jung', u'travoltified name', u'jae so', u'jae lol', u'jae ari', u'jae hyeon', u'jae has', u'new video', u'when jae', u'feat jae', u'photo jae', u'best friend', u'\xc3\xbf \xc3\xbf', u'vs jae', u'would be', u'so much', u'nigga jae', u'see jae', u'(official music', u'jae won', u'jae )', u'music video)', u'jae said', u'feat aesop', u'think jae', u'jae park', u'millz vs', u'or jae', u'jae joon', u'miss jae', u'death feat', u'millz 1990', u'jae wan', u'sarah jae', u'added video', u'whats yours', u'si jae', u'video playlist', u'jaehoon ha', u'baby jae', u'about jae', u'jae bae', u'jae min', u'lil wayne', u'better than', u'jae foster', u'music video', u'jae just', u'im not', u'no one', u'mook vs', u'jae suks', u'(official video)', u'top 3', u'interview with', u'south korea', u'lee jong', u'cha jae', u'jae yoo', u'do jae', u'lol jae', u'jae (', u'good times', u'lee jungjae', u'suk jin', u'dont know', u'than jae', u'yoo jaesuk', u'jae bum', u'jae good', u'3 aists', u'jae laffer', u'lee hwi', u'jae shin', u'gudda jae', u'jae busdriverse']
```


Recalling our counter example, 

```text
I love yoo jae suk
```

If we use the above `topTerms` as a reference, we build a 150-element vector from the two gram of "I love yoo jae suk", it should yield

```text
Vector(0.0, 0.0, 1.0, 0.0, ... )
```

Because the 3rd term "jae suk" from the top 150 tfidf is present in the tweet, the 3rd number in the vector is 1.0 while the rest are zeros.

Similarly, the vector for "yoo jae suk is in Singapore" will have the vector

```text
Vector(0.0, 0.0, 1.0, 0.0, ... )
```


### Defining the function `ComputeLP` to create labeled points (3 Marks)

Let's define a function to convert a tweet into a labeled point, recalling that labeled point is the numeric label (0 or 1) and the vector representation of our data.

Concretely speaking, the `computeLP` function takes a label `1.0` or `0.0`, a sequence of string i.e. the 2-grams or 3-grams, and a array of top-N TF-IDF.

For each tf-idf term, let's say `t` is the i-th top-N TF-IDF term, if `t` is in the sequence of strings, we should put a `1.0` at the i-th position of the output vector, otherwise it should be `0.0`.

Complete the following function according to the example given above.

In [22]:
def computeLP(label, n_grams_text, tf_idf_text):
    return LabeledPoint(label, Vectors.dense([1.0 if r in n_grams_text else 0.0 for r in tf_idf_text]))


In [23]:
computeLP(1.0, two_grams("I love yoo jae suk"), topTerms)

LabeledPoint(1.0, [1.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])


### Expected output

```text
LabeledPoint(1.0, [0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0])
```


We incorporate the above `computeLP()` into our model.

In [24]:

posLP = posTerms.map( lambda seq: computeLP(1.0, seq, topTerms) )
negLP = negTerms.map( lambda seq: computeLP(0.0, seq, topTerms) )

data = negLP + posLP


# Split data into training (60%) and test (40%).

splits = data.randomSplit([0.6,0.4],seed = 11)
training = splits[0].cache()
test = splits[1]

# Run training algorithm to build the model
num_iteration = 100
num_iteration = 10 # REMOVE THIS

model = SVMWithSGD.train(training,num_iteration)

# This will takes about 20 mins on a 4-core intel i7 processor 3.8GHZ with hyperthreading


We apply the updated model to our testing data.

In [25]:
model.clearThreshold()
# Compute raw scores on the test set
score_and_labels = test.map( lambda point: (float(model.predict(point.features)), point.label) )



We would like to re-evaluate the performance of the new model incorporated with TF-IDF

In [26]:

# Get the evaluation metrics
metrics = BinaryClassificationMetrics(score_and_labels)
au_roc = metrics.areaUnderROC

print("Area under ROC = %s" % str(au_roc))

Area under ROC = 0.8901226000462642


We should see a significant increase in auROC, which is about 0.8 to 0.88.