In [None]:
import os
import sys
import csv
import torch
import shutil
import torch.nn as nn
import numpy as np
from sklearn import metrics
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tensorboardX import SummaryWriter
csv.field_size_limit(sys.maxsize)

In [None]:
import numpy as np
import torch
from torch.utils.data import Dataset
import csv

class MyDataset(Dataset):
    """Character-level one-hot dataset backed by a CSV file.

    The first CSV row is treated as a header and skipped. For every other
    non-empty row the first column is the label and the remaining columns are
    joined with single spaces to form the text.

    Each item is ``(tensor, label)`` where ``tensor`` is a float32 tensor of
    shape ``(max_length, len(vocabulary))`` and ``label`` is the label exactly
    as it appears in the file (a string).

    Args:
        data_path: Path of the CSV file to read.
        max_length: Number of character positions kept per text; shorter
            texts are zero-padded, longer ones are truncated.
    """

    def __init__(self, data_path, max_length=1500):
        self.data_path = data_path
        # 'q' and 'x' are deliberately absent. Some symbols are repeated in the
        # literal; the repeats are kept so the one-hot width stays unchanged.
        self.vocabulary = list("""abcdefghijklmnoprstuvwyz0123456789-,;.!?:’’’/\\|_@#$%ˆ&*˜‘+-=<>()[]{}""")  # Removed 'q' and 'x'
        self.identity_mat = np.identity(len(self.vocabulary))
        # O(1) lookup; setdefault keeps the FIRST position of a repeated
        # symbol, which is what list.index() would return.
        self._char_to_index = {}
        for position, char in enumerate(self.vocabulary):
            self._char_to_index.setdefault(char, position)

        texts, labels = [], []
        # newline="" is what the csv module asks for so that quoted fields
        # containing line breaks are parsed correctly.
        with open(data_path, encoding="utf-8", newline="") as csv_file:
            reader = csv.reader(csv_file, quotechar='"')
            next(reader, None)  # skip the header row
            for line in reader:
                if not line:
                    continue  # blank line: no label column to read
                labels.append(line[0])
                texts.append(" ".join(line[1:]))
        self.texts = texts
        self.labels = labels
        self.max_length = max_length
        self.length = len(self.labels)
        self.num_classes = len(set(self.labels))

    def __len__(self):
        return self.length

    def __getitem__(self, index):
        """Return the one-hot encoded text and the raw label for ``index``.

        Characters that are not in the vocabulary are dropped before
        truncation/padding, so a text with no known characters yields an
        all-zero matrix.
        """
        raw_text = self.texts[index]
        lookup = self._char_to_index
        char_ids = [lookup[char] for char in raw_text if char in lookup][:self.max_length]

        # Start from the padded shape and switch on one cell per kept character.
        data = np.zeros((self.max_length, len(self.vocabulary)), dtype=np.float32)
        if char_ids:
            data[np.arange(len(char_ids)), char_ids] = 1.0

        label = self.labels[index]
        data_tensor = torch.from_numpy(data)
        return data_tensor, label

def get_evaluation(y_true, y_prob, list_metrics):
    """Compute the requested classification metrics.

    Args:
        y_true: Ground-truth class ids.
        y_prob: Per-class scores, one row per sample; the predicted class is
            the arg-max over the last axis.
        list_metrics: Container of metric names; recognised names are
            'accuracy', 'loss' and 'confusion_matrix'.

    Returns:
        Dict holding one entry per recognised, requested metric. 'loss' is -1
        when ``log_loss`` raises ``ValueError``; 'confusion_matrix' is the
        matrix rendered as a string.
    """
    predicted_classes = np.argmax(y_prob, -1)

    def _log_loss_or_sentinel():
        # Best effort: report -1 instead of failing the whole evaluation.
        try:
            return metrics.log_loss(y_true, y_prob)
        except ValueError:
            return -1

    # Order matters only for the key order of the returned dict.
    calculators = (
        ('accuracy', lambda: metrics.accuracy_score(y_true, predicted_classes)),
        ('loss', _log_loss_or_sentinel),
        ('confusion_matrix', lambda: str(metrics.confusion_matrix(y_true, predicted_classes))),
    )
    return {name: compute() for name, compute in calculators if name in list_metrics}


