# Sim: Gene Regulatory Network

In [1]:
import numpy as np
import torch
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import torch
import random

# print directory
import os
print(os.getcwd())
os.chdir("../..")
print(os.getcwd())

from src.models.sparse_autoencoder import *
from src.visualization.plotting import *

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# set a random seed
seed = 0

torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)

/home/dbm829/projects/interpreting_omics_models/02_experiments/notebooks
/home/dbm829/projects/interpreting_omics_models


In [2]:
# setup:
# 3 genes, transcribed into mRNA and translated into proteins
# the randomness comes from 2 places:
# 1. sampling from a poisson distribution, indicating the level of transcription activity
# 2. sampling from bernoulli distributions for each gene, indicating whether the gene is even accessible or not

# set up experiment parameters
poisson_lambda = 2.0
p_bernoulli = torch.tensor([0.5, 0.1, 0.9])
gene_regulation = torch.tensor([
    [0., 1., 1., 0., 0.],
    [1., 1., 0., 1., 0.],
    [0., 0., 0., 1., 1.]
])

n_samples_validation = 2000
n_samples = 10000 + n_samples_validation
n_epochs = 20000

# load or generate the data
data_path = "01_data/"

if os.path.exists(data_path + "sim_rna_counts.npy"):
    rna_counts = torch.tensor(np.load(data_path + "sim_rna_counts.npy"))
    tf_scores = torch.tensor(np.load(data_path + "sim_tf_scores.npy"))
    activity_score = torch.tensor(np.load(data_path + "sim_activity_scores.npy"))
    accessibility_scores = torch.tensor(np.load(data_path + "sim_accessibility_scores.npy"))

    print("Loaded data from files.")
else:
    # init distributions
    activity_distribution = torch.distributions.poisson.Poisson(torch.tensor([poisson_lambda]))
    accessibility_distribution = torch.distributions.bernoulli.Bernoulli(p_bernoulli)

    # sample from distributions to create mRNA counts
    # get the activity score
    activity_score = activity_distribution.sample((n_samples,))
    # get the accessibility scores
    accessibility_scores = accessibility_distribution.sample((n_samples,))
    # get the mRNA counts
    tf_scores = activity_score * accessibility_scores

    # correct mRNA counts for gene regulation
    # first, get the regulation scores
    def get_regulation_scores(tf_scores, gene_regulation):
        # get the regulation scores
        return torch.matmul(tf_scores, gene_regulation)

    rna_counts = get_regulation_scores(tf_scores, gene_regulation)

    """
    np.save("01_data/sim_rna_counts.npy", rna_counts.numpy())
    np.save("01_data/sim_tf_scores.npy", tf_scores.numpy())
    np.save("01_data/sim_activity_scores.npy", activity_score.numpy())
    np.save("01_data/sim_accessibility_scores.npy", accessibility_scores.numpy())
    """

print(f'RNA shape: {rna_counts.shape}')
print(f'activity score shape: {activity_score.shape}') # one per sample
print(f'accessibility score shape: {accessibility_scores.shape}') # one per originating gene

Loaded data from files.
RNA shape: torch.Size([12000, 5])
activity score shape: torch.Size([12000, 1])
accessibility score shape: torch.Size([12000, 3])


## functions

