# Methods for 2D sampling widgets

'Drawing' is an abstract general class for animation. Its customization for 'SamplingWidget' - 'Sampling' has subclasses for different sampling methods. 'Drawing' and 'Sampling' can produce only one data point (sample) at a time and can take functions as arguments, which can cause difficulties in work with widgets. Therefore, they are wrapped in 'DrawingMethod' and 'SamplingMethod', those can produce a sequence of data points (samples).

In [None]:
# <api>
import numpy as np
from scipy.stats import multivariate_normal

In [None]:
# <api>
import os
path = os.getcwd()
s = '/'
pardir = s.join(path.split(s)[:-1])

In [None]:
# <api>
# Load source notebooks
from jupyter_cms.loader import load_notebook
trgt = load_notebook(str(pardir + '/widgets/Widget_targets.ipynb'))
mtd = load_notebook(str(pardir + '/widgets/Widget_methods.ipynb'))

## Sampling

In [None]:
# <api>
class Sampling (mtd.Drawing):
    """
    Abstract base class for all sampling methods.
    
    Subclasses need to implement self.sample(), self.reset_counters().
    """
    def make_point(self):
        sample(self)
    
    def sample(self):
        pass

### Metropolis-Hastings : Sampling

In [None]:
# <api>
class Proposer (object):
    """
    Wraps two functions needed by a proposer q, i.e.
      Draw a new value y ~ q.propose(x)
      Compute log transition probability q.log_trans_prob(x,y) 
    """
    def __init__(self, propose, log_trans_prob):
        self.propose = propose
        self.log_trans_prob = log_trans_prob
        
    def propose(self, x):
        return self.propose(x)
    
    def log_trans_prob(self, x, y):
        return self.log_trans_prob(x, y)

class MetropolisHastings (Sampling):
    def __init__(self, log_p, q, x):
        """
        q is assumed to be a proposer and log_p computes log p(x)
        """
        self.x = x # Current sample
        self.log_p = log_p
        self.q = q
        self.num_samples = 0
        self.accepted = 0
        
        
    def __str__ (self):
        return "Metropolis Hastings: Accepted %d out of %d samples" % (self.accepted, self.num_samples)       
    
    
    def sample (self):        
        self.num_samples += 1
        last_accepted = np.copy(self.x)
        # Propose new candidate
        x_prime = self.q.propose(self.x)
        a = self.log_p(x_prime) + self.q.log_trans_prob(x_prime, self.x) \
            - self.log_p(self.x) - self.q.log_trans_prob(self.x, x_prime)
        u = np.random.uniform()
        if np.log(u) < a:
            self.accepted += 1
            self.x = x_prime
        return [last_accepted, x_prime, [self.accepted, self.num_samples]]

### Slice sampling : Sampling

In [None]:
# <api>
class SliceSampling (Sampling):
    
    def __init__ (self, log_p, x, w):
        self.log_p = log_p
        self.x = x
        self.w = w
        self.samples = 0
        self.evals = 0
        
    def _log_p (self, x):
        self.evals += 1
        return self.log_p(x)
    
    def sample (self):
        self.samples += 1
        # Slice sampling as in MacKay pp. 375
        log_px = self._log_p(self.x)
        log_u_prime = np.log(np.random.uniform(low=0, high=np.exp(log_px)))
        # Create interval
        r = np.random.uniform()
        xl = self.x - r*self.w
        xr = self.x + (1-r)*self.w
        counter1, counter2, counter3 = 0, 0, 0
        
        while (self._log_p(xl) > log_u_prime): 
            xl -= self.w
        while (self._log_p(xr) > log_u_prime):
            xr += self.w
        # Main sampling loop
        while True:
            x_prime = np.random.uniform(low=xl, high=xr)
            log_px_prime = self._log_p(x_prime)
            if log_px_prime > log_u_prime:
                self.x = x_prime
                return x_prime # Found new sample
            else:
                # Adjust interval
                if x_prime > self.x:
                    xr = x_prime
                else:
                    xl = x_prime
    
        
    def __str__ (self):
        return "Slice sampling: %d evaluations for %d samples" % (self.evals, self.samples)

