In [None]:
from numpy.random import seed
from sklearn.decomposition import PCA
import sqlite3
import numpy as np
import pandas as pd
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')

In [None]:
# Per-drug table, unpickled from the Kaggle input directory.
# NOTE(review): unpickling can execute code stored in the file — only load pickles you trust.
df_drug = pd.read_pickle('/kaggle/input/datasetname/df.pkl')

# Entire `extraction` table from the SQLite database, without its 'index' column.
conn = sqlite3.connect("/kaggle/input/datasetname/event.db")
extraction = pd.read_sql('select * from extraction;', conn).drop(columns=['index'])

In [None]:
# Discard the 'index' and 'id' columns of the drug table.
df_drug = df_drug.drop(columns=['index', 'id'])

In [None]:
def feature_extractor(df, f_list):
    """Expand '|'-separated multi-value columns into 0/1 indicator columns, in place.

    For every column named in ``f_list`` each cell is split on ``'|'``; one
    int64 column is added per distinct token, holding 1 in the rows whose cell
    contains that token and 0 elsewhere. The columns in ``f_list`` are dropped
    at the end.

    Args:
        df: DataFrame to mutate. Every column in ``f_list`` must hold strings.
        f_list: Names of the '|'-separated columns to expand.

    Returns:
        None. ``df`` is modified in place.
    """
    for feature in f_list:
        tokens_per_row = [value.split('|') for value in df[feature].tolist()]

        # Sorted so the resulting column order is identical on every run;
        # iterating a set of strings depends on the interpreter's hash seed.
        tokens = sorted({token for row_tokens in tokens_per_row for token in row_tokens})
        column_of = {token: col for col, token in enumerate(tokens)}

        # Fill the indicator matrix by row *position* instead of writing one
        # cell at a time through df.at: faster, and correct even when the
        # index contains duplicate labels.
        indicator = np.zeros((len(df), len(tokens)), dtype=np.int64)
        for row, row_tokens in enumerate(tokens_per_row):
            indicator[row, [column_of[token] for token in row_tokens]] = 1

        # NOTE(review): a token that equals an existing column name (including
        # a token already produced by an earlier feature) overwrites that
        # column — confirm this merging of identically named tokens is intended.
        for col, token in enumerate(tokens):
            df[token] = indicator[:, col]
    df.drop(columns=f_list, inplace=True)

In [None]:
# Columns of df_drug that feature_extractor expands and then removes.
f_list = ['side', 'target', 'enzyme', 'pathway', 'smile']

feature_extractor(df=df_drug, f_list=f_list)

In [None]:
from torch.utils.data import Dataset, DataLoader

class DrugDataset(Dataset):
    """Dataset whose items are the rows of a DataFrame as float32 tensors."""

    def __init__(self, df):
        # Convert the whole frame once; every column must be castable to float32.
        feature_matrix = df.values.astype('float32')
        self.data = torch.tensor(feature_matrix)

    def __len__(self):
        # One item per row of the source frame.
        return self.data.shape[0]

    def __getitem__(self, idx):
        # Row `idx` of the feature matrix.
        return self.data[idx]

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

def _run_autoencoder_epoch(autoencoder, loader, criterion, model_device, description, optimizer=None):
    """Run one pass over ``loader``; update weights only when ``optimizer`` is given.

    Returns:
        (mean batch loss, fraction of elements whose output thresholded at 0.5
        equals the input). Both are 0.0 when the loader yields no batches.
    """
    training = optimizer is not None
    autoencoder.train(training)

    loss_sum = 0.0
    num_batches = 0
    num_correct = 0
    num_total = 0
    pbar = tqdm(loader, desc=description)
    # Gradients are only tracked during the training pass.
    with torch.set_grad_enabled(training):
        for data in pbar:
            inputs = data.to(model_device)
            if training:
                optimizer.zero_grad()
            outputs = autoencoder(inputs)
            loss = criterion(outputs, inputs)
            if training:
                loss.backward()
                optimizer.step()

            batch_loss = loss.item()
            batch_correct = ((outputs > 0.5).float() == inputs).sum().item()
            batch_total = inputs.numel()

            loss_sum += batch_loss
            num_batches += 1
            num_correct += batch_correct
            num_total += batch_total

            pbar.set_postfix({'Loss': batch_loss, 'Accuracy': batch_correct / max(batch_total, 1)})

    # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
    return loss_sum / max(num_batches, 1), num_correct / max(num_total, 1)