In [5]:
def train_and_eval_model(encoder, decoder, rna_counts, n_samples, n_samples_validation, learning_rate=1e-4):
    # loss function
    loss_fn = torch.nn.MSELoss(reduction='mean')

    # optimizer
    optimizer = torch.optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=learning_rate)

    # train
    train_loss = []
    val_loss = []

    for e in range(n_epochs):
        # train
        # forward pass
        encoded = encoder(rna_counts[:(n_samples-n_samples_validation)])
        decoded = decoder(encoded)

        # compute loss
        loss = loss_fn(decoded, rna_counts[:(n_samples-n_samples_validation)])
        train_loss.append(loss.item())

        # backward pass
        optimizer.zero_grad()
        loss.backward()

        # update weights
        optimizer.step()

        # validation
        # forward pass
        encoded = encoder(rna_counts[(n_samples-n_samples_validation):])
        decoded = decoder(encoded)

        # compute loss
        loss = loss_fn(decoded, rna_counts[(n_samples-n_samples_validation):])
        val_loss.append(loss.item())
    
    # plot loss curves
    import matplotlib.pyplot as plt
    plt.plot(train_loss, label='train')
    plt.plot(val_loss, label='validation')
    plt.legend()
    plt.show()

    # make scatter plots for each gene with output vs input
    # get the output
    encoded = encoder(rna_counts)
    decoded = decoder(encoded)
    # plot subplots
    fig, axs = plt.subplots(1, 5, figsize=(20, 5))
    for i in range(5):
        axs[i].scatter(rna_counts[:, i].detach().numpy(), decoded[:, i].detach().numpy(), s=1)
        axs[i].set_xlabel('input')
        axs[i].set_ylabel('output')
        axs[i].set_title('gene {}'.format(i))
    plt.show()

    # rank underlying features by their importance
    # first, get the weights of the encoder
    encoder_weights = encoder[0].weight.detach().t().numpy()
    # then, get the weights of the decoder
    decoder_weights = decoder[-2].weight.detach().t().numpy()
    # multiply them together
    weights = np.matmul(encoder_weights, decoder_weights)
    # get the absolute values
    weights = np.abs(weights)
    # sum the weights for each gene
    weights = np.sum(weights, axis=1)
    # sort the weights
    #weights = np.argsort(weights)

    # plot the weights
    plt.bar(np.arange(5), weights)
    plt.xlabel('gene')
    plt.ylabel('weight')
    plt.title('gene importance')
    plt.show()

    history = pd.DataFrame({'train_loss': train_loss, 'val_loss': val_loss, 'epoch': np.arange(n_epochs)})

    return encoder, decoder, history

three_cols = ["#60276F", "#1663A5", "#53787B"]
fontsize = 12