In [None]:
# <api>
class MultiSampling (Sampling):
    """
    Class that wraps a one-dimensional sampler and applies it to 
    a sequence of given directions to produce a new sample
    """
    def __init__(self, uni_sampler, log_p, x, directions):
        """
        uni_sampler is called with a log_p function and current sample x.
        It needs to return a valid sampler which is then asked to draw a sample.
        """
        self.uni_sampler = uni_sampler
        self.log_p = log_p
        self.x = x
        self.directions = directions
        
    def sample (self):
        accepted_point = np.copy(self.x)
        steps = [[accepted_point,accepted_point]]
        # Loop through directions
        for d in self.directions:
            uni_log_p = lambda ux: self.log_p(self.x + ux*d)
            ux_prime = self.uni_sampler(uni_log_p, 0).sample()
            self.x = self.x + ux_prime*d
            steps.append([accepted_point, self.x])
        return np.array(steps)

In [None]:
# <api>
class DetailedSliceSampling (SliceSampling):
    '''Slice sampling that shows a windows changes.'''
    
    def sample (self):
        self.samples += 1
        # Slice sampling as in MacKay pp. 375
        log_px = self._log_p(self.x)
        log_u_prime = np.log(np.random.uniform(low=0, high=np.exp(log_px)))
        # Create interval
        r = np.random.uniform()
        xl = self.x - r*self.w
        xr = self.x + (1-r)*self.w
        xls, xrs = [xl], [xr] 
        
        while (self._log_p(xl) > log_u_prime): 
            xl -= self.w
            xls.append(xl)
            xrs.append(xr)
        while (self._log_p(xr) > log_u_prime):
            xr += self.w
            xls.append(xl)
            xrs.append(xr)
        # Main sampling loop
        while True:
            x_prime = np.random.uniform(low=xl, high=xr)
            log_px_prime = self._log_p(x_prime)
            if log_px_prime > log_u_prime:
                self.x = x_prime
                return x_prime, xls, xrs # Found new sample
            else:
                # Adjust interval
                if x_prime > self.x:
                    xr = x_prime
                    xls.append(xl)
                    xrs.append(xr)
                else:
                    xl = x_prime
                    xls.append(xl)
                    xrs.append(xr)
    
        
    def __str__ (self):
        return "Slice sampling: %d evaluations for %d samples" % (self.evals, self.samples)

In [None]:
# <api>
class DetailedMultiSampling (MultiSampling):
        
    def sample (self):
        accepted_point = np.copy(self.x)
        steps = np.array([[ accepted_point, accepted_point, accepted_point, accepted_point ]])
        # Loop through directions
        for d in self.directions:
            uni_log_p = lambda ux: self.log_p(self.x + ux*d)
            ux_prime, xls, xrs = self.uni_sampler(uni_log_p, 0).sample()
            
            xls = np.array([xls, xls]).T*d
            xrs = np.array([xrs, xrs]).T*d
            for i in range(len(xls)):
                step = np.array([[ accepted_point, np.copy(self.x), 
                                  xls[i]+np.copy(self.x), xrs[i]+np.copy(self.x)]])
                steps = np.concatenate((steps, step), axis=0)
            self.x = self.x + ux_prime*d
        return steps

### HMC : Sampling

