In [None]:
from tdc.single_pred import ADME
import pandas as pd
import numpy as np
import re
from rdkit import Chem
from rdkit.Chem import Draw
from IPython.display import display, Image
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
import deepsmiles
import os
from torchvision import transforms
from PIL import Image
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

In [None]:
# Load the 'Caco2_Wang' dataset through TDC's ADME loader.
data = ADME(name='Caco2_Wang')
# Full table first, then the split dictionary produced with the loader's
# default arguments (keys 'train', 'valid' and 'test' are read below).
df, splits = data.get_data(), data.get_split()

In [None]:
# Pull the three partitions out of the split dictionary.
train, valid, test = (splits[part] for part in ('train', 'valid', 'test'))

In [None]:
# column_name = 'Drug'

# # Define the file path for the text file
# file_path = 'Drug.txt'

# # Extract the column data
# column_data = train[column_name]

# # Write column data to text file
# column_data.to_csv(file_path, header=False, index=False)

In [None]:
def tokenize_smiles(df, smiles_col='Drug'):
    """Add a ``Tokens`` column with atom/bond tokens for every SMILES string.

    Each entry of ``df[smiles_col]`` is cast to ``str``, parsed with
    ``Chem.MolFromSmiles`` and handed to ``_tokenize_smiles_legacy``.

    Args:
        df: DataFrame containing the SMILES column. It is modified in place.
        smiles_col: Name of the column holding the SMILES strings.

    Returns:
        The same ``df`` object, now carrying the ``Tokens`` column.
    """
    def _to_tokens(smiles):
        molecule = Chem.MolFromSmiles(str(smiles))
        return _tokenize_smiles_legacy(molecule)

    df['Tokens'] = df[smiles_col].apply(_to_tokens)
    return df

def _tokenize_smiles_legacy(mol):
    """Flatten a molecule into a list of atom and bond tokens.

    Atoms are emitted first, in iteration order, as ``"A:<symbol>"``. Bonds
    follow as ``"B:<begin atom idx>-<end atom idx>-<bond type>"``.

    Args:
        mol: Object exposing ``GetAtoms()`` and ``GetBonds()`` (an RDKit
            molecule in this notebook), or ``None`` when SMILES parsing failed.

    Returns:
        list[str]: The tokens; an empty list when ``mol`` is ``None``.
    """
    # Chem.MolFromSmiles yields None for unparsable input; without this guard
    # a single bad row would abort the whole DataFrame.apply with AttributeError.
    if mol is None:
        return []

    tokens = [f"A:{atom.GetSymbol()}" for atom in mol.GetAtoms()]
    tokens.extend(
        f"B:{bond.GetBeginAtomIdx()}-{bond.GetEndAtomIdx()}-{bond.GetBondType()}"
        for bond in mol.GetBonds()
    )
    return tokens

In [None]:
# Attach the 'Tokens' column to every split (train, then test, then valid).
train, test, valid = (tokenize_smiles(split) for split in (train, test, valid))

In [None]:
# Render every training molecule to a PNG under train_images/.
os.makedirs('train_images', exist_ok=True)

# Column 1 of the split frame is what gets parsed as SMILES. The row position
# is written into the file name so an image can be matched back to its row.
for i, smiles in enumerate(train.iloc[:, 1]):
    mol = Chem.MolFromSmiles(smiles)
    if mol is not None:
        img = Draw.MolToImage(mol)
        img_path = f'train_images/molecule_{i}.png'
        img.save(img_path)
        print(f'Saved {img_path}')
        # The import cell ends with `from PIL import Image`, which rebinds
        # `Image` to the PIL module, so `Image(filename=...)` fails with
        # "'module' object is not callable". Show the rendered image directly.
        display(img)
    else:
        print(f'Invalid SMILES at row {i}')

In [None]:
# Make sure the output folder for the rendered test molecules is present.
if not os.path.exists('test_images'):
    os.makedirs('test_images')

# One PNG per test-split row; the row position goes into the file name.
for i, smiles in enumerate(test.iloc[:, 1]):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        print(f'Invalid SMILES at row {i}')
        continue
    img = Draw.MolToImage(mol)
    img_path = f'test_images/molecule_{i}.png'
    img.save(img_path)

