# Randomized Optimization Algorithms

In [3]:
 # set up Python environment: numpy for numerical routines, and matplotlib for plotting
import os, sys
import numpy as np
import matplotlib.pyplot as plt
# display plots in this notebook
%matplotlib inline

# set display defaults
plt.rcParams['figure.figsize'] = (10, 10)        # large images
plt.rcParams['image.interpolation'] = 'nearest'  # don't interpolate: show square pixels

## AIMA python helper functions
Adapted from: AIMA python repository. https://github.com/aimacode/aima-python.

In [4]:
import random
import math
import bisect

identity = lambda x: x
argmin = min
argmax = max

def argmin_random_tie(seq, key=identity):
    """Return a minimum element of seq; break ties at random."""
    return argmin(shuffled(seq), key=key)


def argmax_random_tie(seq, key=identity):
    "Return an element with highest fn(seq[i]) score; break ties at random."
    return argmax(shuffled(seq), key=key)

def shuffled(iterable):
    "Randomly shuffle a copy of iterable."
    items = list(iterable)
    random.shuffle(items)
    return items

def probability(p):
    "Return true with probability p."
    return p > random.uniform(0.0, 1.0)


def weighted_sample_with_replacement(seq, weights, n):
    """Pick n samples from seq at random, with replacement, with the
    probability of each element in proportion to its corresponding
    weight."""
    sample = weighted_sampler(seq, weights)

    return [sample() for _ in range(n)]

def weighted_sampler(seq, weights):
    "Return a random-sample function that picks from seq weighted by weights."
    totals = []
    for w in weights:
        totals.append(w + totals[-1] if totals else w)

    return lambda: seq[bisect.bisect(totals, random.uniform(0, totals[-1]))]

In [5]:
class Problem(object):

    """The abstract class for a formal problem.  You should subclass
    this and implement the methods actions and result, and possibly
    __init__, goal_test, and path_cost. Then you will create instances
    of your subclass and solve them with the various search functions."""

    def __init__(self, initial, goal=None):
        """The constructor specifies the initial state, and possibly a goal
        state, if there is a unique goal.  Your subclass's constructor can add
        other arguments."""
        self.initial = initial
        self.goal = goal

    def actions(self, state):
        """Return the actions that can be executed in the given
        state. The result would typically be a list, but if there are
        many actions, consider yielding them one at a time in an
        iterator, rather than building them all at once."""
        raise NotImplementedError

    def result(self, state, action):
        """Return the state that results from executing the given
        action in the given state. The action must be one of
        self.actions(state)."""
        raise NotImplementedError

    def goal_test(self, state):
        """Return True if the state is a goal. The default method compares the
        state to self.goal or checks for state in self.goal if it is a
        list, as specified in the constructor. Override this method if
        checking against a single self.goal is not enough."""
        if isinstance(self.goal, list):
            return is_in(state, self.goal)
        else:
            return state == self.goal

    def path_cost(self, c, state1, action, state2):
        """Return the cost of a solution path that arrives at state2 from
        state1 via action, assuming cost c to get up to state1. If the problem
        is such that the path doesn't matter, this function will only look at
        state2.  If the path does matter, it will consider c and maybe state1
        and action. The default method costs 1 for every step in the path."""
        return c + 1

    def value(self, state):
        """For optimization problems, each state has a value.  Hill-climbing
        and related algorithms try to maximize this value."""
        raise NotImplementedError
        
        
class Node:

    """A node in a search tree. Contains a pointer to the parent (the node
    that this is a successor of) and to the actual state for this node. Note
    that if a state is arrived at by two paths, then there are two nodes with
    the same state.  Also includes the action that got us to this state, and
    the total path_cost (also known as g) to reach the node.  Other functions
    may add an f and h value; see best_first_graph_search and astar_search for
    an explanation of how the f and h values are handled. You will not need to
    subclass this class."""

    def __init__(self, state, parent=None, action=None, path_cost=0):
        "Create a search tree Node, derived from a parent by an action."
        self.state = state
        self.parent = parent
        self.action = action
        self.path_cost = path_cost
        self.depth = 0
        if parent:
            self.depth = parent.depth + 1

    def __repr__(self):
        return "<Node %s>" % (self.state,)

    def __lt__(self, node):
        return self.state < node.state

    def expand(self, problem):
        "List the nodes reachable in one step from this node."
        return [self.child_node(problem, action)
                for action in problem.actions(self.state)]

    def child_node(self, problem, action):
        "[Figure 3.10]"
        next = problem.result(self.state, action)
        return Node(next, self, action,
                    problem.path_cost(self.path_cost, self.state,
                                      action, next))

    def solution(self):
        "Return the sequence of actions to go from the root to this node."
        return [node.action for node in self.path()[1:]]

    def path(self):
        "Return a list of nodes forming the path from the root to this node."
        node, path_back = self, []
        while node:
            path_back.append(node)
            node = node.parent
        return list(reversed(path_back))

    # We want for a queue of nodes in breadth_first_search or
    # astar_search to have no duplicated states, so we treat nodes
    # with the same state as equal. [Problem: this may not be what you
    # want in other contexts.]

    def __eq__(self, other):
        return isinstance(other, Node) and self.state == other.state

    def __hash__(self):
        return hash(self.state)

