In [None]:
%pip install torch tiktoken onnx onnxruntime panadas

In [2]:
import os

import numpy as np
import onnxruntime as ort
import pandas as pd
import tiktoken
import torch
import torch.nn as nn
import torch.nn.functional as F
from datasets import load_dataset
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset

In [3]:
# Shared tokeniser: tiktoken's "gpt2" encoding
tokeniser = tiktoken.get_encoding("gpt2")

def encode_text(text, max_length):
    """Tokenise ``text`` and return a list of exactly ``max_length`` token ids.

    Longer sequences are cut off at the end; shorter ones are right-padded
    with id 0. The literal ``<|endoftext|>`` marker is permitted in ``text``.

    NOTE(review): 0 is used as the padding id; presumably it is also a regular
    id in this vocabulary, in which case padding and that token cannot be told
    apart downstream -- confirm this is acceptable.
    """
    token_ids = tokeniser.encode(text, allowed_special={"<|endoftext|>"})
    clipped = token_ids[:max_length]
    filler = [0] * (max_length - len(clipped))
    return clipped + filler

In [4]:
# Hyperparameters
vocab_size = tokeniser.n_vocab  # Tokeniser vocabulary size
embed_dim = 768        # Width of token embeddings and of the encoder (d_model)
num_heads = 12         # Attention heads per encoder layer
num_layers = 6         # Number of stacked encoder layers
max_seq_length = 512   # Every clause is truncated/padded to this many tokens
learning_rate = 1e-4   # AdamW step size
batch_size = 32
epochs = 5

# Config
# The directories default to the original author's checkout. Set the
# ZIGGY_MODEL_DIR / ZIGGY_DATA_DIR environment variables to run the notebook
# elsewhere without editing it. os.path.join(..., "") guarantees the trailing
# separator that the `path + filename` concatenations in later cells rely on.
model_path = os.path.join(os.environ.get("ZIGGY_MODEL_DIR", "/Users/paulzanna/Github/Ziggy/model/"), "")
model_filename = "ziggy_model.bin"
onnx_model_filename = "ziggy_model.onnx"
data_path = os.path.join(os.environ.get("ZIGGY_DATA_DIR", "/Users/paulzanna/Github/Ziggy/data/"), "")
data_filename = "data.csv"
req_filename = "requirements.csv"

In [5]:
class TextClassificationDataset(Dataset):
    """Map-style dataset pairing raw texts with integer class labels.

    Each item is tokenised on access with ``encode_text`` and returned as
    ``(input_ids, attention_mask, label)``, all ``torch.long`` tensors.
    """

    def __init__(self, texts, labels, max_length):
        # texts and labels are parallel, index-addressable sequences
        self.texts, self.labels = texts, labels
        # fixed token length handed to encode_text for every item
        self.max_length = max_length

    def __len__(self):
        """Number of examples, taken from ``texts``."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the tokenised example at ``idx``."""
        text, label = self.texts[idx], self.labels[idx]
        token_ids = encode_text(text, self.max_length)
        input_ids = torch.tensor(token_ids, dtype=torch.long)
        # 1 wherever the id is non-zero, 0 on the zero-valued padding
        attention_mask = input_ids.ne(0).long()
        label_tensor = torch.tensor(label, dtype=torch.long)
        return input_ids, attention_mask, label_tensor

In [6]:
class TransformerClassifier(nn.Module):
    """Transformer-encoder text classifier with learned positional embeddings.

    Token embeddings plus a learned positional table are passed through a
    stack of ``nn.TransformerEncoderLayer``s, mean-pooled over the sequence
    axis and projected to ``num_classes`` logits.

    Args:
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Embedding width, also used as the encoder's ``d_model``.
        num_heads: Attention heads per encoder layer.
        num_layers: Number of encoder layers.
        num_classes: Size of the output logit vector.
        max_seq_length: Number of positions in the positional table.
    """

    def __init__(self, vocab_size, embed_dim, num_heads, num_layers, num_classes, max_seq_length):
        super(TransformerClassifier, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        # Learned positional table, stored as (1, max_seq_length, embed_dim)
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_seq_length, embed_dim))
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, input_ids, attention_mask):
        """Compute class logits.

        Args:
            input_ids: Token ids, indexed as (batch, seq_len).
            attention_mask: Same leading shape as ``input_ids``; non-zero
                marks real tokens, zero marks padding.

        Returns:
            Tensor of logits with one row per batch element and
            ``num_classes`` columns.
        """
        seq_len = input_ids.size(1)
        # Slice the *sequence* axis (dim 1). The table's leading axis has
        # size 1, so slicing dim 0 would hand back the full table and only
        # work when seq_len == max_seq_length.
        positions = self.positional_encoding[:, :seq_len, :]
        embedded = self.embedding(input_ids) + positions
        transformer_output = self.transformer_encoder(
            embedded.transpose(0, 1),  # (seq_len, batch, embed_dim)
            src_key_padding_mask=~attention_mask.bool()  # True marks padding to ignore
        )
        # NOTE(review): this mean runs over every position, padded ones included.
        pooled_output = transformer_output.mean(dim=0)  # Mean pooling
        logits = self.fc(pooled_output)
        return logits

