In [1]:
# Dependencies:
#  base_eq.ipynb
#  base_plot.ipynb
#
# These should be %run before this file.
# (Can't %run them here as this file is not necessarily in the
# same directory as the file %run-ing this file.)

In [None]:
# Ideally we'd split this file up into a few other files, but then
# we'd have even more %run dependencies when it comes to actually
# using this.

In [3]:
import multiprocessing as mp
import os

import sklearn.base as skb
import sklearn.preprocessing as skpr
import sklearn.pipeline as skpi
import sklearn.linear_model as sklm

# https://github.com/patrick-kidger/tools
import tools

import tensorflow as tf
tfd = tf.data
tfer = tf.errors
tfe = tf.estimator
tfi = tf.initializers
tfk = tf.keras
tfla = tf.layers
tflog = tf.logging
tflo = tf.losses
tft = tf.train

# Convenience imports for those files running this one
import collections as co
import functools as ft
import itertools as it

In [7]:
### Grid hyperparameters
# Everything we do is on a grid

# The separation between points of the fine grid
fine_grid_sep = tools.Object(t=0.01, x=0.01)
# The separation between points of the coarse grid
coarse_grid_sep = tools.Object(t=0.1, x=0.1)
# The number of intervals in the coarse grid. Thus the coarse grid will contain
# (num_intervals.t + 1) * (num_intervals.x + 1) elements.
# So with num_intervals.t = 3, num_intervals.x = 3, it looks like:
#
# @ @ @ @
#
# @ @ @ @
#
# @ @ @ @
#
# @ @ @ @
num_intervals = tools.Object(t=7, x=7)


# The number of fine grid intervals per coarse grid interval. This is rounded
# rather than floor-divided: floating point division of the separations need
# not be exact (e.g. 1 // 0.1 == 9.0 in Python), and floor division would then
# silently lose an interval.
fine_grid_fineness = tools.Object(t=int(round(coarse_grid_sep.t / fine_grid_sep.t)),
                                  x=int(round(coarse_grid_sep.x / fine_grid_sep.x)))
# The total extent of the coarse grid in each direction.
coarse_grid_size = tools.Object(t=num_intervals.t * coarse_grid_sep.t,
                                x=num_intervals.x * coarse_grid_sep.x)


### Grids to evaluate our solution on

def grid(point, grid_size, grid_fineness):
    """Builds a rectangular grid of (t, x) points.

    The bottom left entry of the grid is at :point:, a (t, x) pair. The grid
    extends :grid_size: in each direction and is subdivided according to
    :grid_fineness:; both should be of the form tools.Object(t, x). The result
    is a list of (grid_fineness.t + 1) * (grid_fineness.x + 1) (t, x) tuples,
    ordered with t varying slowest and x varying fastest.
    """
    start_t, start_x = point
    ts = np.linspace(start_t, start_t + grid_size.t, grid_fineness.t + 1)
    xs = np.linspace(start_x, start_x + grid_size.x, grid_fineness.x + 1)
    points = []
    for t_val in ts:
        for x_val in xs:
            points.append((t_val, x_val))
    return points

def fine_grid(point):
    """Creates a fine grid covering a single coarse grid square.

    The bottom left entry is at :point: (a (t, x) pair). The extent is one
    coarse grid separation (coarse_grid_sep) in each direction, subdivided
    according to fine_grid_fineness.
    """
    return grid(point, grid_size=coarse_grid_sep, grid_fineness=fine_grid_fineness)

def coarse_grid(point):
    """Creates a coarse grid whose middle square has its bottom left corner at
    the specified :point: (a (t, x) pair).

    The size and fineness of the grid are determined by the earlier
    hyperparameters (coarse_grid_size, num_intervals, coarse_grid_sep).
    """
    t, x = point
    # Number of whole coarse intervals before the middle square, in each
    # direction.
    intervals_before_t = np.floor((num_intervals.t - 1) / 2)
    intervals_before_x = np.floor((num_intervals.x - 1) / 2)
    # Shift back by that many coarse separations to find the grid's corner.
    bottomleft_point = (t - intervals_before_t * coarse_grid_sep.t,
                        x - intervals_before_x * coarse_grid_sep.x)
    return grid(bottomleft_point, coarse_grid_size, num_intervals)

In [8]:
### Data generation

# These next four functions may be combined to generate (feature, label)
# pairs, e.g.
# >>> point, solution = gen_one_peakon()
# >>> X, y = sol_on_grid(point, solution)
def gen_one_peakon():
    """Samples a random peakon and a random (t, x) location near its peak.

    Returns ((t, x), peakon).
    """
    # Random solution to the CH equation, parameterised by c.
    speed = np.random.uniform(3, 10)
    solution = Peakon(c=speed)
    # Random time, then a random position within 2 of c * t (near the peak).
    t = np.random.uniform(0, 10)
    x = speed * t + np.random.uniform(-2, 2)
    return (t, x), solution

def gen_two_peakon():
    """Samples a random two peakon solution and a random (t, x) location near
    both of its peaks.

    Returns ((t, x), twopeakon).
    """
    # Random solution to the CH equation. The ranges for x1 and x2 are
    # disjoint, so x1 < x2 always.
    p1 = np.random.uniform(3, 10)
    p2 = np.random.uniform(3, 10)
    x1 = np.random.uniform(0, 3)
    x2 = np.random.uniform(3.001, 6)
    solution = TwoPeakon(x1, x2, p1, p2)

    # Random time, then a random position within an interval extending 0.5
    # beyond the points x_i + p_i * t on either side.
    t = np.random.uniform(0, 0.5)
    peaks = ((x1, p1), (x2, p2))
    left = min(x_i - 0.5 + p_i * t for x_i, p_i in peaks)
    right = max(x_i + 0.5 + p_i * t for x_i, p_i in peaks)
    middle = (right + left) / 2
    semidist = (right - left) / 2
    # Cubing a uniform sample on [-1, 1] biases the location towards the middle.
    cubed = np.random.uniform(-1, 1) ** 3
    x = middle + semidist * cubed
    return (t, x), solution

def sol_on_grid(point, solution):
    """Evaluates :solution: on the coarse and fine grids around :point:.

    Returns (X, y). X (the features) is the solution on the coarse grid and
    y (the labels) is the solution on the fine grid.
    """
    features = solution.on_grid(coarse_grid(point))
    labels = solution.on_grid(fine_grid(point))
    return features, labels

def sol_at_point(point, solution):
    """Returns the values of the :solution: on a coarse grid and at a random
    point near the specified :point:.

    Returns (X, y):
    - X is the result of solution.on_grid on the coarse grid around :point:,
      with two extra entries. Its last two entries are overwritten with the
      normalised t and x offsets of the interpolation point.
    - y is a length-1 array holding the value of :solution: at the
      interpolation point.
    """
    
    cg = coarse_grid(point)
    
    # Random offset from the random location that we ask for predictions at. The
    # distribution is asymmetric because we're moving relative to :point:, which
    # is in the _bottom left_ of the central cell of the coarse grid. The asymmetric
    # distribution thus makes this relative to the centre of the central cell.
    #
    # This value is not scaled relative to the size of the grid as we expect
    # that the predictions should be scale invariant, and we do not want the
    # network to unnecessarily learn the size of coarse_grid_sep.
    x_offset = np.random.uniform(-0.5, 1.5)
    t_offset = np.random.uniform(-0.5, 1.5)
    
    # Features: the solution on the coarse grid and the point to interpolate at.
    X = solution.on_grid(cg, extra=2)
    # We tell the network the offset; as the network has no way of knowing the
    # location of the grid then adding a translation would only confuse it.
    X[-2] = t_offset - 0.5  # -0.5 to normalise to [-1, 1]
    X[-1] = x_offset - 0.5  # -0.5 to normalise to [-1, 1]
    
    t, x = point
    # Label: the solution at the interpolation point. This evaluates the
    # :solution: we were given, not some unrelated global; it assumes the
    # solution is callable on a (t, x) point.
    y = np.full(1, solution((t + t_offset * coarse_grid_sep.t, 
                             x + x_offset * coarse_grid_sep.x)))
    
    return X, y

