# CS4248 Assignment 3

### Perceptron text classifier in Python with feature selection

##### https://github.com/jia1/pyceptron

In [None]:
import os
import re
import sys
from decimal import Decimal
from string import punctuation
from porter import PorterStemmer

p = PorterStemmer()

In [None]:
# Absolute path of the corpus root; the directory walk further down treats its
# immediate sub-directories as the class names.
src_dir = os.path.abspath('tc')

# Names of the auxiliary list files, resolved against the working directory.
stopword_list_file = 'stopword-list'
train_class_list_file = 'train-class-list'
test_list_file = 'test-list'
test_class_list_file = 'test-class-list'

In [None]:
'''
class_to_text
<class 'dict'>
KEY: <class 'str'>
VAL: <class 'set'>
E.g: {'c1': {'0000', '0001', '0002', ... }, 'c2': {'1000', '1001', '1002', ... } }

text_to_freq
<class 'dict'>
KEY: <class 'str'>
VAL: <class 'dict'>
     KEY: <class 'str'>
     VAL: <class 'int'>
E.g: {'0000': {'word0': 1, 'word1': 3, 'word2': 5, ... }, '0001': { ... }}

class_to_feat_chi_tup
<class 'dict'>
KEY: <class 'str'>
VAL: <class 'list'>
     VAL: <class 'tuple'>
          <class 'str'>
          <class 'float'>
E.g: {'c1': [ ('word1', 99.00), ('word2', 90.00), ... ], 'c2': [ ... ] }

nxx_dict
chi_dict
'''

# Minimum in-document frequency a term needs in order to be kept.
k = 4

# Running totals: every file seen, and the files used for training.
num_both = 0
num_train = 0

# Fraction of each class directory used for training, and its complement.
train_ratio = 0.8
test_ratio = 1 - train_ratio

# Fraction of the shortest per-class chi-square ranking that gen_feat keeps.
feat_prune_ratio = 0.4

class_list = []
class_to_text = {}
text_to_freq = {}
class_to_feat_chi_tup = {}
class_to_feat_set = {}
class_to_feat_list_sort_by_chi = {}

# Contingency cells that are counted directly, and the cell each one complements.
nxxs = ['n10', 'n11']
nxxs_map = dict(n10='n00', n11='n01')

# nxx_dict caches counts as nxx_dict[cell][word][class] -> int.
nxx_dict = dict((cell, {}) for cell in nxxs)

# chi_dict stores scores as chi_dict[class][word] -> chi-square value.
chi_dict = {}

In [None]:
# A. Split the line on single spaces into raw tokens
# B. Strip surrounding white space, then surrounding punctuation, and lower-case
# C. Keep a token only if it is longer than two characters, purely alphabetic
#    and not a stop word
# D. Return the surviving tokens as a list, in their original order

def strip_and_filter_line(ln):
    """Return the cleaned, non-stop-word tokens of the line ``ln`` as a list."""
    kept = []
    for raw in ln.split(' '):
        token = raw.strip().strip(punctuation).lower()
        if not token or len(token) <= 2:
            continue
        if token.isalpha() and token not in stop_list:
            kept.append(token)
    return kept

In [None]:
def is_in(a, b):
    """Return 1 when ``a`` is a member of ``b``, otherwise 0."""
    if a in b:
        return 1
    return 0

def count_nxx(nxx, w, c):
    """Count the training texts that contain term ``w`` for one contingency cell.

    ``nxx == 'n11'`` counts texts of class ``c`` containing ``w``;
    ``nxx == 'n10'`` counts texts of every other class containing ``w``.
    Any other value of ``nxx`` yields 0.
    """
    if nxx == 'n11':
        return sum(is_in(w, text_to_freq[text]) for text in class_to_text[c])
    if nxx == 'n10':
        return sum(
            is_in(w, text_to_freq[text])
            for class_name in class_list
            if class_name != c
            for text in class_to_text[class_name]
        )
    return 0