def train_autoencoder(autoencoder, train_loader, test_loader, num_epochs, lr=0.01):
    """Train ``autoencoder`` to reconstruct its input and report metrics per epoch.

    Each epoch performs one optimisation pass over ``train_loader`` (MSE
    reconstruction loss, RMSprop) followed by one gradient-free pass over
    ``test_loader``, then prints the mean loss and the element-wise accuracy
    (output thresholded at 0.5 compared with the input) of both passes.

    Args:
        autoencoder: Module mapping a batch to a reconstruction of the same shape.
        train_loader: Iterable of input batches used for optimisation.
        test_loader: Iterable of input batches used for evaluation only.
        num_epochs: Number of epochs to run.
        lr: RMSprop learning rate.

    Returns:
        None. The model is updated in place and is left in the train/eval mode
        it had when the function was called.
    """
    criterion = nn.MSELoss()
    optimizer = optim.RMSprop(autoencoder.parameters(), lr=lr)
    # Batches are moved to wherever the model lives rather than to a
    # notebook-level global, so the two can never disagree.
    model_device = next(autoencoder.parameters()).device
    was_training = autoencoder.training

    for epoch in range(num_epochs):
        train_loss, train_acc = _run_autoencoder_epoch(
            autoencoder, train_loader, criterion, model_device,
            f"Epoch {epoch+1}/{num_epochs} (training)", optimizer=optimizer)
        test_loss, test_acc = _run_autoencoder_epoch(
            autoencoder, test_loader, criterion, model_device,
            f"Epoch {epoch+1}/{num_epochs} (testing)")

        print('Epoch [{}/{}], Train Loss: {:.4f}, Train Acc: {:.4f}, Test Loss: {:.4f}, Test Acc: {:.4f}'.format(epoch+1, num_epochs, train_loss, train_acc, test_loss, test_acc))

    autoencoder.train(was_training)

In [None]:
class Autoencoder(nn.Module):
    """Autoencoder with a single hidden layer.

    ``encoder`` is Linear(input_dim -> latent_dim) followed by ReLU;
    ``decoder`` is Linear(latent_dim -> input_dim) followed by Sigmoid.
    """

    def __init__(self, input_dim, latent_dim):
        super().__init__()
        compress = nn.Linear(input_dim, latent_dim)
        self.encoder = nn.Sequential(compress, nn.ReLU())

        expand = nn.Linear(latent_dim, input_dim)
        self.decoder = nn.Sequential(expand, nn.Sigmoid())

    def forward(self, x):
        # Encode to the latent space, then reconstruct the input from it.
        return self.decoder(self.encoder(x))

In [None]:
from sklearn.model_selection import train_test_split

# Every column of df_drug except 'name' is a model input. Deriving the width
# from the data keeps the model in sync with whatever feature_extractor produced.
input_dim = len(df_drug.columns.drop('name'))
latent_dim = 1024 * 4
num_epochs = 100
batch_size = 32

autoencoder = Autoencoder(input_dim, latent_dim).to(device)

# 80/20 split of the drugs with a fixed seed.
df_train, df_test = train_test_split(df_drug, test_size=0.2, random_state=42)

train_dataset = DrugDataset(df_train.drop(columns=['name']))
test_dataset = DrugDataset(df_test.drop(columns=['name']))

# Only the training batches are shuffled; evaluation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

train_autoencoder(autoencoder, train_loader, test_loader, num_epochs)

In [None]:
# Reuse the trained encoder as a fixed feature extractor.
encoder = autoencoder.encoder
# Iterate the parameter tensors: looping over the Sequential itself yields its
# sub-modules, and setting `requires_grad` on a module freezes nothing.
for param in encoder.parameters():
    param.requires_grad = False

In [None]:
from sklearn import preprocessing

# The label of an interaction is the concatenation of its 'mechanism' and
# 'action' strings, mapped to an integer id by the label encoder.
le = preprocessing.LabelEncoder()
extraction['side'] = le.fit_transform(extraction['mechanism'] + extraction['action'])
extraction.drop(columns=['mechanism', 'action'], inplace=True)

In [None]:
class PairedDrugDataset(Dataset):
    """Every unordered pair of rows, each row encoded and the two codes concatenated.

    Args:
        df: DataFrame of numeric features, one row per drug.
        encoder: Optional module used to encode a single feature row. When
            omitted, the notebook-level ``encoder`` is looked up each time an
            item is requested.
    """

    def __init__(self, df, encoder=None):
        self.data = torch.tensor(df.values.astype('float32'))
        num_rows = len(self.data)
        # All (i, j) with i < j.
        self.indices = [(i, j) for i in range(num_rows) for j in range(i + 1, num_rows)]
        self._encoder = encoder

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, idx):
        idx1, idx2 = self.indices[idx]
        enc = self._encoder if self._encoder is not None else encoder

        # Run the encoder on whatever device its weights live on.
        first_param = next(enc.parameters(), None)
        target_device = first_param.device if first_param is not None else self.data.device

        # The encoder is only a feature extractor here: without no_grad every
        # sample would carry an autograd graph back through the encoder.
        with torch.no_grad():
            pair = torch.cat([
                enc(self.data[idx1].to(target_device)),
                enc(self.data[idx2].to(target_device)),
            ])
        return pair

