In [2]:
import torch

In [3]:
from torch import nn
from torch.autograd import Variable

In [4]:
import numpy as np
N_train = 5000
# Load Data # replace with something else (also will make .pkl file)
data = np.load('single_locus_data.npy', allow_pickle='TRUE')
# with open("single_locus_data.pkl", "wb") as tf:
#     data = pickle.load(tf)    
f = data.item()['f'][:, None]
t = data.item()['t'][:, None]
p = data.item()['phi'][:, None]
print(len(f))
idx_list = np.arange(len(f))
np.random.shuffle(idx_list)
print(idx_list)

f_train = torch.from_numpy(f[idx_list[:N_train]]).float()
t_train = torch.from_numpy(t[idx_list[:N_train]]).float()
p_train = torch.from_numpy(p[idx_list[:N_train]]).float()
f_test = torch.from_numpy(f[idx_list[N_train:]]).float()
t_test = torch.from_numpy(t[idx_list[N_train:]]).float()
p_test = torch.from_numpy(p[idx_list[N_train:]]).float()


11000
[ 5958  7800  8340 ...  6742  5191 10164]


In [5]:
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

Using cpu device


In [6]:
class PINN(nn.Module):
    def __init__(self, layers, s0, N0):
        super().__init__()
        self.model = nn.Sequential()
        for i in range(len(layers)-3):
            self.model.add_module(f"layer{i}", nn.Linear(layers[i], layers[i+1]))
            self.model.add_module(f"activation{i}", nn.Tanh())
        self.model.add_module(f"layer{len(layers)-2}", nn.Linear(layers[-3], layers[-2]))
        self.model.add_module(f"activation{len(layers)-2}", nn.Softplus())
        self.model.add_module(f"layer{len(layers)-1}", nn.Linear(layers[-2], layers[-1]))
        self.s = torch.nn.Parameter(torch.tensor(s0).float(), requires_grad = True)
        self.N = torch.nn.Parameter(torch.tensor(N0).float(), requires_grad = True)
        # self.s = Variable(torch.tensor(s0).float(), requires_grad = True).to(device)
        # self.N = Variable(torch.tensor(N0).float(), requires_grad = True).to(device)

    def forward(self, f, t):
        inputs = torch.cat([f, t], 1)
        return self.model(inputs)
    def PDE(self, f, t):
        s = self.s
        N = self.N
        p = self.model(torch.cat([f, t], 1))
        p_f = torch.autograd.grad(p.sum(), f, create_graph=True)[0]
        p_t = torch.autograd.grad(p.sum(), t, create_graph=True)[0]
        p_ff = torch.autograd.grad(p_f.sum(), f, create_graph=True)[0]
        g = p_t + s * ((1 - 2*f)*p + (f - f**2)*p_f) + 1 / (2*N) * (-2*p + 2*(1 - 2*f)*p_f + (f - f**2)*p_ff)
        return g

# Input = (f, t) series, Output = p series
model = PINN(layers = [2, 20, 20, 20, 1], s0 = 0.1, N0 = 500).to(device)
        

In [7]:
print(model)
for parameter in model.parameters():

    print(parameter)

