Skip to content
Permalink
r1.8
Switch branches/tags
Go to file
 
 
Cannot retrieve contributors at this time
# Copyright 2015 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Module implementing RNN Cells that used to be in core.
@@EmbeddingWrapper
@@InputProjectionWrapper
@@OutputProjectionWrapper
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import ops
from tensorflow.python.ops import array_ops
from tensorflow.python.ops import embedding_ops
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.ops import nn_ops
from tensorflow.python.ops import rnn_cell_impl
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.platform import tf_logging as logging
from tensorflow.python.util import nest
# pylint: disable=protected-access,invalid-name
RNNCell = rnn_cell_impl.RNNCell
_WEIGHTS_VARIABLE_NAME = rnn_cell_impl._WEIGHTS_VARIABLE_NAME
_BIAS_VARIABLE_NAME = rnn_cell_impl._BIAS_VARIABLE_NAME
# pylint: enable=protected-access,invalid-name
class _Linear(object):
"""Linear map: sum_i(args[i] * W[i]), where W[i] is a variable.
Args:
args: a 2D Tensor or a list of 2D, batch, n, Tensors.
output_size: int, second dimension of weight variable.
dtype: data type for variables.
build_bias: boolean, whether to build a bias variable.
bias_initializer: starting value to initialize the bias
(default is all zeros).
kernel_initializer: starting value to initialize the weight.
Raises:
ValueError: if inputs_shape is wrong.
"""
def __init__(self,
args,
output_size,
build_bias,
bias_initializer=None,
kernel_initializer=None):
self._build_bias = build_bias
if args is None or (nest.is_sequence(args) and not args):
raise ValueError("`args` must be specified")
if not nest.is_sequence(args):
args = [args]
self._is_sequence = False
else:
self._is_sequence = True
# Calculate the total size of arguments on dimension 1.
total_arg_size = 0
shapes = [a.get_shape() for a in args]
for shape in shapes:
if shape.ndims != 2:
raise ValueError("linear is expecting 2D arguments: %s" % shapes)
if shape[1].value is None:
raise ValueError("linear expects shape[1] to be provided for shape %s, "
"but saw %s" % (shape, shape[1]))
else:
total_arg_size += shape[1].value
dtype = [a.dtype for a in args][0]
scope = vs.get_variable_scope()
with vs.variable_scope(scope) as outer_scope:
self._weights = vs.get_variable(
_WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
dtype=dtype,
initializer=kernel_initializer)
if build_bias:
with vs.variable_scope(outer_scope) as inner_scope:
inner_scope.set_partitioner(None)
if bias_initializer is None:
bias_initializer = init_ops.constant_initializer(0.0, dtype=dtype)
self._biases = vs.get_variable(
_BIAS_VARIABLE_NAME, [output_size],
dtype=dtype,
initializer=bias_initializer)
def __call__(self, args):
if not self._is_sequence:
args = [args]
if len(args) == 1:
res = math_ops.matmul(args[0], self._weights)
else:
# Explicitly creating a one for a minor performance improvement.
one = constant_op.constant(1, dtype=dtypes.int32)
res = math_ops.matmul(array_ops.concat(args, one), self._weights)
if self._build_bias:
res = nn_ops.bias_add(res, self._biases)
return res
# TODO(xpan): Remove this function in a follow up.
def _linear(args,
output_size,
bias,
bias_initializer=None,
kernel_initializer=None):
"""Linear map: sum_i(args[i] * W[i]), where W[i] is a variable.
Args:
args: a 2D Tensor or a list of 2D, batch, n, Tensors.
output_size: int, second dimension of W[i].
bias: boolean, whether to add a bias term or not.
bias_initializer: starting value to initialize the bias
(default is all zeros).
kernel_initializer: starting value to initialize the weight.
Returns:
A 2D Tensor with shape `[batch, output_size]` equal to
sum_i(args[i] * W[i]), where W[i]s are newly created matrices.
Raises:
ValueError: if some of the arguments has unspecified or wrong shape.
"""
if args is None or (nest.is_sequence(args) and not args):
raise ValueError("`args` must be specified")
if not nest.is_sequence(args):
args = [args]
# Calculate the total size of arguments on dimension 1.
total_arg_size = 0
shapes = [a.get_shape() for a in args]
for shape in shapes:
if shape.ndims != 2:
raise ValueError("linear is expecting 2D arguments: %s" % shapes)
if shape[1].value is None:
raise ValueError("linear expects shape[1] to be provided for shape %s, "
"but saw %s" % (shape, shape[1]))
else:
total_arg_size += shape[1].value
dtype = [a.dtype for a in args][0]
# Now the computation.
scope = vs.get_variable_scope()
with vs.variable_scope(scope) as outer_scope:
weights = vs.get_variable(
_WEIGHTS_VARIABLE_NAME, [total_arg_size, output_size],
dtype=dtype,
initializer=kernel_initializer)
if len(args) == 1:
res = math_ops.matmul(args[0], weights)
else:
res = math_ops.matmul(array_ops.concat(args, 1), weights)
if not bias:
return res
with vs.variable_scope(outer_scope) as inner_scope:
inner_scope.set_partitioner(None)
if bias_initializer is None:
bias_initializer = init_ops.constant_initializer(0.0, dtype=dtype)
biases = vs.get_variable(
_BIAS_VARIABLE_NAME, [output_size],
dtype=dtype,
initializer=bias_initializer)
return nn_ops.bias_add(res, biases)
class EmbeddingWrapper(RNNCell):
"""Operator adding input embedding to the given cell.
Note: in many cases it may be more efficient to not use this wrapper,
but instead concatenate the whole sequence of your inputs in time,
do the embedding on this batch-concatenated sequence, then split it and
feed into your RNN.
"""
def __init__(self,
cell,
embedding_classes,
embedding_size,
initializer=None,
reuse=None):
"""Create a cell with an added input embedding.
Args:
cell: an RNNCell, an embedding will be put before its inputs.
embedding_classes: integer, how many symbols will be embedded.
embedding_size: integer, the size of the vectors we embed into.
initializer: an initializer to use when creating the embedding;
if None, the initializer from variable scope or a default one is used.
reuse: (optional) Python boolean describing whether to reuse variables
in an existing scope. If not `True`, and the existing scope already has
the given variables, an error is raised.
Raises:
TypeError: if cell is not an RNNCell.
ValueError: if embedding_classes is not positive.
"""
super(EmbeddingWrapper, self).__init__(_reuse=reuse)
rnn_cell_impl.assert_like_rnncell("cell", cell)
if embedding_classes <= 0 or embedding_size <= 0:
raise ValueError("Both embedding_classes and embedding_size must be > 0: "
"%d, %d." % (embedding_classes, embedding_size))
self._cell = cell
self._embedding_classes = embedding_classes
self._embedding_size = embedding_size
self._initializer = initializer
@property
def state_size(self):
return self._cell.state_size
@property
def output_size(self):
return self._cell.output_size
def zero_state(self, batch_size, dtype):
with ops.name_scope(type(self).__name__ + "ZeroState", values=[batch_size]):
return self._cell.zero_state(batch_size, dtype)
def call(self, inputs, state):
"""Run the cell on embedded inputs."""
with ops.device("/cpu:0"):
if self._initializer:
initializer = self._initializer
elif vs.get_variable_scope().initializer:
initializer = vs.get_variable_scope().initializer
else:
# Default initializer for embeddings should have variance=1.
sqrt3 = math.sqrt(3) # Uniform(-sqrt(3), sqrt(3)) has variance=1.
initializer = init_ops.random_uniform_initializer(-sqrt3, sqrt3)
if isinstance(state, tuple):
data_type = state[0].dtype
else:
data_type = state.dtype
embedding = vs.get_variable(
"embedding", [self._embedding_classes, self._embedding_size],
initializer=initializer,
dtype=data_type)
embedded = embedding_ops.embedding_lookup(embedding,
array_ops.reshape(inputs, [-1]))
return self._cell(embedded, state)
class InputProjectionWrapper(RNNCell):
"""Operator adding an input projection to the given cell.
Note: in many cases it may be more efficient to not use this wrapper,
but instead concatenate the whole sequence of your inputs in time,
do the projection on this batch-concatenated sequence, then split it.
"""
def __init__(self,
cell,
num_proj,
activation=None,
input_size=None,
reuse=None):
"""Create a cell with input projection.
Args:
cell: an RNNCell, a projection of inputs is added before it.
num_proj: Python integer. The dimension to project to.
activation: (optional) an optional activation function.
input_size: Deprecated and unused.
reuse: (optional) Python boolean describing whether to reuse variables
in an existing scope. If not `True`, and the existing scope already has
the given variables, an error is raised.
Raises:
TypeError: if cell is not an RNNCell.
"""
super(InputProjectionWrapper, self).__init__(_reuse=reuse)
if input_size is not None:
logging.warn("%s: The input_size parameter is deprecated.", self)
rnn_cell_impl.assert_like_rnncell("cell", cell)
self._cell = cell
self._num_proj = num_proj
self._activation = activation
self._linear = None
@property
def state_size(self):
return self._cell.state_size
@property
def output_size(self):
return self._cell.output_size
def zero_state(self, batch_size, dtype):
with ops.name_scope(type(self).__name__ + "ZeroState", values=[batch_size]):
return self._cell.zero_state(batch_size, dtype)
def call(self, inputs, state):
"""Run the input projection and then the cell."""
# Default scope: "InputProjectionWrapper"
if self._linear is None:
self._linear = _Linear(inputs, self._num_proj, True)
projected = self._linear(inputs)
if self._activation:
projected = self._activation(projected)
return self._cell(projected, state)
class OutputProjectionWrapper(RNNCell):
"""Operator adding an output projection to the given cell.
Note: in many cases it may be more efficient to not use this wrapper,
but instead concatenate the whole sequence of your outputs in time,
do the projection on this batch-concatenated sequence, then split it
if needed or directly feed into a softmax.
"""
def __init__(self, cell, output_size, activation=None, reuse=None):
"""Create a cell with output projection.
Args:
cell: an RNNCell, a projection to output_size is added to it.
output_size: integer, the size of the output after projection.
activation: (optional) an optional activation function.
reuse: (optional) Python boolean describing whether to reuse variables
in an existing scope. If not `True`, and the existing scope already has
the given variables, an error is raised.
Raises:
TypeError: if cell is not an RNNCell.
ValueError: if output_size is not positive.
"""
super(OutputProjectionWrapper, self).__init__(_reuse=reuse)
rnn_cell_impl.assert_like_rnncell("cell", cell)
if output_size < 1:
raise ValueError("Parameter output_size must be > 0: %d." % output_size)
self._cell = cell
self._output_size = output_size
self._activation = activation
self._linear = None
@property
def state_size(self):
return self._cell.state_size
@property
def output_size(self):
return self._output_size
def zero_state(self, batch_size, dtype):
with ops.name_scope(type(self).__name__ + "ZeroState", values=[batch_size]):
return self._cell.zero_state(batch_size, dtype)
def call(self, inputs, state):
"""Run the cell and output projection on inputs, starting from state."""
output, res_state = self._cell(inputs, state)
if self._linear is None:
self._linear = _Linear(output, self._output_size, True)
projected = self._linear(output)
if self._activation:
projected = self._activation(projected)
return projected, res_state