# >>Disclamer

'SamplingMethod' is a helper class for the animation widget. I could not use just 'Samling' class from the original notebook, because it often takes functions as arguments, so it is difficult to change them inside the widget. Therefore, we need this wrapper around 'Sampling' 

'SamplesArray' is for storing all samples in one place, this way it's easy to handle them.

In [50]:
# <api>
import numpy as np
from scipy.stats import norm
from scipy.stats import multivariate_normal
from functools import partial

from matplotlib import pyplot as plt
from bqplot import pyplot as bqplt
import bqplot
import ipywidgets
from ipywidgets import widgets, Layout
from IPython import display
#from ipywidgets import Layout
%matplotlib inline

# Load classes and functions from the previous parts
from jupyter_cms.loader import load_notebook
smpl1 = load_notebook('./Sampling_part1.ipynb')
smpl2 = load_notebook('./Sampling_part2.ipynb')
smpl3 = load_notebook('./Sampling_part3.ipynb')
smpl4 = load_notebook('./Sampling_part4.ipynb')
smpl5 = load_notebook('./Sampling_part5.ipynb')
smpl6 = load_notebook('./Sampling_part6.ipynb')
smpl_trgt = load_notebook('./Sampling_targets.ipynb')

In [51]:
#<api>
class SamplingMethod(object):
    '''A clas of different sampling methods for animation widget.'''  
    
    def __init__(self):
        pass
        
    def sample(self):
        return self.sampling.sample()
    
    def reset_start(self):        
        self.sampling.x = np.array([np.random.random_sample()*2*self.target.size - self.target.size, 
                                    np.random.random_sample()*2*self.target.size - self.target.size])

## Metropolis-Hastings

In [52]:
#<api>        
class MH (SamplingMethod):
    '''Metropolis-Hastings sampling'''
    def __init__(self, target=smpl_trgt.MultNorm(), prop_step=50): 
        self.x = [np.random.random_sample()*2*target.size - target.size, 
                  np.random.random_sample()*2*target.size - target.size]
        self.target = target
        self.prop_step = prop_step
        self.sampling = smpl3.MetropolisHastings(log_p=target.logpdf,
                              q=smpl3.Proposer(lambda x: multivariate_normal(mean=x, 
                                                                             cov=self.prop_step*np.eye(2)).rvs(),
                                               lambda x,y: 0), # Proposal is symmetric
                              x=self.x)    
    
    def __str__(self):
        return "Metropolis-Hastings: Accepted %d out of %d samples" % (self.sampling.accepted, 
                                                                       self.sampling.num_samples)   
    def reset_sampling(self):
        self.sampling = smpl3.MetropolisHastings(log_p=self.target.logpdf,
                              q=smpl3.Proposer(lambda x: multivariate_normal(mean=x, 
                                                                             cov=self.prop_step*np.eye(2)).rvs(),
                                               lambda x,y: 0), # Proposal is symmetric
                              x=self.x)  

In [53]:
#<api>
class MetropolisHastingsWithProposals (smpl3.MetropolisHastings):
    '''Metropolis-Hastings With Proposals (child of Sampling class)'''
    
    def sample_all (self):
        self.num_samples += 1
        # Propose new candidate
        x_prime = self.q.propose(self.x)
        a = self.log_p(x_prime) + self.q.log_trans_prob(x_prime, self.x) \
            - self.log_p(self.x) - self.q.log_trans_prob(self.x, x_prime)
        u = np.random.uniform()
        if np.log(u) < a:
            self.accepted += 1
            self.x = x_prime
        return [self.x, x_prime]
    