# A particularly nice X, y that is right on the peak of the peakon.
# X_peak has 64 = (7 + 1) * (7 + 1) entries, consistent with one value per
# point of the coarse grid (num_intervals = 7 in each direction). y_peak has
# 121 = (10 + 1) * (10 + 1) entries, consistent with one value per point of the
# fine grid (fine_grid_fineness = 10 in each direction).
# NOTE(review): these are hard-coded for the grid hyperparameters above and
# will not match if those are changed. They were presumably produced by
# sol_on_grid; confirm before regenerating.
X_peak = np.array([0.71136994, 0.64367414, 0.58242045, 0.52699581, 0.47684553,
                   0.43146768, 0.3904081 , 0.35325586, 1.53965685, 1.39313912,
                   1.26056441, 1.14060584, 1.03206285, 0.93384908, 0.84498159,
                   0.76457096, 3.33236346, 3.01524715, 2.72830845, 2.46867557,
                   2.23375003, 2.02118061, 1.82883985, 1.65480272, 7.21241639,
                   6.52606422, 5.9050271 , 5.34308947, 4.83462728, 4.37455167,
                   3.95825804, 3.58157998, 3.81911647, 4.22077645, 4.66467938,
                   5.155268  , 5.69745227, 6.29665855, 6.95888391, 7.69075612,
                   1.76455206, 1.95013162, 2.15522875, 2.38189614, 2.63240234,
                   2.90925451, 3.21522348, 3.55337148, 0.81527861, 0.90102221,
                   0.99578354, 1.10051101, 1.21625277, 1.34416719, 1.48553448,
                   1.64176951, 0.37668439, 0.41630063, 0.46008335, 0.50847074,
                   0.56194707, 0.62104756, 0.68636371, 0.75854921])
y_peak = np.array([5.34308947, 5.28992485, 5.23728921, 5.18517732, 5.13358394,
                   5.08250393, 5.03193217, 4.98186361, 4.93229323, 4.8832161 ,
                   4.83462728, 5.77198627, 5.71455405, 5.65769329, 5.6013983 ,
                   5.54566345, 5.49048318, 5.43585196, 5.38176433, 5.32821488,
                   5.27519826, 5.22270916, 6.23531118, 6.1732688 , 6.11184375,
                   6.05102988, 5.99082113, 5.93121147, 5.87219493, 5.81376561,
                   5.75591767, 5.69864534, 5.64194287, 6.73582778, 6.66880518,
                   6.60244946, 6.53675399, 6.4717122 , 6.40731759, 6.34356371,
                   6.2804442 , 6.21795273, 6.15608307, 6.09482902, 7.27652151,
                   7.20411891, 7.13243673, 7.0614678 , 6.99120502, 6.92164137,
                   6.85276989, 6.78458369, 6.71707595, 6.65023993, 6.58406894,
                   7.58429925, 7.66052273, 7.70496678, 7.62830108, 7.55239822,
                   7.4772506 , 7.40285071, 7.32919112, 7.25626445, 7.18406341,
                   7.11258079, 7.0207356 , 7.09129517, 7.16256387, 7.23454883,
                   7.30725726, 7.38069641, 7.45487364, 7.52979637, 7.60547208,
                   7.68190835, 7.68351697, 6.49904846, 6.56436498, 6.63033795,
                   6.69697395, 6.76427966, 6.8322618 , 6.90092717, 6.97028264,
                   7.04033515, 7.11109169, 7.18255935, 6.01612613, 6.0765892 ,
                   6.13765994, 6.19934444, 6.26164889, 6.32457951, 6.38814259,
                   6.45234449, 6.51719163, 6.58269049, 6.64884763, 5.56908812,
                   5.62505839, 5.68159116, 5.7386921 , 5.79636692, 5.85462137,
                   5.9134613 , 5.97289257, 6.03292114, 6.093553  , 6.15479423,
                   5.155268  , 5.2070793 , 5.25941132, 5.31226928, 5.36565848,
                   5.41958424, 5.47405197, 5.5290671 , 5.58463515, 5.64076167,
                   5.69745227])


# Here begins hackery.
def _gen_one_data(_, gen_one_data):
    # See the comments in BatchData.from_func
    return gen_one_data()


class BatchData:
    """Wrapper around tf.data.Dataset.

    Each of the data-generating functions passed in (:gen_one_data:) should
    take no arguments and return a single (X, y) pair. from_func additionally
    reads X.dtype, X.shape, y.dtype and y.shape, so presumably these are numpy
    arrays.
    """
    
    @staticmethod
    def from_func(gen_one_data, batch_size=1):
        """Takes a function :gen_one_data: and a :batch_size:.

        :gen_one_data: takes no arguments and returns a single (X, y) pair. It
        is not a generator.

        Returns a function which, when called, returns a tf.data.Dataset. Each
        element of that Dataset is a whole batch (X_batch, y_batch) of size
        :batch_size:, so it does not need to be batched again.

        :gen_one_data: is called once immediately, to determine shapes and
        dtypes. After that it is called in the worker processes of a
        multiprocessing.Pool, so it must be picklable (e.g. a top-level
        function).
        """
        
        # Call the function once so we know what its size and type is.
        X, y = gen_one_data()
        Xdtype = X.dtype
        ydtype = y.dtype
        X_batch_shape = (batch_size, *X.shape)
        y_batch_shape = (batch_size, *y.shape)
        
        # So this function's implementation probably needs a little explaining.
        # Generating data is _slow_, because in general we have to use Python
        # to do so. So we have to use tf.py_func to feed that into TensorFlow,
        # which in turn only runs its function inside the one interpreter.
        # So in order to achieve speedup via multiprocessing, we have to do
        # multiprocessing the Python way rather than the TensorFlow way.
        # NOTE(review): this pool is never closed or terminated here, and it
        # is shared by every Dataset created by the returned function.
        pool = mp.Pool(processes=None)
        
        # Now we have this strange looking partial of a global function. When
        # we come to generate our data later, this is the function that we'll
        # be calling. It's a global function because the default Python
        # multiprocessing package is only capable of pickling top-level
        # functions. It then has to be a partial of this function so that we
        # can pass it gen_one_data, i.e. the function that we're actually
        # calling. The redundant _ argument in _gen_one_data is the value
        # from the iterable (defined next), which is necessary to tell the
        # multiprocessing map how many times we want to call the function.
        gen_data = ft.partial(_gen_one_data, gen_one_data=gen_one_data)
        # We've defined both gen_data and batch_list here so that
        # gen_batch_data, below, doesn't need to recreate them each time.
        # Although to be honest this is probably one of those tiny unnoticeable
        # speedups.
        batch_list = list(range(batch_size))
        
        # Next we wrap everything else in a _wrapper function, because it is
        # creating TensorFlow's Datasets, and this has to be done inside the
        # session, which doesn't start until we're inside the Estimator.
        # (As a side note, there's no particular reason to put the above three 
        # lines of code either inside or outside of this wrapper; they happen 
        # to be outside.)
        def _wrapper():
            # Now we vectorize our data generation. Note that we have to do
            # this here (and not via the Dataset.batch method), because we're
            # doing Python multiprocessing, not TensorFlow multiprocessing:
            # and we're using multiprocessing to generate multiple elements
            # of a batch simultaneously.
            def gen_batch_data():
                # One (X, y) pair per element of batch_list, generated in the
                # pool's worker processes.
                results = pool.map(gen_data, batch_list)
                X_batch = np.empty(X_batch_shape, dtype=Xdtype)
                y_batch = np.empty(y_batch_shape, dtype=ydtype)
                # Transpose the list of pairs into (all Xs, all ys) and copy
                # them into the preallocated batch arrays.
                X_batch[:], y_batch[:] = zip(*results)
                return X_batch, y_batch
            
            # Now wrap gen_batch_data to turn it into an (endless) generator.
            def generator():
                while True:
                    yield gen_batch_data()
            
            # And finally produce our Dataset from this generator. Note that
            # each element of this Dataset is itself a batch; we don't need to
            # batch again.
            ds = tfd.Dataset.from_generator(generator, (Xdtype, ydtype),
                                            (X_batch_shape, y_batch_shape))
            
            return ds
        return _wrapper

    @staticmethod
    def to_dataset(data):
        """Returns a function which returns a tf.data.Dataset that endlessly
        repeats :data:."""
        # Lambda wrapper is because in order to be part of the same graph as
        # the DNN, it has to be called later on.
        return lambda: tfd.Dataset.from_tensors(data).repeat()
    
    @staticmethod
    def batch(gen_one_data, batch_size=1):
        """Takes a function :gen_one_data: and a :batch_size:, and returns a
        batch (X_batch, y_batch) of that size as numpy arrays.

        :gen_one_data: takes no arguments and returns a single (X, y) pair.
        The batch is generated serially, in this process.
        """
        X_batch = []
        y_batch = []
        for _ in range(batch_size):
            X, y = gen_one_data()
            X_batch.append(X)
            y_batch.append(y)
        return (np.array(X_batch), np.array(y_batch))

