<a href="https://colab.research.google.com/github/je3we3/d-gex-reproduction/blob/main/Deep_learning_reproducibility_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
##IMPORTS
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from tqdm import tqdm
import math
from google.colab import drive
import numpy as np
import h5py

In [None]:
##DOWNLOAD DATASET FROM GOOGLE DRIVE
!pip install -U -q PyDrive
import os
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# 1. Authenticate and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

# choose a local (colab) directory to store the data.
local_download_path = os.path.expanduser('~/data')
try:
  os.makedirs(local_download_path)
except: pass

# 2. Auto-iterate using the query syntax
#    https://developers.google.com/drive/v2/web/search-parameters
file_list = drive.ListFile(
    {'q': "'1ZrO4Zm14keIcxrdUkfzL3J54voERO1Zt' in parents"}).GetList()

for f in file_list:
  # 3. Create & download by id.
  print('title: %s, id: %s' % (f['title'], f['id']))
  fname = os.path.join(local_download_path, f['title'])
  print('downloading to {}'.format(fname))
  f_ = drive.CreateFile({'id': f['id']})
  f_.GetContentFile(fname)

filepath = '/root/data/samples.mat'
arrays = {}
f = h5py.File(filepath)
for k, v in f.items():
    arrays[k] = np.array(v)
data = arrays['samples']

title: samples.mat, id: 1AwSM8RHUI4f7W0VcqDAZYJ0C9hTYfJwA
downloading to /root/data/samples.mat




In [None]:
##MODEL
#Constants
# Momentum passed to optim.SGD.
MOMENTUM = 0.5
# Common multiplier applied to both the starting and the minimum learning rate.
LEARNING_RATE_FACTOR = 1e-2
# Initial learning rate handed to the optimizer.
LEARNING_RATE_START = 5e-4 * LEARNING_RATE_FACTOR
# Lower bound (min_lr) the ReduceLROnPlateau scheduler will not go below.
LEARNING_RATE_MIN = 1e-5 * LEARNING_RATE_FACTOR
# Multiplicative factor the scheduler applies when it lowers the learning rate.
LEARNING_RATE_DECAY = 0.9
# Number of train/test passes in the main loop.
MAX_EPOCHS = 200
# Mini-batch size used by both data loaders.
BATCH_SIZE = 200
# Multiplier for the learning rate of the parameter group built from
# net.dropout1.parameters().
# NOTE(review): dropout layers hold no learnable parameters, so that group is
# empty and this scale currently has no effect - confirm the intended target.
DROPOUT_LEARNING_SCALE = 3

#Model class
class Net(nn.Module):
  """Fully connected regression network with a single tanh hidden layer.

  Pipeline: Linear(INPUT_SIZE -> HIDDEN_SIZE) -> tanh -> dropout ->
  Linear(HIDDEN_SIZE -> OUTPUT_SIZE). Layer sizes and initialisation settings
  are class attributes, so a subclass can override them.
  """
  #Class specific constants
  INPUT_SIZE = 943 #Landmark genes
  HIDDEN_SIZE = 3000 #also 6000 and 9000
  OUTPUT_SIZE = 4760 #9520 #Target genes
  DROPOUT_RATE = 0.1
  OUTPUT_INIT = 1e-4 #Half-width of the uniform init range of the output weights

  def __init__(self):
    """Builds the layers and initialises their weights."""
    super(Net, self).__init__()

    # Hidden layer weights: uniform in [-dist, dist] with
    # dist = sqrt(6) / sqrt(fan_in + fan_out).
    self.hidden1 = nn.Linear(self.INPUT_SIZE, self.HIDDEN_SIZE)
    dist = math.sqrt(6)/math.sqrt(self.INPUT_SIZE + self.HIDDEN_SIZE)
    nn.init.uniform_(self.hidden1.weight, a=-dist, b=dist)

    # Element-wise dropout. nn.Dropout2d is meant for channel-wise dropout on
    # 3-D/4-D feature maps; feeding it the 2-D (batch, features) activations of
    # a linear layer is deprecated in PyTorch, and nn.Dropout is the documented
    # replacement that keeps the same behaviour.
    self.dropout1 = nn.Dropout(self.DROPOUT_RATE)

    # Output layer weights start in a narrow uniform range around zero.
    self.output = nn.Linear(self.HIDDEN_SIZE, self.OUTPUT_SIZE)
    nn.init.uniform_(self.output.weight, a=-self.OUTPUT_INIT, b=self.OUTPUT_INIT)

  def forward(self, x):
    """Runs the forward pass.

    Args:
        x: Tensor whose last dimension has size INPUT_SIZE.

    Returns:
        Tensor with the same leading dimensions and a last dimension of
        size OUTPUT_SIZE.
    """
    x = self.hidden1(x)
    x = torch.tanh(x)
    x = self.dropout1(x)
    x = self.output(x)

    return x
    
