In [6]:
import math
import copy

current_path = '/content/drive/MyDrive/Projects/NLP_HW1/'

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


#### Part 2

##### Read text

In [None]:
def get_text(path):
    """ Read text from the input path and return lines of it """
    f = open(path, 'r', encoding='UTF-8')
    lines = f.readlines()
    f.close()
    return lines

train_lines = get_text(current_path + 'datasets/train.txt')

##### create unigram LM and bigram LM

In [None]:
def update_LM(k_words, k_gram):
    """ Update dictionary of LM """
    if k_words in k_gram.keys():
        k_gram[k_words] += 1
    else:
        k_gram[k_words] = 1
    return k_gram

def create_LM(lines):
    """ Create LM for unigram and bigram """
    unigram_LM = {}
    bigram_LM = {}
    for line in lines:
        words = line.strip().split()
        unigram_LM = update_LM(words[0], unigram_LM)
        for i in range(1, len(words)):
            biword = ' '.join(words[i-1: i+1])
            uniword = words[i]
            bigram_LM = update_LM(biword, bigram_LM)
            unigram_LM = update_LM(uniword, unigram_LM)
    return unigram_LM, bigram_LM

def get_all_B(bigram_LM):
    """ Calculate B variable in smoothing folrmula """
    all_B = {}
    for biword in bigram_LM.keys():
        word = biword.split()[0]
        if word in all_B:
            all_B[word] += 1
        else:
            all_B[word] = 1
    return all_B

In [None]:
unigram_LM, bigram_LM = create_LM(train_lines)
words_size = sum(unigram_LM.values())
vocab_size = len(unigram_LM.keys())
all_B = get_all_B(bigram_LM)

##### absolute discount smoothsing

In [None]:
def unigram_smoothing(unigram_LM, delta):
    """ Calculate probability of all uniwords and smooth it """
    smoothed_unigram = copy.copy(unigram_LM)
    for uniword in unigram_LM.keys():
        alpha = (delta/words_size)*vocab_size
        smoothed_unigram[uniword] = (max(unigram_LM[uniword] - delta, 0))/words_size + alpha*(1/vocab_size)
    return smoothed_unigram
        
def bigram_smoothing(bigram_LM, unigram_LM, delta):
    """ Calculate probability of all biiwords and smooth it """
    smoothed_bigram = copy.copy(bigram_LM)
    global all_B
    for biword in bigram_LM.keys():
        words = biword.split()
        B = all_B[words[0]]
        alpha1 = (delta/unigram_LM[words[0]])*B
        alpha2 = (delta/words_size)*vocab_size
        smoothed_bigram[biword] = (max(bigram_LM[biword] - delta, 0))/unigram_LM[words[0]] + \
                                                                alpha1*((max(unigram_LM[words[1]] - delta, 0))/words_size + \
                                                                           alpha2*(1/vocab_size))
    return smoothed_bigram

def get_prob(delta, n_gram, unigram_LM, bigram_LM=None):
    """ Calculate prbability of the input ngram """
    global all_B
    if bigram_LM:
        if n_gram in bigram_LM.keys():
            return bigram_LM[n_gram]
        else:
            w2 = n_gram.split()[1]
            if w2 in unigram_LM.keys():
                return unigram_LM[w2]
            else:
                alpha = (delta/words_size)*vocab_size
                return alpha*(1/vocab_size)                
    else:
        if n_gram in unigram_LM.keys():
            return unigram_LM[n_gram]
        else:
            alpha = (delta/words_size)*vocab_size
            return alpha*(1/vocab_size)

##### Calculate perplexity

In [None]:
def get_perplexity(delta, n_gram, unigram_LM, bigram_LM=None):
    """ Calculate perplexity of the input ngram. If bigram_LM is None just unigram is used in perplexity """
    prob = 0
    for i in range(1, len(n_gram)):
        if bigram_LM:
            prob += math.log(get_prob(delta, ' '.join(n_gram[i-1:i+1]), unigram_LM, bigram_LM))
        else:
            prob += math.log(get_prob(delta, n_gram[i], unigram_LM))
            
    return prob * (-1/len(n_gram))

def evaluate_LM(path, delta, unigram_LM, bigram_LM=None):
    """ Evaluate LM by calculating perplexity """
    val_lines = get_text(path)    
    all_perplexity = []
    for line in val_lines:
        if bigram_LM:
            perplexity = get_perplexity(delta, line.split(), unigram_LM, bigram_LM)
        else:
            perplexity = get_perplexity(delta, line.split(), unigram_LM)
        all_perplexity.append(math.e ** perplexity)
    avg_perplexity = sum(all_perplexity)/len(all_perplexity)
    return avg_perplexity