In [None]:
class CharacterLevelCNN(nn.Module):
    """Six-layer 1-D convolutional classifier over one-hot encoded characters.

    Args:
        n_classes: Size of the output layer.
        input_length: Number of character positions per sample.
        input_dim: One-hot width (vocabulary size).
        n_conv_filters: Channels of every convolution.
        n_fc_neurons: Width of the two hidden fully connected layers.

    ``forward`` takes a tensor of shape ``(batch, input_length, input_dim)``
    and returns raw scores of shape ``(batch, n_classes)``.
    """

    def __init__(self, n_classes=14, input_length=1500, input_dim=68,
                 n_conv_filters=256,
                 n_fc_neurons=1024):
        super(CharacterLevelCNN, self).__init__()
        self.conv1 = nn.Sequential(nn.Conv1d(input_dim, n_conv_filters, kernel_size=7, padding=0), nn.ReLU(),
                                   nn.MaxPool1d(3))
        self.conv2 = nn.Sequential(nn.Conv1d(n_conv_filters, n_conv_filters, kernel_size=7, padding=0), nn.ReLU(),
                                   nn.MaxPool1d(3))
        self.conv3 = nn.Sequential(nn.Conv1d(n_conv_filters, n_conv_filters, kernel_size=3, padding=0), nn.ReLU())
        self.conv4 = nn.Sequential(nn.Conv1d(n_conv_filters, n_conv_filters, kernel_size=3, padding=0), nn.ReLU())
        self.conv5 = nn.Sequential(nn.Conv1d(n_conv_filters, n_conv_filters, kernel_size=3, padding=0), nn.ReLU())
        self.conv6 = nn.Sequential(nn.Conv1d(n_conv_filters, n_conv_filters, kernel_size=3, padding=0), nn.ReLU(),
                                   nn.MaxPool1d(3))

        # Flattened size of the conv stack output for one sample; it fixes the
        # input width of the first fully connected layer.
        input_shape = (128,
                       input_length,
                       input_dim)
        self.output_dimension = self._get_conv_output(input_shape)

        # NOTE(review): there is no non-linearity between the fully connected
        # layers; left as is so existing checkpoints keep their behaviour.
        self.fc1 = nn.Sequential(nn.Linear(self.output_dimension, n_fc_neurons), nn.Dropout(0.5))
        self.fc2 = nn.Sequential(nn.Linear(n_fc_neurons, n_fc_neurons), nn.Dropout(0.5))
        self.fc3 = nn.Linear(n_fc_neurons, n_classes)

        # Only the two named configurations get the custom Gaussian init;
        # any other size keeps PyTorch's default initialisation.
        if n_conv_filters == 256 and n_fc_neurons == 1024:
            self._create_weights(mean=0.0, std=0.05)
        elif n_conv_filters == 1024 and n_fc_neurons == 2048:
            self._create_weights(mean=0.0, std=0.02)

    def _create_weights(self, mean=0.0, std=0.05):
        """Re-draw every Conv1d/Linear weight from N(mean, std); biases are untouched."""
        for module in self.modules():
            if isinstance(module, (nn.Conv1d, nn.Linear)):
                module.weight.data.normal_(mean, std)

    def _forward_conv(self, x):
        """Run the conv stack on ``(batch, length, dim)`` input and flatten per sample."""
        x = x.transpose(1, 2)  # Conv1d expects (batch, channels, length)
        for block in (self.conv1, self.conv2, self.conv3,
                      self.conv4, self.conv5, self.conv6):
            x = block(x)
        return x.view(x.size(0), -1)

    def _get_conv_output(self, shape):
        """Return the flattened conv output size for inputs of ``shape``.

        ``shape`` is ``(batch, length, dim)``. The batch entry does not affect
        the answer, so a single all-zero sample is pushed through the stack
        with autograd disabled; this avoids a large throw-away batch and does
        not draw from the global random number generator.
        """
        probe = torch.zeros(1, *shape[1:])
        with torch.no_grad():
            return self._forward_conv(probe).size(1)

    def forward(self, input):
        output = self._forward_conv(input)
        output = self.fc1(output)
        output = self.fc2(output)
        output = self.fc3(output)

        return output