In [None]:
# Make sure the output folder for the rendered validation molecules is present.
if not os.path.exists('valid_images'):
    os.makedirs('valid_images')

# One PNG per validation-split row; the row position goes into the file name.
for i, smiles in enumerate(valid.iloc[:, 1]):
    mol = Chem.MolFromSmiles(smiles)
    if mol is None:
        print(f'Invalid SMILES at row {i}')
        continue
    img = Draw.MolToImage(mol)
    img_path = f'valid_images/molecule_{i}.png'
    img.save(img_path)

In [1]:
# for i in range(len(df)):
#     smiles = df.iloc[i, 1]
#     mol = Chem.MolFromSmiles(smiles)
#     if mol is not None:
#         img = Draw.MolToImage(mol)
#         img_path = f'molecule_{i}.png'
#         ## if you want to save the images, uncomment the following two lines
#         img.save(img_path)
# #         print(f'Saved {img_path}')
# #         display(Image(filename=img_path))
#     else:
#         print(f'Invalid SMILES at row {i}')

In [None]:
# image_path = 'molecule_0.png'  # Change this to the path of the image you want to load
# image = Image.open(image_path)
# image.show()  # This will open the image using the default image viewer on your system

In [None]:
def calculate_mean_std(data_dir):
    """Average the per-image channel statistics of all PNGs in a folder.

    Every ``*.png`` in ``data_dir`` is opened, converted to RGB and scaled to
    [0, 1]. Its per-channel mean and standard deviation are then accumulated.
    The result is the arithmetic mean of those per-image statistics; it is
    not a pooled pixel-level standard deviation.

    Args:
        data_dir: Directory containing the PNG images.

    Returns:
        tuple[np.ndarray, np.ndarray]: ``(means, stds)``, each of shape ``(3,)``.

    Raises:
        ValueError: If ``data_dir`` contains no ``.png`` file.
    """
    # These must be arrays: with Python lists, `+=` would *extend* the list
    # with the new values instead of adding element-wise.
    channel_means = np.zeros(3)
    channel_stds = np.zeros(3)
    total_images = 0

    # Sorted so the floating-point accumulation order is reproducible.
    for filename in sorted(os.listdir(data_dir)):
        if not filename.endswith('.png'):
            continue
        img_path = os.path.join(data_dir, filename)
        with Image.open(img_path) as img:
            # Force three channels so RGBA / greyscale files do not change
            # the shape of the statistics.
            image = np.asarray(img.convert('RGB'), dtype=np.float64) / 255.0

        channel_means += image.mean(axis=(0, 1))
        channel_stds += image.std(axis=(0, 1))
        total_images += 1

    if total_images == 0:
        raise ValueError(f"No PNG images found in {data_dir!r}")

    return channel_means / total_images, channel_stds / total_images

class CustomDataset(Dataset):
    """Image-regression dataset pairing rendered PNGs with their labels.

    The image cells of this notebook write files as ``molecule_<row>.png``.
    When every PNG in ``data_dir`` ends in ``_<number>.png``, files are ordered
    by that number and each image is paired with ``labels`` at that position.
    Otherwise files are sorted by name and paired with labels positionally.

    Args:
        data_dir: Directory holding the ``.png`` images.
        labels: Sequence of targets (list, array or pandas Series), addressed
            by position.
        transform: Optional callable applied to the RGB PIL image.

    Attributes:
        image_paths: Image paths in dataset order.
        label_indices: For each entry of ``image_paths``, the position in
            ``labels`` of its target.
    """

    def __init__(self, data_dir, labels, transform=None):
        self.data_dir = data_dir
        self.labels = labels
        self.transform = transform
        # Positional view of the labels, so a pandas Series is never looked
        # up by index label by accident.
        self._label_values = np.asarray(labels)

        # os.listdir returns entries in arbitrary order, so pairing the n-th
        # listed file with the n-th label would scramble images and targets.
        png_names = [name for name in os.listdir(data_dir) if name.endswith('.png')]
        suffixes = [re.search(r'_(\d+)\.png$', name) for name in png_names]
        if all(suffixes):
            ordered = sorted(
                (int(match.group(1)), name)
                for match, name in zip(suffixes, png_names)
            )
        else:
            ordered = list(enumerate(sorted(png_names)))

        self.label_indices = [row for row, _ in ordered]
        self.image_paths = [os.path.join(data_dir, name) for _, name in ordered]

    def __len__(self):
        """Return the number of images found in ``data_dir``."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the ``idx``-th image in dataset order."""
        with Image.open(self.image_paths[idx]) as handle:
            # Always hand three channels to the transform / model.
            image = handle.convert('RGB')

        if self.transform:
            image = self.transform(image)

        label = self._label_values[self.label_indices[idx]]
        return image, label