##### Find best delta

In [None]:
min_unigram_perplexity = math.inf
best_unigram_delta = None
min_bigram_perplexity = math.inf
best_bigram_delta = None
for i in range(5, 100, 5):   ## Change delta from 0 to 1 by 0.05 step
    delta = round(0.01*i, 2)
    smoothed_unigram_LM = unigram_smoothing(unigram_LM, delta)
    smoothed_bigram_LM = bigram_smoothing(bigram_LM, unigram_LM, delta)
    unigram_perplexity = evaluate_LM(current_path + 'datasets/valid.txt', delta, smoothed_unigram_LM)
    bigram_perplexity = evaluate_LM(current_path + 'datasets/valid.txt', delta, smoothed_unigram_LM, smoothed_bigram_LM)
    print('delta: ' + str(delta))
    print('\t unigram perplexity: ' + str(unigram_perplexity))
    print('\t bigram perplexity: ' + str(bigram_perplexity))
    if min_unigram_perplexity > unigram_perplexity:
        min_unigram_perplexity = unigram_perplexity
        best_unigram_delta = delta
    if min_bigram_perplexity > bigram_perplexity:
        min_bigram_perplexity = bigram_perplexity
        best_bigram_delta = delta

print("#########################")
print('best results in validation:')
print('unigram:')
print('\t best delta: ' + str(best_unigram_delta))
print('\t perplexity: ' + str(min_unigram_perplexity))
print('bigram:')
print('\t best delta: ' + str(best_bigram_delta))
print('\t perplexity: ' + str(min_bigram_perplexity))

smoothed_unigram_LM = unigram_smoothing(unigram_LM, best_unigram_delta)
smoothed_bigram_LM = bigram_smoothing(bigram_LM, unigram_LM, best_bigram_delta)
test_unigram_perplexity = evaluate_LM(current_path + 'datasets/test.txt', best_unigram_delta, smoothed_unigram_LM)
test_bigram_perplexity = evaluate_LM(current_path + 'datasets/test.txt', best_unigram_delta, smoothed_unigram_LM, smoothed_bigram_LM)
print("#########################")
print('best results in test:')
print('unigram:')
print('\t best delta: ' + str(best_unigram_delta))
print('\t perplexity: ' + str(test_unigram_perplexity))
print('bigram:')
print('\t best delta: ' + str(best_bigram_delta))
print('\t perplexity: ' + str(test_bigram_perplexity))

delta: 0.05
	 unigram perplexity: 1360.7482165830145
	 bigram perplexity: 896.5963299880291
delta: 0.1
	 unigram perplexity: 1255.3646534193872
	 bigram perplexity: 812.9647001352185
delta: 0.15
	 unigram perplexity: 1204.0645156351343
	 bigram perplexity: 773.4331753816832
delta: 0.2
	 unigram perplexity: 1171.4896572487612
	 bigram perplexity: 749.1201087894766
delta: 0.25
	 unigram perplexity: 1148.1562951505282
	 bigram perplexity: 732.3567453005434
delta: 0.3
	 unigram perplexity: 1130.2369272973428
	 bigram perplexity: 720.0751862088389
delta: 0.35
	 unigram perplexity: 1115.8340926881492
	 bigram perplexity: 710.7718366609752
delta: 0.4
	 unigram perplexity: 1103.8795086957416
	 bigram perplexity: 703.6156941274498
delta: 0.45
	 unigram perplexity: 1093.7168573124022
	 bigram perplexity: 698.1127853932402
delta: 0.5
	 unigram perplexity: 1084.9163131460652
	 bigram perplexity: 693.9589784069473
delta: 0.55
	 unigram perplexity: 1077.182274391375
	 bigram perplexity: 690.96917499

### Part 3

##### Test LM by predicting next word of incomplited sentences

In [None]:
def predict_next_words(n_gram, n_next_words, unigram_LM, bigram_LM):
    """ Predict next word of the input ngram using LM. If bigram_LM is None unigram LM is used else bigram LM """
    if bigram_LM:
        for i in range(n_next_words):
            condidates = [w for w in bigram_LM.keys() if w.split()[0]==n_gram[-1]]
            best_condidate = max(condidates, key=lambda x:bigram_LM[x])
            n_gram.append(best_condidate.split()[1])
        return ' '.join(n_gram)
    else:
        for i in range(n_next_words):
            best_condidate = max(unigram_LM, key=lambda x:unigram_LM[x])
            n_gram.append(best_condidate)
        return ' '.join(n_gram)       

