<a href="https://colab.research.google.com/github/vatsaaa/mtech/blob/main/semester_2/nlp/assignments/one/NLP_Group_58.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Group members
<table width="100%">
  <tr>
    <th width="25%">Name</th>
    <th width="40%">Email</th>
    <th width="20%">Student ID</th>
    <th width="15%">Contribution</th>
  </tr>
  <tr>
    <td>G. Ankur Vatsa</td>
    <td>2023aa05727@wilp.bits-pilani.ac.in</td>
    <td>2023aa05727</td>
    <td>100%</td>
  </tr>
  <tr>
    <td>Meet Soni</td>
    <td>2023aa05655@wilp.bits-pilani.ac.in</td>
    <td>2023aa05655</td>
    <td>100%</td>
  </tr>
  <tr>
    <td>Kinjal Bhoiwala</td>
    <td>2023aa05490@wilp.bits-pilani.ac.in</td>
    <td>2023aa05490</td>
    <td>100%</td>
  </tr>
  <tr>
    <td>Randhawane Santosh Baban</td>
    <td>2023aa05828@wilp.bits-pilani.ac.in</td>
    <td>2023aa05828</td>
    <td>100%</td>
  </tr>
</table>

In [None]:
!pip install nltk
import nltk
nltk.download('punkt')



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
import nltk
from nltk.tokenize import word_tokenize
from nltk.probability import ConditionalFreqDist, FreqDist
import numpy as np
from re import sub
from collections import defaultdict
import logging

# Set up logging: configure the root logger to emit INFO and above, with each
# record prefixed by its timestamp and level name.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def log_function_entry_exit(func):
    """
    Decorator that logs entry to, exit from, and exceptions raised by a function.

    Args:
    func: The callable to wrap.

    Returns:
    A wrapper that forwards all positional and keyword arguments to func,
    returns its result unchanged, and re-raises any exception after logging it.
    """
    # Imported locally so the decorator carries its own dependency.
    from functools import wraps

    # wraps() copies func's __name__ and __doc__ onto the wrapper, so decorated
    # functions still identify themselves correctly in logs and help().
    @wraps(func)
    def wrapper(*args, **kwargs):
        # %-style arguments let logging build the message only when the record
        # is actually emitted.
        logging.info("Entering %s", func.__name__)
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            logging.exception("Exception in %s: %s", func.__name__, e)
            raise
        # NOTE: the whole result is rendered into the message when INFO is
        # enabled, which can be very large for functions returning big models.
        logging.info("Exiting %s with result %s", func.__name__, result)
        return result
    return wrapper

In [None]:
@log_function_entry_exit
def preprocess_corpus(corpus):
    """
    Cleans, lowercases, and tokenizes raw corpus text.

    Every character that is neither a word character nor whitespace is
    removed before the text is lowercased and split into tokens.

    Args:
    corpus: String containing the raw corpus text.

    Returns:
    A list of preprocessed tokens (words).
    """
    try:
        # Drop punctuation first so it never reaches the tokenizer.
        without_punctuation = sub(r'[^\w\s]', '', corpus)
        lowered = without_punctuation.lower()
        words = word_tokenize(lowered)
        logging.info(f"Preprocessed {len(words)} tokens.")
    except Exception as e:
        logging.exception(f"Error in preprocess_corpus: {e}")
        raise
    return words