In [7]:
def train_model(model, dataloader, epochs, learning_rate, device):
    """Train ``model`` with AdamW and cross-entropy loss, printing the mean loss per epoch.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns class logits. It is moved to ``device``.
        dataloader: Re-iterable yielding ``(input_ids, attention_mask, labels)``
            batches.
        epochs: Number of passes over ``dataloader``.
        learning_rate: AdamW learning rate.
        device: Device on which the model and each batch are placed.

    Returns:
        List with the mean batch loss of each epoch, in order.

    Raises:
        ValueError: If an epoch yields no batches (previously this surfaced
            as a ZeroDivisionError when averaging the loss).
    """
    model = model.to(device)
    optimiser = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    loss_fn = nn.CrossEntropyLoss()

    epoch_losses = []
    for epoch in range(epochs):
        model.train()
        total_loss = 0.0
        # Count batches here instead of calling len(dataloader) so that
        # loaders without a length are supported as well.
        batch_count = 0
        for input_ids, attention_mask, labels in dataloader:
            input_ids, attention_mask, labels = (
                input_ids.to(device),
                attention_mask.to(device),
                labels.to(device),
            )
            optimiser.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimiser.step()
            total_loss += loss.item()
            batch_count += 1

        if batch_count == 0:
            raise ValueError("dataloader produced no batches; nothing to train on")

        mean_loss = total_loss / batch_count
        epoch_losses.append(mean_loss)
        print(f"Epoch {epoch+1}/{epochs}, Loss: {mean_loss:.4f}")

    return epoch_losses

In [None]:
# Read the requirement catalogue; the lookups below use its `id` and
# `requirement` columns
labels = pd.read_csv(f"{data_path}{req_filename}")
print(labels)

# id -> requirement text, and the reverse lookup
# NOTE(review): later cells number classes by the position of each requirement
# in this file while predictions are decoded through id2label -- confirm the
# `id` column is 0..n-1 in file order.
id2label = labels.set_index("id")["requirement"].to_dict()
label2id = labels.set_index("requirement")["id"].to_dict()
num_classes = len(id2label)

# Show both mappings
print(id2label)
print(label2id)

In [None]:
# Load example data
clause_data = pd.read_csv(data_path + data_filename)

# One indicator column per requirement; a column's position in this list is
# the class index used for training
label_columns = labels['requirement'].tolist()

# Combine the indicator columns into a single 'label' column holding the list
# of positive class indices for each row
clause_data['label'] = clause_data[label_columns].values.tolist()
clause_data = clause_data.drop(columns=label_columns)
clause_data['label'] = clause_data['label'].apply(lambda x: [i for i, v in enumerate(x) if v == 1])

# Rows without any positive column have no usable target. They used to be
# given the label -1, which nn.CrossEntropyLoss rejects as an out-of-range
# class index, so drop them before building the dataset.
has_label = np.array([len(x) > 0 for x in clause_data['label']], dtype=bool)
n_unlabelled = int((~has_label).sum())
if n_unlabelled:
    print(f"Skipping {n_unlabelled} clause(s) with no labelled requirement")
clause_data = clause_data.loc[has_label].reset_index(drop=True)

clauses = clause_data['clause'].tolist()
# Single-label training: when several columns are set, the first one wins
clause_label = [x[0] for x in clause_data['label']]

# Print a small sample rather than the whole corpus
print(f"{len(clauses)} labelled clauses")
print(clauses[:5])
print(clause_label[:5])

# Convert clauses to a dataset
dataset = TextClassificationDataset(clauses, clause_label, max_seq_length)

In [None]:
# Batches are reshuffled on every pass over the data
dataloader = DataLoader(dataset=dataset, batch_size=batch_size, shuffle=True)

# Build a fresh classifier from the hyperparameters defined in the config cell
model = TransformerClassifier(
    vocab_size=vocab_size,
    embed_dim=embed_dim,
    num_heads=num_heads,
    num_layers=num_layers,
    num_classes=num_classes,
    max_seq_length=max_seq_length,
)
# Use CUDA when it is available, otherwise the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

# Fit the classifier
train_model(
    model=model,
    dataloader=dataloader,
    epochs=epochs,
    learning_rate=learning_rate,
    device=device,
)

# Persist only the weights (state dict), not the module object
torch.save(model.state_dict(), f"{model_path}{model_filename}")

In [None]:
# Load model
# Rebuild the architecture, then restore the weights written by the training
# cell. Without load_state_dict the inspection and export cells below would
# operate on a randomly initialised network.
model = TransformerClassifier(vocab_size, embed_dim, num_heads, num_layers, num_classes, max_seq_length)
# torch.load unpickles the file: only open checkpoints you created yourself.
# map_location="cpu" lets a checkpoint saved from a GPU run load anywhere and
# keeps the tensors plottable in the cells below.
model_hf = torch.load(model_path + model_filename, map_location="cpu")
model.load_state_dict(model_hf)
model.eval()  # inference mode: disables dropout
sd_hf = model.state_dict()