class MHP (MH):
    '''Metropolis-Hastings With Proposals (child of SamplingMethod class)'''
    
    def __init__(self, target=smpl_trgt.MultNorm(), prop_step=50): 
        self.x = [np.random.random_sample()*2*target.size - target.size, 
                  np.random.random_sample()*2*target.size - target.size]
        self.target = target
        self.prop_step = prop_step
        self.sampling = MetropolisHastingsWithProposals(log_p=target.logpdf,
                              q=smpl3.Proposer(lambda x: multivariate_normal(mean=x, 
                                                                             cov=self.prop_step*np.eye(2)).rvs(),
                                               lambda x,y: 0), # Proposal is symmetric
                              x=self.x)    
    
    def __str__(self):
        return "Metropolis-Hastings: Accepted %d out of %d samples" % (self.sampling.accepted, 
                                                                       self.sampling.num_samples)   
    def reset_sampling(self):
        self.sampling = MetropolisHastingsWithProposals(log_p=self.target.logpdf,
                              q=smpl3.Proposer(lambda x: multivariate_normal(mean=x, 
                                                                             cov=self.prop_step*np.eye(2)).rvs(),
                                               lambda x,y: 0), # Proposal is symmetric
                              x=self.x)  
    
    def sample_all(self):
        return self.sampling.sample_all()

## Slice sampling

In [54]:
#<api>        
class MSS (SamplingMethod):
    '''Multi Slice Sampling'''
    def __init__(self, target=smpl_trgt.MultNorm(), w=1, direct='Straight'):         
        self.target = target
        self.w = w
        self.direct = direct
        self.x = [np.random.random_sample()*2*target.size - target.size, 
                  np.random.random_sample()*2*target.size - target.size]
        self.reset_sampling()            
        
        
    def reset_sampling(self):
        if (self.direct=='Straight'):
            self.directions=[np.array([1,0]), np.array([0,1])]
        elif (self.direct=='Skew'):
            self.directions=[np.array([1,1]), np.array([1,-1])]
            
        self.sampling = smpl5.MultiSampling(lambda log_p, x: smpl5.SliceSampling(log_p, x, w=self.w), \
                         log_p=self.target.logpdf, \
                         x=self.x, \
                         directions=self.directions)
        

## HMC (without integration steps)

In [55]:
#<api>        
class HMC (SamplingMethod):
    '''Hamoltonian Monte-Carlo Sampling'''
    def __init__(self, target=smpl_trgt.MultNorm(), Tau=42, dtau=0.04):         
        self.target = target
        self.x = np.array([np.random.random_sample()*2*target.size - target.size, 
                           np.random.random_sample()*2*target.size - target.size])
        self.Tau = Tau
        self.dtau = dtau
        self.reset_sampling()          
        
    def reset_sampling(self):            
        self.sampling = smpl5.HMC(x=self.x, E=lambda x: - self.target.logpdf(x), Tau=self.Tau, dtau=self.dtau)

## HMC with integration steps

Here first I had to reimplement HMC as a 'Sampling' class and then wrap it into 'SamplingMethod' class.

In [58]:
# <api>
def integrate_H(xp, dE_dx, T, dt, method='leap_frog'):
    """
    Integrate the state xp for some time with the Hamiltonian H(x) = E(x) + K(x) 
    """
    x,p = np.copy(xp)
    int_steps = []
    t = 0
    while (t < T):
        t += dt
        if method=='euler':
            gradE = dE_dx(x)
            x += p*dt
            p += - gradE*dt
        elif method=='leap_frog':
            p += - dE_dx(x)*dt/2
            x += p*dt
            p += - dE_dx(x)*dt/2
        else:
            error('Unknown integratin method')
        int_steps.append([np.copy(x),np.copy(p)])
    return np.array(int_steps)

In [59]:
# <api>
# Simple numeric differentiation
def grad (f, x):
    eps = 1e-4
    gradx = np.zeros_like(x)
    for i in range(x.size):
        dx = np.zeros_like(x)
        dx[i] = 1
        gradx[i] = (f(x + dx*eps/2) - f(x - dx*eps/2))/eps
    return gradx

def dE2d_dx(x):
    return grad(lambda x: - smpl2.p2d.logpdf(x), x)

x0 = np.array([1.5, 0]); p0 = np.array([0.1, -0.1])
traj_euler = [(x0,p0)]; traj_leap_frog = [(x0,p0)]
dt = 0.01
iH1 = integrate_H(traj_euler[-1], dE2d_dx, 0.1, dt, 'leap_frog')