In [9]:
### Data preprocessing

class _Processor(tools.SubclassTrackerMixin('__name__')):
    """Base class for preprocessors.

    Subclasses hold each piece of state twice:
    - as a regular Python attribute (e.g. ``self.mean``);
    - as a TensorFlow Variable with the same name plus a ``_tf`` suffix
      (e.g. ``self.mean_tf``).
    The names of such attributes are listed in :save_attr:. These are what
    get saved, loaded and synced.
    """
    
    save_attr = []
    checkpoint_filename = 'processor-checkpoint.ckpt'
    
    def __init__(self, training=True, **kwargs):
        # Subclasses consult this in transform, e.g. to decide whether to
        # update their running statistics.
        self._training = training
        super(_Processor, self).__init__(**kwargs)
        
    def init(self):
        """Creates the Saver for this processor's TensorFlow variables.

        Subclasses should create their ``<name>_tf`` Variables (for each name
        in :save_attr:) before calling this.
        """
        self._saver = tft.Saver([getattr(self, name + '_tf') 
                                 for name in self.save_attr], 
                                allow_empty=True)
        
    def training(self, val):
        """Provides a context to set the training variable to :val:."""
        return tools.set_context_variables(self, ('_training',), val)
    
    def transform(self, X, y):
        """Processes the data."""
        # Note that y may be None during prediction; make sure transform is
        # appropriately defined.
        raise NotImplementedError
        
    def inverse_transform(self, y):
        """Performs the inverse transform on the data."""
        raise NotImplementedError

    def _checkpoint_dir(self, model_dir):
        """Returns the directory, inside :model_dir:, holding the processor's
        checkpoints."""
        return model_dir + '/processor'
    
    def save(self, session, step, model_dir):
        """Saves the processor to a file in the directory :model_dir:. The argument
        :step: is logged out to specify at what global step this was performed."""
        file_loc = self._checkpoint_dir(model_dir) + '/' + self.checkpoint_filename
        # Saver.save appends '-<step>' to file_loc because global_step is given.
        self._saver.save(session, file_loc, global_step=step)
        self._sync_variables(session)
        tflog.info('Saving processor checkpoint for {} into {}'.format(step, file_loc))
        
    def load(self, session, model_dir):
        """Sets the processor's variables to the most recent processor checkpoint
        saved (by save) in the directory :model_dir:. Then syncs the Python-side
        attributes to match.

        If no such checkpoint exists, this logs the fact and leaves the
        variables unchanged.
        """
        checkpoint_dir = self._checkpoint_dir(model_dir)
        # save passes global_step to Saver.save, which writes the checkpoint to
        # '<checkpoint_filename>-<step>'; the bare filename is never a valid
        # checkpoint. So look up the latest one via the checkpoint state file.
        file_loc = tft.latest_checkpoint(checkpoint_dir)
        if file_loc is None:
            tflog.info("No processor checkpoint file {} found.".format(
                checkpoint_dir + '/' + self.checkpoint_filename))
            return
        try:
            self._saver.restore(session, file_loc)
        except (tfer.NotFoundError, tfer.InvalidArgumentError):
            # The checkpoint state refers to files that are missing or
            # unreadable.
            tflog.info("No processor checkpoint file {} found.".format(file_loc))
        else:
            tflog.info("Restoring processor parameters from {}".format(file_loc))
            self._sync_variables(session)
            
    def _sync_variables(self, session):
        """We have variables both as TensorFlow Variables, and as regular python
        variables. In general we'll want to access the values both during a
        TensorFlow Session, and after it has been run. So for simplicity we keep
        two sets of variables and sync between them (TensorFlow -> Python)."""
        
        for name in self.save_attr:
            tf_variable = getattr(self, name + '_tf')
            value = tf_variable.eval(session=session)
            setattr(self, name, value)
            
    
class IdentityProcessor(_Processor):
    """A no-op processor: the data passes through unchanged."""

    def transform(self, X, y):
        """Returns :X: and :y: exactly as given."""
        return (X, y)

    def inverse_transform(self, y):
        """Returns :y: exactly as given."""
        return y
  
    
class ScaleOverall(_Processor):
    """Scales data to (approximately) between -1 and 1. Scaling is done across
    all batches.

    While training, this keeps exponential moving averages (with the given
    :momentum:) of two statistics of the features X:
    - the mean;
    - the "extent", i.e. the maximum absolute deviation from that mean.
    Both features and labels are transformed as (data - mean) / extent.
    """
    
    save_attr = ['mean', 'extent', 'momentum', '_started']
    
    def __init__(self, momentum=0.99, **kwargs):
        self.momentum = momentum
        self.mean = 0.0
        self.extent = 1.0
        self._started = False
        super(ScaleOverall, self).__init__(**kwargs)
        
    def init(self):
        self.momentum_tf = tf.Variable(self.momentum, trainable=False, dtype=tf.float64)
        self.mean_tf = tf.Variable(self.mean, trainable=False, dtype=tf.float64)
        self.extent_tf = tf.Variable(self.extent, trainable=False, dtype=tf.float64)
        self._started_tf = tf.Variable(self._started, trainable=False)
        super(ScaleOverall, self).init()
        
    def transform(self, X, y):
        def batch_statistics():
            # Mean and maximum absolute deviation of this batch of features.
            mean = tf.reduce_mean(X)
            extent = tf.reduce_max(tf.abs(X - mean))
            return mean, extent

        def first_time():
            mean, extent = batch_statistics()
            # The assignment must be a control dependency of the returned
            # tensors: in graph mode an op that nothing depends on is never
            # run, so otherwise _started_tf would never become True.
            started = self._started_tf.assign(True)
            with tf.control_dependencies([started]):
                m = self.mean_tf.assign(mean)
                e = self.extent_tf.assign(extent)
            return m, e
        
        def later_times():
            mean, extent = batch_statistics()
            m = self.mean_tf.assign(self.mean_tf * self.momentum_tf + mean * (1 - self.momentum_tf))
            e = self.extent_tf.assign(self.extent_tf * self.momentum_tf + extent * (1 - self.momentum_tf))
            return m, e
        
        if self._training:
            mean, extent = tf.cond(tf.equal(self._started_tf, False), first_time, later_times)
        else:
            mean, extent = self.mean_tf, self.extent_tf
        
        X_scaled = (X - mean) / extent
        y_scaled = None if y is None else (y - mean) / extent
        return X_scaled, y_scaled
    
    def inverse_transform(self, y):
        # Uses the Python-side copies, which are synced on save/load.
        return (y * self.extent) + self.mean
    
    