In [None]:
@log_function_entry_exit
def build_bigram_model(tokens, smoothing_factor=1):
    """
    Builds a bigram model from the preprocessed tokens with additive (Laplace) smoothing.

    Args:
        tokens: A list of preprocessed tokens (words).
        smoothing_factor: The value added to every bigram count (default: 1).

    Returns:
        A tuple (bigram_probs, vocab_size). bigram_probs is a nested
        defaultdict mapping first word -> second word -> smoothed probability
        of the second word following the first. Only words that start at
        least one bigram get a row, and every row covers the whole vocabulary.
        vocab_size is the number of distinct tokens.
    """
    logging.info("Starting build_bigram_model")
    try:
        tokens = list(tokens)
        vocab = set(tokens)
        vocab_size = len(vocab)
        num_bigrams = max(len(tokens) - 1, 0)

        # Count adjacent pairs directly, without materialising a bigram list.
        bigram_counts = defaultdict(lambda: defaultdict(int))
        for processed_bigrams, (w1, w2) in enumerate(zip(tokens, tokens[1:]), start=1):
            bigram_counts[w1][w2] += 1
            if processed_bigrams % 10000 == 0:  # Log progress every 10000 bigrams
                logging.info(f"Processed {processed_bigrams}/{num_bigrams} bigrams ({processed_bigrams/num_bigrams:.2%})")

        # Apply smoothing and calculate probabilities
        bigram_probs = defaultdict(lambda: defaultdict(float))
        for w1, followers in bigram_counts.items():
            total_count = sum(followers.values()) + (smoothing_factor * vocab_size)
            row = bigram_probs[w1]
            for w2 in vocab:
                # .get() reads unseen pairs as 0 without inserting them into
                # the count table.
                prob = (followers.get(w2, 0) + smoothing_factor) / total_count
                row[w2] = prob
                # Lazy %-formatting: this runs once per (context, vocab word)
                # pair, so the message is only built when DEBUG is enabled.
                logging.debug("Calculated probability: [%s -> %s]: %s", w1, w2, prob)

        return bigram_probs, vocab_size
    except Exception as e:
        logging.exception(f"Error in build_bigram_model: {e}")
        raise
    finally:
        logging.info("Exiting build_bigram_model")

In [None]:
@log_function_entry_exit
def generate_sentence(bigram_model, start_word, max_length=20, unk_token="<UNK>"):
    """
    Generates a sentence by repeatedly appending the most likely next word.

    Args:
    bigram_model: Mapping of word -> mapping of next word -> probability or
        count. Both plain nested dictionaries and objects whose inner
        distributions provide a max() method are accepted.
    start_word: The starting word for the sentence.
    max_length: The maximum number of words appended after start_word.
    unk_token: The token appended when the current word has no known
        successors (default: "<UNK>").

    Returns:
    A generated sentence as a list of words, beginning with start_word.
    Generation stops after max_length appended words, after a '.', '!' or
    '?' token, or right after unk_token is appended.
    """
    try:
        sentence = [start_word]
        current_word = start_word
        for _ in range(max_length):
            # .get() never inserts, so probing a defaultdict-based model for
            # an unknown word leaves the model untouched.
            next_word_probs = bigram_model.get(current_word)
            if not next_word_probs:
                # Nothing can follow an unknown word, so end the sentence here
                # instead of repeating the unknown token.
                sentence.append(unk_token)
                break

            # Prefer the distribution's own max() when it has one; otherwise
            # take the key with the highest value.
            pick_max = getattr(next_word_probs, "max", None)
            if callable(pick_max):
                next_word = pick_max()
            else:
                next_word = max(next_word_probs, key=next_word_probs.get)

            sentence.append(next_word)
            current_word = next_word
            if next_word in ['.', '!', '?']:
                break
        return sentence
    except Exception as e:
        logging.exception(f"Error in generate_sentence: {e}")
        raise