In [None]:
# n00 is the number of training texts that do not contain w and are not in class c.
# n01 is the number of training texts that do not contain w and are in class c.
# n10 is the number of training texts that contain w and are not in class c.
# n11 is the number of training texts that contain w and are in class c.

# Only n10 and n11 are counted directly (and cached in nxx_dict); the other two
# cells are the complements within the respective group of texts:
# n01 = (number of training texts in class c) - n11
# n00 = (number of training texts not in class c) - n10

def chi_square(w, c):
    """Return the chi-square statistic of term ``w`` with respect to class ``c``.

    The directly counted cells (n10, n11) are looked up in ``nxx_dict`` and
    computed with ``count_nxx`` on first use. The complementary cells are
    derived from ``len(class_to_text[c])`` and ``num_train``, so the four cells
    add up to ``num_train``.

    Returns 0.0 when a marginal total is zero (for example when ``w`` occurs in
    every text or in none), because the statistic is undefined there and such a
    term cannot discriminate between classes.
    """
    counts = {}
    for n in nxxs:
        per_word = nxx_dict[n].setdefault(w, {})
        if c not in per_word:
            per_word[c] = count_nxx(n, w, c)
        counts[n] = per_word[c]
    n10, n11 = counts['n10'], counts['n11']

    num_in_class = len(class_to_text[c])
    n01 = num_in_class - n11
    n00 = (num_train - num_in_class) - n10

    denominator = (n11 + n01) * (n11 + n10) * (n10 + n00) * (n01 + n00)
    if denominator == 0:
        return 0.0
    return ((n11 + n10 + n01 + n00) * (n11 * n00 - n10 * n01) ** 2) / denominator

In [None]:
def put_chi_dict(c, w, chi_square_value):
    """Record ``chi_square_value`` for term ``w`` in class ``c``, keeping the maximum.

    ``chi_dict[c]`` must already exist; a missing class raises ``KeyError``.
    """
    scores = chi_dict[c]
    if w in scores:
        scores[w] = max(scores[w], chi_square_value)
        return
    scores[w] = chi_square_value

In [None]:
def gen_feat():
    """Rebuild ``class_to_feat_chi_tup`` from the current contents of ``chi_dict``.

    Each class gets its (term, chi-square) pairs sorted by descending score.
    Every ranking is then cut to the same length: ``feat_prune_ratio`` times the
    length of the shortest ranking among the classes present in ``chi_dict``.
    """
    global class_to_feat_chi_tup
    ranked = {c: [] for c in class_list}
    shortest = sys.maxsize
    for c, scores in chi_dict.items():
        ranked[c] = sorted(scores.items(), key=lambda pair: pair[1], reverse=True)
        if len(ranked[c]) < shortest:
            shortest = len(ranked[c])
    keep = int(shortest * feat_prune_ratio)
    class_to_feat_chi_tup = {c: pairs[:keep] for c, pairs in ranked.items()}

In [None]:
def feat_select():
    """Score every (class, term) pair with chi-square, then rank and prune.

    For each class, every term of every text in that class is scored with
    ``chi_square`` and stored through ``put_chi_dict``. ``gen_feat`` is run once
    after all scores are in place: it only depends on the final contents of
    ``chi_dict``, so re-running it after every single term gives the same result
    while re-sorting every class ranking each time.
    """
    for c in class_list:
        for text in class_to_text[c]:
            for w in text_to_freq[text]:
                put_chi_dict(c, w, chi_square(w, c))
    gen_feat()

In [None]:
# Read the stop words, one per line, with surrounding white space removed.
with open(stopword_list_file, 'r') as s:
    stop_list = [entry.strip() for entry in s]

