In [188]:
#importing 
import re
import string
import os
from collections import Counter
import json
import random
import math

In [189]:
class TextTokenizer:
    def __init__(self):
        self.mailid_regex = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'
        self.url_regex = r'https?://\S+|www\.\S+'
        self.mention_regex = r'@\w+'
        self.hashtag_regex = r'#\w+'
        self.number_regex = r'\b\d+\b'
        self.word_regex = r'\w+'
#         self.punctuation_regex = r'[^\w\s]'
        self.sentence_regex = r'(?<=\.|\?|\s)\s+'

    def split_sentences(self, text):
        sentenced_text = re.split(self.sentence_regex, text)
        return sentenced_text

    def sentence_tokenizer(self, sentence):
        # Replacing mails
        sentence = re.sub(self.mailid_regex, "<MAILID>", sentence)

        # Replacing URLs
        sentence = re.sub(self.url_regex, "<URL>", sentence)

        # Replacing mentions
        sentence = re.sub(self.mention_regex, "<MENTION>", sentence)

        # Replacing hashtags
        sentence = re.sub(self.hashtag_regex, "<HASHTAG>", sentence)

        # Replacing numbers
        sentence = re.sub(self.number_regex, "<NUM>", sentence)
#         print(sentence)
#         Word tokenizer
        words = re.findall(self.word_regex, sentence)
#         print(words)
        # Append End of sentence and start of sentence to the list of words
        tokenized_sentence = ["<sos>"] + words + ["<eos>"]  
        
        
#         # Word tokenizer including punctuation
#         words_with_punctuation = re.findall(r'\S+|\s+', sentence)


#         # Remove leading and trailing whitespaces
#         words_with_punctuation = [word.strip() for word in words_with_punctuation]

#         # Append End of sentence and start of sentence to the list of words
#         tokenized_sentence = ["<sos>"] + words_with_punctuation + ["<eos>"]

        return tokenized_sentence

    def tokenizer(self, text):
        # Split the given text into sentences.
        sentenced_text = self.split_sentences(text)
#         print(sentenced_text)
        # Final tokenized tokens.
        tokenized_sentences = []

        for sentence in sentenced_text:
            tokenized_sentences.append(self.sentence_tokenizer(sentence))

        return tokenized_sentences

In [190]:
def BuildNGram(n: int, txt: str, tokenizer):
    nGramContext = {}  # dictionaries to store N-gram context
    nGramCounter = {}  # dictionaries to store N-gram count

    # Tokenizing the input text
    sentences = tokenizer.tokenizer(txt)

    for sentence in sentences:
        sentence_lower = [word.lower() for word in sentence]
#         print(n)
        if( n - 2 > 0 ):
            sentence = ["<sos>"] * (n - 2) + sentence_lower
        else:
            sentence = sentence_lower

        # Tokenize the sentence
        tokens = sentence
#         print(tokens)
        # Iterate through N-grams in the sentence
        for i in range(len(tokens) - n + 1):
            context = " ".join(tokens[i:i + n - 1])
            target_word = tokens[i + n - 1]
            total_ngram = context + " " + target_word if context else target_word

            # Update N-gram counters
            if total_ngram in nGramCounter:
                nGramCounter[total_ngram] += 1
            else:
                nGramCounter[total_ngram] = 1

                # Update N-gram context
                if context in nGramContext:
                    nGramContext[context].append(target_word)
                else:
                    nGramContext[context] = [target_word]

    return nGramContext, nGramCounter

In [191]:
class GoodTuringSmoothing:
    def __init__(self, ngram_counter):
        self.ngram_counter = ngram_counter
        self.total_ngrams = float(sum(ngram_counter.values()))
        self.adjusted_counts = self.calculate_adjusted_counts()

    def count_of_counts(self):
        count_of_counts_dict = {}
        for count in self.ngram_counter.values():
            count_of_counts_dict[count] = count_of_counts_dict.get(count, 0) + 1
        return count_of_counts_dict

    def calculate_adjusted_counts(self):
        adjusted_counts = {}
        count_of_counts_dict = self.count_of_counts()

        for count in self.ngram_counter.values():
            Nc_plus_1 = count_of_counts_dict.get(count + 1, 1)
            Nc = count_of_counts_dict.get(count, 1)
            c_star = (count + 1) * (Nc_plus_1 / Nc) 
            adjusted_counts[count] = c_star

        return adjusted_counts

    def smooth_ngram_probs(self):
        smoothed_ngram_probs = {}
        
        for ngram, count in self.ngram_counter.items():
            c_star = self.adjusted_counts.get(count, 1)
            probability = c_star / self.total_ngrams
            smoothed_ngram_probs[ngram] = probability

        # Assigning the probability for unseen N-grams
        unseen_ngram_prob = 1 / self.total_ngrams
        smoothed_ngram_probs[None] = unseen_ngram_prob

        return smoothed_ngram_probs
    
    def get_probability(self,trigram):
        prob = 0;
        return prob


