In [28]:
import torch
import torch.nn as nn
import numpy as np
import random
import Ising2D

In [2]:
def decimal_to_binary_tensor(value, width=0):
    """Return the binary digits of ``value`` as a 1-D float tensor.

    The integer is formatted in base 2, most-significant digit first, and
    left-padded with zeros up to ``width`` characters. Each character '0'
    becomes 0 and every other character becomes 1.
    """
    digits = format(value, '0{}b'.format(width))
    bits = []
    for char in digits:
        bits.append(1 if char != '0' else 0)
    return torch.tensor(bits, dtype=torch.float)

In [3]:
class Ising_model():
    """Ferromagnetic nearest-neighbour Ising model on an open 2-D lattice.

    Lattice site (i, j) is flattened row-major to index ``num_cols * i + j``,
    so a spin state is a 1-D tensor of length ``num_rows * num_cols``.
    """

    # initiate the size of 2d lattice, the (inverse-)temperature, interactions
    def __init__(self, num_rows, num_cols, beta):
        self.num_rows, self.num_cols = num_rows, num_cols
        self.num_spins = num_rows * num_cols
        self.beta = beta
        self.J = self.interactions()

    # find nearest neighbors in 2d lattice
    def neighbors(self, i, j):
        """Return the [row, col] pairs adjacent to (i, j); edges do not wrap."""
        nhb = []
        if i > 0:
            nhb.append([i-1, j])
        if i < self.num_rows - 1:
            nhb.append([i+1, j])
        if j > 0:
            nhb.append([i, j-1])
        if j < self.num_cols - 1:
            nhb.append([i, j+1])
        return nhb

    # ferromagnetic interactions only with nearest neighbors
    def interactions(self):
        """Build the (num_spins, num_spins) coupling matrix with 1 on neighbour pairs."""
        J = torch.zeros(self.num_spins, self.num_spins)
        for i in range(self.num_rows):
            for j in range(self.num_cols):
                # Row-major flattening: the stride between rows is the number
                # of columns. Using num_rows here only works on square lattices.
                site = self.num_cols * i + j
                for i_nhb, j_nhb in self.neighbors(i, j):
                    J[site, self.num_cols * i_nhb + j_nhb] = 1
        return J

    # calculate Hamiltonian of Ising model for a given spin state
    def hamiltonian(self, spin_state):
        """Return H = -(s . J s) / 2 for a flattened spin state ``s``.

        The factor 1/2 compensates for every bond appearing twice in the
        symmetric matrix ``J``.
        """
        s = spin_state
        J = self.J
        H = - torch.matmul(s, torch.matmul(J, s)) / 2
        return H

    # calculate the expectation value of the Energy of Ising model with Boltzmann distribution
    def energy_expectation(self):
        """Return the exact Boltzmann average of the energy as a 0-dim tensor.

        All 2**num_spins states are enumerated once, so this is only feasible
        for small lattices.
        """
        num_state = 2**(self.num_spins)
        energies = torch.stack([
            # binary digit 0 -> spin +1, binary digit 1 -> spin -1
            self.hamiltonian(1 - 2 * decimal_to_binary_tensor(i_state, width=self.num_spins))
            for i_state in range(num_state)
        ])
        # softmax(-beta * H) is exp(-beta * H) / Z, evaluated without overflow
        weights = torch.softmax(-self.beta * energies, dim=0)
        return torch.sum(weights * energies)

In [20]:
###################################### made by Chae-Yeon
class IsingSampler:
    """Draws lattice configurations from an ``Ising2D.WolffSampler``."""

    def __init__(self, n, m, beta):
        """Create the Ising2D model and sampler, seed it, and run warm-up sweeps."""
        lattice = Ising2D.Ising2D(n, m)
        wolff = Ising2D.WolffSampler(lattice, beta)
        self.model = lattice
        self.sampler = wolff
        wolff.set_seed(random.randint(0, 1000))
        wolff.randomize_conf()

        # 30 sweeps are performed before any configuration is recorded
        sweeps_done = 0
        while sweeps_done < 30:
            wolff.sweep()
            sweeps_done += 1

    def sample(self, batch_size):
        """Return a float32 tensor with one row per recorded configuration.

        Each configuration is copied before the next sweep is run, and the
        stacked values are mapped through (x + 1) / 2.
        NOTE(review): presumably ``conf`` holds +/-1 spins, which would make
        the result 0/1 -- confirm against the Ising2D module.
        """
        configs = []
        for _ in range(batch_size):
            configs.append(list(self.sampler.conf))
            self.sampler.sweep()
        raw = torch.tensor(configs, dtype=torch.float32)
        return (raw + 1.0) / 2

