# Intent Classification Model on ATIS Data
I plan to build a series of models:
1. A barebone model for benchmarking
2. An attention-based model, to try a deeper architecture
3. Advanced learning libraries, to check how their performance compares with the first two


First, the imports.

In [1]:
import pandas as pd
import os
import numpy as np
from sklearn.preprocessing import LabelEncoder
from collections import Counter
import torch
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
import random
import re

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu")
print(f"Using device: {device}")

def set_seed(seed_value=42):
    """Set seed for reproducibility."""
    torch.manual_seed(seed_value)  # seeds the RNG on all devices
    np.random.seed(seed_value)
    random.seed(seed_value)
    # Affects subprocesses only; the current interpreter's hash seed is fixed at startup
    os.environ['PYTHONHASHSEED'] = str(seed_value)

set_seed()

Using device: mps


# Barebone NLP Model

## Data Preprocessing
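Tokenization splits raw text into tokens (here, words). Mapping each token to an integer index through a vocabulary then gives a numerical representation of the text that a model can consume. In this section, we tokenize the text, build a vocabulary, and encode the labels as numerical values. Since the vocabulary and labels are learned from the training data rather than hand-coded, the same pipeline can be reused to build intent classifiers for chatbots in other domains and, given a suitable tokenizer, other languages.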
### Tokenize and Build Vocabulary
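The `tokenize` and `build_vocabulary` helpers come from the notebook's own `utils` module, which is not shown here. Below is a minimal sketch of what they might look like, assuming lowercase word-level tokenization, index 0 reserved for padding (the `predict` cell pads with 0), index 1 for `"<UNK>"`, and a cap of `vocab_size` entries kept by frequency. The `<PAD>` token name and the regex are illustrative assumptions, not the actual `utils` code.

```python
import re
from collections import Counter

def tokenize(text):
    # Hypothetical tokenizer: lowercase, then keep runs of word characters
    return re.findall(r"\w+", text.lower())

def build_vocabulary(texts, vocab_size):
    # Hypothetical: reserve 0 for padding and 1 for unknown words,
    # then add the most frequent words until vocab_size entries exist
    counts = Counter(word for text in texts for word in tokenize(text))
    word_to_index = {"<PAD>": 0, "<UNK>": 1}
    for word, _ in counts.most_common(vocab_size - len(word_to_index)):
        word_to_index[word] = len(word_to_index)
    return word_to_index

vocab = build_vocabulary(["show me flights to boston", "flights from denver to boston"], vocab_size=100)
print(len(vocab), vocab["flights"])
```

With a cap larger than the number of distinct words, the vocabulary simply holds every training word plus the special tokens, which is consistent with the notebook printing a size of 890 for a cap of 1000.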

### Get the Indexed and Padded Sequences as Tensors
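`convert_and_pad_sequences` is also imported from `utils`. A plausible sketch, assuming it wraps each index list in a `LongTensor` and right-pads the batch with 0 via `torch.nn.utils.rnn.pad_sequence` (the same call the `predict` cell uses); the `padding_value` parameter is a hypothetical addition:

```python
import torch
from torch.nn.utils.rnn import pad_sequence

def convert_and_pad_sequences(sequences, device, padding_value=0):
    # Hypothetical version of utils.convert_and_pad_sequences:
    # one LongTensor per sentence, padded on the right to the longest sentence
    tensors = [torch.tensor(seq, dtype=torch.long) for seq in sequences]
    return pad_sequence(tensors, batch_first=True, padding_value=padding_value).to(device)

padded = convert_and_pad_sequences([[4, 9, 2], [7, 1], [3]], device="cpu")
print(padded)
```

Because training and test sets are padded separately, each tensor is only as wide as its own longest sentence; that is fine for an LSTM, which accepts any sequence length.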

### Encode Labels
We also handle labels that appear in the test set but not in the training set by mapping them to an `<unknown>` class, and print the list of classes in the data.
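`encode_labels` lives in `utils` as well. One way it could work, sketched under the assumption that unseen test labels are rewritten in place to `"<unknown>"` and a scikit-learn `LabelEncoder` is fitted on the training labels plus that extra class. Because `LabelEncoder` sorts class names, this sketch puts `<unknown>` first (index 0); the notebook's printed mapping shows it last, so its own helper evidently orders classes differently.

```python
import pandas as pd
from sklearn.preprocessing import LabelEncoder

UNKNOWN = "<unknown>"

def encode_labels(train, test):
    # Hypothetical: replace test labels never seen in training with UNKNOWN (in place)
    known = set(train["label"])
    test.loc[~test["label"].isin(known), "label"] = UNKNOWN
    # Fit on the known classes plus the catch-all class
    le = LabelEncoder()
    le.fit(sorted(known) + [UNKNOWN])
    return le

train = pd.DataFrame({"text": ["a", "b", "c"], "label": ["flight", "airfare", "flight"]})
test = pd.DataFrame({"text": ["d", "e"], "label": ["flight", "day_name"]})
le = encode_labels(train, test)
print(list(le.classes_), le.transform(test["label"]))
```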

### Text to Index Lists
The `text_to_indices` function takes a string of text and the `word_to_index` dictionary. It tokenizes the text into words with the `tokenize` helper, then looks up each word's index in `word_to_index`, falling back to the index of `"<UNK>"` for words that are not in the vocabulary. It returns the list of indices representing the text.

We apply this function to every entry of the training and test sets with list comprehensions. Afterwards, `train_indices` and `test_indices` are lists of lists: each inner list is the sequence of word indices for one sentence. These sequences are then padded and used to train the PyTorch model.
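The lookup described above can be sketched as follows. The `tokenize` shown is the same hypothetical lowercase word tokenizer assumed earlier, and the tiny `word_to_index` is made up for illustration; only the `"<UNK>"` fallback comes from the text.

```python
import re

def tokenize(text):
    # Hypothetical tokenizer (the notebook's utils.tokenize is not shown)
    return re.findall(r"\w+", text.lower())

def text_to_indices(text, word_to_index):
    # Map each word to its index, using the "<UNK>" index for out-of-vocabulary words
    unk = word_to_index["<UNK>"]
    return [word_to_index.get(word, unk) for word in tokenize(text)]

word_to_index = {"<PAD>": 0, "<UNK>": 1, "flights": 2, "to": 3, "boston": 4}
texts = ["Flights to Boston", "flights to Paris"]
indices = [text_to_indices(t, word_to_index) for t in texts]
print(indices)  # [[2, 3, 4], [2, 3, 1]]
```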


In [None]:
from utils import tokenize, build_vocabulary, text_to_indices, encode_labels, convert_and_pad_sequences

# Load the training and test data (tab-separated, no header row)
train = pd.read_csv("data/atis/train.tsv", sep='\t', header=None)
train.columns = ["text", "label"]
test = pd.read_csv("data/atis/test.tsv", sep='\t', header=None)
test.columns = ["text", "label"]

#build vocabulary
vocab_size=1000
word_to_index = build_vocabulary(train["text"], vocab_size)
print(f"Vocabulary Size: {len(word_to_index)}")

#get the indexed and padded sequences as tensors
train_indices = [text_to_indices(text, word_to_index) for text in train["text"]]
test_indices = [text_to_indices(text, word_to_index) for text in test["text"]]
train_padded=convert_and_pad_sequences(train_indices,device)
test_padded=convert_and_pad_sequences(test_indices,device)
# Now, train_padded and test_padded are the padded sequence tensors
print("Padded Training Sequences:", train_padded.size())
print("Padded Testing Sequences:", test_padded.size())

# Convert labels to numerical values
le = encode_labels(train,test)
train_labels = le.transform(train["label"])
test_labels = le.transform(test["label"])
print("Label Encoding:", dict(zip(le.classes_, le.transform(le.classes_))))

Vocabulary Size: 890


Label Encoding: {'abbreviation': 0, 'aircraft': 1, 'aircraft+flight+flight_no': 2, 'airfare': 3, 'airfare+flight_time': 4, 'airline': 5, 'airline+flight_no': 6, 'airport': 7, 'capacity': 8, 'cheapest': 9, 'city': 10, 'distance': 11, 'flight': 12, 'flight+airfare': 13, 'flight_no': 14, 'flight_time': 15, 'ground_fare': 16, 'ground_service': 17, 'ground_service+ground_fare': 18, 'meal': 19, 'quantity': 20, 'restriction': 21, "<unknown>": 22}


## Data Loader

In [None]:
from torch.utils.data import TensorDataset, DataLoader
# Create TensorDatasets
# TensorDataset wraps tensors that share the same size in dimension 0; indexing it returns
# one (input, label) tuple per sample, which DataLoader then groups into mini-batches.
train_data = TensorDataset(train_padded.to(device), torch.tensor(train_labels).to(device))
test_data = TensorDataset(test_padded.to(device), torch.tensor(test_labels).to(device))

# Create DataLoaders (shuffle the training set every epoch; keep the test order fixed)
batch_size = 32
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=False, batch_size=batch_size)
print("Number of training batches:", len(train_loader))
print("Number of test batches:", len(test_loader))

In [None]:
import matplotlib.pyplot as plt

# Plot the class distribution in the training and test sets
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
train.label.value_counts().plot(kind='bar', ax=ax1)
ax1.set_title("Train")
test.label.value_counts().plot(kind='bar', ax=ax2)
ax2.set_title("Test")
plt.show()

## Embeddings

Yay, our data is now vectorized, so we can start playing with it. The first step is to build an embedding model, `SimpleNLPModel`:

- `vocab_size` is the number of unique words in the vocabulary.
- `embedding_dim` is the number of dimensions of each word embedding.
- `SimpleNLPModel` is a basic PyTorch model class with a single embedding layer.
- The `forward` method defines how data passes through the model; in this simple example, it only passes through the embedding layer.
- An example input, a tensor of token indices like the output of the padding step, is passed through the model to obtain embeddings.

This setup initializes the embeddings randomly, and they are updated during training. If you have pretrained embeddings, you can initialize the `nn.Embedding` layer with those weights instead.
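A minimal reconstruction of the `SimpleNLPModel` described above, assuming `vocab_size=1000` and `embedding_dim=64` (the values used later in the notebook). The example input is a made-up batch of two padded index sequences.

```python
import torch
import torch.nn as nn

class SimpleNLPModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim):
        super().__init__()
        # Randomly initialized lookup table: one embedding_dim vector per token index
        self.embedding = nn.Embedding(vocab_size, embedding_dim)

    def forward(self, x):
        # (batch, seq_len) token indices -> (batch, seq_len, embedding_dim) vectors
        return self.embedding(x)

vocab_size, embedding_dim = 1000, 64
model = SimpleNLPModel(vocab_size, embedding_dim)
example_input = torch.tensor([[12, 5, 87, 0], [3, 44, 0, 0]])
embeddings = model(example_input)
print(embeddings.shape)  # torch.Size([2, 4, 64])
```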

Great, the embeddings are working. For richer representations, we could also start from pretrained embeddings, including contextualized embeddings adapted to the task. Let's refine our embedding model into a full intent classifier.
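Plugging in pretrained vectors can be done with `nn.Embedding.from_pretrained`. In this sketch a random matrix stands in for real vectors (for example GloVe vectors looked up for each vocabulary word); the only requirement is that row `i` holds the vector for the word whose index is `i` in `word_to_index`. The 100-dimensional size is an assumption.

```python
import torch
import torch.nn as nn

# Stand-in for a real pretrained matrix; row i must match word_to_index's index i
vocab_size, embedding_dim = 890, 100
pretrained = torch.randn(vocab_size, embedding_dim)

# freeze=False lets training fine-tune the vectors; freeze=True keeps them fixed
embedding = nn.Embedding.from_pretrained(pretrained, freeze=False, padding_idx=0)
tokens = torch.tensor([[4, 17, 0]])
vectors = embedding(tokens)
print(vectors.shape)  # torch.Size([1, 3, 100])
```

Freezing is usually the safer choice when the training set is small; fine-tuning can help when the pretrained vocabulary differs from the domain's usage.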

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import mlflow
import mlflow.pytorch
from torch.utils.tensorboard import SummaryWriter

class IntentClassifierLSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate):
        super(IntentClassifierLSTM, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.batch_norm = nn.BatchNorm1d(hidden_dim)  # Batch normalization layer
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        # Embedding layer
        embedded = self.embedding(x)

        # Dropout layer
        dropped = self.dropout1(embedded)

        # LSTM layer
        lstm_out, (hidden, _) = self.lstm(dropped)
        # hidden: (num_layers, batch, hidden_dim); take the last layer's final hidden state
        hidden = hidden[-1]
        # Batch normalization
        normalized = self.batch_norm(hidden)

        # Fully connected layer
        out = self.fc(normalized)
        return out

# The model is instantiated in the training cell, after the hyperparameters are defined
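One detail worth checking in `IntentClassifierLSTM`: the `predict` cell pads with `pad_sequence(..., padding_value=0)`, which pads on the right, so assuming the training tensors are padded the same way, `hidden[-1]` for a short sentence is the state after the LSTM has also read the trailing pad tokens. `torch.nn.utils.rnn.pack_padded_sequence` makes the LSTM stop at each sentence's true length. The sketch below uses toy sizes and shows the mechanics only; whether packing improves accuracy on ATIS would need to be measured.

```python
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence

torch.manual_seed(0)

# Two right-padded sentences (0 = padding index); true lengths are 3 and 5
x = torch.tensor([[5, 7, 2, 0, 0],
                  [3, 4, 6, 8, 1]])
lengths = (x != 0).sum(dim=1)

embedding = nn.Embedding(10, 4, padding_idx=0)
lstm = nn.LSTM(4, 6, batch_first=True)
emb = embedding(x)

# Unpacked: the LSTM keeps updating its state over the padding positions
out, (h_padded, _) = lstm(emb)

# Packed: each sequence's final state is taken at its own last real token
packed = pack_padded_sequence(emb, lengths.cpu(), batch_first=True, enforce_sorted=False)
_, (h_packed, _) = lstm(packed)

# Reference: unpacked outputs at each sentence's true last position
last_real = out[torch.arange(x.size(0)), lengths - 1]
print(torch.allclose(h_packed[-1], last_real, atol=1e-5))
```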

In [None]:
import torch.nn.functional as F

class SelfAttentionLayer(nn.Module):
    def __init__(self, feature_size):
        super(SelfAttentionLayer, self).__init__()
        self.feature_size = feature_size

        # Linear transformations for Q, K, V from the same source
        self.key = nn.Linear(feature_size, feature_size)
        self.query = nn.Linear(feature_size, feature_size)
        self.value = nn.Linear(feature_size, feature_size)

    def forward(self, x, mask=None):
        # Apply linear transformations
        keys = self.key(x)
        queries = self.query(x)
        values = self.value(x)

        # Scaled dot-product attention
        scores = torch.matmul(queries, keys.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.feature_size, dtype=torch.float32))

        # Apply mask (if provided)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)

        # Apply softmax
        attention_weights = F.softmax(scores, dim=-1)

        # Multiply weights with values
        output = torch.matmul(attention_weights, values)

        return output


class IntentClassifierLSTMWithAttention(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate):
        super(IntentClassifierLSTMWithAttention, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.dropout = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.attention = SelfAttentionLayer(hidden_dim)
        self.batch_norm = nn.BatchNorm1d(hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        embedded = self.embedding(x)
        dropped = self.dropout(embedded)
        lstm_out, _ = self.lstm(dropped)

        # Apply attention
        attn_out = self.attention(lstm_out)
        # Use the attention output at the last time step as the sentence representation
        final_output = attn_out[:, -1, :]
        normalized = self.batch_norm(final_output)

        out = self.fc(normalized)
        return out
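`SelfAttentionLayer` computes $\mathrm{softmax}(QK^\top/\sqrt{d})\,V$ and already accepts a `mask`, but the classifier never passes one, so padding tokens receive attention weight. A minimal sketch of building a key-padding mask, assuming padding index 0; `padding_mask` and `masked_attention` are hypothetical helpers that mirror the layer's math, and random tensors stand in for the LSTM outputs.

```python
import math
import torch
import torch.nn.functional as F

def padding_mask(x, pad_idx=0):
    # (batch, seq) token ids -> (batch, 1, seq); 1 where the key position is a real token
    return (x != pad_idx).unsqueeze(1)

def masked_attention(q, k, v, mask=None):
    # Same scaled dot-product and masking as SelfAttentionLayer.forward
    scores = q @ k.transpose(-2, -1) / math.sqrt(q.size(-1))
    if mask is not None:
        scores = scores.masked_fill(mask == 0, -1e9)  # mask broadcasts over query rows
    weights = F.softmax(scores, dim=-1)
    return weights @ v, weights

torch.manual_seed(0)
x = torch.tensor([[5, 7, 2, 0, 0],
                  [3, 4, 6, 8, 1]])
h = torch.randn(2, 5, 8)  # stand-in for lstm_out
out, weights = masked_attention(h, h, h, padding_mask(x))
print(weights[0, 0])  # the last two (padding) columns get zero weight
```

In the classifier this would correspond to calling `self.attention(lstm_out, mask=padding_mask(x))`. Note that reading the row at position `-1` still selects a padding query for short sentences, so pooling over the real positions is a natural companion change.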

In [None]:
# Define the loss function and hyperparameters
loss_function = nn.CrossEntropyLoss()
learning_rate = 0.001
weight_decay = 1e-4
dropout_rate = 0.4
embedding_dim = 64             # Size of each embedding vector
hidden_dim = 128               # Number of features in the hidden state of the LSTM
batch_size = 32
output_dim = len(le.classes_)  # Number of classes
num_epochs = 20
# Create a string that summarizes these parameters
params_str = f"Vocab Size: {vocab_size}\n" \
             f"Embedding Dim: {embedding_dim}\n" \
             f"Hidden Dim: {hidden_dim}\n" \
             f"Output Dim: {output_dim}\n" \
             f"Dropout Rate: {dropout_rate}\n" \
             f"Learning Rate: {learning_rate}\n" \
             f"Weight Decay: {weight_decay}\n" \
             f"Batch Size: {batch_size}\n" \
             f"Epochs: {num_epochs}"
print(params_str)

## Training and Evaluation
Next, we implement a training loop for the model. Here we use the Adam optimizer with weight decay, cross-entropy loss, and accuracy as the metric; any other optimizer, loss function, or metric suitable for comparing the models could be used instead.
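The notebook repeats the same batching, loss, and accuracy logic in several cells. One way to factor it out is a single hypothetical helper, `run_epoch`, that trains when given an optimizer and evaluates otherwise, switching `train()`/`eval()` mode (which matters for the dropout and batch-norm layers) and gradient tracking accordingly. The sketch uses a toy linear model and random data in place of the ATIS tensors.

```python
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader

def run_epoch(model, loader, loss_fn, optimizer=None):
    """One pass over loader: trains if an optimizer is given, otherwise evaluates."""
    training = optimizer is not None
    model.train(training)  # dropout/batch-norm behave differently in train vs eval mode
    total_loss, correct, seen = 0.0, 0, 0
    with torch.set_grad_enabled(training):
        for x, y in loader:
            logits = model(x)
            loss = loss_fn(logits, y)
            if training:
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            # Weight by batch size so a smaller final batch is averaged correctly
            total_loss += loss.item() * x.size(0)
            correct += (logits.argmax(dim=1) == y).sum().item()
            seen += x.size(0)
    return total_loss / seen, correct / seen

torch.manual_seed(0)
x = torch.randn(100, 8)
y = (x[:, 0] > 0).long()  # toy labels: the class depends on the first feature
loader = DataLoader(TensorDataset(x, y), batch_size=32, shuffle=True)
model = nn.Linear(8, 2)
optimizer = torch.optim.Adam(model.parameters(), lr=0.05)
for epoch in range(20):
    train_loss, train_acc = run_epoch(model, loader, nn.CrossEntropyLoss(), optimizer)
test_loss, test_acc = run_epoch(model, loader, nn.CrossEntropyLoss())
print(f"loss={test_loss:.4f} acc={test_acc:.4f}")
```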

In [None]:
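# `device` was selected automatically in the setup cell; avoid hard-coding "mps" so the notebook also runs on CUDA or CPU
print(f"Using device: {device}, MPS available: {torch.backends.mps.is_available()}")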

In [None]:
# Create the model, optimizer, and learning-rate scheduler
from torch.optim.lr_scheduler import StepLR

#model = IntentClassifierLSTM(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate).to(device)
model = IntentClassifierLSTMWithAttention(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)
for epoch in range(num_epochs):
    model.train()  # enable dropout and batch-norm statistic updates
    train_loss = 0.0
    correct = 0
    for batch in train_loader:
        # get data
        x, y = batch
        # zero the gradients
        optimizer.zero_grad()
        # forward pass
        y_hat = model(x)
        # compute the loss and accumulate it
        loss = loss_function(y_hat, y)
        train_loss += loss.item()
        # backward pass
        loss.backward()
        # update the weights
        optimizer.step()
        # count correct predictions for accuracy
        _, predicted = torch.max(y_hat, 1)
        correct += (predicted == y).sum().item()
    # decay the learning rate according to the StepLR schedule
    scheduler.step()
    # compute the average loss per batch and the accuracy over all training samples
    train_loss /= len(train_loader)
    acc = correct / len(train_padded)

    # log the average loss for this epoch
    mlflow.log_metric("train_loss", train_loss, step=epoch)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')
    print(f'Epoch [{epoch+1}/{num_epochs}], Accuracy: {acc:.4f}')
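The training cell builds a `StepLR` scheduler with `step_size=5` and `gamma=0.1`. `StepLR` only changes the learning rate when `scheduler.step()` is called, usually once per epoch, and it then multiplies the rate by `gamma` every `step_size` calls. This sketch, where a dummy parameter stands in for the model's weights, records the rate used in each of the notebook's 20 epochs: it falls from 1e-3 to 1e-6, so the last epochs barely move the weights.

```python
import torch
from torch.optim.lr_scheduler import StepLR

# A single dummy parameter stands in for the model's weights
param = torch.nn.Parameter(torch.zeros(1))
optimizer = torch.optim.Adam([param], lr=0.001)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

lrs = []
for epoch in range(20):
    lrs.append(optimizer.param_groups[0]["lr"])  # learning rate used during this epoch
    optimizer.step()   # a real loop would run all its batches here
    scheduler.step()   # multiply the rate by gamma every step_size epochs

print(lrs[::5])  # rates at epochs 0, 5, 10 and 15
```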

In [None]:
model.eval()

# evaluate on the test set (no gradients needed)
test_loss = 0.0
correct = 0
with torch.no_grad():
    for batch in test_loader:
        # get data
        x, y = batch
        # forward pass
        y_hat = model(x)
        # compute the loss and update the running test loss
        loss = loss_function(y_hat, y)
        test_loss += loss.item()
        _, predicted = torch.max(y_hat, 1)
        correct += (predicted == y).sum().item()
# compute the average loss per batch and the overall accuracy
test_loss /= len(test_loader)
acc = correct / len(test_padded)

# log test metrics at the final training epoch
mlflow.log_metric("test_loss", test_loss, step=epoch)
mlflow.log_metric("test_accuracy", acc, step=epoch)
# TODO: also log precision and recall

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {acc:.4f}")

# log model
mlflow.pytorch.log_model(model, "model")
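The evaluation cell mentions logging precision and recall but only computes accuracy. Since ATIS is heavily dominated by the `flight` intent, macro-averaged scores give rare intents equal weight. A sketch using scikit-learn's `precision_recall_fscore_support`; the toy label lists stand in for values you would collect inside the test loop, for example by extending lists with `y.cpu().tolist()` and `predicted.cpu().tolist()`.

```python
from sklearn.metrics import precision_recall_fscore_support

# Toy stand-ins for the collected test labels and predictions
y_true = [0, 0, 1, 1, 2]
y_pred = [0, 1, 1, 1, 2]

# Macro average: compute each class's score, then take the unweighted mean
precision, recall, f1, _ = precision_recall_fscore_support(
    y_true, y_pred, average="macro", zero_division=0
)
print(f"Macro precision: {precision:.4f}, recall: {recall:.4f}, F1: {f1:.4f}")
# In the notebook these could then be logged, e.g. mlflow.log_metric("test_macro_f1", f1)
```

`zero_division=0` avoids warnings for classes that never get predicted, which is likely for the rarest ATIS intents.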

In [None]:
# Close any MLflow run left open by the cells above (no-op if none is active)
mlflow.end_run()
from torch.optim.lr_scheduler import StepLR
#model = IntentClassifierLSTM(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate).to(device)
model = IntentClassifierLSTMWithAttention(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

# Convert labels to torch tensor
train_labels_tensor = torch.tensor(train_labels).to(device)
test_labels_tensor = torch.tensor(test_labels).to(device)
print(train_labels_tensor)
# TensorBoard writer
writer = SummaryWriter()
# Start an MLflow run
with mlflow.start_run():
    # Log model architecture
    mlflow.log_text(str(model), "model.txt")

    # Log hyperparameters
    mlflow.log_text(params_str, "model_parameters.txt")

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        correct = 0
        num_batches = 0
        for i in range(0, len(train_padded), batch_size):
            # Batch inputs and labels
            input_batch = train_padded[i:i+batch_size].to(device)
            label_batch = train_labels_tensor[i:i+batch_size]

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(input_batch)

            # Compute the loss
            loss = loss_function(outputs, label_batch)
            train_loss += loss.item()
            num_batches += 1
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == label_batch).sum().item()
        scheduler.step()
        # loss.item() is already a per-batch mean, so average over batches, not samples
        avg_loss = train_loss / num_batches
        accuracy = correct / len(train_padded)
        # Log training loss per epoch
        mlflow.log_metric("train_loss", avg_loss, step=epoch)
        writer.add_scalar("Loss/train", avg_loss, epoch)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
        print(f'Epoch [{epoch+1}/{num_epochs}], Accuracy: {accuracy:.4f}')
        # Stop early once training accuracy is near perfect
        if accuracy > 0.995:
            break

    # Evaluation
    model.eval()
    total_loss = 0
    correct = 0

    with torch.no_grad():
        for i in range(0, len(test_padded), batch_size):
            # Batch inputs and labels
            input_batch = test_padded[i:i+batch_size]
            label_batch = test_labels_tensor[i:i+batch_size]

            # Forward pass
            outputs = model(input_batch)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == label_batch).sum().item()
            # Weight each batch's mean loss by its size so the smaller last batch counts correctly
            loss = loss_function(outputs, label_batch)
            total_loss += loss.item() * input_batch.size(0)

    # Compute and log the average test loss and accuracy
    average_loss = total_loss / len(test_padded)
    print(f"Test Loss: {average_loss:.4f}")
    accuracy = correct / len(test_padded)
    print(f"Test Accuracy: {accuracy:.4f}")
    mlflow.log_metric("test_loss", average_loss)
    mlflow.log_metric("test_accuracy", accuracy)
    writer.add_scalar("Loss/test", average_loss)

    # Upload the TensorBoard event files now that they contain data
    writer.flush()
    mlflow.log_artifacts(writer.log_dir, artifact_path="tensorboard_logs")

    # Log the final model to MLflow
    mlflow.pytorch.log_model(model, "model")
writer.close()

In [None]:
# Re-run the experiment with a stricter early-stopping threshold (training accuracy > 0.998)
from torch.optim.lr_scheduler import StepLR
#model = IntentClassifierLSTM(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate).to(device)
model = IntentClassifierLSTMWithAttention(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)
scheduler = StepLR(optimizer, step_size=5, gamma=0.1)

# Convert labels to torch tensor
train_labels_tensor = torch.tensor(train_labels).to(device)
test_labels_tensor = torch.tensor(test_labels).to(device)
print(train_labels_tensor)
# TensorBoard writer
writer = SummaryWriter()
# Start an MLflow run
with mlflow.start_run():
    # Log model architecture
    mlflow.log_text(str(model), "model.txt")

    # Log hyperparameters
    mlflow.log_text(params_str, "model_parameters.txt")
    for epoch in range(num_epochs):
        model.train()
        train_loss = 0
        correct = 0
        num_batches = 0
        for i in range(0, len(train_padded), batch_size):
            # Batch inputs and labels
            input_batch = train_padded[i:i+batch_size].to(device)
            label_batch = train_labels_tensor[i:i+batch_size]

            # Zero the gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(input_batch)

            # Compute the loss
            loss = loss_function(outputs, label_batch)
            train_loss += loss.item()
            num_batches += 1
            # Backward pass and optimize
            loss.backward()
            optimizer.step()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == label_batch).sum().item()
        scheduler.step()
        # loss.item() is already a per-batch mean, so average over batches, not samples
        avg_loss = train_loss / num_batches
        accuracy = correct / len(train_padded)
        # Log training loss per epoch
        mlflow.log_metric("train_loss", avg_loss, step=epoch)
        writer.add_scalar("Loss/train", avg_loss, epoch)
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')
        print(f'Epoch [{epoch+1}/{num_epochs}], Accuracy: {accuracy:.4f}')
        # Stop early once training accuracy is near perfect
        if accuracy > 0.998:
            break

    # Evaluation
    model.eval()
    total_loss = 0
    correct = 0

    with torch.no_grad():
        for i in range(0, len(test_padded), batch_size):
            # Batch inputs and labels
            input_batch = test_padded[i:i+batch_size]
            label_batch = test_labels_tensor[i:i+batch_size]

            # Forward pass
            outputs = model(input_batch)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == label_batch).sum().item()
            # Weight each batch's mean loss by its size so the smaller last batch counts correctly
            loss = loss_function(outputs, label_batch)
            total_loss += loss.item() * input_batch.size(0)

    # Compute and log the average test loss and accuracy
    average_loss = total_loss / len(test_padded)
    print(f"Test Loss: {average_loss:.4f}")
    accuracy = correct / len(test_padded)
    print(f"Test Accuracy: {accuracy:.4f}")
    mlflow.log_metric("test_loss", average_loss)
    mlflow.log_metric("test_accuracy", accuracy)
    writer.add_scalar("Loss/test", average_loss)

    # Upload the TensorBoard event files now that they contain data
    writer.flush()
    mlflow.log_artifacts(writer.log_dir, artifact_path="tensorboard_logs")

    # Log the final model to MLflow
    mlflow.pytorch.log_model(model, "model")
writer.close()

In [None]:
# Inspect test-set predictions next to the true labels
model.eval()
with torch.no_grad():
    for i in range(0, len(test_padded), batch_size):
        # Batch inputs and labels
        input_batch = test_padded[i:i+batch_size].to(device)
        label_batch = test_labels_tensor[i:i+batch_size].to(device)

        # Forward pass
        outputs = model(input_batch)
        _, predicted = torch.max(outputs, 1)

        # Convert predictions and labels to CPU and then to NumPy
        predicted_np = predicted.cpu().numpy()
        labels_np = label_batch.cpu().numpy()

        # Convert numerical labels back to the original intent names
        predicted_labels = le.inverse_transform(predicted_np)
        actual_labels = le.inverse_transform(labels_np)

        # Uncomment to print predicted and actual labels side by side
        # for pred, actual in zip(predicted_labels, actual_labels):
        #     print(f"Predicted: {pred}, Actual: {actual}")

In [None]:
# Save the whole pickled model and, more portably, just its weights (state dict)
torch.save(model, 'model.pth')
torch.save(model.state_dict(), 'model_state_dict.pth')


In [None]:
# Rebuild the architecture and load the saved weights for serving
model_serve = IntentClassifierLSTMWithAttention(vocab_size, embedding_dim, hidden_dim, output_dim, dropout_rate).to(device)
model_serve.load_state_dict(torch.load('model_state_dict.pth', map_location=device))
# Alternatively, load the pickled full model; recent PyTorch versions need weights_only=False for this:
# model_serve = torch.load('model.pth', map_location=device, weights_only=False)

def predict(model, query, max_length):
    """Return the predicted intent label for each query string."""
    model.eval()  # use running batch-norm statistics and disable dropout
    # Tokenize, convert to indices, and truncate each query to max_length tokens
    query_indices = [text_to_indices(text, word_to_index)[:max_length] for text in query]
    query_tensor = [torch.tensor(seq, dtype=torch.long) for seq in query_indices]
    inputs = pad_sequence(query_tensor, batch_first=True, padding_value=0).to(device)
    # Inference
    with torch.no_grad():
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)

    # Convert predictions back to intent labels
    return le.inverse_transform(predicted.cpu().numpy())


query = list()
query.append("what airlines off from love field between 6 and 10 am on june sixth")
query.append("unknown sequence")
prediction = predict(model_serve, query, 46)
print(f"Predicted labels: {prediction}")