def train(feature, optimizer):
    """Train a CharacterLevelCNN on the ``zero_kin_*`` CSV files.

    Configuration is read from notebook globals: ``max_length``, ``alphabet``,
    ``input`` (data directory), ``output`` (checkpoint directory),
    ``batch_size``, ``lr``, ``num_epochs`` and ``es_patience``.

    Args:
        feature: "small" (256 conv filters, 1024 FC units) or
            "large" (1024 conv filters, 2048 FC units).
        optimizer: "adam" or "sgd" (SGD uses momentum 0.9 and halves its
            learning rate every third epoch).

    Returns:
        The model carrying its last-epoch weights. The weights of the epoch
        with the lowest test loss are saved to
        ``{output}/char-cnn_kin_{feature}.pth``.

    Raises:
        ValueError: If ``feature`` or ``optimizer`` is not a supported name.
    """
    torch.manual_seed(123)

    # Initialize model
    if feature == "small":
        model = CharacterLevelCNN(input_length=max_length, n_classes=14,
                                  input_dim=len(alphabet),
                                  n_conv_filters=256, n_fc_neurons=1024)
    elif feature == "large":
        model = CharacterLevelCNN(input_length=max_length, n_classes=14,
                                  input_dim=len(alphabet),
                                  n_conv_filters=1024, n_fc_neurons=2048)
    else:
        raise ValueError("Invalid feature mode!")

    # Keep the optimizer's NAME: the `optimizer` variable is rebound to the
    # optimizer object below, and the LR schedule must test the name.
    optimizer_name = optimizer
    if optimizer_name not in ("adam", "sgd"):
        raise ValueError("Invalid optimizer type!")

    # Data loaders (evaluation order does not matter, so no shuffling there)
    training_set = MyDataset(input + "/zero_kin_train.csv", max_length)
    test_set = MyDataset(input + "/zero_kin_test.csv", max_length)
    training_generator = DataLoader(training_set, batch_size=batch_size, shuffle=True, num_workers=0)
    test_generator = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)

    # Move model to MPS if available
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    model.to(device)

    criterion = nn.CrossEntropyLoss()

    if optimizer_name == "adam":
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    else:
        optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9)

    # torch.save does not create missing directories.
    os.makedirs(output, exist_ok=True)

    best_loss = 1e5
    best_epoch = 0

    for epoch in range(num_epochs):
        model.train()
        last_train_loss = float("nan")  # loss of the final training batch
        for features, label in training_generator:
            # The dataset yields labels as strings; convert to class ids.
            label = torch.as_tensor(np.array(label, int), dtype=torch.long).to(device)
            features = features.to(device)
            optimizer.zero_grad()
            predictions = model(features)
            loss = criterion(predictions, label)
            loss.backward()
            optimizer.step()
            last_train_loss = loss.item()

        model.eval()
        loss_ls = []
        te_label_ls = []
        te_pred_ls = []
        with torch.no_grad():
            for te_feature, te_label in test_generator:
                num_sample = len(te_label)
                te_label = torch.as_tensor(np.array(te_label, int), dtype=torch.long).to(device)
                te_feature = te_feature.to(device)
                te_predictions = model(te_feature)
                batch_loss = criterion(te_predictions, te_label)
                # Weight by batch size so the final figure is a per-sample mean.
                loss_ls.append(batch_loss.item() * num_sample)
                te_label_ls.extend(te_label.cpu().numpy())
                te_pred_ls.append(te_predictions.cpu().numpy())

        te_loss = sum(loss_ls) / len(test_set)
        te_pred = np.concatenate(te_pred_ls, 0)
        te_label = np.array(te_label_ls)
        test_metrics = get_evaluation(te_label, te_pred, list_metrics=["accuracy", "confusion_matrix"])

        print(f"Epoch: {epoch + 1}/{num_epochs}, Train Loss: {last_train_loss}, Test Loss: {te_loss}, Test Accuracy: {test_metrics['accuracy']}")

        if te_loss < best_loss:
            best_loss = te_loss
            best_epoch = epoch
            torch.save(model.state_dict(), f"{output}/char-cnn_kin_{feature}.pth")

        # Early stopping (disabled when es_patience is 0 or negative)
        if epoch - best_epoch > es_patience > 0:
            print(f"Stopping early at epoch {epoch}. Best loss was {best_loss} at epoch {best_epoch}.")
            break

        # Learning rate decay for SGD: halve every third epoch
        if optimizer_name == "sgd" and epoch % 3 == 0 and epoch > 0:
            for param_group in optimizer.param_groups:
                param_group['lr'] /= 2

    return model