In [5]:
class Ising_sampler():
    """Single-spin-flip Metropolis sampler producing a dataset of visible states.

    A visible value v in {0, 1} corresponds to the spin 1 - 2 * v.
    """

    # initiate the size of a dataset, and create a sampled dataset
    def __init__(self, data_size, num_rows, num_cols, beta=1.0, num_burn=10000):
        """Build the Ising model and immediately sample ``data_size`` states.

        ``beta`` is forwarded to ``Ising_model`` (which requires it), and
        ``num_burn`` is the number of initial chain states that are discarded.
        """
        self.model = Ising_model(num_rows, num_cols, beta)
        self.data_size = data_size
        self.data_length = num_rows * num_cols
        self.num_rows, self.num_cols = num_rows, num_cols
        self.num_burn = num_burn
        self.dataset = self.metropolis_sampling()

    # randomly generate one sample of an spin state as 2d tensor
    def gen_sample(self, p = 0.5):
        """Return a (num_rows, num_cols) tensor whose entries are 0 with probability ``p``, else 1."""
        probs = torch.rand(self.num_rows, self.num_cols)
        sample = torch.where(probs < p, torch.zeros(1), torch.ones(1))
        return sample

    # create a training data set by using Metropolis algorithm
    def metropolis_sampling(self):
        """Run one Metropolis chain and return the states kept after burn-in.

        Returns a (data_size, data_length) tensor of 0/1 visible states.
        """
        beta = self.model.beta
        num_burn = self.num_burn
        # random start, flattened to a 1d visible layer
        v_current = self.gen_sample().view(-1)
        # v=0 ---> spin = +1,  v=1 --> spin = -1
        E_current = self.model.hamiltonian(1 - 2 * v_current)
        chain = [v_current]

        for _ in range(num_burn + self.data_size - 1):
            # Propose flipping one random site on a copy, so that a rejected
            # proposal leaves the current state untouched.
            v_next = v_current.clone()
            rand_site = random.randint(0, self.data_length - 1)
            v_next[rand_site] = 1 - v_next[rand_site]
            E_next = self.model.hamiltonian(1 - 2 * v_next)
            # Metropolis rule: accept with probability min(1, exp(-beta * dE))
            delta_E = E_next - E_current
            acceptance = torch.exp(- beta * delta_E).clamp(max=1.0)
            if torch.rand(1) <= acceptance:
                # the reference energy only advances when the move is accepted
                v_current, E_current = v_next, E_next
            chain.append(v_current)

        # stack once at the end instead of concatenating inside the loop
        dataset = torch.stack(chain)
        dataset_after_burning = dataset[num_burn : num_burn + self.data_size]
        return dataset_after_burning

In [6]:
class Analyze_data():
    """Empirical statistics of a dataset of binary states, one state per row."""

    def __init__(self, dataset):
        """Store the 2-D ``dataset`` together with its row count and row length."""
        self.data_size = dataset.size()[0]
        self.data_length = dataset.size()[1]
        self.dataset = dataset

    # calculate the probability of each samples in the dataset by counting
    def prob_data(self):
        """Return the empirical probability of each of the 2**data_length states.

        Entry k of the result belongs to the state whose bits, read
        most-significant first, spell the integer k (the same ordering as
        ``format(k, 'b')``). Rows containing anything other than 0/1 match no
        state and are therefore not counted.
        """
        num_state = 2**self.data_length
        data = self.dataset
        is_binary = ((data == 0) | (data == 1)).all(dim=1)
        bits = data[is_binary].to(torch.long)
        # place value of each column, most-significant bit first
        place_values = torch.tensor(
            [1 << shift for shift in range(self.data_length - 1, -1, -1)],
            dtype=torch.long,
        )
        state_index = (bits * place_values).sum(dim=1)
        # one pass over the data instead of comparing every row with every state
        count = torch.bincount(state_index, minlength=num_state).to(torch.float)
        prob = count / self.data_size
        return prob

    # calculate the Entropy of the dataset
    def entropy_data(self):
        """Return the Shannon entropy (natural log) of the empirical distribution."""
        prob = self.prob_data()
        # states that never occur contribute nothing (0 * log 0 := 0)
        occupied = prob[prob > 0]
        entropy = -torch.sum(occupied * torch.log(occupied))
        return entropy

    # calculate the mean value of the Energy of each samples' in the dataset
    def energy_data(self, model):
        """Return the mean of ``model.hamiltonian`` over all rows, using spin = 1 - 2 * v."""
        spinset = 1 - 2 * self.dataset
        energy = torch.zeros(self.data_size)
        for i_data in range(self.data_size):
            energy[i_data] = model.hamiltonian(spinset[i_data]).item()
        energy_data = torch.mean(energy)
        return energy_data

