# Parallel MCMC
## Eléments logiciels pour le traitement des données massives

Nous avons choisi d'implémenter l'algorithme Independent Metropolis–Hastings (IMH) décrit dans l'article : https://arxiv.org/pdf/1010.1595v3.pdf en utilisant la fonction *Pool* du pkg *multiprocessing*

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from multiprocessing import Pool
import numpy as np
import matplotlib.pyplot as plt
from time import time, sleep
import random
import pandas as pd
import itertools
import numpy as np
from tqdm import tqdm

In [59]:
class IMH(object):
    """
    Class IMH
    #########
    
    Implementation of IMH (Independent Metropolis Hasting) algorithm explained in :
    << Using parallel computation to improve Independent Metropolis Hasting based estimation >>
    Jacob and al. 2011
    
    Arguments
    ---------
    - omega (function) : density function of the distribution to simulate
    - gen_candidate (function(N) ) : reurn an array of N values gen from the candidate distribution

    - x0 (float) : initialisation
    - N (int) : length of the markov chain
    - njobs (int) : 

    Methods
    -------
    - compute_permutation : Return the permuted index 1,2,...,n according to self.permutation method
    - fit_simple : implementation of the fundamental version of the IMH algorithm 
    - fit_block : implementation of the block version of the IMH algorithm 
    - fit : main method interface
    """
    
    def __init__(self, omega, gen_candidate) :
        self.omega = np.vectorize(omega)
        self.gen_candidate = gen_candidate

    def compute_permutation(self, n):

        l = list(range(1,n))
        if self.permutation == "same_order":
            return_list = [l]*self.njobs
        elif self.permutation == "random": 
            all_perm = list(itertools.permutations(l))
            return_list = list(random.sample(all_perm, k=self.njobs))
        else:
            return_list = l
        return(return_list)


    def fit_simple(self, i=None, b=0):

        n = self.y.shape[0] # either equal to : 
                            # - self.N (when method ='simple')
                            # - self.nblocks (when method ='parallel')
        if i == None :
            i = list(range(1,n)) # is the identity permutation
        
        i = [0]+list(i)
        # go
        current = 0
        weight = np.full(fill_value = 0, shape = n)
        for candidate in range(1, len(i)):
            ratio = min(1, self.omega_y[i[candidate],b]/self.omega_y[i[current],b])
            u = np.random.binomial(1,ratio)
            current += u*(candidate-current) # current is updated to candidate if u = 1 
                                             # and stay current if u = 0

            weight[i[current]] += 1 # add current value to the chain
        
        return weight

            
    def fit_block(self):
        
        weight = np.full(fill_value = 0, shape = self.y.shape)
        with Pool(self.njobs) as p:
            for b in range(self.B):

                perm_list = self.compute_permutation(self.njobs)

                weight_block = np.array(p.starmap(self.fit_simple, [(i,b) for i in perm_list]))
                weight[:,b] = weight_block.sum(axis = 0)

                if b < self.B-1 : # init the next block picking randomly in the current block
                    self.y[0,b+1] = np.random.choice(self.y[:,b], size=1, p= weight[:,b]/weight[:,b].sum())
                    self.omega_y[0,b+1] = self.omega(self.y[0,b+1])
        return weight


    def fit(self, x0, N, method = 'simple', B = 1, njobs = 1, permutation = 'same_order'):
        self.B = B
        self.nblocks = int(N/B)
        self.njobs = njobs
        self.N = N-N%pgcd(self.nblocks,njobs)        
        self.permutation = permutation

        if method == "simple":            
            # (1) creation of candidate sample
            self.y = np.reshape(self.gen_candidate(self.N-1), newshape=(self.N-1,1))
            # (2) add init value
            self.y = np.append([[x0]], self.y, axis = 0)
            # (3) computation of omega values
            self.omega_y = self.omega(self.y)
            # (4) computation of weight with IMH algo
            self.weights = self.fit_simple()
            
        elif method == 'parallel':
            # (1) creation of candidate sample
            self.y = np.array(Pool(njobs).map(self.gen_candidate,[int(self.N/njobs)]*njobs))
            # (3) computation of omega values
            self.omega_y = np.array(Pool(njobs).map(self.omega, list(self.y)))
            
            # reshape : this is usefull when nblocks!=njobs
            self.y = np.reshape(self.y, newshape=(self.nblocks, self.B))
            self.omega_y = np.reshape(self.omega_y,newshape=(self.nblocks, self.B))
            # (2) add init value
            self.y = np.append(np.full((1,self.B), x0), self.y, axis = 0)
            self.omega_y = np.append(np.full((1,self.B), self.omega(x0)), self.omega_y, axis = 0)
            
            # (4) computation of weight with IMH algo
            self.weights = self.fit_block() # computation of weight
        
        self.weights = np.reshape(self.weights,newshape=self.y.shape)
        self.expectation = np.average(self.y, weights= self.weights)
        
        return self.expectation

In [60]:
def pgcd(a,b):
    if b==0:
        return a
    else:
        return pgcd(b,a%b)

In [61]:
# Parameters

def omega(x):
    return((1+x**2)*np.exp(-x**2/2))

def gen_candidate(N):
    np.random.seed() # for seeding the generator
    return np.random.standard_cauchy(size = N)

In [62]:
IMH_computer  = IMH(omega, gen_candidate)

