# Interfacing Warp Output with DEAP
# Work in Progress

Using paramiko for interface with NERSC.

Unresolved algorithm issues:
    - static or dynamic mutation operator?
    - adopt a GA approach with parents living for one generation, or go for an elitist approach?
    

In [1]:
%matplotlib notebook

In [2]:
from deap import base, creator, tools, algorithms
from rswarp.utilities.deap_interface import create_runfiles, JobRunner

import os
import random
import numpy as np
import pysftp
import matplotlib.pyplot as plt

from time import sleep

from mpl_toolkits.mplot3d import Axes3D
from matplotlib import cm
from matplotlib import animation
import matplotlib.colors as colors

### DEAP Setup


Will seek to maximize collector efficiency -> define positive weight

In [3]:
# Single-objective fitness; the positive weight means larger fitness values are better.
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
# An individual is a plain list of parameters carrying a FitnessMax instance as `fitness`.
creator.create("Individual", list, fitness=creator.FitnessMax)

In [4]:
# Registry for the generators and evolutionary operators registered in the cells below.
toolbox = base.Toolbox()

Our independent variables are parameters of the grid.

In [5]:
def typed_random_value(num_range, center):
    """
    Yield one random value per (range, center) pair.

    The type of each value is determined by the type of the matching entries
    of num_range and center, which must be identical for a given index.

    If type is float: yields a float on the interval
        [center - num_range/2, center + num_range/2)
    If type is int: yields an int on the interval
        [center - floor(num_range/2), center + floor(num_range/2)]

    Because this is a generator, the checks below only run once iteration starts.

    :param num_range: List of ranges for random numbers, can be a mix of floats and integers.
    :param center: List of centers for random numbers, can be a mix of floats and integers.
    :yield: Next entry in list of random numbers
    :raises AssertionError: If the lists differ in length or a pair of entries differs in type.
    :raises TypeError: If an entry is neither a float nor an int.
    """
    assert len(num_range) == len(center)

    for range_val, center_val in zip(num_range, center):
        assert type(range_val) == type(center_val)

        if isinstance(range_val, float):
            yield (random.random() - 0.5) * range_val + center_val
        elif isinstance(range_val, int):
            # Floor division keeps the bounds integral on both Python 2 and 3;
            # random.randint requires integer arguments.
            half_range = range_val // 2
            yield random.randint(center_val - half_range, center_val + half_range)
        else:
            # Previously an unsupported type silently re-yielded the value from
            # the preceding entry (or failed with an unbound local).
            raise TypeError(
                "Unsupported type {} for range entry {!r}; expected float or int".format(
                    type(range_val).__name__, range_val))


In [6]:
# Test numbers
# The first two entries are ints (integer draws); the remaining four are floats.
# Each entry of `centra` has the same type as the matching entry of `num_ranges`,
# as typed_random_value requires.
num_ranges = [8, 8, 19.5, 940e-9, 9e-9, 49e-9]
centra = [5, 5, 19.5 / 2., 940e-9 / 2., 4.5e-9, 24.5e-9]

# Bind the ranges and centers so that toolbox.attr_rand() takes no arguments.
toolbox.register("attr_rand", typed_random_value, num_ranges, centra)

In [7]:
# Test case: draw one value per parameter and print each on its own line
for i in toolbox.attr_rand():
    print i

7
7
11.7497838776
9.26880827894e-07
2.47091983438e-09
2.34591693384e-08


Define how an `individual` is created

In [8]:
# Generator function to pass to toolbox
def initYield(container, func):
    """
    Build a *container* from the items produced by calling *func*.

    *func* is called with no arguments and must return an iterable
    (typically a generator); its items are streamed into *container*.
    """
    def _stream():
        for item in func():
            yield item

    return container(_stream())

# Each call to toolbox.individual() fills a creator.Individual from a fresh
# toolbox.attr_rand() generator.
toolbox.register("individual", initYield, creator.Individual, toolbox.attr_rand)