class NormalisationOverall(_Processor):
    """Normalises inputs by subtracting mean and dividing by standard deviation.
    Scaling is done across all batches.

    While training, this keeps exponential moving averages (with the given
    :momentum:) of the mean and standard deviation of the features X. The same
    statistics are used to transform both the features and the labels.
    """
    
    save_attr = ['mean', 'stddev', 'momentum', '_started']
    
    def __init__(self, momentum=0.99, **kwargs):
        self.momentum = momentum
        self.mean = 0.0
        self.stddev = 1.0
        self._started = False
        super(NormalisationOverall, self).__init__(**kwargs)
        
    def init(self):
        self.momentum_tf = tf.Variable(self.momentum, trainable=False, dtype=tf.float64)
        self.mean_tf = tf.Variable(self.mean, trainable=False, dtype=tf.float64)
        self.stddev_tf = tf.Variable(self.stddev, trainable=False, dtype=tf.float64)
        self._started_tf = tf.Variable(self._started, trainable=False)
        super(NormalisationOverall, self).init()
        
    def transform(self, X, y):
        def batch_statistics():
            # Mean and (population) standard deviation of this batch of features.
            mean = tf.reduce_mean(X)
            stddev = tf.sqrt(tf.reduce_mean(tf.square(X - mean)))
            return mean, stddev

        def first_time():
            mean, stddev = batch_statistics()
            # The assignment must be a control dependency of the returned
            # tensors: in graph mode an op that nothing depends on is never
            # run, so otherwise _started_tf would never become True and every
            # batch would overwrite the statistics instead of averaging them.
            started = self._started_tf.assign(True)
            with tf.control_dependencies([started]):
                m = self.mean_tf.assign(mean)
                s = self.stddev_tf.assign(stddev)
            return m, s
        
        def later_times():
            mean, stddev = batch_statistics()
            m = self.mean_tf.assign(self.mean_tf * self.momentum_tf + mean * (1 - self.momentum_tf))
            s = self.stddev_tf.assign(self.stddev_tf * self.momentum_tf + stddev * (1 - self.momentum_tf))
            return m, s
        
        if self._training:
            mean, stddev = tf.cond(tf.equal(self._started_tf, False), first_time, later_times)
        else:
            mean, stddev = self.mean_tf, self.stddev_tf
            
        X_scaled = (X - mean) / stddev
        y_scaled = None if y is None else (y - mean) / stddev
        return X_scaled, y_scaled
    
    def inverse_transform(self, y):
        # Uses the Python-side copies, which are synced on save/load.
        return (y * self.stddev) + self.mean
    
    
### Hooks

class _ProcessorSavingHook(tft.SessionRunHook):
    """Saves the processor data.

    Saves when the session is created, every :save_secs: seconds or every
    :save_steps: steps thereafter (exactly one of which should be given), and
    at the end if the last step was not already saved.
    """
    # Adapted from the source code for tf.train.CheckpointSaverHook
    
    def __init__(self, processor, model_dir, save_secs=600, 
                 save_steps=None, **kwargs):
        self.processor = processor
        self.model_dir = model_dir
        self._timer = tft.SecondOrStepTimer(every_secs=save_secs,
                                            every_steps=save_steps)
        self._global_step_tensor = None
        super(_ProcessorSavingHook, self).__init__(**kwargs)
    
    def begin(self):
        self._global_step_tensor = tft.get_global_step()
        # Fail clearly now, rather than with an obscure session.run(None) error
        # once the session starts.
        if self._global_step_tensor is None:
            raise RuntimeError("Global step should be created to use "
                               "_ProcessorSavingHook.")
        
    def after_create_session(self, session, coord):
        global_step = session.run(self._global_step_tensor)
        self._save(session, global_step)
        self._timer.update_last_triggered_step(global_step)
        
    def before_run(self, run_context):
        # Fetch the global step alongside every run so after_run can check it.
        return tft.SessionRunArgs(self._global_step_tensor)
        
    def after_run(self, run_context, run_values):
        # The fetched value is the step before this run's increment; only pay
        # for an extra session.run when a save might be due.
        stale_global_step = run_values.results
        if self._timer.should_trigger_for_step(stale_global_step + 1):
            global_step = run_context.session.run(self._global_step_tensor)
            if self._timer.should_trigger_for_step(global_step):
                self._timer.update_last_triggered_step(global_step)
                self._save(run_context.session, global_step)
            
    def end(self, session):
        last_step = session.run(self._global_step_tensor)
        if last_step != self._timer.last_triggered_step():
            self._save(session, last_step)
        
    def _save(self, session, step):
        self.processor.save(session, step, self.model_dir)

In [21]:
### DNN Construction via code

# Keras-inspired nice interface, just without the slow speed and lack of 
# multicore functionality of Keras...
# (Plus it allows us to integrate our preprocessing)

class Sequential:
    """Defines a neural network. Expected usage is roughly:
    
    >>> model = Sequential()
    >>> model.add(tf.layers.Dense(units=100, activation=tf.nn.relu))
    >>> model.add_train(tf.layers.Dropout(rate=0.4))
    >>> model.add(tf.layers.Dense(units=50, activation=tf.nn.relu))
    >>> model.add_train(tf.layers.Dropout(rate=0.4))
    >>> model.add(tf.layers.Dense(units=10, activation=tf.nn.relu))
    
    to define the neural network in the abstract (note that the output of the
    last layer is treated as the logits), followed by:
    
    >>> dnn = model.compile()
    
    to actually create it in TensorFlow. Here, 'dnn' is a
    tf.estimator.Estimator, so may be used like:
    
    >>> dnn.train(...)
    >>> dnn.predict(...)
    >>> dnn.evaluate(...)
    """
    
    def __init__(self):
        """Creates a Sequential. See Sequential.__doc__ for more info."""
        # The layers, in order, and whether each one should be passed the
        # 'training' flag when called.
        self._layer_funcs = []
        self._layer_train = []
        
    def add(self, layer):
        """Add a layer to the network.
        """
        self._layer_funcs.append(layer)
        self._layer_train.append(False)
        
    def add_train(self, layer):
        """Add a layer to the network which needs to know if the network is in
        training or not. It will be called with a 'training' keyword argument.
        """
        self.add(layer)
        self._layer_train[-1] = True
        
    def compile(self, optimizer=None, loss_fn=tflo.mean_squared_error, 
                model_dir=None, gradient_clip=None, processor=None, **kwargs):
        """Takes its abstract neural network definition and compiles it into a
        tf.estimator.Estimator.
        
        May be given an :optimizer:, defaulting to tf.train.AdamOptimizer().
        May be given a :loss_fn:, defaulting to tf.losses.mean_squared_error.
        May be given a :model_dir:, which is passed on to the Estimator. If it
            is not None then the processor is also saved into it during
            training.
        May be given a :gradient_clip:, defaulting to no clipping. If given,
            gradients are clipped to this global norm.
        May be given a :processor:, defaulting to IdentityProcessor(). It is
            applied to the features and labels before the network, and will be
            saved during training.
        
        Any additional kwargs are passed into the creation of the
        tf.estimator.Estimator.
        """
        
        # Create the default optimizer here rather than as a default argument,
        # so that separately compiled networks don't share one instance.
        if optimizer is None:
            optimizer = tft.AdamOptimizer()
            
        if processor is None:
            processor = IdentityProcessor()
            
        def model_fn(features, labels, mode):
            training = mode == tfe.ModeKeys.TRAIN
            
            # Create the processor's variables (and the Saver for them). These
            # are non-trainable global variables, so the Estimator's own
            # checkpoints also save and restore their values.
            # NOTE(review): the processor's Python-side attributes (read by
            # inverse_transform) are not synced here; they are only synced
            # when the processor is saved or loaded.
            processor.init()
            
            # Apply any preprocessing to the features and labels. The
            # processor may only update its statistics whilst training, not
            # during evaluation or prediction.
            with processor.training(training):
                features, labels = processor.transform(features, labels)
            
            # Apply each layer in turn, starting from the processed features;
            # the final output is treated as the logits.
            # NOTE(review): the same layer objects are reused every time
            # model_fn is called (the Estimator builds a new graph for each
            # train/evaluate/predict call); confirm the layers tolerate this.
            logits = features
            for layer_func, needs_training in zip(self._layer_funcs,
                                                  self._layer_train):
                if needs_training:
                    logits = layer_func(inputs=logits, training=training)
                else:
                    logits = layer_func(inputs=logits)
            
            if mode == tfe.ModeKeys.PREDICT:
                return tfe.EstimatorSpec(mode=mode, predictions=logits)
            
            loss = loss_fn(labels, logits)

            if mode == tfe.ModeKeys.TRAIN:
                g_step = tft.get_global_step()
                # Run any update ops (e.g. batch norm statistics) with every
                # training step, whether or not gradients are clipped.
                update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
                with tf.control_dependencies(update_ops):
                    if gradient_clip is None:
                        train_op = optimizer.minimize(loss=loss,
                                                      global_step=g_step)
                    else:
                        # Perform gradient clipping
                        gradients, variables = zip(*optimizer.compute_gradients(loss))
                        gradients, _ = tf.clip_by_global_norm(gradients, 
                                                              gradient_clip)
                        train_op = optimizer.apply_gradients(zip(gradients, 
                                                                 variables),
                                                             global_step=g_step)
                training_hooks = [] if model_dir is None else [_ProcessorSavingHook(processor, model_dir)]
                return tfe.EstimatorSpec(mode=mode, loss=loss, train_op=train_op,
                                         training_hooks=training_hooks)
            
            if mode == tfe.ModeKeys.EVAL:
                return tfe.EstimatorSpec(mode=mode, loss=loss)
            
            raise RuntimeError("mode '{}' not understood".format(mode))
                
        return tfe.Estimator(model_fn=model_fn, model_dir=model_dir, **kwargs)
    
    
