In [None]:
!pip install seqeval --quiet

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, ConcatDataset
from torch.nn.utils.rnn import pad_sequence
from seqeval.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
import numpy as np
import csv
import math
import pandas as pd
from transformers.models import convbert
from typing import Optional
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module="seqeval")
from torch.nn.utils import weight_norm

In [None]:
class PSSPDataset(Dataset):
    """Map-style dataset pairing stored encodings with their labels.

    Both files are read once in the constructor with ``torch.load``; each item
    is converted to a pair of tensors when it is accessed.
    """

    def __init__(self, x_path, y_path):
        """Load encodings from ``x_path`` and labels from ``y_path``.

        NOTE(review): ``weights_only=False`` lets ``torch.load`` unpickle
        arbitrary objects, so both files must come from a trusted source.
        """
        # Identical loader settings for both files: map to CPU and memory-map.
        load_options = dict(map_location='cpu', mmap=True, weights_only=False)
        self.encodings = torch.load(x_path, **load_options)
        self.labels = torch.load(y_path, **load_options)

    def __len__(self):
        """Return the number of items, taken from the label container."""
        return len(self.labels)

    def __getitem__(self, idx):
        """Return ``(encoding, label)`` for item ``idx`` as newly built tensors."""
        return torch.tensor(self.encodings[idx]), torch.tensor(self.labels[idx])

In [None]:
def collate_fn(batch):
    """Pad a list of ``(x, y)`` pairs into batch tensors plus a padding mask.

    Args:
        batch: sequence of ``(x, y)`` tensor pairs. The size of the first
            dimension of each ``x`` is used as that sample's length.
            Assumes each ``y`` is a 1-D label tensor, so that the padded
            labels are ``(batch, max_len)`` - TODO confirm against the data.

    Returns:
        Tuple ``(x_padded, y_padded, src_key_padding_mask)``:
        inputs padded with zeros, labels padded with ``-100``, and a boolean
        mask that is ``True`` at padded positions and ``False`` at real ones.
    """
    xs, ys = zip(*batch)

    x_padded = pad_sequence(xs, batch_first=True)
    # -100 marks padded label slots; the loss and the metric loops skip that value.
    y_padded = pad_sequence(ys, batch_first=True, padding_value=-100)

    # A position is padding when its index is >= the sample's true length.
    lengths = torch.tensor([x.size(0) for x in xs])
    positions = torch.arange(y_padded.size(1))
    src_key_padding_mask = positions.unsqueeze(0) >= lengths.unsqueeze(1)

    return x_padded, y_padded, src_key_padding_mask

In [None]:
# Directory holding the precomputed embedding / label files.
DATA_DIR = '/kaggle/input/ankh-embedding-data'
# The training set is stored as shards train0 ... train4.
NUM_TRAIN_SHARDS = 5


def _load_split(split):
    """Build the PSSPDataset for one split name such as 'train0', 'val' or 'test'."""
    return PSSPDataset(
        x_path=f'{DATA_DIR}/{split}_ssp8_embeddings.pt',
        y_path=f'{DATA_DIR}/{split}_ssp8_labels.pt'
    )


# All training shards are exposed as a single dataset, in shard order.
training_dataset = ConcatDataset(
    [_load_split(f'train{shard}') for shard in range(NUM_TRAIN_SHARDS)]
)

val_dataset = _load_split('val')

test_dataset = _load_split('test')

In [None]:
BATCH_SIZE = 32

# Options shared by all three loaders.
_loader_options = dict(
    collate_fn=collate_fn,
    batch_size=BATCH_SIZE,
    num_workers=2,
    pin_memory=True,
)

# Only the training split is shuffled.
train_loader = DataLoader(training_dataset, shuffle=True, **_loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **_loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_options)

In [None]:
# The eight class tags and the integer id assigned to each one.
tag2id = {'B': 0, 'C': 1, 'I': 2, 'T': 3, 'S': 4, 'E': 5, 'G': 6, 'H': 7}
# Set of tags and the inverse (id -> tag) lookup, both derived from tag2id.
unique_tags = set(tag2id)
id2tag = {idx: tag for tag, idx in tag2id.items()}

