# Loading Libraries and the Dataset

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive.
# NOTE(review): the data-loading cell reads from '/data/...', not from under
# '/content/drive' — confirm this mount is still required.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import numpy as np
from collections import Counter
import nltk
from nltk.util import ngrams

In [None]:
# Read the three splits as string arrays. comments=None disables comment
# handling, so a '#' token in the data is kept as ordinary text.
train, val, test = (
    np.loadtxt(split_path, dtype='str', comments=None)
    for split_path in ('/data/Train.txt', '/data/Val.txt', '/data/Test.txt')
)

# The Dataset Properties

In [None]:
# Distinct words per split (column 0 of train/val; test has no label column)
# and the distinct labels found in column 1 of the training data.
train_vocab, val_vocab = set(train[:,0]), set(val[:,0])
test_vocab = set(test)
labels = set(train[:,1])

In [None]:
# Summary table: one (caption, value) pair per printed line.
for caption, size in [
    ('Number of labels: ', len(labels)),
    ('Number of words in train set: ', len(train_vocab)),
    ('Number of words in validation set: ', len(val_vocab)),
    ('Number of words in test set: ', len(test_vocab)),
    ('Number of words in validation set which are not in train set: ', len(val_vocab - train_vocab)),
    ('Number of words in test set which are not in train set: ', len(test_vocab - train_vocab)),
]:
    print(caption, size)

Number of labels:  23
Number of words in train set:  18949
Number of words in validation set:  16235
Number of words in test set:  10594
Number of words in validation set which are not in train set:  8927
Number of words in test set which are not in train set:  4971


# Defining Vocabulary

In [None]:
# Create a vocabulary set consisting of the k most frequent words in the text
vocab_size  = 8000
all_words = list(train[:,0])
word_freq = Counter(all_words)
# '<unk>' is the out-of-vocabulary placeholder, never a vocabulary entry.
# Dropping it from the counts makes this cell safe to re-run on data it has
# already rewritten: otherwise '<unk>' would be counted as a frequent word and
# push a real word out of the vocabulary.
word_freq.pop('<unk>', None)
vocab = set(word for word, freq in word_freq.most_common(vocab_size))


def replace_oov(words):
    """Return a new array where every word outside `vocab` becomes '<unk>'."""
    return np.array([word if word in vocab else '<unk>' for word in words])


# Replace all words not in the vocabulary with the <unk> token.
# The arrays are rebuilt instead of assigning into column 0: NumPy string
# arrays have a fixed width, and an in-place assignment would silently truncate
# '<unk>' if the existing column were narrower than five characters.
train = np.column_stack([replace_oov(train[:,0]), train[:,1:]])
val = np.column_stack([replace_oov(val[:,0]), val[:,1:]])
test = replace_oov(test)

# Separating Sentences

In [None]:
def padding(dataset, delm=(':', ';', '?', '.', '!', '؟'), begin=None, end=None):
    """Wrap every sentence of `dataset` in begin/end markers.

    A sentence ends at any item whose word is in `delm`. An item is either a
    plain string (the word itself) or a NumPy row whose element 0 is the word.

    Args:
        dataset: iterable of words or of rows whose first element is the word.
        delm: words that terminate a sentence.
        begin: marker inserted before each sentence. Defaults to the
            module-level `begin_padding`.
        end: marker inserted after each sentence. Defaults to the
            module-level `end_padding`.

    Returns:
        A NumPy array holding the items with the markers inserted. An empty
        dataset yields an empty array.
    """
    if begin is None:
        begin = begin_padding
    if end is None:
        end = end_padding

    padded = []
    sentence_open = False
    for x in dataset:
        # Open a sentence lazily so no dangling begin marker is ever emitted.
        if not sentence_open:
            padded.append(begin)
            sentence_open = True
        padded.append(x)
        if (isinstance(x, np.ndarray) and x[0] in delm) or (isinstance(x, str) and (x in delm)):
            padded.append(end)
            sentence_open = False

    # Data that does not finish on a delimiter still has an open sentence.
    # Close it instead of slicing off the last element, which would discard a
    # real token and shift every later alignment by one.
    if sentence_open:
        padded.append(end)
    return np.array(padded)

In [None]:
# padding() reads `begin_padding` / `end_padding` as module-level globals, so
# they are rebound before each call to match the shape of the data being padded.
# train / val rows are [word, label] pairs, so the markers are pairs as well.
begin_padding = np.array(['<s>','start'], dtype='str')
end_padding = np.array(['</s>','end'], dtype='str')
train = padding(train)
val = padding(val)
# test holds bare words (no label column), so the markers become plain strings.
begin_padding = '<s>'
end_padding = '</s>'
test = padding(test)
# NOTE(review): this cell rebinds train/val/test to their padded versions, so
# running it a second time pads data that is already padded.

