# RNN Implementation and Experiments

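This notebook demonstrates the RNN implementation and its experiments on the NusaX-Sentiment text-classification dataset. It runs hyperparameter experiments (number of RNN layers, number of cells per layer, and direction), then loads the weights of a trained Keras model into the from-scratch RNN implementation and compares the predictions of the two.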

In [None]:
%load_ext autoreload
%autoreload 2

import sys
import os

sys.path.append('../..')

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Embedding, SimpleRNN, Dropout, Dense, Bidirectional
from sklearn.metrics import f1_score, classification_report

In [None]:
from src.models.src.models.base_model.utils.nusax_loader import NusaXLoader
from src.models.src.models.rnn.experiment import RNNExperiments
from src.models.src.models.rnn.rnn_model import RNNModel
from src.models.src.models.rnn.rnn_layer import RNNLayer
from src.models.src.models.base_model.layers.embedding_layer import EmbeddingLayer
from src.models.src.models.base_model.layers.dense_layer import DenseLayer
from src.models.src.models.base_model.layers.dropout_layer import DropoutLayer
from src.models.src.models.base_model.layers.activation_layer import Softmax
from src.models.src.models.base_model.utils.evaluation import compare_keras_vs_scratch
from src.models.src.models.base_model.utils.visualization import plot_training_history

In [None]:
# Turn on memory growth for every physical GPU, then report how many physical
# and logical GPUs TensorFlow sees. Nothing happens on a machine without GPUs.
gpus = tf.config.experimental.list_physical_devices('GPU')
if len(gpus) > 0:
    try:
        for device in gpus:
            tf.config.experimental.set_memory_growth(device, True)
        gpu_counts = (
            ("Num GPUs Available", len(gpus)),
            ("Num Logical GPUs", len(tf.config.experimental.list_logical_devices('GPU'))),
        )
        for caption, count in gpu_counts:
            print(f"{caption}: {count}")
    except RuntimeError as err:
        # Report the problem but keep the notebook running.
        # NOTE(review): presumably raised when the GPUs were already
        # initialised before this cell ran - confirm.
        print(err)


## 1. Load Data

In [None]:
# Build the NusaX data loader.
data_loader = NusaXLoader(batch_size=32, add=True)
# NOTE: `add` makes the loader prepend "../" to its path. It should not be
# needed (the sibling files do not use it), but for some unknown reason this
# notebook does not run without it.

# Grab one batch of the training split to inspect it.
train_dataset = data_loader.get_dataset('train')
for batch_tokens, batch_labels in train_dataset.take(1):
    sample_tokens = batch_tokens.numpy()
    sample_labels = batch_labels.numpy()

# Vocabulary used to map token IDs back to words.
vocab = data_loader.get_vocabulary()
print(f"Vocabulary size: {len(vocab)}")
print(f"First 20 words in vocabulary: {vocab[:20]}")

# Show the first three examples of the batch as readable text.
print("\nSample texts:")
for sample_no in range(1, 4):
    decoded_words = []
    for token_id in sample_tokens[sample_no - 1]:
        if token_id > 0:  # IDs <= 0 are skipped (presumably padding - confirm)
            # IDs outside the vocabulary are rendered as "[UNK]".
            decoded_words.append(vocab[token_id] if token_id < len(vocab) else "[UNK]")
    decoded_text = " ".join(decoded_words)
    print(f"Text {sample_no}: {decoded_text}")
    print(f"Label: {sample_labels[sample_no - 1]}\n")

## 2. Hyperparameter Experiments with RNN

In [None]:
# Experiment runner reused by all experiment cells below; it receives the data
# loader created above together with the training settings.
experiments = RNNExperiments(
    data_loader=data_loader,
    batch_size=32,
    epochs=10,
    embedding_dim=100,
)

### 2.1 Experiment: Number of RNN Layers

In [None]:
# One (layer_count, display_name) pair per variant: 1, 2 and 3 RNN layers.
layer_count_variants = [
    (depth, f"{depth} RNN Layer{'s' if depth > 1 else ''}")
    for depth in (1, 2, 3)
]

# Run the experiment for every variant; returns the models and their histories.
layer_count_models, layer_count_histories = experiments.run_layer_count_experiment(
    layer_count_variants
)

### 2.2 Experiment: Number of RNN Cells per Layer

In [None]:
# One ([units], display_name) pair per variant: 32, 64 and 128 units.
cell_count_variants = [
    ([units], f"{units} Units")
    for units in (32, 64, 128)
]

# Run the experiment for every variant; returns the models and their histories.
cell_count_models, cell_count_histories = experiments.run_cell_count_experiment(
    cell_count_variants
)

### 2.3 Experiment: RNN Direction

In [None]:
# One (is_bidirectional, display_name) pair per variant, unidirectional first.
direction_variants = [
    (is_bidirectional, f"{'Bidirectional' if is_bidirectional else 'Unidirectional'} RNN")
    for is_bidirectional in (False, True)
]

# Run the experiment for both directions; returns the models and their histories.
direction_models, direction_histories = experiments.run_direction_experiment(
    direction_variants
)

## 3. From-Scratch RNN Implementation

In [None]:
# Pick the Keras model that the from-scratch implementation will be compared
# against, print its summary and store its weights.

# Folder holding the RNN models. The weights were previously written to
# "../output/...", while the model in this same cell is read from
# "../../output/..."; both now use the same folder.
# NOTE(review): assumes "../../output" is the intended location - confirm
# against where RNNExperiments saves its models.
model_dir = "../../output/models/rnn"
# Create the folder before anything is written into it.
os.makedirs(model_dir, exist_ok=True)

# Prefer the saved bidirectional model; otherwise fall back to the in-memory
# models returned by the direction experiment.
keras_model_path = f"{model_dir}/rnn_bidirectional.keras"
if os.path.exists(keras_model_path):
    keras_model = load_model(keras_model_path)
elif len(direction_models) > 1 and direction_models[1][0] is not None:
    keras_model = direction_models[1][0]  # Bidirectional RNN model
else:
    keras_model = direction_models[0][0]  # Unidirectional RNN model

# Summary of the chosen model
keras_model.summary()

# Save the model weights
keras_model.save_weights(f"{model_dir}/rnn_model.weights.h5")

In [None]:
# Build a from-scratch RNN model mirroring the chosen Keras model and copy the
# Keras weights into it.

# Sizes taken from the data loader and the experiment settings.
embedding_dim = 100
vocab_size = len(data_loader.get_vocabulary())
num_classes = data_loader.num_classes
sequence_length = data_loader.max_sequence_length

# Read direction and hidden size from the first recurrent layer of the chosen
# Keras model instead of hard-coding them: the model selected above may be the
# unidirectional fallback. The previous hard-coded values remain as defaults
# in case no recurrent layer is found.
hidden_dim = 128
bidirectional = True
for keras_layer in keras_model.layers:
    if isinstance(keras_layer, Bidirectional):
        bidirectional = True
        hidden_dim = keras_layer.forward_layer.units
        break
    if isinstance(keras_layer, SimpleRNN):
        bidirectional = False
        hidden_dim = keras_layer.units
        break

# A bidirectional layer feeds twice as many features into the classifier.
dense_input_dim = hidden_dim * 2 if bidirectional else hidden_dim

# Create a from-scratch RNN model that matches the Keras model
scratch_model = RNNModel()

# Add layers corresponding to the Keras model architecture
scratch_model.add(EmbeddingLayer(input_dim=vocab_size, output_dim=embedding_dim))
scratch_model.add(RNNLayer(input_dim=embedding_dim, hidden_dim=hidden_dim, bidirectional=bidirectional, return_sequences=False))
scratch_model.add(DropoutLayer(dropout_rate=0.2))
scratch_model.add(DenseLayer(input_dim=dense_input_dim, output_dim=num_classes))
scratch_model.add(Softmax())

# Load weights from the Keras model
scratch_model.load_weights_from_keras(keras_model)

print("Weights loaded from Keras model to from-scratch implementation.")

In [None]:
# Evaluate both models on the vectorized test split and print their metrics.
x_test, y_test = data_loader.get_vectorized_data('test')

comparison = compare_keras_vs_scratch(keras_model, scratch_model, x_test, y_test, batch_size=32)

# Same report layout for both models: heading, accuracy, macro F1.
report_sections = (
    ("Keras Model Metrics", "keras_metrics"),
    ("From-Scratch Model Metrics", "scratch_metrics"),
)
for heading, metrics_key in report_sections:
    print(f"\n{heading}:")
    model_metrics = comparison[metrics_key]
    print(f"Accuracy: {model_metrics['accuracy']:.4f}")
    print(f"Macro F1-Score: {model_metrics['macro_f1']:.4f}")

print(f"\nModel Agreement: {comparison['model_agreement']:.4f}")

In [None]:
# Show a handful of test examples with the true label and both predictions.

# Draw the examples with a seeded generator so the same ones are shown on
# every run; never ask for more examples than the test set contains.
num_samples = 5
sample_rng = np.random.default_rng(42)
sample_indices = sample_rng.choice(
    x_test.shape[0],
    size=min(num_samples, x_test.shape[0]),
    replace=False,
)
sample_texts = x_test[sample_indices]  # token-ID sequences, not raw strings
sample_labels = y_test[sample_indices]

# Make predictions with both models
keras_preds = np.argmax(keras_model.predict(sample_texts), axis=1)
scratch_output = np.asarray(scratch_model.predict(sample_texts))
# NOTE(review): the original code used the scratch output directly as class
# indices. If it is 2-D (assumed to be per-class scores - confirm), reduce it
# to class indices the same way as the Keras output.
if scratch_output.ndim > 1:
    scratch_preds = np.argmax(scratch_output, axis=1)
else:
    scratch_preds = scratch_output

# Define sentiment labels
sentiment_labels = ["Negative", "Neutral", "Positive"]

# Visualize the results
for i in range(len(sample_indices)):
    # Convert token IDs back to words; IDs <= 0 are skipped and IDs outside
    # the vocabulary are shown as "[UNK]".
    words = [vocab[idx] if idx < len(vocab) else "[UNK]" for idx in sample_texts[i] if idx > 0]
    text = " ".join(words)

    # Plain ints so the values can index the label list.
    true_label = int(sample_labels[i])
    keras_label = int(keras_preds[i])
    scratch_label = int(scratch_preds[i])

    # Show true label and predictions
    keras_correct = keras_label == true_label
    scratch_correct = scratch_label == true_label

    print(f"\nText: {text[:100]}{'...' if len(text) > 100 else ''}")
    print(f"True sentiment: {sentiment_labels[true_label]}")
    print(f"Keras prediction: {sentiment_labels[keras_label]} {'✓' if keras_correct else '✗'}")
    print(f"Scratch prediction: {sentiment_labels[scratch_label]} {'✓' if scratch_correct else '✗'}")

## 4. Additional Experiments: Multi-layer RNN Models

In [None]:
# Create the model and result folders; existing folders are left untouched.
for output_dir in ("../../output/models/rnn", "../../output/results/rnn"):
    os.makedirs(output_dir, exist_ok=True)

In [None]:
# Load the models saved by the layer-count experiment.
# The paths previously pointed at "../output/...", unlike every other output
# path in this notebook ("../../output/..."); they now use the same root.
# NOTE(review): confirm this matches where RNNExperiments saves its models.
model_paths = {
    "1_layer": "../../output/models/rnn/rnn_layers_1.keras",
    "2_layer": "../../output/models/rnn/rnn_layers_2.keras",
    "3_layer": "../../output/models/rnn/rnn_layers_3.keras"
}

# name -> loaded Keras model, only for files that actually exist
loaded_models = {}

for name, path in model_paths.items():
    if not os.path.exists(path):
        # Missing files are reported and skipped rather than treated as errors.
        print(f"Model file not found: {path}")
        continue

    print(f"Loading model: {name}")
    loaded_models[name] = load_model(path)
    loaded_models[name].summary()

In [None]:
# Compare every saved multi-layer Keras model with its from-scratch counterpart.
import traceback

# name -> comparison result, only for models that were compared successfully
results = {}

for name, path in model_paths.items():
    if os.path.exists(path):
        print(f"\nBuilding and comparing {name} model...")

        try:
            # Presumably builds a matching from-scratch model for the Keras
            # model stored at `path` and evaluates both - confirm in
            # RNNExperiments.compare_models.
            comparison_result = experiments.compare_models(keras_model_path=path)
            results[name] = comparison_result

            # Key metrics of this comparison
            keras_accuracy = comparison_result['keras_metrics']['accuracy']
            scratch_accuracy = comparison_result['scratch_metrics']['accuracy']
            print("\nComparison Results:")
            print(f"Keras Accuracy: {keras_accuracy:.4f}")
            print(f"Scratch Accuracy: {scratch_accuracy:.4f}")
            print(f"Model Agreement: {comparison_result['model_agreement']:.4f}")

        except Exception as e:
            # Best effort: report the failure with its traceback and move on
            # to the next model.
            print(f"Error processing model {name}: {e}")
            traceback.print_exc()
    else:
        print(f"Model file not found: {path}")

In [None]:
# Bar chart of the agreement between each Keras model and its from-scratch
# counterpart; skipped when no comparison succeeded.
if results:
    model_names = list(results.keys())
    agreements = [results[name]['model_agreement'] for name in model_names]

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.bar(model_names, agreements, color='skyblue')
    ax.set_title('Agreement Between Keras and From-Scratch Models')
    ax.set_xlabel('Model')
    ax.set_ylabel('Agreement')
    ax.set_ylim(0, 1.0)
    ax.grid(axis='y', alpha=0.3)

    # Write each agreement value just above its bar.
    for bar_index, agreement in enumerate(agreements):
        ax.text(bar_index, agreement + 0.01, f'{agreement:.4f}', ha='center')

    fig.tight_layout()
    fig.savefig('../../output/results/rnn/model_agreement.png')
    plt.show()