In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# Seed handed to train_test_split further down so the train/test partition is reproducible
random_state = 42

In [None]:
# Read only the columns used downstream; restricting `usecols` also leaves out the index column of the csv
df = pd.read_csv(
    'data/train.csv',
    usecols=['id', 'genome_sequence', "species"],
)
print("Shape of the dataset: ", df.shape)

In [None]:
# Preview the first five rows of the loaded data
df.head(n=5)

In [None]:
# Count the missing values in every column (all counts are expected to be zero)
df.isna().sum()

In [None]:
# Check how balanced the target is by counting the samples of each species
df['species'].value_counts()

In [None]:
# Show how many genome sequences there are of each length
df.genome_sequence.map(len).value_counts()

Let's remove the few sequences that are not exactly 80 characters long.

In [None]:
# Keep only the genome sequences that are exactly 80 characters long
# (the original note expects fewer than 20 rows to be removed).
# `.copy()` turns the filtered selection into an independent frame, so adding
# columns to `df` afterwards does not raise pandas' SettingWithCopyWarning.
df = df[df.genome_sequence.str.len() == 80].copy()
df.shape

## Data Preparation

In [None]:
# Encode every species name as an integer label, numbered in order of first appearance
labels_dict = dict((name, idx) for idx, name in enumerate(df['species'].unique()))
df['label'] = df['species'].map(labels_dict)
df.head()

In [None]:
# Define a function to one hot encode a DNA sequence
def one_hot_encote_dna(seq):
    """One-hot encode a DNA sequence over the alphabet A, C, G, T.

    The sequence is upper-cased first, so lower-case bases are accepted.
    Any symbol outside the alphabet (for example ``N``) becomes an all-zero row.

    Args:
        seq: String of nucleotide symbols.

    Returns:
        Integer ``numpy.ndarray`` of shape ``(len(seq), 4)`` whose columns
        correspond to A, C, G and T, in that order. An empty sequence yields
        an array of shape ``(0, 4)`` so the result is always two-dimensional.
    """
    alphabet = np.array(["A", "C", "G", "T"])
    # One array element per symbol; the explicit dtype keeps an empty sequence
    # as an (empty) array of strings instead of an empty float array.
    symbols = np.array(list(seq.upper()), dtype=alphabet.dtype)
    # Comparing a column of symbols with the alphabet row broadcasts to the
    # full (len(seq), 4) match matrix in a single vectorised step.
    return (symbols[:, None] == alphabet[None, :]).astype(int)

In [None]:
# Discard the columns that are not needed any more (the species is now carried by `label`)
df = df.drop(['id', 'species'], axis=1)
df.head()

In [None]:
# Hold out 20% of the rows as the test set; the fixed seed keeps the split reproducible
df_train, df_test = train_test_split(
    df,
    test_size=0.2,
    random_state=random_state,
)
print("Shape of the train dataset: ", df_train.shape)
print("Shape of the test dataset: ", df_test.shape)

## Creating a torch dataset

In [None]:
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torch

In [None]:
# Define a custom dataset for CNN
class GenomeDatasetCNN(Dataset):
    """Dataset yielding one-hot encoded genome sequences and their labels.

    Each item is a ``(dna, label)`` pair of float tensors. ``dna`` has shape
    ``(4, sequence_length)``: one row per base (A, C, G, T, the column order
    produced by ``one_hot_encote_dna``) and one column per position, which is
    the channels-first layout expected by ``nn.Conv1d``.

    Args:
        df: DataFrame with a ``genome_sequence`` column and a ``label`` column.
        transform: Optional transform stored on ``self.transform``. It is kept
            for interface compatibility only; ``__getitem__`` does not apply it.
    """

    def __init__(self, df, transform=None):
        self.df = df
        # NOTE(review): this attribute is never read by __getitem__; the default
        # is left untouched so existing callers see the same object as before.
        self.transform = transforms.Compose([
            transforms.ToTensor(),
        ]) if transform is None else transform

    def __len__(self):
        """Return the number of rows in the wrapped DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return the encoded sequence and the label of the row at position ``idx``."""
        row = self.df.iloc[idx]
        encoded = torch.from_numpy(one_hot_encote_dna(row.genome_sequence))
        # Swap the axes with a real transpose. Reshaping (length, 4) into
        # (4, length) only re-chunks the flattened buffer, which mixes bases
        # and positions so the rows are no longer per-base channels.
        dna = encoded.transpose(0, 1).contiguous().float()
        label = torch.tensor(row.label).float()
        return dna, label

