# From `mcmc_for_neural_networks.ipynb`
Code previously used in `mcmc_for_neural_networks` to plot the results for section 1...

In [None]:
# Setting the plot size and font size:
fontsize = 15
fig = plt.figure(figsize=(10, 4))

# Plotting the estimated, i.e. sampled posterior:
ax1 = fig.add_subplot(111)
ax1.hist(plot_points,  bins = 20, color='C0', alpha=0.7, label='Sampled Posterior')
ax1.set_xlim(ax1.get_xlim())

# Plotting the true posterior:
ax2 = ax1.twinx()
ax2.grid(False)
ax2.plot(true_posterior[:,0], true_posterior[:,1], linewidth=2, color='C1', label='True Posterior')
ax2.set_ylim(0, ax2.get_ylim()[1])
ax2.set_ylabel('Density', fontsize = fontsize)

# Final plot display:
ax1.set_ylabel('Frequency', fontsize = fontsize, labelpad=10)
ax1.set_xlabel('Parameter value', fontsize = fontsize, labelpad=10)
ax1.set_title('Posterior', fontsize = fontsize, pad=10)
lgd = plt.legend(bbox_to_anchor=(1.25,0.5), loc='center left')
fig.tight_layout()
plt.show()

# From `hamiltonian_monte_carlo.ipynb`

