In [3]:
import torch
import torchvision
from torch import nn 
#from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms
import matplotlib.pyplot as plt
import pandas as pd
from IPython.display import Image 
import numpy as np
import random
import dataset_utils
from torch.utils.data import Dataset, DataLoader

In [4]:
df = pd.read_csv('final_dataset.csv',index_col=0)
tform = transforms.Compose([transforms.Resize((64,64)),transforms.PILToTensor(),transforms.ConvertImageDtype(torch.float),transforms.Normalize(0.5,0.5)])
image_dataset = torchvision.datasets.ImageFolder("image_dataset/",transform=tform)
species2genus = dataset_utils.species_label_to_genus_label(df,image_dataset)

In [5]:
batch_size = 1000 
import random
import dataset_utils
img2dna = dataset_utils.get_imgs_bold_id(image_dataset,df)

nucleotides = df[['nucleotide','species_name','genus_name','processid','image_urls']]
colonna_dna = df.loc[:,"nucleotide"]
nucleotides.loc[:,'nucleotide'] = colonna_dna.apply(dataset_utils.one_hot_encoding)
random.seed(42)

X_train_val, X_test, y_train_val, y_test = dataset_utils.data_split(nucleotides,0.2,random_state=42)
print(y_test)
train_data = X_train_val
train_data['species_name'] = y_train_val

X_train, X_validation, y_train, y_validation = dataset_utils.data_split(train_data,0.2,drop_labels=False,random_state=42)
train_indices, val_indices, test_indices = dataset_utils.image_splits_from_df(X_train,X_validation,X_test,image_dataset)

365    Bembidion normannum
292       Bledius gallicus
321       Praxis edwardsii
352        Andrena pilipes
18     Automeris managuana
              ...         
412         Hemiceras losa
413         Hemiceras losa
417     Hemiceras punctata
418         Hemiceras losa
421     Hemiceras punctata
Name: species_name, Length: 9991, dtype: object


In [6]:
train_labels = np.array(image_dataset.imgs)[train_indices][:,1].astype(int)
val_labels = np.array(image_dataset.imgs)[val_indices][:,1].astype(int)

y_train = y_train.apply(lambda x: image_dataset.class_to_idx[x.replace(' ','_')])
y_test = y_test.apply(lambda x: image_dataset.class_to_idx[x.replace(' ','_')])
y_validation= y_validation.apply(lambda x: image_dataset.class_to_idx[x.replace(' ','_')])
y_train_val = y_train_val.apply(lambda x: image_dataset.class_to_idx[x.replace(' ','_')])

In [7]:
class DNAdataset(Dataset):
    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = torch.tensor(targets)
        #self.transform = transform
        
    def __getitem__(self, index):
        x = torch.tensor(np.float32(self.data[index][0])).unsqueeze(0)
        y = self.targets[index]
        
        #if self.transform:
        #    x = Image.fromarray(self.data[index].astype(np.uint8).transpose(1,2,0))
        #    x = self.transform(x)
        
        return x, y
    
    def __len__(self):
        return len(self.data)
d_train = DNAdataset(X_train.values, y_train.values)
d_val = DNAdataset(X_validation.values, y_validation.values)

In [8]:
dataloader_train = DataLoader(d_train, batch_size=32,shuffle=True)
dataloader_val = DataLoader(d_val, batch_size=32,shuffle=True)
dataloaders = {'train':dataloader_train,'val':dataloader_val}
dataset_sizes = {'train': d_train.data.shape[0], 'val':d_val.data.shape[0]}