In [None]:
class Transformer(nn.Module):
    """``nn.TransformerEncoder`` wrapper that swaps the first two axes around the encoder."""

    def __init__(self, input_dim, num_heads, num_layers=1):
        """Build ``num_layers`` encoder layers of width ``input_dim`` with ``num_heads`` heads."""
        super(Transformer, self).__init__()
        self.transformer_encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(d_model=input_dim, nhead=num_heads),
            num_layers=num_layers,
        )

    def forward(self, x, padding_mask=None):
        """Encode ``x`` and return the result with its first two axes in the input order.

        ``padding_mask`` is handed to the encoder as ``src_key_padding_mask``.
        """
        # The encoder layer is created without batch_first, so axes 0 and 1
        # are exchanged before the encoder and exchanged back afterwards.
        swapped = x.transpose(0, 1)
        encoded = self.transformer_encoder(swapped, src_key_padding_mask=padding_mask)
        return encoded.transpose(0, 1)

class Chomp1d(nn.Module):
    """Drop the trailing ``chomp_size`` steps from the last axis of a 3-D tensor.

    TemporalBlock pairs this with a Conv1d whose ``padding`` equals
    ``chomp_size``, so the extra trailing outputs produced by the padding are
    removed again.
    """

    def __init__(self, chomp_size):
        """Store the number of trailing steps to remove."""
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """Return ``x`` without its last ``chomp_size`` steps, as a contiguous tensor."""
        # ``x[:, :, :-0]`` is an empty slice, so a zero chomp (e.g. kernel_size=1)
        # must pass the input through instead of slicing.
        if self.chomp_size == 0:
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()

class TemporalBlock(nn.Module):
    """Residual block made of two weight-normalised dilated Conv1d stages.

    Each stage is Conv1d -> Chomp1d -> ReLU -> Dropout. The convolution is
    padded by ``(kernel_size - 1) * dilation`` and the same number of trailing
    steps is chomped afterwards. When the channel counts differ, a 1x1
    convolution maps the input onto the residual branch.
    """

    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, dropout):
        super(TemporalBlock, self).__init__()

        pad = dilation * (kernel_size - 1)

        def make_stage(in_channels):
            """Build one Conv1d -> Chomp1d -> ReLU -> Dropout stage."""
            conv = nn.Conv1d(in_channels, n_outputs, kernel_size=kernel_size, padding=pad,
                             stride=stride, dilation=dilation)
            return nn.Sequential(weight_norm(conv), Chomp1d(pad), nn.ReLU(), nn.Dropout(dropout))

        # Stages are created in this order so that parameter initialisation
        # draws from the random generator in the same sequence as before.
        self.conv1 = make_stage(n_inputs)
        self.conv2 = make_stage(n_outputs)

        # The residual branch needs a projection only when channel counts differ.
        if n_inputs != n_outputs:
            self.downsample = nn.Conv1d(n_inputs, n_outputs, 1)
        else:
            self.downsample = None
        self.relu = nn.ReLU()

    def forward(self, x):
        """Apply both stages, add the (optionally projected) input, then ReLU."""
        out = self.conv2(self.conv1(x))
        if self.downsample is None:
            res = x
        else:
            res = self.downsample(x)
        return self.relu(res + out)


class TemporalConvNet(nn.Module):
    """Sequential stack of TemporalBlocks whose dilation doubles at every level.

    ``num_channels[i]`` is the output width of level ``i``; level 0 reads
    ``num_inputs`` channels and every later level reads the previous level's
    output width.
    """

    def __init__(self, num_inputs, num_channels, kernel_size, dropout):
        super(TemporalConvNet, self).__init__()
        # Input width per level: num_inputs first, then each preceding output width.
        input_widths = [num_inputs] + list(num_channels[:-1])
        blocks = [
            TemporalBlock(width_in, width_out, kernel_size, stride=1,
                          dilation=2 ** level, dropout=dropout)
            for level, (width_in, width_out) in enumerate(zip(input_widths, num_channels))
        ]
        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        """Run ``x`` through every level in order."""
        return self.network(x)

class BiTCN(nn.Module):
    """Two independent TemporalConvNets, one reading the sequence in each direction.

    The outputs of both directions are concatenated along the channel axis
    (dim 1), so the result has twice the channels of a single network.
    """

    def __init__(self, input_dim, num_channels, kernel_size, dropout):
        super(BiTCN, self).__init__()
        self.forward_tcn = TemporalConvNet(input_dim, num_channels, kernel_size, dropout)
        self.backward_tcn = TemporalConvNet(input_dim, num_channels, kernel_size, dropout)

    def forward(self, x):
        """Return ``cat([forward_tcn(x), unflipped backward_tcn(flipped x)], dim=1)``."""
        reversed_x = x.flip(2)
        left_to_right = self.forward_tcn(x)
        # Flip the backward result again so both halves share the same step order.
        right_to_left = self.backward_tcn(reversed_x).flip(2)
        return torch.cat((left_to_right, right_to_left), dim=1)