## Training on Kinyarwanda

In [None]:
if __name__ == "__main__":
    torch.backends.cudnn.deterministic = True

    # Run configuration. train() reads these names as module-level globals,
    # so they must keep exactly these names.
    # The backslash is written as "\\" so the literal holds the same
    # characters without relying on an invalid escape sequence.
    alphabet = "abcdefghijklmnoprstuvwyz0123456789-,;.!?:’’’/\\|_@#$%ˆ&*˜‘+-=<>()[]{}"
    max_length = 1500
    optimizer = "sgd"
    batch_size = 128
    num_epochs = 30
    lr = 0.001
    es_min_delta = 0.0  # not read by train() as defined in this notebook
    es_patience = 3
    input = "cleaned"
    output = "../output"
    log_path = "../tensorboard/char-cnn"  # not read by train() as defined in this notebook

    # Checkpoints are written into `output`; torch.save needs it to exist.
    os.makedirs(output, exist_ok=True)
    train("small", optimizer)

In [None]:
# `model` is local to train() and is not available at notebook level, so
# publish the best-epoch state dict that train("small", ...) wrote to disk
# under the file name the evaluation and fine-tuning cells load.
shutil.copyfile(f"{output}/char-cnn_kin_small.pth", 'char_cnn_kinn.pth')

## Direct evaluation

In [None]:
# Redeclared so the evaluation section can run without the training cell.
# The backslash is escaped explicitly; the resulting string is unchanged.
alphabet = "abcdefghijklmnoprstuvwyz0123456789-,;.!?:’’’/\\|_@#$%ˆ&*˜‘+-=<>()[]{}"
max_length = 1500

In [None]:
# Build the default-size network; input_length follows max_length so the
# model and the dataset padding cannot drift apart.
model = CharacterLevelCNN(input_length=max_length, n_classes=14, input_dim=len(alphabet))

In [None]:
# map_location="cpu" lets the checkpoint load even when it was saved from an
# accelerator that is not present here; the model above is created on the CPU.
model.load_state_dict(torch.load('char_cnn_kinn.pth', map_location="cpu"))
model.eval()

In [None]:
# Kirundi test split, served in file order in batches of 128
kirundi_test_set = MyDataset(data_path="cleaned/zero_kir_test.csv", max_length=max_length)
kirundi_test_loader = DataLoader(
    dataset=kirundi_test_set,
    batch_size=128,
    num_workers=0,
    shuffle=False,
)

In [None]:
# Loss used to score the raw model outputs against integer class ids
criterion = torch.nn.CrossEntropyLoss()

In [None]:
from sklearn.metrics import f1_score

def evaluate(model, test_loader, criterion):
    """Run ``model`` over ``test_loader`` without gradients.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        test_loader: DataLoader yielding ``(features, labels)`` batches.
            Labels may be strings, Python/NumPy integers or a tensor.
        criterion: Loss taking ``(predictions, labels)`` and returning a
            per-batch mean.

    Returns:
        ``(labels, predictions, loss)``: a 1-D array of class ids, an array of
        raw model outputs (one row per sample) and the loss averaged over
        ``len(test_loader.dataset)``.

    Raises:
        ValueError: If the loader yields no batches.
    """
    model.eval()  # Set model to evaluation mode

    # Send inputs to wherever the model lives. Moving them to CUDA merely
    # because CUDA exists breaks when the model itself was left on the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    loss_ls = []
    te_label_ls = []
    te_pred_ls = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for features, labels in test_loader:
            # Ensure labels are integer class ids, whatever the dataset yields
            if torch.is_tensor(labels):
                labels = labels.long()
            else:
                labels = torch.as_tensor([int(label) for label in labels], dtype=torch.long)

            features = features.to(device)
            labels = labels.to(device)

            predictions = model(features)
            loss = criterion(predictions, labels)
            # Weight by batch size so the final figure is a per-sample mean.
            loss_ls.append(loss.item() * len(labels))

            te_label_ls.extend(labels.cpu().numpy())
            te_pred_ls.append(predictions.cpu().numpy())

    if not te_pred_ls:
        raise ValueError("test_loader produced no batches; nothing to evaluate")

    total_loss = sum(loss_ls) / len(test_loader.dataset)
    te_pred = np.concatenate(te_pred_ls, axis=0)

    return np.array(te_label_ls), te_pred, total_loss