# List every parameter name with its shape
for k, v in sd_hf.items():
    print(k, v.shape)

In [None]:
# First 20 values of the flattened positional table
sd_hf["positional_encoding"].reshape(-1)[:20]

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

plt.imshow(sd_hf["embedding.weight"], cmap="gray")

In [None]:
# The positional table is stored as (1, max_seq_length, embed_dim). Index the
# leading singleton axis explicitly so each curve is one embedding dimension
# traced across all positions; `[:, ch]` yields a (1, embed_dim) array, which
# matplotlib draws as single-point lines, i.e. an empty plot.
fig, ax = plt.subplots()
for channel in (150, 200, 250):
    ax.plot(sd_hf["positional_encoding"][0, :, channel].detach().cpu().numpy(), label=f"dim {channel}")
ax.set(title="Learned positional encoding", xlabel="Position", ylabel="Value")
ax.legend();

In [15]:
#
# Export ONNX model
#
# Trace in inference mode, with the dummy inputs on the same device as the
# model's weights so the export cannot fail on a CPU/GPU mismatch.
model.eval()
export_device = next(model.parameters()).device
dummy_input_ids = torch.randint(0, vocab_size, (1, max_seq_length), device=export_device)
# Float mask on purpose: the exported graph takes the input dtype from this
# tensor, and the verification cell feeds `attention_mask` as float32.
dummy_attention_mask = torch.ones(1, max_seq_length, device=export_device)

# `input_types` is not a parameter of torch.onnx.export (input dtypes come
# from the example tensors), so it has been removed.
torch.onnx.export(
    model,
    (dummy_input_ids, dummy_attention_mask),
    model_path + onnx_model_filename,
    opset_version=14,
    input_names=["input_ids", "attention_mask"],
    output_names=["logits"],
    dynamic_axes={
        "input_ids": {0: "batch_size", 1: "seq_length"},
        "attention_mask": {0: "batch_size", 1: "seq_length"},
        "logits": {0: "batch_size"},
    },
)

In [None]:
#
# Verify ONNX model
#

# Function to predict using ONNX
def predict_with_onnx(ort_session, input_ids, attention_mask):
    """Run an ONNX session and return the arg-max class index per row.

    Args:
        ort_session: Object exposing ``run(output_names, input_feed)`` whose
            first output is a 2-D array of logits.
        input_ids: Token ids as a torch tensor or array-like; fed as int64.
        attention_mask: Mask as a torch tensor or array-like; fed as float32.

    Returns:
        1-D numpy array holding the index of the largest logit in each row.
    """
    def _as_numpy(value, dtype):
        # Accept torch tensors (any device) as well as numpy/array-like input
        if isinstance(value, torch.Tensor):
            value = value.detach().cpu().numpy()
        return np.asarray(value).astype(dtype)

    inputs = {
        "input_ids": _as_numpy(input_ids, np.int64),  # Ensure int64 type
        "attention_mask": _as_numpy(attention_mask, np.float32),
    }
    logits = ort_session.run(None, inputs)[0]
    return np.argmax(logits, axis=1)

# Open an inference session on the exported graph
ort_session = ort.InferenceSession(f"{model_path}{onnx_model_filename}")

# Clause to classify
input_text = "The service provider must ensure that all data is encrypted at rest"

# Tokenise into a batch of one, then mark every non-zero id as a real token
input_ids = torch.tensor(encode_text(input_text, max_seq_length), dtype=torch.int64).unsqueeze(0)
attention_mask = input_ids.ne(0).numpy().astype(np.float32)


# Apply softmax to logits to compute probabilities
def softmax(logits):
    """Softmax over the last axis of ``logits``.

    The maximum is subtracted per row rather than globally, so a row whose
    values lie far below another row's no longer underflows to 0/0 = NaN.
    Working on the last axis also makes 1-D input valid.

    Args:
        logits: Array-like of scores; the last axis is the class axis.

    Returns:
        numpy array of the same shape whose last axis sums to 1.
    """
    logits = np.asarray(logits)
    shifted = logits - np.max(logits, axis=-1, keepdims=True)  # Per-row shift for numerical stability
    exp_logits = np.exp(shifted)
    return exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)

# Run the exported graph once and keep its first output (the class scores)
logits = ort_session.run(
    None,
    {
        "input_ids": input_ids.cpu().numpy().astype(np.int64),
        "attention_mask": attention_mask.astype(np.float32),
    },
)[0]

# Convert the scores into a probability distribution
probabilities = softmax(logits)

# Most likely class for the single input row, and its probability
predicted_label = probabilities.argmax(axis=1)[0]
predicted_probability = probabilities[0, predicted_label]

# Report the requirement text for the predicted class
# NOTE(review): predicted_label is a class index while id2label is keyed by
# the `id` column of the requirements file -- confirm the two coincide.
print(f"Predicted Label: {id2label[predicted_label]}")
print(f"Probability: {predicted_probability * 100:.2f}%")
print(f"Probabilities: {probabilities[0]}")