net = Net()

# Training minimises the summed squared error; evaluation reports the mean
# absolute error.
training_loss_func = nn.MSELoss(reduction='sum')
test_loss_func = nn.L1Loss()

# Two parameter groups: every parameter whose name does not contain 'dropout1'
# at the base learning rate, and the parameters of the dropout layer at a
# scaled learning rate.
# NOTE(review): a dropout layer has no learnable parameters, so the second
# group is empty and DROPOUT_LEARNING_SCALE never influences training.
# Confirm which layer was meant to receive the scaled learning rate.
base_params = [param for name, param in net.named_parameters() if 'dropout1' not in name]
param_groups = [
    {'params': base_params},
    {'params': net.dropout1.parameters(), 'lr': LEARNING_RATE_START * DROPOUT_LEARNING_SCALE},
]
optimizer = optim.SGD(param_groups, lr=LEARNING_RATE_START, momentum=MOMENTUM)

# The metric that drives the learning rate decay is supplied in scheduler.step
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    factor=LEARNING_RATE_DECAY,
    patience=0,
    threshold=0,
    min_lr=LEARNING_RATE_MIN,
)


In [None]:
##PREP DATA
#Normalize data
# Column-wise standardisation; the small constant added to the standard
# deviation avoids division by zero for constant columns. Broadcasting handles
# the row dimension, so no hard-coded column count is needed.
data_mean = data.mean(axis=0)
data_std = data.std(axis=0) + 1e-3
data_norm = (data - data_mean)/data_std

#Divide into training and test set
# The split must index into data_norm itself. The previous version drew a fresh
# np.random.rand array of the same shape and split that, so the network was
# trained and evaluated on uniform noise instead of the expression data.
SPLIT_SEED = 0        # fixed seed so the split is reproducible
TRAIN_FRACTION = 0.8
N_LANDMARK = 943      # columns [0, 943) are the network inputs
TARGET_START = 5703   # columns [5703, 10463) are the targets used here
TARGET_END = 10463

n_samples = data_norm.shape[0]
n_train = round(TRAIN_FRACTION*n_samples)
row_order = np.random.default_rng(SPLIT_SEED).permutation(n_samples)
train_idx, test_idx = row_order[:n_train], row_order[n_train:]
assert len(train_idx) + len(test_idx) == n_samples

#Set correct shape
#Use second half of the dataset to speed up training and testing
# Rows and columns are selected in one step so only the needed part is copied.
training_x, training_y = data_norm[train_idx, :N_LANDMARK], data_norm[train_idx, TARGET_START:TARGET_END]
test_x, test_y = data_norm[test_idx, :N_LANDMARK], data_norm[test_idx, TARGET_START:TARGET_END]

# #Uncomment to use entire dataset
# training_x, training_y = data_norm[train_idx, :N_LANDMARK], data_norm[train_idx, N_LANDMARK:]
# test_x, test_y = data_norm[test_idx, :N_LANDMARK], data_norm[test_idx, N_LANDMARK:]

# Create training and test tensors
tensors_train = torch.tensor(training_x).float(), torch.tensor(training_y).float()
tensors_test = torch.tensor(test_x).float(), torch.tensor(test_y).float()