In [None]:
# Load the training corpus listed in train_class_list_file. Every non-blank line
# holds a document path and its class label separated by white space.
with open(train_class_list_file, 'r') as t:
    for entry in t:
        fields = entry.split()
        if not fields:
            continue  # tolerate blank lines instead of failing on the unpacking
        file, curr_class = fields
        text = file.split('/')[-1]
        num_both += 1
        num_train += 1

        # Tokenise, filter and stem the whole document into one flat list.
        flat_text = []
        with open(file, 'r') as f:
            for ln in f:
                flat_text.extend(
                    p.stem(word, 0, len(word) - 1) for word in strip_and_filter_line(ln)
                )

        # Count unigrams and adjacent-pair bigrams in document order.
        freq_dict = {}
        last_index = len(flat_text) - 1
        for i, word in enumerate(flat_text):
            freq_dict[word] = freq_dict.get(word, 0) + 1
            if i < last_index:
                bigram = '{} {}'.format(word, flat_text[i + 1])
                freq_dict[bigram] = freq_dict.get(bigram, 0) + 1

        # Keep terms seen at least k times. When no term is that frequent, lower
        # the bar to the highest frequency present, which is where lowering the
        # threshold one step at a time would stop. A document without any valid
        # token simply yields an empty dict instead of looping forever.
        threshold = min(k, max(freq_dict.values())) if freq_dict else k
        fin_freq_dict = { term: freq for term, freq in freq_dict.items() if freq >= threshold }
        sum_freq = sum(fin_freq_dict.values())

        # Always register the text and its normalised frequencies, including
        # the first text of a class and the first time a text is seen.
        class_to_text.setdefault(curr_class, set()).add(text)
        text_to_freq[text] = { term: freq / sum_freq for term, freq in fin_freq_dict.items() }
In [None]:
# Load the training corpus from src_dir, whose immediate sub-directories are the
# classes. The first train_ratio share of each directory listing is used.
for curr_dir, sub_dir, files in os.walk(src_dir):
    if curr_dir == src_dir:
        # The root is identified by its path rather than by "has no files", so a
        # stray file in the root or an empty class directory cannot trigger or
        # repeat this initialisation.
        class_list = list(sub_dir)
        num_class = len(class_list)
        class_to_text = { c: set() for c in class_list }
        chi_dict = { c: {} for c in class_list }
        class_to_feat_chi_tup = { c: set() for c in class_list }
        class_to_feat_set = { c: set() for c in class_list }
        class_to_feat_to_index = { c: {} for c in class_list }
        continue
    curr_class = os.path.basename(curr_dir)
    curr_num_files = len(files)
    num_both += curr_num_files
    curr_num_train = int(curr_num_files * train_ratio)
    num_train += curr_num_train
    for file in files[:curr_num_train]:
        # Tokenise, filter and stem the whole document into one flat list.
        flat_text = []
        with open(os.path.join(curr_dir, file), 'r') as f:
            for ln in f:
                flat_text.extend(
                    p.stem(word, 0, len(word) - 1) for word in strip_and_filter_line(ln)
                )

        # Count unigrams and adjacent-pair bigrams in document order.
        freq_dict = {}
        last_index = len(flat_text) - 1
        for pos, word in enumerate(flat_text):
            freq_dict[word] = freq_dict.get(word, 0) + 1
            if pos < last_index:
                bigram = '{} {}'.format(word, flat_text[pos + 1])
                freq_dict[bigram] = freq_dict.get(bigram, 0) + 1

        # Keep terms seen at least k times. When no term is that frequent, lower
        # the bar to the highest frequency present. A document without any valid
        # token yields an empty dict instead of looping forever.
        threshold = min(k, max(freq_dict.values())) if freq_dict else k
        fin_freq_dict = { term: freq for term, freq in freq_dict.items() if freq >= threshold }
        sum_freq = sum(fin_freq_dict.values())
        class_to_text[curr_class].add(file)
        text_to_freq[file] = { term: freq / sum_freq for term, freq in fin_freq_dict.items() }
In [None]:
# Inspection cell: pretty-print the per-text normalised term frequencies.
import pprint
pp = pprint.PrettyPrinter(indent = 4)
pp.pprint(text_to_freq)

In [None]:
# Run feature selection on freshly reset per-class containers and print the
# elapsed wall-clock time in seconds.
from time import time
start = time()

class_to_feat_chi_tup = dict((c, set()) for c in class_list)
feat_select()

stop = time()
print(stop - start)