# Targets of each split, taken from the 'Y' column.
train_labels = train['Y']
test_labels = test['Y']
valid_labels = valid['Y']

# Channel statistics of each image folder. Only the training statistics are
# used for normalization; the others are kept for inspection.
train_means, train_stds = calculate_mean_std('train_images')
test_means, test_stds = calculate_mean_std('test_images')
valid_means, valid_stds = calculate_mean_std('valid_images')

# Plain transform without normalization (handy for visual checks).
transform = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize images to a fixed size
    transforms.ToTensor(),          # Convert images to tensors
])

# Every split is normalized with the *training* statistics. Normalizing
# validation/test images with their own statistics would feed the model
# inputs on a different scale than the one it was trained on.
overall_means, overall_stds = train_means, train_stds
transform_with_normalization = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=list(overall_means), std=list(overall_stds)),
])

batch_size = 32

# Define datasets and data loaders for each split
datasets = {
    'train': CustomDataset(data_dir='train_images', labels=train_labels,
                           transform=transform_with_normalization),
    'test': CustomDataset(data_dir='test_images', labels=test_labels,
                          transform=transform_with_normalization),
    'valid': CustomDataset(data_dir='valid_images', labels=valid_labels,
                           transform=transform_with_normalization),
}

data_loaders = {}
for key, dataset in datasets.items():
    # Only the training data needs reshuffling each epoch; evaluation order
    # stays fixed.
    data_loaders[key] = DataLoader(dataset, batch_size=batch_size, shuffle=(key == 'train'))

In [None]:
# Short aliases for the three loaders used by the training cells.
train_loader, test_loader, val_loader = (
    data_loaders[split] for split in ('train', 'test', 'valid')
)

In [None]:
class CNN(nn.Module):
    """Three conv/ReLU/max-pool stages followed by a two-layer regression head.

    Args:
        input_size: Side length of the (square) input images. The default of
            224 matches the ``Resize((224, 224))`` transform used in this
            notebook and reproduces the original ``64 * 28 * 28`` feature size.

    The forward pass maps a ``(batch, 3, input_size, input_size)`` tensor to a
    ``(batch, 1)`` tensor.
    """

    def __init__(self, input_size=224):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        # The padded 3x3 convolutions keep the spatial size; each of the three
        # 2x2 poolings halves it (rounding down), i.e. a total factor of 8.
        feature_side = input_size // 8
        self.fc1 = nn.Linear(64 * feature_side * feature_side, 512)
        self.fc2 = nn.Linear(512, 1)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten per sample. `view(-1, C*H*W)` would silently regroup samples
        # when the input size does not match; this fails loudly in fc1 instead.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# Initialize the model
model = CNN()

num_epochs = 50

# Define loss function and optimizer
criterion = nn.L1Loss()  # Mean Absolute Error loss for regression
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_seen = 0
    for images, y in train_loader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, y.view(-1, 1).float())  # Reshape y to match output shape
        loss.backward()
        optimizer.step()
        # The criterion returns a batch mean. Weight it by the batch size so a
        # shorter final batch does not count as much as a full one.
        running_loss += loss.item() * images.size(0)
        train_seen += images.size(0)
    train_loss = running_loss / train_seen
    print(f'Train Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')

    # Validation phase
    model.eval()
    val_loss = 0.0
    val_seen = 0
    with torch.no_grad():
        for images_val, y_val in val_loader:
            outputs_val = model(images_val)
            batch_loss = criterion(outputs_val, y_val.view(-1, 1).float()).item()
            val_loss += batch_loss * images_val.size(0)
            val_seen += images_val.size(0)
    val_loss /= val_seen
    print(f'Validation Epoch [{epoch+1}/{num_epochs}], Loss: {val_loss:.4f}')