class PSSPModel(nn.Module):
    """BiTCN feature extractor -> Transformer encoder -> per-position linear classifier.

    NOTE(review): ``padding_mask`` is only handed to the transformer; the BiTCN
    convolves over the whole padded tensor, so padded steps can influence real
    ones there - confirm this is acceptable.
    """

    def __init__(self, num_channels, kernel_size, num_heads, dropout, input_dim=1536, num_layers=1, num_classes=8):
        super(PSSPModel, self).__init__()
        # The BiTCN concatenates two directions, doubling the last channel width.
        hidden_dim = num_channels[-1] * 2
        self.bitcn = BiTCN(input_dim, num_channels, kernel_size, dropout)
        self.transformer = Transformer(hidden_dim, num_heads, num_layers)
        self.fc = nn.Linear(hidden_dim, num_classes)

    def forward(self, x, padding_mask=None):
        """Return the logits produced by ``fc`` for every position of ``x``."""
        # Axes 1 and 2 are swapped for the convolutional stack and swapped back after it.
        channels_first = x.transpose(1, 2)
        features = self.bitcn(channels_first).transpose(1, 2)
        encoded = self.transformer(features, padding_mask)
        return self.fc(encoded)

In [None]:
def train_model(model, data_loader, criterion, optimizer, id2tag, device, scaler=None):
    """Run one training pass over ``data_loader`` and report loss and metrics.

    Args:
        model: module called as ``model(x_batch, padding_mask)``.
        data_loader: yields ``(x_batch, y_batch, padding_mask)`` batches.
        criterion: loss applied to flattened logits and flattened labels.
        optimizer: optimizer stepped once per batch.
        id2tag: mapping from class id to tag string.
        device: device the batches are moved to.
        scaler: optional gradient scaler to reuse across calls. When omitted a
            new one is created for this call (disabled when ``device`` is not
            CUDA).

    Returns:
        ``(avg_loss, accuracy, f1, precision, recall)`` where ``avg_loss`` is
        the mean of the per-batch losses.

    NOTE(review): precision/recall/F1 come from seqeval, which scores chunks
    rather than individual positions, and the tags 'B', 'I', 'E', 'S' coincide
    with its chunk-prefix letters - confirm these are the intended metrics.
    """
    model.train()

    # Mixed precision only applies on CUDA; fp16 gradients can underflow, so
    # the backward pass goes through a gradient scaler.
    use_amp = torch.device(device).type == "cuda"
    if scaler is None:
        scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    loss_total = 0
    all_preds = []
    all_labels = []

    for x_batch, y_batch, attention_mask in data_loader:
        x_batch, y_batch, attention_mask = x_batch.to(device), y_batch.to(device), attention_mask.to(device)
        optimizer.zero_grad()
        with torch.autocast(device_type="cuda", enabled=use_amp):
            outputs = model(x_batch, attention_mask)
            loss = criterion(outputs.view(-1, outputs.shape[-1]), y_batch.view(-1))
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        preds = outputs.argmax(dim=2).cpu().numpy()
        labels = y_batch.cpu().numpy()

        # Keep only positions whose label is not the -100 padding value.
        keep = labels != -100
        for pred_row, label_row, keep_row in zip(preds, labels, keep):
            all_preds.append([id2tag[p] for p in pred_row[keep_row]])
            all_labels.append([id2tag[t] for t in label_row[keep_row]])

        loss_total += loss.item()

    avg_loss = loss_total / len(data_loader)

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    return avg_loss, accuracy, f1, precision, recall