## Problem: Maximize Ones

In [6]:
class MaximizeOnes(Problem):
    def __init__(self, initial):
        """initial state is a numpy array with each element representing a bit 0 or 1, 
        goal state is same size as initial state with all bits turned 1"""
        assert sum([n != 0 and n != 1 for n in initial]) == 0, "initial state must contain binary values."
        self.initial = initial
        self.goal = np.ones((len(self.initial)))
        
    def actions(self, state):
        """Return all locations for hamming distance 1"""
        return np.arange(len(state))
    
    def result(self, state, action):
        """Return state bit at location specified by action"""
        state = state.copy()
        state[action] = 1 - state[action] # toggle
        return state
    
    def value(self, state):
        """Return sum of all the elements"""
        return sum(state)
    
    def random_states(self, count=10):
        return np.random.randint(0, 2, size=(count, len(self.initial)))

## Problem: FlipFlop

In [7]:
class MaximizeAlternations(Problem):
    def __init__(self, initial):
        """initial state is a numpy array with each element representing a bit 0 or 1, 
        goal state is same size as initial state with all bits turned 1"""
        assert sum([n != 0 and n != 1 for n in initial]) == 0, "initial state must contain binary values."
        self.initial = initial
        self.goal = np.ones((len(self.initial)))
        
    def actions(self, state):
        """Return all locations for hamming distance 1"""
        return np.arange(len(state))
    
    def result(self, state, action):
        """Return state bit at location specified by action"""
        state = state.copy()
        state[action] = 1 - state[action] # toggle
        return state
    
    def value(self, state):
        """Return sum of all the elements"""
        return sum([state[i] != state[i+1] for i in range(len(state)-1)]) + 1
    
    def random_states(self, count=10):
        return np.random.randint(0, 2, size=(count, len(self.initial)))

## Hill Climbing

In [8]:
# Adapted from: AIMA python repository.
def hill_climbing(init_node, problem, n_sideway_moves=50):
    """From the initial node, keep choosing the neighbor with highest value,
    stopping when no neighbor is better. [Figure 4.2]"""
    
    current = init_node
    current_value = problem.value(current.state)
    no_improvement = 0
    
    while True:
        
        # get neighbor states
        neighbors = current.expand(problem)
        
        # can happen
        if not neighbors:
            break
        
        # select the neighbor greedily 
        best_neighbor = argmax_random_tie(neighbors, key=lambda node: problem.value(node.state))
        best_neighbor_value = problem.value(best_neighbor.state)
        
        # if the best neighbor value is less than current node, stop searching
        if best_neighbor_value < current_value:
            break
            
        # if the best neighbor value is same, accept–have some patience, make sideway moves
        elif best_neighbor_value == current_value:
            no_improvement += 1
            current = best_neighbor
            current_value = best_neighbor_value
        
        # if the neighbor value is higher, accept
        else:
            no_improvement = 0
            current = best_neighbor
            current_value = best_neighbor_value
        
        if no_improvement >= n_sideway_moves:
            break
        
    return current.state, current_value

# Implemented Random Restarts
def random_restarts_hill_climbing(problem, n_restarts=10):
    
    best_value = np.float("-inf")
    best_state = None
        
    for rand_state in problem.random_states(count=n_restarts):
        
        print "Restart {}, ".format(rand_state),
        
        # restart from random initial state
        state, value = hill_climbing(Node(rand_state), problem)
        if value > best_value:
            best_value = value
            best_state = state
            
        print "state = {}, value = {}".format(state, value)
        
    return best_state, best_value   
    

In [9]:
problem = MaximizeAlternations(np.ones((10), dtype=np.int))
hill_climbing(Node(problem.initial), problem, n_sideway_moves=100)

(array([1, 0, 1, 0, 1, 0, 1, 0, 1, 0]), 10)

In [10]:
random_restarts_hill_climbing(problem)