def complete_text_by_LM(unigram_LM, bigram_LM=None):
    """ Test LM by predicting next word of all incomplete sentences of test file """
    f = open(current_path + 'datasets/test_incomplete.txt', 'r', encoding='UTF-8')
    lines = f.readlines()
    f.close()
    f = open(current_path + 'datasets/test_incomplete_gold.txt', 'r', encoding='UTF-8')
    gold_lines = f.readlines()
    f.close()
    for i, line in enumerate(lines):
        print('{0} test {1}:'.format('bigram' if bigram_LM else 'unigram', i+1))
        print('incompleted: ' + line.split('###')[1].strip())
        parts = line.strip().split('###')
        n_incomplite = int(parts[0])
        n_gram = parts[1].split()
        complite_text = predict_next_words(n_gram, n_incomplite, unigram_LM, bigram_LM)
        print('completed: ' + gold_lines[i].strip())
        print('predicted: ' + complite_text+ '\n')
        
complete_text_by_LM(smoothed_unigram_LM)  ### unigram LM test

unigram test 1:
incompleted: این سخن حقست اگر نزد سخن گستر
completed: این سخن حقست اگر نزد سخن گستر برند
predicted: این سخن حقست اگر نزد سخن گستر و

unigram test 2:
incompleted: آنکه با یوسف صدیق چنین خواهد
completed: آنکه با یوسف صدیق چنین خواهد کرد
predicted: آنکه با یوسف صدیق چنین خواهد و

unigram test 3:
incompleted: هیچ دانی چکند صحبت او با
completed: هیچ دانی چکند صحبت او با دگران
predicted: هیچ دانی چکند صحبت او با و

unigram test 4:
incompleted: سرمه دهی بصر بری سخت خوش است
completed: سرمه دهی بصر بری سخت خوش است تاجری
predicted: سرمه دهی بصر بری سخت خوش است و

unigram test 5:
incompleted: آتش ابراهیم را
completed: آتش ابراهیم را نبود زیان
predicted: آتش ابراهیم را و و

unigram test 6:
incompleted: من که اندر سر
completed: من که اندر سر جنونی داشتم
predicted: من که اندر سر و و

unigram test 7:
incompleted: هر شیر شرزه را که به نیش
completed: هر شیر شرزه را که به نیش سنان گزید
predicted: هر شیر شرزه را که به نیش و و

unigram test 8:
incompleted: هرکه از حق به
completed: هرکه از 

In [None]:
complete_text_by_LM(smoothed_unigram_LM, smoothed_bigram_LM)  ### bigram LM test

bigram test 1:
incompleted: این سخن حقست اگر نزد سخن گستر
completed: این سخن حقست اگر نزد سخن گستر برند
predicted: این سخن حقست اگر نزد سخن گستر و

bigram test 2:
incompleted: آنکه با یوسف صدیق چنین خواهد
completed: آنکه با یوسف صدیق چنین خواهد کرد
predicted: آنکه با یوسف صدیق چنین خواهد کرد

bigram test 3:
incompleted: هیچ دانی چکند صحبت او با
completed: هیچ دانی چکند صحبت او با دگران
predicted: هیچ دانی چکند صحبت او با تو

bigram test 4:
incompleted: سرمه دهی بصر بری سخت خوش است
completed: سرمه دهی بصر بری سخت خوش است تاجری
predicted: سرمه دهی بصر بری سخت خوش است و

bigram test 5:
incompleted: آتش ابراهیم را
completed: آتش ابراهیم را نبود زیان
predicted: آتش ابراهیم را به دست

bigram test 6:
incompleted: من که اندر سر
completed: من که اندر سر جنونی داشتم
predicted: من که اندر سر و از

bigram test 7:
incompleted: هر شیر شرزه را که به نیش
completed: هر شیر شرزه را که به نیش سنان گزید
predicted: هر شیر شرزه را که به نیش و از

bigram test 8:
incompleted: هرکه از حق به
completed: هرکه از 

#### Part 4

##### Import liberaries

In [12]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from keras.layers.embeddings import Embedding
from keras.preprocessing.text import one_hot

import numpy as np
import pickle

##### Extract bigrams and trigrams from text

