In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.optim import Adam
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset

USE_CUDA = True

In [None]:
class CustomDataset(Dataset):
    def __init__(self, x, y):
        super(CustomDataset, self).__init__()
        self.data = torch.from_numpy(x).float().unsqueeze(1)
        self.labels = torch.from_numpy(y).float()

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

    def get_labels(self):
        return self.labels

    def get_data(self):
        return self.data


def get_th_dataset(x, y):
    """
    assemble a dataset with the given data and labels
    :param x:
    :param y:
    :return:
    """
    _dataset = CustomDataset(x, y)
    return _dataset


def get_dataloader(dataset: Dataset, batch_size):
    """pytorch bilstm
    assemble a dataloader with the given dataset
    :param dataset:
    :param batch_size:
    :return:
    """
    _dataLoader = DataLoader(dataset=dataset, batch_size=batch_size, pin_memory=True,
                             drop_last=True, shuffle=True)
    return _dataLoader


def get_th_dataset(x, y):
    """
    assemble a dataset with the given data and labels
    :param x:
    :param y:
    :return:
    """
    _dataset = CustomDataset(x, y)
    return _dataset


def get_dataloader(dataset: Dataset, batch_size):
    """
    assemble a dataloader with the given dataset
    :param dataset:
    :param batch_size:
    :return:
    """
    _dataLoader = DataLoader(dataset=dataset, batch_size=batch_size, pin_memory=True,
                             drop_last=True, shuffle=True)
    return _dataLoader

In [None]:
file_url = "https://raw.githubusercontent.com/txz32102/paper/main/data/drugminer/esm2_320_dimensions_with_labels.csv"
df = pd.read_csv(file_url)

In [None]:
X = df.drop(['label', 'UniProt_id'], axis=1)
y = df['label'].apply(lambda x: 0 if x != 1 else x).to_numpy().astype(np.int64)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
scalar = MinMaxScaler()
X_train = scalar.fit_transform(X_train)
X_test = scalar.fit_transform(X_test)
train_set = get_th_dataset(X_train, y_train)
test_set = get_th_dataset(X_test, y_test)
train_loader = get_dataloader(train_set, batch_size=2)
test_loader = get_dataloader(test_set, batch_size=len(test_set))

In [None]:
class ConvLayer(nn.Module):
    def __init__(self, in_channels=1, out_channels=20, kernel_size=3):
        super(ConvLayer, self).__init__()
        self.conv = nn.Conv1d(in_channels=in_channels,
                              out_channels=out_channels,
                              kernel_size=kernel_size,  # kernel_size is now a single number
                              stride=1)

    def forward(self, x):
        return F.relu(self.conv(x))


class PrimaryCaps(nn.Module):
    def __init__(self, num_capsules=4, in_channels=20, out_channels=10, kernel_size=3):
        super(PrimaryCaps, self).__init__()
        self.num_capsules = num_capsules
        self.out_channels = out_channels  # Save out_channels as an instance variable
        # Using nn.Conv1d for 1D convolution with specified parameters
        self.capsules = nn.ModuleList([
            nn.Conv1d(in_channels=in_channels, out_channels=out_channels, kernel_size=kernel_size, stride=2, padding=0)
            for _ in range(num_capsules)])

    def forward(self, x):
        u = [capsule(x) for capsule in self.capsules]
        u = torch.stack(u, dim=1)
        # Adjust the calculation of output_size based on the actual output from convolutions
        # Now correctly references self.out_channels
        output_size = u.size(-1) * self.out_channels  # Calculate based on your output size
        u = u.view(x.size(0), -1, self.num_capsules)  # Correctly reshape u before squashing
        return self.squash(u)

    def squash(self, input_tensor):
        squared_norm = (input_tensor ** 2).sum(-1, keepdim=True)
        output_tensor = squared_norm * input_tensor / ((1. + squared_norm) * torch.sqrt(squared_norm))
        return output_tensor