# Emission

In [None]:
# How often each label occurs in column 1 of the padded training data.
label_count = Counter(train[:,1])
# How often each complete training row, converted to a tuple, occurs.
emition_count = Counter(map(tuple, train))

In [None]:
def emition_probs(emition_count, label_count, k = 1):
    """Return add-k smoothed log emission probabilities.

    Args:
        emition_count: mapping from an observation tuple, whose element 1 is
            the label, to how often that tuple was seen.
        label_count: mapping from label to its total count.
        k: additive smoothing constant.

    Returns:
        A Counter with the same keys as `emition_count`; each value is
        log((count + k) / (label_count[label] + k * V)), where V is the
        module-level `vocab_size` plus 3.
    """
    # NOTE(review): the +3 presumably accounts for '<unk>', '<s>' and '</s>' — confirm.
    smoothing_mass = k * (vocab_size + 3)

    log_probs = Counter()
    for pair in emition_count:
        numerator = emition_count[pair] + k
        denominator = label_count[pair[1]] + smoothing_mass
        log_probs[pair] = np.log(numerator / denominator)
    return log_probs

# Transition

In [None]:
def generate_ngrams(labels, n=2):
    """Count every n-gram of `labels`.

    Args:
        labels: the sequence to slide the window over.
        n: window length (2 gives bigrams).

    Returns:
        A Counter mapping each n-gram tuple produced by nltk's `ngrams`
        to its number of occurrences.
    """
    return Counter(ngrams(labels, n))

# Label bigram counts (n defaults to 2) over the whole training label column.
# The column is one continuous sequence, so the pair formed by a sentence's end
# marker and the next sentence's begin marker is counted as well.
transition_count = generate_ngrams(train[:,1])

def ngram_probs(bigrams, unigrams, k = 1):
    """Return add-k smoothed log probabilities for the observed n-grams.

    Args:
        bigrams: mapping from an n-gram tuple to its count.
        unigrams: mapping from a single item to its count; its length is used
            as the number of outcomes in the smoothing denominator.
        k: additive smoothing constant.

    Returns:
        A Counter with the same keys as `bigrams`; each value is
        log((count + k) / (unigrams[first item] + k * len(unigrams))).
    """
    outcomes = len(unigrams)

    smoothed = Counter()
    for gram in bigrams:
        numerator = bigrams[gram] + k
        denominator = unigrams[gram[0]] + k * outcomes
        smoothed[gram] = np.log(numerator / denominator)
    return smoothed

# Viterbi Algorithm

In [None]:
def viterbi(obs, states, start_prob, trans_prob, emit_prob, zero_prob_emit, zero_prob_trans):
    """Find the highest-scoring state sequence for `obs` (scores are added, i.e. log space).

    Args:
        obs: sequence of observations.
        states: list of state names; the returned path indexes into it.
        start_prob: mapping state -> initial score.
        trans_prob: mapping (previous state, state) -> transition score.
        emit_prob: mapping (observation, state) -> emission score.
        zero_prob_emit: mapping state -> emission score used when the
            (observation, state) pair is missing from `emit_prob`.
        zero_prob_trans: mapping previous state -> transition score used when
            the (previous state, state) pair is missing from `trans_prob`.

    Returns:
        (best_path, best_path_prob): `best_path` is a list with one index into
        `states` per observation, and `best_path_prob` is the score of that
        path. An empty `obs` yields ([], 0.0).
    """
    T = len(obs)
    N = len(states)

    # Nothing to decode: the empty path has score 0 in log space.
    if T == 0:
        return [], 0.0

    backpointer = np.zeros((N, T), dtype=int)

    # Precompute transition and emission scores, falling back to the
    # per-state smoothing values for unseen pairs.
    trans_prob_mat = np.array(
        [[trans_prob.get((prev, cur), zero_prob_trans[prev]) for cur in states] for prev in states],
        dtype=float,
    )
    emit_prob_mat = np.array(
        [[emit_prob.get((token, state), zero_prob_emit[state]) for state in states] for token in obs],
        dtype=float,
    )
    start_prob_mat = np.array([start_prob[state] for state in states], dtype=float)

    # Initialization step. Only the previous column of scores is ever needed,
    # so a single vector replaces the full score table.
    scores = start_prob_mat + emit_prob_mat[0]

    # Forward step. candidates[s, s0] is the score of reaching state s at
    # time t when coming from state s0.
    incoming = trans_prob_mat.T
    for t in range(1, T):
        candidates = scores + incoming + emit_prob_mat[t].reshape(N, 1)
        backpointer[:, t] = np.argmax(candidates, axis=1)
        scores = np.max(candidates, axis=1)

    # Backward step. `scores` always holds the final column, including the
    # single-observation case where the forward loop never runs.
    best_path_prob = np.max(scores)
    best_path_pointer = np.argmax(scores)

    # Collect the path back-to-front and reverse once; inserting at the head
    # of a list would be quadratic in the sequence length.
    best_path = [best_path_pointer]
    for t in range(T - 1, 0, -1):
        best_path_pointer = backpointer[best_path_pointer, t]
        best_path.append(best_path_pointer)
    best_path.reverse()

    return best_path, best_path_prob