In [8]:
def get_ngram(path, n):
    """ Read text file of the input path and return all ngrams and next word of ngrams as labels """
    f = open(path, 'r', encoding='UTF-8')
    lines = f.readlines()
    f.close()
    n_grams = []
    labels = []
    # unique_ngrams = set()
    for line in lines:
        words = line.split()
        for i in range(n, len(words)):
            # if ' '.join(words[i-n:i+]) not in unique_ngrams:
                # unique_ngrams.add(' '.join(words[i-n:i+1]))
            n_grams.append(words[i-n:i])            
            labels.append(words[i])
    
    return n_grams, labels

bigrams, bigram_labels = get_ngram(current_path + 'datasets/train.txt', 2)
bigrams_val, bigram_labels_val = get_ngram(current_path + 'datasets/valid.txt', 2)
trigrams, trigram_labels = get_ngram(current_path + 'datasets/train.txt', 3)
trigrams_val, trigram_labels_val = get_ngram(current_path + 'datasets/valid.txt', 3)

##### Encode ngrams to vectors

In [9]:
def get_vocabs(path):
    """ Extract all vocabularies of tetx file of the input path """
    f = open(path, 'r', encoding='UTF-8')
    lines = f.readlines()
    f.close()
    vocabs = []
    for line in lines:
        vocabs.extend(line.split())
    return list(set(vocabs))

def encode_ngram(n_grams, word2id):
    """ Encode ngrams of the input using word2id dictionary """
    encoded_ngrams = []
    for n_gram in n_grams:
        if type(n_gram) == str:
            if n_gram in word2id.keys():
                encoded_ngrams.append(word2id[n_gram])
            else: 
                encoded_ngrams.append(word2id[''])
        else:
            encoded_ngram = []
            for w in n_gram:
                if w in word2id.keys():
                    encoded_ngram.append(word2id[w])
                else:
                    encoded_ngram.append(word2id[''])
            encoded_ngrams.append(encoded_ngram)
    return np.array(encoded_ngrams)    
        
vocabs = get_vocabs(current_path + 'datasets/train.txt')
word2id = {w:i+1 for i, w in enumerate(vocabs)}  ## assign a unique number to any words of vocabs
word2id[''] = 0
with open(current_path + 'word2id.pkl', 'wb') as f:
    pickle.dump(word2id, f, protocol=pickle.HIGHEST_PROTOCOL)

encoded_bigrams = encode_ngram(bigrams, word2id)
encoded_bigram_labels = encode_ngram(bigram_labels, word2id)
encoded_bigrams_val = encode_ngram(bigrams_val, word2id)
encoded_bigram_labels_val = encode_ngram(bigram_labels_val, word2id)

encoded_trigrams = encode_ngram(trigrams, word2id)
encoded_trigrams_labels = encode_ngram(trigram_labels, word2id)
encoded_trigrams_val = encode_ngram(trigrams_val, word2id)
encoded_trigrams_labels_val = encode_ngram(trigram_labels_val, word2id)

##### Create keras model

In [10]:
def get_model(word2id, n_gram):
    vocab_size = len(word2id)
    model = Sequential()
    model.add(Embedding(vocab_size, 64, input_length=len(n_gram[0])))
    model.add(Flatten())
    model.add(Dense(256))
    model.add(Dense(vocab_size, activation='softmax'))
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

bigram_model = get_model(word2id, bigrams)
trigram_model = get_model(word2id, trigrams)

##### Train bigram model then save it

In [11]:
bigram_model.fit(encoded_bigrams, encoded_bigram_labels, batch_size=1024, epochs=7, shuffle=True, validation_data=(encoded_bigrams_val, encoded_bigram_labels_val))
# save the model to file
bigram_model.save(current_path + 'bigram_model.h5')

Epoch 1/7
Epoch 2/7
Epoch 3/7
Epoch 4/7
Epoch 5/7
Epoch 6/7
Epoch 7/7


##### Train trigram model then save it

In [13]:
trigram_model.fit(encoded_trigrams, encoded_trigrams_labels, batch_size=1024, epochs=7, validation_data=(encoded_trigrams_val, encoded_trigrams_labels_val))
# save the model to file
trigram_model.save(current_path + 'trigram_model.h5')

Epoch 1/7
Epoch 2/7
Epoch 3/7
Epoch 4/7
Epoch 5/7
Epoch 6/7
Epoch 7/7


##### Load saved models 

In [None]:
bigram_model.load_weights(current_path + 'bigram_model.h5')
trigram_model.load_weights(current_path + 'trigram_model.h5')