In [None]:
@log_function_entry_exit
def evaluate_test_set(bigram_probs, test_set, vocab_size):
    """
    Evaluates the model on a test set by calculating the average and standard
    deviation of sentence probabilities.

    A sentence's probability is the product of the conditional probabilities
    of its consecutive word pairs. Pairs missing from the model fall back to
    1 / vocab_size. Sentences with fewer than 2 words are skipped.

    Args:
    bigram_probs: Mapping of first word -> mapping of second word -> probability.
    test_set: A list of preprocessed test sentences (each a list of words).
    vocab_size: The size of the vocabulary.

    Returns:
    A tuple containing the average and standard deviation of sentence
    probabilities, or (nan, nan) when no sentence has at least 2 words.
    """
    try:
        sentence_probs = []
        for sentence in test_set:
            if len(sentence) < 2:
                logging.info("Skipping sentence with less than 2 words")
                continue

            sentence_prob = 1.0
            for first_word, second_word in zip(sentence, sentence[1:]):
                logging.debug("Evaluating bigram: [%s -> %s]", first_word, second_word)

                unseen_prob = 1 / vocab_size
                # .get() on the outer mapping keeps evaluation read-only:
                # indexing a defaultdict model would insert an empty row for
                # every unseen context word.
                followers = bigram_probs.get(first_word)
                if followers is None:
                    cond_prob = unseen_prob
                else:
                    cond_prob = followers.get(second_word, unseen_prob)
                sentence_prob *= cond_prob
                logging.debug("Conditional probability: %s", cond_prob)

            sentence_probs.append(sentence_prob)
            logging.info(f"Sentence probability: {sentence_prob}")

        if not sentence_probs:
            # The statistics are undefined without data; say so explicitly
            # rather than letting numpy warn about an empty slice.
            logging.warning("No sentence with at least 2 words; statistics are undefined")
            return float("nan"), float("nan")

        avg_prob = np.mean(sentence_probs)
        std_dev = np.std(sentence_probs)

        return avg_prob, std_dev
    except Exception as e:
        logging.exception(f"Error in evaluate_test_set: {e}")
        raise

In [None]:
@log_function_entry_exit
def build_trigram_model(tokens, smoothing_factor=1):
    """
    Builds a trigram model from the preprocessed tokens with additive (Laplace) smoothing.

    Args:
    tokens: A list of preprocessed tokens (words).
    smoothing_factor: The value added to every trigram count (default: 1).

    Returns:
    A tuple (trigram_probs, vocab_size). trigram_probs is a nested
    defaultdict mapping first word -> second word -> third word -> smoothed
    probability of the third word following the first two. Only observed
    (first, second) contexts get a row, and every row covers the whole
    vocabulary. vocab_size is the number of distinct tokens.
    """
    try:
        tokens = list(tokens)
        vocab = set(tokens)
        vocab_size = len(vocab)

        # Count every run of three consecutive tokens.
        trigram_counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        for w1, w2, w3 in zip(tokens, tokens[1:], tokens[2:]):
            trigram_counts[w1][w2][w3] += 1
            # Lazy %-formatting keeps this per-trigram call cheap when DEBUG is off.
            logging.debug("Trigram count [%s -> %s -> %s]: %s", w1, w2, w3, trigram_counts[w1][w2][w3])

        trigram_probs = defaultdict(lambda: defaultdict(lambda: defaultdict(float)))
        for w1, by_second in trigram_counts.items():
            for w2, followers in by_second.items():
                total_count = sum(followers.values()) + (smoothing_factor * vocab_size)
                row = trigram_probs[w1][w2]
                for w3 in vocab:
                    # .get() reads unseen trigrams as 0 without inserting them
                    # into the count table.
                    prob = (followers.get(w3, 0) + smoothing_factor) / total_count
                    row[w3] = prob
                    logging.debug("Trigram probability [%s -> %s -> %s]: %s", w1, w2, w3, prob)

        return trigram_probs, vocab_size
    except Exception as e:
        logging.exception(f"Error in build_trigram_model: {e}")
        raise