class DigitCaps(nn.Module):
    def __init__(self, num_capsules=10, num_routes=1580, in_channels=4, out_channels=16):
        super(DigitCaps, self).__init__()

        self.in_channels = in_channels
        self.num_routes = num_routes
        self.num_capsules = num_capsules
        self.W = nn.Parameter(torch.randn(1, num_routes, num_capsules, out_channels, in_channels))

    def forward(self, x):
        batch_size = x.size(0)
        x = torch.stack([x] * self.num_capsules, dim=2).unsqueeze(4)

        W = torch.cat([self.W] * batch_size, dim=0)
        u_hat = torch.matmul(W, x)

        b_ij = Variable(torch.zeros(1, self.num_routes, self.num_capsules, 1))
        USE_CUDA = torch.cuda.is_available()
        if USE_CUDA:
            b_ij = b_ij.cuda()
            u_hat = u_hat.cuda()

        num_iterations = 3
        for iteration in range(num_iterations):
            c_ij = F.softmax(b_ij, dim=2)
            c_ij = torch.cat([c_ij] * batch_size, dim=0).unsqueeze(4)

            s_j = (c_ij * u_hat).sum(dim=1, keepdim=True)
            v_j = self.squash(s_j)

            if iteration < num_iterations - 1:
                a_ij = torch.matmul(u_hat.transpose(3, 4), torch.cat([v_j] * self.num_routes, dim=1))
                b_ij = b_ij + a_ij.squeeze(4).mean(dim=0, keepdim=True)

        return v_j.squeeze(1)

    def squash(self, input_tensor):
        squared_norm = (input_tensor ** 2).sum(-1, keepdim=True)
        output_tensor = squared_norm * input_tensor / ((1. + squared_norm) * torch.sqrt(squared_norm))
        return output_tensor

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        self.classification_layers = nn.Sequential(
            nn.Linear(10 * 16, 128),
            nn.ReLU(inplace=True),
            nn.Linear(128, 16),
            nn.ReLU(inplace=True),
            nn.Linear(16, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        batch_size = x.size(0)
        x = x.view(batch_size, -1)
        probabilities = self.classification_layers(x)
        return probabilities

class CapsNet(nn.Module):
    def __init__(self):
        super(CapsNet, self).__init__()
        self.conv_layer = ConvLayer()
        self.primary_capsules = PrimaryCaps()
        self.digit_capsules = DigitCaps()
        self.decoder = Decoder()

        self.mse_loss = nn.MSELoss()

    def forward(self, data):
      x = self.conv_layer(data)
      x = self.primary_capsules(x)
      x = self.digit_capsules(x)
      x = self.decoder(x)
      return x

    def loss(self, data, x, target, reconstructions):
        return self.margin_loss(x, target) + self.reconstruction_loss(data, reconstructions)

    def margin_loss(self, x, labels, size_average=True):
        batch_size = x.size(0)

        v_c = torch.sqrt((x**2).sum(dim=2, keepdim=True))

        left = F.relu(0.9 - v_c).view(batch_size, -1)
        right = F.relu(v_c - 0.1).view(batch_size, -1)

        loss = labels * left + 0.5 * (1.0 - labels) * right
        loss = loss.sum(dim=1).mean()

        return loss

    def reconstruction_loss(self, data, reconstructions):
        loss = self.mse_loss(reconstructions.view(reconstructions.size(0), -1), data.view(reconstructions.size(0), -1))
        return loss * 0.0005

In [None]:
capsule_net = CapsNet()
if USE_CUDA:
    capsule_net = capsule_net.cuda()
optimizer = Adam(capsule_net.parameters())

In [None]:
data, target = next(iter(train_loader))

print(data.shape, target.shape)

In [None]:
target

In [None]:
capsule_net.train()  # Set the model to training mode

# Define the loss function and optimizer
criterion = nn.BCELoss()
optimizer = Adam(capsule_net.parameters(), lr=0.001)  # Adjust learning rate as needed

num_epochs = 15  # Number of epochs to train for, adjust as needed

for epoch in range(num_epochs):
    running_loss = 0.0
    for i, (data, target) in enumerate(train_loader, 0):
        # Assuming USE_CUDA flag is set correctly according to your setup
        USE_CUDA = torch.cuda.is_available()
        if USE_CUDA:
            data, target = data.cuda(), target.cuda()
            capsule_net.cuda()

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass
        outputs = capsule_net(data).squeeze()  # Ensure output is of correct shape (batch_size,)

        # Compute the loss
        loss = criterion(outputs, target)

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

        # Print statistics
        running_loss += loss.item()
        if i % 10 == 9:    # Print every 10 mini-batches, adjust as needed
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 10))
            running_loss = 0.0

print('Finished Training')

In [None]:
capsule_net.eval()
correct = 0
total = 0

with torch.no_grad():
    for data, target in train_loader:
        if USE_CUDA:
            data, target = data.cuda(), target.cuda()
            capsule_net.cuda()

        outputs = capsule_net(data).squeeze()

        predicted = (outputs > 0.5).float()

        total += target.size(0)
        correct += (predicted == target).sum().item()

accuracy = correct / total
print(f'Accuracy on the train set: {accuracy:.2f}')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# Assuming capsule_net is your Capsule Network model
capsule_net.train()  # Ensure the model is in training mode

# Define the loss function and optimizer
criterion = nn.BCELoss()
optimizer = optim.Adam(capsule_net.parameters(), lr=0.001)

num_epochs = 10  # Adjust as needed

for epoch in range(num_epochs):
    # Training phase
    running_loss = 0.0
    capsule_net.train()  # Ensure the model is in training mode
    for i, (data, target) in enumerate(train_loader, 0):
        if USE_CUDA:
            data, target = data.cuda(), target.cuda()
            capsule_net.cuda()

        optimizer.zero_grad()
        outputs = capsule_net(data).squeeze()
        loss = criterion(outputs, target)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    # Calculate training loss
    print(f'Epoch {epoch+1}, Loss: {running_loss/(i+1):.4f}')

    # Evaluation phase for both training and test sets
    def evaluate_accuracy(data_loader):
        capsule_net.eval()  # Set the model to evaluation mode
        correct = 0
        total = 0
        with torch.no_grad():
            for data, target in data_loader:
                if USE_CUDA:
                    data, target = data.cuda(), target.cuda()
                outputs = capsule_net(data).squeeze()
                predicted = (outputs > 0.5).float()
                total += target.size(0)
                correct += (predicted == target).sum().item()
        return correct / total

    train_accuracy = evaluate_accuracy(train_loader)
    print(f'Epoch {epoch+1}, Training Accuracy: {train_accuracy:.2f}')

print('Finished Training')