def model_dir_str(model_dir, hidden_units, logits, processor, activation, 
                  uuid=None):
    """Returns a string for the model directory describing the network.

    The description is appended to :model_dir: and has the form
    '<units>_<logits>__<ProcessorClassName>_<activation>[<alpha>][_<uuid>]'.
    Runs of equal consecutive hidden layer sizes are compressed as
    '<size>x<repeat>'. For example:
    - hidden_units=[100, 100, 50], logits=1,
      processor=NormalisationOverall(), activation=tf.nn.relu
      appends '100x2_50_1__NormalisationOverall_relu'.
    - For tf.nn.leaky_relu (or a functools.partial of it), the alpha value is
      appended with its '.' removed, e.g. 'leakyrelu02' for alpha=0.2.
    """
    
    layer_counter = [(k, sum(1 for _ in g)) for k, g in it.groupby(hidden_units)]
    for layer_size, layer_repeat in layer_counter:
        if layer_repeat == 1:
            model_dir += '{}_'.format(layer_size)
        else:
            model_dir += '{}x{}_'.format(layer_size, layer_repeat)
    model_dir += '{}__'.format(logits)
    model_dir += processor.__class__.__name__
    
    if isinstance(activation, ft.partial):
        activation_fn = activation.func
        # A partial without an explicit alpha (e.g. a partial of some other
        # activation) falls back to leaky_relu's default of 0.2, rather than
        # raising a KeyError.
        alpha = str(activation.keywords.get('alpha', 0.2)).replace('.', '')
    else:
        activation_fn = activation
        alpha = '02'
        
    model_dir += '_' + activation_fn.__name__.replace('_', '')
    if activation_fn is tf.nn.leaky_relu:
        model_dir += alpha

    if uuid not in (None, ''):
        model_dir += '_' + str(uuid)
    return model_dir

In [21]:
### DNN construction from folder

def _dnn_hyperparameters_from_dir(dir_name):
    """Recovers DNN hyperparameters from the name :dir_name: of the directory of
    the DNN, as produced by model_dir_str.

    Returns a tuple (dnn_details, uuid):
    - dnn_details is a dict with keys 'hidden_units', 'logits', 'processor',
      'batch_norm' and 'activation';
    - uuid is the optional trailing identifier, or None if there is none.

    Raises RuntimeError if the activation is unknown, or if there are too many
    trailing components.
    """
    units_str, rest_str = dir_name.split('__')
    rest = rest_str.split('_')

    # Expand '<size>x<repeat>' entries back into repeated layer sizes.
    all_units = []
    for unit in units_str.split('_'):
        if 'x' not in unit:
            all_units.append(int(unit))
            continue
        size_str, repeat_str = unit.split('x')
        size, repeat = int(size_str), int(repeat_str)
        all_units.extend([size] * repeat)

    # The final entry is the size of the logits layer.
    dnn_details = {}
    dnn_details['hidden_units'] = all_units[:-1]
    dnn_details['logits'] = all_units[-1]
    dnn_details['processor'] = _Processor.find_subclass(rest[0])()
    dnn_details['batch_norm'] = False

    activation_name = rest[1].lower()
    # Not a great way to do this inversion, admittedly: 'leakyrelu' is followed
    # by alpha with its '.' removed after the first digit.
    if activation_name.startswith('leakyrelu'):
        alpha = float('{}.{}'.format(activation_name[9], activation_name[10:]))
        dnn_details['activation'] = ft.partial(tf.nn.leaky_relu, alpha=alpha)
    else:
        try:
            dnn_details['activation'] = getattr(tf.nn, activation_name)
        except AttributeError:
            raise RuntimeError("Activation '{}' not understood.".format(activation_name))

    remaining = rest[2:]
    if len(remaining) > 1:
        raise RuntimeError("Bad dir_name string '{}'. Too many remaining "
                           "arguments: {}".format(dir_name, remaining))
    uuid = remaining[0] if remaining else None

    return dnn_details, uuid


def dnn_factory_from_model_dir(model_dir, **kwargs):
    """Builds a DNNFactory for the model stored in :model_dir:.

    The DNN's hyperparameters are recovered by parsing the final component
    of :model_dir: (a single trailing slash or backslash is ignored). Any
    additional keyword arguments take precedence over the parsed
    hyperparameters.
    """
    # Drop one trailing path separator so that the last path component is
    # the directory name itself rather than an empty string.
    trimmed_dir = model_dir[:-1] if model_dir[-1] in '/\\' else model_dir
    last_component = tools.split(['/', '\\'], trimmed_dir)[-1]

    # It ought to be possible to restore the DNN purely from what is saved in
    # the model directory, rather than reconstructing its structure from the
    # directory name...
    details, _uuid = _dnn_hyperparameters_from_dir(last_component)
    details.update(kwargs)
    return DNNFactory(model_dir=trimmed_dir, **details)


