In [102]:
import math
import re
import pathlib
import json
import glob, os
import numpy as np
import random
import sklearn

<h3>Load and label samples</h3>

In [103]:
# Root directory that loadAndLabelData searches recursively for corpus files.
corpus_dir = "./data"

def loadAndLabelData(corpus_dir, encoding=None):
    """Read every file under ``corpus_dir`` and split each line into label and text.

    A line starting with ``spam`` is labelled 1 and a line starting with
    ``ham`` is labelled 0. The label prefix is stripped; everything after it
    (including any separator and the trailing newline) is kept as the sample.
    Any other line is reported on stdout and skipped.

    Parameters
    ----------
    corpus_dir : str or pathlib.Path
        Directory that is searched recursively for corpus files.
    encoding : str, optional
        Text encoding passed to ``open``. ``None`` (the default) keeps the
        platform default encoding.

    Returns
    -------
    (samples, labels) : tuple of two lists
        ``samples[i]`` is the text of the i-th labelled line and
        ``labels[i]`` is its 0/1 label.
    """
    corpus_dir = pathlib.Path(corpus_dir)

    samples = []
    labels = []

    for file_path in corpus_dir.rglob("*"):
        # rglob("*") also yields sub-directories; open() on those would raise.
        if not file_path.is_file():
            continue

        # The context manager closes the handle even if reading fails part-way.
        with open(file_path, 'r', encoding=encoding) as f:
            for line in f:
                if line.startswith('spam'):
                    labels.append(1)
                    samples.append(line[4:])
                elif line.startswith('ham'):
                    labels.append(0)
                    samples.append(line[3:])
                else:
                    print('improperly labeled sample')

    return samples, labels

samples, labels = loadAndLabelData(corpus_dir)

# Shuffle sample/label pairs together. A fixed seed keeps the train/test split
# (and therefore every number computed from it) reproducible across kernel
# restarts.
SPLIT_SEED = 42
samples, labels = sklearn.utils.shuffle(samples, labels, random_state=SPLIT_SEED)

# Hold out the last TEST_FRACTION of the shuffled data as the test set.
TEST_FRACTION = 0.2
count = len(samples)
test_size = round(count * TEST_FRACTION)
train_size = count - test_size

test_samples = samples[train_size:]
test_labels = labels[train_size:]

# From here on `samples` / `labels` refer to the training portion only.
samples = samples[:train_size]
labels = labels[:train_size]

print('Training set size: ' + str(train_size))
print('Test set size: ' + str(test_size))

Training set size: 4459
Test set size: 1115


<h3>Text preprocessing</h3>

In [122]:
# Matches runs of characters that are neither whitespace nor word characters;
# underscore is listed explicitly because \w would otherwise keep it.
# Raw string: '\s' / '\w' in a normal string literal are invalid escapes.
pattern = re.compile(r'([^\s\w]|_)+')

def cleanText(text):
    """Normalise a raw message before tokenisation.

    Each run of punctuation/symbol characters (and underscores) is replaced
    by a single space, newlines and tabs are replaced by spaces, and the
    result is lower-cased.

    Parameters
    ----------
    text : str
        Raw message text.

    Returns
    -------
    str
        The cleaned, lower-cased text.
    """
    # remove non-alphanumeric
    text = pattern.sub(' ', text)

    # remove newline and tab (the pattern above leaves whitespace untouched)
    text = text.replace('\n', ' ').replace('\t', ' ')

    # remove capitalization
    return text.lower()

def getTokens(text):
    """Split ``text`` into a list of tokens.

    A token is either a run of word characters and apostrophes, or a single
    one of the punctuation marks ``. , ! ? ;``. Everything else is skipped.
    """
    token_re = re.compile(r"[\w']+|[.,!?;]")
    return token_re.findall(text)

# Clean and tokenise every training sample; samples_tokens[i] belongs to samples[i].
samples_tokens = [getTokens(cleanText(text)) for text in samples]

<h3> Get Document Frequency </h3>

In [123]:
def updateDF(tokens, df):
    """Add one document's tokens to the document-frequency table ``df``.

    Every distinct token in ``tokens`` increments ``df[token]`` by one (a
    missing entry starts at 1). ``df`` is modified in place; nothing is
    returned.
    """
    # set() collapses repeats so a token counts once per document
    for term in set(tokens):
        df[term] = df.get(term, 0) + 1
            
# Document-frequency table: token -> number of training samples containing it.
df = {}

for doc_tokens in samples_tokens:
    updateDF(doc_tokens, df)

# keys() is a live view, so `vocab` stays in sync with `df` if entries are
# removed from it afterwards.
vocab = df.keys()
print(str(len(vocab)) + ' unique tokens')

7822 unique tokens


<h3>Reduce vocab</h3>

In [124]:
# Keep only terms whose document share lies strictly between min_df and max_df.
min_df = 0.001  # lower bound: 0.1% of the training documents
max_df = 1  # upper bound; alternative noted by the author: 0.99

n_train_docs = len(samples)

# Iterate over a snapshot of the keys because entries are deleted in the loop.
for term in list(df):
    doc_share = df[term] / n_train_docs
    if doc_share <= min_df or doc_share >= max_df:
        del df[term]

# `vocab` is a view on df's keys, so its length reflects the deletions above.
print('reduced to ' + str(len(vocab)) + ' unique tokens')

reduced to 1594 unique tokens


<h3> Get TF-IDF vectors </h3>

