In [1]:
# setup
import os
os.environ["KERAS_BACKEND"] = "torch"
os.environ["PYTORCH_ENABLE_MPS_FALLBACK"] = "1"

import keras
import sys
sys.path.append("../")

import bayesflow as bf
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

  torch.logspace(


In [2]:
import torch
from torch import Tensor
import torch.nn as nn
import bayesflow as bf
from bayesflow.utils import filter_concatenate

In [3]:
import bayesflow as bf
from torch import Tensor
from torch.distributions import Distribution

from custom_simulators import LikelihoodBasedModel, ParameterMask, Prior, RandomNumObs
from design_networks import RandomDesign, DeepAdaptiveDesign, EmitterNetwork
from design_loss import NestedMonteCarlo
from inference_design_approximator import JointApproximator
from custom_dataset import DataSet

import torch.nn as nn

In [4]:
class PolynomialRegression(LikelihoodBasedModel):
    """Cubic polynomial regression with Gaussian observation noise.

    The outcome mean is ``sum_k params[:, k] * xi**k`` for ``k = 0..3`` and the
    noise scale is read from ``sim_vars["sigma"]``.
    """

    def __init__(self, mask_sampler, prior_sampler, tau_sampler, design_generator, sim_vars) -> None:
        super().__init__(mask_sampler, prior_sampler, tau_sampler, design_generator, sim_vars)

    def outcome_likelihood(self, params: Tensor, xi: Tensor, sim_vars: dict) -> Distribution:
        """Return the Normal likelihood of one outcome given parameters and a design.

        Args:
            params: polynomial coefficients, [B, param_dim] (param_dim = 4 here).
            xi: design point, [B, 1, xi_dim] (the squeeze below only drops the
                trailing axis when xi_dim == 1).
            sim_vars: simulator settings; only ``"sigma"`` is read.

        Returns:
            ``torch.distributions.Normal`` with mean of shape [B, 1, y_dim] (y_dim = 1 here).
        """
        # Stack xi^0..xi^3 along a new axis, then drop the trailing xi axis -> [B, 1, 4]
        xi_powers = torch.stack([torch.ones_like(xi), xi, xi ** 2, xi ** 3], dim=-2).squeeze(-1)
        # sum([B, 1, 4] * [B, 1, 4]) over the coefficient axis -> [B, 1, 1]
        mean = torch.sum(params.unsqueeze(1) * xi_powers, dim=-1, keepdim=True)
        sigma = sim_vars["sigma"]
        return torch.distributions.Normal(mean, sigma) # [B, 1, y_dim]

    def analytical_log_marginal_likelihood(self, outcomes, params: Tensor, masks: Tensor) -> Tensor:
        """Closed-form log marginal likelihood (not implemented yet).

        ``self`` was missing from the original signature, so calling this on an
        instance bound the instance to ``outcomes`` and failed on argument count.

        Raises:
            NotImplementedError: always, until the closed form is written.
        """
        raise NotImplementedError # TODO

class PriorPolynomialReg(Prior):
    """Mask-dependent Gaussian prior over the four polynomial coefficients.

    A coefficient whose mask entry equals 1 receives its own (mean, sd) pair;
    every other coefficient receives mean 0 and sd ``delta``.
    """

    def __init__(self, delta: Tensor = Tensor([0.1])) -> None:
        super().__init__()
        self.delta = delta  # sd used for coefficients that are masked out

    def dist(self, masks: Tensor) -> Distribution:
        """Build the prior distribution for a batch of masks.

        Args:
            masks: indicator per coefficient; indexed as ``masks[:, k]`` for k = 0..3.

        Returns:
            ``MultivariateNormal`` with diagonal scale, batch of [B, theta_dim].
        """
        # The original re-ran super().__init__() here on every call. Construction
        # belongs in __init__; re-initialising the parent on each call is at best
        # redundant (Prior is defined elsewhere -- confirm it holds no state that
        # relied on being reset).
        self.masks = masks

        default = Tensor([[0, self.delta]])
        masks_ = masks.unsqueeze(-1)

        # (mean, sd) used for coefficient k when masks[:, k] == 1
        active_hyper_params = ([5, 2], [3, 1], [0, 0.8], [0, 0.5])
        per_coefficient = [
            torch.where(masks_[:, k] == 1, Tensor(active), default)
            for k, active in enumerate(active_hyper_params)
        ]
        hyper_params = torch.stack(per_coefficient, dim=1)

        means = hyper_params[:, :, 0]
        sds = hyper_params[:, :, 1]

        # diag_embed builds the batch of diagonal matrices in one call
        # instead of a Python loop over rows.
        return torch.distributions.MultivariateNormal(means, scale_tril=torch.diag_embed(sds)) # [B, theta_dim]
    
# Width of the learned summary embedding. The summary network's output is the
# emitter network's input, so both sizes come from this one constant instead of
# two independent literals that could silently drift apart.
SUMMARY_DIM = 10
# Simulator settings used by both joint models (each model gets its own copy).
SIM_VARS = {"sigma": 1.0}

inference_network = bf.networks.FlowMatching() # TODO replace with coupling flow bf.networks.CouplingFlow()
summary_network = bf.networks.DeepSet(summary_dim = SUMMARY_DIM)

approximator = bf.Approximator(
    inference_network = inference_network,
    summary_network = summary_network,
    inference_variables = ["params"],
    inference_conditions = ["masks", "n_obs"],
    summary_variables = ["outcomes", "designs"]
)

T = 20 # number of maximum experiments (resources)
design_shape = torch.Size([1])
mask_sampler = ParameterMask()
prior_sampler = PriorPolynomialReg()
random_num_obs = RandomNumObs(min_obs = 1, max_obs = T)
random_design_generator = RandomDesign(design_shape = design_shape)

# Joint model whose designs come from the random design generator.
model_1 = PolynomialRegression(mask_sampler = mask_sampler,
                                        prior_sampler = prior_sampler,
                                        tau_sampler = random_num_obs,
                                        design_generator = random_design_generator,
                                        sim_vars = dict(SIM_VARS))

decoder_net = EmitterNetwork(input_dim = SUMMARY_DIM, hidden_dim = 24, output_dim = 1) # [B, summary_dim] -> [B, design_dim]
# The design network reuses the approximator's summary network as its encoder.
design_net = DeepAdaptiveDesign(encoder_net = approximator.summary_network,
                                decoder_net = decoder_net,
                                design_shape = design_shape, 
                                summary_variables=["outcomes", "designs"])

# Joint model whose designs come from the design network.
model_2 = PolynomialRegression(mask_sampler = mask_sampler,
                                        prior_sampler = prior_sampler,
                                        tau_sampler = random_num_obs,
                                        design_generator = design_net,
                                        sim_vars = dict(SIM_VARS))


class DeepAdaptiveDesign(nn.Module):
  """Design network: encode the experiment history, then decode the next design.

  Without a history the learnable ``initial_design`` parameter is returned.
  """

  def __init__(
      self,
      encoder_net: nn.Module | bf.networks.DeepSet, # same summary for bf and dad or different?
      decoder_net: nn.Module,
      design_shape: torch.Size, # [xi_dim]
      summary_variables: list[str] = None # in case of using summary_net from bf
    ) -> None:
    """
    Args:
      encoder_net: maps the concatenated summary variables to an embedding.
      decoder_net: maps the embedding to the next design.
      design_shape: shape of a single design, [xi_dim].
      summary_variables: keys of ``history`` that are concatenated and fed to
        ``encoder_net``.
    """
    super().__init__()
    self.design_shape = design_shape
    self.register_parameter(
        "initial_design",
        nn.Parameter(0.1 * torch.ones(design_shape, dtype=torch.float32)) # scalar
    )
    self.encoder_net = encoder_net
    self.decoder_net = decoder_net
    self.summary_variables = summary_variables

  def forward(self, history, batch_size: int) -> Tensor:
    """Return the next design.

    Args:
      history: ``None`` before the first experiment; otherwise the container
        passed to ``filter_concatenate`` (presumably a dict of tensors keyed by
        variable name -- confirm against the simulator).
      batch_size: currently unused.

    Returns:
      ``initial_design`` when ``history`` is None, else the decoder output.
    """
    if history is None:
      return self.initial_design

    # embed design-outcome pairs.
    # The original had this assignment commented out, so `embeddings` was
    # undefined and every call with a history raised NameError.
    summary_inputs = filter_concatenate(history, keys=self.summary_variables)
    embeddings = self.encoder_net(summary_inputs)  # [B, summary_dim]
    # The encoder may live on another device than this module; follow this
    # module's own parameter instead of hard-coding 'cpu'.
    embeddings = embeddings.to(self.initial_design.device)

    # get next design
    return self.decoder_net(embeddings)

# --- BayesFlow (posterior approximator) hyperparameters ---
B = 32
batch_shape_b = torch.Size((B,))

# --- DAD (design network) hyperparameters ---
B_d = 2000  # number of positive samples
L = 2000  # number of negative samples
batch_shape_d = torch.Size((B_d,))

# Data source built from both joint models.
dataset = DataSet(
    batch_shape=batch_shape_b,
    joint_model_1=model_1,
    joint_model_2=model_2,
)

# Design loss, evaluated on the joint model that contains the design network.
pce_loss = NestedMonteCarlo(
    approximator=approximator,
    joint_model=model_2,
    batch_shape=batch_shape_d,
    num_negative_samples=L,
)

trainer = JointApproximator(
    approximator=approximator,
    design_loss=pce_loss,
    dataset=dataset,
)

# hyper_params = {"epochs_1": 1, "steps_per_epoch_1": 1,
#                 "epochs_2": 1, "steps_per_epoch_2": 100,
#                 "epochs_3": 5, "steps_per_epoch_3": 100}

# PATH = "test"  # ...BayesFlow/DAD/test/
# trainer.train(PATH = PATH, **hyper_params)

In [5]:
# Call model_1.sample with batch shape [1]; the result is passed to design_net in the next cell.
history = model_1.sample(torch.Size([1]))

In [6]:
# Run the design network on the sampled history (second argument is batch_size = 1).
out = design_net(history, 1)

Input requires grad: True
Input requires grad: False
Input requires grad: False
Input requires grad: False


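Below: the same design-network pieces rebuilt with plain PyTorch only (no BayesFlow encoder), to check where `requires_grad` is lost.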

In [7]:
import torch
from torch import Tensor
import torch.nn as nn

In [8]:
class DeepAdaptiveDesign(nn.Module):
  """Encoder-free debugging variant of the design network.

  The history is not encoded: a random [1, 10] tensor stands in for the
  embedding that is fed to the decoder.
  """

  def __init__(
      self,
      # encoder_net: nn.Module, # same summary for bf and dad or different?
      decoder_net: nn.Module,
      design_shape: torch.Size, # [xi_dim]
      summary_variables: list[str] = None # in case of using summary_net from bf
    ) -> None:
    super().__init__()
    self.design_shape = design_shape
    # Assigning an nn.Parameter as an attribute registers it under that name.
    start_value = torch.ones(design_shape, dtype=torch.float32) * 0.1
    self.initial_design = nn.Parameter(start_value)
    # self.encoder_net = encoder_net
    self.decoder_net = decoder_net
    self.summary_variables = summary_variables

  def forward(self, history, batch_size: int) -> Tensor:
    """Return ``initial_design`` when there is no history, else the decoded placeholder."""
    # No experiments yet: hand back the learnable starting design itself.
    if history is None:
      return self.initial_design

    # Placeholder for the encoder output; same width as the real embeddings.
    placeholder = torch.rand([1, 10], requires_grad=True)
    return self.decoder_net(placeholder)

In [9]:
class EmitterNetwork(nn.Module):
  """MLP that maps a summary embedding to a design.

  Layout: input layer -> activation -> (n_hidden_layers - 1) blocks of
  Linear + activation -> output layer. The forward pass prints the
  ``requires_grad`` flag of the tensor after each stage.
  """

  def __init__(
        self,
        input_dim, # summary_dim
        hidden_dim,
        output_dim, # xi_dim
        n_hidden_layers=2,
        activation=nn.Softplus,
    ):
    super().__init__()
    self.activation_layer = activation()
    self.input_layer = nn.Linear(input_dim, hidden_dim)

    if n_hidden_layers > 1:
      # One Linear + activation block per additional hidden layer.
      hidden_blocks = []
      for _ in range(n_hidden_layers - 1):
        hidden_blocks.append(
            nn.Sequential(nn.Linear(hidden_dim, hidden_dim), activation())
        )
      self.middle = nn.Sequential(*hidden_blocks)
    else:
      self.middle = nn.Identity()

    self.output_layer = nn.Linear(hidden_dim, output_dim)

  def forward(self, r):
    """Return the design for embedding ``r``: [B, summary_dim] -> [B, 1, xi_dim]."""
    print("Input requires grad:", r.requires_grad) 
    hidden = r
    # Report the flag after the input layer, the activation and the middle stack.
    for stage in (self.input_layer, self.activation_layer, self.middle):
      hidden = stage(hidden)
      print("Input requires grad:", hidden.requires_grad) 
    design = self.output_layer(hidden)
    return design.unsqueeze(1) # [B, xi_dim] -> [B, 1, xi_dim]

In [10]:
class Simulator(nn.Module):
    """Query a design network repeatedly and concatenate the designs it returns.

    Args:
        design_net: callable invoked as ``design_net(None, 0)`` on every step.
        num_steps: number of queries per forward pass (default 5, the value
            that was previously hard-coded).

    Raises:
        ValueError: if ``num_steps`` is smaller than 1, because ``torch.cat``
            cannot concatenate an empty list.
    """

    def __init__(self, design_net, num_steps: int = 5):
        super().__init__()
        if num_steps < 1:
            raise ValueError(f"num_steps must be >= 1, got {num_steps}")
        self.design_net = design_net
        self.num_steps = num_steps

    def forward(self):
        """Return the ``num_steps`` designs concatenated along dim 0."""
        # Every call passes history=None and batch_size=0.
        designs = [self.design_net(None, 0) for _ in range(self.num_steps)]
        return torch.cat(designs, dim=0)

In [11]:
# Emitter with input_dim=10 (matches the [1, 10] test tensors used in this notebook), hidden_dim=24, output_dim=1.
decoder = EmitterNetwork(10, 24, 1)

In [12]:
# Build the encoder-free design network around the decoder created above.
# NOTE: this rebinds `design_net`, which an earlier cell already assigned.
design_net = DeepAdaptiveDesign(
    decoder_net=decoder,
    design_shape=torch.Size([1])
)

In [13]:
# NOTE(review): this name shadows Python's built-in input(); the next cell reads it, so rename both together.
input = torch.rand([1, 10], requires_grad=True) # same dimension as embeddings

In [14]:
# Call the decoder directly on a tensor that requires grad; its forward pass prints the flag after each stage.
output = decoder(input)

Input requires grad: True
Input requires grad: False
Input requires grad: False
Input requires grad: False


In [15]:
# With history=None the design network returns its initial_design parameter.
next_design = design_net(None, 0)

In [16]:
# Display the returned design.
next_design

Parameter containing:
tensor([0.1000], requires_grad=True)

In [17]:
# Wrap the design network in the Simulator module defined above.
simulator = Simulator(design_net)

In [18]:
# Run the simulator's forward pass: five design_net(None, 0) calls concatenated along dim 0.
out = simulator()

In [19]:
# Display the concatenated designs.
out

tensor([0.1000, 0.1000, 0.1000, 0.1000, 0.1000])