<a href="https://colab.research.google.com/github/kn0wthing/practice/blob/main/BiLSTM.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
import os
kaggle_dir = '/root/.kaggle'
os.makedirs(kaggle_dir, exist_ok=True)


In [3]:
!pwd

/content


In [4]:
!cp /content/drive/MyDrive/kaggle.json {kaggle_dir}/

In [5]:
!chmod 600 {kaggle_dir}/kaggle.json

In [6]:
!kaggle competitions download -c quora-insincere-questions-classification


quora-insincere-questions-classification.zip: Skipping, found more recently modified local copy (use --force to force download)


In [7]:
# !unzip /content/quora-insincere-questions-classification.zip

In [8]:
import numpy as np
import pandas as pd
import re
import string
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader


In [9]:
class MyDataLoader:
    """
    A class for loading and preprocessing the dataset.
    """
    def __init__(self, train_path: str):
        self.train_path = train_path
        self.data = None
        self.X_train = None
        self.X_val = None
        self.y_train = None
        self.y_val = None

    def load_data(self) -> pd.DataFrame:
        self.data = pd.read_csv(self.train_path)
        self.data = self.data[['question_text', 'target']].dropna()
        return self.data

    def split_data(self, test_size: float = 0.2, random_state: int = 42):
        if self.data is None:
            self.load_data()
        X = self.data['question_text'].values
        y = self.data['target'].values
        self.X_train, self.X_val, self.y_train, self.y_val = train_test_split(X, y, test_size=test_size, random_state=random_state)
        print(f"Data split: {len(self.X_train)} training samples, {len(self.X_val)} validation samples.")


In [10]:
class TextPreprocessor:
    """
    A class for preprocessing text data.
    """
    def __init__(self, max_words: int = 100000, max_len: int = 100):
        self.max_words = max_words
        self.max_len = max_len
        self.word_index = {'<PAD>': 0, '<UNK>': 1}  # Special tokens for padding and unknown words
        self.index_word = {0: '<PAD>', 1: '<UNK>'}
        self.vocab_size = 2  # Initialize with special tokens count

    def build_vocab(self, texts: np.ndarray):
        word_freq = {}
        for text in texts:
            words = text.split()
            for word in words:
                word_freq[word] = word_freq.get(word, 0) + 1

        # Sort words by frequency and take the most common words up to max_words
        sorted_words = sorted(word_freq.items(), key=lambda x: x[1], reverse=True)[:(self.max_words - 2)]
        for i, (word, freq) in enumerate(sorted_words, start=2):
            self.word_index[word] = i
            self.index_word[i] = word
        self.vocab_size = len(self.word_index)
        print(f"Vocabulary built. Size: {self.vocab_size}")

    def texts_to_sequences(self, texts: np.ndarray) -> list:
        sequences = []
        for text in texts:
            words = text.split()
            seq = [self.word_index.get(word, 1) for word in words]  # 1 is the index for <UNK>
            sequences.append(seq)
        return sequences

    def pad_sequences(self, sequences: list) -> np.ndarray:
        padded_sequences = np.zeros((len(sequences), self.max_len), dtype=int)
        for i, seq in enumerate(sequences):
            seq = seq[:self.max_len]  # Truncate if longer than max_len
            padded_sequences[i, :len(seq)] = seq
        return padded_sequences

    def transform_text(self, texts: np.ndarray) -> np.ndarray:
        sequences = self.texts_to_sequences(texts)
        padded_sequences = self.pad_sequences(sequences)
        return padded_sequences


In [11]:
class QuoraDataset(Dataset):
    def __init__(self, sequences: np.ndarray, labels: np.ndarray):
        self.sequences = torch.tensor(sequences, dtype=torch.long)
        self.labels = torch.tensor(labels, dtype=torch.float)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx: int):
        return self.sequences[idx], self.labels[idx]


In [12]:
class BiLSTMModel(nn.Module):
    def __init__(self, vocabulary_size: int, embedding_dim: int = 128, lstm_units: int = 64, dropout_rate: float = 0.3):
        super(BiLSTMModel, self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocabulary_size, embedding_dim=embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=lstm_units, num_layers=1, batch_first=True, bidirectional=True)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc1 = nn.Linear(lstm_units * 2, 64)
        self.fc2 = nn.Linear(64, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        embedded = self.embedding(x)  # (batch_size, max_len, embedding_dim)
        lstm_out, _ = self.lstm(embedded)  # (batch_size, max_len, lstm_units*2)
        # We can take the last hidden state for classification, or use the mean of the outputs:
        out = lstm_out[:, -1, :]  # (batch_size, lstm_units*2)
        out = self.dropout(out)
        out = self.fc1(out)
        out = self.relu(out)
        out = self.dropout(out)
        out = self.fc2(out)
        out = self.sigmoid(out)
        return out.squeeze()


In [13]:
class Trainer:
    """
    A class for training the model.
    """
    def __init__(self, model: nn.Module, learning_rate: float = 0.001, batch_size: int = 128, epochs: int = 10, patience: int = 3):
        self.model = model
        self.batch_size = batch_size
        self.epochs = epochs
        self.patience = patience
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.criterion = nn.BCELoss()
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)
        self.best_val_loss = float('inf')
        self.best_model_state = None
        self.no_improvement_count = 0

    def train(self, train_dataset: Dataset, val_dataset: Dataset):
        train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=self.batch_size, shuffle=False)

        print("Starting training...")
        for epoch in range(self.epochs):
            self.model.train()
            epoch_loss = 0
            for sequences, labels in train_loader:
                sequences, labels = sequences.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                predictions = self.model(sequences)
                loss = self.criterion(predictions, labels)
                loss.backward()
                self.optimizer.step()
                epoch_loss += loss.item()

            epoch_loss /= len(train_loader)
            val_loss = self.validate(val_loader)
            print(f"Epoch {epoch+1}/{self.epochs}, Training Loss: {epoch_loss:.4f}, Validation Loss: {val_loss:.4f}")

            # Early stopping and checkpointing
            if val_loss < self.best_val_loss:
                self.best_val_loss = val_loss
                self.best_model_state = self.model.state_dict()
                self.no_improvement_count = 0
                print("Validation loss improved. Model checkpoint saved.")
            else:
                self.no_improvement_count += 1
                if self.no_improvement_count >= self.patience:
                    print("Early stopping triggered.")
                    break

        # Load the best model state
        if self.best_model_state is not None:
            self.model.load_state_dict(self.best_model_state)
        print("Training completed.")

    def validate(self, val_loader: DataLoader) -> float:
        self.model.eval()
        val_loss = 0
        with torch.no_grad():
            for sequences, labels in val_loader:
                sequences, labels = sequences.to(self.device), labels.to(self.device)
                predictions = self.model(sequences)
                loss = self.criterion(predictions, labels)
                val_loss += loss.item()
        val_loss /= len(val_loader)
        return val_loss