def dnn_factories_from_dir(dir_, exclude_start=('.',), exclude_end=(),
                           exclude_in=(), **kwargs):
    """Creates multiple DNNs and processors from a directory containing the
    directories for multiple DNNs and processors.

    Its arguments :exclude_start:, :exclude_end:, :exclude_in: are each
    tuples which allow for excluding particular models, if their model
    directories start, end, or include any of the strings specified
    in each tuple respectively.

    Essentially just a wrapper around dnn_factory_from_model_dir, to run it
    multiple times. It will forward any additional keyword arguments onto
    each call of dnn_factory_from_model_dir. Subdirectories from which a DNN
    can't be loaded (FileNotFoundError or RuntimeError) are logged and
    skipped.

    Returns a 2-tuple of a list of DNN factories and a list of the names of
    the subdirectories they were loaded from, both in sorted order.

    Raises FileNotFoundError if :dir_: is not an existing directory.
    """

    if not os.path.isdir(dir_):
        raise FileNotFoundError("No such directory: '{}'".format(dir_))
    # os.walk silently yields nothing if it can't list dir_, so supply a
    # default rather than letting a bare StopIteration escape.
    _, subdirectories, _ = next(os.walk(dir_), (dir_, [], []))
    subdirectories = sorted(subdirectories)
    if dir_[-1] in ('/', '\\'):
        dir_ = dir_[:-1]
    dnn_factories = []
    names = []

    for subdir in subdirectories:
        if any(subdir.startswith(ex) for ex in exclude_start):
            tflog.info("Excluding '{}' based on start.".format(subdir))
            continue
        if any(subdir.endswith(ex) for ex in exclude_end):
            tflog.info("Excluding '{}' based on end.".format(subdir))
            continue
        if any(ex in subdir for ex in exclude_in):
            tflog.info("Excluding '{}' based on containment.".format(subdir))
            continue

        model_dir = dir_ + '/' + subdir
        try:
            dnn_factory = dnn_factory_from_model_dir(model_dir, **kwargs)
        except (FileNotFoundError, RuntimeError) as e:
            tflog.info("Could not load DNN from '{}'. Error message: '{}'"
                       .format(subdir, e))
        else:
            dnn_factories.append(dnn_factory)
            names.append(subdir)

    return dnn_factories, names

In [9]:
### Simpler interpolation methods
# Useful to give a baseline to compare the neural network models against.

class _InterpolatorBase:
    """Base class for performing predictions based on just the input. Subclasses
    are expected to provide a predict_single classmethod specifying their
    predictions.
    
    Its predict and evaluate methods are designed to resemble that of
    tf.estimator.Estimator's, so that we can call them in the same way. (We don't
    actually inherit from tf.estimator.Estimator because none of what these 
    classes use TensorFlow, so messing around with model functions and 
    EstimatorSpecs is just unnecessary faff and overhead.)
    
    WARNING: All subclasses must expect no preprocessing, i.e. must use
    IdentityProcessor() as their preprocessing. This is because preprocessing is
    done in TensorFlow, which, of course, this class is explicitly about not 
    using... if any preprocessing is necessary then subclasses must implement it
    themselves as part of their predict_single method.
    """
    
    @staticmethod
    def _index_tol(cg, point, tol=0.001):
        """Searches through a list of 2-tuples, :cg:, to find the first element 
        which is within tolerance :tol: of :point:. Essentially the index method
        for lists, except this one makes sense for high precision floating point
        numbers.
        """
        
        t, x = point
        for i, element in enumerate(cg):
            t2, x2 = element
            if max(np.abs(t - t2), np.abs(x - x2)) < tol:
                return i
        raise ValueError('{} is not in {}'.format(point, type(cg)))
        
    def _prepare(self, Xi):
        """Performs any necessary preparations on the data :Xi: before making 
        predictions.
        """
        pass
    
    def _interp(self, Xi, point):
        """Helper function for performing interpolation on a coarse
        grid :Xi:, giving the value of the interpolation at :point:.
        
        The spacing of the grid is known from the global hyperparameters
        defining the coarse grid size, whilst it isn't necessary to know its
        location.
        
        The argument :point: should be scaled to the grid size, i.e.
        coarse_grid_sep.
        """
        raise NotImplementedError
    
    def predict_single(self, Xi, y):
        """Makes a prediction corresponding to input feature :Xi:.
        
        It is given the true result :y:. Not to cheat and return perfect
        results, but to determine its shape etc.
        """
        raise NotImplementedError
    
    def predict(self, input_fn, yield_single_examples=False):
        """The argument :input_fn: should probably be a lambda wrapper around
        the result of BatchData.test.
        
        The argument :yield_single_examples: is there for compatibility with the
        interface for the usual TF Estimators and is ignored.
        """
        
        returnval = []
        X, y = input_fn()
        
        for Xi in X:
            returnval.append(self.predict_single(Xi, y))
            
        returnval = np.array(returnval)
        while True:
            yield returnval


class BilinearInterpMixin(_InterpolatorBase):
    """Mixin to help perform bilinear interpolation."""

    def _interp(self, Xi, point):
        """Bilinearly interpolates the coarse grid values :Xi: at :point:.

        :Xi: is indexed in the same order as the points of
        coarse_grid((0, 0)), and :point: is a (t, x) pair relative to that
        grid.
        """
        # The actual t, x values for the grid don't matter from this point
        # onwards; so this is just a translation from wherever X was actually
        # calculated. So WLOG assume it was around 0.
        cg = coarse_grid((0, 0))
        t, x = point

        # The grid points bracketing :point:.
        t_below = tools.round_mult(t, coarse_grid_sep.t, 'down')
        t_above = tools.round_mult(t, coarse_grid_sep.t, 'up')
        x_below = tools.round_mult(x, coarse_grid_sep.x, 'down')
        x_above = tools.round_mult(x, coarse_grid_sep.x, 'up')

        # The value of :Xi: at those grid points.
        t_b_x_b = Xi[self._index_tol(cg, (t_below, x_below))]
        t_a_x_b = Xi[self._index_tol(cg, (t_above, x_below))]
        t_b_x_a = Xi[self._index_tol(cg, (t_below, x_above))]
        t_a_x_a = Xi[self._index_tol(cg, (t_above, x_above))]

        # Fractional position of (t, x) within its grid square, measured from
        # the bottom-left grid point actually chosen above. (Computing this as
        # t % coarse_grid_sep.t instead can disagree with the points that
        # round_mult chose under floating point error, e.g.
        # 0.3 % 0.1 == 0.09999999999999998 rather than 0.)
        t_scale = (t - t_below) / coarse_grid_sep.t
        x_scale = (x - x_below) / coarse_grid_sep.x

        # Bilinear interpolation
        returnval = (1 - t_scale) * (1 - x_scale) * t_b_x_b
        returnval += t_scale * (1 - x_scale) * t_a_x_b
        returnval += (1 - t_scale) * x_scale * t_b_x_a
        returnval += t_scale * x_scale * t_a_x_a

        return returnval
    
    
