<a href="https://colab.research.google.com/github/thedavidemmanuel/chatbot-with-transformers/blob/main/chatbot.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Cell 1: Setup and Imports
import gc
import json
import logging
import os
import time

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
from google.colab import drive
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from sklearn.model_selection import train_test_split
from torch.cuda import empty_cache
from torch.cuda.amp import autocast, GradScaler
from torch.utils.data import Dataset, DataLoader
from tqdm.auto import tqdm
from transformers import (
    GPT2Tokenizer,
    GPT2LMHeadModel,
    get_linear_schedule_with_warmup
)

# Memory optimization settings
# NOTE(review): the allocator option is written after `import torch`; presumably
# it is still honoured because no CUDA work has happened yet -- confirm.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'
torch.backends.cudnn.benchmark = True

# Mount Google Drive
# This has to run before SAVE_DIR is created below, because SAVE_DIR lives
# underneath the /content/drive mount point.
drive.mount('/content/drive')

# Set up logging
# Module-wide logger used by the training, evaluation and generation code.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Define paths
# DATA_PATH: CSV read by train_chatbot(); SAVE_DIR: Drive folder that receives
# metadata, checkpoints and the final model.
DATA_PATH = "/content/chatbot.csv"
SAVE_DIR = "/content/drive/My Drive/chatbot"
os.makedirs(SAVE_DIR, exist_ok=True)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Cell 2: Helper Classes
class ConversationHistory:
    """Rolling log of the most recent chatbot exchanges.

    Each exchange is stored as a dict with the keys ``category``, ``intent``,
    ``question``, ``response``, ``confidence`` and ``timestamp``. Once more
    than ``max_history`` exchanges are held, the oldest one is discarded.
    """

    def __init__(self, max_history=5):
        self.max_history = max_history
        self.history = []

    def add_exchange(self, category, intent, question, response, confidence):
        """Record one exchange, stamped with the current time."""
        entry = dict(
            category=category,
            intent=intent,
            question=question,
            response=response,
            confidence=confidence,
            timestamp=pd.Timestamp.now(),
        )
        self.history.append(entry)

        # At most one (the oldest) record is dropped per call.
        if len(self.history) > self.max_history:
            del self.history[0]

    def get_context(self):
        """Return the stored exchanges, oldest first (the internal list itself)."""
        return self.history

    def clear(self):
        """Forget every stored exchange by starting a fresh list."""
        self.history = list()