In [None]:
def eval_model(model, data_loader, criterion, id2tag, device):
    """Evaluate ``model`` on ``data_loader`` without updating any weights.

    Args:
        model: module called as ``model(x_batch, padding_mask)``.
        data_loader: yields ``(x_batch, y_batch, padding_mask)`` batches.
        criterion: loss applied to flattened logits and flattened labels.
        id2tag: mapping from class id to tag string.
        device: device the batches are moved to.

    Returns:
        ``(avg_loss, accuracy, f1, precision, recall)`` where ``avg_loss`` is
        the mean of the per-batch losses.

    NOTE(review): precision/recall/F1 come from seqeval, which scores chunks
    rather than individual positions, and the tags 'B', 'I', 'E', 'S' coincide
    with its chunk-prefix letters - confirm these are the intended metrics.
    """
    model.eval()
    use_amp = torch.device(device).type == "cuda"
    loss_total = 0

    all_preds = []
    all_labels = []

    # No gradients are needed for evaluation; skipping graph construction
    # saves memory and time.
    with torch.no_grad():
        for x_batch, y_batch, attention_mask in data_loader:
            x_batch, y_batch, attention_mask = x_batch.to(device), y_batch.to(device), attention_mask.to(device)
            with torch.autocast(device_type="cuda", enabled=use_amp):
                outputs = model(x_batch, attention_mask)
                loss = criterion(outputs.view(-1, outputs.shape[-1]), y_batch.view(-1))

            preds = outputs.argmax(dim=2).cpu().numpy()
            labels = y_batch.cpu().numpy()

            # Keep only positions whose label is not the -100 padding value.
            keep = labels != -100
            for pred_row, label_row, keep_row in zip(preds, labels, keep):
                all_preds.append([id2tag[p] for p in pred_row[keep_row]])
                all_labels.append([id2tag[t] for t in label_row[keep_row]])

            loss_total += loss.item()

    avg_loss = loss_total / len(data_loader)

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)

    return avg_loss, accuracy, f1, precision, recall

In [None]:
def test_model(model, data_loader, criterion, device, id2tag, class_names=None):
    """Evaluate ``model`` and write a confusion matrix image and a predictions CSV.

    Args:
        model: module called as ``model(x_batch, padding_mask)``.
        data_loader: yields ``(x_batch, y_batch, padding_mask)`` batches.
        criterion: loss applied to flattened logits and flattened labels.
        device: device the batches are moved to.
        id2tag: mapping from class id to tag string.
        class_names: display labels for the confusion matrix, one per class id
            in ascending id order. Defaults to the tags in ``id2tag``.

    Returns:
        ``(avg_loss, accuracy, f1, precision, recall)`` where ``avg_loss`` is
        the mean of the per-batch losses.

    Side effects:
        Saves ``confusion_matrix.png`` and ``predictions_and_labels.csv`` in
        the current working directory and displays the confusion matrix.

    NOTE(review): precision/recall/F1 come from seqeval, which scores chunks
    rather than individual positions, and the tags 'B', 'I', 'E', 'S' coincide
    with its chunk-prefix letters - confirm these are the intended metrics.
    """
    model.eval()
    use_amp = torch.device(device).type == "cuda"
    loss_total = 0

    all_preds_seq, all_labels_seq = [], []
    all_preds_flat, all_labels_flat = [], []

    # No gradients are needed for testing.
    with torch.no_grad():
        for x_batch, y_batch, attention_mask in data_loader:
            x_batch, y_batch, attention_mask = x_batch.to(device), y_batch.to(device), attention_mask.to(device)
            with torch.autocast(device_type="cuda", enabled=use_amp):
                outputs = model(x_batch, attention_mask)  # [batch, seq_len, num_classes]
                loss = criterion(outputs.view(-1, outputs.shape[-1]), y_batch.view(-1))
            loss_total += loss.item()

            preds = outputs.argmax(dim=2).cpu().numpy()
            labels = y_batch.cpu().numpy()

            # Keep only positions whose label is not the -100 padding value.
            keep = labels != -100
            for pred_row, label_row, keep_row in zip(preds, labels, keep):
                kept_preds = pred_row[keep_row]
                kept_labels = label_row[keep_row]

                all_preds_seq.append([id2tag[p] for p in kept_preds])
                all_labels_seq.append([id2tag[t] for t in kept_labels])

                all_preds_flat.extend(kept_preds.tolist())
                all_labels_flat.extend(kept_labels.tolist())

    avg_loss = loss_total / len(data_loader)

    accuracy = accuracy_score(all_labels_seq, all_preds_seq)
    precision = precision_score(all_labels_seq, all_preds_seq)
    recall = recall_score(all_labels_seq, all_preds_seq)
    f1 = f1_score(all_labels_seq, all_preds_seq)

    # Fix the row/column order to every known class id, so a class that is
    # absent from this data still gets its row and the labels stay aligned.
    label_ids = sorted(id2tag)
    if class_names is None:
        class_names = [id2tag[i] for i in label_ids]

    cm = confusion_matrix(all_labels_flat, all_preds_flat, labels=label_ids)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
    disp.plot(cmap=plt.cm.Blues, xticks_rotation=45)
    plt.title("Confusion Matrix Q8")
    plt.tight_layout()
    # Save before show(): showing can release the figure, after which
    # savefig would write an empty canvas.
    plt.savefig("confusion_matrix.png")
    plt.show()
    plt.close()

    # --- Save predictions to CSV ---
    with open("predictions_and_labels.csv", mode="w", newline="") as f_csv:
        writer = csv.writer(f_csv)
        writer.writerow(["predictions", "labels"])  # Header
        for pred_seq, label_seq in zip(all_preds_seq, all_labels_seq):
            writer.writerow([" ".join(pred_seq), " ".join(label_seq)])

    return avg_loss, accuracy, f1, precision, recall