In [79]:
# <api>
class HMCintegr (smpl1.Sampling):
    def __init__(self, x, E, dE_dx=None, Tau=42, dtau=0.04):
        self.x = x
        self.E = E
        if dE_dx is None:
            dE_dx = lambda x: grad(E, x)
        self.dE_dx = dE_dx
        self.Tau = Tau
        self.dtau = dtau
        self.num_samples = 0
        
    def _H (self, x, p):
        return self.E(x) + 0.5*np.dot(p.T, p)
        
    def sample (self):
        self.num_samples += 1
        # Gibbs step for momentum
        x = self.x
        p = np.random.normal(size=x.shape)
        H = self._H(x, p)
        
        # Simulate dynamics
        xnew, pnew = integrate_H((x,p), self.dE_dx, self.Tau*self.dtau, self.dtau, 'leap_frog')
        Hnew = self._H(xnew, pnew)
        
        # Metropolis step
        if np.log(np.random.uniform()) < H - Hnew: # Remember: H = - logp
            self.x = xnew
        return self.x
    
    ### Does not work yet
    def sample_all (self):
        self.num_samples += 1
        # Gibbs step for momentum
        x = self.x
        p = np.random.normal(size=x.shape)
        H = self._H(x, p)
        
        # Simulate dynamics
        int_H = integrate_H((x,p), self.dE_dx, self.Tau*self.dtau, self.dtau, 'leap_frog')
        xnew, pnew = int_H[-1]
        Hnew = self._H(xnew, pnew)
        
        # Collect all integration steps with the start point
        out = []
        intX = int_H[:,0]
        for i in intX:
            out.append([self.x, i])
        
        # Metropolis step
        if np.log(np.random.uniform()) < H - Hnew: # Remember: H = - logp
            self.x = xnew
        else:
            out = [[self.x, self.x]]
            
        return np.array(out)

In [1]:
#<api>        
class HMCi (SamplingMethod):
    '''Hamoltonian Monte-Carlo Sampling with integration steps'''
    def __init__(self, target=smpl_trgt.MultNorm(), Tau=42, dtau=0.04):         
        self.target = target
        self.x = np.array([np.random.random_sample()*2*target.size - target.size, 
                           np.random.random_sample()*2*target.size - target.size])
        self.Tau = Tau
        self.dtau = dtau
        self.reset_sampling()          
        
    def reset_sampling(self):            
        self.sampling = HMCintegr(x=self.x, E=lambda x: - self.target.logpdf(x), Tau=self.Tau, dtau=self.dtau)
        
    def sample_all(self):
        return self.sampling.sample_all()
    
    def __str__(self):
        return "The real number of HMC samples is %d" % (self.sampling.num_samples)

NameError: name 'SamplingMethod' is not defined

## SamplesArray

In [2]:
#<api>
class SamplesArray:
    '''A class to draw a set of samples and get certain intervals of them.'''
    
    def __init__(self, sampling):
        self.sampling = sampling
        self.data = np.array([])
        self.prop = False    # Flag for proposals        
        

    def draw(self, n): 
        self.reset_counters()
        try:
            self.data = np.array([self.sampling.sample_all() for _ in range(n)])
            self.prop = True
            #print('all samples! prop ', self.prop, time.time())
        except:
            self.data = np.array([self.sampling.sample() for _ in range(n)])
            self.prop = False
            #print('no proposals. prop ', self.prop, time.time())
        return self.data
    
    def reset_counters(self):
        try:
            self.sampling.reset_counters()
        except:
            pass
        
    def getX(self, num):
        if self.data.ndim == 2:
            return self.data[:num,0]
        else:
            return self.data[:num,0,0]
        
    def getY(self, num):
        if self.data.ndim == 2:
            return self.data[:num,1]
        else:
            return self.data[:num,0,1]
    
    def getXall(self, num):
        return self.data[:num,1,0]
        
    def getYall(self, num):
        return self.data[:num,1,1]
        
        
    def reassembly_HMCi_samples(self):
        '''Problem: the function SamplesArray(sampling=HMCi).draw(N) 
        produces an array of integration trajectory. 
        This function reassemlys such array in a simple array of 
        "all samples"=[accepted point, integration step]'''
        conc = np.array(self.data[0])
        for i in range(1,len(self.data)):    
            conc = np.concatenate((conc,self.data[i]), axis=0) 
        self.data = np.copy(conc)
        conc = None