This document briefly explains classes 'MCMC_MH', 'MCMC_Diag', 'MCMC_Gibbs', and 'NewtonUnconstrained'.

This is written based on 6ebb6581dc296ae43d5824bd78d12d934f3e4e84 (Mar 5, 2022) of letsjdosth/intermediateBayesInfer on github.

Link: https://github.com/letsjdosth/intermediateBayesInfer/tree/main/HW4


Dependency

- Python 3.8.7 64 bit
    - numpy 1.19.5
    - matplotlib 3.3.3
    - (for newton method,) scipy 1.6.0

In [None]:
# python standard libraries
import time
import csv
from math import log
from random import seed, uniform
from statistics import mean, variance

# other libraries
import numpy as np
import matplotlib.pyplot as plt

### class MCMC_MH_Core.MCMC_MH(log_target_pdf, log_proposal_pdf, proposal_sampler, initial, random_seed)
An object for MCMC Metropolis-Hastings algorithm.

Arguments for constructor '\_\_init\_\_'

- log_target_pdf(eval_pt, ...) and log_proposal_pdf(from_smpl, to_smpl) should be function objects returning corresponding 'float' values as their name.
- proposal_sampler(last, ...) should be a function object to return a list- or tuple-type sample under the condition that last sample is 'last'
- initial: list. initial value for MCMC-MH iteration.
- random seed: random seed (to replicate a result).

Methods

- MCMC_MH.sampler()
    - Main algorithm of MCMC-MH. It runs one iteration of MCMC-MH using functions that are given by constructor.
- MCMC_MH.generate_samples(num_samples, pid=None, verbose=True, print_iter_cycle=500)
    - It iterates the 'sampler' function with printing about some iteration information to console.