# Score the loaded model on the Kirundi test loader
kirundi_labels, kirundi_predictions, kirundi_loss = evaluate(
    model=model,
    test_loader=kirundi_test_loader,
    criterion=criterion,
)

# Accuracy and confusion matrix from the raw scores
test_metrics = get_evaluation(
    y_true=kirundi_labels,
    y_prob=kirundi_predictions,
    list_metrics=["accuracy", "confusion_matrix"],
)

# Support-weighted F1 over the arg-max predictions
f1 = f1_score(
    y_true=kirundi_labels,
    y_pred=kirundi_predictions.argmax(axis=1),
    average='weighted',
)

# Report (the confusion matrix stays available in test_metrics['confusion_matrix'])
print(f"Test Loss: {kirundi_loss}")
print(f"Test Accuracy: {test_metrics['accuracy']}")
print(f"F1 Score: {f1}")


## Fine-tuning

In [None]:
def fine_tune(feature, optimizer, model_path, fine_tune_layers=False):
    """Continue training a saved CharacterLevelCNN on the ``zero_kir_*`` CSV files.

    Configuration is read from notebook globals: ``max_length``, ``alphabet``,
    ``input`` (data directory), ``output`` (checkpoint directory),
    ``batch_size``, ``lr``, ``num_epochs`` and ``es_patience``.

    Args:
        feature: "small" (256 conv filters, 1024 FC units) or
            "large" (1024 conv filters, 2048 FC units); must match the
            architecture stored at ``model_path``.
        optimizer: "adam" or "sgd" (SGD uses momentum 0.9 and halves its
            learning rate every third epoch).
        model_path: File holding the pre-trained state dict.
        fine_tune_layers: When False, ``conv1`` and ``conv2`` are frozen and
            only the remaining layers are updated; when True every layer is
            trained.

    Returns:
        The model carrying its last-epoch weights. The weights of the epoch
        with the lowest test loss are saved to
        ``{output}/fine_tuned_char_cnn_{feature}.pth``.

    Raises:
        ValueError: If ``feature`` or ``optimizer`` is not a supported name.
    """
    torch.manual_seed(123)

    # Initialize model
    if feature == "small":
        model = CharacterLevelCNN(input_length=max_length, n_classes=14,
                                  input_dim=len(alphabet),
                                  n_conv_filters=256, n_fc_neurons=1024)
    elif feature == "large":
        model = CharacterLevelCNN(input_length=max_length, n_classes=14,
                                  input_dim=len(alphabet),
                                  n_conv_filters=1024, n_fc_neurons=2048)
    else:
        raise ValueError("Invalid feature mode!")

    # Keep the optimizer's NAME: the `optimizer` variable is rebound to the
    # optimizer object below, and the LR schedule must test the name.
    optimizer_name = optimizer
    if optimizer_name not in ("adam", "sgd"):
        raise ValueError("Invalid optimizer type!")

    # Load the pre-trained weights. map_location="cpu" keeps this working when
    # the checkpoint was saved from a device that is not available here; the
    # model is moved to the training device afterwards.
    model.load_state_dict(torch.load(model_path, map_location="cpu"))

    # Optionally freeze the first two convolution blocks
    if not fine_tune_layers:
        for frozen_block in (model.conv1, model.conv2):
            for param in frozen_block.parameters():
                param.requires_grad = False

    # Data loaders (evaluation order does not matter, so no shuffling there)
    training_set = MyDataset(input + "/zero_kir_train.csv", max_length)  # Use the fine-tuning dataset here
    test_set = MyDataset(input + "/zero_kir_test.csv", max_length)       # Use the fine-tuning dataset here
    training_generator = DataLoader(training_set, batch_size=batch_size, shuffle=True, num_workers=0)
    test_generator = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)

    # Move model to MPS if available
    device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
    model.to(device)

    criterion = nn.CrossEntropyLoss()

    # Only hand trainable parameters to the optimizer.
    trainable_params = [param for param in model.parameters() if param.requires_grad]
    if optimizer_name == "adam":
        optimizer = torch.optim.Adam(trainable_params, lr=lr)
    else:
        optimizer = torch.optim.SGD(trainable_params, lr=lr, momentum=0.9)

    # torch.save does not create missing directories.
    os.makedirs(output, exist_ok=True)

    best_loss = 1e5
    best_epoch = 0

    for epoch in range(num_epochs):
        model.train()
        last_train_loss = float("nan")  # loss of the final training batch
        for features, label in training_generator:
            # The dataset yields labels as strings; convert to class ids.
            label = torch.as_tensor(np.array(label, int), dtype=torch.long).to(device)
            features = features.to(device)
            optimizer.zero_grad()
            predictions = model(features)
            loss = criterion(predictions, label)
            loss.backward()
            optimizer.step()
            last_train_loss = loss.item()

        model.eval()
        loss_ls = []
        te_label_ls = []
        te_pred_ls = []
        with torch.no_grad():
            for te_feature, te_label in test_generator:
                num_sample = len(te_label)
                te_label = torch.as_tensor(np.array(te_label, int), dtype=torch.long).to(device)
                te_feature = te_feature.to(device)
                te_predictions = model(te_feature)
                batch_loss = criterion(te_predictions, te_label)
                # Weight by batch size so the final figure is a per-sample mean.
                loss_ls.append(batch_loss.item() * num_sample)
                te_label_ls.extend(te_label.cpu().numpy())
                te_pred_ls.append(te_predictions.cpu().numpy())

        te_loss = sum(loss_ls) / len(test_set)
        te_pred = np.concatenate(te_pred_ls, 0)
        te_label = np.array(te_label_ls)
        test_metrics = get_evaluation(te_label, te_pred, list_metrics=["accuracy", "confusion_matrix"])

        print(f"Epoch: {epoch + 1}/{num_epochs}, Train Loss: {last_train_loss}, Test Loss: {te_loss}, Test Accuracy: {test_metrics['accuracy']}")

        if te_loss < best_loss:
            best_loss = te_loss
            best_epoch = epoch
            torch.save(model.state_dict(), f"{output}/fine_tuned_char_cnn_{feature}.pth")

        # Early stopping (disabled when es_patience is 0 or negative)
        if epoch - best_epoch > es_patience > 0:
            print(f"Stopping early at epoch {epoch}. Best loss was {best_loss} at epoch {best_epoch}.")
            break

        # Learning rate decay for SGD: halve every third epoch
        if optimizer_name == "sgd" and epoch % 3 == 0 and epoch > 0:
            for param_group in optimizer.param_groups:
                param_group['lr'] /= 2

    return model


