In [4]:
%%capture output
%pip install numpy
%pip install matplotlib
%pip install torch

![images](images/GreenMaskAlg1.png)
![images](images/GreenMaskAlg2.png)

In [165]:
import torch
from torch.distributions.normal import Normal


class SMC():
    def __init__(self, n_threads: int, n_iterations: int):
        self.n_threads = n_threads
        self.n_iterations = n_iterations
        self.theta = [self.initial_theta(loc=0, scale=1)]
        self.weights = [self.initial_weights()]
        self.estimate = []
        print(f'smc initialised, use run_smc to run all iterations')
        return
    
    def target_dist_prob(self, th):
        normal = Normal(torch.tensor([5]),torch.tensor([0.5]))
        return torch.exp(normal.log_prob(th))

    def initial_weights(self):
        normal = Normal(torch.tensor([0]),torch.tensor([1]))
        # maybe this is stupid but idk why theres no way of getting prob
        # only thing that i can get returned is log prob instead
        prob_q = (torch.exp(normal.log_prob(self.theta[-1])))

        prob_target = self.target_dist_prob(self.theta[-1])
   
        return prob_target/prob_q
    
    def initial_theta(self, loc, scale):
        # maybe this is stupid but idk why theres no
        return torch.normal(loc, scale,size=(1,self.n_threads))[-1]
    
    def normalise_weights_(self):
        self.weights[-1] = self.weights[-1] / self.weights[-1].sum()
    
    def calc_neff(self):
        return 1 / (self.weights[-1] @ self.weights[-1].T)
    
    def new_weights_(self):
        target_new = self.target_dist_prob(self.theta[-1])
        target_old = self.target_dist_prob(self.theta[-2])
        self.weights.append(self.weights[-1] * (target_new/target_old))
        return
    
    def weighted_resample_(self):
        r"""
        in place chages theta and weights using a weighted resample
        """
        new_sample_indexes = torch.multinomial(self.weights[-1],self.n_threads,replacement=True)

        theta_hat = torch.tensor([self.theta[-1][i] for i in new_sample_indexes])
 
        self.theta[-1] = theta_hat
        self.weights[-1] = (torch.ones(1,self.n_threads)[-1])*(1/self.n_threads)

    def estimator_(self):
        self.estimate.append(self.theta[-1] @ self.weights[-1].T)

    def sample_(self):
        self.theta.append(self.theta[-1]+torch.normal(0,1,size=(1,self.n_threads))[-1])

    def run_smc(self):
        for _ in range(self.n_iterations):
            self.normalise_weights_()
            # estimate quals of interest
            self.estimator_()

            # assess if we need to resample
            neff = self.calc_neff()
            if neff<(self.n_threads/2):
                self.weighted_resample_()
            self.sample_()
            self.new_weights_()
        return

def main():
    n = 100
    t = 500
    smc_torch = SMC(n,t)
    smc_torch.run_smc()
    print(smc_torch.estimate)
    return

main()

smc initialised, use run_smc to run all iterations
[tensor(2.3095), tensor(4.5821), tensor(4.8225), tensor(4.9805), tensor(5.0322), tensor(4.9884), tensor(5.1005), tensor(4.9636), tensor(5.0626), tensor(5.0666), tensor(5.1806), tensor(5.1381), tensor(4.9563), tensor(4.8918), tensor(4.8024), tensor(4.8304), tensor(5.2082), tensor(5.0258), tensor(5.0727), tensor(5.1227), tensor(4.9057), tensor(4.8960), tensor(4.8588), tensor(5.1716), tensor(5.1688), tensor(4.9531), tensor(5.0655), tensor(4.9093), tensor(4.9689), tensor(4.9919), tensor(4.9802), tensor(5.0626), tensor(5.0606), tensor(4.6610), tensor(4.9523), tensor(4.9201), tensor(4.9721), tensor(5.1272), tensor(5.0233), tensor(5.1137), tensor(5.0153), tensor(4.9872), tensor(4.9189), tensor(5.0017), tensor(4.9830), tensor(4.8515), tensor(4.9191), tensor(5.0226), tensor(5.0212), tensor(5.1113), tensor(5.4241), tensor(5.2029), tensor(5.0763), tensor(5.1229), tensor(5.0432), tensor(4.9525), tensor(5.0229), tensor(4.6932), tensor(4.8810), tens