# **Study Project:** *Transformer model for prediction of grasping movements*

### Imports

In [1]:
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.animation import FuncAnimation, FFMpegWriter
import os

2024-04-08 11:11:23.522115: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
''' 
For video generation download FFMPEG: https://ffmpeg.org/download.html#build-windows
'''

# Include FFmpeg in the path
plt.rcParams['animation.ffmpeg_path'] = r'ffpmeg\bin\ffmpeg.exe'  

### Load Dataset

In [3]:
def deserialize(serialized_example):
    """
    Function to deserialize tensors from bytes.
    """

    feature_description = {
        'context': tf.io.FixedLenFeature([], tf.string),
        'input': tf.io.FixedLenFeature([], tf.string),
        'target': tf.io.FixedLenFeature([], tf.string)
    }

    example = tf.io.parse_single_example(serialized_example, feature_description)
    context = tf.io.parse_tensor(example['context'], out_type=tf.float64)
    x = tf.io.parse_tensor(example['input'], out_type=tf.float64)
    target = tf.io.parse_tensor(example['target'], out_type=tf.float64)

    return context, x, target

# Load tensorflow dataset
train_ds_path = "./data/train_ds.zip"
test_ds_path = "./data/test_ds.zip"

# Create a TFRecordDataset from the saved file
train_dataset = tf.data.TFRecordDataset(train_ds_path, compression_type='GZIP')
test_dataset = tf.data.TFRecordDataset(test_ds_path, compression_type='GZIP')

# Deserialize the zipped dataset
train_dataset = train_dataset.map(deserialize)
test_dataset = test_dataset.map(deserialize)

### Load Model

In [4]:
# Load the model from zip file
model_path = "./models/transformer"

PAD = -2

def compute_mask(inputs, padding_token=0):
    return tf.cast(tf.not_equal(inputs, padding_token), tf.float64)

# Define custom functions
def masked_loss(label, pred, pad_token=-2):
    mask = label != pad_token
    loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
    loss = loss_object(label, pred)

    mask = tf.cast(mask, dtype=loss.dtype)
    loss *= mask

    loss = tf.reduce_sum(loss)/tf.reduce_sum(mask)
    return loss


def masked_accuracy(label, pred, pad_token=-2):
    pred = tf.argmax(pred, axis=2)
    label = tf.cast(label, pred.dtype)
    match = label == pred

    mask = label != pad_token

    match = match & mask

    match = tf.cast(match, dtype=tf.float64)
    mask = tf.cast(mask, dtype=tf.float64)
    return tf.reduce_sum(match)/tf.reduce_sum(mask)

tf.keras.utils.get_custom_objects()['masked_loss'] = masked_loss
tf.keras.utils.get_custom_objects()['masked_accuracy'] = masked_accuracy

# Import the model
transformer = tf.keras.models.load_model(model_path)

In [23]:
MAX_TOKENS = 100
"""
context = bbox_seq
input = angle
target = target_angle
"""

class Predictor(tf.Module):
  def __init__(self, transformer):
    self.transformer = transformer

  def __call__(self, bbox_seq, max_length=MAX_TOKENS):
    
    # The input needs `[START]` and `[END]` tokens, 
    # but this is already provided as its taken from the dataset
    assert isinstance(bbox_seq, tf.Tensor)
    if len(bbox_seq.shape) == 0:
      bbox_seq = bbox_seq[tf.newaxis]

    encoder_input = bbox_seq

    # Set tokens for `[START]` and `[END]` in later steps
    start_token = tf.constant([-333], dtype=tf.int64)
    end_token = tf.constant([-2], dtype=tf.int64)

    # # Try:
    
    # # Create an empty output array with the shape of the encoder input
    # # Fill with padding values and set the first entry to start token

    # shape = tf.shape(encoder_input)
    # empty_array = np.empty((shape[0],1))
    # empty_array.fill(PAD)  
    # empty_array[0, 0] = start_token
    # output_array = tf.convert_to_tensor(empty_array)

    # Add batch dimensions
    encoder_input = encoder_input[tf.newaxis, :]

    # `tf.TensorArray` is required here (instead of a Python list), so that the
    # dynamic-loop can be traced by `tf.function`.
    output_array = tf.TensorArray(dtype=tf.int64, size=0, dynamic_size=True)
    output_array = output_array.write(0, start_token)

    for i in tf.range(max_length):

      # What it should be (shape (1,1) in the beginning and growing)
      output = tf.transpose(output_array.stack())

      # What it needs to be (shape (66, 1) aka. matching the sequence length of the context)
      #output = tf.zeros(shape=(66, 1), dtype=tf.float32)

      predictions = self.transformer((encoder_input, output[tf.newaxis]), training=False, mask=None)

      # # Try:

      # mask = compute_mask(output_array)
      # predictions = self.transformer((encoder_input, output_array[tf.newaxis]), training=False)#, mask=mask[tf.newaxis])


      # Select the last token from the `seq_len` dimension.
      predictions = predictions[:, -1:, :]  # Shape `(batch_size, 1, vocab_size)`.

      predicted_id = tf.argmax(predictions, axis=-1)

      # Concatenate the `predicted_id` to the output which is given to the
      # decoder as its input.
      output_array = output_array.write(i+1, predicted_id[0])

      if predicted_id == end_token:
        break

    output = tf.transpose(output_array.stack())
    # The output shape is `(1, angles)`.
    angles = output[0]  # Shape: `()`.

    return angles

