In [1]:
#SCALED DOT PRODUCT
from IPython import get_ipython
from IPython.display import display
from tensorflow import matmul, cast, float32, math
from tensorflow.math import sqrt
from tensorflow.keras.layers import Layer
from tensorflow.keras.activations import softmax
import numpy as np

class DotProductAttention(Layer):
  """Scaled dot-product attention where the caller supplies the scaling depth.

  Evaluates softmax(Q K^T / sqrt(d_k)) V, optionally pushing masked
  positions towards zero weight before the softmax.
  """

  def __init__(self, **kwargs):
    # The layer holds no weights; keyword arguments go straight to Layer.
    super().__init__(**kwargs)

  def call(self, queries, keys, values, *, d_k, mask=None):
    """Return the attention output for the given queries, keys and values.

    Args:
      queries: Tensor multiplied against the transposed `keys`.
      keys: Tensor scored against `queries`.
      values: Tensor combined using the attention weights.
      d_k: Keyword-only depth; the raw scores are divided by sqrt(d_k).
      mask: Optional tensor; `-1e9 * mask` is added to the scores, so
        entries equal to 1 are effectively removed by the softmax.

    Returns:
      The product of the softmax-normalised scores with `values`.
    """
    scale = sqrt(cast(d_k, float32))
    logits = matmul(queries, keys, transpose_b=True) / scale
    if mask is not None:
      logits = logits + -1e9 * mask
    return matmul(softmax(logits), values)

# Demo problem size: 32 sequences of 10 positions, 64-wide queries/keys/values.
batch_size, input_seq_length = 32, 10
d_k = d_v = 64

# A seeded generator keeps the sampled inputs identical across runs.
random = np.random.default_rng(seed=42)
# Drawn in the order queries, keys, values.
queries, keys, values = (
    random.random((batch_size, input_seq_length, depth))
    for depth in (d_k, d_k, d_v)
)
attention = DotProductAttention()
output = attention(queries, keys, values, d_k=d_k)
print(output)

tf.Tensor(
[[[0.46824056 0.55641305 0.46830386 ... 0.4930685  0.4061298  0.46411476]
  [0.47598848 0.5551045  0.47800195 ... 0.49304226 0.4006043  0.47026026]
  [0.47095588 0.55246687 0.47349647 ... 0.49247038 0.41551554 0.46566948]
  ...
  [0.47359407 0.5530097  0.48761857 ... 0.49078512 0.4073912  0.47809467]
  [0.4738524  0.5515101  0.47469318 ... 0.48799846 0.40717867 0.47817174]
  [0.4573552  0.55452013 0.4731847  ... 0.48771793 0.41125485 0.4576    ]]

 [[0.5257553  0.46964663 0.6492506  ... 0.54562765 0.62523377 0.49289626]
  [0.51869744 0.48040384 0.6457132  ... 0.53287935 0.6220018  0.5044591 ]
  [0.5311054  0.48170856 0.64107096 ... 0.54553026 0.62820685 0.4921141 ]
  ...
  [0.5314952  0.48453844 0.6366704  ... 0.5249854  0.6216751  0.50992715]
  [0.52042365 0.4842645  0.64478606 ... 0.5371342  0.6203686  0.5011124 ]
  [0.51877236 0.4815875  0.64000344 ... 0.5273335  0.62762994 0.5056677 ]]

 [[0.57879597 0.4952488  0.58734167 ... 0.59600276 0.670287   0.57462704]
  [0.571639

2025-01-25 10:46:51.360898: I metal_plugin/src/device/metal_device.cc:1154] Metal device set to: Apple M2 Pro
2025-01-25 10:46:51.360925: I metal_plugin/src/device/metal_device.cc:296] systemMemory: 16.00 GB
2025-01-25 10:46:51.360929: I metal_plugin/src/device/metal_device.cc:313] maxCacheSize: 5.33 GB
2025-01-25 10:46:51.360957: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:306] Could not identify NUMA node of platform GPU ID 0, defaulting to 0. Your kernel may not have been built with NUMA support.
2025-01-25 10:46:51.360973: I tensorflow/core/common_runtime/pluggable_device/pluggable_device_factory.cc:272] Created TensorFlow device (/job:localhost/replica:0/task:0/device:GPU:0 with 0 MB memory) -> physical PluggableDevice (device: 0, name: METAL, pci bus id: <undefined>)