In [None]:
def print_one_hot_encoded_dna(genome_seq):
    """Plot a one-hot encoded DNA sequence as a heat map.

    Args:
        genome_seq: 2-D array-like of shape ``(4, sequence_length)`` with one
            row per base (A, C, G, T from top to bottom) and one column per
            position in the sequence.
    """
    plt.figure(figsize=(20, 2))
    plt.imshow(genome_seq, cmap="hot")
    # imshow lays the columns out along the x axis, so the ticks must run over
    # the sequence positions (axis 1) rather than over the four base rows.
    plt.xticks(range(0, genome_seq.shape[1]))
    plt.yticks(range(0, 4), ["A", "C", "G", "T"])
    plt.show()

# Encode the first training sequence and flip it to (4, length) so each base gets its own row in the plot
test_print = one_hot_encote_dna(df_train['genome_sequence'].iloc[0]).T
print_one_hot_encoded_dna(test_print)

In [None]:
# Small, unshuffled loader that is only used to inspect one batch
dataset = GenomeDatasetCNN(df_train)
dataloader = DataLoader(dataset=dataset, batch_size=32)

# Pull the first batch and show its shapes, its labels and its first encoded sequence
dna, label = next(iter(dataloader))
print("DNA shape: ", dna.shape)
print("Label shape: ", label.shape)
print("Label: ", label)
print("DNA: ", dna[0])
print_one_hot_encoded_dna(dna[0])  # Plot the first DNA sequence of the batch

# Datasets and loaders used for training and for evaluation
train_dataset, test_dataset = GenomeDatasetCNN(df_train), GenomeDatasetCNN(df_test)
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=1024,
    shuffle=True,
    drop_last=True,  # the last, incomplete training batch is discarded
)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=1024, shuffle=True)

## CNN

In [None]:
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm

# Define the model

