<a href="https://colab.research.google.com/github/shabahmd/Machine-Learning-Notebooks/blob/main/Broken_Chatbot_Seq2Seq.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Download and unzip the Cornell Movie-Dialogs corpus via the Kaggle CLI.
# NOTE(review): presumably requires Kaggle API credentials to be configured — confirm.
!kaggle datasets download -d rajathmc/cornell-moviedialog-corpus  --unzip


In [None]:
import nltk
import pandas as pd
import numpy as np
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from keras.preprocessing.sequence import pad_sequences


In [None]:
# Smoke test: print a fixed string.
print('hello world')

In [None]:
# Load dataset: read every raw line of the movie-lines corpus into memory.
# Undecodable bytes are substituted rather than raising (errors='replace').
with open('/kaggle/working/movie_lines.txt', encoding='utf-8', errors='replace') as corpus_file:
    lines = list(corpus_file)


In [None]:
import nltk
# Fetch the NLTK resources, in the same order as before.
for resource_name in ('stopwords', 'punkt', 'wordnet'):
    nltk.download(resource_name)


In [None]:
# Filepath: chatbot_seq2seq_nltk.py

import nltk
import re
from nltk.tokenize import word_tokenize
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
import numpy as np

# Step 1: Download necessary NLTK data
# NOTE(review): 'punkt' is already downloaded in an earlier cell, and word_tokenize
# is imported but never called in the visible cells — this may be redundant.
nltk.download('punkt')

# Step 2: Load and preprocess the dataset
def load_and_preprocess_data(file_path):
    """Read a movie-lines file and return the utterance text of each well-formed row.

    Every row is expected to hold five fields separated by ``" +++$+++ "``;
    the fifth field is the utterance. Rows with any other field count are
    skipped silently.

    Args:
        file_path: Path of the file to read. (Previously this argument was
            ignored and a hard-coded Kaggle path was always opened.)

    Returns:
        list[str]: Whitespace-stripped utterances, in file order.

    Raises:
        OSError: If ``file_path`` cannot be opened.
    """
    separator = " +++$+++ "
    conversations = []

    # errors='ignore' drops undecodable bytes instead of raising.
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        # Stream the file line by line rather than materialising it first.
        for line in f:
            parts = line.split(separator)
            if len(parts) == 5:
                conversations.append(parts[4].strip())

    return conversations

# Step 3: Clean the text using regex
# (pattern, replacement) pairs, applied strictly in this order.
_CLEAN_TEXT_RULES = (
    (r"i'm", "i am"),
    (r"he's", "he is"),
    (r"she's", "she is"),
    (r"it's", "it is"),
    (r"[^a-zA-Z?.!,]+", " "),
)


def clean_text(text):
    """Lower-case ``text``, expand a few contractions and strip unwanted characters.

    After the contraction rewrites, every run of characters other than ASCII
    letters and ``? . ! ,`` is collapsed into a single space, and the result
    is trimmed of leading/trailing whitespace.

    Args:
        text: The raw string to normalise.

    Returns:
        str: The normalised string.
    """
    cleaned = text.lower()
    for pattern, replacement in _CLEAN_TEXT_RULES:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()

# Step 4: Tokenize and pad sequences, including <start> and <end> tokens
def preprocess_and_tokenize(conversations, max_length):
    """Clean, wrap, tokenize and pad the given utterances.

    Args:
        conversations: Iterable of raw utterance strings.
        max_length: ``maxlen`` passed to ``pad_sequences`` (post-padding).

    Returns:
        tuple: ``(tokenizer, questions_padded, answers_padded, vocab_size)``
        where ``vocab_size`` is ``len(tokenizer.word_index) + 1``
        (presumably the extra slot accounts for padding id 0 — confirm).
    """
    def _wrap(sentence):
        # Surround a cleaned sentence with the sequence boundary markers.
        return '<start> ' + sentence + ' <end>'

    def _encode(fitted_tokenizer, sentences):
        # Map sentences to id sequences and pad them at the end.
        id_sequences = fitted_tokenizer.texts_to_sequences(sentences)
        return pad_sequences(id_sequences, maxlen=max_length, padding='post')

    cleaned = [clean_text(utterance) for utterance in conversations]

    # NOTE(review): questions and answers are built from the SAME list, so every
    # input is paired with itself rather than with a reply — confirm this is intended.
    questions = list(map(_wrap, cleaned))
    answers = list(map(_wrap, cleaned))

    # filters='' keeps punctuation and the <start>/<end> markers intact.
    tokenizer = Tokenizer(filters='', lower=True)
    tokenizer.fit_on_texts(questions + answers)

    questions_padded = _encode(tokenizer, questions)
    answers_padded = _encode(tokenizer, answers)
    vocab_size = len(tokenizer.word_index) + 1

    return tokenizer, questions_padded, answers_padded, vocab_size