- MCMC_MH.log_r_calculator(candid, last)
    - Acceptance ratio. it is $log\frac{(f(x)q(x|x_{t-1})}{f(x_{t-1})q(x_{t-1}|x)}$ where f: target density, q: proposal density
    - This is used in sampler() method. Do not use it directly.
- MCMC_MH.write_samples(filename)
    - It saves drawn samples to csv file.

Attributes

- MCMC_MH.MC_sample
    - MCMC sample list
- MCMC_MH.num_total_iters
    - total number of iteration
- MCMC_MH.num_accept
    - total number of acceptance
- MCMC_MH.random_seed
    - random seed set by constructor

In [None]:
class MCMC_MH:
    def __init__(self, log_target_pdf, log_proposal_pdf, proposal_sampler, initial, random_seed):
        self.log_target_pdf = log_target_pdf #arg (smpl)
        self.log_proposal_pdf = log_proposal_pdf #arg (from_smpl, to_smpl)
        self.proposal_sampler = proposal_sampler #function with argument (smpl)
        
        self.initial = initial
        
        self.MC_sample = [initial]

        self.num_total_iters = 0
        self.num_accept = 0

        self.random_seed = random_seed
        seed(random_seed)
        

    def log_r_calculator(self, candid, last):
        log_r = (self.log_target_pdf(candid) - self.log_proposal_pdf(from_smpl=last, to_smpl=candid) - \
             self.log_target_pdf(last) + self.log_proposal_pdf(from_smpl=candid, to_smpl=last))
        return log_r

    def sampler(self):
        last = self.MC_sample[-1]
        candid = self.proposal_sampler(last) #기존 state 집어넣게
        unif_sample = uniform(0, 1)
        log_r = self.log_r_calculator(candid, last)
        # print(log(unif_sample), log_r) #for debug
        if log(unif_sample) < log_r:
            self.MC_sample.append(candid)
            self.num_total_iters += 1
            self.num_accept += 1
        else:
            self.MC_sample.append(last)
            self.num_total_iters += 1

    def generate_samples(self, num_samples, pid=None, verbose=True, print_iter_cycle=500):
        start_time = time.time()
        for i in range(num_samples):
            self.sampler()
            
            if i==100 and verbose:
                elap_time_head_iter = time.time()-start_time
                estimated_time = (num_samples/100)*elap_time_head_iter
                print("estimated running time: ", estimated_time//60, "min ", estimated_time%60, "sec")

            if i%print_iter_cycle == 0 and verbose and pid is not None:
                print("pid:",pid," iteration", i, "/", num_samples)
            elif i%print_iter_cycle == 0 and verbose and pid is None:
                print("iteration", i, "/", num_samples)
        elap_time = time.time()-start_time
        
        if pid is not None and verbose:
            print("pid:",pid, "iteration", num_samples, "/", num_samples, " done! (elapsed time for execution: ", elap_time//60,"min ", elap_time%60,"sec)")
            print("acceptance rate: ", round(self.num_accept / self.num_total_iters, 4))
        elif pid is None and verbose:
            print("iteration", num_samples, "/", num_samples, " done! (elapsed time for execution: ", elap_time//60,"min ", elap_time%60,"sec)")
            print("acceptance rate: ", round(self.num_accept / self.num_total_iters, 4))

        
    def write_samples(self, filename: str):
        with open(filename + '.csv', 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for sample in self.MC_sample:
                csv_row = sample
                writer.writerow(csv_row)

### class MCMC_MH_Core.MCMC_Diag()
An object to diagnose samples generated from MCMC Metropolis-Hastings algorithm.


Methods

- Setters:
    - MCMC_Diag.set_mc_samples_from_list(mc_sample)
        - set MCMC samples to MCMC_Diag.MC_sample from a python list (or list-like objects)
    - MCMC_Diag.set_mc_sample_from_MCMC_MH(inst_MCMC_MH)    
        - set MCMC samples to MCMC_Diag.MC_sample from a MCMC_MH object (or any object that has an attribute named 'MC_sample')
    - MCMC_Diag.set_mc_sample_from_csv(file_name)
        - set MCMC samples to MCMC_Diag.MC_sample from a csv file

- Writers:
    - MCMC_Diag.write_samples(filename)    
        - It saves MCMC_Diag.MC_sample list as a csv file

- MCMC post-worker
    - MCMC_Diag.burnin(num_burn_in)
        - cut first 'num_bun_in' samples
    - MCMC_Diag.thinning(lag)
        - pick every one sample out of 'lag' and save it to MCMC_Diag.MC_sample

- Getters:
    - MCMC_Diag.get_specific_dim_samples(dim_idx)
        - get a list of designated dimension's marginal MC_samples
    - MCMC_Diag.get_sample_mean(self)
        - get a sample mean of each dimension of MC_samples
    - MCMC_Diag.get_sample_var(self)
        - get a sample variance of each dimension of MC_samples
    - MCMC_Diag.get_sample_quantile(quantile_list)
        - quantile_list should be python list (or list-like object)
        - get sample quantiles of each dimension of MC_samples
    - MCMC_Diag.get_autocorr(dim_idx, maxLag)
        - get a sample autocorrelation list (lag 1~maxLag) of designated dimension
    - MCMC_Diag.effective_sample_size(dim_idx, sum_lags=30)
        - get the effective sample size. It is implemented following the formula of BDA3(Gelman et al,), setting the number of chains to 1.

- Visualizers
    - MCMC_Diag.show_traceplot_specific_dim(dim_idx, show=False)
    - MCMC_Diag.show_traceplot(figure_grid_dim, show=True)
    - MCMC_Diag.show_hist_specific_dim(dim_idx, show=False)
    - MCMC_Diag.show_hist(figure_grid_dim, show=True)
    - MCMC_Diag.show_acf_specific_dim(dim_idx, maxLag, show=False)
    - MCMC_Diag.show_acf(maxLag, figure_grid_dim, show=True)
    - MCMC_Diag.show_scatterplot(dim_idx_horizontal, dim_idx_vertical, show=True)
    

Attributes

- self.MC_sample: MCMC samples that set in the instance
- self.num_dim: sample dimension

In [None]:
class MCMC_Diag:
    def __init__(self):
        self.MC_sample = []
        self.num_dim = None
    
    def set_mc_samples_from_list(self, mc_sample):
        self.MC_sample = mc_sample
        self.num_dim = len(mc_sample[0])
    
    def set_mc_sample_from_MCMC_MH(self, inst_MCMC_MH):
        self.MC_sample = inst_MCMC_MH.MC_sample
        self.num_dim = len(inst_MCMC_MH.MC_sample[0])

    def set_mc_sample_from_csv(self, file_name):
        with open(file_name + '.csv', 'r', newline='', encoding='utf-8') as csvfile:
            reader = csv.reader(csvfile)
            for csv_row in reader:
                csv_row = [float(elem) for elem in csv_row]
                self.MC_sample(csv_row)
        self.num_dim = len(self.MC_sample[0])

    def write_samples(self, filename: str):
        with open(filename + '.csv', 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for sample in self.MC_sample:
                csv_row = sample
                writer.writerow(csv_row)

    def burnin(self, num_burn_in):
        self.MC_sample = self.MC_sample[num_burn_in-1:]

    def thinning(self, lag):
        self.MC_sample = self.MC_sample[::lag]
    
    def get_specific_dim_samples(self, dim_idx):
        if dim_idx >= self.num_dim:
            raise ValueError("dimension index should be lower than number of dimension. note that index starts at 0")
        return [smpl[dim_idx] for smpl in self.MC_sample]
    
    def get_sample_mean(self):
        mean_vec = []
        for i in range(self.num_dim):
            ith_dim_samples = self.get_specific_dim_samples(i)
            mean_vec.append(mean(ith_dim_samples))
        return mean_vec

    def get_sample_var(self):
        var_vec = []
        for i in range(self.num_dim):
            ith_dim_samples = self.get_specific_dim_samples(i)
            var_vec.append(variance(ith_dim_samples))
        return var_vec

    def get_sample_quantile(self, quantile_list):
        quantile_vec = []
        for i in range(self.num_dim):
            ith_dim_samples = self.get_specific_dim_samples(i)
            quantiles = [np.quantile(ith_dim_samples, q) for q in quantile_list]
            quantile_vec.append(quantiles)
        return quantile_vec

    def show_traceplot_specific_dim(self, dim_idx, show=False):
        traceplot_data = self.get_specific_dim_samples(dim_idx)
        plt.ylabel(str(dim_idx)+"th dim")
        plt.plot(range(len(traceplot_data)), traceplot_data)
        if show:
            plt.show()

    def show_traceplot(self, figure_grid_dim, show=True):
        grid_column= figure_grid_dim[0]
        grid_row = figure_grid_dim[1]

        plt.figure(figsize=(5*grid_column, 3*grid_row))
        for i in range(self.num_dim):
            plt.subplot(grid_row, grid_column, i+1)
            self.show_traceplot_specific_dim(i)
        if show:
            plt.show()

    
    def show_hist_specific_dim(self, dim_idx, show=False):
        hist_data = self.get_specific_dim_samples(dim_idx)
        plt.ylabel(str(dim_idx)+"th dim")
        plt.hist(hist_data, bins=100)
        if show:
            plt.show()

    def show_hist(self, figure_grid_dim, show=True):
        grid_column= figure_grid_dim[0]
        grid_row = figure_grid_dim[1]
       
        plt.figure(figsize=(5*grid_column, 3*grid_row))
        for i in range(self.num_dim):
            plt.subplot(grid_row, grid_column, i+1)
            self.show_hist_specific_dim(i)
        if show:
            plt.show()

    def get_autocorr(self, dim_idx, maxLag):
        y = self.get_specific_dim_samples(dim_idx)
        acf = []
        y_mean = mean(y)
        y = [elem - y_mean  for elem in y]
        n_var = sum([elem**2 for elem in y])
        for k in range(maxLag+1):
            N = len(y)-k
            n_cov_term = 0
            for i in range(N):
                n_cov_term += y[i]*y[i+k]
            acf.append(n_cov_term / n_var)
        return acf
    
    def show_acf_specific_dim(self, dim_idx, maxLag, show=False):
        grid = [i for i in range(maxLag+1)]
        acf = self.get_autocorr(dim_idx, maxLag)
        plt.ylim([-1,1])
        plt.ylabel(str(dim_idx)+"th dim")
        plt.bar(grid, acf, width=0.3)
        plt.axhline(0, color="black", linewidth=0.8)
        if show:
            plt.show()

    def show_acf(self, maxLag, figure_grid_dim, show=True):
        grid_column= figure_grid_dim[0]
        grid_row = figure_grid_dim[1]

        plt.figure(figsize=(5*grid_column, 3*grid_row))
        for i in range(self.num_dim):
            plt.subplot(grid_row, grid_column, i+1)
            self.show_acf_specific_dim(i, maxLag)
        if show:
            plt.show()
    
    def show_scatterplot(self, dim_idx_horizontal, dim_idx_vertical, show=True):
        x = self.get_specific_dim_samples(dim_idx_horizontal)
        y = self.get_specific_dim_samples(dim_idx_vertical)
        plt.scatter(x, y)
        if show:
            plt.show()
    
    def effective_sample_size(self, dim_idx, sum_lags=30):
        n = len(self.MC_sample)
        auto_corr = self.get_autocorr(dim_idx, sum_lags)
        ess = n / (1 + 2*sum(auto_corr))
        return ess

### class MCMC_Gibbs_Core.MCMC_Gibbs(initial)

I tried to design this class as a high-abstract-level tool for a gibbs sampler,
but, I failed to make this class to be a satisfactory level, because the structure of full-conditionals are hard to generalize.
As a result, it consists of only parts of the MCMC_MH class for now.

So, I skip the explanation for this class's methods and attributes. see the MCMC_MH part.

In [None]:

class MCMC_Gibbs:
    def __init__(self, initial):
        self.MC_sample = [initial]

    def gibbs_sampler(self):
        last = self.MC_sample[-1]
        new = [x for x in last] #[nu, theta]
        #update new

        self.MC_sample.append(new)

    
    def full_conditional_sampler(self, last_param):
        new_sample = [x for x in last_param]
        #update new

        return new_sample


    def generate_samples(self, num_samples, pid=None, verbose=True, print_iter_cycle=500):
        start_time = time.time()
        for i in range(1, num_samples):
            self.gibbs_sampler()
            
            if i==100 and verbose:
                elap_time_head_iter = time.time()-start_time
                estimated_time = (num_samples/100)*elap_time_head_iter
                print("estimated running time: ", estimated_time//60, "min ", estimated_time%60, "sec")

            if i%print_iter_cycle == 0 and verbose and pid is not None:
                print("pid:",pid," iteration", i, "/", num_samples)
            elif i%print_iter_cycle == 0 and verbose and pid is None:
                print("iteration", i, "/", num_samples)
        elap_time = time.time()-start_time
        
        if pid is not None and verbose:
            print("pid:",pid, "iteration", num_samples, "/", num_samples, " done! (elapsed time for execution: ", elap_time//60,"min ", elap_time%60,"sec)")
        elif pid is None and verbose:
            print("iteration", num_samples, "/", num_samples, " done! (elapsed time for execution: ", elap_time//60,"min ", elap_time%60,"sec)")

        
    def write_samples(self, filename: str):
        with open(filename + '.csv', 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for sample in self.MC_sample:
                csv_row = sample
                writer.writerow(csv_row)




### class newton.NewtonUnconstrained(fn_objective, fn_objective_gradient, fn_objective_hessian, fn_objective_domain_indicator = None)

This class is an implementation of the Newton method with the backtracking line search (using Cholesky decomposition / pseudo inverse).
The reference is: BBV chap 9.5-7. Unconstrained minimization: Newton's method, Self-concordance, Implementation

This is part of https://github.com/letsjdosth/convexOptim
(I implemented some optimization algorithms last winter, cause I had nothing to do during winter break (no friend, no money, etc..))

Constructor arguments

- fn_objective(x)
    - Python function object. x is numpy.array(-like) type
    - R^n to R^1 function. Return objective function value at x
- fn_objective_gradient(x)
    - Python function object. x is numpy.array(-like) type
    - R^n to R^n function. Return gradient vector at x, as a numpy array type
- fn_objective_hessian(x)
    - Python function object. x is numpy.array(-like) type
    - R^n to R^(n*n). Return Hessian matrix at x, as a numpy array
- fn_objective_domain_indicator = None
    - Python function object. x is numpy.array(-like) type
    - Optional, if you want to restrict the domain as a subset of R^n, set a indicator (1: x is in the domain, 0: otherwise)

Methods

- Hidden methods:
    - NewtonUnconstrained._Rn_domain_indicator(eval_pt)
    - NewtonUnconstrained._backtracking_line_search(eval_pt, descent_direction, a_slope_flatter_ratio, b_step_shorten_ratio)
    - NewtonUnconstrained._l2_norm(vec)
    - NewtonUnconstrained._descent_direction_newton_cholesky(eval_pt)
    - NewtonUnconstrained._descent_direction_newton_pinv(eval_pt)

- Runner:
    - NewtonUnconstrained.run_newton_with_backtracking_line_search(starting_pt, tolerance = 0.001, method="cholesky", a_slope_flatter_ratio = 0.2, b_step_shorten_ratio = 0.5)
        - It runs the newton method with backtracking line search. 
        - You may choose the matrix-inversion method among 'cholesky'(Cholesky decomposition) and 'pinv'(Pseudo inverse).
        - starting_pt is a starting point. Note that this algorithm only guarantee a local minimum, not the global minimum, so the starting point is important.
        - tolerance is the termination condition. Less tolerance gives us more accurate minimum but makes get harder to stop.
        - a_slope_flatter_ratio should be 0 < a < 0.5", and b_step_shorten_ratio should be 0 < b < 1. These are used for backtracking line search algorithm.

- Getters:
    - NewtonUnconstrained.get_minimizing_sequence()
        - It returns the sequence of points in the domain in each newton step. THe first element is initial value, and the last element is local minimum point.
    - NewtonUnconstrained.get_minimizing_function_value_sequence()
        - It returns the sequence of (target, minimizing) function value at each newton step. the last element is a local minimum value.
    - NewtonUnconstrained.get_decrement_sequence()
        - It reterns the sequence of decrement values. (It can be used as a measure of distance between a local minimum and the newton step point now.)
    - NewtonUnconstrained.get_arg_min()
        - It returns the last element of the minimizing sequence.
    - NewtonUnconstrained.get_min()
        - It returns the last element of the minimizing function value sequence.


In [None]:
import numpy as np
import scipy.linalg


class NewtonUnconstrained:
    def __init__(self, fn_objective, fn_objective_gradient, fn_objective_hessian, fn_objective_domain_indicator = None):
        
        self.objective = fn_objective
        self.objective_gradient = fn_objective_gradient
        self.objective_hessian = fn_objective_hessian
        
        
        if fn_objective_domain_indicator is not None:
            self.objective_domain_indicator = fn_objective_domain_indicator
        else:
            self.objective_domain_indicator = self._Rn_domain_indicator
        
        self.minimizing_sequence = []
        self.decrement_sequence = []
        self.value_sequence = []
        

    def _Rn_domain_indicator(self, eval_pt):
        return True

    def _backtracking_line_search(self, eval_pt, descent_direction, 
            a_slope_flatter_ratio, b_step_shorten_ratio):

        if a_slope_flatter_ratio <= 0 or a_slope_flatter_ratio >= 0.5:
            raise ValueError("a should be 0 < a < 0.5")
        if b_step_shorten_ratio <= 0 or b_step_shorten_ratio >= 1:
            raise ValueError("b should be 0 < a < 1")

        step_size = 1

        while True:
            flatten_line_slope = self.objective_gradient(eval_pt) * a_slope_flatter_ratio * step_size
            deviation_vec = descent_direction * step_size

            objective_fn_value = self.objective(eval_pt + deviation_vec)
            flatten_line_value = self.objective(eval_pt) + sum(flatten_line_slope * deviation_vec)

            if objective_fn_value < flatten_line_value and self.objective_domain_indicator(eval_pt + deviation_vec):
                break
            else:
                step_size = step_size * b_step_shorten_ratio

        return step_size
    
    def _l2_norm(self, vec):
        return (sum(vec**2))**0.5
    
    def _descent_direction_newton_cholesky(self, eval_pt):
        hessian = self.objective_hessian(eval_pt)
        neg_gradient = self.objective_gradient(eval_pt) * (-1)
        cholesky_lowertri_of_hessian = scipy.linalg.cholesky(hessian, lower=True) # L ; H = L(L^T)
        #want: x; Hx = -g
        forward_variable = scipy.linalg.solve_triangular(cholesky_lowertri_of_hessian, neg_gradient, lower=True) # w ; Lw = -g
        newton_decrement = self._l2_norm(forward_variable)
        direction_newton_step = scipy.linalg.solve_triangular(np.transpose(cholesky_lowertri_of_hessian), forward_variable, lower=False) # x ; (L^t)x = w
        return (direction_newton_step, newton_decrement)

    # later: sparse / band hessian version

    def _descent_direction_newton_pinv(self, eval_pt):
        hessian = self.objective_hessian(eval_pt)
        neg_gradient = self.objective_gradient(eval_pt) * (-1)
        direction_newton_step = np.matmul(np.linalg.pinv(hessian), neg_gradient) #pseudo
        newton_decrement = np.matmul(np.transpose(neg_gradient), direction_newton_step)
        return (direction_newton_step, newton_decrement)

    def run_newton_with_backtracking_line_search(self, starting_pt, tolerance = 0.001, 
                                                method="cholesky", 
                                                a_slope_flatter_ratio = 0.2, b_step_shorten_ratio = 0.5):
        #method : cholesky, pinv (if hessian is singular)
        self.minimizing_sequence = [starting_pt]
        self.value_sequence = [self.objective(starting_pt)]
        self.decrement_sequence = []
        num_iter = 0
        while True:
            eval_pt = self.minimizing_sequence[-1]
            if method == "cholesky":
                descent_direction, decrement = self._descent_direction_newton_cholesky(eval_pt)
            elif method == "pinv":
                descent_direction, decrement = self._descent_direction_newton_pinv(eval_pt)
            else:
                raise ValueError("method should be ['cholesky', 'pinv']")
            self.decrement_sequence.append(decrement)

            if (decrement**2) < (tolerance*2):
                break

            descent_step_size = self._backtracking_line_search(eval_pt, descent_direction, 
                                    a_slope_flatter_ratio, b_step_shorten_ratio)
            next_point = eval_pt + descent_direction * descent_step_size
            self.minimizing_sequence.append(next_point)
            self.value_sequence.append(self.objective(next_point))
            num_iter += 1

        print("iteration: ", num_iter)
    
    def get_minimizing_sequence(self):
        return self.minimizing_sequence
    
    def get_minimizing_function_value_sequence(self):
        return self.value_sequence

    def get_decrement_sequence(self):
        return self.decrement_sequence

    def get_arg_min(self):
        return self.minimizing_sequence[-1]

    def get_min(self):
        return self.objective(self.minimizing_sequence[-1])



In [None]:
#Examples
#test 1
def test_objective1(vec_2dim, gamma = 2):
    val = 0.5 * (vec_2dim[0]**2 + gamma * (vec_2dim[1]**2))
    return np.array(val)

def test_objective1_gradient(vec_2dim, gamma = 2):
    grad = (vec_2dim[0], vec_2dim[1] * gamma)
    return np.array(grad)

def test_objective1_hessian(vec_2dim, gamma = 2):
    hessian = [[1, 0],
                [0, gamma]]
    return np.array(hessian)

test_newton_inst1 = NewtonUnconstrained(
    test_objective1, 
    test_objective1_gradient, 
    test_objective1_hessian)
test_newton_inst1.run_newton_with_backtracking_line_search(np.array([1000, 150]), method="cholesky")
print(test_newton_inst1.get_minimizing_sequence())
print(test_newton_inst1.get_decrement_sequence())
print(test_newton_inst1.get_minimizing_function_value_sequence())