# Test call
print "Example individual:", toolbox.individual()

Example individual: [4, 9, 0.2296353183462827, 6.240497378296145e-07, 3.6007164250251166e-09, 4.4143373671631724e-08]


Definition for the `population` of `individuals`.


In [9]:
# toolbox.population(n=N) returns a list of N individuals, each made by toolbox.individual.
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
print "Example population with 2 individuals:\n", toolbox.population(n=2)

Example population with 2 individuals:
[[2, 2, 17.133225444762317, 9.979557855334887e-08, 2.375471209223548e-09, 3.51402011704093e-08], [1, 7, 3.690727525219569, 1.435509305984244e-07, 8.799834830690405e-10, 5.93970509822801e-09]]


## Interface Test
Manual Step-by-Step test of passing back and forth to Cori (or Edison)

Small population for testing

In [10]:
# 15 individuals; the batch instructions below request len(test_pop) nodes.
test_pop = toolbox.population(n=15)

### Batch Script Template

In [11]:
# Header of the SLURM batch script. The brace placeholders ({queue}, {nodes}, {time},
# {job}, {base_directory}) are presumably filled in by create_runfiles from
# `batch_instructions` -- confirm. The script copies the files of the submit
# directory into {base_directory} and continues from there.
edison_batch_header = """#!/bin/bash -l
#SBATCH -p {queue} 
#SBATCH -N {nodes} 
#SBATCH -t {time}
#SBATCH -A m2783
#SBATCH -J {job}

export mydir="{base_directory}"

mkdir -p $mydir

cd $SLURM_SUBMIT_DIR

cp ./* $mydir/.
cd $mydir
"""

# A single srun line on one node; the trailing `&` backgrounds it so the `wait`
# in the tail can collect every run.
# presumably emitted once per individual, with {parameters} set to that
# individual's values -- TODO confirm against create_runfiles
edison_batch_srun = """srun -N 1 -n 24 -c 2 --cpu_bind=cores python-mpi {warp_file} {parameters} -p 1 1 24 &
"""

# Waits for all background runs, then appends "0" to a file named COMPLETE.
edison_batch_tail = """wait
echo 0 >> COMPLETE"""


Create local copy of batch file

In [12]:
# Scratch locations on the two NERSC machines.
# NOTE(review): cori_scratch_path is not used in the visible part of this notebook.
cori_scratch_path = '/global/cscratch1/sd/hallcc'
edison_scratch_path = '/scratch2/scratchdirs/hallcc'
# Keys match the brace placeholders of the batch templates; {parameters} has no
# entry here and presumably comes from the individuals -- confirm in create_runfiles.
batch_instructions = {
    'queue': 'debug',
    'nodes': len(test_pop),  # one node per individual (each srun line uses -N 1)
    'time': '02:00:00',
    'job': 'generation_0',
    'warp_file': 'run_grid_edison.py',
    'base_directory': os.path.join(edison_scratch_path, 'run_0')
}
local_batch_file = 'run_file/eaTest_edison_trial0'
# NOTE(review): absolute path at the filesystem root, and not referenced again
# in the visible part of this notebook -- confirm it is intended.
local_save_directory = '/run0_data'
# The batch header changes into base_directory before launching the runs.
remote_output_directory = batch_instructions['base_directory']

# Generates the local batch file from the templates and the test population.
create_runfiles(test_pop, local_batch_file, batch_instructions=batch_instructions,
               run_header=edison_batch_header, run_command=edison_batch_srun, run_tail=edison_batch_tail)

Create JobRunner instance which will facilitate communication

In [13]:
# Host and account that JobRunner connects to.
server = 'edison.nersc.gov'
username = 'hallcc'
runner = JobRunner(server, username)

Upload Batch file to NERSC

In [14]:
# Relative remote path -- presumably resolved against the account's home
# directory on the server; confirm.
remote_directory = 'edison_runner_upload/'

