In [33]:
import numpy as np

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Layer as KerasLayer
from tensorflow.keras.layers import Input, GRU, LSTM, Bidirectional

In [49]:
class Prototype(KerasLayer):
    def __init__(self, k, **kwargs):
        """
        Parameters
        ----------
        k : int
            Number of prototype vectors to create.
        """
        super(Prototype, self).__init__(**kwargs)
        self.k = k

    def build(self, input_shape):
        print(f'Called `build` with input_shape: {input_shape}')

        # what initializer should we use?
        self.prototypes = self.add_weight(
            name='prototypes',
            shape=(1, self.k, input_shape[-1]),
            initializer='random_normal',
            trainable=True
        )

    def call(self, x):
        # add reg losses here?
        print(f'Called with `x`: {x}')

        # L2 distances from prototypes
        d2 = tf.norm(x - self.prototypes, ord=2, axis=-1)

        # return exponentially squashed
        return tf.exp(-d2)

In [50]:
def rnn(input_shape=(None, 6),
        layer_type='lstm',
        layer_args={},
        layers=[32,64],
        dropout_rate=None):
    """
    Recurrent NN Encoder constructor function.
    One layer of `layer_type` will be created for each int in `layers`.
    All except the final recurrent layer will return sequences.
    """
    num_layers = len(layers)
    assert num_layers > 0, 'Must have at least one layer'

    layer_fn = GRU if 'gru' in layer_type.lower() else LSTM

    # Construct model
    model = Sequential([InputLayer(input_shape=input_shape)])

    for i, layer_units in enumerate(layers):
        return_seq = False if (i == (num_layers - 1)) else True

        next_layer = layer_fn(layer_units, return_sequences=return_seq, name=layer_type+str(i), **layer_args)

        model.add(Bidirectional(next_layer))

    return model

In [51]:
enc_model = rnn()
enc_model.input_shape, enc_model.output_shape

((None, None, 6), (None, 128))

In [52]:
enc_model.add(Prototype(16))
enc_model.input_shape, enc_model.output_shape

Called `build` with input_shape: (None, 128)
Called with `x`: Tensor("bidirectional_9/Identity:0", shape=(None, 128), dtype=float32)


TypeError: in converted code:

    <ipython-input-49-eef44e74e301>:27 call  *
        tf.shape()

    TypeError: shape_v2() missing 1 required positional argument: 'input'


In [48]:
enc_model.summary()

Model: "sequential_3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
bidirectional_6 (Bidirection (None, None, 64)          9984      
_________________________________________________________________
bidirectional_7 (Bidirection (None, 128)               66048     
_________________________________________________________________
prototype_4 (Prototype)      (1, 16)                   2048      
Total params: 78,080
Trainable params: 78,080
Non-trainable params: 0
_________________________________________________________________


In [41]:
x = tf.convert_to_tensor(np.random.rand(10,8,6))

In [42]:
enc_model(x)



To change all layers to have dtype float64 by default, call `tf.keras.backend.set_floatx('float64')`. To change just this layer, pass dtype='float64' to the layer constructor. If you are the author of this layer, you can disable autocasting by passing autocast=False to the base Layer constructor.



InvalidArgumentError: Incompatible shapes: [10,128] vs. [1,16,128] [Op:Sub] name: sequential_2/prototype_3/sub/