In [None]:
if __name__ == "__main__":
    # Ensure reproducibility
    torch.backends.cudnn.deterministic = True

    # Alphabet and parameters. fine_tune() reads these names as module-level
    # globals, so they must keep exactly these names.
    # The backslash is written as "\\" so the literal holds the same
    # characters without relying on an invalid escape sequence.
    alphabet = "abcdefghijklmnoprstuvwyz0123456789-,;.!?:’’’/\\|_@#$%ˆ&*˜‘+-=<>()[]{}"
    max_length = 1500
    optimizer = "sgd"
    batch_size = 128
    num_epochs = 15
    lr = 0.001
    es_min_delta = 0.0  # not read by fine_tune() as defined in this notebook
    es_patience = 3

    # Paths to input and output
    input = "cleaned"
    output = "../output"
    log_path = "../tensorboard/char-cnn-tuned"  # not read by fine_tune() as defined in this notebook

    # Path to the pre-trained model you want to fine-tune
    model_path = "char_cnn_kinn.pth"  # Adjust this path to where your pre-trained model is saved

    # Checkpoints are written into `output`; torch.save needs it to exist.
    os.makedirs(output, exist_ok=True)

    # Call the fine_tune function
    fine_tune(
        feature="small",              # Use "small" or "large" based on the model size
        optimizer=optimizer,          # Optimizer choice ("adam" or "sgd")
        model_path=model_path,        # Path to the pre-trained model
        fine_tune_layers=True         # Set to False to freeze certain layers, True to fine-tune all layers
    )