def plot_neuron_layer(activations, corr_threshold=0.9, name=''):
    # make the font size smaller
    plt.rcParams.update({'font.size': fontsize})
    if activations.shape[1] > 10:
        # look for neurons that correlate strongly with a gene
        # first, get the correlations
        for i in range(activations.shape[1]):
            for j in range(tf_scores.shape[1]):
                corr = np.corrcoef(activations[:, i].detach().numpy(), tf_scores[:, j].detach().numpy())[0, 1]
                if corr > corr_threshold:
                    # plot the neuron
                    plt.scatter(activations[:, i].detach().numpy(), tf_scores[:, j].detach().numpy())
                    plt.xlabel('neuron {}'.format(i))
                    plt.ylabel('tf_score {}'.format(j))
                    plt.title('correlation: {}'.format(corr))
                    plt.legend().remove()
                    plt.show()
    else:
        # now plot each activation against each tf_score
        fig, axs = plt.subplots(activations.shape[1], 3, figsize=(9, 3*activations.shape[1]))
        for i in range(activations.shape[1]):
            for j in range(3):
                # make a dataframe
                df = pd.DataFrame({'neuron': activations[:, i].detach().numpy(), 'tf_score': tf_scores[:, j].detach().numpy(), 'activity': activity_score.flatten().detach().numpy()})
                sns.scatterplot(x='neuron', y='tf_score', hue='activity', data=df, ax=axs[i, j])
                axs[i, j].set_xlabel('neuron {}'.format(i))
                axs[i, j].set_ylabel('tf_score {}'.format(j))
                axs[i, j].legend().remove()
        plt.show()

    # for tf_score 0, fit a linear regression model to the activations
    from sklearn.linear_model import LinearRegression

    fig, axs = plt.subplots(1, tf_scores.shape[1], figsize=(10, 3))
    # add spacing
    fig.subplots_adjust(wspace=0.5)
    for i in range(tf_scores.shape[1]):
        reg = LinearRegression().fit(activations.detach().numpy(), tf_scores[:, i].detach().numpy())
        print(reg.score(activations.detach().numpy(), tf_scores[:, i].detach().numpy()))
        # use these coefficients to plot the activations against the tf_scores
        superposition = np.matmul(activations.detach().numpy(), reg.coef_.T)
        # now plot the tf_score against the superposition
        axs[i].scatter(superposition, tf_scores[:, i].detach().numpy(), c=three_cols[i])
        axs[i].set_xlabel('superposition')
        axs[i].set_ylabel('TF score {}'.format(i))
        # the title should be the coefficients
        axs[i].set_title('coefficients: {}'.format(np.round(reg.coef_,2)), fontsize=fontsize-2)
    # save as pdf
    plt.savefig('tf_score_superposition'+name+'.png', dpi=300, bbox_inches='tight')
    plt.show()

    # also find the superposition for the activity score
    reg = LinearRegression().fit(activations.detach().numpy(), activity_score.flatten().detach().numpy())
    print(reg.score(activations.detach().numpy(), activity_score.flatten().detach().numpy()))
    # use these coefficients to plot the activations against the tf_scores
    superposition = np.matmul(activations.detach().numpy(), reg.coef_.T)
    # now plot the tf_score against the superposition
    fig, axs = plt.subplots(1, 1, figsize=(3, 3))
    plt.scatter(superposition, activity_score.flatten().detach().numpy(), c='black')
    plt.xlabel('superposition')
    plt.ylabel('activity_score')
    # the title should be the coefficients
    plt.title('coefficients: {}'.format(np.round(reg.coef_,2)), fontsize=fontsize)
    # save as pdf
    plt.savefig('activity_score_superposition'+name+'.png', dpi=300, bbox_inches='tight')
    plt.show()

    # make one plot showing a pca with the coefficient vectors for each tf score
    fig, axs = plt.subplots(1, 1, figsize=(3, 3))
    from sklearn.decomposition import PCA
    pca = PCA(n_components=2)
    pca.fit(activations.detach().numpy())
    # get the coefficients
    coefficients = []
    for i in range(tf_scores.shape[1]):
        reg = LinearRegression().fit(activations.detach().numpy(), tf_scores[:, i].detach().numpy())
        coefficients.append(reg.coef_)
    # add the activity score coefficients
    reg = LinearRegression().fit(activations.detach().numpy(), activity_score.flatten().detach().numpy())
    coefficients.append(reg.coef_)
    coefficients = np.array(coefficients)
    # transform the coefficients
    coefficients = pca.transform(coefficients)
    # plot the coefficients
    plt.scatter(coefficients[:, 0], coefficients[:, 1], s=1)
    # label the coefficients
    #for i in range(coefficients.shape[0]):
    #    plt.annotate('TF score {}'.format(i), (coefficients[i, 0]+0.05, coefficients[i, 1]*1.1))
    # plot arrows from the origin to the coefficients
    mean = pca.transform(pca.mean_.reshape(1, -1))
    for i in range(coefficients.shape[0]):
        if i == coefficients.shape[0]-1:
            plt.arrow(mean[0][0], mean[0][1], coefficients[i, 0], coefficients[i, 1], head_width=0.05, head_length=0.1, fc='black', ec='black')
        else:
            plt.arrow(mean[0][0], mean[0][1], coefficients[i, 0], coefficients[i, 1], head_width=0.05, head_length=0.1, fc=three_cols[i], ec=three_cols[i])
    plt.xlabel('PC 1')
    plt.ylabel('PC 2')
    plt.title('TF score coefficients', fontsize=fontsize)
    # save as pdf
    plt.savefig('tf_score_coefficients'+name+'.png', dpi=300, bbox_inches='tight')
    plt.show()

    # do the same for the activity score and the accessibility scores
    if activations.shape[1] > 10:
        # look for neurons that correlate strongly with a gene
        # first, get the correlations
        for i in range(activations.shape[1]):
            for j in range(accessibility_scores.shape[1]):
                corr = np.corrcoef(activations[:, i].detach().numpy(), accessibility_scores[:, j].detach().numpy())[0, 1]
                if corr > corr_threshold:
                    # plot the neuron
                    plt.scatter(activations[:, i].detach().numpy(), accessibility_scores[:, j].detach().numpy())
                    plt.xlabel('neuron {}'.format(i))
                    plt.ylabel('accessibility_score {}'.format(j))
                    plt.title('correlation: {}'.format(corr))
                    plt.legend().remove()
                    plt.show()
    else:
        fig, axs = plt.subplots(activations.shape[1], accessibility_scores.shape[1], figsize=(9, 3*activations.shape[1]))
        for i in range(activations.shape[1]):
            for j in range(accessibility_scores.shape[1]):
                # make a dataframe
                df = pd.DataFrame({'neuron': activations[:, i].detach().numpy(), 'accessibility_score': accessibility_scores[:, j].detach().numpy(), 'activity': activity_score.flatten().detach().numpy()})
                sns.scatterplot(x='neuron', y='accessibility_score', hue='activity', data=df, ax=axs[i, j])
                axs[i, j].set_xlabel('neuron {}'.format(i))
                axs[i, j].set_ylabel('accessibility_score {}'.format(j))
                axs[i, j].legend().remove()
        plt.show()
    fig, axs = plt.subplots(1, accessibility_scores.shape[1], figsize=(9, 3))
    for i in range(accessibility_scores.shape[1]):
        reg = LinearRegression().fit(activations.detach().numpy(), accessibility_scores[:, i].detach().numpy())
        # use these coefficients to plot the activations against the tf_scores
        superposition = np.matmul(activations.detach().numpy(), reg.coef_.T)
        # now plot the tf_score against the superposition
        axs[i].scatter(superposition, accessibility_scores[:, i].detach().numpy())
        axs[i].set_xlabel('superposition')
        axs[i].set_ylabel('accessibility_score {}'.format(i))
        # the title should be the coefficients
        axs[i].set_title('coefficients: {}'.format(reg.coef_))
    plt.show()

    fig, axs = plt.subplots(1, activations.shape[1], figsize=(activations.shape[1]*3, 3))
    for i in range(activations.shape[1]):
        # make a dataframe
        axs[i].scatter(activations[:, i].detach().numpy(), activity_score.flatten().detach().numpy())
        axs[i].set_xlabel('neuron {}'.format(i))
        axs[i].set_ylabel('activity_score')
    plt.show()
    reg = LinearRegression().fit(activations.detach().numpy(), activity_score.flatten().detach().numpy())
    # use these coefficients to plot the activations against the tf_scores
    superposition = np.matmul(activations.detach().numpy(), reg.coef_.T)
    # now plot the tf_score against the superposition
    plt.scatter(superposition, activity_score.flatten().detach().numpy())
    plt.xlabel('superposition')
    plt.ylabel('activity_score')
    # the title should be the coefficients
    plt.title('coefficients: {}'.format(reg.coef_))
    plt.show()

