## Get drive data

In [5]:
from google.colab import drive
import os
drive.mount('/content/drive/')
save_path = '/content/drive/MyDrive/nlu'
os.makedirs(save_path, exist_ok=True)

Mounted at /content/drive/


## Hyperparameters

In [6]:
MODEL_NAME = 'microsoft/deberta-v3-base'
MAX_SEQUENCE_LENGTH = 128
HIDDEN_SIZE = 512
NUM_CLASSES = 2
BATCH_SIZE = 32
EPOCHS = 15
DROPOUT_RATE = 0.3

## Imports

In [7]:
import pandas as pd
import numpy as np
import tensorflow as tf
import keras
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (
    Input, Dense, Dropout, Dot, Softmax, Reshape, Permute,
    Concatenate, GlobalAveragePooling1D, GlobalMaxPooling1D,
    Lambda, BatchNormalization, LSTM, Bidirectional
)
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from transformers import TFAutoModel, AutoTokenizer
from keras.config import enable_unsafe_deserialization
from keras.saving import register_keras_serializable


## Preparation of data and creation of model

In [8]:
def prepare_deberta_data(data, tokenizer, max_length=128):

    # Encode premises as lists
    premises = data['premise'].fillna('').astype(str).tolist()
    hypotheses = data['hypothesis'].fillna('').astype(str).tolist()

    # Tokenize inputs
    encoded = tokenizer(
        premises, hypotheses,
        padding='max_length',
        truncation=True,
        max_length=max_length,
        return_tensors='np'
    )

    return encoded