In [96]:
class RBM():
    """Restricted Boltzmann machine with binary visible and hidden units.

    Parameters are ``W`` of shape (nh, nv), the hidden bias ``a`` of shape
    (nh,) and the visible bias ``b`` of shape (nv,).
    """

    # Initiate the sizes of visible and hidden layers, and RBM parameters(weights, biases)
    def __init__(self, nv, nh):
        self.nv = nv
        self.nh = nh
        self.W = torch.randn(nh, nv)
        self.a = torch.randn(nh)
        self.b = torch.randn(nv)

    # calculate the RBM_Energy for a given visible and hidden layer
    def energy(self, v, h):
        """Return E(v, h) = -a.h - b.v - h.W.v for one visible/hidden pair."""
        ah = torch.matmul(self.a, h)
        bv = torch.matmul(self.b, v)
        hWv = torch.matmul(h, torch.matmul(self.W, v))
        E = - ah - bv - hWv
        return E

    # calculate the Free Energy for a given visible layer
    def free_energy(self, v):
        """Return F(v) = -b.v - sum_j softplus(a_j + (W v)_j).

        ``v`` may be a single state of shape (nv,) or a batch of shape
        (batch, nv); the result is a scalar or a (batch,) tensor respectively.
        """
        bv = torch.matmul(v, self.b)
        Wv = torch.matmul(v, self.W.t())
        sp = nn.Softplus()
        F = - bv - torch.sum(sp(self.a + Wv), dim=-1)
        return F

    def log_partition_function(self):
        """Return log Z, summing exp(-F(v)) over all 2**nv visible states.

        logsumexp keeps the sum finite even when individual terms would
        overflow as plain exponentials.
        """
        num_state = 2**self.nv
        states = torch.stack([
            decimal_to_binary_tensor(idx_state, width=self.nv)
            for idx_state in range(num_state)
        ])
        return torch.logsumexp(-self.free_energy(states), dim=0)

    # calculate the partition function of RBM model
    def partition_function(self):
        """Return Z as a 0-dim tensor (may overflow to inf; prefer the log form)."""
        return torch.exp(self.log_partition_function())

    # calculate the Negative Log-Likelihood
    def nll(self, dataset):
        """Return the mean negative log-likelihood of the rows of ``dataset``."""
        nll = torch.mean(self.free_energy(dataset)) + self.log_partition_function()
        return nll

    def to_hidden(self, visible):
        """Return p(h = 1 | v) for a single state (nv,) or a batch (batch, nv)."""
        Wv = torch.matmul(visible, self.W.t())
        return torch.sigmoid(self.a + Wv)

    def to_visible(self, hidden):
        """Return p(v = 1 | h) for a single state (nh,) or a batch (batch, nh)."""
        hW = torch.matmul(hidden, self.W)
        return torch.sigmoid(self.b + hW)

    # update parameters by gradient descent with Contrastive Divergence
    def update_cd(self, batch, learning_rate, num_iterations=100):
        """Apply one CD-k parameter update averaged over ``batch``.

        Args:
            batch: (batch_size, nv) tensor of visible states.
            learning_rate: step size of the update.
            num_iterations: number k of Gibbs steps in each chain.

        Raises:
            ValueError: if ``num_iterations`` is smaller than 1.

        An empty batch leaves the parameters unchanged.
        """
        if num_iterations < 1:
            raise ValueError('num_iterations must be at least 1')
        batch_size = batch.size()[0]
        if batch_size == 0:
            # nothing to average over; avoid dividing by zero
            return

        # positive phase: hidden probabilities given the data
        v0 = batch
        p_h0 = self.to_hidden(v0)
        h = torch.bernoulli(p_h0)

        # k-iterations of Gibbs sampling, all chains of the batch in parallel
        for _ in range(num_iterations):
            v_new = torch.bernoulli(self.to_visible(h))
            p_h = self.to_hidden(v_new)
            h = torch.bernoulli(p_h)

        # p.t() @ v sums the per-sample outer products p (x) v over the batch
        positive = torch.matmul(p_h0.t(), v0)
        negative = torch.matmul(p_h.t(), v_new)
        self.W += learning_rate * (positive - negative) / batch_size
        self.a += learning_rate * torch.mean(p_h0 - p_h, dim=0)
        self.b += learning_rate * torch.mean(v0 - v_new, dim=0)