# Load, preprocess, and tokenize data
# `conversations` was never assigned before being used here (NameError);
# build it from the corpus file first.
conversations = load_and_preprocess_data('/kaggle/working/movie_lines.txt')
tokenizer, questions_padded, answers_padded, vocab_size = preprocess_and_tokenize(conversations, max_length=20)


In [None]:
# Filepath: chatbot_seq2seq_nltk.py

import tensorflow as tf

# Step 1: Define the Encoder
class Encoder(tf.keras.Model):
    """Embedding layer followed by a GRU that returns both sequences and final state."""

    def __init__(self, vocab_size, embedding_dim, enc_units):
        """Create the embedding and GRU layers.

        Args:
            vocab_size: Number of rows in the embedding table.
            embedding_dim: Width of each embedding vector.
            enc_units: Number of GRU units.
        """
        super().__init__()
        self.enc_units = enc_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(
            enc_units,
            return_sequences=True,
            return_state=True,
        )

    def call(self, x):
        """Embed ``x`` and run it through the GRU.

        Returns:
            tuple: ``(output, state)`` as produced by the GRU.
        """
        embedded = self.embedding(x)
        sequence_output, final_state = self.gru(embedded)
        return sequence_output, final_state

# Step 2: Define the Attention mechanism

class Attention(tf.keras.layers.Layer):
    """Additive attention: score = V(tanh(W1(query) + W2(values)))."""

    def __init__(self, units):
        """Create the three Dense projections used for scoring."""
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, query, values):
        """Compute a context vector over ``values`` conditioned on ``query``.

        Args:
            query: shape (batch_size, hidden_size); a rank-1 query gets a
                leading batch axis added.
            values: shape (batch_size, max_len, hidden_size).

        Returns:
            tuple: ``(context_vector, attention_weights)`` with shapes
            (batch_size, hidden_size) and (batch_size, max_len, 1).
        """
        # A rank-1 query has no batch axis yet: (hidden_size,) -> (1, hidden_size)
        if len(query.shape) == 1:
            query = tf.expand_dims(query, 0)

        # (batch_size, 1, hidden_size) so it broadcasts against every time step
        query_per_step = tf.expand_dims(query, 1)

        projected_query = self.W1(query_per_step)
        projected_values = self.W2(values)
        score = self.V(tf.nn.tanh(projected_query + projected_values))

        # Normalise over the time axis: (batch_size, max_len, 1)
        attention_weights = tf.nn.softmax(score, axis=1)

        # Weighted sum of the values over time: (batch_size, hidden_size)
        context_vector = tf.reduce_sum(attention_weights * values, axis=1)

        return context_vector, attention_weights

# Step 3: Define the Decoder with Attention
class Decoder(tf.keras.Model):
    """Single-step GRU decoder that attends over the encoder outputs."""

    def __init__(self, vocab_size, embedding_dim, dec_units):
        """Create the embedding, GRU, output projection and attention layers.

        Args:
            vocab_size: Size of the embedding table and of the output logits.
            embedding_dim: Width of each embedding vector.
            dec_units: Number of GRU units (also used as the attention width).
        """
        super().__init__()
        self.dec_units = dec_units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.dec_units, return_sequences=True, return_state=True)
        self.fc = tf.keras.layers.Dense(vocab_size)
        self.attention = Attention(self.dec_units)

    def call(self, x, enc_output, hidden):
        """Run one decoding step.

        Args:
            x: Token ids for the current step; callers in this file pass
                shape (batch_size, 1).
            enc_output: Encoder sequence output that attention is computed over.
            hidden: Previous decoder state (the encoder's final state on the
                first step). Its feature size must equal ``dec_units`` because
                it now seeds the GRU.

        Returns:
            tuple: ``(logits, state, attention_weights)`` where ``logits`` has
            the time axis folded into the batch axis.
        """
        # Mirror Attention's handling of an unbatched state so the same tensor
        # can also seed the GRU.
        if len(hidden.shape) == 1:
            hidden = tf.expand_dims(hidden, 0)

        context_vector, attention_weights = self.attention(hidden, enc_output)
        x = self.embedding(x)
        # Prepend the context vector to the embedded token along the feature axis.
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        # Fix: carry the previous state into the GRU. Without initial_state the
        # recurrence restarted from zeros on every step, so `hidden` influenced
        # only the attention scores.
        output, state = self.gru(x, initial_state=hidden)
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state, attention_weights


# Instantiate the models
embedding_dim = 256  # width of the token embedding vectors
units = 512  # GRU size shared by the encoder and the decoder
encoder = Encoder(vocab_size=vocab_size, embedding_dim=embedding_dim, enc_units=units)
decoder = Decoder(vocab_size=vocab_size, embedding_dim=embedding_dim, dec_units=units)