class PolyInterpMixin(_InterpolatorBase):
    """Mixin to help perform polynomial interpolation.

    _prepare fits a polynomial of degree :poly_deg: in (t, x) to the coarse
    grid values by least squares; _interp then evaluates that polynomial.
    """

    def __init__(self, poly_deg, *args, **kwargs):
        self.poly_deg = poly_deg
        self._poly_coefs = None
        # Continue along the MRO, so that this cooperates with whichever
        # other base class(es) the concrete interpolator has.
        super(PolyInterpMixin, self).__init__(*args, **kwargs)

    def poly(self, point):
        """Interprets its currently stored polynomial coefficients as a
        polynomial in (t, x), and evaluates it at the specified :point:.

        Raises RuntimeError if _prepare hasn't been run yet, or if the number
        of stored coefficients doesn't match :poly_deg:.
        """

        if self._poly_coefs is None:
            raise RuntimeError('Must run _prepare first!')

        t, x = point
        coefs = iter(self._poly_coefs)

        # The coefficients are in the order produced by
        # sklearn.preprocessing.PolynomialFeatures on (t, x) inputs:
        # 1, t, x, t^2, t x, x^2, t^3, t^2 x, ...
        try:
            result = next(coefs)  # Intercept, i.e. constant term
            for power in range(1, self.poly_deg + 1):
                for x_power in range(0, power + 1):
                    t_power = power - x_power
                    result += next(coefs) * (t ** t_power) * (x ** x_power)
        except StopIteration:
            raise RuntimeError('Too few coefficients for poly_deg {}: {}'
                               .format(self.poly_deg, self._poly_coefs))

        try:
            next_coef = next(coefs)
        except StopIteration:
            return result
        raise RuntimeError('coef_: {coef_}, poly_deg: {poly_deg}, '
                           'coef that shouldn\'t exist: {next_coef}'
                           .format(coef_=self._poly_coefs,
                                   poly_deg=self.poly_deg,
                                   next_coef=next_coef))

    def _prepare(self, Xi):
        """Fits the polynomial to the coarse grid values :Xi:, storing its
        coefficients for use by poly."""
        poly_features = skpr.PolynomialFeatures(degree=self.poly_deg,
                                                include_bias=True)
        # The bias column from PolynomialFeatures already provides the
        # constant term, so the regression itself has no separate intercept.
        lin_reg = sklm.LinearRegression(fit_intercept=False)
        poly_pipe = skpi.Pipeline([('pf', poly_features), ('lr', lin_reg)])

        # The actual t, x values for the grid don't matter from this point
        # onwards; so this is just a translation from wherever X was actually
        # calculated. So WLOG assume it was around 0.
        cg = coarse_grid((0, 0))
        poly_pipe.fit(cg, Xi)
        self._poly_coefs = poly_pipe.named_steps['lr'].coef_

    def _interp(self, Xi, point):
        """Evaluates the polynomial fitted by _prepare at :point:. :Xi: is
        not used directly; it must already have been passed to _prepare."""
        return self.poly(point)
    
    
class NearestInterpMixin(_InterpolatorBase):
    """Mixin to help perform nearest-neighbour interpolation."""

    def _interp(self, Xi, point):
        """Returns the value of the coarse grid data :Xi: at whichever grid
        point lies closest to :point:."""
        # Only relative positions matter here, so WLOG take the coarse grid
        # to be positioned around the origin.
        grid_points = coarse_grid((0, 0))
        t, x = point
        closest = (tools.round_mult(t, coarse_grid_sep.t, 'round'),
                   tools.round_mult(x, coarse_grid_sep.x, 'round'))
        return Xi[self._index_tol(grid_points, closest)]
    
    
class FineGridInterpolator(_InterpolatorBase):
    """Provides the predict_single function for predictions on a fine grid.

    Requires the _prepare and _interp methods provided by one of the mixins
    above."""

    def predict_single(self, Xi, y):
        """Interpolates the coarse grid data :Xi: at every point of the fine
        grid, returning a list of the results in fine grid order. :y: is
        accepted for interface compatibility and is ignored."""
        # _prepare is given only Xi, not the point being interpolated at, so
        # it only needs doing once. (Doing it per point would e.g. make
        # PolyInterpMixin refit its regression at every fine grid point.)
        self._prepare(Xi)
        # Translation doesn't matter at this point so WLOG the fine grid is
        # around 0, 0. (_interp makes the same assumption; these assumptions
        # must be consistent)
        return [self._interp(Xi, point) for point in fine_grid((0, 0))]
    
    
class PointInterpolator(_InterpolatorBase):
    """Provides the predict_single function for predictions at a single point.

    Requires the _prepare and _interp methods provided by one of the mixins
    above."""

    def predict_single(self, Xi, y=None):
        """Interpolates at the single location encoded in :Xi:.

        The last two entries of :Xi: are the (t, x) offsets of the point to
        predict at, in units of coarse_grid_sep; the remaining entries are
        the coarse grid values. :y: is accepted so that this matches the
        predict_single(Xi, y) call made by _InterpolatorBase.predict, and is
        ignored.
        """
        # Separate the location data and the grid data
        t_offset = Xi[-2]
        x_offset = Xi[-1]
        Xi = Xi[:-2]
        self._prepare(Xi)
        # Wrapped in a list for consistency: this network just happens to only
        # be trying to predict a single label.
        return [self._interp(Xi, (t_offset * coarse_grid_sep.t,
                                  x_offset * coarse_grid_sep.x))]
    
    
class Perfect(_InterpolatorBase):
    """Regressor that cheats to always give the perfect prediction: it simply
    hands back the true result it is given."""

    def predict_single(self, Xi, y):
        """Ignores the feature :Xi: and returns the true result :y:, exactly
        as passed in, as the prediction."""
        prediction = y
        return prediction

In [9]:
### Using regressors in ensembles

class RegressorAverager:
    """Regressor that averages the results of other regressors to make its
    prediction. It is capable of using both TensorFlow-based regressors
    and non-TensorFlow-based regressors as it expects regressor factories,
    and the difference between them is handled by the factory wrapper.
    """

    def __init__(self, regressor_factories=None, mask=None, **kwargs):
        """Should be passed an iterable of :regressor_factories:. It will
        make predictions according to their average. If none are given then
        there are no regressors to average.

        May also pass a :mask: argument, which should be an iterable of bools
        the same length as the number of regressors, specifying whether or
        not each regressor should be used when making predictions. By default
        all of them are used.

        Raises ValueError if :mask: has the wrong length.
        """
        # Stored as a list as we need both its length and to iterate over it
        # on every prediction, which a one-shot iterator wouldn't allow.
        self.regressor_factories = ([] if regressor_factories is None
                                    else list(regressor_factories))
        self.mask = None
        self.reset_mask()
        if mask is not None:
            self.set_mask(mask)
        super(RegressorAverager, self).__init__(**kwargs)

    def set_mask(self, mask):
        """Sets which regressors are used when predicting; :mask: should
        contain one bool per regressor. Returns self, for chaining.

        Raises ValueError if :mask: has the wrong length.
        """
        mask = tuple(mask)
        if len(mask) != len(self.regressor_factories):
            raise ValueError('Mask has length {} but there are {} regressors.'
                             .format(len(mask), len(self.regressor_factories)))
        self.mask = mask
        return self  # for chaining

    def reset_mask(self):
        """Makes every regressor be used when predicting. Returns self, for
        chaining."""
        self.mask = tuple(True for _ in self.regressor_factories)
        return self  # for chaining

    def predict(self, input_fn, *args, **kwargs):
        """Averages the predictions of the regressors selected by the mask,
        on the data (X, y) returned by :input_fn:.

        Any other arguments are accepted for compatibility with the interface
        of TF Estimators' predict, and are ignored.

        Returns a generator which endlessly yields that average. Raises
        RuntimeError (on the first next(...)) if the mask selects no
        regressors.
        """
        X, y = input_fn()

        returnval = tools.AddBase()
        counter = 0
        for regressor_factory, use in zip(self.regressor_factories, self.mask):
            if use:
                counter += 1
                returnval += _eval_regressor(regressor_factory, X, y).prediction
        if counter == 0:
            raise RuntimeError('No regressors are selected by the mask, so '
                               'there is nothing to average.')
        returnval = returnval / counter

        while True:
            yield returnval

In [9]:
### Regressor factories
# TensorFlow demands that its Estimators be rebuilt before each train(...)
# etc. call, so DNNFactory is necessary to construct it each time.
# For a consistent interface, RegressorFactory is also provided to wrap
# around non-TensorFlow regressors.

class _RegressorFactoryBase:
    """Defines the interface for factories which make regressors, i.e. DNNs
    or simple interpolators.
    """
    # No good way to set abstract instance attributes (even with @property)
    # so we just list them in comments here. (Best way would probably be to
    # check for the attributes existence in the metaclass' __call__, which
    # is easily more faff than it's worth.)
    
    # Instances should have a 'processor' attribute returning the processor
    # for preprocessing input data to the regressor
    
    # Instances should have a 'use_tf' attribute returning True or False
    # for whether the regressor uses TensorFlow
    
    def __call__(self):
        # Should return the regressor itself
        raise NotImplementedError
        
            