class ChatbotDataset(Dataset):
    """Torch dataset that turns conversation rows into language-model samples.

    Each row of ``data`` must provide the columns ``user_input``,
    ``bot_response``, ``category`` and ``intent``. A row is serialised as::

        <|category|>…<|/category|><|intent|>…<|/intent|>
        <|question|>…<|/question|><|response|>…<|/response|>

    and tokenised to exactly ``max_length`` tokens (padded or truncated).

    Args:
        data: Row container indexed through ``.iloc`` (e.g. a pandas DataFrame).
        tokenizer: Callable tokenizer returning ``input_ids`` and
            ``attention_mask`` tensors with a leading batch dimension of 1.
        max_length: Fixed sequence length of every sample.
    """

    # Label value skipped by the cross-entropy loss of Hugging Face LM heads.
    IGNORE_INDEX = -100

    def __init__(self, data, tokenizer, max_length=512):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``input_ids``, ``attention_mask`` and ``labels`` for row ``idx``."""
        item = self.data.iloc[idx]
        question = str(item['user_input'])
        answer = str(item['bot_response'])
        category = str(item['category'])
        intent = str(item['intent'])

        input_text = (
            f"<|category|>{category}<|/category|>"
            f"<|intent|>{intent}<|/intent|>"
            f"<|question|>{question}<|/question|>"
            f"<|response|>{answer}<|/response|>"
        )

        encoding = self.tokenizer(
            input_text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )

        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)

        # Labels are a separate tensor (not an alias of input_ids), with padded
        # positions masked out. Otherwise most of the loss on short samples
        # would come from predicting the pad token.
        labels = input_ids.clone()
        labels[attention_mask == 0] = self.IGNORE_INDEX

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels
        }


In [None]:
# Cell 3: Memory-Optimized Chatbot
class MemoryOptimizedChatBot:
    """GPT-2 chatbot fine-tuned on tagged category/intent/question/response text.

    The class owns the tokenizer (extended with the tag tokens), the model,
    a gradient scaler for mixed-precision training, a small conversation
    history and a cache of already generated responses.

    Args:
        model_name: Hugging Face model identifier or path to load.
        batch_size: Batch size used by the data loaders built in ``train``.
    """

    # Markers that wrap the answer in the training text; generation is cut at
    # RESPONSE_END and clean_response extracts what lies between the two.
    RESPONSE_START = "<|response|>"
    RESPONSE_END = "<|/response|>"

    def __init__(self, model_name='gpt2', batch_size=4):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.tokenizer = GPT2Tokenizer.from_pretrained(model_name)
        self.batch_size = batch_size
        self.conversation_history = ConversationHistory()
        self.response_cache = {}

        # Add special tokens
        special_tokens = {
            'pad_token': '<|pad|>',
            'additional_special_tokens': [
                '<|category|>', '<|/category|>',
                '<|intent|>', '<|/intent|>',
                '<|question|>', '<|/question|>',
                '<|response|>', '<|/response|>',
                '<|context|>', '<|/context|>'
            ]
        }
        self.tokenizer.add_special_tokens(special_tokens)

        # Initialize model; the embedding matrix must grow to cover the new tokens
        self.model = GPT2LMHeadModel.from_pretrained(model_name)
        self.model.resize_token_embeddings(len(self.tokenizer))
        self.model.config.pad_token_id = self.tokenizer.pad_token_id

        # Enable memory optimizations
        self.model.gradient_checkpointing_enable()
        self.model.config.n_ctx = 512
        self.model.to(self.device)

        # Mixed precision is only meaningful on CUDA; on CPU the scaler and
        # autocast are created disabled so they become no-ops.
        self.use_amp = self.device.type == 'cuda'
        self.scaler = GradScaler(enabled=self.use_amp)

        logger.info(f"Using device: {self.device}")

    def load_model(self, model_path):
        """Replace the model and tokenizer with ones saved via ``save_pretrained``.

        Args:
            model_path: Directory containing both the model and the tokenizer
                files.
        """
        self.tokenizer = GPT2Tokenizer.from_pretrained(model_path)
        self.model = GPT2LMHeadModel.from_pretrained(model_path)
        self.model.config.pad_token_id = self.tokenizer.pad_token_id
        self.model.to(self.device)
        self.model.eval()
        # Cached answers belong to the previous weights.
        self.response_cache.clear()

    def train(self, train_dataset, val_dataset, epochs=3):
        """Fine-tune the model with gradient accumulation and optional AMP.

        After every epoch the validation loss is computed and a checkpoint is
        written whenever it improves. Batches that run out of GPU memory are
        skipped with a warning.

        Args:
            train_dataset: Dataset yielding ``input_ids``, ``attention_mask``
                and ``labels`` tensors.
            val_dataset: Dataset of the same form used for validation.
            epochs: Number of passes over ``train_dataset``.

        Raises:
            Exception: Any non-OOM error is logged and re-raised.
        """
        try:
            use_cuda = self.device.type == 'cuda'

            # Clear memory before training
            gc.collect()
            if use_cuda:
                empty_cache()

            train_loader = DataLoader(
                train_dataset,
                batch_size=self.batch_size,
                shuffle=True,
                pin_memory=use_cuda,
                num_workers=2
            )

            val_loader = DataLoader(
                val_dataset,
                batch_size=self.batch_size,
                pin_memory=use_cuda
            )

            # Optimizer and scheduler
            optimizer = torch.optim.AdamW(
                self.model.parameters(),
                lr=2e-5,
                weight_decay=0.01,
                eps=1e-8
            )

            accumulation_steps = 4
            num_batches = len(train_loader)
            # The scheduler advances once per optimizer step, i.e. once per
            # accumulation group, so it must be sized in optimizer steps.
            # Sizing it in batches would leave the schedule only partly run.
            updates_per_epoch = (num_batches + accumulation_steps - 1) // accumulation_steps
            scheduler = get_linear_schedule_with_warmup(
                optimizer,
                num_warmup_steps=updates_per_epoch // 10,
                num_training_steps=updates_per_epoch * epochs
            )

            best_val_loss = float('inf')

            for epoch in range(epochs):
                logger.info(f"Starting epoch {epoch + 1}/{epochs}")
                self.model.train()
                total_train_loss = 0.0
                batches_done = 0
                optimizer.zero_grad()

                for batch_idx, batch in enumerate(tqdm(train_loader, desc=f'Epoch {epoch+1}')):
                    try:
                        inputs = {
                            key: batch[key].to(self.device, non_blocking=True)
                            for key in ('input_ids', 'attention_mask', 'labels')
                        }

                        # Use mixed precision training (no-op when disabled)
                        with autocast(enabled=self.use_amp):
                            outputs = self.model(**inputs)
                            loss = outputs.loss / accumulation_steps

                        # Scale loss and backward pass
                        self.scaler.scale(loss).backward()
                        total_train_loss += loss.item() * accumulation_steps
                        batches_done += 1

                        # Step on every full accumulation group and also on the
                        # final batch, so a trailing partial group is applied
                        # instead of being wiped by the next epoch's zero_grad.
                        is_last_batch = batch_idx + 1 == num_batches
                        if (batch_idx + 1) % accumulation_steps == 0 or is_last_batch:
                            self.scaler.unscale_(optimizer)
                            torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                            self.scaler.step(optimizer)
                            self.scaler.update()
                            scheduler.step()
                            optimizer.zero_grad()

                            # Clear memory
                            del outputs, loss
                            gc.collect()
                            if use_cuda:
                                empty_cache()

                    except RuntimeError as e:
                        if "out of memory" in str(e):
                            if use_cuda:
                                empty_cache()
                            logger.warning(f"OOM on batch {batch_idx}. Skipping...")
                            optimizer.zero_grad()
                            continue
                        raise

                # Average over the batches that actually contributed a loss.
                avg_train_loss = total_train_loss / max(batches_done, 1)
                logger.info(f"Average training loss: {avg_train_loss:.4f}")

                # Validation
                val_loss = self.evaluate(val_loader)
                logger.info(f"Validation loss: {val_loss:.4f}")

                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    self.save_model(epoch + 1, val_loss)

                # Clear memory after each epoch
                gc.collect()
                if use_cuda:
                    empty_cache()

        except Exception as e:
            logger.error(f"Training error: {str(e)}")
            raise

    def save_model(self, epoch, loss):
        """Write a checkpoint (weights, epoch, loss) under ``SAVE_DIR/checkpoints``.

        The tokenizer files are saved into the same directory; whatever
        ``save_pretrained`` returns is stored under ``tokenizer_state``.
        """
        checkpoint_dir = os.path.join(SAVE_DIR, 'checkpoints')
        os.makedirs(checkpoint_dir, exist_ok=True)
        checkpoint_path = os.path.join(checkpoint_dir, f'model_epoch_{epoch}_loss_{loss:.4f}.pt')
        tokenizer_files = self.tokenizer.save_pretrained(checkpoint_dir)
        torch.save({
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'tokenizer_state': tokenizer_files,
            'loss': loss
        }, checkpoint_path)
        logger.info(f"Saved checkpoint to {checkpoint_path}")

    def evaluate(self, val_loader):
        """Return the mean loss over the batches of ``val_loader``.

        Batches skipped because of an out-of-memory error do not count towards
        the mean. If no batch could be evaluated, ``float('inf')`` is returned
        so that the result is never mistaken for an improvement.
        """
        self.model.eval()
        total_loss = 0.0
        batches_done = 0

        with torch.no_grad():
            for batch in val_loader:
                try:
                    inputs = {
                        key: batch[key].to(self.device, non_blocking=True)
                        for key in ('input_ids', 'attention_mask', 'labels')
                    }

                    outputs = self.model(**inputs)
                    total_loss += outputs.loss.item()
                    batches_done += 1

                    del outputs
                    if torch.cuda.is_available():
                        empty_cache()

                except RuntimeError as e:
                    if "out of memory" in str(e):
                        if torch.cuda.is_available():
                            empty_cache()
                        logger.warning("OOM during evaluation. Skipping batch...")
                        continue
                    raise

        if batches_done == 0:
            return float('inf')
        return total_loss / batches_done

    def clean_response(self, response):
        """Extract the text between the response markers of a decoded sequence.

        Returns the stripped text between ``<|response|>`` and the following
        ``<|/response|>``. If the closing marker is missing (generation was
        cut off), everything after the opening marker is returned. If the
        opening marker is missing, the whole stripped input is returned.
        """
        try:
            start_pos = response.find(self.RESPONSE_START)
            if start_pos == -1:
                return response.strip()

            # Test the raw find() result first: adding the marker length
            # beforehand would hide the -1 "not found" value.
            start_idx = start_pos + len(self.RESPONSE_START)
            end_idx = response.find(self.RESPONSE_END, start_idx)
            if end_idx == -1:
                return response[start_idx:].strip()

            return response[start_idx:end_idx].strip()
        except Exception as e:
            logger.error(f"Error cleaning response: {str(e)}")
            return response.strip()

    def generate_response(self, category, intent, question, max_length=150):
        """Generate an answer for ``question`` conditioned on category and intent.

        Args:
            category: Category tag placed in the prompt.
            intent: Intent tag placed in the prompt.
            question: The user's question.
            max_length: Upper bound on the number of newly generated tokens
                (the prompt does not count against it).

        Returns:
            The cleaned response text. The return type is always ``str``,
            whether the answer is newly generated, served from the cache, or
            the fallback apology after an error.
        """
        try:
            # Tuple key: unlike a "_"-joined string it cannot collide when the
            # fields themselves contain underscores.
            cache_key = (category, intent, question)
            if cache_key in self.response_cache:
                return self.response_cache[cache_key]

            self.model.eval()
            input_text = (
                f"<|category|>{category}<|/category|>"
                f"<|intent|>{intent}<|/intent|>"
                f"<|question|>{question}<|/question|>"
                f"<|response|>"
            )

            inputs = self.tokenizer(
                input_text,
                return_tensors='pt',
                truncation=True,
                max_length=512,
                padding=True
            ).to(self.device)

            # The training text ends each answer with <|/response|>, not with
            # the tokenizer's EOS token, so both are treated as end-of-sequence.
            stop_ids = [
                self.tokenizer.convert_tokens_to_ids(self.RESPONSE_END),
                self.tokenizer.eos_token_id,
            ]

            with torch.no_grad():
                # early_stopping is omitted: it only applies to beam search,
                # and this call samples with a single beam.
                output_sequences = self.model.generate(
                    **inputs,
                    max_new_tokens=max_length,
                    min_length=20,
                    num_return_sequences=1,
                    no_repeat_ngram_size=3,
                    do_sample=True,
                    top_k=50,
                    top_p=0.92,
                    temperature=0.7,
                    repetition_penalty=1.2,
                    length_penalty=1.0,
                    pad_token_id=self.tokenizer.pad_token_id,
                    eos_token_id=stop_ids,
                )

            # Special tokens are kept so clean_response can find the markers.
            response = self.tokenizer.decode(output_sequences[0], skip_special_tokens=False)
            cleaned_response = self.clean_response(response)

            # Cache the response
            self.response_cache[cache_key] = cleaned_response
            return cleaned_response

        except Exception as e:
            logger.error(f"Error generating response: {str(e)}")
            return "I apologize, but I encountered an error generating a response. Please try again."

In [None]:
# Cell 4: Training Script
def train_chatbot():
    """Load the CSV at DATA_PATH, record metadata, and fine-tune a new bot.

    Prints basic dataset statistics, writes ``metadata.json`` into SAVE_DIR,
    splits the rows 90/10 into training and validation sets (fixed seed) and
    trains a ``MemoryOptimizedChatBot`` on them.

    Returns:
        A ``(bot, metadata)`` tuple: the trained bot and the metadata dict
        that was written to disk.
    """
    # Start from a clean slate memory-wise.
    gc.collect()
    if torch.cuda.is_available():
        empty_cache()

    logger.info("Loading dataset...")
    data = pd.read_csv(DATA_PATH)

    # Dataset overview
    print("\nDataset Statistics:")
    print(f"Total conversations: {len(data)}")
    distributions = (
        ("\nCategory distribution:", 'category'),
        ("\nIntent distribution:", 'intent'),
    )
    for heading, column in distributions:
        print(heading)
        print(data[column].value_counts())

    # Metadata that accompanies the saved model
    model_config = {
        "model_type": "gpt2",
        "max_length": 512,
        "batch_size": 4,
        "accumulation_steps": 4
    }
    metadata = {
        "total_conversations": len(data),
        "categories": data['category'].unique().tolist(),
        "intents": data['intent'].unique().tolist(),
        "model_config": model_config
    }

    metadata_path = os.path.join(SAVE_DIR, "metadata.json")
    with open(metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file, indent=2)

    # Hold out 10% of the rows for validation.
    train_data, val_data = train_test_split(data, test_size=0.1, random_state=42)

    # Both datasets share the bot's tokenizer so the special tokens line up.
    bot = MemoryOptimizedChatBot()
    train_dataset, val_dataset = (
        ChatbotDataset(split, bot.tokenizer) for split in (train_data, val_data)
    )

    logger.info("Starting training...")
    bot.train(train_dataset, val_dataset)

    return bot, metadata

In [None]:
# Cell 5: Main Execution
if __name__ == "__main__":
    # Train the bot, export it in save_pretrained format together with a small
    # model_info.json, and bundle the whole SAVE_DIR folder into a zip archive.
    import shutil

    try:
        # Train model
        bot, metadata = train_chatbot()

        # Save model for Streamlit
        final_model_path = os.path.join(SAVE_DIR, 'final_model')
        bot.model.save_pretrained(final_model_path)
        bot.tokenizer.save_pretrained(final_model_path)

        # Create model info
        model_info = {
            "model_path": final_model_path,
            "categories": metadata['categories'],
            "intents": metadata['intents'],
            "total_samples": metadata['total_conversations'],
            "model_config": metadata['model_config']
        }

        with open(os.path.join(SAVE_DIR, 'model_info.json'), 'w') as f:
            json.dump(model_info, f, indent=2)

        # Zip the model files for easy download.
        # SAVE_DIR contains a space ("My Drive"), which the unquoted shell
        # call `zip -r ... {SAVE_DIR}` split into two arguments, so nothing
        # was archived. shutil needs no shell quoting and raises on failure,
        # so the success message below is only printed when the archive exists.
        save_root = os.path.normpath(SAVE_DIR)
        shutil.make_archive(
            '/content/chatbot_model',
            'zip',
            root_dir=os.path.dirname(save_root),
            base_dir=os.path.basename(save_root),
        )

        print("\nTraining complete! Model saved and ready for Streamlit deployment.")
        print("Download the chatbot_model.zip file and extract it to your Streamlit project directory.")

    except Exception as e:
        logger.error(f"Error in main execution: {str(e)}")
        raise


Dataset Statistics:
Total conversations: 600

Category distribution:
category
git         212
github      169
greeting    117
error       102
Name: count, dtype: int64

Intent distribution:
intent
pull_request      169
branch            125
welcome           117
merge_conflict    102
commit             87
Name: count, dtype: int64


  self.scaler = GradScaler()


Epoch 1:   0%|          | 0/135 [00:00<?, ?it/s]

  with autocast():


Epoch 2:   0%|          | 0/135 [00:00<?, ?it/s]

Epoch 3:   0%|          | 0/135 [00:00<?, ?it/s]


zip error: Nothing to do! (try: zip -r /content/chatbot_model.zip . -i /content/drive/My Drive/chatbot)

Training complete! Model saved and ready for Streamlit deployment.
Download the chatbot_model.zip file and extract it to your Streamlit project directory.


In [None]:
# Cell 6: Testing Script
import torch
from typing import Dict, List, Tuple
import numpy as np
from tqdm import tqdm
import logging

class ChatbotTester:
    """Interactive and automated test harness around a trained chatbot.

    The wrapped ``bot`` must provide
    ``generate_response(category, intent, question)``. Its return value may be
    a plain string, a ``(text, confidence)`` pair or a dict with a ``text``
    key; all three are normalised to a dict.
    """

    def __init__(self, bot, test_data=None):
        """Initialize the tester with a trained bot and optional test data."""
        self.bot = bot
        self.test_data = test_data
        self.conversation_history = []
        self.metrics = {
            'response_times': [],
            'confidence_scores': [],
            'category_accuracy': [],
            'intent_accuracy': []
        }

    @staticmethod
    def _normalize_response(raw) -> Dict:
        """Convert whatever the bot returned into a dict with a ``text`` key."""
        if isinstance(raw, dict):
            response = dict(raw)
            response.setdefault('text', '')
            return response
        if isinstance(raw, (tuple, list)):
            response = {'text': str(raw[0]) if raw else ''}
            if len(raw) > 1:
                response['confidence'] = raw[1]
            return response
        return {'text': str(raw)}

    def generate_response(self, user_input: str, category: str = '',
                          intent: str = '') -> Tuple[str, Dict]:
        """Generate a response and capture metrics.

        Args:
            user_input: The question sent to the bot.
            category: Category tag used to condition the bot (optional).
            intent: Intent tag used to condition the bot (optional).

        Returns:
            ``(text, metadata)`` where ``metadata`` is the normalised response
            dict (always containing ``text``).
        """
        start_time = time.time()

        # The bot is conditioned on category and intent, so all three
        # arguments have to be passed.
        raw = self.bot.generate_response(category, intent, user_input)

        # Calculate response time
        response_time = time.time() - start_time

        response = self._normalize_response(raw)
        confidence = response.get('confidence', 0.0)

        # Store interaction
        self.conversation_history.append({
            'user_input': user_input,
            'bot_response': response['text'],
            'confidence': confidence,
            'category': response.get('category', ''),
            'intent': response.get('intent', ''),
            'response_time': response_time
        })
        self.metrics['response_times'].append(response_time)
        self.metrics['confidence_scores'].append(confidence)

        return response['text'], response

    def interactive_session(self, category: str = '', intent: str = ''):
        """Start an interactive testing session.

        Args:
            category: Category tag applied to every question of the session.
            intent: Intent tag applied to every question of the session.
        """
        print("\nStarting interactive testing session (type 'exit' to end)...")
        print("--------------------------------------------------------")

        while True:
            try:
                user_input = input("\nYou: ").strip()
            except (EOFError, KeyboardInterrupt):
                # Closed stdin or an interrupt ends the session like 'exit'.
                break

            if user_input.lower() == 'exit':
                break

            try:
                response, metadata = self.generate_response(user_input, category, intent)
                print(f"\nBot: {response}")

                # Print detailed metrics if available
                if metadata.get('confidence'):
                    print(f"Confidence: {metadata['confidence']:.2f}")
                if metadata.get('category'):
                    print(f"Detected Category: {metadata['category']}")
                if metadata.get('intent'):
                    print(f"Detected Intent: {metadata['intent']}")

            except Exception as e:
                print(f"Error generating response: {str(e)}")

    def run_automated_tests(self, test_cases: List[Dict] = None):
        """Run automated tests with provided test cases or test data.

        Each case is a dict with the question under ``input`` (or
        ``user_input``, the dataset column name) and optional ``category``
        and ``intent`` entries.

        Raises:
            ValueError: If neither ``test_cases`` nor ``test_data`` yields
                any case.
        """
        if test_cases is None and self.test_data is not None:
            test_cases = self.test_data.to_dict('records')

        if not test_cases:
            raise ValueError("No test cases provided!")

        print("\nRunning automated tests...")
        results = []

        for case in tqdm(test_cases):
            try:
                question = case['input'] if 'input' in case else case['user_input']
                response, metadata = self.generate_response(
                    question,
                    case.get('category') or '',
                    case.get('intent') or ''
                )

                # Compare with expected outputs if provided
                result = {
                    'input': question,
                    'response': response,
                    'expected_category': case.get('category'),
                    'predicted_category': metadata.get('category'),
                    'expected_intent': case.get('intent'),
                    'predicted_intent': metadata.get('intent'),
                    'confidence': metadata.get('confidence', 0.0),
                    'response_time': self.conversation_history[-1]['response_time']
                }

                results.append(result)

            except Exception as e:
                logging.error(f"Error testing case {case}: {str(e)}")

        return self._analyze_results(results)

    @staticmethod
    def _accuracy(results: List[Dict], expected_key: str, predicted_key: str):
        """Share of matching predictions, or None when nothing can be scored.

        Only results that carry both an expected and a predicted value count;
        a bot that does not report the field therefore yields None, not 0.
        """
        scored = [
            r for r in results
            if r.get(expected_key) is not None and r.get(predicted_key) is not None
        ]
        if not scored:
            return None
        matches = sum(1 for r in scored if r[expected_key] == r[predicted_key])
        return matches / len(scored)

    def _analyze_results(self, results: List[Dict]) -> Dict:
        """Analyze test results and compute metrics.

        An empty ``results`` list yields zeroed counters and averages instead
        of a division by zero.
        """
        if not results:
            return {
                'total_tests': 0,
                'successful_tests': 0,
                'avg_confidence': 0.0,
                'avg_response_time': 0.0,
                'category_accuracy': None,
                'intent_accuracy': None
            }

        return {
            'total_tests': len(results),
            'successful_tests': sum(1 for r in results if r['confidence'] > 0.5),
            'avg_confidence': float(np.mean([r['confidence'] for r in results])),
            'avg_response_time': float(np.mean([r['response_time'] for r in results])),
            'category_accuracy': self._accuracy(
                results, 'expected_category', 'predicted_category'),
            'intent_accuracy': self._accuracy(
                results, 'expected_intent', 'predicted_intent')
        }

# Example usage
if __name__ == "__main__":
    # Load the trained bot, chat with it interactively, then run a couple of
    # scripted test cases and print the aggregated metrics.
    try:
        bot = MemoryOptimizedChatBot()

        # The final model and tokenizer were exported with save_pretrained, so
        # they are loaded back with from_pretrained from the same folder.
        final_model_path = os.path.join(SAVE_DIR, 'final_model')
        bot.tokenizer = GPT2Tokenizer.from_pretrained(final_model_path)
        bot.model = GPT2LMHeadModel.from_pretrained(final_model_path)
        bot.model.config.pad_token_id = bot.tokenizer.pad_token_id
        bot.model.to(bot.device)
        bot.model.eval()

        # Initialize tester
        tester = ChatbotTester(bot)

        # Run interactive testing session
        print("\nStarting interactive testing...")
        tester.interactive_session()

        # Optional: Run automated tests if you have test cases.
        # The cases use category/intent values that occur in the training
        # data (see the dataset statistics printed during training).
        test_cases = [
            {
                'input': 'How do I create a new branch?',
                'category': 'git',
                'intent': 'branch'
            },
            {
                'input': 'How do I open a pull request?',
                'category': 'github',
                'intent': 'pull_request'
            }
        ]

        print("\nRunning automated tests...")
        metrics = tester.run_automated_tests(test_cases)

        print("\nTest Results:")
        print("-------------")
        for metric, value in metrics.items():
            if isinstance(value, float):
                print(f"{metric}: {value:.2f}")
            else:
                print(f"{metric}: {value}")

    except Exception as e:
        logging.error(f"Error in testing: {str(e)}")
        raise

In [None]:
# Cell 6: Performance Visualization
class PerformanceVisualizer:
    """Collects per-epoch training metrics and writes them out as a plot and JSON.

    Args:
        save_dir: Directory that receives ``training_curves.png`` and
            ``training_metrics.json``.
    """

    def __init__(self, save_dir):
        self.save_dir = save_dir
        self.metrics = {
            'train_loss': [],
            'val_loss': [],
            'accuracy': [],
            'epochs': []
        }

    def log_metrics(self, epoch, train_loss, val_loss, accuracy):
        """Append one epoch's values to the corresponding metric series."""
        record = (
            ('train_loss', train_loss),
            ('val_loss', val_loss),
            ('accuracy', accuracy),
            ('epochs', epoch),
        )
        for series, value in record:
            self.metrics[series].append(value)

    def plot_training_curves(self):
        """Draw loss and accuracy against epoch side by side and save the PNG."""
        epochs = self.metrics['epochs']

        # (subplot position, title, y-axis label, curves as (series, legend label))
        panels = (
            (1, 'Training and Validation Loss', 'Loss',
             (('train_loss', 'Training Loss'), ('val_loss', 'Validation Loss'))),
            (2, 'Model Accuracy', 'Accuracy',
             (('accuracy', 'Accuracy'),)),
        )

        plt.figure(figsize=(15, 5))
        for position, title, y_label, curves in panels:
            plt.subplot(1, 2, position)
            for series, legend_label in curves:
                plt.plot(epochs, self.metrics[series], label=legend_label)
            plt.title(title)
            plt.xlabel('Epoch')
            plt.ylabel(y_label)
            plt.legend()
            plt.grid(True)

        plt.tight_layout()
        figure_path = os.path.join(self.save_dir, 'training_curves.png')
        plt.savefig(figure_path)
        plt.close()

    def save_metrics(self):
        """Dump the collected metrics to ``training_metrics.json`` in save_dir."""
        metrics_path = os.path.join(self.save_dir, 'training_metrics.json')
        with open(metrics_path, 'w') as metrics_file:
            json.dump(self.metrics, metrics_file, indent=2)