In [None]:
# <api>
class HMC (Sampling):
    def __init__(self, x, E, dE_dx=None, Tau=42, dtau=0.04):
        self.x = x
        self.E = E        
        self.dE_dx = lambda x: dE_dx(x)
        self.Tau = Tau
        self.dtau = dtau
        self.num_samples = 0
        self.accepted = 0
        
    def integrate_H(self, xp, dE_dx, T, dt, method='leap_frog'):
        """
        Integrate the state xp for some time with the Hamiltonian H(x) = E(x) + K(x) 
        """
        x,p = np.copy(xp)
        int_steps = []
        t = 0
        while (t < T):
            t += dt
            if method=='euler':
                gradE = dE_dx(x)
                x += p*dt
                p += - gradE*dt
            elif method=='leap_frog':
                p += - dE_dx(x)*dt/2
                x += p*dt
                p += - dE_dx(x)*dt/2
            else:
                error('Unknown integration method')
            int_steps.append([np.copy(x),np.copy(p)])
        return np.array(int_steps)
        
    def _H (self, x, p):
        return self.E(x) + 0.5*np.dot(p.T, p)
    
    
    def sample (self):
        self.num_samples += 1
        # Gibbs step for momentum
        x = self.x
        p = np.random.normal(size=x.shape)
        H = self._H(x, p)
        
        # Simulate dynamics
        int_H = self.integrate_H((x,p), self.dE_dx, self.Tau*self.dtau, self.dtau, 'leap_frog')
        xnew, pnew = int_H[-1]
        Hnew = self._H(xnew, pnew)
        
        # Collect all integration steps with the start point
        out = []
        intX = int_H[:,0]
        for i in intX:
            out.append([self.x, i, [self.accepted, self.num_samples]])
        
        # Metropolis step        
        if np.log(np.random.uniform()) < H - Hnew: # Remember: H = - logp
            self.x = xnew 
            self.accepted += 1
        else:
            out = [[self.x, self.x, [self.accepted, self.num_samples]]]
        return np.array(out)   
        

## SamplingMethod

In [None]:
#<api>
class SamplingMethod(mtd.DrawingMethod):
    '''A clas of different sampling methods for SamplingWidget.'''  
          
    def draw(self, n, start_point=None):        
        self.sampling.reset_counters()
        self.sampling.reset_start(size=self.target.get_size(), x=start_point)
        self.data = np.array([self.sampling.sample() for _ in range(n)])        
        return {'accepted points' : self.data}
    
    def __str__(self):
        return self.sampling.__str__()
    
    def set_param(self, param_dict):
        '''Allows to set additional sampling parameters, given from SamplingExtraWidgets object.'''
        for i in param_dict:
            if i =='N':
                self.N = param_dict['N']
            elif i=='proposal step':
                self.prop_step = param_dict['proposal step']
            elif i=='width':
                self.w = param_dict['width']
            elif i=='direct':
                self.direct = param_dict['direct']
            elif i=='Tau':
                self.Tau = param_dict['Tau']
            elif i=='dtau':
                self.dtau = param_dict['dtau']

### Metropolis-Hastings : SamplingMethod

In [None]:
#<api>        
class MHS (SamplingMethod):
    '''Metropolis-Hastings sampling'''
    def __init__(self, target=trgt.MultNorm(), prop_step=50):         
        self.target = target
        self.prop_step = prop_step
        self.reset_sampling()
        
    def draw(self, n, start_point=None):
        self.sampling.reset_counters()
        self.sampling.reset_start(size=self.target.get_size(), x=start_point)
        self.data = np.array([self.sampling.sample() for _ in range(n)])        
        return {'accepted points' : self.data[:,0], 'proposed points' : self.data[:,1],
                'accepted number' : self.data[:,2,0], 'proposed number' : self.data[:,2,1] }
    
    def reset_sampling(self):
        self.sampling = MetropolisHastings(log_p=self.target.value, 
                                           q=Proposer(lambda x: multivariate_normal(mean=x, 
                                                      cov=self.prop_step*np.eye(2)).rvs(),
                                                      lambda x,y: 0), # Proposal is symmetric
                                           x=[0,0])

### Slice sampling : SamplingMethod

