In [421]:
#
# A Boltzmann Machines attempting 
# to solve Traveling Salesman Problems
#

import math, random
import string
import numpy as np

# global function
sigmoid = lambda dE,T: 1 / (1 + math.exp(dE/T))

In [422]:
#
# Primary classes for representing the components of a boltzmann machines
#

class node():
    """Abstract class to represent a city in the travelling
    salesman problem
    """
    
    def __init__(self,
                 state=0):
        """Initialize the node
        """
        self.state = state
    
    def get_value(self,states):
        """Returns the sums of the weights * states provided
        
        Args:
            states (np.matrix): states of connected nodes (binary representation)
            
        Returns:
            sum (float): sum of weights * states
        """
        return np.sum(self.weights[states == 1]) * self.state
    
    def get_state(self):
        """Returns current state
        """
        return self.state
    
    def set_state(self,state):
        """Sets the node state
        """
        self.state = state
        return

    def propose_state(self,dE,T):
        """Proposes to the node to change state, and the node
        follows the Metropolis Acceptance Criterion
        
        Args:
            dE (float): change in objective value with the node changing state
            T (float): current temperature in annealing process
        
        Returns:
            final_state (int): the state chosen from the above criteria
        """
        new_state = abs(self.state - 1)
        final_state = self.state
        if dE <= 0:
            final_state = new_state
        else:
            prob = (1/self.weights.size) * sigmoid(dE,T)
            final_state = new_state if random.random() < prob  else self.state
            
        return final_state

In [423]:
class boltzmann():
    """The boltzmann machine object, equipped with the necessary tools 
    to perform simulated annealing on traveling salesman problems
    """
    
    def __init__(self,
                 get_state_values=np.vectorize(lambda n: n.get_state()),
                 hamiltonian_error_charge=0.5,
                 bias_charge=-0.1):
        """Initializes the boltzmann machine with needed configurations
        
        Args:
            get_state_values (np.vectorize): mapping function to create a state matrix given a matrix of nodes
            hamiltonian_error_charge (float): weight charge against breaking hamiltonian requirements
            bias_charge (float): weight charge that favors at least one node within an epoch activating
        """
        
        self.get_state_values = get_state_values
        self.hamiltonian_error_charge = hamiltonian_error_charge
        self.bias_charge = bias_charge
    
    def create_network(self,distances,node_map=string.ascii_uppercase):
        """Creates a network and configures the nodes of the network according
        to a pre-defined use of its distance matrix
        
        Args:
            distances (array): distance matrix representing the travel distances between any nodes
            node_map (np.matrix): dictionary storing the titles for each node (city)
        """
        # store a dictionary to identify the 
        # nodes within the network
        self.node_map = node_map
        
        # number of cities
        n_count = np.size(distances,0)
        # number of epochs
        t_count = n_count + 1
        
        # create the boltzmann machine with 
        # unconfigured nodes
        self.network = np.matrix([ node() for n in range(n_count * t_count) ])
        self.network.shape = (n_count,t_count)
        
        # map a weight, w, with the provided mapping function 
        # to achieve the property that consensus value declines
        # when shorter paths are chosen AND rises when no path is chosen
        self.distances = distances
        map_weights = np.vectorize(lambda w: -1 * math.exp(-w) if w != 0 else 0.0)
        modified_distances = map_weights(distances)
        
        # iterate through all nodes (city and epoch) and configure
        # their weight matrices
        for city in range(n_count):
            for epoch in range(t_count):
                # initialize the weight matrix to zeros
                self.network[city,epoch].weights = np.matrix(np.zeros(n_count * t_count))
                self.network[city,epoch].weights.shape = (n_count,t_count)
                
                # encourage visits to short-distance destinations (epoch +1/-1) using the
                # modified distance matrix
                self.network[city,epoch].weights[:,(epoch + 1) % t_count] = modified_distances[city].T
                self.network[city,epoch].weights[:,(epoch - 1) % t_count] = modified_distances[city].T
                
                # avoid visiting the same city twice in a tour
                self.network[city,epoch].weights[city] = self.hamiltonian_error_charge
                
                # avoid visiting more than one city within an epoch
                self.network[city,epoch].weights[:,epoch] = self.hamiltonian_error_charge
                
                # if it's the first or final epoch, encourage completing
                # the hamiltonian tour by adding a high cost to closing the HT loop
                # with different cities but encourange closing it with the same city
                if epoch == 0:
                    self.network[city,epoch].weights[:,-1] = self.hamiltonian_error_charge
                    self.network[city,epoch].weights[city,-1] = -1
                elif epoch == t_count - 1:
                    self.network[city,epoch].weights[:,0] = self.hamiltonian_error_charge
                    self.network[city,epoch].weights[city,0] = -1
                
                # finally, encourage at least one city to be activated (acts as a bias)
                self.network[city,epoch].weights[city,epoch] = self.bias_charge
    
    def get_states(self):
        """Returns a matrix of node states representing the current network
        
        Returns:
            (np.matrix): node state values
        """
        return self.get_state_values(self.network)
    
    def get_distance(self):
        """Returns travel distance of current network, if it makes a hamiltonian tour (HT)
        
        Returns:
            distance (float): total distance traveled in current network
        """
        return get_distance(self.get_states(),self.distances)
    
    def get_proposed_states(self,city,epoch):
        """Generates a matrix that represents a new proposed state by flipping
        the current state of the given city-epoch node
        
        Args:
            city (int): index of the city to be changed
            epoch (int): index of the epoch containing city to be changed
            
        Returns:
            proposed_states (np.matrix): new state matrix with flipped node state
        """
        current_states = self.get_states()
        new_state = abs(current_states[city,epoch] - 1) # flips the current state to either 0 or 1
        proposed_states = np.matrix(current_states)
        proposed_states[city,epoch] = new_state
        return proposed_states
    
    def get_consensus(self):
        """Computes the consensus value of the current network
        """
        get_values = np.vectorize(lambda n: n.get_value(self.get_states()))
        total = np.sum(get_values(self.network))
        return total
    
    def get_next_node(self):
        """Returns the next randomnly-selected node within the current network
        
        Returns:
            n (int): index of the city chosen
            t (int): index of the epoch chosen
        """
        n = random.randint(0,np.size(self.network,0)-1)
        t = random.randint(0,np.size(self.network,1)-1)
        return n,t
    
    def print_states(self):
        """Prints out the current state matrix in readable format
        """
        states = self.get_states()
        cities,epochs = states.shape
        
        header = '-- network state --\n  '
        for e in range(epochs):
            header += ' %s' % (e+1)
        print(header)
        
        for i in range(cities):
            print(str(self.node_map[i]) + ' ' + str(states[i].A1))
    
    def print_tour(self):
        """Prints the hamiltonian tour in a readable format
        """
        print_tour(self.get_states(),self.node_map)