In [None]:
# Inspection cell: pretty-print the selected (term, chi-square) pairs per class.
import pprint
pp = pprint.PrettyPrinter(indent = 4)
pp.pprint(class_to_feat_chi_tup)

In [None]:
# Build the feature vocabulary of every class: all of its own selected terms
# plus the top 1 / (num_class - 1) share of every other class's ranking.
# The containers must be sets (not dicts) because terms are added with .add().
class_to_feat_set = { c: set() for c in class_list }
for c in class_to_feat_chi_tup:
    # The loop variable is deliberately not called `p`, which is the name of
    # the module-level PorterStemmer instance.
    for feat_chi in class_to_feat_chi_tup[c]:
        class_to_feat_set[c].add(feat_chi[0])
    for nc in class_to_feat_chi_tup:
        if nc != c:
            share = len(class_to_feat_chi_tup[nc]) // (num_class - 1)
            for feat_chi in class_to_feat_chi_tup[nc][:share]:
                class_to_feat_set[c].add(feat_chi[0]) # if many overlapping words across class_to_feat_chi_tup, then fewer features

# Despite the name, each list is ordered by plain sorted() on the terms, which
# gives every term a stable vector position.
class_to_feat_list_sort_by_chi = { c: sorted(class_to_feat_set[c]) for c in class_list }
class_to_feat_to_index = {
    c: { feat: index for index, feat in enumerate(class_to_feat_list_sort_by_chi[c]) }
    for c in class_to_feat_list_sort_by_chi
}

In [None]:
# Inspection cell: pretty-print the term -> vector index mapping of every class.
import pprint
pp = pprint.PrettyPrinter(indent = 4)
pp.pprint(class_to_feat_to_index)

In [None]:
# Credits: Dr. Jason Brownlee
from random import seed, randrange
# Seed the random module with a fixed value; get_folds draws from it via randrange.
seed(4248)

# Split data_mat into num_folds number of folds
def get_folds(data_mat, num_folds):
    """Randomly partition ``data_mat`` into ``num_folds`` folds of equal size.

    Each fold receives ``int(len(data_mat) / num_folds)`` rows drawn without
    replacement via ``randrange``; leftover rows are not assigned to any fold.
    ``data_mat`` itself is not modified.
    """
    remaining = list(data_mat)
    per_fold = int(len(data_mat) / num_folds)
    return [
        [remaining.pop(randrange(len(remaining))) for _ in range(per_fold)]
        for _ in range(num_folds)
    ]

# Calculate accuracy percentage
def get_accuracy(predicted, actual):
    """Return the percentage of positions where ``predicted`` equals ``actual``.

    The percentage is taken over ``len(actual)``; positions of ``actual`` that
    have no counterpart in ``predicted`` count as incorrect. An empty ``actual``
    (for example an empty fold) yields 0.0 instead of a division by zero.
    """
    if len(actual) == 0:
        return 0.0
    num_correct = 0
    for guess, truth in zip(predicted, actual):
        if guess == truth:
            num_correct += 1
    return num_correct / len(actual) * 100

# Evaluate an algorithm using a cross validation split
def get_cross_validation_scores(data_mat, algorithm, num_folds, *args):
    """Return one accuracy percentage per fold for ``algorithm``.

    ``data_mat`` is split with ``get_folds``. For each fold, ``algorithm`` is
    called as ``algorithm(train_set, test_set, *args)`` where ``train_set`` is
    the concatenation of the other folds and ``test_set`` holds copies of the
    held-out rows with their last element (the label) replaced by ``None``.
    """
    folds = get_folds(data_mat, num_folds)
    scores = []
    for held_out in folds:
        # Leave out the first fold comparing equal to the held-out one.
        skip = folds.index(held_out)
        train_set = [
            row
            for position, other in enumerate(folds)
            if position != skip
            for row in other
        ]
        test_set = []
        for row in held_out:
            masked = list(row)
            masked[-1] = None
            test_set.append(masked)
        predicted = algorithm(train_set, test_set, *args)
        actual = [row[-1] for row in held_out]
        scores.append(get_accuracy(predicted, actual))
    return scores