In [234]:
class LinearInterpolationSmoothing:
    def __init__(self, trigram_counts, bigram_counts, unigram_counts, is_train=False, lambdas=None):
        self.trigram_counts = trigram_counts
        self.bigram_counts = bigram_counts
        self.unigram_counts = unigram_counts
        total_trigrams = 0
        total_bigrams = 0
        total_unigrams = 0
        self.is_train = is_train
        self.lambdas = lambdas

    def update_lambdas(self):
        updated_lambdas = {f'lambda{i}': self.lambdas[f'lambda{i}'] for i in range(1, 4)}
        total_unigram = float(sum(self.unigram_counts.values()))

        for trigram, trigram_count in self.trigram_counts.items():
            t1, t2, t3 = trigram.split()

            # Check if the frequency of the trigram is greater than 0
            if trigram_count > 0:
                # Calculate the three conditions
                condition1 = (trigram_count - 1) / max(1, self.bigram_counts.get(f'{t1} {t2}', 0) - 1)
                condition2 = (self.bigram_counts.get(f'{t2} {t3}', 0) - 1) / max(1, self.unigram_counts.get(t2, 0) - 1)
                condition3 = (self.unigram_counts.get(t3, 0) - 1) / max(1, total_unigram - 1)

                # Find the index of the maximum condition
                max_condition_index = max(enumerate([condition1, condition2, condition3]), key=lambda x: x[1])[0]

                # Update lambdas based on the maximum condition
                updated_lambdas[f'lambda{max_condition_index + 1}'] += trigram_count

        # Normalize the lambdas to ensure they sum to 1
        lambda_sum = sum(updated_lambdas.values())
        normalized_lambdas = {key: value / lambda_sum for key, value in updated_lambdas.items()}

        return normalized_lambdas

    def linear_interpolation_smoothing(self):
        self.total_trigrams = sum(self.trigram_counts.values())
        self.total_bigrams = sum(self.bigram_counts.values())
        self.total_unigrams = sum(self.unigram_counts.values())

        interpolated_probs = {}

        if self.is_train:
            self.lambdas = {f'lambda{i}': 0.0 for i in range(1, 4)}
            self.lambdas = self.update_lambdas()

        for trigram, count in self.trigram_counts.items():
            t1, t2, t3 = trigram.split()

            # Unigram probability
            P1_t3 = self.unigram_counts.get(t3, 0) / self.total_unigrams

            # Bigram probability
            P2_t3 = self.bigram_counts.get(f"{t2} {t3}", 0) / self.total_bigrams

            # Trigram probability
            P3_t3 = count / self.total_trigrams

            # Interpolation weights
            lambda1 = self.lambdas.get('lambda1', 1/3.0)
            lambda2 = self.lambdas.get('lambda2', 1/3.0)
            lambda3 = self.lambdas.get('lambda3', 1/3.0)

            # Linear interpolation
            interpolated_prob = lambda1 * P1_t3 + lambda2 * P2_t3 + lambda3 * P3_t3

            interpolated_probs[trigram] = interpolated_prob

        if self.is_train:
            return interpolated_probs, self.lambdas
        else:
            return interpolated_probs
        
    def get_probability(self,trigram):
#         print(trigram)
        t1, t2, t3 = trigram.split()

        # Unigram probability
        P1_t3 = self.unigram_counts.get(t3, 0) / self.total_unigrams

        # Bigram probability
        P2_t3 = self.bigram_counts.get(f"{t2} {t3}", 0) / self.total_bigrams

        # Trigram probability
        P3_t3 = self.trigram_counts.get(trigram,0) / self.total_trigrams
#         print(P1_t3,P2_t3,P3_t3)
        # Interpolation weights
        lambda1 = self.lambdas.get('lambda1', 1/3.0)
        lambda2 = self.lambdas.get('lambda2', 1/3.0)
        lambda3 = self.lambdas.get('lambda3', 1/3.0)