In [None]:
# Convolutional stack: TCN_LEVEL levels, each 256 channels wide, kernel width 5.
TCN_LEVEL = 3
NUM_CHANNELS = [256 for _ in range(TCN_LEVEL)]
KERNEL_SIZE = 5

# Attention head count and dropout rate handed to the model constructor.
NUM_HEADS = 8
DROPOUT = 0.5

In [None]:
# Use the GPU when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Build the network from the hyperparameter constants and move it to the device.
model = PSSPModel(
    num_channels=NUM_CHANNELS,
    kernel_size=KERNEL_SIZE,
    num_heads=NUM_HEADS,
    dropout=DROPOUT,
)
model = model.to(device)
print(model)

In [None]:
# Targets equal to -100 (the label padding value) are excluded from the loss.
criterion = nn.CrossEntropyLoss(ignore_index=-100)
# AdamW over all model parameters.
optimizer = optim.AdamW(params=model.parameters(), lr=1e-4, weight_decay=1e-2)

In [None]:
# Best-checkpoint tracking (by validation accuracy).
best_acc = 0
best_epoch = -1
checkpoint_path = '/kaggle/working/best_checkpoint.pt'
# Early stopping (by validation loss): stop after `patience` epochs without improvement.
patience = 10
best_loss = float("inf")
counter_loss = 0
EPOCH = 100

# Per-epoch metrics; the key order is also the column order of the history CSV.
history = {
    "train_loss": [],
    "val_loss": [],
    "train_acc": [],
    "val_acc": [],
    "train_f1": [],
    "val_f1": [],
    "train_precision": [],
    "val_precision": [],
    "train_recall": [],
    "val_recall": []
}

for epoch in range(EPOCH):
    # training
    train_loss, train_acc, train_f1, train_precision, train_recall = train_model(
        model, train_loader, criterion, optimizer, id2tag, device
    )
    print(f'[Epoch {epoch+1}/{EPOCH}] '
          f'Train Loss: {train_loss:.4f} | '
          f'Acc: {train_acc:.4f} | '
          f'F1: {train_f1:.4f} | '
          f'Precision: {train_precision:.4f} | '
          f'Recall: {train_recall:.4f}')

    # validation
    val_loss, val_acc, val_f1, val_precision, val_recall = eval_model(
        model, val_loader, criterion, id2tag, device
    )
    print(f'[Epoch {epoch+1}/{EPOCH}] '
          f'Val   Loss: {val_loss:.4f} | '
          f'Acc: {val_acc:.4f} | '
          f'F1: {val_f1:.4f} | '
          f'Precision: {val_precision:.4f} | '
          f'Recall: {val_recall:.4f}')

    # save history first, so the epoch that triggers early stopping is recorded too
    epoch_metrics = {
        "train_loss": train_loss,
        "val_loss": val_loss,
        "train_acc": train_acc,
        "val_acc": val_acc,
        "train_f1": train_f1,
        "val_f1": val_f1,
        "train_precision": train_precision,
        "val_precision": val_precision,
        "train_recall": train_recall,
        "val_recall": val_recall,
    }
    for key, value in epoch_metrics.items():
        history[key].append(value)

    # save best model based on acc
    if val_acc > best_acc:
        best_acc = val_acc
        best_epoch = epoch + 1
        print(f"✅ Saving best model (Acc = {best_acc:.4f}) at epoch {best_epoch}")
        torch.save({
            'epoch': best_epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': best_acc
        }, checkpoint_path)

    # early stop
    if val_loss < best_loss:
        best_loss = val_loss
        counter_loss = 0
    else:
        counter_loss += 1
        if counter_loss >= patience:
            print(f"Early stopping triggered at {epoch + 1}")
            break