# Make a prediction with weights
def predict(row, weights):
    """Return 1 if the weighted sum for ``row`` is non-negative, otherwise 0.

    ``weights[0]`` is the bias and ``weights[i + 1]`` multiplies ``row[i]``.
    The last element of ``row`` is the label slot and is ignored.
    """
    total = weights[0]
    for position, feature in enumerate(row[:-1], start=1):
        total += weights[position] * feature
    if total >= 0:
        return 1
    return 0

# Estimate Perceptron weights using stochastic gradient descent
def train_weights(train, alpha, max_iterations = 1000):
    """Return perceptron weights fitted to ``train``.

    Starts from all-zero weights (bias first, then one weight per feature) and
    makes ``max_iterations`` passes over ``train``. After each row the bias
    moves by ``alpha * error`` and each feature weight by
    ``alpha * error * feature``, where ``error`` is the row's label (its last
    element) minus the current prediction.
    """
    weights = [0] * len(train[0])
    for _ in range(max_iterations):
        for row in train:
            step = alpha * (row[-1] - predict(row, weights))
            weights[0] = weights[0] + step
            for position, feature in enumerate(row[:-1], start=1):
                weights[position] += step * feature
    return weights

# Perceptron Algorithm With Stochastic Gradient Descent
def perceptron(train, test, alpha, max_iterations):
    """Fit weights on ``train``, print them, and return a prediction for every row of ``test``."""
    weights = train_weights(train, alpha, max_iterations)
    print(weights)
    return [predict(row, weights) for row in test]

In [None]:
# load and prepare data
# For every class c, build the training matrix of its one-vs-rest classifier:
# all texts of c labelled 1, plus a (1 - train_ratio) share of the texts of
# every other class labelled 0. The label is the last element of each row.
class_to_feat_mat = { c: [] for c in class_list }
for c in class_list:
    # Every row of classifier c is laid out in c's own feature space, so all
    # rows of one matrix have the same length and the same column meaning.
    feat_to_index = class_to_feat_to_index[c]
    num_feats = len(feat_to_index)
    for d in class_list:
        # Sorting makes the chosen subset independent of set iteration order.
        texts = sorted(class_to_text[d])
        num_texts = len(texts)
        if c != d:
            num_texts_to_train = int((1 - train_ratio) * num_texts)
        else:
            num_texts_to_train = num_texts
        label = 1 if c == d else 0
        for text in texts[:num_texts_to_train]:
            feat_vec = [0] * (num_feats + 1)
            for word, freq in text_to_freq[text].items():
                index = feat_to_index.get(word)
                if index is not None:
                    feat_vec[index] = freq
            feat_vec[-1] = label
            class_to_feat_mat[c].append(feat_vec)

In [None]:
# Grid of experiment settings; the commented lists are the wider grids.
data = class_to_feat_mat
num_folds_list = [5] # [5, 10]
alpha_list = [0.05] # [0.02, 0.03, 0.05, 0.07, 0.1]
max_iterations_list = [500] # [500, 1000, 2000]

# Cross-validate the per-class perceptron for every combination of settings.
for num_folds in num_folds_list:
    for alpha in alpha_list:
        for max_iterations in max_iterations_list:
            header = '{}-fold cross validation'.format(num_folds)
            settings = 'Learning rate: {}, maximum number of iterations: {}'.format(alpha, max_iterations)
            print(header)
            print(settings)
            for c in class_list:
                scores = get_cross_validation_scores(
                    data[c], perceptron, num_folds, alpha, max_iterations
                )
                mean_accuracy = sum(scores) / num_folds
                print('Class: {}'.format(c))
                print('Cross validation scores: {}'.format(scores))
                print('Mean accuracy: {:.2f}%'.format(mean_accuracy))
                print()

In [None]:
# Inspection cell: pretty-print the per-class training matrices.
import pprint
pp = pprint.PrettyPrinter(indent = 4)
pp.pprint(class_to_feat_mat)