#         print(lambda1,lambda2, lambda3)
        # Linear interpolation
        interpolated_prob = lambda1 * P1_t3 + lambda2 * P2_t3 + lambda3 * P3_t3
        if(interpolated_prob == 0):
            interpolated_prob = 0.0001
        return interpolated_prob



In [235]:
# # Example usage during training:
# trigram_counts_train = {'a b c': 5, 'b c d': 3, 'c d e': 2}
# bigram_counts_train = {'a b': 10, 'b c': 8, 'c d': 6}
# unigram_counts_train = {'a': 15, 'b': 12, 'c': 9, 'd': 7, 'e': 5}

# # Create an instance of LinearInterpolationSmoother during training
# smoother_train = LinearInterpolationSmoother(trigram_counts_train, bigram_counts_train, unigram_counts_train, is_train=True)
# interpolated_probs_train, updated_lambdas = smoother_train.linear_interpolation_smoothing()

# # Example usage during testing with updated lambdas:
# trigram_counts_test = {'x y z': 4, 'y z w': 2, 'z w v': 1}
# bigram_counts_test = {'x y': 8, 'y z': 6, 'z w': 4}
# unigram_counts_test = {'x': 10, 'y': 8, 'z': 6, 'w': 4, 'v': 2}

# # Create an instance of LinearInterpolationSmoother during testing and pass updated lambdas
# smoother_test = LinearInterpolationSmoother(trigram_counts_test, bigram_counts_test, unigram_counts_test, is_train=False, lambdas=updated_lambdas)
# interpolated_probs_test = smoother_test.linear_interpolation_smoothing()

# # Display the interpolated probabilities during training
# print("Training:")
# for trigram, prob in interpolated_probs_train.items():
#     print(f"{trigram}: {prob}")

# # Display the interpolated probabilities during testing
# print("\nTesting:")
# for trigram, prob in interpolated_probs_test.items():
#     print(f"{trigram}: {prob}")


# # Example usage:
# text = "This is a mohit sharma. han han a MOHIT Sharma."
# tokenizer = TextTokenizer()  # Assuming TextTokenizer is your tokenizer class
# nGramContext, nGramCounter = BuildNGram(n=3, txt=text, tokenizer=tokenizer)

# # Print results
# print("N-Gram Context:", nGramContext)
# print("N-Gram Counter:", nGramCounter)

In [236]:
def generate_ngram_model(N, corpus_path,tokenizer):

    # Read the corpus from the file
    with open(corpus_path, 'r', encoding='utf-8') as file:
        corpus = file.read()

    # Generate N-grams using the BuildNGram function
    nGramContext, nGramCounter = BuildNGram(N, corpus,tokenizer)

    return nGramContext, nGramCounter

In [237]:

class Model:
    def __init__(self, corpus_path, result_name="2022201060_LM1", n_gram_order=3, smoothing_type="g", lambdas=None):
        self.n_gram_order = n_gram_order
        self.smoothing_type = smoothing_type
        self.lambdas = lambdas
        self.corpus_path = corpus_path
        self.save_file_path = self._get_save_file_path(0)
        self.train_corpus_path = self._get_save_file_path(1)
        self.test_corpus_path = self._get_save_file_path(2)
        self.test_samples_count = 1000
        self.result_name = result_name
        self.nGramContext = None
        self.nGramCounter = None
        self.probs = None
        self.smoothing_instance = None  # Instance to hold the smoothing object
        self.tokenizer = TextTokenizer()  # Instance to hold the TextTokenizer object

    def _get_save_file_path(self, file_type):
        # Extract the base name of the corpus path
        corpus_name = os.path.basename(self.corpus_path) if self.corpus_path else "corpus_unknown.txt"

        if file_type == 0:
            return f"{self.n_gram_order}_{self.smoothing_type}_{corpus_name}.json"
        elif file_type == 1:
            return f"{self.n_gram_order}_{self.smoothing_type}_{corpus_name}_train.txt"
        elif file_type == 2:
            return f"{self.n_gram_order}_{self.smoothing_type}_{corpus_name}_test.txt"
        else:
            raise ValueError("Invalid file_type. Use 0 for save_file, 1 for train_corpus, and 2 for test_corpus.")
    def setup(self, corpus_path=None):
        # If corpus_path is not provided, use the path stored in self.corpus_path
        if corpus_path is None:
            corpus_path = self.corpus_path

        # Read the corpus from the file
        with open(corpus_path, 'r', encoding='utf-8') as file:
            corpus = file.read()

        # Split the corpus into sentences
        sentences = self.tokenizer.split_sentences(corpus)
        # Exclude sentences with zero words
        sentences = [sentence for sentence in sentences if len(sentence.split()) > 0]