In [125]:
def getTFIDF(tokens, df, n_docs):
    """Build the TF-IDF vector of one document over the vocabulary in ``df``.

    For every term in ``df``:
        tf  = (occurrences of the term in ``tokens``) / (len(tokens) + 1)
        idf = log((n_docs + 1) / (df[term] + 1))

    Parameters
    ----------
    tokens : list of str
        Tokens of the document; tokens not present in ``df`` are ignored.
    df : dict
        Maps each vocabulary term to its document frequency.
    n_docs : int
        Number of documents used in the idf formula.

    Returns
    -------
    numpy.ndarray
        One tf * idf value per term, in the iteration order of ``df``.
    """
    # term occurrences, restricted to the vocabulary
    counts = dict.fromkeys(df, 0)
    for tok in tokens:
        if tok in counts:
            counts[tok] += 1

    # the +1 keeps the division defined for an empty token list
    denom = len(tokens) + 1

    weights = [
        (counts[term] / denom) * math.log((n_docs + 1) / (doc_freq + 1))
        for term, doc_freq in df.items()
    ]

    return np.array(weights)

# One TF-IDF row per training sample; columns follow the iteration order of df.
n_train = len(samples)
tfidfs = np.zeros((n_train, len(df)))

for i in range(n_train):
    tfidfs[i] = getTFIDF(samples_tokens[i], df, n_train)

In [98]:
# Disabled scratch code, kept inside a bare string literal so it is never run.
# NOTE(review): it calls `cosine`, which is only defined in a later cell.
'''max_cos = 0
similar = 0

vec = getTFIDF(getTokens(cleanText(test_samples[110])), df, len(samples))

for i in range(len(tfidfs)):
    cos = cosine(vec, tfidfs[i])
    if cos > max_cos:
        max_cos = cos
        similar = i
'''

<h3> Perform KNN classification </h3>

In [176]:
def cosine(v1, v2):
    """Return the cosine similarity of two vectors.

    The result is ``dot(v1, v2) / (|v1| * |v2|)``. When the product of the
    norms is not positive (for example, one vector is all zeros) the function
    returns 0 instead of dividing.
    """
    norm_product = np.linalg.norm(v1) * np.linalg.norm(v2)

    # guard against division by zero for empty / all-zero vectors
    if not norm_product > 0:
        return 0

    return np.dot(v1, v2) / norm_product

def knn(k, xs, ys, x):
    """Classify ``x`` by majority vote among its k most cosine-similar vectors.

    Parameters
    ----------
    k : int
        Number of neighbours to consult; must be at least 1. If ``k`` is
        larger than the number of vectors in ``xs`` it is reduced to that
        number.
    xs : sequence of vectors
        Reference vectors (one per training sample).
    ys : sequence of int
        Labels aligned with ``xs``; the vote sums them, so they are expected
        to be 0 or 1.
    x : vector
        The vector to classify.

    Returns
    -------
    int
        1 if more than half of the consulted neighbours have label 1,
        otherwise 0 (ties and an empty ``xs`` give 0).

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1.
    """
    if k < 1:
        raise ValueError('k must be at least 1')

    cosines = [cosine(row, x) for row in xs]

    # Never consult more neighbours than there are reference vectors;
    # otherwise the initial index list below would point past the end.
    k = min(k, len(cosines))

    # indexes of k nearest, seeded with the first k vectors
    nearest = list(range(k))

    for i in range(k, len(cosines)):
        # Position of the least similar current neighbour. min() keeps the
        # first position on ties.
        weakest = min(range(k), key=lambda j: cosines[nearest[j]])

        # Strict comparison: an equally similar later vector does not
        # displace an earlier one.
        if cosines[nearest[weakest]] < cosines[i]:
            nearest[weakest] = i

    votes = sum(ys[i] for i in nearest)

    return 1 if votes > k / 2 else 0

# test accuracy
K_NEIGHBOURS = 5

correct = 0

for i in range(len(test_samples)):
    # Test messages go through the same cleanText step as the training
    # samples; without it their tokens keep capitals and punctuation and do
    # not line up with the vocabulary in df.
    test_tokens = getTokens(cleanText(test_samples[i]))
    test_sample_tfidf = getTFIDF(test_tokens, df, len(samples))

    if knn(K_NEIGHBOURS, tfidfs, labels, test_sample_tfidf) == test_labels[i]:
        correct += 1

print(correct / len(test_samples))

In [110]:
# Baseline: fraction of test labels that are not 1, i.e. the accuracy obtained
# by always predicting 0 (ham).
n = sum(1 for label in test_labels if label == 1)

print(1 - n / len(test_labels))

0.8573991031390135


In [142]:
# Scratch cell: presumably a check of the `list(range(0, k))` seed list that
# knn builds, here for k = 5.
list(range(0, 5))

[0, 1, 2, 3, 4]

In [173]:
# Sanity check: classify training row 156 against the training set with k = 1.
knn(1, tfidfs, labels, tfidfs[156])

156


1

In [172]:
# Stored label of training row 156, for comparison with the knn result.
labels[156]

1

In [137]:
# Scratch cell: find the training row with the highest cosine similarity to a
# reference row.
max_cos = 0
similar = 0

# NOTE(review): `vec` is computed here but not used by the loop below, which
# compares training row 500 against every training row instead.
vec = getTFIDF(getTokens(cleanText(test_samples[110])), df, len(samples))

for i in range(len(tfidfs)):
    cos = cosine(tfidfs[500], tfidfs[i])
    # strict '>' keeps the first row that reaches the highest similarity
    if cos > max_cos:
        max_cos = cos
        similar = i

In [138]:
# Index of the most similar training row found by the search loop.
similar

500