## Exp1: Superpositions depending on the bottleneck size (relative to causal variables) for single-layer networks

Here, the number of causal variables are 4: 3 TFs and the activity in the sample.

### overcomplete case

In [33]:
encoder = torch.nn.Sequential(
    torch.nn.Linear(5, 10)
)
decoder = torch.nn.Sequential(
    torch.nn.Linear(10, 5),
    torch.nn.ReLU()
)

encoder, decoder, history = train_and_eval_model(encoder, decoder, rna_counts, n_samples, n_samples_validation)

# save this model as the best one
model_name = 'layer1_latent10_v1'

torch.save(encoder, '03_results/models/sim1_'+model_name+'_encoder.pth')
torch.save(decoder, '03_results/models/sim1_'+model_name+'_decoder.pth')

# also save the history of the training
history.to_csv('03_results/models/sim1_'+model_name+'_history.csv', index=False)

### ideal case

In [42]:
encoder = torch.nn.Sequential(
    torch.nn.Linear(5, 4)
)
decoder = torch.nn.Sequential(
    torch.nn.Linear(4, 5),
    torch.nn.ReLU()
)

encoder, decoder, history = train_and_eval_model(encoder, decoder, rna_counts, n_samples, n_samples_validation)

# save this model as the best one
model_name = 'layer1_latent4_v2'

torch.save(encoder, '03_results/models/sim1_'+model_name+'_encoder.pth')
torch.save(decoder, '03_results/models/sim1_'+model_name+'_decoder.pth')

# also save the history of the training
history.to_csv('03_results/models/sim1_'+model_name+'_history.csv', index=False)

### bottleneck smaller than the number of causal variables

In [19]:
encoder = torch.nn.Sequential(
    torch.nn.Linear(5, 2)
)
decoder = torch.nn.Sequential(
    torch.nn.Linear(2, 5),
    torch.nn.ReLU()
)

encoder, decoder, history = train_and_eval_model(encoder, decoder, rna_counts, n_samples, n_samples_validation)