# Upload the locally generated batch file into remote_directory.
runner.upload_batch_file(remote_directory, local_batch_file)

SSH Client is live
run_file/eaTest_edison_trial0 Uploaded
SFTP Connection Closed


# Start remote job

 `JobRunner.start_job` used to initiate the job through SLURM on server

In [15]:
# Path of the batch file on the server: upload directory + the local file's base name.
remote_batch_path = os.path.join(remote_directory, os.path.split(local_batch_file)[1])
runner.start_job(remote_batch_path)

# Blank line, then the attributes read back from the runner (presumably set by start_job).
print
print "jobid:", runner.jobid
print "project directory:", runner.project_directory

SSH Client is live
Starting batch file: eaTest_edison_trial0 in directory edison_runner_upload
Contents of job directory: eaTest_edison_trial0
eaTest_edison_trial0~
run_grid_edison.py
run_grid_edison.py~


jobid: 7194990
project directory: edison_runner_upload


`JobRunner.check_job_status` can be called independently to check the status of a job
Will return:

- 1: Job underway but not complete
- -1: Critical failure occurred
- 0: Job successfully complete

In [16]:
# One-off status poll; the return value is one of the codes listed above.
runner.check_job_status(output_directory=remote_output_directory)

SSH Client is live
Mon Sep 18 13:41:51 2017: Job active but not complete


1

The method `JobRunner.evaluate_fitness` is designed to be called by the EA optimizer once per generation. It uses `JobRunner.check_job_status` internally and also handles downloading the final fitness results once the job has completed successfully.

In [17]:
# presumably 120 * 60 is a timeout in seconds (2 hours, matching the batch 'time'
# of 02:00:00) -- TODO confirm against JobRunner.evaluate_fitness
# NOTE(review): local_directory='/run0_out' is an absolute path at the filesystem
# root; confirm this is intended.
runner.evaluate_fitness(120 * 60, remote_output_directory=remote_output_directory, local_directory='/run0_out')

SSH Client is live
Mon Sep 18 13:52:28 2017: Job active but not complete
SSH Client is live
Mon Sep 18 13:57:35 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:02:41 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:07:48 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:12:55 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:18:01 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:23:08 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:28:15 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:33:22 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:38:30 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:43:37 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:48:44 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:53:50 2017: Job active but not complete
SSH Client is live
Mon Sep 18 14:58:57 2017: Job ac

-1

## Interface Tools

Functions for interfacing Warp I/O with DEAP environment

Define the function for evaluating an individual's fitness.

Define the operation for mating two individuals.

Define a mutation algorithm.

Define a selection routine for choosing among the fittest individuals.

### ea Implementation Continues

In [None]:
# NOTE(review): evalFitnessRastrigin is not defined in the visible part of this
# notebook -- confirm it exists before running this cell.
toolbox.register("evaluate", evalFitnessRastrigin)
    
# Two-point crossover between a pair of individuals.
toolbox.register("mate", tools.cxTwoPoint)

# TODO: Either change mutation algorithm or fix numbers for Gaussian
# (one sigma of 0.5 is applied to every gene, while the test ranges defined
# earlier run from about 1e-8 up to about 20)
mu = 0.
sigma = 0.5
indpb = 0.33
toolbox.register("mutate", tools.mutGaussian, mu=mu, sigma=sigma, indpb=indpb)

# Tournament selection: each pick is the best of 3 randomly drawn individuals.
toolbox.register("select", tools.selTournament, tournsize=3)

The control function.
This uses the pre-defined algorithm `eaSimple`.
Statistics for the run may be passed out of the algorithm during runtime using `tools.statistics` to define the class `stat`.
Here we pass out each generation of the population for full post-processing after the algorithm completes. 