In [None]:
# The notebook-level `model` is the pre-trained network loaded in the
# "Direct evaluation" section, not the network updated inside fine_tune().
# Publish the best-epoch state dict that fine_tune(feature="small", ...)
# wrote to disk under the file name the following cells load.
shutil.copyfile(f"{output}/fine_tuned_char_cnn_small.pth", 'char_cnn_tuned.pth')

In [None]:
# Redeclared so this section can run without the fine-tuning cell.
# The backslash is escaped explicitly; the resulting string is unchanged.
alphabet = "abcdefghijklmnoprstuvwyz0123456789-,;.!?:’’’/\\|_@#$%ˆ&*˜‘+-=<>()[]{}"
max_length = 1500

In [None]:
# Build the default-size network; input_length follows max_length so the
# model and the dataset padding cannot drift apart.
model = CharacterLevelCNN(input_length=max_length, n_classes=14, input_dim=len(alphabet))

In [None]:
# map_location="cpu" lets the checkpoint load even when it was saved from an
# accelerator that is not present here; the model above is created on the CPU.
model.load_state_dict(torch.load('char_cnn_tuned.pth', map_location="cpu"))
model.eval()

In [None]:
# Kirundi test split, served in file order in batches of 128
kirundi_test_set = MyDataset(data_path="cleaned/zero_kir_test.csv", max_length=max_length)
kirundi_test_loader = DataLoader(
    dataset=kirundi_test_set,
    batch_size=128,
    num_workers=0,
    shuffle=False,
)

In [None]:
# Loss used to score the raw model outputs against integer class ids
criterion = torch.nn.CrossEntropyLoss()

In [None]:
from sklearn.metrics import f1_score

def evaluate(model, test_loader, criterion):
    """Run ``model`` over ``test_loader`` without gradients.

    A function of the same name is also defined earlier in this notebook;
    this later definition is the one in effect from this cell onwards.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        test_loader: DataLoader yielding ``(features, labels)`` batches.
            Labels may be strings, Python/NumPy integers or a tensor.
        criterion: Loss taking ``(predictions, labels)`` and returning a
            per-batch mean.

    Returns:
        ``(labels, predictions, loss)``: a 1-D array of class ids, an array of
        raw model outputs (one row per sample) and the loss averaged over
        ``len(test_loader.dataset)``.

    Raises:
        ValueError: If the loader yields no batches.
    """
    model.eval()  # Set model to evaluation mode

    # Send inputs to wherever the model lives. Moving them to CUDA merely
    # because CUDA exists breaks when the model itself was left on the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    loss_ls = []
    te_label_ls = []
    te_pred_ls = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for features, labels in test_loader:
            # Ensure labels are integer class ids, whatever the dataset yields
            if torch.is_tensor(labels):
                labels = labels.long()
            else:
                labels = torch.as_tensor([int(label) for label in labels], dtype=torch.long)

            features = features.to(device)
            labels = labels.to(device)

            predictions = model(features)
            loss = criterion(predictions, labels)
            # Weight by batch size so the final figure is a per-sample mean.
            loss_ls.append(loss.item() * len(labels))

            te_label_ls.extend(labels.cpu().numpy())
            te_pred_ls.append(predictions.cpu().numpy())

    if not te_pred_ls:
        raise ValueError("test_loader produced no batches; nothing to evaluate")

    total_loss = sum(loss_ls) / len(test_loader.dataset)
    te_pred = np.concatenate(te_pred_ls, axis=0)

    return np.array(te_label_ls), te_pred, total_loss

# Score the loaded model on the Kirundi test loader
kirundi_labels, kirundi_predictions, kirundi_loss = evaluate(
    model=model,
    test_loader=kirundi_test_loader,
    criterion=criterion,
)

# Accuracy and confusion matrix from the raw scores
test_metrics = get_evaluation(
    y_true=kirundi_labels,
    y_prob=kirundi_predictions,
    list_metrics=["accuracy", "confusion_matrix"],
)