In [14]:
class Evaluator:
    def __init__(self):
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def evaluate(self, model: nn.Module, val_dataset: Dataset):
        model.eval()
        val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=128, shuffle=False)
        all_preds = []
        all_labels = []
        with torch.no_grad():
            for sequences, labels in val_loader:
                sequences = sequences.to(self.device)
                predictions = model(sequences)
                preds = (predictions > 0.5).cpu().numpy().astype("int32")
                all_preds.extend(preds)
                all_labels.extend(labels.numpy().astype("int32"))

        acc = accuracy_score(all_labels, all_preds)
        prec = precision_score(all_labels, all_preds)
        rec = recall_score(all_labels, all_preds)
        f1 = f1_score(all_labels, all_preds)
        cm = confusion_matrix(all_labels, all_preds)

        print("Evaluation Results:")
        print(f"Accuracy: {acc:.4f}")
        print(f"Precision: {prec:.4f}")
        print(f"Recall: {rec:.4f}")
        print(f"F1 Score: {f1:.4f}")
        print("Confusion Matrix:")
        print(cm)


In [15]:
class Predictor:
    def __init__(self, model: nn.Module, preprocessor: TextPreprocessor):
        self.model = model
        self.model.eval()
        self.preprocessor = preprocessor
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def predict(self, questions: list) -> list:
        sequences = self.preprocessor.transform_text(np.array(questions))
        sequences = torch.tensor(sequences, dtype=torch.long).to(self.device)
        with torch.no_grad():
            predictions = self.model(sequences)
            predictions = (predictions > 0.5).cpu().numpy().astype("int32")
        return predictions.ravel().tolist()


In [16]:

# Step 1: Data Loading and Splitting
data_loader = MyDataLoader(train_path='train.csv')
data = data_loader.load_data()
data_loader.split_data()

# Step 2: Text Preprocessing
text_preprocessor = TextPreprocessor(max_words=100000, max_len=200)
text_preprocessor.build_vocab(data_loader.X_train)
X_train_seq = text_preprocessor.transform_text(data_loader.X_train)
X_val_seq = text_preprocessor.transform_text(data_loader.X_val)

# Convert data to PyTorch datasets
train_dataset = QuoraDataset(X_train_seq, data_loader.y_train)
val_dataset = QuoraDataset(X_val_seq, data_loader.y_val)

# Step 3: Model Building
bilstm_model = BiLSTMModel(
    vocabulary_size=text_preprocessor.vocab_size,
    embedding_dim=128,
    lstm_units=64,
    dropout_rate=0.3,
)

# Step 4: Model Training
trainer = Trainer(model=bilstm_model, learning_rate=0.001, batch_size=128, epochs=10, patience=3)
trainer.train(train_dataset, val_dataset)



Data split: 1044897 training samples, 261225 validation samples.
Vocabulary built. Size: 100000
Starting training...
Epoch 1/10, Training Loss: 0.2370, Validation Loss: 0.2290
Validation loss improved. Model checkpoint saved.
Epoch 2/10, Training Loss: 0.2343, Validation Loss: 0.2290
Validation loss improved. Model checkpoint saved.
Epoch 3/10, Training Loss: 0.2335, Validation Loss: 0.2289
Validation loss improved. Model checkpoint saved.
Epoch 4/10, Training Loss: 0.2332, Validation Loss: 0.2289
Epoch 5/10, Training Loss: 0.2331, Validation Loss: 0.2290
Epoch 6/10, Training Loss: 0.2330, Validation Loss: 0.2289
Early stopping triggered.
Training completed.


In [19]:
# # Step 5: Model Evaluation
evaluator = Evaluator()
evaluator.evaluate(bilstm_model, val_dataset)

# Step 6: Inference (Prediction)
predictor = Predictor(model=bilstm_model, preprocessor=text_preprocessor)
sample_questions = [
    "Why do people ask insincere questions on Quora?",
    "How to lose weight quickly?"
]
predictions = predictor.predict(sample_questions)
for question, prediction in zip(sample_questions, predictions):
    print(f"Question: {question} -> Prediction: {'Insincere' if prediction == 1 else 'Sincere'}")


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Evaluation Results:
Accuracy: 0.9393
Precision: 0.0000
Recall: 0.0000
F1 Score: 0.0000
Confusion Matrix:
[[245369      0]
 [ 15856      0]]
Question: Why do people ask insincere questions on Quora? -> Prediction: Sincere
Question: How to lose weight quickly? -> Prediction: Sincere