#         for s in sentences:
#             print(s)
#             print("--------------------")
        # Randomly select 1000 sentences for testing
        selected_sentences = random.sample(sentences, min(self.test_samples_count, len(sentences)))

        # Write the selected sentences to the test corpus file
        with open(self.test_corpus_path, 'w', encoding='utf-8') as test_file:
            test_file.write("\n".join(selected_sentences))

        # Write the remaining sentences to the train corpus file
        remaining_sentences = [sentence for sentence in sentences if sentence not in selected_sentences]
        with open(self.train_corpus_path, 'w', encoding='utf-8') as train_file:
            train_file.write("\n".join(remaining_sentences))

#         print(f"Setup complete. Train corpus: {self.train_corpus_path}, Test corpus: {self.test_corpus_path}")

    def train(self, corpus_path = None):
        if(corpus_path):
            self.nGramContext, self.nGramCounter = generate_ngram_model(self.n_gram_order,corpus_path,self.tokenizer)
        else:   
            self.nGramContext, self.nGramCounter = generate_ngram_model(self.n_gram_order, self.corpus_path,self.tokenizer)
        
        if self.smoothing_type == "g":
            smoothing_instance = GoodTuringSmoothing(self.nGramCounter)
            self.probs = smoothing_instance.smooth_ngram_probs()
        elif self.smoothing_type == "i":
            if(corpus_path):
                bigram_context, bigram_counter = generate_ngram_model(2, corpus_path, self.tokenizer)
                unigram_context, unigram_counter = generate_ngram_model(1, corpus_path, self.tokenizer)
            else:    
                bigram_context, bigram_counter = generate_ngram_model(2, self.corpus_path,self.tokenizer)
                unigram_context, unigram_counter = generate_ngram_model(1, self.corpus_path,self.tokenizer)
            smoothing_instance = LinearInterpolationSmoothing(self.nGramCounter, bigram_counter, unigram_counter, is_train=True, lambdas=self.lambdas)
            self.probs, self.lambdas = smoothing_instance.linear_interpolation_smoothing()
        else:
            raise ValueError("Invalid smoothing type. Choose 'g' for Good-Turing or 'i' for Linear Interpolation.")

        # Store the smoothing instance for future reference
        self.smoothing_instance = smoothing_instance

    def save(self):
        # Save all necessary variables to a JSON file
        model_state = {
            'n_gram_order': self.n_gram_order,
            'probs': self.probs,
            'lambdas': self.lambdas,
            'nGramContext': self.nGramContext,
            'nGramCounter': self.nGramCounter,
        }

        with open(self.save_file_path, 'w') as file:
            json.dump(model_state, file)

    def load(self):
        # Load all necessary variables from a JSON file
        with open(self.save_file_path, 'r') as file:
            model_state = json.load(file, object_hook=self.json_object_hook)

        self.n_gram_order = model_state['n_gram_order']
        self.probs = model_state['probs']
        self.lambdas = model_state['lambdas']
        self.nGramContext = model_state['nGramContext']
        self.nGramCounter = model_state['nGramCounter']

    def json_object_hook(self, dct):
        # Replace the key 'null' with None during JSON deserialization
        return {key if key != 'null' else None: value for key, value in dct.items()}
    
    def calculate_probability(self, sentence):
        probability = 1.0
        tokenized_sentence = self.tokenizer.sentence_tokenizer(sentence)
        if( n_gram_order - 2 > 0 ):
            tokenized_sentence = ["<sos>"] * (n_gram_order - 2) + tokenized_sentence
        print(tokenized_sentence)
        total_words = len(tokenized_sentence) - 2
        for i in range(self.n_gram_order - 1, len(tokenized_sentence)):
            context = " ".join(tokenized_sentence[max(0, i - self.n_gram_order + 1): i ])
            target_word = tokenized_sentence[i]
            n_gram = f"{context} {target_word}" if context else target_word

            # Calculate log likelihood based on the model probabilities
#             print(n_gram)
            prob = self.smoothing_instance.get_probability(n_gram)
            print(n_gram, prob)
            probability*=prob
        p ,_ = self.perplexity(tokenized_sentence)
        print(p)
        return probability
    
    def perplexity(self, tokenized_sentence):
        total_words = len(tokenized_sentence) - 2  # Exclude <SOS> and <EOS>
        log_likelihood_sentence = 0.0