In [63]:
t = time()
IMH_computer.fit(x0 = 10, N = 4000, method='simple')
time() - t

0.06344914436340332

In [64]:
IMH_computer.N

4000

In [80]:
t = time()
IMH_computer.fit(x0 = 0, N = 4000, method='parallel', B=10, njobs=4, permutation='random')
time() - t

0.275432825088501

In [None]:


t = time()
IMH_computer.fit(x0 = 0, N = 400000, method='parallel', pblocks=40000,njobs=4, permutation='random')
time() - t

In [14]:
l_parallel =[]
for i in tqdm(range(40)) :
    l_parallel += [IMH_computer.fit(x0 = 0, N = 40, method='parallel', njobs=4, permutation='random') ]

 45%|████▌     | 18/40 [00:12<00:15,  1.41it/s]

OSError: [Errno 24] Too many open files

In [11]:
l_simple = [IMH_computer.fit(x0 = 0, N = 40, method='simple') for i in range(40)]

In [12]:
np.var(l_parallel)

0.04556111599979288

In [13]:
np.var(l_simple)

0.049156367128033174

In [None]:
class IMH(object):
    """
    Class IMH
    #########
    
    Implementation of IMH (Independent Metropolis Hasting) algorithm explained in :
    << Using parallel computation to improve Independent Metropolis Hasting based estimation >>
    Jacob and al. 2011
    
    Arguments
    ---------
    - omega (function) : density function of the distribution to simulate
    - gen_candidate (function(N) ) : reurn an array of N values gen from the candidate distribution

    - x0 (float) : initialisation
    - N (int) : length of the markov chain
    - njobs (int) : 

    Methods
    -------
    - compute_permutation : Return the permuted index 1,2,...,n according to self.permutation method
    - fit_simple : implementation of the fundamental version of the IMH algorithm 
    - fit_block : implementation of the block version of the IMH algorithm 
    - fit : main method interface
    """
    
    def __init__(self, omega, gen_candidate) :
        self.omega = np.vectorize(omega)
        self.gen_candidate = gen_candidate

    def compute_permutation(self, n):
        """
        """
        
        l = list(range(1,n))
        
        if self.permutation == "same_order":
            return_list = [l]*self.njobs

        elif self.permutation == "random": 
            all_perm = list(itertools.permutations(l))
            return_list = list(random.sample(all_perm, k=self.njobs))

        else:
            return_list = l
            
        return(return_list)


    def fit_simple(self, i=None, b=0):

        n = self.y.shape[0] # either equal to : 
                            # - self.N (when method ='simple')
                            # - self.njobs (when method ='parallel')
        if i == None :
            i = list(range(1,n)) # is the identity permutation
        
        i = [0]+list(i)
        # go
        current = 0
        weight = np.full(fill_value = 0, shape = n)
        for candidate in range(1, len(i)):
            ratio = min(1, self.omega_y[i[candidate],b]/self.omega_y[i[current],b])
            u = np.random.binomial(1,ratio)
            current += u*(candidate-current) # current is updated to candidate if u = 1 
                                             # and stay current if u = 0

            weight[i[current]] += 1 # add current value to the chain
        
        return weight

            
    def fit_block(self):
        
        weight = np.full(fill_value = 0, shape = self.y.shape)
        with Pool(self.njobs) as p:
            for b in range(self.B):

                perm_list = self.compute_permutation(self.y.shape[0])

                weight_block = np.array(p.starmap(self.fit_simple, [(i,b) for i in perm_list]))
                weight[:,b] = weight_block.sum(axis = 0)

                if b < self.B-1 : # init the next block picking randomly in the current block
                    self.y[0,b+1] = np.random.choice(self.y[:,b], size=1, p= weight[:,b]/weight[:,b].sum())
                    self.omega_y[0,b+1] = self.omega(self.y[0,b+1])
        
        return weight


    def fit(self, x0, N, method = 'simple', njobs = 1, permutation = 'same_order'):
        self.N = N - N%njobs # so that N/njobs is equal to int(N/njobs)
        self.B = int(N/njobs)
        self.njobs = njobs
        self.permutation = permutation

        if method == "simple":            
            # (1) creation of candidate sample
            self.y = np.reshape(self.gen_candidate(N-1), newshape=(N-1,1))
            # (2) add init value
            self.y = np.append([[x0]], self.y, axis = 0)
            # (3) computation of omega values
            self.omega_y = self.omega(self.y)
            # (4) computation of weight with IMH algo
            self.weights = self.fit_simple()
            
        elif method == 'parallel':
            # (1) creation of candidate sample
            self.y = np.array(Pool(njobs).map(self.gen_candidate,[self.B]*njobs))
            # (2) add init value
            self.y = np.append(np.full((1,self.B), x0), self.y, axis = 0)
            # (3) computation of omega values
            self.omega_y = np.array(Pool(njobs).map(self.omega, list(self.y)[1:]))
            self.omega_y = np.append(np.full((1,self.B), 0), self.omega_y, axis = 0)
            self.omega_y[0,0] = self.omega(self.y[0,0])
            # (4) computation of weight with IMH algo
            self.weights = self.fit_block() # computation of weight
        
        self.weights = np.reshape(self.weights,newshape=self.y.shape)
        self.expectation = np.average(self.y, weights= self.weights)
        
        return self.expectation