# Test phase
model.eval()
test_loss = 0.0
test_seen = 0
with torch.no_grad():
    for images_test, y_test in test_loader:
        outputs_test = model(images_test)
        batch_loss = criterion(outputs_test, y_test.view(-1, 1).float()).item()
        test_loss += batch_loss * images_test.size(0)
        test_seen += images_test.size(0)
test_loss /= test_seen
print(f'Test Loss: {test_loss:.4f}')

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

# Assuming you have train_loader, val_loader, and test_loader for each dataset

# NOTE(review): a CNN class is already defined in the previous cell; this
# definition shadows it. Consider keeping a single copy.
class CNN(nn.Module):
    """Three conv/ReLU/max-pool stages followed by a two-layer regression head.

    Args:
        input_size: Side length of the (square) input images. The default of
            224 matches the ``Resize((224, 224))`` transform used in this
            notebook and reproduces the original ``64 * 28 * 28`` feature size.

    The forward pass maps a ``(batch, 3, input_size, input_size)`` tensor to a
    ``(batch, 1)`` tensor.
    """

    def __init__(self, input_size=224):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=16, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        # The padded 3x3 convolutions keep the spatial size; each of the three
        # 2x2 poolings halves it (rounding down), i.e. a total factor of 8.
        feature_side = input_size // 8
        self.fc1 = nn.Linear(64 * feature_side * feature_side, 512)
        self.fc2 = nn.Linear(512, 1)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        # Flatten per sample. `view(-1, C*H*W)` would silently regroup samples
        # when the input size does not match; this fails loudly in fc1 instead.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        return self.fc2(x)

# NOTE(review): this cell repeats the CNN training run of the previous cell
# (same model, loss, optimizer settings and loaders).

# Initialize the model
model = CNN()

num_epochs = 50

# Define loss function and optimizer
criterion = nn.L1Loss()  # Mean Absolute Error loss for regression
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_seen = 0
    for images, y in train_loader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, y.view(-1, 1).float())  # Reshape y to match output shape
        loss.backward()
        optimizer.step()
        # The criterion returns a batch mean. Weight it by the batch size so a
        # shorter final batch does not count as much as a full one.
        running_loss += loss.item() * images.size(0)
        train_seen += images.size(0)
    train_loss = running_loss / train_seen
    print(f'Train Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')

    # Validation phase
    model.eval()
    val_loss = 0.0
    val_seen = 0
    with torch.no_grad():
        for images_val, y_val in val_loader:
            outputs_val = model(images_val)
            batch_loss = criterion(outputs_val, y_val.view(-1, 1).float()).item()
            val_loss += batch_loss * images_val.size(0)
            val_seen += images_val.size(0)
    val_loss /= val_seen
    print(f'Validation Epoch [{epoch+1}/{num_epochs}], Loss: {val_loss:.4f}')

# Test phase
model.eval()
test_loss = 0.0
test_seen = 0
with torch.no_grad():
    for images_test, y_test in test_loader:
        outputs_test = model(images_test)
        batch_loss = criterion(outputs_test, y_test.view(-1, 1).float()).item()
        test_loss += batch_loss * images_test.size(0)
        test_seen += images_test.size(0)