In [None]:
# Cell 7: Model Evaluation
class ModelEvaluator:
    """Scores generated answers against reference answers (BLEU and word overlap).

    Args:
        model: Object exposing ``generate_response(category, intent, question)``.
        tokenizer: Stored on the instance; not used by the methods below.
        device: Stored on the instance; not used by the methods below.
    """

    def __init__(self, model, tokenizer, device):
        self.model = model
        self.tokenizer = tokenizer
        self.device = device
        # Set once the NLTK tokenizer data has been requested.
        self._nltk_ready = False

    def _ensure_nltk_data(self):
        """Fetch the NLTK tokenizer data once per evaluator, not once per sample."""
        if getattr(self, '_nltk_ready', False):
            return
        import nltk
        # NOTE(review): newer NLTK releases look for 'punkt_tab' instead of
        # 'punkt'; both are requested. An unknown resource name only makes
        # download() return False.
        for resource in ('punkt', 'punkt_tab'):
            nltk.download(resource, quiet=True)
        self._nltk_ready = True

    @staticmethod
    def _as_text(generated):
        """Reduce a generator result (str, (text, ...) pair or dict) to its text."""
        if isinstance(generated, dict):
            generated = generated.get('text', '')
        elif isinstance(generated, (tuple, list)):
            generated = generated[0] if generated else ''
        return str(generated)

    @staticmethod
    def _overlap(reference_tokens, generated_tokens):
        """Jaccard overlap of the two token sets; 0.0 when both are empty."""
        ref_set = set(reference_tokens)
        gen_set = set(generated_tokens)
        union = ref_set | gen_set
        if not union:
            return 0.0
        return len(ref_set & gen_set) / len(union)

    def evaluate_response(self, generated, reference):
        """Calculate BLEU score and response similarity

        Args:
            generated: Generated answer (text, or a result ``_as_text`` accepts).
            reference: Reference answer; converted with ``str`` before use.

        Returns:
            Dict with ``bleu_score`` and ``overlap_score``.
        """
        from nltk.translate.bleu_score import sentence_bleu
        from nltk.tokenize import word_tokenize

        self._ensure_nltk_data()

        # Tokenize sentences
        reference_tokens = word_tokenize(str(reference).lower())
        generated_tokens = word_tokenize(self._as_text(generated).lower())

        # Calculate BLEU score.
        # NOTE(review): no smoothing is applied, so very short answers
        # presumably score close to 0 -- confirm whether that is intended.
        bleu_score = sentence_bleu([reference_tokens], generated_tokens)

        return {
            'bleu_score': bleu_score,
            'overlap_score': self._overlap(reference_tokens, generated_tokens)
        }

    def evaluate_sample(self, category, intent, question, reference):
        """Evaluate a single sample"""
        generated = self._as_text(
            self.model.generate_response(category, intent, question)
        )
        metrics = self.evaluate_response(generated, reference)
        return {
            'generated': generated,
            'reference': reference,
            'metrics': metrics
        }

    def run_evaluation(self, test_data, num_samples=100):
        """Run full evaluation on test set

        Args:
            test_data: DataFrame with the columns ``category``, ``intent``,
                ``user_input`` and ``bot_response``.
            num_samples: Maximum number of randomly sampled rows to evaluate.

        Returns:
            ``(results, avg_metrics)``: one dict per evaluated row, plus the
            mean BLEU and overlap. The means are 0.0 when no row was evaluated.
        """
        results = []
        total_bleu = 0
        total_overlap = 0

        samples = test_data.sample(n=min(num_samples, len(test_data)))

        for _, row in tqdm(samples.iterrows(), total=len(samples), desc="Evaluating"):
            eval_result = self.evaluate_sample(
                row['category'],
                row['intent'],
                row['user_input'],
                row['bot_response']
            )
            results.append({
                'category': row['category'],
                'intent': row['intent'],
                'question': row['user_input'],
                'generated': eval_result['generated'],
                'reference': eval_result['reference'],
                'bleu_score': eval_result['metrics']['bleu_score'],
                'overlap_score': eval_result['metrics']['overlap_score']
            })

            total_bleu += eval_result['metrics']['bleu_score']
            total_overlap += eval_result['metrics']['overlap_score']

        # Guard the averages against an empty sample.
        count = len(results)
        avg_metrics = {
            'average_bleu': total_bleu / count if count else 0.0,
            'average_overlap': total_overlap / count if count else 0.0
        }

        return results, avg_metrics