class DNNFactory(_RegressorFactoryBase):
    """Shortcut for creating a simple DNN with dense, dropout and batch
    normalization layers, and then compiling it.

    The reason its __call__ function has this class wrapper is because of
    how TensorFlow operates: the DNN needs to be recreated before every
    train(...), predict(...) or evaluate(...) call, so it is convenient to
    cache the hyperparameters for the DNN in this class, and simply call
    the class before each train/predict/evaluate call.

    Arguments:
        hidden_units: iterable of the number of units in each hidden Dense
            layer.
        logits: the number of units in the final (output) Dense layer.
        processor: the processor for the regressor's input data; passed on
            to compile.
        activation: the activation function of the hidden layers.
        drop_rate: the dropout rate after each hidden layer; 0 disables
            dropout.
        drop_type: 'normal' or 'dropout' for standard dropout; 'alpha' or
            'alpha_dropout' for alpha dropout.
        model_dir: the directory for the model; passed on to compile.
        log_steps: used as the RunConfig's log_step_count_steps.
        gradient_clip: passed on to compile.
        batch_norm: whether to add batch normalization layers before the
            first hidden layer and after every hidden layer.
        kernel_initializer: initializer for every Dense layer's kernel. Note
            that the default is created once, when this class is defined, and
            is shared by all instances.
        compile_kwargs: any additional keyword arguments to pass to compile.
    """

    def __init__(self, hidden_units, logits, processor=None,
                 activation=tf.nn.relu, drop_rate=0.0,
                 drop_type='dropout', model_dir=None, log_steps=100,
                 gradient_clip=None, batch_norm=False,
                 kernel_initializer=tfi.truncated_normal(mean=0, stddev=0.05),
                 compile_kwargs=None,
                 **kwargs):
        self.hidden_units = hidden_units
        self.logits = logits
        self.processor = processor
        self.activation = activation
        self.drop_rate = drop_rate
        self.drop_type = drop_type
        self.model_dir = model_dir
        self.log_steps = log_steps
        self.gradient_clip = gradient_clip
        self.batch_norm = batch_norm
        self.kernel_initializer = kernel_initializer
        self.compile_kwargs = {} if compile_kwargs is None else compile_kwargs
        self.use_tf = True
        super(DNNFactory, self).__init__(**kwargs)

    def _dropout_layer(self):
        """Creates the dropout layer specified by :drop_type: and :drop_rate:.

        Raises ValueError if :drop_type: isn't understood, rather than
        silently building a network without any dropout.
        """
        if self.drop_type in ('normal', 'dropout'):
            return tfla.Dropout(rate=self.drop_rate)
        if self.drop_type in ('alpha', 'alpha_dropout'):
            return tfk.layers.AlphaDropout(rate=self.drop_rate)
        raise ValueError("drop_type '{}' not understood.".format(self.drop_type))

    def __call__(self):
        """Builds and compiles a fresh DNN with this factory's
        hyperparameters, returning the result of compiling it.

        Raises ValueError if dropout is in use and :drop_type: isn't
        understood.
        """
        model = Sequential()
        # Batch normalization and dropout layers are added via add_train;
        # presumably these only act during training - TODO confirm against
        # Sequential.
        if self.batch_norm:
            model.add_train(tfla.BatchNormalization())
        for units in self.hidden_units:
            model.add(tfla.Dense(units=units, activation=self.activation,
                                 kernel_initializer=self.kernel_initializer))
            if self.batch_norm:
                model.add_train(tfla.BatchNormalization())
            if self.drop_rate != 0:
                model.add_train(self._dropout_layer())
        model.add(tfla.Dense(units=self.logits,
                             kernel_initializer=self.kernel_initializer))

        return model.compile(gradient_clip=self.gradient_clip,
                             processor=self.processor,
                             model_dir=self.model_dir,
                             config=tfe.RunConfig(log_step_count_steps=self.log_steps),
                             **self.compile_kwargs)
    
    
class RegressorFactory(_RegressorFactoryBase):
    """Factory wrapper around any regressor which doesn't use TensorFlow, e.g.
    the interpolators, or RegressorAverager (whether or not the
    RegressorAverager itself is using TensorFlow-based regressors).

    Calling the factory always hands back the very same :interpolator:
    object. Its processor is an IdentityProcessor, i.e. no preprocessing.
    """

    def __init__(self, interpolator, **kwargs):
        self.interpolator = interpolator
        self.processor = IdentityProcessor()
        self.use_tf = False
        super(RegressorFactory, self).__init__(**kwargs)

    def __call__(self):
        """Returns the wrapped interpolator (not a copy)."""
        wrapped = self.interpolator
        return wrapped

In [None]:
### Testing and evaluating regressors

def _eval_regressor(regressor_factory, X, y):
    """Evaluates the regressor made by :regressor_factory: on the test data
    :X:, :y:.

    Returns a tools.Object with attributes prediction (postprocessed by the
    factory's processor), X, y, diff (prediction - y), average_loss (the mean
    squared error) and loss (the summed squared error).
    """

    regressor = regressor_factory()
    processor = regressor_factory.processor

    if regressor_factory.use_tf:
        input_fn = BatchData.to_dataset((X, y))
    else:
        def input_fn():
            return X, y

    with processor.training(False):
        predictions = regressor.predict(input_fn=input_fn,
                                        yield_single_examples=False)
        raw_prediction = next(predictions)
        prediction = processor.inverse_transform(raw_prediction)

    diff = prediction - y
    squared_error = np.square(diff)
    return tools.Object(prediction=prediction, X=X, y=y, diff=diff,
                        average_loss=np.mean(squared_error),
                        loss=np.sum(squared_error))

def _eval_regressors(regressor_factories, X, y):
    """Evaluates an iterable of regressors on some test data
    :X:, :y:."""
    results = []
    for regressor_factory in regressor_factories:
        result = _eval_regressor(regressor_factory, X, y)
        results.append(result)
        
    return results


def eval_regressor(regressor_factory, gen_one_data, batch_size=1):
    """Generates a batch of :batch_size: test examples using :gen_one_data:,
    and evaluates the regressor made by :regressor_factory: on it.
    """
    features, labels = BatchData.batch(gen_one_data, batch_size)
    result = _eval_regressor(regressor_factory, features, labels)
    return result


def eval_regressors(regressor_factories, gen_one_data, batch_size=1):
    """Generates a batch of :batch_size: test examples using :gen_one_data:,
    and evaluates every regressor made by :regressor_factories: on that same
    batch, returning a list of the results.
    """
    features, labels = BatchData.batch(gen_one_data, batch_size)
    results = _eval_regressors(regressor_factories, features, labels)
    return results

In [11]:
### Visualising the results of regressors

# Currently only supports regressors whose predictions are on the fine grid.
def plot_regressors(regressor_factories, names, X, y):
    """Plots the results of some regressors on the given data :X:, :y:.

    Each regressor gets its own 3D subplot, stacked vertically, showing :X:
    (plotted with 'cg', presumably the coarse grid) and the regressor's
    prediction (plotted with 'fg') labelled by the corresponding entry of
    :names:. Returns the evaluation results, as from _eval_regressors.
    """
    num_plots = len(regressor_factories)
    fig = plt.figure(figsize=(8, 8 * num_plots))

    results = _eval_regressors(regressor_factories, X, y)

    for position, (result, name) in enumerate(zip(results, names), start=1):
        ax = make_3d_ax_for_grid_plotting(fig, (num_plots, 1, position))
        grid_plot(ax, X, 'cg', '_nolegend_')
        grid_plot(ax, result.prediction, 'fg', name)
        ax.legend()

    return results