# Create training set and test set from tensors
train_set = torch.utils.data.TensorDataset(*tensors_train)
test_set = torch.utils.data.TensorDataset(*tensors_test)

# Create dataloaders from the training and test set for easier iteration over the data
train_loader = torch.utils.data.DataLoader(train_set, batch_size = BATCH_SIZE)
test_loader = torch.utils.data.DataLoader(test_set, batch_size = BATCH_SIZE)


In [None]:
##TRAIN AND TEST
#Define train and test functions
def train(train_loader, net, optimizer, criterion):
    """
    Trains network for one epoch in batches.

    Args:
        train_loader: Data loader for training set; yields (inputs, labels)
            pairs and supports len().
        net: Neural network model.
        optimizer: Optimizer (e.g. SGD).
        criterion: Loss function (e.g. cross-entropy loss).

    Returns:
        float: Sum of the per-batch loss values divided by the number of
        batches.
    """
    # Make sure training-only layers such as dropout are active, even if an
    # earlier evaluation left the model in eval mode.
    net.train()

    total_loss = 0.0

    # iterate through batches
    for inputs, labels in train_loader:
        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimizer
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # keep track of loss; .item() extracts a plain Python number so the
        # running total does not hold on to each batch's autograd graph
        total_loss += loss.item()

    return total_loss/len(train_loader)
        
def test(test_loader, net, criterion):
    """
    Evaluates network in batches.

    Args:
        test_loader: Data loader for test set.
        net: Neural network model.
        criterion: Loss function (e.g. cross-entropy loss).
    """

    avg_loss = 0
    correct = 0
    total = 0
    
    # Use torch.no_grad to skip gradient calculation, not needed for evaluation
    with torch.no_grad():
        # iterate through batches
        for data in test_loader:
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data

            # forward pass
            outputs = net(inputs)
            loss = criterion(outputs, labels)

            # keep track of loss
            avg_loss += loss

    return avg_loss/len(test_loader)

#Train and test model
#Create array to save losses in
# One slot per epoch; every slot is written before the loop finishes.
train_loss = np.empty(MAX_EPOCHS)
test_loss = np.empty(MAX_EPOCHS)

epoch_progress = tqdm(range(MAX_EPOCHS))
for epoch in epoch_progress:
    # One optimisation pass over the training batches
    train_loss[epoch] = train(train_loader, net, optimizer, training_loss_func)

    # Score the current weights on the test batches
    test_loss[epoch] = test(test_loader, net, test_loss_func)

    # The plateau scheduler watches the test loss to decide on a decay step
    latest_test_loss = test_loss[epoch]
    scheduler.step(latest_test_loss)

100%|██████████| 200/200 [2:03:45<00:00, 37.13s/it]


In [None]:
# Test-set loss recorded after the first epoch
test_loss[0]

0.25030842423439026

In [None]:
##SAVE AND LOAD MODEL
# Colab helper used to mount Google Drive into the runtime's file system
from google.colab import drive

drive.mount('/content/gdrive')

# Location inside the mounted Drive where the weights are written and read back
model_save_name = 'classifier.pt'
path = f"/content/gdrive/My Drive/{model_save_name}"

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
##SAVING PART
# Persist only the model's state_dict (not the whole module object) to `path`
torch.save(net.state_dict(), path)

In [None]:
##LOADING PART
# Read the saved state_dict, copy it into a freshly constructed network and
# switch that network to evaluation mode (the module repr is the cell output).
saved_state = torch.load(path)
model = Net()
model.load_state_dict(saved_state)
model.eval()

Net(
  (hidden1): Linear(in_features=943, out_features=3000, bias=True)
  (dropout1): Dropout2d(p=0.1, inplace=False)
  (output): Linear(in_features=3000, out_features=9520, bias=True)
)

**Links:**

**Paper**

https://academic.oup.com/bioinformatics/article/32/12/1832/1743989?login=true#84798257

**Data**

https://cbcl.ics.uci.edu/public_data/D-GEX/