In [None]:
#<api>        
class MSS (SamplingMethod):
    '''Multi Slice Sampling with separate steps in different diections.'''
    
    def __init__(self, target=trgt.MultNorm(), w=1, direct='Straight'):         
        self.target = target
        self.w = w
        self.direct = direct        
        self.reset_sampling()       
        
    def draw(self, n, start_point=None):        
        self.sampling.reset_counters()
        self.sampling.reset_start(size=self.target.get_size(), x=start_point)
        self.data = np.array([self.sampling.sample() for _ in range(n)])
        conc = np.array(self.data[0])
        for i in range(1,len(self.data)):    
            conc = np.concatenate((conc,self.data[i]), axis=0) 
        self.data = np.copy(conc)
        return {'accepted points' : self.data[:,0], 'trajectories' : self.data[:,1]}        
        
    def reset_sampling(self):
        if (self.direct=='Straight'):
            self.directions=[np.array([1,0]), np.array([0,1])]
        elif (self.direct=='Skew'):
            self.directions=[np.array([1,1]), np.array([1,-1])]
            
        self.sampling = MultiSampling(lambda log_p, x: SliceSampling(log_p, x, w=self.w), \
                         log_p=self.target.value, \
                         x=[0,0], \
                         directions=self.directions)

In [None]:
#<api>        
class MSSD (MSS):
    '''Detailed Multi Slice Sampling with separate steps in different diections 
    and windows changes.'''      
        
    def draw(self, n, start_point=None):        
        self.sampling.reset_counters()
        self.sampling.reset_start(size=self.target.get_size(), x=start_point)
        self.data = np.array([self.sampling.sample() for _ in range(n)])
        conc = np.array(self.data[0])
        for i in range(1,len(self.data)):    
            conc = np.concatenate((conc, self.data[i]), axis=0) 
        self.data = np.copy(conc)
        return {'accepted points' : self.data[:,0], 'trajectories' : self.data[:,1],
               'xls' : self.data[:,2], 'xrs' : self.data[:,3]}        
        
    def reset_sampling(self):
        if (self.direct=='Straight'):
            self.directions=[np.array([1,0]), np.array([0,1])]
        elif (self.direct=='Skew'):
            self.directions=[np.array([1,1]), np.array([1,-1])]
            
        self.sampling = DetailedMultiSampling(lambda log_p, x: DetailedSliceSampling(log_p, x, w=self.w), \
                         log_p=self.target.value, \
                         x=[0,0], \
                         directions=self.directions)

### HMC : SamplingMethod

In [None]:
#<api>        
class HMCS (SamplingMethod):
    '''Hamoltonian Monte-Carlo Sampling with integration steps'''
    def __init__(self, target=trgt.MultNorm(), dE_dx=None, Tau=42, dtau=0.04):         
        self.target = target        
        self.Tau = Tau
        self.dtau = dtau        
        self.reset_sampling()          
        
    def reset_sampling(self):
        self.dE_dx = lambda x: - self.target.grad(x)
        self.sampling = HMC(x=[0,0], E=lambda x: - self.target.value(x), dE_dx=self.dE_dx, 
                            Tau=self.Tau, dtau=self.dtau)
        
    def draw(self, n, start_point=None): 
        self.sampling.reset_counters()
        self.sampling.reset_start(size=self.target.get_size(), x=start_point)
        self.data = np.array([self.sampling.sample() for _ in range(n)])  
        conc = np.array(self.data[0])
        for i in range(1,len(self.data)):    
            conc = np.concatenate((conc,self.data[i]), axis=0) 
        self.data = np.copy(conc)
        conc = None
        return {'accepted points' : self.data[:,0], 'trajectories' : self.data[:,1],
                'accepted number' : self.data[:,2,0], 'proposed number' : self.data[:,2,1] }
    
    def __str__(self):
        return "HMC: Accepted %d out of %d samples" % (self.sampling.accepted, self.sampling.num_samples)    
    