# Basic Application
**_For more theoretical details about the example problem used here, see_** ["1. Practical Implementation" from `mcmc_for_neural_networks.ipynb`](https://github.com/pranigopu/mastersProject/tree/420f7b4e1c4d5a65fc1f0d94d013c4b0d2a53fc3/code#1.-Practical-Introduction-to-MCMC).

---

**General problem statement**:

Sampling the posterior of a single parameter (we shall call it the "model parameter").

**Specific problem statement**:

Obtain information about the probability of success $\theta$ (which is our model parameter) given some data of $k$ successes in $n$ trials using an uninformative prior (e.g. a uniform distribution, i.e. before making observations, we assume that $\theta$ is uniformly distributed). Here, the outcomes of the real-life process being modelled are distributed by a binomial distribution with some probability of success, which is our target distribution. Note that $\theta$ is the probability of success of our model, and we try to find the target distribution by making our model as similar as possible to the real-life process. In other words, we shall solve for the posterior distribution of parameter $\theta$, and thereby find (or at least estimate) the target distribution.

**NOTE**: $\theta$ _is the model's <u>probability</u> of success, which means its uniform prior must be_ $\text{Uniform}([0, 1])$.

In [1]:
import autograd.numpy as np
from numpy import random
from tqdm import tqdm # NOTE: `tqdm` is just used to display the progress bar when looping over iterables
from scipy import stats
import matplotlib.pyplot as plt
from autograd import grad

## Defining the prior and likelihood
By Bayes' rule, we know that:

$p = P(\theta|D) \propto P(D|\theta) P(\theta)$

Our prior has already been defined as $\text{Uniform}([0, 1])$ (see the previous sections).

---

Hence, we need to define the likelihood function given our data $(k, n)$. How? Firstly, we know it must be a binomial distribution, since the likelihood is the probability that our model would generate the outcomes $D$ (generated by the real-life process), given that the model parameterised by $\theta$. Hence, the likelihood function is a function that inputs $k$ (the number of successes), $n$ (the number of trials) and $\theta$ (the probability of success) and outputs the probability mass of $(k, n)$ under $\text{Binomial}(\theta)$, i.e. the binomial distribution parameterised by $\theta$.

In [2]:
# First define our likelihood function which will be dependent on provided `data`
likelihood_function = lambda k, n, θ: stats.binom.pmf(k, n, θ)

**KEY PRACTICAL NOTE FOR THIS EXAMPLE**:

Since we use an uinformative uniform prior $\text{Uniform}([0, 1])$, where $P(\theta) = 1$ if and only if $\theta \in [0, 1]$.

$\implies P(D|\theta) P(\theta) = P(D|\theta)$

However, in general (apart from this example), we should also define the prior function and use $P(D|\theta) P(\theta)$.

## Setting up MCMC

In [3]:
# Basic MCMC parameters:
n_samples = 10000 # number of samples to draw from the posterior
burnin = 2500 # number of samples to discard before recording draws from the posterior

# Specifying the data, ensuring k <= n:
binom_k = 80 # Number of successes
binom_n = 100 # Number of trials

# Initial parameter value from which to start sampling (drawn from the prior):
θ_current = random.uniform(0, 1)

# Variable to keep track of the number of accepted samples:
n_accepted_samples = 0

# Create an array of NaNs to fill with our samples:
p_posterior = np.full(n_samples, np.nan) 

## Running MCMC using HMC algorithm

In [4]:
def neg_log_prob(θ):
    return -1*np.log(likelihood_function(binom_k, binom_n, θ))

def hmc(θ_current, path_len=1, step_size=0.25):
    # Setup:
    steps = int(path_len/step_size) # `path_len` and `step_size` are tricky parameters to tune
    momentum_dist = stats.norm(0, 1)
    
    # Generate samples:
    θ_proposed = np.copy(θ_current)
    m_current = momentum_dist.rvs()        
    m_proposed = np.copy(m_current) 
    
    # Gradient of PDF w.r.t. position (θ_current) a.k.a. potential energy w.r.t. position:
    δVδθ = binom_k*θ_current**(binom_k-1) * (1-θ_current)**(binom_n-binom_k)
    δVδθ += (θ_current**binom_k) * (-1*(binom_n-binom_k)*(1-θ_current)**(binom_n-binom_k-1))
    δVδθ /= θ_current**binom_k * (1-θ_current)**(binom_n-binom_k)
    δVδθ *= -1

    # leapfrog integration begins...
    for s in range(steps): 
        m_proposed += step_size*δVδθ/2 # As potential energy increases, kinetic energy decreases, half-step
        θ_proposed += step_size*m_proposed # Position increases as function of momentum 
        m_proposed += step_size*δVδθ/2 # Second half-step "leapfrog" update to momentum    
    # ... leapfrog integration ends
    
    m_proposed = -1*m_proposed # Flip momentum for reversibility     

    return θ_proposed, m_current, m_proposed

In [5]:
print('Generating {} MCMC samples from the posterior:'.format(n_samples))
for i in tqdm(np.arange(n_samples), ncols=100):
    # Sample a value uniformly from 0 to 1 as a proposal:
    θ_proposed, m_current, m_proposed = hmc(θ_current)

    #------------------------------------
    # Calculate the Metropolis-Hastings acceptance probability α based on the prior and likelihood:
    θ_current_neg_log_prob = neg_log_prob(θ_current)
    θ_proposed_neg_log_prob = neg_log_prob(θ_proposed)
    
    m_current_neg_log_prob = neg_log_prob(m_current)
    m_proposed_neg_log_prob = neg_log_prob(m_proposed)
    
    # Account for negatives and log (probabilties):
    target = θ_current_neg_log_prob - θ_proposed_neg_log_prob # -P(θ_proposed)/P(θ_current)
    adjustment = m_proposed_neg_log_prob - m_current_neg_log_prob # P(m_current)/P(m_proposed)
    log_α = target + adjustment # Logarithm of acceptance probability
    # NOTE: Acceptance probability is given by [P(θ_proposed)*P(m_current)]/[P(θ_current)*P(m_proposed)] 
    
    #------------------------------------
    # Probabilistically accepting the proposal:
    if np.log(random.uniform(0, 1)) < log_α: # Simulates the probability of acceptance as α
        θ_current = θ_proposed # then update the current sample to the propoal for the next iteration
        n_accepted_samples += 1 # add to the count of accepted samples

    # Store the current sample
    p_posterior[i] = θ_current

Generating 10000 MCMC samples from the posterior:


100%|████████████████████████████████████████████████████████| 10000/10000 [00:18<00:00, 528.54it/s]


In [6]:
per_accept = (n_accepted_samples/n_samples)*100
print(f'Number of accepted samples: {per_accept:.3f} %')
posterior_mean = np.mean(p_posterior[burnin:])
print(f'Mean value of the estimated posterior (excluding burn-in): {posterior_mean:.3f}')

Number of accepted samples: 0.000 %
Mean value of the estimated posterior (excluding burn-in): 0.642


# From `bnn_basics.ipynb`

In [1]:
class ANN(nn.Module):
    def __init__(self):
        super().__init__()
        self.model = nn.Sequential(
            nn.Linear(1, 100, bias=True),
            nn.ReLU(),
            nn.Linear(100, 1, bias=True))

    #================================================
    # Initialising weights according to a standard normal distribution:
    def init_weights(self, layer):
        if isinstance(layer, nn.Linear):
            torch.nn.init.normal_(layer.weight)
    
    #================================================
    def forward(self, x):
        return self.model(x)

    #================================================
    def get_flattened_weights(self):
        flattened_weights = torch.tensor([])
        for layer in self.model:
            if isinstance(layer, nn.Linear):
                flattened_weights = torch.concat((flattened_weights, torch.flatten(layer.weight)))
        return torch.detach(flattened_weights)
        '''
        Why use `torch.detach` and not simply return the flattened weights? See implementation notes below.
        '''
    
    #================================================
    def set_weights_from_flattened_weights(self, θ):
        '''
        Set weights from proposed flattened weights.
        ------------------------------------
        θ : Flattened vector of proposed model parameters
        '''
        
        for layer in self.model:
            if isinstance(layer, nn.Linear):
                # Obtaining the size of the layer (i.e. the number of weights in the layer):
                size = int(torch.prod(torch.tensor(layer.weight.shape)))
                # NOTE 1: In our case, the above amounts to doing `int(layer.weight.shape[0] * layer.weight.shape[1])`
                # NOTE 2: Coverting `size` to an integer is necessary, since we want to use `size` for slicing

                # Setting the layer's weights and removing assigned weights from θ:
                with torch.no_grad():
                    print('HAHA', θ)
                    layer.weight = nn.Parameter(torch.reshape(θ[:size], shape=layer.weight.shape))
                '''
                REFERENCE FOR THE ABOVE: https://discuss.pytorch.org/t/how-to-manually-set-the-weights-in-a-two-layer-linear-model/45902
                '''
                if size < len(θ):
                    θ = θ[size:]

    #================================================
    # Function to take in the data and the proposed sample of the parameter and return the prediction:
    def evaluate_proposal(self, D, θ):
        '''
        Apply the proposed parameters and then use the model to predict.
        ------------------------------------
        D : Array of data points
        θ : Flattened vector of proposed model parameters

        NOTE: Although θ is a vector of parameters, as a vector, it is collectively regarded as one parameter.
        '''

        # Setting model's weights to the proposed weights:
        self.set_weights_from_flattened_weights(θ)
        
        # Predict based on the newly parameterised model:
        prediction = self.forward(D)
        return prediction

    #================================================
    def train(self, x, y, n_epochs=500):
        # Initialising MSE part of the loss function:
        mse_loss = nn.MSELoss()

        # Initialising the optimiser used for gradient descent:
        optimizer = optim.Adam(model.parameters(), lr=0.01)

        #------------------------------------
        # Training loop:
        
        for step in range(n_epochs):
            # Obtaining the model's predicted values for the inputs:
            prediction = self.model(x)
        
            # Calculating the loss function:
            mse = mse_loss(prediction, y)
            
            # Optimisation step:
            optimizer.zero_grad() # Resetting the optimiser's gradients
            mse.backward()       # Calculating the gradients for backpropagation
            optimizer.step()      # Applying the gradients for backpropagation
        
        print('MSE : %2.2f' % (mse.item()))

**IMPLEMENTATION NOTE: `.train`**:

The training function defined above is a traditional training loop, without any Bayesian elements.

---

**IMPLEMENTATION NOTE: Why use `torch.detach` and not simply return the flattened weights?**:

In later processes, we would need to apply `.numpy` (i.e. extracting the tensor's values as a NumPy array) to our tensor of flattened weights. However, since the flattened weights were obtained from the ANN layers' parameters, which require `grad` (i.e. automatic gradient calculation is enabled for them), the returned tensor of flattened weights also requires grad. Thus, when applying `.numpy` to our tensor of flattened weights, we would get the following error message:

```
RuntimeError: Can't call numpy() on Tensor that requires grad. Use tensor.detach().numpy() instead.
```

To avoid this, we use `torch.detach` on the tensor of flattened weights before returning it. `torch.detach` is a method that returns a new tensor, detached from the current graph (i.e. the [computational graph](https://pytorch.org/blog/computational-graphs-constructed-in-pytorch/) by means of which the partial gradient of a function can be calculated with respect to the tensor). The result will never require gradient, which is what we need.

> **Reference**: [`torch.Tensor.detach` (PyTorch documentation)](https://pytorch.org/docs/stable/generated/torch.Tensor.detach.html)

---

**END OF IMPLEMENTATION NOTES**

---

Some code for testing the weight flattening and assignment functions (you can copy-paste the code and run it for yourself)...

```python
# Initialising the model:
model = ANN()

# Obtaining new flattened weights:
w = torch.ones_like(model.get_flattened_weights()) # A tensor of ones with the same size as `model.get_flattened_weights()`
w[:50] *= 2
w[50:150] *= 3

# Setting the new flattened weights to the model:
model.set_weights_from_flattened_weights(w)

# Printing the model's updated layer weights:
for layer in model.model:
    if isinstance(layer, nn.Linear):
        print(layer.weight)
```

By running the above code, I have verified the correctness of the weight flattening and setting functions.

---

Training the ANN with the traditional training loop to test the ANN's functions (chiefly weight initialisation and training)...

In [None]:
model = ANN()

# Initialising weights:
model.apply(model.init_weights)
# NOTE: This model shall be our regression function

# Training model:
model.train(x, y)

**IMPLEMENTATION NOTE: `torch.nn.Module.apply(fn)`**:

Applies `fn` recursively to every submodule (as returned by `.children()`) as well as self. Typical use includes initialising the parameters of a model (for more on initialising module parameters, see:[`nn-init-doc`](https://pytorch.org/docs/stable/nn.init.html)).

> **Reference**: [Source code for `torch.nn.modules.module` (PyTorch documentation)](https://pytorch.org/docs/master/_modules/torch/nn/modules/module.html#Module.apply)