In [2]:
from IPython import get_ipython
from IPython.display import display
# %%
from tensorflow import math, matmul, reshape, shape, transpose, cast, float32, concat
from tensorflow.keras.layers import Dense, Layer
from tensorflow.keras.backend import softmax
# Implementing the Scaled Dot-Product Attention

class DotProductAttention(Layer):
  """Scaled dot-product attention that derives the scale from the queries.

  Evaluates softmax(Q K^T / sqrt(d_k)) V, where d_k is the size of the last
  axis of `queries`.
  """

  def __init__(self, **kwargs):
    # The layer holds no weights; keyword arguments go straight to Layer.
    super().__init__(**kwargs)

  def call(self, queries, keys, values, mask=None):
    """Return the attention output for the given queries, keys and values.

    Args:
      queries: Tensor multiplied against the transposed `keys`; its last
        axis size is used as the scaling depth.
      keys: Tensor scored against `queries`.
      values: Tensor combined using the attention weights.
      mask: Optional tensor; `-1e9 * mask` is added to the scores, so
        entries equal to 1 are effectively removed by the softmax.

    Returns:
      The product of the softmax-normalised scores with `values`.
    """
    # Static size of the last query axis, as a float for the square root.
    depth = cast(queries.shape[-1], float32)
    logits = matmul(queries, keys, transpose_b=True) / math.sqrt(depth)
    if mask is not None:
      # Large negative bias on masked positions before normalising.
      logits = logits + -1e9 * mask
    attention_weights = softmax(logits)
    # Weighted sum of the value vectors.
    return matmul(attention_weights, values)
# Implementing the Multi-Head Attention

class MultiHeadAttention(Layer):
  """Multi-head attention built on top of `DotProductAttention`.

  Queries, keys and values are linearly projected to `d_k`, `d_k` and `d_v`
  features, split evenly across `h` heads, attended per head, merged back
  and finally projected to `d_model` features.
  """

  def __init__(self, h, d_k, d_v, d_model, **kwargs):
    """Create the projection layers.

    Args:
      h: Number of attention heads.
      d_k: Width of the projected queries and keys (shared by all heads).
      d_v: Width of the projected values (shared by all heads).
      d_model: Width of the layer output.
      **kwargs: Forwarded to the Keras `Layer` constructor.

    Raises:
      ValueError: If `h` is not positive or does not evenly divide `d_k`
        and `d_v`; the head split in `reshape_tensor` would fail otherwise.
    """
    super().__init__(**kwargs)
    if h <= 0 or d_k % h != 0 or d_v % h != 0:
      raise ValueError(
          "d_k and d_v must be divisible by a positive number of heads; "
          "got h={}, d_k={}, d_v={}".format(h, d_k, d_v))
    self.attention = DotProductAttention() # Scaled dot product attention
    self.heads = h # Number of attention heads to use
    self.d_k = d_k # Dimensionality of the linearly projected queries and keys
    self.d_v = d_v # Dimensionality of the linearly projected values
    self.d_model = d_model # Dimensionality of the model
    self.W_q = Dense(d_k) # Learned projection matrix for the queries
    self.W_k = Dense(d_k) # Learned projection matrix for the keys
    self.W_v = Dense(d_v) # Learned projection matrix for the values
    self.W_o = Dense(d_model) # Learned projection matrix for the multi-head output

  def reshape_tensor(self, x, heads, flag):
    """Split a tensor into heads (`flag` truthy) or merge heads back (`flag` falsy).

    Args:
      x: When splitting, a rank-3 tensor (batch_size, seq_length, features);
        when merging, a rank-4 tensor (batch_size, heads, seq_length, depth).
      heads: Number of heads to split into (only used when splitting).
      flag: Truthy to split, falsy to merge.

    Returns:
      (batch_size, heads, seq_length, features // heads) when splitting, or
      (batch_size, seq_length, heads * depth) when merging.
    """
    if flag:
      # (batch_size, seq_length, features) -> (batch_size, seq_length, heads, -1)
      x = reshape(x, shape=(shape(x)[0], shape(x)[1], heads, -1))
      # -> (batch_size, heads, seq_length, -1)
      x = transpose(x, perm=(0, 2, 1, 3))
    else:
      # (batch_size, heads, seq_length, depth) -> (batch_size, seq_length, heads, depth)
      x = transpose(x, perm=(0, 2, 1, 3))
      x_shape = shape(x)
      # Concatenate the heads along the feature axis.
      new_shape = (x_shape[0], x_shape[1], x_shape[2] * x_shape[3])
      x = reshape(x, new_shape)

    return x

  def call(self, queries, keys, values, mask=None):
    """Apply multi-head attention.

    Args:
      queries: Tensor of shape (batch_size, seq_length, features).
      keys: Tensor of shape (batch_size, seq_length, features).
      values: Tensor of shape (batch_size, seq_length, features).
      mask: Optional mask passed unchanged to the scaled dot-product
        attention, where it is added to the per-head scores; it therefore
        has to broadcast against (batch_size, heads, seq_length, seq_length).

    Returns:
      Tensor of shape (batch_size, seq_length, d_model).
    """
    # Project, then rearrange so that all heads are computed in parallel.
    # Shape: (batch_size, heads, seq_length, d_k / heads)
    q_reshaped = self.reshape_tensor(self.W_q(queries), self.heads, True)
    # Shape: (batch_size, heads, seq_length, d_k / heads)
    k_reshaped = self.reshape_tensor(self.W_k(keys), self.heads, True)
    # Shape: (batch_size, heads, seq_length, d_v / heads)
    v_reshaped = self.reshape_tensor(self.W_v(values), self.heads, True)
    # Per-head attention output.
    # Shape: (batch_size, heads, seq_length, d_v / heads)
    o_reshaped = self.attention(q_reshaped, k_reshaped, v_reshaped, mask=mask)
    # Merge the heads back into concatenated form.
    # Shape: (batch_size, seq_length, d_v)
    output = self.reshape_tensor(o_reshaped, self.heads, False)
    # The final projection produces (batch_size, seq_length, d_model).
    return self.W_o(output)