In [424]:
#
# Supporting functions for simulated annealing 
# on boltzmann machines
#

def hamiltonian(states):
    """Checks that the given state matrix creates a valid hamiltonian tour (HT)
    
    Returns:
        status (bool): True if the state matrix creates a HT; False otherwise
    """
    cities,epochs = states.shape
    tour = {}
    for t in range(epochs):
        event = states[:,t].A1 # collects the epoch column containing nodes within a given epoch
        if np.sum(event) != 1: # if < 1 or > 1 nodes are activated, breaks the HT
            return False

        # all of these epochs below have only one node activated
        for i,n in enumerate(event):
            if n == 1: # if a node is activated
                if t == epochs - 1: # is it the last stop in the HT
                    if tour[i] != 0: # and does it return to the starting locations
                        return False
                elif i in tour: # or does it repeat another destination
                    return False
                else: # if not, add it to the tour to review later 
                    tour[i] = t

    return True

def get_distance(states,distances):
    """Returns the distance traveled by a state matrix given the distance matrix
    
    Args:
        states (np.matrix): state matrix representing nodes visited along a tour
        distances (np.matrix): distance matrix describing the distance between any two nodes
    
    Returns:
        distance (float): total distance traveled on the hamiltonian tour
    """
    tour = get_tour(states)
    distance = 0
    for i,stop in enumerate(tour[1:]):
        distance += distances[stop,tour[i]]
    
    return distance

def get_tour(states):
    """Create a string sequence cities traveled at each epoch to
    represent the hamiltonian tour

    Returns:
        tour (string): the active tour in the current network
    """
    if not hamiltonian(states): # if a HT hasn't been completed, then distance can't be computed
        return 'hamiltonian tour not complete'
    
    cities,epochs = states.shape
    tour = []
    
    for t in range(epochs): # for each epoch
        event = states[:,t].A1 # grab the array describing the states of its nodes
        for i,n in enumerate(event): # and, for each node within it
            if n == 1: # if it's active
                tour.append(i) # add its index to the tour object
    
    return tour

