# Numerical opt. of $alpha_{1:n}$ for probit class

Assume a model with a fairly informative prior:
$$
 \Theta \sim N(\Theta; \mu_{\theta}, \sigma^2_{\Theta}) \\
 B \sim U(B; a_{B}, b_{B})
$$

In [None]:
from math import isclose, log, log2
from functools import partial
from scipy.stats import norm, bernoulli
import numpy as np
import torch
from torch.distributions.normal import Normal
from torch.distributions.uniform import Uniform
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize'] = (12,5)


def p(z, alpha_i):
    """Likelihood p(Ỹ_i = ỹ_i | Theta = theta, X_i = x_i),
    with z = theta^T x_i - beta
    """
    return (2*alpha_i - 1)*Normal(loc=0, scale=1).cdf(z) + 1-alpha_i

def h(p):
    """Entropy of a Bernoulli variable"""
    # Set h = 0 in the limit p -> 0, and p -> 1
    entropy = torch.empty(p.shape)
    one_lim = torch.isclose(p, torch.ones(p.shape))
    zero_lim = torch.isclose(p, torch.zeros(p.shape))
    limit_inds = torch.logical_or(one_lim, zero_lim)
    entropy[limit_inds] = 0
    # For the rest, compute it the usual way.
    ok_inds = torch.logical_not(limit_inds)
    ok_p = p[ok_inds]
    entropy[ok_inds] = -ok_p*torch.log2(ok_p) - (1-ok_p)*torch.log2(1-ok_p)
    return entropy

a_B = -1
b_B = 1
mu_theta = 1e6
sigma_sq_theta = 0.1
theta = Normal(loc=mu_theta, scale=sigma_sq_theta).sample()
beta = Uniform(a_B, b_B).sample()
num_samples = 10
# xs = 2*torch.Tensor(norm.rvs(size=num_samples))
x_range = torch.arange(xs.min(), xs.max(), 1/num_samples)
y_tildes = torch.Tensor(bernoulli.rvs(p=p(theta*(xs-beta), alpha_i=1.0)))
fig, ax = plt.subplots()
ax.plot(x_range, p(theta*x_range - beta, alpha_i = 0.8), label="p(ỹ_i | x_i, t, b)")
#ax.plot(xs, torch.zeros(xs.size()), 'b.')
ax.plot(xs, y_tildes, 'b.')
ax.set_xlabel("$x_i$");
ax.set_ylabel("$p$");



def entropy_of_avg(x_seq, alpha_seq, param_sampling):
    """H[Ỹ_1:n | X_1:n]
    Compute the probability for every ỹ in {0,1}^n
    and evaluate the bernoulli entropy.
    """
    elements = torch.combinations(torch.Tensor([0,1]), r=x_seq.size(0), with_replacement=True)
    thetas, betas = param_sampling()
    entropy = torch.tensor(0.0)
    ps = torch.tensor(0.0)
    for y_tilde_seq in elements:
        for params in zip(thetas, betas):
            ps += p_y_tilde_seq_given_params(y_tilde_seq, x_seq, params, alpha_seq)
        prob = ps / thetas.size(0)
        print(prob, entropy)
        entropy -= prob * torch.log2(prob)
    return entropy / elements.size(0)

def p_y_tilde_seq_given_params(y_tilde_seq, x_seq, params, alpha_seq):
    """Likelihood p(Ỹ_1:n = ỹ_1:n | Theta = theta, X_1:n = x_1:n)
    
    Args:
        y_tilde_seq (np.array): Seq. of Bernoulli variables (n,): {0, 1}^n
        x_seq (np.array): Seq. of np.arrays (n, D_x)
        theta (np.array): Vector (D_x,)
        alpha_seq (np.array): Seq of prec. values (n,): [0.5, 1]^n 
    """
    theta, beta = params
    true_inds = y_tilde_seq == 1
    false_inds = y_tilde_seq == 0
    z = theta * (x_seq - beta)
    alpha_seq[true_inds]
    true_prod = p(z[true_inds], alpha_seq[true_inds])
    false_prod = 1 - p(z[false_inds], alpha_seq[false_inds])
    return true_prod.prod() * false_prod.prod()

xs = torch.tensor([-2, -1/2, 0, 1/2])
alphas = torch.tensor([1/2, 1/2, 1, 1/2])
sampler = partial(param_sampling, mu_theta, sigma_sq_theta, a_B, b_B, num_mc_samples)
avg_entropy(xs, alphas, sampler)

# torch.combinations_with_replacement(torch.Tensor([0,1]), r=4, with_replacement=False)
# help(torch.combinations)
p_y_tilde_seq_given_params(torch.tensor([0, 0, 1, 1]), xs, (torch.tensor(1e6), torch.tensor(0.0)), alphas)

