# Copyright (c) 2018 Uber Technologies, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import numpy as np
import tensorflow as tf
from keras.engine import Layer, InputSpec
import keras.backend as K
from keras.utils import conv_utils
from rproj_layers_util import _convert_string_dtype
# from keras.backend.tensorflow_backend import _convert_string_dtype
from keras import regularizers, constraints, initializers, activations
from IPython import embed
from keras.legacy import interfaces
###########
#
# Low Rank Basis Layers
#
# These layers are modified versions of standard Keras layers that
# accept an OffsetCreator*Proj to create offsets from a weight basis
# in a Dense/Sparse/Fastfood agnostic manner.
#
###########
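
# To make the design concrete before the classes below: each native
# weight theta of size D is frozen at its random initialization theta_0,
# and training happens only in a d-dimensional subspace (d << D) reached
# through a fixed random projection P, i.e. theta = theta_0 + P . theta_d.
# A minimal numpy sketch of that reparameterization (illustration only;
# the layers below build the same thing out of TF variables via an
# OffsetCreator):
def _example_subspace_reparam(D=6, d=2, seed=0):
    rng = np.random.RandomState(seed)
    theta_0 = rng.randn(D)              # frozen native initialization
    P = rng.randn(D, d)                 # fixed random basis, non-trainable
    theta_d = np.zeros(d)               # the only trainable parameters
    theta = theta_0 + P.dot(theta_d)    # equals theta_0 at initialization
    return theta
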
class LowRankBasisLayer(Layer):
    '''Base Layer for the RProj layers below: each weight is a frozen
    theta_0 plus an offset that the offset_creator derives from the
    shared weight basis.'''
def __init__(self, offset_creator_class, weight_basis, *args, **kwargs):
super(LowRankBasisLayer, self).__init__(*args, **kwargs)
# offset_creator is an object that creates theta offsets
self.offset_creator = offset_creator_class()
self.weight_basis = weight_basis
# These may or may not be used by subclasses
#self._basis_matrices = []
#self._basis_matrix_normalizers = []
# TODO check for use of basis_matrices
@property
def basis_matrices(self):
        print('USED HERE basis_matrices')
return self._basis_matrices
# TODO check for use of basis_matrix_normalizers
@property
def basis_matrix_normalizers(self):
        print('USED HERE basis_matrix_normalizers')
return self._basis_matrix_normalizers
def add_weight(self,
name,
shape,
dtype=None,
initializer=None,
regularizer=None,
trainable=True,
constraint=None):
'''Version of add_weight that creates a weight theta by instantiating
theta_0 and then adding to it an offset from the member
offset_creator.
'''
initializer = initializers.get(initializer)
if dtype is None:
dtype = K.floatx()
# Create Theta_0
value_0 = initializer(shape)
theta_0 = tf.Variable(value_0, trainable=False, dtype=_convert_string_dtype(dtype), name='%s_theta0' % name)
if isinstance(value_0, np.ndarray):
theta_0._keras_shape = value_0.shape
elif hasattr(value_0, 'get_shape'):
theta_0._keras_shape = tuple(map(int, value_0.get_shape()))
theta_0._uses_learning_phase = False
# Call offset creator
theta_offset, non_trainable_weights = self.offset_creator.create_theta_offset(self.weight_basis,
theta_0.get_shape(),
dtype=dtype,
name=name)
theta = tf.add(theta_0, theta_offset, name=name)
if regularizer is not None:
self.add_loss(regularizer(theta))
if constraint is not None:
self.constraints[theta] = constraint
#self._base_thetas.append(theta_0)
#self._basis_matrices.append(ww)
#self._non_trainable_weights.extend([theta_0, ww])
self._non_trainable_weights.extend([theta_0] + non_trainable_weights)
return theta
def add_non_trainable_weight(self,
name,
shape,
dtype=None,
initializer=None,
regularizer=None,
constraint=None):
        '''Adds a non-trainable weight variable to the layer.
        # Arguments
            name: String, the name for the weight variable.
            shape: The shape tuple of the weight.
            dtype: The dtype of the weight.
            initializer: An Initializer instance (callable).
            regularizer: An optional Regularizer instance.
            constraint: An optional Constraint instance.
        # Returns
            The created weight variable, which is always appended to the
            layer's non-trainable weights.
        '''
initializer = initializers.get(initializer)
if dtype is None:
dtype = K.floatx()
weight = K.variable(initializer(shape), dtype=dtype, name=name)
if regularizer is not None:
self.add_loss(regularizer(weight))
if constraint is not None:
self.constraints[weight] = constraint
self._non_trainable_weights.append(weight)
return weight
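
# How add_weight above composes a parameter, in brief: theta_0 is a frozen
# TensorFlow variable holding the native random initialization; the offset
# creator returns a theta_offset tensor driven by the shared trainable
# basis; the layer's effective weight is tf.add(theta_0, theta_offset).
# All trainable degrees of freedom therefore live in the basis, not in
# the per-layer variables collected into _non_trainable_weights.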
class RProjDense(LowRankBasisLayer):
'''RProj version of Dense.'''
def __init__(self, offset_creator_class, weight_basis,
units,
activation=None,
use_bias=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
kernel_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
bias_constraint=None,
**kwargs):
if 'input_shape' not in kwargs and 'input_dim' in kwargs:
kwargs['input_shape'] = (kwargs.pop('input_dim'),)
super(RProjDense, self).__init__(offset_creator_class, weight_basis, **kwargs)
self.units = units
self.activation = activations.get(activation)
self.use_bias = use_bias
self.kernel_initializer = initializers.get(kernel_initializer)
self.bias_initializer = initializers.get(bias_initializer)
self.kernel_regularizer = regularizers.get(kernel_regularizer)
self.bias_regularizer = regularizers.get(bias_regularizer)
self.activity_regularizer = regularizers.get(activity_regularizer)
self.kernel_constraint = constraints.get(kernel_constraint)
self.bias_constraint = constraints.get(bias_constraint)
self.input_spec = InputSpec(min_ndim=2)
self.supports_masking = True
def build(self, input_shape):
assert len(input_shape) >= 2
input_dim = input_shape[-1]
self.kernel = self.add_weight(shape=(input_dim, self.units),
initializer=self.kernel_initializer,
name='kernel',
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
if self.use_bias:
self.bias = self.add_weight(shape=(self.units,),
initializer=self.bias_initializer,
name='bias',
regularizer=self.bias_regularizer,
constraint=self.bias_constraint)
else:
self.bias = None
self.input_spec = InputSpec(min_ndim=2, axes={-1: input_dim})
self.built = True
def call(self, inputs):
output = K.dot(inputs, self.kernel)
if self.use_bias:
output = K.bias_add(output, self.bias)
if self.activation is not None:
output = self.activation(output)
return output
def compute_output_shape(self, input_shape):
assert input_shape and len(input_shape) >= 2
assert input_shape[-1]
output_shape = list(input_shape)
output_shape[-1] = self.units
return tuple(output_shape)
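
# A hedged usage sketch for RProjDense (illustrative, not part of the
# library proper). The offset-creator class and the shared weight basis
# are built elsewhere (in this repo, rproj_layers_util provides the
# creators); they are taken as arguments here rather than assuming
# particular names.
def _example_rproj_dense_model(offset_creator_class, weight_basis):
    from keras.layers import Input
    from keras.models import Model
    xx = Input(shape=(784,))
    # Both layers share the same weight_basis, so they share the same
    # small set of trainable parameters.
    yy = RProjDense(offset_creator_class, weight_basis, 200, activation='relu')(xx)
    yy = RProjDense(offset_creator_class, weight_basis, 10, activation='softmax')(yy)
    return Model(inputs=xx, outputs=yy)
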
class _RProjConv(LowRankBasisLayer):
    '''Abstract nD convolution layer (private, used as implementation base).
    Only the intrinsic parameters (the shared RProj weight basis) are
    trainable.'''
def __init__(self, offset_creator_class, weight_basis,
rank,
filters,
kernel_size,
strides=1,
padding='valid',
data_format=None,
dilation_rate=1,
activation=None,
use_bias=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
kernel_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
bias_constraint=None,
**kwargs):
super(_RProjConv, self).__init__(offset_creator_class, weight_basis, **kwargs)
self.rank = rank
self.filters = filters
self.kernel_size = conv_utils.normalize_tuple(kernel_size, rank, 'kernel_size')
self.strides = conv_utils.normalize_tuple(strides, rank, 'strides')
self.padding = conv_utils.normalize_padding(padding)
self.data_format = conv_utils.normalize_data_format(data_format)
self.dilation_rate = conv_utils.normalize_tuple(dilation_rate, rank, 'dilation_rate')
self.activation = activations.get(activation)
self.use_bias = use_bias
self.kernel_initializer = initializers.get(kernel_initializer)
self.bias_initializer = initializers.get(bias_initializer)
self.kernel_regularizer = regularizers.get(kernel_regularizer)
self.bias_regularizer = regularizers.get(bias_regularizer)
self.activity_regularizer = regularizers.get(activity_regularizer)
self.kernel_constraint = constraints.get(kernel_constraint)
self.bias_constraint = constraints.get(bias_constraint)
self.input_spec = InputSpec(ndim=self.rank + 2)
def build(self, input_shape):
if self.data_format == 'channels_first':
channel_axis = 1
else:
channel_axis = -1
if input_shape[channel_axis] is None:
raise ValueError('The channel dimension of the inputs '
'should be defined. Found `None`.')
input_dim = input_shape[channel_axis]
kernel_shape = self.kernel_size + (input_dim, self.filters)
self.kernel = self.add_weight(shape=kernel_shape,
initializer=self.kernel_initializer,
name='kernel',
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
if self.use_bias:
self.bias = self.add_weight(shape=(self.filters,),
initializer=self.bias_initializer,
name='bias',
regularizer=self.bias_regularizer,
constraint=self.bias_constraint)
else:
self.bias = None
# Set input spec.
self.input_spec = InputSpec(ndim=self.rank + 2,
axes={channel_axis: input_dim})
self.built = True
def call(self, inputs):
if self.rank == 1:
outputs = K.conv1d(
inputs,
self.kernel,
strides=self.strides[0],
padding=self.padding,
data_format=self.data_format,
dilation_rate=self.dilation_rate[0])
if self.rank == 2:
outputs = K.conv2d(
inputs,
self.kernel,
strides=self.strides,
padding=self.padding,
data_format=self.data_format,
dilation_rate=self.dilation_rate)
if self.rank == 3:
outputs = K.conv3d(
inputs,
self.kernel,
strides=self.strides,
padding=self.padding,
data_format=self.data_format,
dilation_rate=self.dilation_rate)
if self.use_bias:
outputs = K.bias_add(
outputs,
self.bias,
data_format=self.data_format)
if self.activation is not None:
return self.activation(outputs)
return outputs
def compute_output_shape(self, input_shape):
if self.data_format == 'channels_last':
space = input_shape[1:-1]
new_space = []
for i in range(len(space)):
new_dim = conv_utils.conv_output_length(
space[i],
self.kernel_size[i],
padding=self.padding,
stride=self.strides[i],
dilation=self.dilation_rate[i])
new_space.append(new_dim)
return (input_shape[0],) + tuple(new_space) + (self.filters,)
if self.data_format == 'channels_first':
space = input_shape[2:]
new_space = []
for i in range(len(space)):
new_dim = conv_utils.conv_output_length(
space[i],
self.kernel_size[i],
padding=self.padding,
stride=self.strides[i],
dilation=self.dilation_rate[i])
new_space.append(new_dim)
return (input_shape[0], self.filters) + tuple(new_space)
class RProjConv2D(_RProjConv):
    '''Low Rank Basis Conv2D.
    `filters` is the number of convolution filters; the output channel
    dimension equals `filters`.
    TODO: documentation / unit tests
    '''
def __init__(self, offset_creator_class, weight_basis,
filters,
kernel_size,
strides=(1, 1),
padding='valid',
data_format=None,
dilation_rate=(1, 1),
activation=None,
use_bias=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
kernel_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
bias_constraint=None,
**kwargs):
super(RProjConv2D, self).__init__(
offset_creator_class=offset_creator_class,
weight_basis=weight_basis,
rank=2,
filters=filters,
kernel_size=kernel_size,
strides=strides,
padding=padding,
data_format=data_format,
dilation_rate=dilation_rate,
activation=activation,
use_bias=use_bias,
kernel_initializer=kernel_initializer,
bias_initializer=bias_initializer,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer,
activity_regularizer=activity_regularizer,
kernel_constraint=kernel_constraint,
bias_constraint=bias_constraint,
**kwargs)
self.input_spec = InputSpec(ndim=4)
def build(self, input_shape):
        assert self.data_format != 'channels_first', 'only b01c (channels_last) supported'
channel_axis = -1
if input_shape[channel_axis] is None:
raise ValueError('The channel dimension of the inputs '
'should be defined. Found `None`.')
input_dim = input_shape[-1]
self.units = self.filters
kernel_shape = self.kernel_size + (input_dim, self.filters)
self.kernel = self.add_weight(shape=kernel_shape,
initializer=self.kernel_initializer,
name='kernel',
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
if self.use_bias:
self.bias = self.add_weight(shape=(self.filters,),
initializer=self.bias_initializer,
name='bias',
regularizer=self.bias_regularizer,
constraint=self.bias_constraint)
else:
self.bias = None
# Set input spec.
self.input_spec = InputSpec(ndim=self.rank + 2,
axes={channel_axis: input_dim})
self.built = True
def call(self, inputs):
assert self.rank == 2, 'only conv2d supported for now...'
if self.rank == 2:
outputs = K.conv2d(
inputs,
self.kernel,
strides=self.strides,
padding=self.padding,
data_format=self.data_format,
dilation_rate=self.dilation_rate)
if self.use_bias:
outputs = K.bias_add(
outputs,
self.bias,
data_format=self.data_format)
        # NOTE: unlike _RProjConv.call, this override never applies
        # self.activation; any activation passed to the constructor is
        # silently ignored.
        #if self.activation is not None:
        #    assert False, 'activation functions not supported'
        #    return self.activation(outputs)
        return outputs
def compute_output_shape(self, input_shape):
if self.data_format == 'channels_last':
space = input_shape[1:-1]
new_space = []
for i in range(len(space)):
new_dim = conv_utils.conv_output_length(
space[i],
self.kernel_size[i],
padding=self.padding,
stride=self.strides[i],
dilation=self.dilation_rate[i])
new_space.append(new_dim)
            # (A stale note here suggested self.filters*2 to accommodate an
            # LU representation; the layer actually outputs self.filters
            # channels.)
return (input_shape[0],) + tuple(new_space) + (self.filters,)
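
# A hedged usage sketch for RProjConv2D, with the same caveats as the
# Dense sketch above. Note two behaviors visible in the class: only
# channels_last input is supported (build() asserts it), and call() does
# not apply an activation, so one is added as a separate layer here.
def _example_rproj_conv_model(offset_creator_class, weight_basis):
    from keras.layers import Input, Activation, Flatten
    from keras.models import Model
    xx = Input(shape=(28, 28, 1))
    yy = RProjConv2D(offset_creator_class, weight_basis, 8, (3, 3), padding='same')(xx)
    yy = Activation('relu')(yy)
    yy = Flatten()(yy)
    yy = RProjDense(offset_creator_class, weight_basis, 10, activation='softmax')(yy)
    return Model(inputs=xx, outputs=yy)
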
class RProjBatchNormalization(LowRankBasisLayer):
'''RProj version of BatchNormalization.'''
def __init__(self, offset_creator_class, weight_basis,
axis=-1,
momentum=0.99,
epsilon=1e-3,
center=True,
scale=True,
beta_initializer='zeros',
gamma_initializer='ones',
moving_mean_initializer='zeros',
moving_variance_initializer='ones',
beta_regularizer=None,
gamma_regularizer=None,
beta_constraint=None,
gamma_constraint=None,
**kwargs):
super(RProjBatchNormalization, self).__init__(offset_creator_class, weight_basis, **kwargs)
self.supports_masking = True
self.axis = axis
self.momentum = momentum
self.epsilon = epsilon
self.center = center
self.scale = scale
self.beta_initializer = initializers.get(beta_initializer)
self.gamma_initializer = initializers.get(gamma_initializer)
self.moving_mean_initializer = initializers.get(moving_mean_initializer)
self.moving_variance_initializer = initializers.get(moving_variance_initializer)
self.beta_regularizer = regularizers.get(beta_regularizer)
self.gamma_regularizer = regularizers.get(gamma_regularizer)
self.beta_constraint = constraints.get(beta_constraint)
self.gamma_constraint = constraints.get(gamma_constraint)
def build(self, input_shape):
dim = input_shape[self.axis]
if dim is None:
raise ValueError('Axis ' + str(self.axis) + ' of '
'input tensor should have a defined dimension '
'but the layer received an input with shape ' +
str(input_shape) + '.')
self.input_spec = InputSpec(ndim=len(input_shape),
axes={self.axis: dim})
shape = (dim,)
if self.scale:
self.gamma = self.add_weight(shape=shape,
name='gamma',
initializer=self.gamma_initializer,
regularizer=self.gamma_regularizer,
constraint=self.gamma_constraint)
else:
self.gamma = None
if self.center:
self.beta = self.add_weight(shape=shape,
name='beta',
initializer=self.beta_initializer,
regularizer=self.beta_regularizer,
constraint=self.beta_constraint)
else:
self.beta = None
self.moving_mean = self.add_non_trainable_weight(
shape=shape,
name='moving_mean',
initializer=self.moving_mean_initializer)
self.moving_variance = self.add_non_trainable_weight(
shape=shape,
name='moving_variance',
initializer=self.moving_variance_initializer)
self.built = True
def call(self, inputs, training=None):
input_shape = K.int_shape(inputs)
# Prepare broadcasting shape.
ndim = len(input_shape)
reduction_axes = list(range(len(input_shape)))
del reduction_axes[self.axis]
broadcast_shape = [1] * len(input_shape)
broadcast_shape[self.axis] = input_shape[self.axis]
# Determines whether broadcasting is needed.
needs_broadcasting = (sorted(reduction_axes) != list(range(ndim))[:-1])
def normalize_inference():
if needs_broadcasting:
# In this case we must explicitly broadcast all parameters.
broadcast_moving_mean = K.reshape(self.moving_mean,
broadcast_shape)
broadcast_moving_variance = K.reshape(self.moving_variance,
broadcast_shape)
if self.center:
broadcast_beta = K.reshape(self.beta, broadcast_shape)
else:
broadcast_beta = None
if self.scale:
broadcast_gamma = K.reshape(self.gamma,
broadcast_shape)
else:
broadcast_gamma = None
return K.batch_normalization(
inputs,
broadcast_moving_mean,
broadcast_moving_variance,
broadcast_beta,
broadcast_gamma,
epsilon=self.epsilon)
else:
return K.batch_normalization(
inputs,
self.moving_mean,
self.moving_variance,
self.beta,
self.gamma,
epsilon=self.epsilon)
# If the learning phase is *static* and set to inference:
if training in {0, False}:
return normalize_inference()
        # If the learning phase is either dynamic, or set to training:
normed_training, mean, variance = K.normalize_batch_in_training(
inputs, self.gamma, self.beta, reduction_axes,
epsilon=self.epsilon)
self.add_update([K.moving_average_update(self.moving_mean,
mean,
self.momentum),
K.moving_average_update(self.moving_variance,
variance,
self.momentum)],
inputs)
# Pick the normalized form corresponding to the training phase.
return K.in_train_phase(normed_training,
normalize_inference,
training=training)
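
# Note on the batch-norm variant above: gamma and beta are created with
# add_weight, so they are expressed as theta_0 plus a basis offset like
# every other RProj parameter, while moving_mean and moving_variance use
# add_non_trainable_weight and are updated by moving averages exactly as
# in stock Keras BatchNormalization.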
class RProjLocallyConnected2D(LowRankBasisLayer):
"""Locally-connected layer for 2D inputs.
The `LocallyConnected2D` layer works similarly
to the `Conv2D` layer, except that weights are unshared,
that is, a different set of filters is applied at each
different patch of the input.
"""
# @interfaces.legacy_conv2d_support
def __init__(self, offset_creator_class, weight_basis,
filters,
kernel_size,
strides=(1, 1),
padding='valid',
data_format=None,
activation=None,
use_bias=True,
kernel_initializer='glorot_uniform',
bias_initializer='zeros',
kernel_regularizer=None,
bias_regularizer=None,
activity_regularizer=None,
kernel_constraint=None,
bias_constraint=None,
**kwargs):
super(RProjLocallyConnected2D, self).__init__(offset_creator_class, weight_basis, **kwargs)
self.filters = filters
self.kernel_size = conv_utils.normalize_tuple(kernel_size, 2, 'kernel_size')
self.strides = conv_utils.normalize_tuple(strides, 2, 'strides')
self.padding = conv_utils.normalize_padding(padding)
if self.padding != 'valid':
raise ValueError('Invalid border mode for LocallyConnected2D '
'(only "valid" is supported): ' + padding)
self.data_format = conv_utils.normalize_data_format(data_format)
self.activation = activations.get(activation)
self.use_bias = use_bias
self.kernel_initializer = initializers.get(kernel_initializer)
self.bias_initializer = initializers.get(bias_initializer)
self.kernel_regularizer = regularizers.get(kernel_regularizer)
self.bias_regularizer = regularizers.get(bias_regularizer)
self.activity_regularizer = regularizers.get(activity_regularizer)
self.kernel_constraint = constraints.get(kernel_constraint)
self.bias_constraint = constraints.get(bias_constraint)
self.input_spec = InputSpec(ndim=4)
def build(self, input_shape):
if self.data_format == 'channels_last':
input_row, input_col = input_shape[1:-1]
input_filter = input_shape[3]
else:
input_row, input_col = input_shape[2:]
input_filter = input_shape[1]
if input_row is None or input_col is None:
            raise ValueError('The spatial dimensions of the inputs to '
                             'a LocallyConnected2D layer '
                             'should be fully-defined, but layer received '
                             'the inputs shape ' + str(input_shape))
output_row = conv_utils.conv_output_length(input_row, self.kernel_size[0],
self.padding, self.strides[0])
output_col = conv_utils.conv_output_length(input_col, self.kernel_size[1],
self.padding, self.strides[1])
self.output_row = output_row
self.output_col = output_col
self.kernel_shape = (output_row * output_col,
self.kernel_size[0] * self.kernel_size[1] * input_filter,
self.filters)
self.kernel = self.add_weight(shape=self.kernel_shape,
initializer=self.kernel_initializer,
name='kernel',
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
if self.use_bias:
self.bias = self.add_weight(shape=(output_row, output_col, self.filters),
initializer=self.bias_initializer,
name='bias',
regularizer=self.bias_regularizer,
constraint=self.bias_constraint)
else:
self.bias = None
if self.data_format == 'channels_first':
self.input_spec = InputSpec(ndim=4, axes={1: input_filter})
else:
self.input_spec = InputSpec(ndim=4, axes={-1: input_filter})
self.built = True
def compute_output_shape(self, input_shape):
if self.data_format == 'channels_first':
rows = input_shape[2]
cols = input_shape[3]
elif self.data_format == 'channels_last':
rows = input_shape[1]
cols = input_shape[2]
rows = conv_utils.conv_output_length(rows, self.kernel_size[0],
self.padding, self.strides[0])
cols = conv_utils.conv_output_length(cols, self.kernel_size[1],
self.padding, self.strides[1])
if self.data_format == 'channels_first':
return (input_shape[0], self.filters, rows, cols)
elif self.data_format == 'channels_last':
return (input_shape[0], rows, cols, self.filters)
def call(self, inputs):
_, _, filters = self.kernel_shape
output = K.local_conv2d(inputs,
self.kernel,
self.kernel_size,
self.strides,
(self.output_row, self.output_col),
self.data_format)
if self.use_bias:
if self.data_format == 'channels_first' or self.data_format == 'channels_last':
output = K.bias_add(output, self.bias, data_format=self.data_format)
output = self.activation(output)
return output
def get_config(self):
config = {
'filters': self.filters,
'kernel_size': self.kernel_size,
'strides': self.strides,
'padding': self.padding,
'data_format': self.data_format,
'activation': activations.serialize(self.activation),
'use_bias': self.use_bias,
'kernel_initializer': initializers.serialize(self.kernel_initializer),
'bias_initializer': initializers.serialize(self.bias_initializer),
'kernel_regularizer': regularizers.serialize(self.kernel_regularizer),
'bias_regularizer': regularizers.serialize(self.bias_regularizer),
'activity_regularizer': regularizers.serialize(self.activity_regularizer),
'kernel_constraint': constraints.serialize(self.kernel_constraint),
'bias_constraint': constraints.serialize(self.bias_constraint)
}
        base_config = super(RProjLocallyConnected2D, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
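
# A hedged convenience helper (illustrative, not part of the library):
# because every RProj layer shares one weight basis, the trainable
# parameter count of a model built from these layers should equal the
# intrinsic dimension d of that basis, independent of how wide or deep
# the native architecture is.
def _count_trainable_params(model):
    return int(np.sum([K.count_params(w) for w in model.trainable_weights]))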