# Step 4: Define the loss function and optimizer
# reduction='none' keeps one loss value per element so it can be masked below.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(reduction='none', from_logits=True)


def loss_function(real, pred):
    """Cross-entropy between ``real`` ids and ``pred`` logits, zeroed where ``real == 0``.

    The result is the mean over ALL elements (masked ones contribute 0 to the
    sum but still count in the denominator).
    """
    per_element = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_element.dtype)
    return tf.reduce_mean(per_element * keep)


optimizer = tf.keras.optimizers.Adam()

# Training step (simplified)
def train_step(input_seq, target_seq, encoder, decoder, batch_size):
    """Run one teacher-forced optimisation step and return the mean per-step loss.

    Args:
        input_seq: Batch of encoder input id sequences.
        target_seq: Batch of target id sequences; position 0 is treated as the
            start token and positions 1.. are predicted.
        encoder: Encoder model; called as ``encoder(input_seq)``.
        decoder: Decoder model; called as ``decoder(dec_input, enc_output, dec_hidden)``.
        batch_size: Number of rows in the batch (used to build the first
            decoder input).

    Returns:
        The loss summed over decoding steps divided by the number of steps.

    Raises:
        ValueError: If ``target_seq`` has fewer than two time steps.
    """
    num_steps = int(target_seq.shape[1]) - 1
    if num_steps < 1:
        raise ValueError("target_seq needs at least two time steps")

    total_loss = 0.0
    with tf.GradientTape() as tape:
        # Fix: the encoder forward pass must run inside the tape, otherwise
        # no gradients reach the encoder's variables.
        enc_output, enc_hidden = encoder(input_seq)
        dec_hidden = enc_hidden
        dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)

        for t in range(1, target_seq.shape[1]):
            predictions, dec_hidden, _ = decoder(dec_input, enc_output, dec_hidden)
            # Fix: accumulate; previously each step overwrote the loss so only
            # the final time step was optimised.
            total_loss += loss_function(target_seq[:, t], predictions)
            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(target_seq[:, t], 1)

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(total_loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return total_loss / num_steps


Updated attention mechanism


In [None]:
# Inference: Generating responses
def generate_response(input_sentence, max_length=20):
    """Greedily decode a reply to ``input_sentence``.

    Args:
        input_sentence: Raw user text; it is cleaned with ``clean_text`` and
            wrapped in ``<start>``/``<end>`` like the training sentences.
        max_length: Padded encoder length and maximum number of generated
            tokens. Defaults to 20, the value used when the data was tokenized.

    Returns:
        str: Generated words joined by single spaces (possibly empty).
    """
    # Match the training-time format, where every sentence carries the markers.
    wrapped = '<start> ' + clean_text(input_sentence) + ' <end>'
    input_seq = tokenizer.texts_to_sequences([wrapped])
    input_seq = pad_sequences(input_seq, maxlen=max_length, padding='post')

    enc_output, enc_hidden = encoder(input_seq)
    dec_hidden = enc_hidden

    # Start the sequence with <start> token
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)

    words = []
    for _step in range(max_length):
        predictions, dec_hidden, _ = decoder(dec_input, enc_output, dec_hidden)
        predicted_id = int(tf.argmax(predictions[0]).numpy())

        # Id 0 (padding) has no index_word entry; treat any unknown id like
        # <end> instead of raising KeyError.
        word = tokenizer.index_word.get(predicted_id)
        if word is None or word == '<end>':
            break

        words.append(word)

        # Use the predicted word as the next decoder input
        dec_input = tf.expand_dims([predicted_id], 0)

    return ' '.join(words)

# Example interaction
# NOTE(review): train_step is defined but never called in the visible cells, so
# the encoder/decoder appear to still have their initial weights here — confirm.
response = generate_response("You are broken!")
print("Chatbot response:", response)


In [None]:
# Second sample prompt, printed the same way as the previous cell.
response = generate_response("how are you?")

print("Chatbot response:", response)


In [None]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

# Function to evaluate the chatbot responses using BLEU score
def evaluate_bleu(reference, hypothesis):
    """Return the sentence-level BLEU of ``hypothesis`` against one ``reference``.

    Both strings are tokenized by whitespace; ``SmoothingFunction().method4``
    is used for smoothing.
    """
    return sentence_bleu(
        [reference.split()],  # a single ground-truth token list
        hypothesis.split(),   # tokens of the generated response
        smoothing_function=SmoothingFunction().method4,
    )

# Example interaction and evaluation
user_input = "Hello!"
generated_response = generate_response(user_input)
reference_response = "Hi, how can I help you?"  # Ground truth response

# Calculate BLEU score
# NOTE(review): the reference keeps its capitals, while generate_response emits
# words from a lower-casing tokenizer, so tokens such as "Hi," and "I" cannot
# match — consider normalising both sides the same way.
bleu_score = evaluate_bleu(reference_response, generated_response)
print(f"Generated Response: {generated_response}")
print(f"Reference Response: {reference_response}")
print(f"BLEU Score: {bleu_score}")