class CNN(nn.Module):
    """1-D convolutional network emitting one sigmoid probability per sequence.

    Layers, for an input of shape ``(batch, 4, seq_len)``:

    * ``conv1``: Conv1d 4 -> 16, kernel 2, stride 1, no padding, then ReLU
    * ``conv2``: Conv1d 16 -> 32, kernel 3, stride 1, padding 1, then ReLU and MaxPool1d(2)
    * ``conv3``: Conv1d 32 -> 64, kernel 3, stride 1, padding 1, then ReLU and MaxPool1d(2)
    * ``fc1``, ``fc2``: Linear layers with 64 and 32 units, each followed by ReLU
    * ``fc3``: Linear layer with 1 unit followed by a sigmoid

    Args:
        seq_len: Length of the input sequences. It determines the input size of
            ``fc1``; the default of 80 gives ``19 * 64`` features.

    Raises:
        ValueError: If ``seq_len`` is too short to leave at least one position
            after the convolution and pooling stack.
    """

    def __init__(self, seq_len=80):
        super(CNN, self).__init__()

        self.conv1 = nn.Conv1d(4, 16, 2, stride=1, padding=0)
        self.conv2 = nn.Conv1d(16, 32, 3, stride=1, padding=1)
        self.conv3 = nn.Conv1d(32, 64, 3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(2)

        # Length bookkeeping: conv1 (kernel 2, no padding) removes one position,
        # conv2 and conv3 keep the length, and each of the two pooling steps
        # halves it, rounding down.
        pooled_len = ((seq_len - 1) // 2) // 2
        if pooled_len < 1:
            raise ValueError(
                f"seq_len={seq_len} is too short for this architecture (minimum is 5)"
            )

        self.fc1 = nn.Linear(pooled_len * 64, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)

    def forward(self, x):
        """Map a ``(batch, 4, seq_len)`` tensor to ``(batch, 1)`` probabilities."""
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten channels and positions into one feature vector per sample
        x = x.view(x.shape[0], -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # torch.sigmoid is used instead of F.sigmoid, which emits a deprecation
        # warning on some PyTorch releases
        return torch.sigmoid(self.fc3(x))


model = CNN()

In [None]:
# Use the first CUDA GPU when one is available, otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# Binary cross-entropy on the predicted probabilities, optimised with Adam
criterion = nn.BCELoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
# Train the model

def train_model(model, optimizer, criterion, dataloader, epochs=10, device="cpu"):
    """Train ``model`` on ``dataloader`` and print the mean batch loss of every epoch.

    Args:
        model: Network whose output, flattened to 1-D, is compared to the labels.
        optimizer: Optimizer that updates the parameters of ``model``.
        criterion: Loss function called as ``criterion(output.view(-1), label)``.
        dataloader: Sized iterable of ``(genome_seq, label)`` tensor batches.
        epochs: Number of passes over ``dataloader``.
        device: Device the model and every batch are moved to.

    Returns:
        The same ``model`` instance, on ``device`` and still in training mode.

    Raises:
        ValueError: If ``dataloader`` contains no batches (for example when
            ``drop_last=True`` and the dataset is smaller than one batch).
    """
    num_batches = len(dataloader)
    if num_batches == 0:
        # Fail early with a clear message instead of dividing by zero when the
        # mean epoch loss is computed.
        raise ValueError(
            "dataloader yields no batches; check batch_size and drop_last "
            "against the dataset size"
        )

    model.to(device)
    model.train()
    for _ in range(epochs):
        train_loss = 0.0
        for genome_seq, label in tqdm(dataloader, total=num_batches):
            optimizer.zero_grad()
            genome_seq = genome_seq.to(device)
            label = label.to(device)
            output = model(genome_seq)
            # Flatten (batch, 1) predictions so they line up with the 1-D labels
            loss = criterion(output.view(-1), label)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        print("Train loss: ", train_loss / num_batches)

    return model

In [None]:
# Train for three epochs on the selected device
model = train_model(
    model,
    optimizer,
    criterion,
    train_dataloader,
    epochs=3,
    device=device,
)

In [None]:
# Compute accuracy on the test set
from sklearn.metrics import accuracy_score

def compute_accuracy(model, dataloader, threshold=0.5):
    """Return the accuracy of ``model`` over all batches of ``dataloader``.

    The model is expected to output one probability per sample. A sample is
    predicted as class 1 when its probability is at least ``threshold`` and as
    class 0 otherwise.

    Args:
        model: Trained network returning probabilities, one per sample.
        dataloader: Iterable of ``(genome_seq, label)`` tensor batches.
        threshold: Probability from which a sample is assigned to class 1.

    Returns:
        The value of ``accuracy_score`` for the collected labels and predictions.
    """
    model.eval()
    # Evaluate on the device that holds the weights, so a model trained on the
    # GPU can be fed batches that the loader produces on the CPU.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    y_true = []
    y_pred = []
    with torch.no_grad():  # no gradients are needed for evaluation
        for genome_seq, label in dataloader:
            output = model(genome_seq.to(device))
            # The network has a single output column, so argmax over it would
            # always be 0; the class comes from thresholding the probability.
            predicted = (output.view(-1) >= threshold).long()
            y_true.extend(label.long().cpu().numpy())
            y_pred.extend(predicted.cpu().numpy())
    return accuracy_score(y_true, y_pred)

# Evaluate the trained model on the test loader and report the result
test_accuracy = compute_accuracy(model, test_dataloader)
print("Accuracy on the test set: ", test_accuracy)