test_loss /= test_seen
print(f'Test Loss: {test_loss:.4f}')

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# Define the Residual Block
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with a parameter-free shortcut.

    When the block uses a stride other than 1, or its output has a different
    channel count than its input, the shortcut input is subsampled with the
    block's stride and extended with zero-filled channels before being added.
    """

    expansion = 1  # Channel multiplier of the block output

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.stride = stride

    def _shortcut(self, x, target_channels):
        """Return ``x`` reshaped (if needed) so it can be added to the main path."""
        if self.stride == 1 and x.shape[1] == target_channels:
            return x
        # A 1x1 average pool with stride s simply keeps every s-th pixel.
        pooled = F.avg_pool2d(x, kernel_size=1, stride=self.stride)
        batch, channels, height, width = pooled.shape
        zero_channels = torch.zeros(batch, target_channels - channels, height, width, device=pooled.device)
        return torch.cat((pooled, zero_channels), dim=1)

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self._shortcut(x, out.shape[1])
        return self.relu(out)

# Define the ResNet model
class ResNet(nn.Module):
    """ResNet-style network with a single-output linear head.

    Args:
        block: Block class; called as ``block(in_channels, out_channels[, stride])``
            and expected to expose an ``expansion`` attribute.
        layers: Sequence whose first four entries give the number of blocks in
            stages 1-4.
    """

    def __init__(self, block, layers):
        super().__init__()
        self.in_channels = 64

        # Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max pool.
        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Four stages; all but the first are built with stride 2.
        self.layer1 = self.make_layer(block, 64, layers[0], stride=1)
        self.layer2 = self.make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self.make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self.make_layer(block, 512, layers[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # Single output value per sample.
        self.fc = nn.Linear(512 * block.expansion, 1)

    def make_layer(self, block, out_channels, blocks, stride):
        """Build one stage: a first block with ``stride``, then ``blocks - 1`` more."""
        first = block(self.in_channels, out_channels, stride)
        self.in_channels = out_channels * block.expansion
        rest = [block(self.in_channels, out_channels) for _ in range(1, blocks)]
        return nn.Sequential(first, *rest)

    def forward(self, x):
        pipeline = (
            self.conv1, self.bn1, self.relu, self.maxpool,
            self.layer1, self.layer2, self.layer3, self.layer4,
            self.avgpool,
        )
        for stage in pipeline:
            x = stage(x)
        x = torch.flatten(x, 1)
        return self.fc(x)

# Initialize the ResNet model
model = ResNet(ResidualBlock, [2, 2, 2, 2])

num_epochs = 50

# Define loss function and optimizer
criterion = nn.L1Loss()  # Mean Absolute Error loss for regression
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

# Training loop. The notebook never defines `data_loader`; the training
# batches come from `train_loader`.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_seen = 0
    for images, y in train_loader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, y.view(-1, 1).float())  # Reshape y to match output shape
        loss.backward()
        optimizer.step()
        # The criterion returns a batch mean. Weight it by the batch size so a
        # shorter final batch does not count as much as a full one.
        running_loss += loss.item() * images.size(0)
        train_seen += images.size(0)
    train_loss = running_loss / train_seen
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torchvision.models import resnet18

# Assuming you have train_loader, val_loader, and test_loader for each dataset

class ResNetRegression(nn.Module):
    """ResNet-18 with randomly initialised weights and a single-output head.

    The network's final fully connected layer is replaced by a linear layer
    with one output, so the forward pass returns one value per input image.
    """

    def __init__(self):
        super().__init__()
        # No weights are requested, so the network starts from random
        # initialisation. That is the default, and omitting the keyword avoids
        # the deprecated `pretrained=` argument of newer torchvision releases.
        self.resnet = resnet18()
        # Modify the last fully connected layer for regression
        num_ftrs = self.resnet.fc.in_features
        self.resnet.fc = nn.Linear(num_ftrs, 1)

    def forward(self, x):
        return self.resnet(x)

# Initialize the model
model = ResNetRegression()

num_epochs = 50

# Define loss function and optimizer
criterion = nn.L1Loss()  # Mean Absolute Error loss for regression
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    train_seen = 0
    for images, y in train_loader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, y.view(-1, 1).float())  # Reshape y to match output shape
        loss.backward()
        optimizer.step()
        # The criterion returns a batch mean. Weight it by the batch size so a
        # shorter final batch does not count as much as a full one.
        running_loss += loss.item() * images.size(0)
        train_seen += images.size(0)
    train_loss = running_loss / train_seen
    print(f'Train Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')

    # Validation phase
    model.eval()
    val_loss = 0.0
    val_seen = 0
    with torch.no_grad():
        for images_val, y_val in val_loader:
            outputs_val = model(images_val)
            batch_loss = criterion(outputs_val, y_val.view(-1, 1).float()).item()
            val_loss += batch_loss * images_val.size(0)
            val_seen += images_val.size(0)
    val_loss /= val_seen
    print(f'Validation Epoch [{epoch+1}/{num_epochs}], Loss: {val_loss:.4f}')

# Test phase
model.eval()
test_loss = 0.0
test_seen = 0
with torch.no_grad():
    for images_test, y_test in test_loader:
        outputs_test = model(images_test)
        batch_loss = criterion(outputs_test, y_test.view(-1, 1).float()).item()
        test_loss += batch_loss * images_test.size(0)
        test_seen += images_test.size(0)
test_loss /= test_seen
print(f'Test Loss: {test_loss:.4f}')

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torch.utils.data import DataLoader, TensorDataset
import torchvision.transforms as transforms

# Load pre-trained ResNet model
# NOTE(review): newer torchvision releases deprecate `pretrained=` in favour of
# the `weights=` argument; switch once the installed version is confirmed.
resnet = models.resnet18(pretrained=True)

# Freeze parameters of the pre-trained layers
for param in resnet.parameters():
    param.requires_grad = False

# Get the number of input features for the fully connected layer
num_ftrs = resnet.fc.in_features

# Define additional layers (freshly created, so they remain trainable)
additional_layers = nn.Sequential(
    nn.Linear(num_ftrs, 512),  # Add a linear layer with 512 output features
    nn.ReLU(inplace=True),     # Add ReLU activation function
    nn.Dropout(0.2),           # Add dropout layer with 20% dropout probability
    nn.Linear(512, 1)          # Final linear layer for single output
)

# Replace the fully connected layer in ResNet with additional_layers
resnet.fc = additional_layers

# Define loss function and optimizer. Only the new head has trainable
# parameters, so only those are handed to the optimizer.
criterion = nn.L1Loss()
optimizer = optim.Adam(resnet.fc.parameters(), lr=0.001)

# Move model to GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
resnet = resnet.to(device)

num_epochs = 50

# Training loop
for epoch in range(num_epochs):
    # Keep the frozen backbone in eval mode: in train mode its BatchNorm layers
    # would keep updating their running statistics even though the weights are
    # frozen. Only the head gets training behaviour (dropout).
    resnet.eval()
    resnet.fc.train()
    running_loss = 0.0
    train_seen = 0
    for images, y in train_loader:
        images, y = images.to(device), y.to(device)
        optimizer.zero_grad()
        outputs = resnet(images)
        loss = criterion(outputs, y.view(-1, 1).float())  # Reshape y to match output shape
        loss.backward()
        optimizer.step()
        # The criterion returns a batch mean. Weight it by the batch size so a
        # shorter final batch does not count as much as a full one.
        running_loss += loss.item() * images.size(0)
        train_seen += images.size(0)
    train_loss = running_loss / train_seen
    print(f'Train Epoch [{epoch+1}/{num_epochs}], Loss: {train_loss:.4f}')

    # Validation phase
    resnet.eval()
    val_loss = 0.0
    val_seen = 0
    with torch.no_grad():
        for images_val, y_val in val_loader:
            images_val, y_val = images_val.to(device), y_val.to(device)
            outputs_val = resnet(images_val)
            batch_loss = criterion(outputs_val, y_val.view(-1, 1).float()).item()
            val_loss += batch_loss * images_val.size(0)
            val_seen += images_val.size(0)
    val_loss /= val_seen
    print(f'Validation Epoch [{epoch+1}/{num_epochs}], Loss: {val_loss:.4f}')

# Test phase
resnet.eval()
test_loss = 0.0
test_seen = 0
with torch.no_grad():
    for images_test, y_test in test_loader:
        images_test, y_test = images_test.to(device), y_test.to(device)
        outputs_test = resnet(images_test)
        batch_loss = criterion(outputs_test, y_test.view(-1, 1).float()).item()
        test_loss += batch_loss * images_test.size(0)
        test_seen += images_test.size(0)
test_loss /= test_seen
print(f'Test Loss: {test_loss:.4f}')