### Predict

In [24]:
predictor = Predictor(transformer)

def print_prediction(bbox_seq, angles, ground_truth):
  #print(f'{"Input:":15s}: {bbox_seq}')
  print(f'{"Prediction"}: {angles.numpy().flatten().tolist()}')
  print(f'{"Ground truth"}: {ground_truth.numpy().flatten().tolist()}')

In [28]:
for batch, (context, x, target) in enumerate(test_dataset):

    i = 0 # Which sequence to predict

    input_seq = context[i]
    angles = predictor(tf.constant(input_seq))
    print_prediction(context[i], angles, target)
    break

ValueError: Could not find matching concrete function to call loaded from the SavedModel. Got:
  Positional arguments (3 total):
    * (<tf.Tensor 'inputs:0' shape=(1, 66, 8) dtype=float32>,
 <tf.Tensor 'inputs_1:0' shape=(1, 1, 1) dtype=int64>)
    * None
    * False
  Keyword arguments: {}

 Expected these arguments to match one of the following 4 option(s):

Option 1:
  Positional arguments (3 total):
    * (TensorSpec(shape=(None, 66, 8), dtype=tf.float32, name='input_1'),
 TensorSpec(shape=(None, 66, 1), dtype=tf.float32, name='input_2'))
    * None
    * True
  Keyword arguments: {}

Option 2:
  Positional arguments (3 total):
    * (TensorSpec(shape=(None, 66, 8), dtype=tf.float32, name='input_1'),
 TensorSpec(shape=(None, 66, 1), dtype=tf.float32, name='input_2'))
    * None
    * False
  Keyword arguments: {}

Option 3:
  Positional arguments (3 total):
    * (TensorSpec(shape=(None, 66, 8), dtype=tf.float32, name='inputs_0'),
 TensorSpec(shape=(None, 66, 1), dtype=tf.float32, name='inputs_1'))
    * None
    * True
  Keyword arguments: {}

Option 4:
  Positional arguments (3 total):
    * (TensorSpec(shape=(None, 66, 8), dtype=tf.float32, name='inputs_0'),
 TensorSpec(shape=(None, 66, 1), dtype=tf.float32, name='inputs_1'))
    * None
    * False
  Keyword arguments: {}

In [17]:
for batch, (context, x, target) in enumerate(train_dataset):

        mask = compute_mask(target, padding_token=PAD)
        logits = transformer((context, x), training=False)
        predictions = tf.argmax(logits, axis=2)

        print(f'{"Ground Truth"}: {target[1].numpy().flatten().tolist()}')
        print(f'{"Prediction"}: {predictions[1].numpy().flatten().tolist()}')
        break

Ground Truth: [105.0, 77.0, 81.0, 80.0, 65.0, 49.0, 70.0, 88.0, 83.0, 79.0, 80.0, 85.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -1.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0, -2.0]
Prediction: [345, 48, 103, 35, 35, 103, 103, 103, 103, 355, 355, 355, 355, 266, 317, 23, 1, 23, 23, 23, 23, 23, 1, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 23, 1, 23, 23, 23, 23, 23, 1, 23, 23, 23, 23, 23, 1, 1, 23, 23, 23, 23]