PINN(
  (model): Sequential(
    (layer0): Linear(in_features=2, out_features=20, bias=True)
    (activation0): Tanh()
    (layer1): Linear(in_features=20, out_features=20, bias=True)
    (activation1): Tanh()
    (layer3): Linear(in_features=20, out_features=20, bias=True)
    (activation3): Softplus(beta=1, threshold=20)
    (layer4): Linear(in_features=20, out_features=1, bias=True)
  )
)
Parameter containing:
tensor(0.1000, requires_grad=True)
Parameter containing:
tensor(500., requires_grad=True)
Parameter containing:
tensor([[ 0.2094, -0.5100],
        [-0.3868, -0.5020],
        [-0.2009,  0.1741],
        [ 0.3426, -0.6366],
        [ 0.3686, -0.5230],
        [ 0.0813,  0.6961],
        [-0.4961, -0.6420],
        [ 0.6010,  0.2583],
        [ 0.3230, -0.7067],
        [ 0.1903, -0.0757],
        [ 0.6649, -0.2768],
        [ 0.1049,  0.2885],
        [-0.6545,  0.3637],
        [ 0.4484, -0.5865],
        [ 0.0009,  0.5008],
        [-0.2942, -0.1481],
        [-0.5370,  0.24

In [8]:
model(f_train, t_train)

tensor([[-0.7667],
        [-0.7667],
        [-0.7667],
        ...,
        [-0.7667],
        [-0.7667],
        [-0.8514]], grad_fn=<AddmmBackward0>)

In [9]:
#### Now define loss based on a PDE. We need to get derivative of p against x and t.
MSE_loss = nn.MSELoss()
f_sample = np.random.uniform(low=0.0, high=1.0, size=(500,1))
pt_f_sample = Variable(torch.from_numpy(f_sample).float(), requires_grad=True).to(device)
t_sample = np.random.uniform(low=0.0, high=10000, size=(500,1))
pt_t_sample = Variable(torch.from_numpy(t_sample).float(), requires_grad=True).to(device)
loss_pde = MSE_loss(model.PDE(pt_f_sample, pt_t_sample), torch.zeros_like(model.PDE(pt_f_sample, pt_t_sample)))
# In this case, we have p_train to compare directly against estimate of p at f_train, t_train. 
loss_data = MSE_loss(model(f_train, t_train), p_train)
loss = loss_pde + loss_data

# In a different scenario, I will put in all f, t data without estimating p before training. 
# reorganize f and t series so that I have f at each time point (t_i), add - 1/n_i sum(log(p(f(t_i), t_i))) + int model(f, t_i)df
# sort the data by time point, start from t=0 till maximum t (either t_extinct or t_fix)

In [10]:
iterations = 100
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
for epoch in range(iterations):
    optimizer.zero_grad()
    MSE_loss = nn.MSELoss()
    f_sample = np.random.uniform(low=0.0, high=1.0, size=(500,1))
    pt_f_sample = Variable(torch.from_numpy(f_sample).float(), requires_grad=True).to(device)
    t_sample = np.random.uniform(low=0.0, high=10000, size=(500,1))
    pt_t_sample = Variable(torch.from_numpy(t_sample).float(), requires_grad=True).to(device)
    loss_pde = MSE_loss(model.PDE(pt_f_sample, pt_t_sample), torch.zeros_like(model.PDE(pt_f_sample, pt_t_sample)))
    # In this case, we have p_train to compare directly against estimate of p at f_train, t_train. 
    loss_data = MSE_loss(model(f_train, t_train), p_train)
    loss = loss_pde + loss_data
    loss.backward()
    optimizer.step()

    print(epoch,"training loss:", loss.data, "s:", model.s, "N:", model.N)


0 training loss: tensor(1.2201) s: Parameter containing:
tensor(0.0990, requires_grad=True) N: Parameter containing:
tensor(499.9998, requires_grad=True)
1 training loss: tensor(1.1475) s: Parameter containing:
tensor(0.0980, requires_grad=True) N: Parameter containing:
tensor(500.0001, requires_grad=True)
2 training loss: tensor(1.0774) s: Parameter containing:
tensor(0.0970, requires_grad=True) N: Parameter containing:
tensor(500.0002, requires_grad=True)
3 training loss: tensor(1.0101) s: Parameter containing:
tensor(0.0960, requires_grad=True) N: Parameter containing:
tensor(500.0004, requires_grad=True)
4 training loss: tensor(0.9452) s: Parameter containing:
tensor(0.0951, requires_grad=True) N: Parameter containing:
tensor(500.0006, requires_grad=True)
5 training loss: tensor(0.8828) s: Parameter containing:
tensor(0.0941, requires_grad=True) N: Parameter containing:
tensor(500.0009, requires_grad=True)
6 training loss: tensor(0.8232) s: Parameter containing:
tensor(0.0932, requ

In [11]:
# s and N are not updating... why??? ==> solved! self.s and self.N should be torch.nn.Parameter, not Variable.

In [12]:
model(f_test, t_test)

tensor([[ 0.3367],
        [ 0.3479],
        [ 0.3423],
        ...,
        [ 0.3477],
        [ 0.3455],
        [-0.1735]], grad_fn=<AddmmBackward0>)

In [13]:
p_test

tensor([[0.0723],
        [0.0780],
        [0.3574],
        ...,
        [0.6743],
        [0.5192],
        [0.0000]])

In [14]:
model.N

Parameter containing:
tensor(500.0162, requires_grad=True)

In [15]:
model.s

Parameter containing:
tensor(0.0478, requires_grad=True)

In [16]:
# Return predicted s and N, plot p(f, t). Plot p(f = 1, t), which is fixation probability as a function of time.