# %%
from numpy import random
input_seq_length = 5 # Maximum length of the input sequence
h = 8 # Number of self-attention heads
d_k = 64 # Dimensionality of the linearly projected queries and keys
d_v = 64 # Dimensionality of the linearly projected values
d_model = 512 # Dimensionality of the model sub-layers' outputs
batch_size = 64 # Batch size from the training process
# Draw the toy inputs from a seeded generator so the same inputs are sampled
# on every run (`random` is the numpy.random module imported above).
rng = random.default_rng(seed=42)
queries = rng.random((batch_size, input_seq_length, d_k))
keys = rng.random((batch_size, input_seq_length, d_k))
values = rng.random((batch_size, input_seq_length, d_v))
multihead_attention = MultiHeadAttention(h, d_k, d_v, d_model)
print(multihead_attention(queries, keys, values))

tf.Tensor(
[[[ 0.13987488  0.30761647  0.02906834 ... -0.16418055  0.00747937
    0.134643  ]
  [ 0.1417179   0.3093925   0.02572403 ... -0.17111947  0.0153925
    0.14233968]
  [ 0.12901346  0.3089638   0.02434182 ... -0.16623019  0.00347683
    0.13525382]
  [ 0.14464812  0.31266147  0.0334737  ... -0.16386218  0.01659877
    0.14627695]
  [ 0.13608232  0.30922857  0.03331729 ... -0.17057301  0.01323724
    0.14136611]]

 [[ 0.03687583  0.5083465  -0.06797247 ... -0.37163785 -0.05848521
    0.18905208]
  [ 0.0328756   0.5113885  -0.06789006 ... -0.37105635 -0.06338108
    0.18322463]
  [ 0.03317652  0.5156679  -0.06193725 ... -0.36009657 -0.0592146
    0.191917  ]
  [ 0.0322786   0.5081383  -0.07410406 ... -0.36320326 -0.05663864
    0.1925717 ]
  [ 0.02948342  0.51423    -0.06779737 ... -0.3678403  -0.06324235
    0.18439998]]

 [[ 0.20744371  0.307809    0.00739995 ... -0.33406168  0.01336134
    0.10662279]
  [ 0.19506627  0.31260258 -0.00123618 ... -0.33562514  0.01799057
    0.1

In [3]:
import tensorflow as tf

class BahdanauAttention(tf.keras.layers.Layer):
    """Additive attention: score = V(tanh(W1(values) + W2(query)))."""

    def __init__(self, units, **kwargs):
        """
        Args:
            units: Width of the hidden projection used to compute the scores.
            **kwargs: Forwarded to the Keras Layer constructor (e.g. name).
        """
        super().__init__(**kwargs)
        self.W1 = tf.keras.layers.Dense(units)  # Dense layer for the encoder hidden states
        self.W2 = tf.keras.layers.Dense(units)  # Dense layer for the decoder hidden state
        self.V = tf.keras.layers.Dense(1)       # Dense layer to compute alignment scores

    def call(self, query, values):
        """
        Args:
            query: Decoder hidden state (shape: [batch_size, hidden_size]).
            values: Encoder outputs (shape: [batch_size, seq_len, hidden_size]).
        Returns:
            context_vector: Weighted sum of encoder outputs (shape: [batch_size, hidden_size]).
            attention_weights: Attention weights (shape: [batch_size, seq_len, 1]).
                The trailing singleton axis produced by the scoring layer is
                kept; apply tf.squeeze(..., axis=-1) for a
                [batch_size, seq_len] view.
        """
        # Add time axis to query for broadcasting (shape: [batch_size, 1, hidden_size])
        query_with_time_axis = tf.expand_dims(query, 1)

        # Compute the alignment scores (shape: [batch_size, seq_len, 1])
        score = self.V(tf.nn.tanh(self.W1(values) + self.W2(query_with_time_axis)))

        # Normalise over the sequence axis; the shape stays [batch_size, seq_len, 1]
        # so the weights broadcast against `values` below.
        attention_weights = tf.nn.softmax(score, axis=1)

        # Compute the context vector as the weighted sum of values (shape: [batch_size, hidden_size])
        context_vector = tf.reduce_sum(attention_weights * values, axis=1)

        return context_vector, attention_weights

# Example usage
if __name__ == "__main__":
    # Define batch size, sequence length, and hidden size
    batch_size = 64
    seq_len = 10
    hidden_size = 256
    attention_units = 128

    # Instantiate the attention layer
    attention = BahdanauAttention(units=attention_units)

    # Simulated encoder outputs (values) and decoder hidden state (query)
    encoder_outputs = tf.random.normal([batch_size, seq_len, hidden_size])
    decoder_hidden_state = tf.random.normal([batch_size, hidden_size])

    # Apply the attention mechanism
    context_vector, attention_weights = attention(decoder_hidden_state, encoder_outputs)

    print("Context vector shape:", context_vector.shape)  # Expected: [batch_size, hidden_size]
    # The layer keeps the trailing score axis, so the weights are rank 3.
    print("Attention weights shape:", attention_weights.shape)  # Expected: [batch_size, seq_len, 1]


Context vector shape: (64, 256)
Attention weights shape: (64, 10, 1)


In [4]:
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense

class LuongAttention(Layer):
    """Luong-style attention with "dot", "general" or "concat" scoring."""

    def __init__(self, attention_type, hidden_size, **kwargs):
        """
        Args:
            attention_type: One of "dot", "general" or "concat".
            hidden_size: Size of the hidden state; used as the width of the
                learned projection for "general" and "concat".
            **kwargs: Forwarded to the Keras Layer constructor (e.g. name).
        Raises:
            ValueError: If attention_type is not a supported scoring function.
        """
        super().__init__(**kwargs)
        # Fail at construction time rather than on the first forward pass.
        if attention_type not in ("dot", "general", "concat"):
            raise ValueError("Unknown attention type: {}".format(attention_type))
        self.attention_type = attention_type
        self.hidden_size = hidden_size

        if attention_type == "general":
            self.attention_weight = Dense(hidden_size)
        elif attention_type == "concat":
            self.attention_weight = Dense(hidden_size)
            # Register the scoring vector through add_weight so the layer
            # tracks it as a trainable weight; a raw tf.Variable attribute is
            # not tracked automatically by Keras 3 layers. The initializer
            # keeps the original standard-normal initialisation.
            self.v = self.add_weight(
                name="v",
                shape=(hidden_size,),
                initializer=tf.keras.initializers.RandomNormal(mean=0.0, stddev=1.0),
                trainable=True,
            )

    def score(self, hidden, encoder_outputs):
        """Compute unnormalised alignment scores.

        Args:
            hidden: Decoder hidden state (shape: [batch_size, hidden_size]).
            encoder_outputs: Encoder outputs (shape: [batch_size, seq_len, hidden_size]).
        Returns:
            Alignment scores (shape: [batch_size, seq_len]).
        Raises:
            ValueError: If self.attention_type is not a supported scoring function.
        """
        if self.attention_type == "dot":
            # Dot product between hidden state and encoder outputs
            return tf.matmul(encoder_outputs, tf.expand_dims(hidden, axis=-1))[:, :, 0]

        elif self.attention_type == "general":
            # Linear transformation followed by dot product
            energy = self.attention_weight(encoder_outputs)
            return tf.matmul(energy, tf.expand_dims(hidden, axis=-1))[:, :, 0]

        elif self.attention_type == "concat":
            # Repeat the hidden state along the sequence axis, then
            # concatenate it with the encoder outputs
            hidden_expanded = tf.expand_dims(hidden, axis=1)
            hidden_expanded = tf.tile(hidden_expanded, [1, tf.shape(encoder_outputs)[1], 1])
            concat_input = tf.concat([hidden_expanded, encoder_outputs], axis=-1)
            energy = tf.tanh(self.attention_weight(concat_input))
            # Project each position's energy onto v
            return tf.reduce_sum(energy * self.v, axis=2)

        else:
            raise ValueError("Unknown attention type: {}".format(self.attention_type))

    def call(self, hidden, encoder_outputs):
        """
        Args:
            hidden: Decoder hidden state (shape: [batch_size, hidden_size]).
            encoder_outputs: Encoder outputs (shape: [batch_size, seq_len, hidden_size]).
        Returns:
            context_vector: Weighted sum of encoder outputs (shape: [batch_size, hidden_size]).
            attention_weights: Attention weights (shape: [batch_size, seq_len]).
        """
        # Compute alignment scores
        alignment_scores = self.score(hidden, encoder_outputs)

        # Softmax normalization to obtain attention weights
        attention_weights = tf.nn.softmax(alignment_scores, axis=1)

        # Compute the context vector as the weighted sum of encoder outputs
        context_vector = tf.matmul(tf.expand_dims(attention_weights, axis=1), encoder_outputs)
        context_vector = tf.squeeze(context_vector, axis=1)

        return context_vector, attention_weights


# Example usage
if __name__ == "__main__":
    # Fix the global TensorFlow seed so the sampled inputs, and therefore the
    # printed values, are reproducible across runs.
    tf.random.set_seed(42)

    batch_size = 2
    seq_len = 5
    hidden_size = 10

    # Simulated inputs
    hidden = tf.random.normal([batch_size, hidden_size])  # Decoder hidden state
    encoder_outputs = tf.random.normal([batch_size, seq_len, hidden_size])  # Encoder outputs

    # Instantiate Luong Attention (dot, general, or concat)
    attention_type = "dot"  # Options: "dot", "general", "concat"
    attention_layer = LuongAttention(attention_type, hidden_size)

    # Forward pass
    context_vector, attention_weights = attention_layer(hidden, encoder_outputs)

    print("Context vector:", context_vector.numpy())
    print("Attention weights:", attention_weights.numpy())


Context vector: [[-1.2955989   0.6636053   0.03890432  0.89639723  0.4319114  -0.52019006
  -0.09344348  1.508288   -0.1414347  -0.24801174]
 [ 1.0715642   0.39579365  0.37740907 -0.7698864   0.5327158   0.24594975
   0.7296208   0.40479633 -0.6249022  -1.0270318 ]]
Attention weights: [[9.4991928e-01 8.8936475e-04 5.8627836e-03 5.9943022e-03 3.7334323e-02]
 [9.1376507e-01 9.7129603e-05 8.4337562e-02 9.2828377e-05 1.7074916e-03]]
