In [105]:
import typing
import numpy as np
import pandas as pd

import torch
from torch import nn, optim, tensor, FloatTensor
from torch.utils.data import Dataset, TensorDataset, DataLoader, random_split
import torch.nn.functional as F
from torchvision import transforms

from fastai.basics import *

In [2]:
# Load the tab-separated input tables, using paths relative to the notebook's working directory.
# `synthetic_expression_data` is the expression table; its columns are compared against the
# first column of `tfs` (the transcription-factor table) further down the notebook.
synthetic_expression_data = pd.read_csv("data/net1_expression_data.tsv", sep='\t')
# The two loaders below are currently disabled.
# chip_features = pd.read_csv("data/net1_chip_features.tsv", sep='\t')
# gene_ids = pd.read_csv("data/net1_gene_ids.tsv", sep='\t')
tfs = pd.read_csv("data/net1_transcription_factors.tsv", sep='\t')

In [9]:
# Boolean mask over the expression-data columns: True where the column name
# appears in the first column of the transcription-factor table.
# The lookup collection is built once, as a set, so each membership test is a
# hash lookup instead of rebuilding and scanning the whole list per column.
# Assumes the identifiers are hashable (e.g. strings) -- TODO confirm.
tf_names = set(tfs.iloc[:, 0].tolist())
X_idx = [gene in tf_names for gene in synthetic_expression_data.columns]

In [97]:
# Shape check: every row, restricted to the first 600 columns.
# NOTE(review): this slices columns, whereas the train/valid split below slices rows.
synthetic_expression_data.iloc[:, :600].shape

(805, 600)

In [101]:
# Rows 0-599 become the training split; every remaining row becomes the validation split.
# NOTE(review): torch.tensor keeps the dtype of the underlying numpy array; if that is
# float64 the tensors will not match nn.Linear's default float32 parameters -- confirm.
expression_values = synthetic_expression_data.values
train = torch.tensor(expression_values[:600, :])
valid = torch.tensor(expression_values[600:, :])

In [106]:
batch_size = 128
# Each dataset pairs the columns flagged in X_idx (inputs) with the full set of columns (targets).
train_ds, valid_ds = (TensorDataset(split[:, X_idx], split) for split in (train, valid))
data = DataBunch.create(train_ds, valid_ds, bs=batch_size)

In [107]:
# Pull a single batch from the training loader and show the input / target shapes.
train_batches = iter(data.train_dl)
x, y = next(train_batches)
x.shape, y.shape

(torch.Size([128, 194]), torch.Size([128, 1643]))

In [70]:
# NOTE(review): TeaDataset is defined in a cell further down this notebook, so that
# cell must have been executed before this one on a fresh kernel.
# With X_in_Y=True the target keeps every column, including the predictor columns.
synthetic_data = TeaDataset(
    all_data=synthetic_expression_data.values,
    X_idx=X_idx,
    X_in_Y=True,
)

In [75]:
# Take the first sample from the dataset and check the size of its target vector.
sample_iterator = iter(synthetic_data)
next_ = next(sample_iterator)
next_['Y'].shape

torch.Size([1643])

In [108]:
class FEA(nn.Module):
    """Forward-Embedding Autoencoder built from two stacked linear layers.

    ``fc1`` projects the input onto a hidden embedding and ``fc2`` projects that
    embedding onto the output features. No activation function is applied
    between or after the two layers.
    """

    def __init__(self, input_dim: int, hidden_dim: int, output_dim: int):
        """
        Parameters
        ----------
        input_dim : int
            Width of the input feature vector
        hidden_dim : int
            Width of the hidden (embedding) layer
        output_dim : int
            Width of the output feature vector
        """
        super().__init__()
        # Keep the layer sizes on the module so they can be inspected later.
        self.input_dim, self.hidden_dim, self.output_dim = input_dim, hidden_dim, output_dim
        # fc1 is created before fc2: this fixes both registration and initialisation order.
        self.fc1 = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Pass ``x`` through the embedding layer and then the output layer."""
        embedding = self.fc1(x)
        return self.fc2(embedding)

#     def train(self, data_loader, optimizer, criterion):
#         """
#         Parameters
#         ----------
#         data_loader : torch.utils.data.DataLoader
#         optimizer : torch.optim
#         criterion : function
#         """
#         running_loss = 0
#         for i, row in enumerate(data_loader):
#             optimizer.zero_grad()
#             y_hat = self.forward(row['X'])
#             loss = criterion(y_hat, row['y'].view(y_hat.size()[0],1))
#             loss.backward()
#             optimizer.step()    
#             running_loss += loss.item()
#         print("Train loss: {0:.3f}".format(running_loss / (i + 1))) # loss needs to be averaged over all batches
#         return(running_loss / (i + 1))