#         print(tokenized_sentence)
        for i in range(self.n_gram_order - 1, len(tokenized_sentence)):
            context = " ".join(tokenized_sentence[max(0, i - self.n_gram_order + 1): i ])
            target_word = tokenized_sentence[i]
            n_gram = f"{context} {target_word}" if context else target_word

            # Calculate log likelihood based on the model probabilities
#             print(n_gram)
            prob = self.smoothing_instance.get_probability(n_gram)
#             print(prob)
            log_likelihood_sentence += math.log(prob)

        perplexity_sentence = 2 ** (-log_likelihood_sentence / total_words)
        return perplexity_sentence, log_likelihood_sentence

    def evaluate_helper(self,result_file_path, corpus_path):
        # Read the corpus from the file
        with open(corpus_path, 'r', encoding='utf-8') as file:
            corpus = file.read()

        sentences = self.tokenizer.split_sentences(corpus)
        # Exclude sentences with zero words
#         sentences = [sentence for sentence in sentences if len(sentence.split()) > 0]
        
#         print(sentences)
        total_words = 0
        log_likelihood_sum = 0.0
        perplexity_scores = []
        
        with open(result_file_path, 'w', encoding='utf-8') as result_file:
            # Initialize average perplexity
            average_perplexity = None

            for sentence in sentences:
                tokenized_sentence = self.tokenizer.sentence_tokenizer(sentence)
                if len(tokenized_sentence) > self.n_gram_order:
                    perplexity_sentence, log_likelihood_sentence = self.perplexity(tokenized_sentence)
                    perplexity_scores.append(perplexity_sentence)
    #                 print(sentence , " : perplexity  score : ",perplexity_sentence )

                    # Write results to the file
                    result_file.write(f"{sentence}\t{perplexity_sentence}\n")

                    log_likelihood_sum += log_likelihood_sentence
                    total_words += len(sentence.split())

            if total_words >0:
            # Calculate average perplexity
                average_perplexity = 2 ** (-log_likelihood_sum / total_words)

            # Move the cursor to the beginning of the file
            result_file.seek(0)

            # Write average perplexity at the first line of the file
            result_file.write(f"avg_perplexity\t{average_perplexity}\n")
        return average_perplexity
        
    def evaluate(self, train=True, test=True):
        if train:
            corpus_path = self.train_corpus_path
            result_file_path = f"{self.result_name}_train-perplexity.txt"
            avg_perp = self.evaluate_helper(result_file_path,corpus_path)
            print("Average Perplexity on Train Set:",avg_perp)
        if test:
            corpus_path = self.test_corpus_path
            result_file_path = f"{self.result_name}_test-perplexity.txt"
            avg_perp = self.evaluate_helper(result_file_path,corpus_path)
            print("Average Perplexity on Test Set:",avg_perp)
        if (not train and not test ):
            raise ValueError("Either train or test flag should be True.")
        


In [238]:

# Example usage:
# corpus_path = "sample_corpus.txt"
# corpus_path = "./corpus/Pride and Prejudice - Jane Austen.txt"
corpus_path = "./corpus/Ulysses  James Joyce.txt"
result_name = "2022201060_LM1"
n_gram_order = 3
smoothing_type = "i"
lambdas = None

# Create an instance of the Model class
lm_model = Model(corpus_path,result_name, smoothing_type=smoothing_type)

# Setup and train the model
lm_model.setup()
# print("<------------------------------ || setup ||------------------------------------->")
# lm_model.train(corpus_path)
lm_model.train()
# print("<------------------------------ || train ||------------------------------------->")

# Save the model
# lm_model.save()

# Load the model
# lm_model.load()
sentence = "This eBook is for the use of anyone anywhere in the United States and most other parts of the world at no cost and with almost no restrictions whatsoever."
# lm_model.calculate_probability(sentence)
# Evaluate on the test set
lm_model.evaluate()



Average Perplexity on Train Set: 321.30966537090006
Average Perplexity on Test Set: 310.29411892895706


In [None]:
# Prompt for user input
input_sentence ="This eBook is for the use of"

# Get the last 2 word in the input sentence as the prefix
prefix = input_sentence.split()[-1] if input_sentence else ""

# Generate k candidates for the next word
candidates = lm_model.generate_candidates(prefix, k)

# Print the output
print("output:")
for candidate, probability in candidates:
    print(f"{candidate} {probability}")