In [82]:
# lattice dimensions and inverse temperature of the Ising model
num_rows, num_cols = 2, 2
beta = 1.0

In [103]:
# RBM layer sizes: one visible unit per lattice site, ten hidden units
nv, nh = num_rows * num_cols, 10
rbm = RBM(nv, nh)

In [None]:
# set training parameters
num_epoch = 1000
data_size = 10000
batch_size = 100
learning_rate = 1e-4
log_interval = 100  # number of epochs between two loss reports

# calculate true energy of Ising model
model = Ising_model(num_rows, num_cols, beta)
E_model = model.energy_expectation()
print('E_model = {}'.format(E_model))

# sample dataset for training
sampler = IsingSampler(num_rows, num_cols, beta)
D = sampler.sample(data_size)
anal = Analyze_data(D)
E_data = anal.energy_data(model)
print('E_data = {}'.format(E_data))
S_data = anal.entropy_data()
print('S_data = {}'.format(S_data))

# train the RBM parameters
# epoch 0 only reports the loss of the untrained RBM; updates start at epoch 1
num_batches = data_size // batch_size
for epoch in range(0, num_epoch + 1):
    if epoch > 0:
        for batch_idx in range(num_batches):
            batch = D[batch_idx * batch_size : (batch_idx + 1) * batch_size]
            rbm.update_cd(batch, learning_rate)
    # NLL minus the data entropy is the KL divergence from the data
    # distribution to the RBM. The exact NLL enumerates every visible state,
    # so it is evaluated only on the epochs that are actually reported.
    if epoch % log_interval == 0:
        loss = rbm.nll(D) - S_data
        print('epoch {}: loss = {}'.format(epoch, loss))

E_model = -3.6016509532928467
E_data = -3.6059999465942383
S_data = 1.1912140846252441
epoch 0: loss = 2.1371192932128906


In [198]:
# sampling after training
# One long Gibbs chain is run and its visible state is printed every
# `num_iteration` steps, giving `num_sample` printed states in total.
num_sample = 10
num_iteration = 1000

# start the chain from a random visible configuration
v_0 = torch.bernoulli(torch.rand(nv))
p_h_given_v_0 = rbm.to_hidden(v_0)
h_0 = torch.bernoulli(p_h_given_v_0)
h_new = h_0

for k in range(num_sample * num_iteration):
    h_old = h_new

    # one Gibbs step h -> v -> h, using the RBM's own conditionals
    p_v_given_h = rbm.to_visible(h_old)
    v_new = torch.bernoulli(p_v_given_h)
    p_h_given_v = rbm.to_hidden(v_new)
    h_new = torch.bernoulli(p_h_given_v)

    if k % num_iteration == num_iteration - 1:
        print(v_new)

tensor([0., 0., 1., 1.])
tensor([0., 0., 1., 0.])
tensor([0., 1., 1., 0.])
tensor([1., 0., 0., 1.])
tensor([0., 1., 1., 0.])
tensor([1., 1., 1., 1.])
tensor([0., 0., 1., 1.])
tensor([1., 0., 0., 1.])
tensor([0., 0., 1., 0.])
tensor([1., 1., 0., 1.])