In [None]:
def calculate_perplexity(bigram_probs, test_set, vocab_size):
    """
    Calculates the perplexity of the bigram model on a test set.

    Perplexity is exp(-(1 / N) * sum(log p)), where the sum runs over every
    consecutive word pair in the test set and N is the number of such pairs.
    Pairs missing from the model fall back to 1 / vocab_size. Sentences with
    fewer than 2 words contribute nothing.

    Args:
        bigram_probs: Mapping of first word -> mapping of second word -> probability.
        test_set: A list of preprocessed test sentences (each a list of words).
        vocab_size: The size of the vocabulary.

    Returns:
        The perplexity of the model on the test set as a float; infinity when
        any word pair has zero probability.

    Raises:
        ValueError: If the test set contains no sentence with at least 2 words.
    """
    # Imported locally so the function carries its own dependency.
    import math

    try:
        # Accumulate log-probabilities: a raw product of many small
        # probabilities underflows to 0.0 on realistic test sets.
        log_prob_sum = 0.0
        num_pairs = 0
        for sentence in test_set:
            if len(sentence) < 2:
                continue
            for first_word, second_word in zip(sentence, sentence[1:]):
                unseen_prob = 1 / vocab_size
                # .get() on the outer mapping avoids inserting empty rows into
                # a defaultdict-based model.
                followers = bigram_probs.get(first_word)
                if followers is None:
                    cond_prob = unseen_prob
                else:
                    cond_prob = followers.get(second_word, unseen_prob)
                if cond_prob <= 0:
                    # An impossible pair makes the whole test set impossible.
                    return float("inf")
                log_prob_sum += math.log(cond_prob)
                num_pairs += 1

        if num_pairs == 0:
            raise ValueError("Cannot compute perplexity: test set contains no word pairs")

        return math.exp(-log_prob_sum / num_pairs)
    except Exception as e:
        logging.exception(f"Error in calculate_perplexity: {e}")
        raise

In [None]:
# Load English news corpus as a string
try:
    # Decode explicitly as UTF-8 so the result does not depend on the host's
    # default locale encoding (assumes the corpus file is UTF-8 text).
    with open("/content/drive/MyDrive/Colab Notebooks/eng_news_2019_10K-sentences.txt", "r", encoding="utf-8") as f:
        corpus = f.read()
    logging.info("Corpus loaded successfully.")
except Exception as e:
    logging.exception(f"Error reading corpus file: {e}")
    raise

In [None]:
# Clean, lowercase and tokenize the raw corpus text
tokens = preprocess_corpus(corpus)

# Hold out the trailing 20% of the tokens for testing; the leading 80% is
# used for training
split_index = int(len(tokens) * 0.8)
train_tokens, test_tokens = tokens[:split_index], tokens[split_index:]
logging.info(f"Training set size: {len(train_tokens)}")
logging.info(f"Test set size: {len(test_tokens)}")

In [None]:
# Build the bigram model from the training tokens only; the call returns the
# probability table together with the vocabulary size.
bigram_probs, vocab_size = build_bigram_model(train_tokens)

In [None]:
# Generate 10 sentences
for i in range(10):
    try:
        # Rely on generate_sentence's default max_length. Its third positional
        # parameter is max_length, so passing vocab_size there would allow
        # sentences as long as the whole vocabulary.
        generated_sentence = generate_sentence(bigram_probs, "the")
        logging.info(f"Generated sentence {i+1}: {' '.join(generated_sentence)}")
    except Exception as e:
        logging.exception(f"Error generating sentence: {e}")

In [None]:
# Score the model on the provided test sentences (swap in your own test set
# here); each sentence is already a list of tokens
test_set = [
    ["the", "weather", "is", "sunny"],
    ["the", "economy", "is", "booming"],
]
try:
    avg_prob, std_dev = evaluate_test_set(bigram_probs, test_set, vocab_size)
except Exception as e:
    # Best effort: report the failure and let the notebook carry on.
    logging.exception(f"Error evaluating provided test set: {e}")
else:
    logging.info(f"Average Probability (Provided Test Set): {avg_prob}")
    logging.info(f"Standard Deviation (Provided Test Set): {std_dev}")

In [None]:
# Score the model on a hand-curated test set of tokenized sentences
curated_test_set = [
    ["artificial", "intelligence", "revolution"],
]
try:
    avg_prob, std_dev = evaluate_test_set(bigram_probs, curated_test_set, vocab_size)
except Exception as e:
    # Best effort: report the failure and let the notebook carry on.
    logging.exception(f"Error evaluating curated test set: {e}")
else:
    logging.info(f"Average Probability (Curated Test Set): {avg_prob}")
    logging.info(f"Standard Deviation (Curated Test Set): {std_dev}")