def print_tour(states,node_map):
    """Prints the tour in a readable format
    
    Args:
        states (np.matrix): state matrix
        node_map (dict): dictionary of city titles for each node
    """
    insert = '->'
    tour = get_tour(states)
    tour_str = '%s' % node_map[tour[0]]
    for i in tour[1:]:
        tour_str += (insert + node_map[i])
    print(tour_str)
    
def consensus(states,network):
    """Consensus function, computing the summated values of weights and states between
    every node (city,epoch) in the network
    
    Args:
        states (np.matrix): matrix representing the states of the nodes provided
        network (np.matrix): matrix of nodes holding the weights
    
    Returns:
        total (int): total consensus value
    """
    get_values = np.vectorize(lambda n: n.get_value(states))
    total = np.sum(get_values(network))
    return total

def anneal(machine,T=50,schedule=lambda T: math.log10(T)):
    """Simulated annealing function on a boltzmann machine (or Hopfield network, in general)
    
    Args:
        machine (boltzmann): machine with network of nodes to be optimized
        T (float): starting temperature
        schedule (lambda): function to apply to T after each cycle
    """
    stop_T = 2
    min_dist = np.inf
    min_conf = None
    
    print('-- annealing %s --' % machine)
    machine.print_states()
    
    # the temperature schedule starts here
    while T > stop_T:
        print('iterating, T=%s' % T)
        complete = False
        
        # while it has not converged to a hamiltonian tour at the
        # current temperature...
        while not complete:
            
            # randomnly select a node and generate a new proposed state
            selected_city,selected_epoch = machine.get_next_node()
            current_states = machine.get_states()
            proposed_states = machine.get_proposed_states(selected_city,selected_epoch)

            # compute the energy of the current and proposed state and take their difference
            E1 = consensus(current_states,machine.network)
            E2 = consensus(proposed_states,machine.network)
            dE = E2 - E1

            # adders, in case the new state is a hamiltonian or old state was
#             if hamiltonian(current_states):
#                 dE += 1
#             elif hamiltonian(proposed_states):
#                 dE -= 1/get_distance(proposed_states,machine.distances)

            current_state = machine.network[selected_city,selected_epoch].get_state()
            new_state = machine.network[selected_city,selected_epoch].propose_state(dE,T)

            if current_state != new_state:
                machine.network[selected_city,selected_epoch].set_state(new_state)
                final_states = machine.get_states()
                if hamiltonian(final_states):
                    final_dist = get_distance(final_states,machine.distances)
                    complete = True
                    if final_dist < min_dist:
                        min_dist = final_dist
                        min_conf = machine.get_states()
        
        print('distance=%s' % machine.get_distance())
        machine.print_tour()
        T -= schedule(T)

    print('-- annealing complete --')
    print('results: distance = %s' % min_dist)
    print(min_conf)

In [425]:
# the distance matrix to be used in the
# traveling salesman problem
distances = np.matrix([
    [0,10,20,5,18],
    [0,0,15,32,10],
    [0,0,0,25,16],
    [0,0,0,0,35],
    [0,0,0,0,0]
])
distances = distances + distances.T

In [426]:
b = boltzmann()

In [427]:
b.create_network(distances)

In [430]:
anneal(b)

-- annealing <__main__.boltzmann object at 0x10fb2bd30> --
-- network state --
   1 2 3 4 5 6
A [0 0 1 0 0 0]
B [0 0 0 0 1 0]
C [0 0 0 1 0 0]
D [1 0 0 0 0 1]
E [0 1 0 0 0 0]
iterating, T=50
distance=120
D->E->A->C->B->D
iterating, T=48.30102999566398
distance=120
D->E->A->C->B->D
iterating, T=46.6170736036979
distance=86
D->A->E->C->B->D
iterating, T=44.94852859658232
distance=86
D->A->E->C->B->D
iterating, T=43.29581311703908
distance=86
D->A->E->C->B->D
iterating, T=41.65936721671537
distance=86
D->A->E->C->B->D
iterating, T=40.03965454775212
distance=86
D->A->E->C->B->D
iterating, T=38.437164225914174
distance=73
D->A->E->B->C->D
iterating, T=36.85241288662804
distance=73
D->A->E->B->C->D
iterating, T=35.2859469583667
distance=73
D->A->E->B->C->D
iterating, T=33.73834518140951
distance=85
D->A->C->B->E->D
iterating, T=32.21022140417025
distance=66
D->A->B->E->C->D
iterating, T=30.702227694119916
distance=66
D->A->B->E->C->D
iterating, T=29.215057805933814
distance=103
D->E->A->B->C-