# torch.combinations(torch.Tensor([0,1]), r=4, with_replacement=True)
entropy_of_avg(xs, alphas, sampler)

## Opt problem
maximise
$$
L = 
\mathcal{H}[\tilde{Y}_{1:n} \vert X_{1:n}]
- \mathbb{E}_{\Theta \sim p(\Theta \vert X_{1:n})} \left[\mathcal{H}[\tilde{Y}_{1:n} \vert \theta, x_{1:n}] \right] \\
 = \mathcal{H}(\int p_{\alpha_{1:n}}(Ỹ_{1:n} = ỹ_{1:n} | x_{1:n}, \theta, \beta) N(\theta; \mu_{\Theta}, \sigma^2_{\theta}) U(\beta; a_B, b_B) d \theta d \beta) \\
 - \int \sum_i  h ((2\alpha_i - 1) \Phi(\theta^{\top}(x_i - \beta)) + 1 - \alpha_i) N(\theta; \mu_{\Theta}, \sigma^2_{\theta}) U(\beta; a_B, b_B) d \theta d \beta,
$$
with respect to $\alpha_i \in [0.5, 1.0]$

In [None]:
def p_y_tilde_seq_given_params(y_tilde_seq, x_seq, params, alpha_seq):
    """Likelihood p(Ỹ_1:n = ỹ_1:n | Theta = theta, X_1:n = x_1:n)
    
    Args:
        y_tilde_seq (np.array): Seq. of Bernoulli variables (n,): {0, 1}^n
        x_seq (np.array): Seq. of np.arrays (n, D_x)
        theta (np.array): Vector (D_x,)
        alpha_seq (np.array): Seq of prec. values (n,): [0.5, 1]^n 
    """
    theta, beta = params
    true_inds = y_tilde_seq == 1
    false_inds = y_tilde_seq == 0
    z = theta * (x_seq - beta)
    alpha_seq[true_inds]
    true_prod = p(z[true_inds], alpha_seq[true_inds])
    false_prod = 1 - p(z[false_inds], alpha_seq[false_inds])
    return true_prod.prod() * false_prod.prod()

def param_sampling(mu_theta, sigma_sq_theta, a_B, b_B, num_mc_samples):
    theta = Normal(loc=mu_theta, scale=sigma_sq_theta).sample(sample_shape=(num_mc_samples,))
    beta = Uniform(a_B, b_B).sample(sample_shape=(num_mc_samples,))
    return (theta, beta)

def entropy_of_avg(x_seq, alpha_seq, param_sampling):
    """H[Ỹ_1:n | X_1:n]
    Compute the probability for every ỹ in {0,1}^n
    and evaluate the bernoulli entropy.
    """
    elements = torch.combinations(torch.Tensor([0,1]), r=x_seq.size(0), with_replacement=True)
    thetas, betas = param_sampling()
    entropy = torch.tensor(0.0)
    ps = torch.tensor(0.0)
    for y_tilde_seq in elements:
        for params in zip(thetas, betas):
            ps += p_y_tilde_seq_given_params(y_tilde_seq, x_seq, params, alpha_seq)
        prob = ps / x_seq.size(0)
        entropy -= prob * torch.log2(prob)
    return entropy

def avg_entropy(x_seq, alpha_seq, param_sampling):
    """E_p(theta) H[Ỹ_1:n | Theta, Beta, X_1:n]"""
    thetas, betas = param_sampling()
    
    for params in zip(thetas, betas):
        theta, beta = params
        zs = theta * (xs - beta)
        ps = p(zs, alpha_seq)
        entropies = h(ps)
    return entropies.mean()

num_mc_samples = 100
alphas = 0.8*torch.ones(xs.shape, requires_grad=True)
sampler = partial(param_sampling, mu_theta, sigma_sq_theta, a_B, b_B, num_mc_samples)
L = entropy_of_avg(xs, alphas, sampler) - avg_entropy(xs, alphas, sampler)
torch.autograd.grad(L, alphas)

In [None]:
from torch import nn

def alpha_mapping(zs):
    """Map unbounded proxy variable to [1/2, 1]"""
    return 1/2 + 1/2*torch.sigmoid(zs)

class MiOpt(nn.Module):
    """Custom Pytorch model for gradient optimization.
    """
    def __init__(self, xs, sampling_fn):
        super().__init__()
        # proxy params for alphas with full support
        weights = torch.distributions.Uniform(-10, 10).sample(xs.size())
        # weights = 1 * torch.ones(xs.size())
        self.weights = nn.Parameter(weights)
        self._sampler = sampling_fn
        self.xs = xs

    def forward(self, _x):
        alphas = alpha_mapping(self.weights)
        L_1 = entropy_of_avg(self.xs, alphas, self._sampler)
        #print("L_1", L_1)
        L_2 = avg_entropy(self.xs, alphas, self._sampler)
        #print("L_2", L_2)
        return L_1 - L_2
    
