In [1]:
import numpy as np
from itertools import product
import math
import random
import copy

#### Only H

In [34]:
class SimpleIsingModel:
    def __init__(self, data, lr=0.0005):
        self.lr = lr
        
        # Possible values the spins can take
        self.spin_values = [-1,1]
        
        # Create a ndarray from the data
        self.data_activations_matrix = np.array(data)
        # Calculate the shape of the matrix and save useful variables
        self.num_spins = self.data_activations_matrix.shape[1]
        self.num_samples_data = self.data_activations_matrix.shape[0]
        
        # Calculate the true data expectations
        # The mean
        self.data_mean = np.mean(self.data_activations_matrix, axis=0)
        
                
        # Calculate the initial H vector, each entry proportional to its probability (according to Hinton)
        #self.H = np.sum(self.data_activations_matrix, axis=0)/ float(self.num_samples_data)
        # Try with random H
        self.H = np.random.normal(loc=0.0, scale=.01, size=self.num_spins)
    
    # Calcualte the energy of the model's parameters given a sigma state
    def calculate_energy(self, sigma):
        energy = 0
        for i in range(0, self.num_spins):
            energy += sigma[i]*self.H[i]

        return - energy
    
    # Flip a randomly selected spin
    def flip_spin(self, _sigma):
        sigma = copy.deepcopy(_sigma)
        spin_to_flip = random.choice(range(0, self.num_spins))

        spin = sigma[spin_to_flip]
        if spin == self.spin_values[0]:
            sigma[spin_to_flip] = self.spin_values[1]
        else:
            sigma[spin_to_flip] = self.spin_values[0]
        return sigma
        
        
    # Train the exact model
    def train(self, max_epochs = 500):
        
        totalParamVariation = math.inf
        stopCondition = 0.0000005
        
        epoch = 1
        
        while totalParamVariation > stopCondition and epoch < max_epochs:
            # Calculate P(sigma) for every possible combination in the model

            prob_dict = {}
            for sigma in product(self.spin_values, repeat=self.num_spins):
                prob_dict[sigma] = math.exp(-self.calculate_energy(sigma))

            # Calculate the partition function
            Z = sum(prob_dict.values())

            # Normalize dividing by the partition function
            for sigma in prob_dict.keys():
                prob_dict[sigma] = prob_dict[sigma]/Z

            model_mean = np.zeros(self.H.shape)
            for sigma in product(self.spin_values, repeat=self.num_spins):
                # Calculate the expectation of the averages
                for i in range(0, self.num_spins):
                    model_mean[i] += sigma[i]*prob_dict[sigma]


            # Calculate the step size for every H_{i}
            stepH = self.lr * (self.data_mean - model_mean)
            
            oldH = copy.deepcopy(self.H)
            # Take the step
            self.H = self.H + stepH
            
            # Calculate the variation of J for the termination condition
            totalParamVariation = np.sum(np.absolute(self.H - oldH))

            if epoch%100 == 0:
                print('Epoch', epoch, 'TotalParamVariation', round(totalParamVariation, 8))
                print(model_mean, 'Average Model')
                print(self.data_mean, 'Average Data')
                print(self.data_mean - model_mean, 'Diff')
                print()
            
            epoch += 1

        print(self.data_mean, 'Spins average')
        print(self.H, 'H')
        
    def random_walk(self, max_steps):
        # Generate a random initial spin configuration
        model_activations_list = []
        sigma_0 = random.choices([-1,1], k=self.num_spins)
        burn_in_len = 400
        
        # Calculate the energy at this time
        energy_0 = self.calculate_energy(sigma_0)
        
        for i in range(0, max_steps):
            
            # Flip a spin and calculate the new energy
            sigma_t = self.flip_spin(sigma_0)
            energy_t = self.calculate_energy(sigma_t)
            delta_energy = energy_t - energy_0

            # If delta_energy<0, accept the spin flip, energetically favourable
            if delta_energy>0:  # Ohterwise, check if transition probability p < uniform r

                p = math.exp(-delta_energy)
                r = random.random()

                if r>p: # If transition probability is lower than r, refuse the update, otherwise accept
                    sigma_t = copy.deepcopy(sigma_0)
                    energy_t = energy_0
            
            # Save the state if the burn-in phase has passed
            if i > burn_in_len:
                model_activations_list.append(sigma_t)

            sigma_0 = copy.deepcopy(sigma_t)
            energy_0 = energy_t
        
        return model_activations_list
    
    def train_metropolis(self, max_epochs = 300, random_walk_len = 100):
        random.seed(3)
        totalParamVariation = math.inf
        stopCondition = 0.0000000005
        
        epoch = 1
        
        while totalParamVariation > stopCondition and epoch < max_epochs:
            model_activations_list = self.random_walk(random_walk_len)   
            model_activations_matrix = np.array(model_activations_list)

            # Calculate the data expectations
            model_mean = np.mean(model_activations_matrix, axis=0)
            
            # Calculate the step size for every H_{i}
            stepH = self.lr * (self.data_mean - model_mean)
            # Take the step
            oldH = copy.deepcopy(self.H)
            self.H = self.H + stepH
            
            # Calculate the variation of H for the stop condition
            totalParamVariation = np.sum(np.absolute(self.H - oldH))
            
            if epoch > 1800:
                print('Epoch', epoch, 'TotalParamVariation', round(totalParamVariation, 8))
                print(model_mean, 'Average Model')
                print(self.data_mean, 'Average Data')
                print(self.data_mean - model_mean, 'Diff')
                print()

            epoch +=1
        
        print(self.data_mean, 'Spins average')
        print(self.H, 'H')


    def test_random_walk(self):
        print('Initial H', self.H)
        print('Data average', self.data_mean)
        print('Magnetization', np.tanh(self.H))
        
        model_activations_list = self.random_walk(1000)
        model_activations_matrix = np.array(model_activations_list)

        # Calculate the data expectations
        model_mean = np.mean(model_activations_matrix, axis=0)
        
        print('Metropolis magnetization', model_mean)


In [35]:
activations_list = [[1], [1], [-1], [-1], [1], [-1], [1], [1]]
s_ising = SimpleIsingModel(activations_list, lr=0.0005)


In [36]:
s_ising = s_ising.test_random_walk()

Initial H [0.01250582]
Data average [0.25]
Magnetization [0.01250517]
Metropolis magnetization [0.00834725]


In [None]:
s_ising.train_metropolis( max_epochs = 2, random_walk_len = 400)