In [9]:
from tqdm.notebook import tqdm
def fit(epochs,dataloaders,optimizer,model,start_idx=0):
    criterion = torch.nn.CrossEntropyLoss()
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    
    train_losses = []
    train_scores = []
    val_losses = []
    val_scores = []
    for epoch in range(epochs):
        running_train_corrects = 0
        for dnas,labels in tqdm(dataloaders['train']):
            model.train()
            dnas = dnas.to(device)
            labels = labels.to(device)
            optimizer.zero_grad()
            
            predicted_labels = model(dnas)
            train_loss = criterion(predicted_labels,labels)
            train_loss.backward()
            optimizer.step()
            
            _, preds = torch.max(predicted_labels, 1)
            #print(preds)
            #print(labels.data)
            running_train_corrects += torch.sum(preds == labels.data)
        train_losses.append(train_loss)
        
        running_val_corrects = 0
        for dnas,labels in tqdm(dataloaders['val']):
            
            model.eval()
            with torch.no_grad():
                dnas = dnas.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                
                predicted_labels = model(dnas)
                val_loss = criterion(predicted_labels,labels)
                
                _, preds = torch.max(predicted_labels, 1)
                #print(preds)
                #print(labels.data)
                running_val_corrects += torch.sum(preds == labels.data)
        val_losses.append(val_loss)
        
        
        
        #real_scores.append(real_score)
        #fit_p.writer.add_scalar('loss_g', loss_g, epoch)
        # Log losses & scores (last batch)
        
        epoch_train_acc = running_train_corrects.double() / dataset_sizes['train']
        epoch_val_acc = running_val_corrects.double() / dataset_sizes['val']
        print("Epoch [{}/{}], train_loss: {:.4f},  train_score: {:.4f},val_loss: {:.4f},  val_score: {:.4f}".format(
            epoch+1, epochs, train_loss, epoch_train_acc,val_loss,epoch_val_acc))
        #print(f"class accuracy real {class_accuracy_real}")
    
    return train_losses

# CNN + LSTM Approach

In [10]:
class Hybrid_CNN_LSTM(nn.Module):
    def __init__(self):
        super(Hybrid_CNN_LSTM, self).__init__()

        self.conv1 = nn.Conv2d(1, 8, (5, 1))
        self.activation1 = nn.LeakyReLU()
        self.norm1 = nn.BatchNorm2d(8)
        self.conv2 = nn.Conv2d(8, 1, (5, 1))
        self.activation2 = nn.LeakyReLU()
        self.norm2 = nn.BatchNorm2d(1)
        self.flat = nn.Flatten()
        self.lstm = nn.LSTM(input_size=3250, hidden_size=128, num_layers=1, batch_first=True)
        self.norm3 = nn.BatchNorm1d(128)
        self.linear = nn.Linear(128, 1500)
        self.dropout1 = nn.Dropout(0.70)
        self.dropout2 = nn.Dropout(0.70)
        self.activation3 = nn.LeakyReLU()
        self.linear2 = nn.Linear(1500, 1050)
        
    def forward(self, x):
        x = self.conv1(x)
        x = self.activation1(x)
        x = self.norm1(x)
        x = self.conv2(x)
        x = self.activation2(x)
        x = self.norm2(x)
        x = self.dropout1(x)
        
        x = self.flat(x)
        
        # Reshape for LSTM: (batch_size, seq_length, input_size)
        x = x.view(x.size(0), 1, -1)  # Adding a sequence length dimension of 1
        
        x, (hn, cn) = self.lstm(x)  # LSTM output
        
        # Take the last output of the LSTM (if sequence length > 1, we take the last timestep)
        x = x[:, -1, :]

        x = self.norm3(x)
        x = self.linear(x)
        x = self.dropout2(x)
        x = self.activation3(x)
        x = self.linear2(x)
        
        return x

In [None]:
sequence_length = 658
num_features = 5
num_classes = 1050

hybridmodel = Hybrid_CNN_LSTM()

optimizer = torch.optim.Adam(hybridmodel.parameters(),weight_decay=1e-4)

fit(40,dataloaders,optimizer,hybridmodel)

# Transformer approach