Restart [0 0 1 1 1 1 1 0 0 1],  state = [1 0 1 0 1 0 1 0 1 0], value = 10
Restart [0 1 1 0 1 1 1 1 1 0],  state = [1 0 1 0 1 0 1 0 1 0], value = 10
Restart [1 1 0 1 1 0 0 0 1 0],  state = [0 1 0 1 0 1 0 1 0 1], value = 10
Restart [1 1 0 0 0 0 1 0 0 0],  state = [0 1 0 1 0 1 0 1 0 1], value = 10
Restart [1 1 1 1 0 0 1 0 0 0],  state = [1 0 1 0 1 0 1 0 1 0], value = 10
Restart [0 1 1 0 1 1 0 1 0 0],  state = [1 0 1 0 1 0 1 0 1 0], value = 10
Restart [1 1 1 1 1 1 1 0 1 0],  state = [0 1 0 1 0 1 0 1 0 1], value = 10
Restart [1 1 0 1 1 1 0 1 0 0],  state = [0 1 0 1 0 1 0 1 0 1], value = 10
Restart [1 0 0 1 1 0 0 0 0 1],  state = [0 1 0 1 0 1 0 1 0 1], value = 10
Restart [1 0 1 1 1 1 1 0 0 0],  state = [0 1 0 1 0 1 0 1 0 1], value = 10


(array([1, 0, 1, 0, 1, 0, 1, 0, 1, 0]), 10)

In [11]:
problem = MaximizeOnes(np.zeros((10), dtype=np.int))
hill_climbing(Node(problem.initial), problem)

(array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 10)

In [12]:
random_restarts_hill_climbing(problem)