# save this model as the best one
model_name = 'layer1_latent2_v1'

torch.save(encoder, '03_results/models/sim1_'+model_name+'_encoder.pth')
torch.save(decoder, '03_results/models/sim1_'+model_name+'_decoder.pth')

# also save the history of the training
history.to_csv('03_results/models/sim1_'+model_name+'_history.csv', index=False)

## Exp 2: 2 layers with nonlinearity

In [49]:
import torch.nn.init as init

def custom_weight_init(m, init_option='kaiming'):
    if isinstance(m, torch.nn.Linear):
        if init_option == 'xavier':
            init.xavier_uniform_(m.weight)
        elif init_option == 'uniform':
            init.uniform_(m.weight, -0.1, 0.1)
        elif init_option == 'normal':
            init.normal_(m.weight, 0, 0.1)
        elif init_option == 'kaiming_uniform':
            init.kaiming_uniform_(m.weight)
        else:
            init.kaiming_normal_(m.weight)
        
        if m.bias is not None:
            init.zeros_(m.bias)

### overcomplete case

In [None]:
init_type = 'xavier'

encoder = torch.nn.Sequential(
    torch.nn.Linear(5, 10),
    torch.nn.SiLU(),
    torch.nn.Linear(10, 100),
)
encoder.apply(lambda m: custom_weight_init(m, init_option=init_type))

decoder = torch.nn.Sequential(
    torch.nn.Linear(100, 10),
    torch.nn.SiLU(),
    torch.nn.Linear(10, 5),
    torch.nn.ReLU()
)
decoder.apply(lambda m: custom_weight_init(m, init_option=init_type))

encoder, decoder = train_and_eval_model(encoder, decoder, rna_counts, n_samples, n_samples_validation)

# save this model as the best one
model_name = 'layer2_latent100_xavier'

torch.save(encoder, '03_results/models/sim1_'+model_name+'_encoder.pth')
torch.save(decoder, '03_results/models/sim1_'+model_name+'_decoder.pth')

# also save the history of the training
history.to_csv('03_results/models/sim1_'+model_name+'_history.csv', index=False)

### ideal

In [64]:
init_type = 'xavier'

encoder = torch.nn.Sequential(
    torch.nn.Linear(5, 5),
    torch.nn.SiLU(),
    torch.nn.Linear(5, 4),
)
encoder.apply(lambda m: custom_weight_init(m, init_option=init_type))

decoder = torch.nn.Sequential(
    torch.nn.Linear(4, 5),
    torch.nn.SiLU(),
    torch.nn.Linear(5, 5),
    torch.nn.ReLU()
)
decoder.apply(lambda m: custom_weight_init(m, init_option=init_type))

encoder, decoder = train_and_eval_model(encoder, decoder, rna_counts, n_samples, n_samples_validation, learning_rate=1e-3)

# save this model as the best one
model_name = 'layer2_latent4_xavier_v2'

torch.save(encoder, '03_results/models/sim1_'+model_name+'_encoder.pth')
torch.save(decoder, '03_results/models/sim1_'+model_name+'_decoder.pth')

### bottleneck smaller than the number of causal variables

In [None]:
init_type = 'xavier'

encoder = torch.nn.Sequential(
    torch.nn.Linear(5, 5),
    torch.nn.SiLU(),
    torch.nn.Linear(5, 2),
)
encoder.apply(lambda m: custom_weight_init(m, init_option=init_type))

decoder = torch.nn.Sequential(
    torch.nn.Linear(2, 5),
    torch.nn.SiLU(),
    torch.nn.Linear(5, 5),
    torch.nn.ReLU()
)
decoder.apply(lambda m: custom_weight_init(m, init_option=init_type))

encoder, decoder = train_and_eval_model(encoder, decoder, rna_counts, n_samples, n_samples_validation, learning_rate=1e-3)

# save this model as the best one
model_name = 'layer2_latent2_xavier'

torch.save(encoder, '03_results/models/sim1_'+model_name+'_encoder.pth')
torch.save(decoder, '03_results/models/sim1_'+model_name+'_decoder.pth')

# also save the history of the training
history.to_csv('03_results/models/sim1_'+model_name+'_history.csv', index=False)