# Support-weighted F1 over the arg-max predictions
f1 = f1_score(
    y_true=kirundi_labels,
    y_pred=kirundi_predictions.argmax(axis=1),
    average='weighted',
)

# Report (the confusion matrix stays available in test_metrics['confusion_matrix'])
print(f"Test Loss: {kirundi_loss}")
print(f"Test Accuracy: {test_metrics['accuracy']}")
print(f"F1 Score: {f1}")


## Forgetting

In [None]:
# Reload the fine-tuned weights. map_location="cpu" lets the checkpoint load
# even when it was saved from an accelerator that is not present here.
model.load_state_dict(torch.load('char_cnn_tuned.pth', map_location="cpu"))
model.eval()

In [None]:
# Load the Kinyarwanda test split. The variables keep their kirundi_ prefix
# because the evaluation cell further down reads them under these names.
kirundi_test_set = MyDataset(data_path="cleaned/zero_kin_test.csv", max_length=max_length)
kirundi_test_loader = DataLoader(
    dataset=kirundi_test_set,
    batch_size=128,
    num_workers=0,
    shuffle=False,
)

In [None]:
from sklearn.metrics import f1_score

def evaluate(model, test_loader, criterion):
    """Run ``model`` over ``test_loader`` without gradients.

    A function of the same name is also defined earlier in this notebook;
    this later definition is the one in effect from this cell onwards.

    Args:
        model: Network to evaluate; it is switched to eval mode.
        test_loader: DataLoader yielding ``(features, labels)`` batches.
            Labels may be strings, Python/NumPy integers or a tensor.
        criterion: Loss taking ``(predictions, labels)`` and returning a
            per-batch mean.

    Returns:
        ``(labels, predictions, loss)``: a 1-D array of class ids, an array of
        raw model outputs (one row per sample) and the loss averaged over
        ``len(test_loader.dataset)``.

    Raises:
        ValueError: If the loader yields no batches.
    """
    model.eval()  # Set model to evaluation mode

    # Send inputs to wherever the model lives. Moving them to CUDA merely
    # because CUDA exists breaks when the model itself was left on the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    loss_ls = []
    te_label_ls = []
    te_pred_ls = []

    with torch.no_grad():  # Disable gradient calculation for evaluation
        for features, labels in test_loader:
            # Ensure labels are integer class ids, whatever the dataset yields
            if torch.is_tensor(labels):
                labels = labels.long()
            else:
                labels = torch.as_tensor([int(label) for label in labels], dtype=torch.long)

            features = features.to(device)
            labels = labels.to(device)

            predictions = model(features)
            loss = criterion(predictions, labels)
            # Weight by batch size so the final figure is a per-sample mean.
            loss_ls.append(loss.item() * len(labels))

            te_label_ls.extend(labels.cpu().numpy())
            te_pred_ls.append(predictions.cpu().numpy())

    if not te_pred_ls:
        raise ValueError("test_loader produced no batches; nothing to evaluate")

    total_loss = sum(loss_ls) / len(test_loader.dataset)
    te_pred = np.concatenate(te_pred_ls, axis=0)

    return np.array(te_label_ls), te_pred, total_loss

# Score the fine-tuned model on the loader built for this section. Despite
# the kirundi_ variable names, that loader reads the Kinyarwanda test file.
kirundi_labels, kirundi_predictions, kirundi_loss = evaluate(
    model=model,
    test_loader=kirundi_test_loader,
    criterion=criterion,
)

# Accuracy and confusion matrix from the raw scores
test_metrics = get_evaluation(
    y_true=kirundi_labels,
    y_prob=kirundi_predictions,
    list_metrics=["accuracy", "confusion_matrix"],
)

# Support-weighted F1 over the arg-max predictions
f1 = f1_score(
    y_true=kirundi_labels,
    y_pred=kirundi_predictions.argmax(axis=1),
    average='weighted',
)

# Report (the confusion matrix stays available in test_metrics['confusion_matrix'])
print(f"Test Loss: {kirundi_loss}")
print(f"Test Accuracy: {test_metrics['accuracy']}")
print(f"F1 Score: {f1}")