In [None]:
pip install rouge-score


In [None]:
from rouge_score import rouge_scorer

# Function to evaluate chatbot responses using ROUGE score
def evaluate_rouge(reference, hypothesis):
    """Score ``hypothesis`` against ``reference`` with ROUGE-1 and ROUGE-L (stemming on).

    Returns:
        The mapping returned by ``RougeScorer.score``, keyed by
        ``'rouge1'`` and ``'rougeL'``.
    """
    metrics = ['rouge1', 'rougeL']
    return rouge_scorer.RougeScorer(metrics, use_stemmer=True).score(reference, hypothesis)

# Example interaction and evaluation
# (re-runs generation for the same prompt and reference as the BLEU cell)
user_input = "Hello!"
generated_response = generate_response(user_input)
reference_response = "Hi, how can I help you?"  # Ground truth response

# Calculate ROUGE score
rouge_scores = evaluate_rouge(reference_response, generated_response)
print(f"Generated Response: {generated_response}")
print(f"Reference Response: {reference_response}")
# Only the F-measure of each metric is reported.
print(f"ROUGE-1 Score: {rouge_scores['rouge1'].fmeasure}")
print(f"ROUGE-L Score: {rouge_scores['rougeL'].fmeasure}")



In [None]:
import numpy as np

def perplexity(model, X, y, pad_id=0):
    """Return the perplexity of ``model`` on ``X`` measured against labels ``y``.

    Perplexity is ``exp`` of the mean negative log-probability assigned to the
    TRUE label. The previous version ignored ``y``, used the highest predicted
    probability instead, and reduced over axis 1 rather than the class axis.

    Args:
        model: Object with a ``predict(X)`` method. Its output is assumed to
            hold probabilities (not logits) with classes on the last axis.
        X: Input passed to ``model.predict`` unchanged.
        y: Integer class ids whose shape equals the prediction shape without
            its last axis.
        pad_id: Label value excluded from the average (0 matches the padding
            id masked by the loss in this file). Pass ``None`` to score every
            position.

    Returns:
        numpy.float64: The perplexity.

    Raises:
        ValueError: If the shapes of ``y`` and the predictions disagree, or if
            no positions remain after removing padding.
    """
    probs = np.asarray(model.predict(X), dtype=np.float64)
    targets = np.asarray(y).astype(np.int64)
    if probs.ndim < 1 or probs.shape[:-1] != targets.shape:
        raise ValueError(
            f"y has shape {targets.shape} but predictions have shape {probs.shape}"
        )

    # Probability the model assigned to each true label.
    true_probs = np.take_along_axis(probs, targets[..., np.newaxis], axis=-1)[..., 0]
    if pad_id is not None:
        true_probs = true_probs[targets != pad_id]
    else:
        true_probs = true_probs.ravel()
    if true_probs.size == 0:
        raise ValueError("no non-padding targets to score")

    # Clip so a zero probability gives a huge, finite perplexity instead of inf/nan.
    log_probs = np.log(np.clip(true_probs, np.finfo(np.float64).tiny, None))
    return np.exp(-log_probs.mean())

# Assuming you have X_test and y_test ready
# NOTE(review): encoder_decoder_model, X_test and y_test are not defined in any
# visible cell; this raises NameError unless they are created elsewhere.
perplexity_score = perplexity(encoder_decoder_model, X_test, y_test)
print(f"Perplexity Score: {perplexity_score}")


In [None]:
def custom_accuracy(true_responses, generated_responses):
    """Return the fraction of references matched exactly by the generated text.

    Pairs are compared position by position with ``==``. If the two sequences
    differ in length, the extra items are not compared, but the denominator
    is always ``len(true_responses)``.

    Args:
        true_responses: Sequence of reference strings.
        generated_responses: Sequence of generated strings.

    Returns:
        float: Exact-match ratio in ``[0, 1]``; ``0.0`` when there are no
        references (previously a ZeroDivisionError).
    """
    total = len(true_responses)
    if total == 0:
        return 0.0
    matches = sum(
        1
        for expected, produced in zip(true_responses, generated_responses)
        if expected == produced
    )
    return matches / total

# Example usage:
# NOTE(review): these references contain capital letters and an apostrophe,
# while generate_response emits words from a lower-casing tokenizer — an exact
# match looks impossible as written; confirm the intended normalisation.
true_responses = ["Hi, how can I help you?", "Goodbye!", "What's your name?"]
generated_responses = [generate_response("Hello!"), generate_response("Bye!"), generate_response("What's your name?")]

accuracy = custom_accuracy(true_responses, generated_responses)
print(f"Custom Accuracy: {accuracy}")