In [None]:
# Same seeded 80/20 split of the drugs, now wrapped as pair datasets
# (the 'name' column is not a model input).
df_train, df_test = train_test_split(df_drug, random_state=42, test_size=0.2)

train_dataset, test_dataset = (
    PairedDrugDataset(split.drop(columns=['name'])) for split in (df_train, df_test)
)

In [None]:
# Second-stage autoencoder: its input is two concatenated first-stage codes,
# so the sizes are tied to `latent_dim` instead of repeating the literal 4096.
autoencoder2 = Autoencoder(2 * latent_dim, latent_dim).to(device)
# Only the training batches are shuffled; evaluation order does not matter.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

train_autoencoder(autoencoder2, train_loader, test_loader, num_epochs)

In [None]:
class DrugDrugInteractionDataset(Dataset):
    """Encoded drug pairs with their interaction label.

    Item ``idx`` corresponds to the ``idx``-th row (by position) of
    ``extraction``: the feature rows of its 'drugA' and 'drugB' are encoded
    with ``encoder1``, concatenated, passed through ``encoder2`` and returned
    together with that row's 'side' value.

    Args:
        df: Drug table with a 'name' column; all other columns are numeric features.
        extraction: Interaction table with 'drugA', 'drugB' and 'side' columns.
        encoder1: Module encoding one drug's feature row.
        encoder2: Module encoding the concatenation of two ``encoder1`` outputs.
    """

    def __init__(self, df, extraction, encoder1, encoder2):
        self.df = df
        # 'name' holds strings, so it is excluded before casting to float32.
        self.drug_data = torch.tensor(df.drop(columns=['name']).values.astype('float32'))
        # Drug name -> row position in drug_data, so lookups need no frame scan.
        self._row_of = {name: pos for pos, name in enumerate(df['name'].tolist())}
        self.extraction = extraction
        self.encoder1 = encoder1
        self.encoder2 = encoder2

    def __len__(self):
        return len(self.extraction)

    @staticmethod
    def _device_of(module, fallback):
        # Device of the module's weights, or `fallback` when it has none.
        first_param = next(module.parameters(), None)
        return first_param.device if first_param is not None else fallback

    def _encode_drug(self, name):
        # 1-D feature row of the named drug, encoded on encoder1's device.
        features = self.drug_data[self._row_of[name]]
        return self.encoder1(features.to(self._device_of(self.encoder1, features.device)))

    def __getitem__(self, idx):
        # Positional access: idx comes from range(len(self)), not from the
        # frame's index labels.
        record = self.extraction.iloc[idx]

        # Both encoders act as feature extractors here; no gradients needed.
        with torch.no_grad():
            # Concatenating two 1-D codes gives one flat pair vector.
            pair = torch.cat([self._encode_drug(record['drugA']),
                              self._encode_drug(record['drugB'])])
            embedding = self.encoder2(pair)

        return embedding, record['side']

In [None]:
class ClassificationModel(nn.Module):
    """Two-layer classifier returning class probabilities.

    Args:
        input_size: Number of input features per sample.
        hidden_size: Width of the hidden layer.
        num_classes: Number of output classes.
    """

    def __init__(self, input_size, hidden_size=256, num_classes=65):
        super().__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return softmax probabilities.

        An input holding exactly one sample (``input_size`` elements in any
        shape) yields a 1-D tensor of ``num_classes`` probabilities; anything
        larger is treated as a batch and yields ``(batch, num_classes)``.
        """
        in_features = self.fc1.in_features
        is_single = x.numel() == in_features
        # Flatten each sample but keep the batch dimension; flattening the
        # whole tensor would merge all samples into one oversized vector.
        x = x.reshape(-1, in_features)
        x = nn.functional.relu(self.fc1(x))
        # Normalise over the class dimension explicitly.
        # NOTE(review): if this is trained with nn.CrossEntropyLoss, that loss
        # expects raw logits — confirm against the training code.
        probs = nn.functional.softmax(self.fc2(x), dim=-1)
        return probs.squeeze(0) if is_single else probs