# save model last epoch
torch.save(model.state_dict(), '/kaggle/working/last_epoch_model.pt')

In [None]:
epochs = range(1, len(history["train_loss"]) + 1)

# metric name -> (train history key, validation history key, axis/title text)
metrics = {
    "loss": ("train_loss", "val_loss", "Loss"),
    "accuracy": ("train_acc", "val_acc", "Accuracy"),
    "precision": ("train_precision", "val_precision", "Precision"),
    "recall": ("train_recall", "val_recall", "Recall"),
    "f1": ("train_f1", "val_f1", "F1"),
}

# One figure per metric, saved as "<metric>_plot.png" and then displayed.
for metric_name, (train_key, val_key, title) in metrics.items():
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.plot(epochs, history[train_key], label="Train")
    ax.plot(epochs, history[val_key], label="Validation")
    ax.set_title(f"{title} per Epoch")
    ax.set_xlabel("Epoch")
    ax.set_ylabel(title)
    ax.legend()
    ax.grid(True, linestyle="--", alpha=0.6)
    fig.tight_layout()
    fig.savefig(f"{metric_name}_plot.png", dpi=300)
    plt.show()

# Save the history to CSV: one column per metric, one row per recorded epoch.
with open("/kaggle/working/training_history.csv", mode="w", newline="") as history_file:
    history_writer = csv.writer(history_file)
    history_writer.writerow(history.keys())  # Header
    history_writer.writerows(zip(*history.values()))

In [None]:
# Display names for the confusion matrix, listed in class-id order (0..7).
unique_labels = ['B', 'C', 'I', 'T', 'S', 'E', 'G', 'H']

# Score the model as it stands after the training loop.
test_loss, test_acc, test_f1, test_precision, test_recall = test_model(
    model, test_loader, criterion, device, id2tag, unique_labels
)
print(
    f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test F1: {test_f1:.4f}, "
    f"Test Precision: {test_precision:.4f}, Test Recall: {test_recall:.4f}"
)

In [None]:
# Restore the weights stored in the best checkpoint, then score the model again.
best_state = torch.load("/kaggle/working/best_checkpoint.pt")['model_state_dict']
model.load_state_dict(best_state)

test_loss, test_acc, test_f1, test_precision, test_recall = test_model(
    model, test_loader, criterion, device, id2tag, unique_labels
)
print(
    f"Test Loss: {test_loss:.4f}, Test Accuracy: {test_acc:.4f}, Test F1: {test_f1:.4f}, "
    f"Test Precision: {test_precision:.4f}, Test Recall: {test_recall:.4f}"
)

In [None]:
# Predictions and labels written by test_model: one space-separated tag sequence per row.
df = pd.read_csv("/kaggle/working/predictions_and_labels.csv")

# Mapping Q8 -> Q3
q8_to_q3 = {
    "E": "E",
    "B": "E",
    "H": "H",
    "G": "H",
    "I": "H",
    "C": "C",
    "T": "C",
    "S": "C"
}

def convert_sequence(seq):
    """Split a space-separated Q8 tag string and map every tag to its Q3 class."""
    return [q8_to_q3[str(x)] for x in str(seq).split()]

preds_q3 = df["predictions"].apply(convert_sequence).tolist()
labels_q3 = df["labels"].apply(convert_sequence).tolist()

# seqeval metrics take (y_true, y_pred); passing them the other way round
# swaps precision and recall.
# NOTE(review): seqeval precision/recall/F1 score chunks rather than individual
# positions, and 'E' coincides with one of its chunk-prefix letters - confirm
# these are the intended metrics.
acc = accuracy_score(labels_q3, preds_q3)
precision = precision_score(labels_q3, preds_q3)
recall = recall_score(labels_q3, preds_q3)
f1 = f1_score(labels_q3, preds_q3)

print("Accuracy:", acc)
print("F1-score:", f1)
print("Precision:", precision)
print("Recall:", recall)

# Flatten to per-position lists for the confusion matrix.
y_pred = [x for row in preds_q3 for x in row]
y_true = [x for row in labels_q3 for x in row]

cm = confusion_matrix(y_true, y_pred, labels=["H","E","C"])
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["H","E","C"])
disp.plot(cmap=plt.cm.Blues, xticks_rotation=45)
plt.title("Confusion Matrix Q3")
plt.tight_layout()
# Save before show(): showing can release the figure, after which
# savefig would write an empty canvas.
plt.savefig("/kaggle/working/confusion_matrix_q3.png")
plt.show()
plt.close()