#     def test(self, data_loader, criterion):
#         """
#         Parameters
#         ----------
#         data_loader : torch.utils.data.DataLoader
#         criterion : function
#         """
#         running_loss = 0
#         for i, row in enumerate(data_loader):
#             y_hat = self.forward(row['X'])
#             loss = criterion(y_hat, row['y'].view(y_hat.size()[0],1)) 
#             running_loss += loss.item()
#         print("Test loss: {0:.3f}".format(running_loss / (i + 1))) # loss needs to be averaged over all batches
#         return(running_loss / (i + 1))

In [111]:
# One input unit per column flagged in X_idx, one output unit per expression-data column.
n_inputs = sum(X_idx)
n_outputs = synthetic_expression_data.shape[1]
fea_model = FEA(input_dim=n_inputs, hidden_dim=256, output_dim=n_outputs)

In [112]:
# Show the module repr to check the layer sizes of the freshly built model.
fea_model

FEA(
  (fc1): Linear(in_features=194, out_features=256, bias=True)
  (fc2): Linear(in_features=256, out_features=1643, bias=True)
)

In [63]:
class TeaDataset(Dataset):
    """Pytorch Dataset serving 'X' / 'Y' samples for training FEA and TEA models."""

    def __init__(self, all_data, X_idx, X_in_Y=False):
        """
        Parameters
        ----------
        all_data : np.array[float]
            The complete data matrix (predictor and target columns together)
        X_idx : list[bool]
            Column mask over all_data; True marks a predictor (X) column
        X_in_Y : bool
            When True the target is the whole of all_data (predictors included);
            when False (the default) the target keeps only the columns not flagged in X_idx
        """
        self.all_data = all_data
        self.X = all_data[:, X_idx]
        # Either reuse the full matrix as the target or keep the complement of the X mask.
        self.Y = all_data if X_in_Y else all_data[:, [not flag for flag in X_idx]]

    def __len__(self):
        """Number of samples, i.e. the size of the first axis of all_data"""
        n_samples = self.all_data.shape[0]
        return n_samples

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict of tensors keyed by 'X' and 'Y'"""
        predictors = tensor(self.X[idx])
        targets = tensor(self.Y[idx])
        return {'X': predictors, 'Y': targets}

In [None]:
# NOTE(review): this instantiates torchvision's ToTensor transform without assigning it,
# so the cell only displays the object. It is a different class from the notebook's own
# ToTensor, which works on dict samples -- presumably a leftover exploration cell.
transforms.ToTensor()

In [None]:
class ToTensor(object):
    """Callable transform turning the 'X' and 'y' entries of a sample into float tensors.

    The sample is expected to be a mapping with keys 'X' and 'y' (described in the
    original notes as coming from a SyntheticClassificationDataset).
    """
    def __call__(self, sample):
        features, target = sample['X'], sample['y']
        # Both entries are converted to tensors and cast to FloatTensor.
        return {
            'X': tensor(features).type(FloatTensor),
            'y': tensor(target).type(FloatTensor),
        }

class Normalise(object):
    """Take a sample from a SyntheticClassificationDataset and normalise the input features (X)

    The sample is a mapping with keys 'X' and 'y'; only 'X' is transformed and
    'y' is passed through unchanged.

    Parameters
    ----------
        method : str
            From 'z' or 'range'. The default is 'z'.
            'z' z-transforms the features: (X - mean) / std
            'range' range normalises so that the data lie in the range [0,1]:
            (X - min) / (max - min)
    """
    def __init__(self, method = 'z'):
        assert isinstance(method, str)
        assert method in ['z', 'range']
        self.method = method

    def __call__(self, sample):
        """Return a new sample dict with 'X' normalised according to ``self.method``.

        Statistics are computed over the whole of X (no axis argument is passed).
        No guard is applied for constant X, where the denominator is zero.
        """
        X, y = sample['X'], sample['y']
        if self.method == "z":
            normalised_X = (X - X.mean()) / X.std()
        else:
            # Min-max scaling must divide by the span (max - min); dividing by
            # max alone only lands in [0, 1] when the minimum happens to be 0.
            lowest = X.min()
            normalised_X = (X - lowest) / (X.max() - lowest)

        return {'X': normalised_X, 'y': y}