In [25]:
class TransformerModel(nn.Module):
    def __init__(self, nhead, num_encoder_layers, dim_feedforward, num_classes):
        super(TransformerModel, self).__init__()
        
        self.conv1 = nn.Conv2d(1, 16, (5, 1), stride=(2, 1), padding=(2, 0))
        self.activation1 = nn.LeakyReLU()
        self.norm1 = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d((2, 1))  # Pooling layer to reduce dimensionality
        
        self.conv2 = nn.Conv2d(16, 32, (5, 1), stride=(2, 1), padding=(2, 0))
        self.activation2 = nn.LeakyReLU()
        self.norm2 = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d((2, 1))  # Pooling layer to reduce dimensionality
        
        self.conv3 = nn.Conv2d(32, 64, (5, 1), stride=(2, 1), padding=(2, 0))
        self.activation3 = nn.LeakyReLU()
        self.norm3 = nn.BatchNorm2d(64)
        self.pool3 = nn.MaxPool2d((2, 1))  # Pooling layer to reduce dimensionality
        
        self.flat = nn.Flatten()

        # Example input size, to be calculated dynamically
        example_input = torch.zeros(1, 1, 30, 1)  # Adjust the dimensions according to your input
        self.conv_output_size = self._get_conv_output_size(example_input)
        
        self.input_dim = self.conv_output_size
        if self.input_dim % nhead != 0:
            raise ValueError(f"input_dim ({self.input_dim}) must be divisible by nhead ({nhead})")
        
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=self.input_dim, nhead=nhead, dim_feedforward=dim_feedforward
        )
        self.transformer_encoder = nn.TransformerEncoder(
            self.encoder_layer, num_layers=num_encoder_layers
        )
        
        self.linear1 = nn.Linear(self.input_dim, 512)
        self.dropout1 = nn.Dropout(0.5)
        self.activation4 = nn.LeakyReLU()
        
        self.linear2 = nn.Linear(512, num_classes)

    def _get_conv_output_size(self, x):
        x = self.conv1(x)
        x = self.activation1(x)
        x = self.norm1(x)
        x = self.pool1(x)
        
        x = self.conv2(x)
        x = self.activation2(x)
        x = self.norm2(x)
        x = self.pool2(x)
        
        x = self.conv3(x)
        x = self.activation3(x)
        x = self.norm3(x)
        x = self.pool3(x)
        
        x = self.flat(x)
        
        return x.numel()

    def forward(self, x):
        x = self.conv1(x)
        x = self.activation1(x)
        x = self.norm1(x)
        x = self.pool1(x)
        
        x = self.conv2(x)
        x = self.activation2(x)
        x = self.norm2(x)
        x = self.pool2(x)

        x = self.conv3(x)
        x = self.activation3(x)
        x = self.norm3(x)
        x = self.pool3(x)
        
        x = self.flat(x)
        
        # Reshape for transformer: (seq_length, batch_size, input_dim)
        x = x.view(1, x.size(0), -1)
        
        x = self.transformer_encoder(x)
        
        x = x.view(x.size(1), -1)
        
        x = self.linear1(x)
        x = self.dropout1(x)
        x = self.activation4(x)
        
        x = self.linear2(x)
        
        return x

In [26]:
nhead = 8  # Adjusting nhead to a divisor of the new calculated input_dim
num_encoder_layers = 2
dim_feedforward = 512
num_classes = 1050

model = TransformerModel(nhead, num_encoder_layers, dim_feedforward, num_classes)

optimizer = torch.optim.Adam(hybridmodel.parameters(),weight_decay=1e-4)

fit(40,dataloaders,optimizer,model)

ValueError: Expected more than 1 value per channel when training, got input size torch.Size([1, 64, 1, 1])

In [25]:
dataloader_train = DataLoader(d_train, batch_size=len(d_train),shuffle=False)
dataloader_val = DataLoader(d_val, batch_size=len(d_val),shuffle=False)
dataloaders = {'train':dataloader_train,'val':dataloader_val}
dataset_sizes = {'train': d_train.data.shape[0], 'val':d_val.data.shape[0]}

In [27]:
hybridmodel.eval()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
with torch.no_grad():

    for dnas,labels in dataloaders['train']:
        dnas = dnas.to(device)
        train_dna_features = hybridmodel.feature_extract(dnas)
        train_dna_labels = labels
    for dnas,labels in dataloaders['val']:
        dnas = dnas.to(device)
        val_dna_features = hybridmodel.feature_extract(dnas)
        val_dna_labels = labels
print(train_dna_features.shape)

train_dna_features = train_dna_features.cpu()
val_dna_features = val_dna_features.cpu()

AttributeError: 'Hybrid_CNN_LSTM' object has no attribute 'feature_extract'