def training_loop(model, optimizer, num_epochs):
    "Training loop for torch model."
    mis = []
    for i in range(num_epochs):
        mi = model(None)
        neg_mi = - mi
        neg_mi.backward()
        optimizer.step()
        optimizer.zero_grad()
        mis.append(mi.item()) 
        print(f"Epoch: {i}, MI: {mi.item()}")
    return mis

num_mc_samples = 100
sampling_fn = partial(param_sampling, mu_theta, sigma_sq_theta, a_B, b_B, num_mc_samples)
m = MiOpt(xs, sampling_fn)
# Instantiate optimizer
opt = torch.optim.Adam(m.parameters(), lr=0.01)
losses = training_loop(m, opt, num_epochs = 10)


In [None]:
zs = m.weights
alphas = alpha_mapping(zs).detach().numpy()

fig, ax = plt.subplots()
ax.plot(x_range, p(theta*(x_range - beta), alpha_i = 1.0), label="p(ỹ_i | x_i, t, b)")
#ax.plot(xs, torch.zeros(xs.size()), 'b.')
ax.plot(xs, alphas, 'b.')
ax.set_xlabel("$x_i$");
ax.set_ylabel("$p$");

In [None]:
mis = []
h_avgs = []
avg_hs = []
a_range = torch.linspace(0.5, 1.0, 10)
for a in a_range:
    # print(a)
    alphas = a*torch.ones(xs.shape)
    h_avg = entropy_of_avg(xs, alphas, sampler)
    avg_h = avg_entropy(xs, alphas, sampler)
    mi = h_avg - avg_h
    mis.append(mi)
    h_avgs.append(h_avg)
    avg_hs.append(avg_h)
mis = torch.tensor(mis)
h_avgs = torch.tensor(h_avgs)
avg_hs = torch.tensor(avg_hs)

fig, ax = plt.subplots()
ax.plot(a_range, mis, label="MI")
ax.plot(a_range, avg_hs, label="$E_{p(\\theta, \\beta)} H[Ỹ_{1:n} | \\theta, \\beta, X_{1:n}]$")
ax.plot(a_range, h_avgs, label="$H[Ỹ_{1:n}| X_{1:n}]$")
ax.legend()
ax.set_xlabel("$\\alpha$");
ax.set_ylabel("$MI$");

In [None]:
alpha_mapping(0.1*torch.ones(1,))

In [None]:
# Number of epochs, batch size, number of training data and learning rate

num_epochs = 20 
lr = 0.01

zs = 0.8*torch.ones(xs.size(), requires_grad=True)
for epoch in range(num_epochs):
    costs = []
    alphas = alpha_mapping(zs)
    L_1 = entropy_of_avg(xs, alphas, sampler)
    L_2 = avg_entropy(xs, alphas, sampler)
   
    L = - L_2
    L.backward()
    a = alphas.sum()
    print("alphas", a, a.grad)
    print("L_1", L_1, L_1.grad)
    print("L_2", L_2, L_2.grad)
    print(L.grad)
    costs.append(L.item())
    with torch.no_grad():
        alphas += alphas.grad * lr
        alphas.grad.zero_()
            
    # Computing and printing the average loss in the current epoch
    print('Epoch: {} Cost: {}'.format(epoch, L.item()))



In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt
from torch import nn
from torch.functional import F
from copy import copy

n = 1000
noise = torch.Tensor(np.random.normal(0, 0.02, size=n))
x = torch.arange(n)
a, k, b = 0.7, .01, 0.2
y = a * np.exp(-k * x) + b + noise

class Model(nn.Module):
    """Custom Pytorch model for gradient optimization.
    """
    def __init__(self):
        
        super().__init__()
        # initialize weights with random numbers
        weights = torch.distributions.Uniform(0, 0.1).sample((3,))
        # make weights torch parameters
        self.weights = nn.Parameter(weights)        
        
    def forward(self, X):
        """Implement function to be optimised. In this case, an exponential decay
        function (a + exp(-k * X) + b),
        """
        a, k, b = self.weights
        return a * torch.exp(-k * X) + b
    
def training_loop(model, optimizer, n=1000):
    "Training loop for torch model."
    losses = []
    for i in range(n):
        preds = model(x)
        loss = F.mse_loss(preds, y).sqrt()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        losses.append(loss.item())  
    return losses

# instantiate model
m = Model()
# Instantiate optimizer
opt = torch.optim.Adam(m.parameters(), lr=0.001)
losses = training_loop(m, opt)
#plt.figure(figsize=(14, 7))
plt.plot(losses)
print(m.weights)