def create_decomposable_attention_model(transformer_model, hidden_dim, dropout_rate, max_seq_length, num_classes=2):

    # Define inputs
    input_ids = Input(shape=(max_seq_length,), dtype='int32', name='input_ids')
    attention_mask = Input(shape=(max_seq_length,), dtype='int32', name='attention_mask')
    token_type_ids = Input(shape=(max_seq_length,), dtype='int32', name='token_type_ids')

    # Convert attention_mask to float32
    @keras.saving.register_keras_serializable()
    def cast_to_float(x):
      return tf.cast(x, tf.float32)

    attention_mask_float = Lambda(cast_to_float)(attention_mask)

    # Method to handle the transformer model call in sequence_output layer
    @keras.saving.register_keras_serializable()
    def get_transformer_embeddings(inputs):
        return transformer_model(
            input_ids=inputs[0],
            attention_mask=inputs[1],
            token_type_ids=inputs[2]
        )[0]

    embedding_dim = transformer_model.config.hidden_size

    # Get transformer embeddings
    sequence_output = Lambda(
        get_transformer_embeddings,
        output_shape=(max_seq_length, embedding_dim)
    )([input_ids, attention_mask, token_type_ids])

    # Split premise and hypothesis with token_type_ids
    @register_keras_serializable()
    def cast_to_float0(x):
      return tf.cast(tf.equal(x, 0), tf.float32)

    @register_keras_serializable()
    def cast_to_float1(x):
      return tf.cast(tf.equal(x, 1), tf.float32)

    premise_mask = Lambda(cast_to_float0)(token_type_ids)
    hypothesis_mask = Lambda(cast_to_float1)(token_type_ids)

    @register_keras_serializable()
    def multiply_pair(x):
        return x[0] * x[1]

    # Include the attention mask
    premise_mask = Lambda(multiply_pair)([premise_mask, attention_mask_float])
    hypothesis_mask = Lambda(multiply_pair)([hypothesis_mask, attention_mask_float])

    @register_keras_serializable()
    def expand_last_dim(x):
        return tf.expand_dims(x, axis=-1)

    # Reshape masks
    premise_mask_expanded = Lambda(expand_last_dim)(premise_mask)
    hypothesis_mask_expanded = Lambda(expand_last_dim)(hypothesis_mask)

    # Extract premise and hypothesis embeddings
    premise_embedded = Lambda(multiply_pair)([sequence_output, premise_mask_expanded])
    hypothesis_embedded = Lambda(multiply_pair)([sequence_output, hypothesis_mask_expanded])

    # Encoding layer
    premise_encoded = Dense(hidden_dim, activation='tanh')(premise_embedded)
    premise_encoded = Dropout(dropout_rate)(premise_encoded)

    hypothesis_encoded = Dense(hidden_dim, activation='tanh')(hypothesis_embedded)
    hypothesis_encoded = Dropout(dropout_rate)(hypothesis_encoded)

    # Attention mechanism
    @register_keras_serializable()
    def compute_attention(inputs):
        p_enc, h_enc = inputs
        return tf.matmul(p_enc, tf.transpose(h_enc, perm=[0, 2, 1]))

    attention_scores = Lambda(compute_attention)([premise_encoded, hypothesis_encoded])

    # Apply softmax to get attention weights
    @register_keras_serializable()
    def apply_softmax_1(x):
        return tf.nn.softmax(x, axis=-1)
    @register_keras_serializable()
    def apply_softmax_2(x):
        return tf.nn.softmax(x, axis=1)

    premise_attention = Lambda(apply_softmax_1)(attention_scores)
    hypothesis_attention = Lambda(apply_softmax_2)(attention_scores)

    # Get attended vectors
    @register_keras_serializable()
    def get_attended_1(inputs):
        att, h_enc = inputs
        return tf.matmul(att, h_enc)
    @register_keras_serializable()
    def get_attended_2(inputs):
        att, p_enc = inputs
        return tf.matmul(tf.transpose(att, perm=[0, 2, 1]), p_enc)

    attended_hypothesis = Lambda(get_attended_1)([premise_attention, hypothesis_encoded])
    attended_premise = Lambda(get_attended_2)([hypothesis_attention, premise_encoded])

    # Combine original and attended vectors
    enhanced_premise = Concatenate()([premise_encoded, attended_hypothesis])
    enhanced_hypothesis = Concatenate()([hypothesis_encoded, attended_premise])

    # Compare step
    compared_premise = Dense(hidden_dim, activation='tanh')(enhanced_premise)
    compared_premise = Dropout(dropout_rate)(compared_premise)

    compared_hypothesis = Dense(hidden_dim, activation='tanh')(enhanced_hypothesis)
    compared_hypothesis = Dropout(dropout_rate)(compared_hypothesis)

    # Aggregate step
    @register_keras_serializable()
    def pooling_with_mask(inputs):
        compared, mask = inputs
        sum_values = tf.reduce_sum(compared * mask, axis=1)
        count = tf.reduce_sum(mask[:,:,0], axis=1, keepdims=True)
        return sum_values / (count + 1e-10)

    aggregated_premise = Lambda(pooling_with_mask)([compared_premise, premise_mask_expanded])
    aggregated_hypothesis = Lambda(pooling_with_mask)([compared_hypothesis, hypothesis_mask_expanded])

    # Combine aggregated vectors
    merged = Concatenate()([aggregated_premise, aggregated_hypothesis])

    # Final classification layers
    dense = Dense(hidden_dim, activation='tanh')(merged)
    dense = Dropout(dropout_rate)(dense)
    output = Dense(num_classes, activation='softmax')(dense)

    # Create model
    model = Model(inputs=[input_ids, attention_mask, token_type_ids], outputs=output)

    # Compile model
    model.compile(
        optimizer=Adam(learning_rate=1e-5),
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model



## Get pre-trained tokenizer and model

In [9]:
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
transformer_model = TFAutoModel.from_pretrained(MODEL_NAME, trainable=False)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/579 [00:00<?, ?B/s]

spm.model:   0%|          | 0.00/2.46M [00:00<?, ?B/s]



tf_model.h5:   0%|          | 0.00/736M [00:00<?, ?B/s]

All model checkpoint layers were used when initializing TFDebertaV2Model.

All the layers of TFDebertaV2Model were initialized from the model checkpoint at microsoft/deberta-v3-base.
If your task is similar to the task the model of the checkpoint was trained on, you can already use TFDebertaV2Model for predictions without further training.


## Run model on test data

In [10]:
# Create model with hyperparameters
model = create_decomposable_attention_model(
    transformer_model=transformer_model,
    hidden_dim=HIDDEN_SIZE,
    dropout_rate=DROPOUT_RATE,
    max_seq_length=MAX_SEQUENCE_LENGTH,
    num_classes=NUM_CLASSES
)

# Get saved model weights
enable_unsafe_deserialization()
model.load_weights("/content/drive/MyDrive/nlu/attmodel/finalemodelB.weights.h5")

# Load CSV
df = pd.read_csv('/content/drive/MyDrive/nlu/test.csv')

# Tokenize and encode
test_inputs = prepare_deberta_data(df, tokenizer)

# Predict class probabilities
y_probs = model.predict([
    test_inputs['input_ids'],
    test_inputs['attention_mask'],
    test_inputs['token_type_ids']
])

# Convert to predicted class
y_labels = np.argmax(y_probs, axis=1)

  saveable.load_own_variables(weights_store.get(inner_path))


[1m104/104[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m40s[0m 223ms/step


## Put results in a csv file

In [None]:
# Convert predictions to a DataFrame
predictions_df = pd.DataFrame({'prediction': y_labels})

# Save as csv
predictions_df.to_csv('/content/drive/MyDrive/nlu/Group_15_B.csv', index=False)
print("Saved Group_15_C.csv with", len(y_labels), "rows.")

Saved Group_15_C.csv with 3302 rows.


## Show predictions

In [14]:
pd.set_option("display.max_rows", None)
pd.set_option("display.max_columns", None)

# Load and display the predictions CSV
predictions_df = pd.read_csv("Group_15_B.csv")
display(predictions_df)

Unnamed: 0,predictions
0,1
1,1
2,1
3,1
4,1
5,0
6,0
7,0
8,0
9,1