# Putting it all together

In [None]:
def run(add_k, final=False):
    """Build the add-k smoothed HMM tables and decode with Viterbi.

    Reads the module-level `emition_count`, `transition_count`,
    `label_count`, `vocab_size`, `train`, `val` and `test`.

    Args:
        add_k: additive smoothing constant used for every probability table.
        final: when true, decode `test` and return its predicted labels;
            otherwise decode `train` and `val` and report accuracy.

    Returns:
        With `final` true, a list holding one label per entry of `test`.
        Otherwise the tuple (train_acc, val_acc); rows labelled 'start' or
        'end' are left out of both accuracies.
    """
    emition_prob = emition_probs(emition_count, label_count, add_k)
    transition_prob = ngram_probs(transition_count, label_count, add_k)

    states = list(label_count.keys())
    sentence_starts = label_count['start']

    # Initial distribution: only 'start' carries observed mass, every other
    # label receives just the smoothing constant.
    start_prob = {}
    for tag in states:
        seen = (tag == 'start') * sentence_starts
        start_prob[tag] = np.log((seen + add_k) / (sentence_starts + add_k * len(states)))

    # Fallback scores for (word, label) pairs never seen in training; the two
    # padding labels cannot emit unseen words at all.
    zero_prob_emit = {
        tag: np.log(add_k / (label_count[tag] + add_k*(vocab_size+3)))
        for tag in states
    }
    zero_prob_emit['start'] = float('-inf')
    zero_prob_emit['end'] = float('-inf')

    # Fallback scores for label pairs never seen in training; nothing unseen
    # may follow 'end'.
    zero_prob_trans = {
        tag: np.log(add_k / (label_count[tag] + add_k*len(label_count)))
        for tag in states
    }
    zero_prob_trans['end'] = float('-inf')

    def decode(tokens):
        # Best state-index path for one token sequence.
        path, _ = viterbi(list(tokens), states, start_prob, transition_prob, emition_prob, zero_prob_emit, zero_prob_trans)
        return path

    def accuracy(dataset, path):
        # Share of non-padding rows whose gold label equals the decoded one.
        hits = 0
        scored = 0
        for row, predicted in zip(dataset, path):
            gold = row[1]
            if gold == 'start' or gold == 'end':
                continue
            scored += 1
            hits += gold == states[predicted]
        return hits / scored

    if final:
        return [states[index] for index in decode(test)]

    best_path_train = decode(train[:,0])
    best_path_val = decode(val[:,0])
    return accuracy(train, best_path_train), accuracy(val, best_path_val)

# Evaluation

In [None]:
# Sweep the smoothing constant over powers of ten and report both accuracies.
for exponent in [0, -1, -2, -3, -4, -5, -8]:
    add_k = 10**exponent
    print('train and validation accuracy for k = ', add_k, ' : ', run(add_k))

train and validation accuracy for k =  1  :  (0.9477778547618497, 0.9254818787226093)
train and validation accuracy for k =  0.1  :  (0.9568465784429202, 0.9123607112244225)
train and validation accuracy for k =  0.01  :  (0.9582284425352394, 0.9145750456053979)
train and validation accuracy for k =  0.001  :  (0.9585825692664188, 0.9179542407525441)
train and validation accuracy for k =  0.0001  :  (0.9586172120988168, 0.9189707466098156)
train and validation accuracy for k =  1e-05  :  (0.9586210613024165, 0.9192454779225917)
train and validation accuracy for k =  1e-08  :  (0.9586210613024165, 0.9192949295588914)


In [None]:
# Decode the padded test sequence with add-k = 1, the value with the highest
# validation accuracy in the sweep output; final=True makes run() return the
# predicted label names instead of accuracies.
test_result = run(1, True)

In [None]:
# Keep the prediction for every real token, skipping the sentence markers
# that padding inserted into the test sequence.
test_pos = [
    test_result[position]
    for position, token in enumerate(test)
    if not (token == '<s>' or token == '</s>')
]

In [None]:
path = 'y_test.txt'
# Write one predicted label per line. The encoding is fixed to UTF-8 so the
# file does not depend on the platform's default locale encoding.
with open(path, 'w', encoding='utf-8') as file:
    file.writelines(str(item) + '\n' for item in test_pos)