Restart [1 1 1 1 0 0 1 1 1 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [0 1 0 1 0 0 1 0 0 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [0 0 0 0 1 1 0 1 1 1],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [0 0 0 1 0 0 1 0 0 1],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [0 0 0 0 0 1 1 1 1 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [1 1 0 0 0 0 1 0 1 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [0 0 1 0 0 0 0 1 0 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [0 0 1 0 1 1 1 1 1 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [1 0 0 0 1 1 1 0 0 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10
Restart [0 0 0 1 0 1 1 1 1 0],  state = [1 1 1 1 1 1 1 1 1 1], value = 10


(array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 10)

## Simulated Annealing

In [13]:
# Adapted from: AIMA python repository.
def exp_schedule(k=0.5, lam=0.005, limit=100):
    "One possible schedule function for simulated annealing"
    return lambda t: (k * math.exp(-lam * t) if t < limit else 0)

def simulated_annealing(problem, schedule=exp_schedule()):
    "[Figure 4.5]"
    current_node = Node(problem.initial)
    for t in xrange(sys.maxsize):
        T = schedule(t)
        if T == 0:
            return current_node.state, problem.value(current_node.state)
        neighbors = current_node.expand(problem)
        if not neighbors:
            return current_node.state, problem.value(current_node.state)
        next_node = random.choice(neighbors)
        delta_e = problem.value(next_node.state) - problem.value(current_node.state)
        if delta_e > 0 or probability(math.exp(delta_e / T)):
            current_node = next_node

In [14]:
problem = MaximizeOnes(np.random.randint(0,2,10))
simulated_annealing(problem)

(array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 10)

In [15]:
problem = MaximizeAlternations(np.random.randint(0,2,10))
simulated_annealing(problem)

(array([1, 0, 1, 0, 1, 0, 0, 1, 0, 1]), 9)

## Genetic Algorithms

In [16]:
# Adapted from: AIMA python repository.

def mate(p1, p2, crossover=0.5):
    "Return a new individual crossing self and other."
    if crossover is None:
        c = random.randrange(len(p1)) # crossover point
    else:
        c = int(len(p1)*crossover)
        
    return np.hstack((p1[:c], p2[c:]))

def mutate(child):
    
    for gene_id in [random.randrange(len(child)) for i in range(random.randrange(len(child)))]:
        child[gene_id] ^= 1    
    return child

def genetic_algorithm(population, fitness_fn, ngen=1000, pmut=0.1):
    "[Figure 4.8]"
    
    for i in range(ngen):
        new_population = []
        for i in range(len(population)):
            fitnesses = map(fitness_fn, population)
            p1, p2 = weighted_sample_with_replacement(population, fitnesses, 2)
            child = mate(p1, p2)
            if random.uniform(0, 1) < pmut:
                child = mutate(child)
            new_population.append(child)
        population = new_population
    
    best_chromosome = argmax(population, key=fitness_fn)
    return best_chromosome, fitness_fn(best_chromosome)
    
def genetic_search(problem, ngen=100, pmut=0.1, k=20):
    """
    Call genetic_algorithm on the appropriate parts of a problem.
    This requires the problem to have states that can mate and mutate,
    plus a value method that scores states."""
    parent = problem.initial
    alterations = problem.actions(parent)
    
    chromosomes = [problem.result(parent, random.choice(alterations)) for i in range(k)]
    
    return genetic_algorithm(chromosomes, problem.value, ngen, pmut)

In [17]:
problem = MaximizeAlternations(np.zeros(10, dtype=np.int))
genetic_search(problem, ngen=50, pmut=0.2, k =50)

(array([0, 1, 0, 1, 0, 0, 1, 0, 1, 0]), 9)

In [18]:
problem = MaximizeOnes(np.zeros(10, dtype=np.int))
genetic_search(problem, pmut=0.2, ngen=50, k=50)

(array([1, 1, 1, 1, 1, 1, 1, 1, 1, 1]), 10)

In [19]:
# Much efficient alternative from Silviu which uses deap (Distributed Evolutionary Algorithms in Python) framework 
# for GA. Copied from: https://gist.github.com/spitis/dd5b71bd2d694c0f8e47185b3c5bbd0c.

from deap import base, creator, tools, algorithms

creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax)

def ga(stringlen, evalfn, pop=50, cxpb=0.5, mutpb=0.2, patience=10):
    toolbox = base.Toolbox()
    toolbox.register("attr_bool", random.randint, 0, 1)
    toolbox.register("individual", tools.initRepeat, creator.Individual, toolbox.attr_bool, n=stringlen)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
       
    def evalReturningTuple(x):
        return (evalfn(x),)
    
    toolbox.register("evaluate", evalReturningTuple)
    toolbox.register("mate", tools.cxTwoPoint)
    toolbox.register("mutate", tools.mutFlipBit, indpb=0.10)
    toolbox.register("select", tools.selTournament, tournsize=3)
    
    def run_5_gens(pop, hof):
        if pop is None:
            pop = toolbox.population(n=50)
        if hof is None:
            hof = tools.HallOfFame(1)
        stats = tools.Statistics(lambda ind: ind.fitness.values)
        stats.register("avg", np.mean)
        stats.register("min", np.min)
        stats.register("max", np.max)

        pop, logbook = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=10, stats=stats, halloffame=hof, verbose=False)

        return pop, hof
    
    no_improvement = 0
    best_fitness = float('-inf')
    pop, hof = None, None
    while True:
        pop, hof = run_5_gens(pop, hof)
        fitness = hof[0].fitness.values[0]
        if fitness > best_fitness:
            best_fitness = fitness
            no_improvement = 0
        else:
            no_improvement += 1
            
        if no_improvement >= patience:
            break
    return hof[0], hof[0].fitness.values[0]

In [20]:
ga(20, sum)

([1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], 20.0)

In [21]:
ga(20, lambda x: sum([x[i] != x[i+1] for i in range(len(x)-1)])+1)

([1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0], 20.0)

In [22]:
# Copied from Silviu's MIMIC implementation: https://gist.github.com/spitis/dd5b71bd2d694c0f8e47185b3c5bbd0c
import tensorflow as tf, numpy as np
def mimic(stringlen, evalfn, latentlen = 100, num_samples = 100, samples_to_keep = 50, patience=20):
    batchsize = tf.placeholder(tf.int32)
    r = tf.random_uniform([batchsize, latentlen])
    logits = tf.layers.dense(r, stringlen)
    labels = tf.placeholder(tf.float32, shape=[None, stringlen])
    loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=labels, logits=logits)
    generated_samples = tf.floor(tf.nn.sigmoid(logits) + tf.random_uniform([batchsize, stringlen]))
    train_step = tf.train.GradientDescentOptimizer(0.01).minimize(loss)
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    
    best_fitness = float('-inf')
    bitstring = None
    no_improvement = 0
    while True: 
        samples = sess.run(generated_samples, feed_dict={batchsize: num_samples})
        fitnesses = np.array(list(map(evalfn, samples)))
        sorted_fitnesses = np.argsort(fitnesses)
        kept_samples = samples[sorted_fitnesses][-samples_to_keep:]
        fitness = evalfn(kept_samples[-1])
        
        if fitness > best_fitness:
            bitstring = kept_samples[-1]
            best_fitness = fitness
            no_improvement = -1            
        
        no_improvement += 1    
        
        if no_improvement >= patience:
            break
            
        sess.run(train_step, feed_dict={batchsize: samples_to_keep, labels: kept_samples})
    
    sess.close()
    return bitstring, best_fitness

In [23]:
mimic(20, sum)

(array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,
         1.,  1.,  1.,  1.,  1.,  1.,  1.], dtype=float32), 20.0)