In [None]:
from keras.models import load_model
bigram_model = load_model(current_path + 'bigram_model.h5')
trigram_model = load_model(current_path + 'trigram_model.h5')

with open('word2id.pkl', 'rb') as f:
    word2id = pickle.load(f)

##### Calculate perplexity for neural LM model

In [15]:
import math
def neural_perplexity(path, model, word2id, model_type):
    id2word = {v:k for k,v in word2id.items()}
    f = open(path, 'r', encoding='UTF-8')
    lines = f.readlines()
    f.close()
    all_perplexity = []
    for line in lines:
        words = line.split()
        if model_type == 'bigram':
            k = 2
        else:
            k = 3

        ngrams = []
        labels = []
        for i in range(k, len(words)):
            n_gram = encode_ngram(words[i-k:i], word2id)
            label = encode_ngram([words[i]], word2id)[0]
            ngrams.append(n_gram)
            labels.append(label)
        preds = model.predict(np.array(ngrams))
        perplexity = 0
        for i in range(len(preds)):
            perplexity += math.log(preds[i][labels[i]])
                
        perplexity = perplexity * (-1/len(words[k:]))
        all_perplexity.append(perplexity)
    
    avg_perplexity = sum(all_perplexity)/len(all_perplexity)
    perplexity = math.e ** avg_perplexity
    print('perplexity of neural {} LM: '.format(model_type) + str(perplexity))

neural_perplexity(current_path + 'datasets/test.txt', bigram_model, word2id, 'bigram')
neural_perplexity(current_path + 'datasets/test.txt', trigram_model, word2id, 'trigram')

perplexity of neural bigram LM: 547.7307027241197
perplexity of neural trigram LM: 473.6301467049531


##### Test LM by predcting next words of test data

In [None]:
def test_neural_LM(model, word2id, model_type):
    id2word = {v:k for k,v in word2id.items()}
    f = open(current_path + 'datasets/test_incomplete.txt', 'r', encoding='UTF-8')
    lines = f.readlines()
    f.close()
    f = open(current_path + 'datasets/test_incomplete_gold.txt', 'r', encoding='UTF-8')
    gold_lines = f.readlines()
    f.close()
    for i, line in enumerate(lines):
        print('{0} test {1}:'.format(model_type, i+1))
        print('incompleted: ' + line.split('###')[1].strip())
        parts = line.strip().split('###')
        n_incomplite = int(parts[0])
        complite_text = parts[1]
        for _ in range(n_incomplite):
            if model_type == 'bigram':
                n_gram = complite_text.split()[-2:]
            else:
                n_gram = complite_text.split()[-3:]
            n_gram = encode_ngram(n_gram, word2id)
            pred = model.predict(np.array([n_gram]))[0]
            next_word = id2word[pred.argmax()]
            complite_text = complite_text + ' ' + next_word
        print('completed: ' + gold_lines[i].strip())
        print('predicted: ' + complite_text + '\n')

test_neural_LM(bigram_model, word2id, 'bigram')

In [None]:
test_neural_LM(trigram_model, word2id, 'trigram')

trigram test 1:
incompleted: این سخن حقست اگر نزد سخن گستر
completed: این سخن حقست اگر نزد سخن گستر برند
predicted: این سخن حقست اگر نزد سخن گستر بود

trigram test 2:
incompleted: آنکه با یوسف صدیق چنین خواهد
completed: آنکه با یوسف صدیق چنین خواهد کرد
predicted: آنکه با یوسف صدیق چنین خواهد کرد

trigram test 3:
incompleted: هیچ دانی چکند صحبت او با
completed: هیچ دانی چکند صحبت او با دگران
predicted: هیچ دانی چکند صحبت او با دل

trigram test 4:
incompleted: سرمه دهی بصر بری سخت خوش است
completed: سرمه دهی بصر بری سخت خوش است تاجری
predicted: سرمه دهی بصر بری سخت خوش است و

trigram test 5:
incompleted: آتش ابراهیم را
completed: آتش ابراهیم را نبود زیان
predicted: آتش ابراهیم را در جهان

trigram test 6:
incompleted: من که اندر سر
completed: من که اندر سر جنونی داشتم
predicted: من که اندر سر آن که

trigram test 7:
incompleted: هر شیر شرزه را که به نیش
completed: هر شیر شرزه را که به نیش سنان گزید
predicted: هر شیر شرزه را که به نیش می کند

trigram test 8:
incompleted: هرکه از حق به
compl