In [None]:
def eaSimpleWarp(population, toolbox, cxpb, mutpb, ngen, filename, stats=None,
                 halloffame=None, verbose=__debug__, labels=None):
    """This algorithm reproduces the simplest evolutionary algorithm as
    presented in chapter 7 of [Back2000]_, and additionally passes every
    generation to ``save_data``.

    Unlike :func:`deap.algorithms.eaSimple`, :meth:`toolbox.evaluate` is
    called once per generation with the *list* of individuals that need a
    fitness, and must return one fitness tuple per individual, in the same
    order.

    :param population: A list of individuals.
    :param toolbox: A :class:`~deap.base.Toolbox` that contains the evolution
                    operators.
    :param cxpb: The probability of mating two individuals.
    :param mutpb: The probability of mutating an individual.
    :param ngen: The number of generations.
    :param filename: Forwarded to ``save_data`` for every generation
                     (presumably the output file; confirm against
                     ``save_data``).
    :param stats: A :class:`~deap.tools.Statistics` object that is updated
                  inplace, optional.
    :param halloffame: A :class:`~deap.tools.HallOfFame` object that will
                       contain the best individuals, optional.
    :param verbose: Whether or not to log the statistics.
    :param labels: Forwarded to ``save_data`` for every generation, optional.
    :returns: The final population
    :returns: A class:`~deap.tools.Logbook` with the statistics of the
              evolution

    The algorithm takes in a population and evolves it in place using the
    :meth:`varAnd` method. It returns the optimized population and a
    :class:`~deap.tools.Logbook` with the statistics of the evolution. The
    logbook will contain the generation number, the number of evaluations for
    each generation and the statistics if a :class:`~deap.tools.Statistics` is
    given as argument. The *cxpb* and *mutpb* arguments are passed to the
    :func:`varAnd` function. The pseudocode goes as follows ::

        evaluate(population)
        for g in range(ngen):
            population = select(population, len(population))
            offspring = varAnd(population, toolbox, cxpb, mutpb)
            evaluate(offspring)
            population = offspring

    As stated in the pseudocode above, the algorithm goes as follows. First, it
    evaluates the individuals with an invalid fitness. Second, it enters the
    generational loop where the selection procedure is applied to entirely
    replace the parental population. The 1:1 replacement ratio of this
    algorithm **requires** the selection procedure to be stochastic and to
    select multiple times the same individual, for example,
    :func:`~deap.tools.selTournament` and :func:`~deap.tools.selRoulette`.
    Third, it applies the :func:`varAnd` function to produce the next
    generation population. Fourth, it evaluates the new individuals and
    computes the statistics on this population. Finally, when *ngen*
    generations are done, the algorithm returns a tuple with the final
    population and a :class:`~deap.tools.Logbook` of the evolution.

    .. note::

        Using a non-stochastic selection method will result in no selection as
        the operator selects *n* individuals from a pool of *n*.

    This function expects the :meth:`toolbox.mate`, :meth:`toolbox.mutate`,
    :meth:`toolbox.select` and :meth:`toolbox.evaluate` aliases to be
    registered in the toolbox.

    .. [Back2000] Back, Fogel and Michalewicz, "Evolutionary Computation 1 :
       Basic Algorithms and Operators", 2000.
    """
    logbook = tools.Logbook()
    logbook.header = ['gen', 'nevals'] + (stats.fields if stats else [])

    # Evaluate the individuals with an invalid fitness
    # TODO: Change fitness evaluation to operate on the population not on individuals
    # TODO because all fitnesses are returned from WARP we need to pass all to the batch script simultaneously
    # TODO and then wait.
    # generate_fitnesses(population, toolbox)

    # Find which individuals have no fitness assigned (probably all of them here)
    invalid_ind = [ind for ind in population if not ind.fitness.valid]
    # Evaluate unknown fitnesses in a single call
    fitnesses = toolbox.evaluate(invalid_ind)
    # Assign new fitness values to the individuals
    for ind, fit in zip(invalid_ind, fitnesses):
        ind.fitness.values = fit
    # Update HoF
    if halloffame is not None:
        halloffame.update(population)

    # Forward the caller's labels (previously a hard-coded None was passed,
    # so the `labels` argument had no effect).
    save_data(filename, population, generation=0, labels=labels)
    record = stats.compile(population) if stats else {}
    logbook.record(gen=0, nevals=len(invalid_ind), **record)
    if verbose:
        # Single-argument call form prints identically on Python 2 and 3.
        print(logbook.stream)

    # Begin the generational process
    for gen in range(1, ngen + 1):
        # Select the next generation individuals
        offspring = toolbox.select(population, len(population))

        # Vary the pool of individuals. varAnd lives in deap.algorithms; only
        # the `algorithms` module is imported, so the name must be qualified.
        offspring = algorithms.varAnd(offspring, toolbox, cxpb, mutpb)

        # TODO: Fitness evaluation must be changed as described above
        # Evaluate the individuals with an invalid fitness
        invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
        fitnesses = toolbox.evaluate(invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit

        # Update the hall of fame with the generated individuals
        if halloffame is not None:
            halloffame.update(offspring)

        # Replace the current population by the offspring (in place, so the
        # caller's list object is updated too)
        population[:] = offspring

        # Append the current generation statistics to the logbook and save data file
        save_data(filename, population, generation=gen, labels=labels)
        record = stats.compile(population) if stats else {}
        logbook.record(gen=gen, nevals=len(invalid_ind), **record)
        if verbose:
            print(logbook.stream)

    return population, logbook


In [None]:
def main():
    """Run the optimization with DEAP's stock ``eaSimple`` algorithm.

    Reads the module-level ``opt_func`` to choose the number of generations
    (200 for 'rastrigin', 15 for 'beale') and evolves a population of 100
    individuals built by the module-level ``toolbox``.

    :returns: Tuple of (final population, logbook, hall of fame). The logbook
              entry 'avg' of each generation holds the whole population as a
              numpy array, not an average; the post-processing cells rely on
              that.
    :raises ValueError: If ``opt_func`` is not 'rastrigin' or 'beale'.
    """
    # Validate first: previously an unknown option only printed a message and
    # then failed later because gen_n was never assigned.
    if opt_func == 'rastrigin':
        gen_n = 200
    elif opt_func == 'beale':
        gen_n = 15
    else:
        raise ValueError("Function options are: rastrigin or beale")

    pop = toolbox.population(n=100)
    hof = tools.HallOfFame(1)

    # Alternative: record summary statistics of the fitness instead of the
    # full population.
#     stats = tools.Statistics(lambda ind: ind.fitness.values)
#     stats.register("avg", np.mean)
#     stats.register("min", np.min)
#     stats.register("max", np.max)
    # Pass every individual through unchanged so that each generation is
    # stored in the logbook as one array.
    stats = tools.Statistics(lambda ind: ind)
    stats.register("avg", np.array)

    pop, logbook = algorithms.eaSimple(pop, toolbox, cxpb=0.5, mutpb=0.2, ngen=gen_n,
                                       stats=stats, halloffame=hof, verbose=False)

    return pop, logbook, hof

In [None]:
# Run the optimization and report the best individual found.
if __name__ == "__main__":
    pop, log, hof = main()
    # main() builds HallOfFame(1), so hof[0] is the single best individual kept.
    print("Best individual is: %s\nwith fitness: %s" % (hof[0], hof[0].fitness))

### Post-Processing

In [None]:
# Visualize the last generation on a surface plot of the selected test function.

fig1 = plt.figure(figsize=(6, 6))
# add_subplot(..., projection='3d') is the supported way to create 3D axes;
# passing keyword arguments to Figure.gca() was deprecated and later removed
# in newer Matplotlib releases.
ax = fig1.add_subplot(111, projection='3d')

# Pick the function, its display name and the plotted domain in one place so
# the surface and the scattered population always use the same function.
if opt_func == 'rastrigin':
    func_name = 'Rastrigin'
    surface_func = rastrigin
    grid = np.arange(-1.5, 1.5, 0.001)
elif opt_func == 'beale':
    func_name = 'Beale'
    surface_func = beale
    grid = np.arange(-4., 4., 0.01)
else:
    # Previously this only printed the message and then failed on the
    # undefined X, Y, Z below.
    raise ValueError("Function options are: rastrigin or beale")

X, Y = np.meshgrid(grid, grid)
Z = surface_func(X, Y)

ax.plot_surface(X, Y, Z,
                cmap=cm.coolwarm,
                norm=colors.LogNorm(),
                alpha=0.35)

# The 'avg' logbook entry holds the whole population of a generation as an array.
test_dataset = log[-1]['avg']


def col_beal(dat):
    """Return the selected function evaluated on columns 0 and 1 of *dat*.

    The name is kept for compatibility; it used to call ``beale``
    unconditionally, which placed the points off the Rastrigin surface.
    """
    return surface_func(dat[:, 0], dat[:, 1])


ax.scatter(test_dataset[:, 0], test_dataset[:, 1], col_beal(test_dataset), c='g')
ax.view_init(25, 2.5)

ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('f(x,y)')
ax.set_title('{}\'s Function\n Final Result'.format(func_name))
plt.show()

In [None]:
# Create a movie of all generations.

fig1 = plt.figure(figsize=(6, 6))
# add_subplot(..., projection='3d') is the supported way to create 3D axes;
# passing keyword arguments to Figure.gca() was deprecated and later removed
# in newer Matplotlib releases.
ax = fig1.add_subplot(111, projection='3d')

# Pick the function, its display name and the plotted domain in one place so
# the surface, the scattered points and the reported fitness all agree.
if opt_func == 'rastrigin':
    func_name = 'Rastrigin'
    surface_func = rastrigin
    grid = np.arange(-1.5, 1.5, 0.001)
    # Set bounds for Rastrigin since we are constricting the surface plot bounds
    ax.set_xlim(-1.5, 1.5)
    ax.set_ylim(-1.5, 1.5)
elif opt_func == 'beale':
    func_name = 'Beale'
    surface_func = beale
    grid = np.arange(-4., 4., 0.01)
else:
    # Previously this only printed the message and then failed on the
    # undefined X, Y, Z below.
    raise ValueError("Function options are: rastrigin or beale")

X, Y = np.meshgrid(grid, grid)
Z = surface_func(X, Y)

ax.plot_surface(X, Y, Z,
                cmap=cm.coolwarm,
                norm=colors.LogNorm(),
                alpha=0.6)
ax.view_init(25, 2.5)

# One [scatter, annotation] pair per generation; ArtistAnimation shows one pair per frame.
lives = []


def col_beal(dat):
    """Return the selected function evaluated on columns 0 and 1 of *dat*.

    The name is kept for compatibility; it used to call ``beale``
    unconditionally, even when Rastrigin was being optimized.
    """
    return surface_func(dat[:, 0], dat[:, 1])


for i, generation_record in enumerate(log):
    # The 'avg' logbook entry holds the whole population of a generation as an array.
    population_data = generation_record['avg']
    # Evaluate once per generation and reuse for the minimum, its index and the plot.
    heights = col_beal(population_data)
    best_fit = np.min(heights)
    best_fit_individual = population_data[np.argmin(heights)]
    pl = ax.scatter(population_data[:, 0], population_data[:, 1], heights, c='g')
    gen_tex = ax.annotate('Generation = {}\nBest Individual: {}\nFitness = {}'.format(i, best_fit_individual, best_fit),
                          fontsize=12, xy=(0., 0.85), xycoords='axes fraction')
    lives.append([pl, gen_tex])


ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('f(x,y)')
ax.set_title('\nOptimization of {}\'s Function\n'.format(func_name) )

im_animation = animation.ArtistAnimation(fig1, lives, interval=450)
# im_animation.save('simple_beale_go.mp4', fps=3, dpi=300)

plt.show()