# DA6401 Assignment 3 - Transliteration model (Encoder/Decoder model)
## Sudhanva Satish - DA24M023

In [None]:
# 1. Imports & Setup
import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import tensorflow as tf
from tensorflow.keras.layers import Input, Embedding, Dense, SimpleRNN, LSTM, GRU
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import Tokenizer
import wandb
from wandb.integration.keras import WandbMetricsLogger, WandbModelCheckpoint

# Authenticate with Weights & Biases; the key below is a placeholder string.
# NOTE(review): avoid committing a real API key here — consider supplying it from the environment instead.
wandb.login(key="<Add your key>")


In [None]:
# 2. Load Dakshina Dataset
# Directory containing the "hi" lexicon TSV files read by the load_tsv calls below.
DATA_DIR = "/kaggle/input/dakshinadataset/dakshina_dataset_v1.0/hi/lexicons"

def load_tsv(path):
    """Read a UTF-8 TSV file and return its rows as lists of fields.

    The file content is stripped of surrounding whitespace and split on
    newlines; lines that contain no tab character are dropped.
    """
    with open(path, encoding='utf-8') as handle:
        content = handle.read()

    rows = []
    for record in content.strip().split('\n'):
        if '\t' not in record:
            continue
        rows.append(record.split('\t'))
    return rows

# Load the training and development splits as lists of tab-separated fields.
train_pairs = load_tsv(os.path.join(DATA_DIR, "hi.translit.sampled.train.tsv"))
val_pairs = load_tsv(os.path.join(DATA_DIR, "hi.translit.sampled.dev.tsv"))


In [None]:
# 2. Load Dakshina Dataset
# Alternative data cell for the "ka" lexicons. It rebinds DATA_DIR, load_tsv,
# train_pairs and val_pairs, so when both data cells are executed this one wins.
# NOTE(review): the test cell further down reads the "hi" test file by default —
# run only the data cell that matches the language being evaluated.
DATA_DIR = "/kaggle/input/dakshinadataset/dakshina_dataset_v1.0/ka/lexicons"

def load_tsv(path):
    """Return the tab-containing lines of a UTF-8 file, each split on tabs."""
    with open(path, encoding='utf-8') as f:
        lines = f.read().strip().split('\n')
    return [line.split('\t') for line in lines if '\t' in line]

train_pairs = load_tsv(os.path.join(DATA_DIR, "ka.translit.sampled.train.tsv"))
val_pairs = load_tsv(os.path.join(DATA_DIR, "ka.translit.sampled.dev.tsv"))


In [None]:
# 3. Tokenization
def tokenize_pairs(pairs):
    """Split lexicon rows into encoder inputs and marked decoder targets.

    For every row, field 1 is taken as the Latin input and field 0 as the
    target word. Returns three parallel lists: the Latin inputs, the targets
    prefixed with a tab (decoder input), and the targets suffixed with a
    newline (decoder output).
    """
    latin_texts = []
    devanagari_texts_in = []
    devanagari_texts_out = []
    for row in pairs:
        latin_texts.append(row[1])       # what the user types (input)
        native_word = row[0]             # what the model should output
        devanagari_texts_in.append('\t' + native_word)
        devanagari_texts_out.append(native_word + '\n')
    return latin_texts, devanagari_texts_in, devanagari_texts_out


# Build (input, decoder-input, decoder-output) text lists for the train and dev splits.
train_lat, train_deva_in, train_deva_out = tokenize_pairs(train_pairs)
val_lat, val_deva_in, val_deva_out = tokenize_pairs(val_pairs)

def fit_char_tokenizer(texts):
    """Return a case-preserving, character-level Tokenizer fitted on `texts`."""
    char_tokenizer = Tokenizer(lower=False, char_level=True)
    char_tokenizer.fit_on_texts(texts)
    return char_tokenizer

# The input vocabulary is fitted on train + dev Latin strings; the target
# vocabulary only on the training targets (both the tab-prefixed and the
# newline-suffixed variants, so both markers get an index).
# NOTE(review): dev/test target characters never seen in training have no index.
input_tokenizer = fit_char_tokenizer(train_lat + val_lat)
target_tokenizer = fit_char_tokenizer(train_deva_in + train_deva_out)

# +1 leaves room for index 0, which the models below treat as padding (mask_zero=True).
VOCAB_SIZE_INPUT = len(input_tokenizer.word_index) + 1
VOCAB_SIZE_TARGET = len(target_tokenizer.word_index) + 1

def encode_and_pad(texts, tokenizer, maxlen=None, truncating='post'):
    """Convert texts to integer sequences and pad them to a common length.

    Args:
        texts: Iterable of strings to encode.
        tokenizer: Fitted tokenizer exposing `texts_to_sequences`.
        maxlen: Target length; `None` pads to the longest sequence.
        truncating: Side from which over-long sequences are cut. Defaults to
            'post' so it matches the 'post' padding and keeps the leading
            start marker of decoder inputs (the library default, 'pre',
            would drop it).

    Returns:
        2-D integer array of shape (len(texts), maxlen), zero-padded at the end.
    """
    sequences = tokenizer.texts_to_sequences(texts)
    return pad_sequences(sequences, padding='post', truncating=truncating, maxlen=maxlen)

# Padding lengths are the longest training input / training target (in characters).
MAXLEN_INPUT = max(map(len, train_lat))
MAXLEN_TARGET = max(map(len, train_deva_out))

# Training arrays; the target gets a trailing axis of size 1 via expand_dims.
train_encoder_input = encode_and_pad(train_lat, input_tokenizer, MAXLEN_INPUT)
train_decoder_input = encode_and_pad(train_deva_in, target_tokenizer, MAXLEN_TARGET)
train_target_output = np.expand_dims(encode_and_pad(train_deva_out, target_tokenizer, MAXLEN_TARGET), -1)

# Dev arrays, encoded with the same tokenizers and lengths as the training data.
val_encoder_input = encode_and_pad(val_lat, input_tokenizer, MAXLEN_INPUT)
val_decoder_input = encode_and_pad(val_deva_in, target_tokenizer, MAXLEN_TARGET)
val_target_output = np.expand_dims(encode_and_pad(val_deva_out, target_tokenizer, MAXLEN_TARGET), -1)


In [None]:
# Quick sanity check: show the first five Latin input strings.
print(train_lat[:5])

In [None]:
# Error checking
# NOTE: this check is disabled — it is wrapped in a bare string literal, so it never runs.
# It looks up '<s>' / '</s>', whereas tokenize_pairs marks targets with '\t' and '\n'.
"""start_token = target_tokenizer.word_index.get('<s>')
end_token = target_tokenizer.word_index.get('</s>')

if start_token is None or end_token is None:
    raise ValueError("Start/end tokens not found in tokenizer vocabulary. Did you include <s> and </s> during training?")
"""

# Question 1 - Build the model

In [None]:
# 4. Build Seq2Seq Model
def build_seq2seq_model(vocab_size, embedding_dim, hidden_dim, cell_type, num_encoder_layers, num_decoder_layers, dropout_rate):
    """Build a teacher-forced encoder/decoder character model.

    Args:
        vocab_size: Size of the shared embedding table and of the softmax output.
        embedding_dim: Width of the character embeddings.
        hidden_dim: Number of units in every recurrent layer.
        cell_type: One of "RNN" (SimpleRNN), "LSTM" or "GRU".
        num_encoder_layers: Number of stacked encoder layers (at least 1).
        num_decoder_layers: Number of stacked decoder layers.
        dropout_rate: Input dropout applied inside every recurrent layer.

    Returns:
        A Keras Model mapping [encoder_inputs, decoder_inputs] to per-step
        softmax distributions over `vocab_size` symbols.

    Raises:
        ValueError: If `cell_type` is not supported or there is no encoder layer.
    """
    # Validate first so a bad sweep value fails with a clear message.
    supported_cells = ("RNN", "LSTM", "GRU")
    if cell_type not in supported_cells:
        raise ValueError(f"Unsupported cell_type {cell_type!r}; expected one of {supported_cells}")
    if num_encoder_layers < 1:
        raise ValueError("num_encoder_layers must be at least 1 so the decoder has an initial state")

    encoder_inputs = Input(shape=(None,))
    decoder_inputs = Input(shape=(None,))
    # NOTE(review): one embedding table is shared by both sides although the
    # encoder and decoder ids come from different tokenizers; callers pass the
    # target vocabulary size, which assumes the input vocabulary is not larger.
    embedding = Embedding(vocab_size, embedding_dim, mask_zero=True, name="embedding")
    enc_emb = embedding(encoder_inputs)
    dec_emb = embedding(decoder_inputs)
    RNN = {"RNN": SimpleRNN, "LSTM": LSTM, "GRU": GRU}[cell_type]

    x = enc_emb
    encoder_states = []
    for i in range(num_encoder_layers):
        rnn = RNN(hidden_dim, return_sequences=True, return_state=True, dropout=dropout_rate, name=f"encoder_{cell_type}_{i}")
        # LSTM yields (outputs, h, c); SimpleRNN and GRU yield (outputs, h).
        # Only the states of the last encoder layer are kept.
        x, *encoder_states = rnn(x)

    y = dec_emb
    for i in range(num_decoder_layers):
        rnn = RNN(hidden_dim, return_sequences=True, return_state=True, dropout=dropout_rate, name=f"decoder_{cell_type}_{i}")
        # Every decoder layer starts from the final encoder states.
        y, *_ = rnn(y, initial_state=encoder_states)

    decoder_dense = Dense(vocab_size, activation="softmax", name="dense")
    outputs = decoder_dense(y)
    return Model([encoder_inputs, decoder_inputs], outputs)


In [None]:
# 5. Inference Models (LSTM and SimpleRNN; uses only the first encoder/decoder layer)
def build_inference_models(model, hidden_dim, cell_type):
    """Derive step-wise encoder and decoder models from a trained seq2seq model.

    Args:
        model: Trained model containing layers named "embedding", "dense" and
            recurrent layers named "encoder_<cell_type>_<i>" / "decoder_<cell_type>_<i>".
        hidden_dim: Size of the recurrent state, used for the decoder state inputs.
        cell_type: "LSTM" selects the two-state branch; anything else takes the
            single-state branch.

    Returns:
        (encoder_model, decoder_model). The encoder maps an input sequence to its
        state(s); the decoder maps [decoder_inputs, *states] to
        [token probabilities, *new states].

    NOTE(review): only the FIRST matching encoder layer and the FIRST matching
    decoder layer are wired in, so for models built with more than one layer the
    remaining layers are not part of the inference graph.
    """
    encoder_inputs = model.input[0]
    decoder_inputs = model.input[1]
    embedding_layer = model.get_layer("embedding")
    encoder_emb = embedding_layer(encoder_inputs)
    decoder_emb = embedding_layer(decoder_inputs)

    # Dynamically get the first matching RNN layer for encoder and decoder
    encoder_rnn = next(layer for layer in model.layers if layer.name.startswith(f"encoder_{cell_type}"))
    decoder_rnn = next(layer for layer in model.layers if layer.name.startswith(f"decoder_{cell_type}"))
    decoder_dense = model.get_layer("dense")

    if cell_type == "LSTM":
        # LSTM carries two states (h, c), so the encoder exposes both.
        _, state_h, state_c = encoder_rnn(encoder_emb)
        encoder_model = Model(encoder_inputs, [state_h, state_c])

        # Fresh inputs through which the caller feeds the previous step's states.
        decoder_state_input_h = Input(shape=(hidden_dim,))
        decoder_state_input_c = Input(shape=(hidden_dim,))
        decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]

        decoder_outputs, state_h, state_c = decoder_rnn(decoder_emb, initial_state=decoder_states_inputs)
        decoder_outputs = decoder_dense(decoder_outputs)

        decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs, state_h, state_c])
    else:  # RNN
        # Single hidden state.
        _, state_h = encoder_rnn(encoder_emb)
        encoder_model = Model(encoder_inputs, [state_h])

        decoder_state_input_h = Input(shape=(hidden_dim,))
        decoder_outputs, state_h = decoder_rnn(decoder_emb, initial_state=[decoder_state_input_h])
        decoder_outputs = decoder_dense(decoder_outputs)

        decoder_model = Model([decoder_inputs, decoder_state_input_h], [decoder_outputs, state_h])

    return encoder_model, decoder_model


In [None]:
# 6. Beam Search Decoder
def decode_sequence_beam_search(input_seq, encoder_model, decoder_model, target_tokenizer, beam_width=3, max_output_len=30):
    """Decode a single encoded input with beam search.

    Args:
        input_seq: Encoder input for one example (batch of size 1).
        encoder_model: Object whose `predict` returns the initial decoder
            state(s): a list/tuple of arrays, or a single array.
        decoder_model: Object whose `predict([token, *states])` returns
            `[probabilities, *new_states]`.
        target_tokenizer: Fitted tokenizer exposing `word_index`.
        beam_width: Number of hypotheses kept after each step.
        max_output_len: Maximum number of decoding steps.

    Returns:
        The best hypothesis as a string, without start/end markers.

    Raises:
        KeyError: If the vocabulary contains no start or end marker.
    """
    word_index = target_tokenizer.word_index
    index_to_char = {i: c for c, i in word_index.items()}
    index_to_char[0] = ''

    # Targets are marked with '\t' (start) and '\n' (end) by tokenize_pairs;
    # '<s>' / '</s>' are still accepted for vocabularies that use them.
    start_token = word_index.get('\t', word_index.get('<s>'))
    end_token = word_index.get('\n', word_index.get('</s>'))
    if start_token is None or end_token is None:
        raise KeyError("Start/end tokens not found in tokenizer vocabulary")

    def _as_state_list(states):
        # A single-output model returns a bare array; normalise to a list so
        # the same code serves one-state (RNN/GRU) and two-state (LSTM) cells.
        return list(states) if isinstance(states, (list, tuple)) else [states]

    initial_states = _as_state_list(encoder_model.predict(input_seq, verbose=0))
    sequences = [([start_token], 0.0, initial_states)]

    for _ in range(max_output_len):
        # Stop early once every surviving hypothesis has emitted the end marker.
        if all(seq[-1] == end_token for seq, _score, _states in sequences):
            break

        all_candidates = []
        for seq, score, states in sequences:
            if seq[-1] == end_token:
                # Finished hypotheses are carried over unchanged.
                all_candidates.append((seq, score, states))
                continue

            target_seq = np.array([[seq[-1]]])
            decoder_outputs = decoder_model.predict([target_seq] + states, verbose=0)
            step_probs = decoder_outputs[0][0, -1, :]
            next_states = list(decoder_outputs[1:])

            for token in np.argsort(step_probs)[-beam_width:]:
                # Scores are accumulated negative log-probabilities (lower is better).
                new_score = score - np.log(step_probs[token] + 1e-9)
                all_candidates.append((seq + [int(token)], new_score, next_states))

        sequences = sorted(all_candidates, key=lambda tup: tup[1])[:beam_width]

    best_seq = sequences[0][0]
    return ''.join(index_to_char.get(idx, '') for idx in best_seq[1:] if idx != end_token)


# Question 2 - Train the model via wandb sweeps

In [None]:
# 7. WandB Sweep Config
# Bayesian hyperparameter search that maximises the logged `val_accuracy`.
# NOTE(review): `beam_size` only feeds the run name while the beam-search section
# of sweep_train is disabled, so it does not influence the optimised metric.
sweep_config = {
    'method': 'bayes',
    'metric': {'name': 'val_accuracy', 'goal': 'maximize'},
    'parameters': {
        'embedding_dim': {'values': [16, 32, 64, 256]},
        'hidden_dim': {'values': [16, 32, 64, 256]},
        'cell_type': {'values': ['RNN', 'LSTM']},
        'num_encoder_layers': {'values': [1, 2]},
        'num_decoder_layers': {'values': [1, 2]},
        'dropout_rate': {'values': [0.2, 0.3]},
        'batch_size': {'values': [32, 64]},
        'epochs': {'value': 10},  # fixed for every run
        'beam_size': {'values': [1, 3, 5]}
    }
}


In [None]:
# 8. Sweep Train Function
def sweep_train(config=None):
    """Train one sweep configuration and log its metrics and checkpoint to W&B.

    The hyperparameters are read from `wandb.config` once the run has started;
    the run is renamed after the sampled values.
    """
    with wandb.init(config=config):
        config = wandb.config
        wandb.run.name = f"{config.cell_type}_emb{config.embedding_dim}_hid{config.hidden_dim}_enc{config.num_encoder_layers}_dec{config.num_decoder_layers}_drop{int(config.dropout_rate*100)}_beam{config.beam_size}"

        model = build_seq2seq_model(
            VOCAB_SIZE_TARGET,
            config.embedding_dim,
            config.hidden_dim,
            config.cell_type,
            config.num_encoder_layers,
            config.num_decoder_layers,
            config.dropout_rate,
        )
        model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])

        training_inputs = [train_encoder_input, train_decoder_input]
        validation_set = ([val_encoder_input, val_decoder_input], val_target_output)
        run_callbacks = [WandbMetricsLogger(), WandbModelCheckpoint("model_checkpoint.keras")]

        model.fit(
            training_inputs,
            train_target_output,
            validation_data=validation_set,
            batch_size=config.batch_size,
            epochs=config.epochs,
            callbacks=run_callbacks,
            verbose=2,
        )

        # Beam Search part - commented to reduce runtime
        """encoder_model, decoder_model = build_inference_models(model, config.hidden_dim, config.cell_type)
        input_seq = val_encoder_input[0:1]
        prediction = decode_sequence_beam_search(
            input_seq=input_seq,
            encoder_model=encoder_model,
            decoder_model=decoder_model,
            target_tokenizer=target_tokenizer,
            beam_width=config.beam_size
        )

        print("Predicted:", prediction)
        print("Ground truth:", val_deva_out[0])"""


In [None]:
# 9. Start the Sweep
# Register the sweep under project "DL_A3" and let this process run up to 100 trials of sweep_train.
sweep_id = wandb.sweep(sweep_config, project="DL_A3")
wandb.agent(sweep_id, function=sweep_train, count=100)


# Question 3 in report

# Question 4 - Evaluate model on Test set

In [None]:
# Hyperparameters used to train the final vanilla (no-attention) model below.
# Presumably the best configuration found by the sweep — confirm against the report.
best_config = {
    'embedding_dim': 256,
    'hidden_dim': 256,
    'cell_type': 'LSTM',
    'num_encoder_layers': 2,
    'num_decoder_layers': 2,
    'dropout_rate': 0.2,
    'batch_size': 64,
    'epochs': 10#20
}


In [None]:
# Build the vanilla model from best_config.
best_model = build_seq2seq_model(
    vocab_size=VOCAB_SIZE_TARGET,
    embedding_dim=best_config['embedding_dim'],
    hidden_dim=best_config['hidden_dim'],
    cell_type=best_config['cell_type'],
    num_encoder_layers=best_config['num_encoder_layers'],
    num_decoder_layers=best_config['num_decoder_layers'],
    dropout_rate=best_config['dropout_rate']
)

best_model.compile(optimizer=Adam(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Train with teacher forcing: the decoder input is the target prefixed with the start marker.
best_model.fit(
    [train_encoder_input, train_decoder_input],
    train_target_output,
    validation_data=([val_encoder_input, val_decoder_input], val_target_output),
    batch_size=best_config['batch_size'],
    epochs=best_config['epochs'],
    verbose=2
)

# Persist the trained model so later cells can reload it without retraining.
best_model.save("best_model.keras")



In [None]:
from tensorflow.keras.models import load_model

# Reload the vanilla model from the file written by the training cell.
best_model = load_model("best_model.keras")

In [None]:
# Load test data
# NOTE(review): the active path is the "hi" test set; switch to the commented "ka"
# path when the "ka" data cell was used for training.
test_path = "/kaggle/input/dakshinadataset/dakshina_dataset_v1.0/hi/lexicons/hi.translit.sampled.test.tsv"
#test_path = "/kaggle/input/dakshinadataset/dakshina_dataset_v1.0/ka/lexicons/ka.translit.sampled.test.tsv"
test_pairs = load_tsv(test_path)
test_lat, test_deva_in, test_deva_out = tokenize_pairs(test_pairs)

# Encode with the tokenizers and padding lengths fitted on the training data.
test_encoder_input = encode_and_pad(test_lat, input_tokenizer, MAXLEN_INPUT)
test_decoder_input = encode_and_pad(test_deva_in, target_tokenizer, MAXLEN_TARGET)
test_target_output = encode_and_pad(test_deva_out, target_tokenizer, MAXLEN_TARGET)
test_target_output = np.expand_dims(test_target_output, -1)

wandb.init(project="DL_A3", name="Vanilla_best_test")

# The decoder is fed the true previous characters (test_decoder_input), so this is
# teacher-forced accuracy as computed by the compiled 'accuracy' metric.
test_loss, test_acc = best_model.evaluate([test_encoder_input, test_decoder_input], test_target_output)
print(f"✅ Test Accuracy: {test_acc:.4f}")

wandb.log({'test_accuracy':test_acc})

In [None]:
# Reverse lookup from token id to character; id 0 (padding) maps to the empty string.
index_to_char = {index: char for char, index in target_tokenizer.word_index.items()}
index_to_char[0] = ''

def decode_seq(seq):
    """Turn a sequence of token ids into text, stopping at the newline end marker.

    Padding ids (0) are skipped and ids missing from the lookup contribute nothing.
    """
    chars = []
    for token_id in seq:
        if token_id == 0:
            continue
        char = index_to_char.get(token_id, '')  # unknown ids decode to ''
        # the newline character is the end-of-word marker added during tokenization
        if char == '\n':
            break
        chars.append(char)
    return ''.join(chars)

In [None]:
os.makedirs("predictions_vanilla", exist_ok=True)
# NOTE(review): predictions are teacher-forced — the model receives test_decoder_input
# (the true targets shifted by the start marker), not its own previous outputs.
preds = best_model.predict([test_encoder_input, test_decoder_input])
# Most likely token id at every time step.
pred_indices = np.argmax(preds, axis=-1)

# Reverse lookup from token id to character; id 0 (padding) maps to ''.
index_to_char = {i: c for c, i in target_tokenizer.word_index.items()}
index_to_char[0] = ''

def decode_seq(seq):
    """Convert token ids to text, skipping padding and stopping at the newline end marker."""
    decoded = []
    for idx in seq:
        if idx == 0:
            continue
        token = index_to_char.get(idx, '')  # fallback to '' if invalid index
        if token in ['\n']:#['<','\\','/']:
            break
        decoded.append(token)
    return ''.join(decoded)


decoded_preds = [decode_seq(seq) for seq in pred_indices]
# test_deva_out entries end with the '\n' end marker added by tokenize_pairs (there is
# no ' </s>' suffix), so strip that marker; otherwise every reference keeps a trailing
# newline, which never equals a prediction and puts a blank line after each output row.
decoded_refs = [x.rstrip('\n') for x in test_deva_out]

# One row per test word: input<TAB>prediction<TAB>reference
with open("predictions_vanilla/test_predictions.txt", "w", encoding="utf-8") as f:
    f.writelines(
        f"{inp}\t{pred}\t{ref}\n"
        for inp, pred, ref in zip(test_lat, decoded_preds, decoded_refs)
    )


In [None]:
# Display first 10 test predictions
# Print input, prediction and reference for the first ten test words
# (requires at least ten test examples).
for i in range(10):
    input_latin = test_lat[i]
    predicted = decoded_preds[i]
    reference = decoded_refs[i]
    print(f"{i+1}. Input: {input_latin}\n   Predicted: {predicted}\n   Reference: {reference}\n")


In [None]:
import wandb

# Initialize a Wandb run
# NOTE(review): this run goes to project "seq2seq_sweep", while the other cells log to "DL_A3".
wandb.init(project="seq2seq_sweep", name="prediction_samples_colored_table_10")

# Create a Wandb table
table = wandb.Table(columns=["Input Word", "Predicted Word", "Target Word"])

# Sample predictions (10 rows)
# These rows are hard-coded literals, not read from the prediction files written above.
samples = [
    ("ankit", "अंकीत", "अंकित"),
    ("angreji", "अगग्ेजी", "अंग्रज़ी"),
    ("andhapan", "अंधापन", "अंधापन"),
    ("achnera", "अच्रेर", "अछनेरा"),
    ("advait", "एडववि", "अद्वैत"),
    ("aakar", "आकार", "आकार"),
    ("anupam", "अनुपम", "अनुपम"),
    ("aadesh", "आदेश", "आदेश"),
    ("abhay", "अभाय", "अभय"),
    ("aastik", "आस्तिक्", "आस्तिक")
]

# Add data with color-coded Predicted Word
for input_word, pred_word, target_word in samples:
    # Determine color based on exact match
    color = "#00FF00" if pred_word == target_word else "#FF0000"
    # Wrap predicted word in HTML span
    # NOTE(review): the span is stored as a plain string cell — confirm the table renders it as HTML.
    colored_pred = f'<span style="color: {color}">{pred_word}</span>'
    table.add_data(input_word, colored_pred, target_word)

# Log the table to the Wandb run
wandb.log({"Prediction Samples Colored Table": table})

# Finish the run
wandb.finish()

# Question 5 - Add Attention mechanism to model

In [None]:
# Reference hyperparameters for the attention model.
# NOTE(review): none of the cells shown here read this dict; the final attention
# model below is built from literal values instead.
attention_config = {
    'embedding_dim': 256,
    'hidden_dim': 256,
    'dropout_rate': 0.2,
    'batch_size': 64,
    'epochs': 10
}


In [None]:
from tensorflow.keras.layers import Layer
import tensorflow.keras.backend as K
from tensorflow.keras.layers import Attention, Concatenate

class BahdanauAttention(Layer):
    """Additive attention layer that always returns both the context and the weights."""

    def __init__(self, units):
        super().__init__()
        # Projections for the values (W1), the queries (W2) and the scalar score (V).
        self.W1 = Dense(units)
        self.W2 = Dense(units)
        self.V = Dense(1)

    def call(self, query, values):
        # Insert singleton axes so every decoder step is paired with every encoder step:
        # query  -> (batch_size, dec_len, 1, hidden)
        # values -> (batch_size, 1, enc_len, hidden)
        expanded_query = tf.expand_dims(query, 2)
        expanded_values = tf.expand_dims(values, 1)

        # Additive score followed by a softmax over the encoder axis.
        projected = self.W1(expanded_values) + self.W2(expanded_query)
        energies = self.V(tf.nn.tanh(projected))  # (batch_size, dec_len, enc_len, 1)
        weights = tf.nn.softmax(energies, axis=2)

        # Weighted sum of the encoder values for every decoder step.
        context = tf.reduce_sum(weights * expanded_values, axis=2)  # (batch_size, dec_len, hidden)
        return context, tf.squeeze(weights, -1)


In [None]:
# Modified attention class to export weights

@tf.keras.utils.register_keras_serializable(package="Custom")
class BahdanauAttention(Layer):
    """Additive attention over encoder outputs, optionally exposing the weights.

    The layer is registered as Keras-serializable and implements `get_config`,
    so a model containing it can be saved to and reloaded from a `.keras` file.

    Args:
        units: Width of the hidden projection used to score query/value pairs.
        return_attention: When True, `call` returns `(context, weights)`;
            otherwise only the context.
        **kwargs: Standard Keras layer arguments (name, dtype, ...), needed
            when the layer is re-created from its config.
    """

    def __init__(self, units, return_attention=False, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.W1 = Dense(units)
        self.W2 = Dense(units)
        self.V = Dense(1)
        self.return_attention = return_attention

    def call(self, query, values):
        query_with_time_axis = tf.expand_dims(query, 2)  # (batch, dec_len, 1, hidden)
        values_with_time_axis = tf.expand_dims(values, 1)  # (batch, 1, enc_len, hidden)

        score = self.V(tf.nn.tanh(self.W1(values_with_time_axis) + self.W2(query_with_time_axis)))  # (batch, dec_len, enc_len, 1)
        # Normalise over the encoder axis so each decoder step gets a distribution.
        attention_weights = tf.nn.softmax(score, axis=2)
        context_vector = tf.reduce_sum(attention_weights * values_with_time_axis, axis=2)

        if self.return_attention:
            return context_vector, tf.squeeze(attention_weights, -1)
        return context_vector

    def get_config(self):
        """Return the constructor arguments so the layer can be rebuilt on load."""
        config = super().get_config()
        config.update({"units": self.units, "return_attention": self.return_attention})
        return config


In [None]:
def build_attention_seq2seq_model(vocab_size_input, vocab_size_target, embedding_dim, hidden_dim, dropout_rate):
    """Build a single-layer LSTM encoder/decoder with additive attention.

    NOTE(review): this definition is superseded by the same-named function in the
    next cell. It unpacks two values from BahdanauAttention(hidden_dim), which only
    works with an attention layer that returns (context, weights) by default.
    `dropout_rate` is accepted but not used in this body.
    """
    # Encoder
    encoder_inputs = Input(shape=(None,))
    encoder_emb = Embedding(vocab_size_input, embedding_dim, mask_zero=True)(encoder_inputs)
    encoder_outputs, state_h, state_c = LSTM(hidden_dim, return_sequences=True, return_state=True)(encoder_emb)

    # Decoder
    decoder_inputs = Input(shape=(None,))
    decoder_emb = Embedding(vocab_size_target, embedding_dim, mask_zero=True)(decoder_inputs)
    decoder_outputs, _, _ = LSTM(hidden_dim, return_sequences=True, return_state=True)(decoder_emb, initial_state=[state_h, state_c])

    # Attention
    context_vector, attention_weights = BahdanauAttention(hidden_dim)(decoder_outputs, encoder_outputs)
    # The context is concatenated with the decoder output before the softmax layer.
    concat = Concatenate()([decoder_outputs, context_vector])

    outputs = Dense(vocab_size_target, activation='softmax')(concat)

    model = Model([encoder_inputs, decoder_inputs], outputs)
    return model


In [None]:
# Attention extraction model

def build_attention_seq2seq_model(vocab_size_input, vocab_size_target, embedding_dim, hidden_dim, dropout_rate):
    """Build a single-layer LSTM encoder/decoder with additive attention.

    Args:
        vocab_size_input: Size of the encoder embedding table.
        vocab_size_target: Size of the decoder embedding table and of the softmax output.
        embedding_dim: Width of both embeddings.
        hidden_dim: Number of LSTM units and of attention projection units.
        dropout_rate: Input dropout applied inside both LSTM layers
            (0.0 disables it).

    Returns:
        A Keras Model mapping [encoder_inputs, decoder_inputs] to per-step
        softmax distributions over the target vocabulary.
    """
    # Encoder
    encoder_inputs = Input(shape=(None,))
    encoder_emb = Embedding(vocab_size_input, embedding_dim, mask_zero=True)(encoder_inputs)
    encoder_outputs, state_h, state_c = LSTM(
        hidden_dim, return_sequences=True, return_state=True, dropout=dropout_rate
    )(encoder_emb)

    # Decoder, initialised with the final encoder states
    decoder_inputs = Input(shape=(None,))
    decoder_emb = Embedding(vocab_size_target, embedding_dim, mask_zero=True)(decoder_inputs)
    decoder_outputs, _, _ = LSTM(
        hidden_dim, return_sequences=True, return_state=True, dropout=dropout_rate
    )(decoder_emb, initial_state=[state_h, state_c])

    # Attention: the layer is asked for the weights too so they can be read out
    # later; they are not part of the training graph.
    context_vector, _attention_weights = BahdanauAttention(hidden_dim, True)(decoder_outputs, encoder_outputs)
    concat = Concatenate()([decoder_outputs, context_vector])

    outputs = Dense(vocab_size_target, activation='softmax')(concat)

    model = Model([encoder_inputs, decoder_inputs], outputs)
    return model


In [None]:
# Bayesian search over the attention model's hyperparameters, maximising `val_accuracy`.
attention_sweep_config = {
    'method': 'bayes',
    'metric': {'name': 'val_accuracy', 'goal': 'maximize'},
    'parameters': {
        'embedding_dim': {'values': [128, 256]},
        'hidden_dim': {'values': [128, 256]},
        'dropout_rate': {'values': [0.0, 0.2, 0.3]},
        'batch_size': {'values': [32, 64]},
        'epochs': {'value': 10}  # fixed for every run
    }
}


In [None]:
def sweep_train_attention(config=None):
    """Train the attention model for one sweep configuration and log metrics to W&B."""
    with wandb.init(config=config):
        config = wandb.config
        wandb.run.name = f"attn_emb{config.embedding_dim}_hid{config.hidden_dim}_drop{int(config.dropout_rate*100)}"

        model = build_attention_seq2seq_model(
            VOCAB_SIZE_INPUT,
            VOCAB_SIZE_TARGET,
            config.embedding_dim,
            config.hidden_dim,
            config.dropout_rate,
        )
        model.compile(loss='sparse_categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])

        training_inputs = [train_encoder_input, train_decoder_input]
        validation_set = ([val_encoder_input, val_decoder_input], val_target_output)

        model.fit(
            training_inputs,
            train_target_output,
            validation_data=validation_set,
            batch_size=config.batch_size,
            epochs=config.epochs,
            callbacks=[WandbMetricsLogger()],
            verbose=2,
        )


In [None]:
# Register the attention sweep under project "DL_A3".
sweep_id = wandb.sweep(attention_sweep_config, project="DL_A3")
wandb.agent(sweep_id, function=sweep_train_attention, count=50)  # Run up to 50 configs


In [None]:
# Build the final attention model from literal hyperparameters (dropout disabled).
attention_model = build_attention_seq2seq_model(
    vocab_size_input=VOCAB_SIZE_INPUT,
    vocab_size_target=VOCAB_SIZE_TARGET,
    embedding_dim=256,
    hidden_dim=256,
    dropout_rate=0.0
)

attention_model.compile(optimizer=Adam(), loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Teacher-forced training on the same arrays as the vanilla model.
attention_model.fit(
    [train_encoder_input, train_decoder_input],
    train_target_output,
    validation_data=([val_encoder_input, val_decoder_input], val_target_output),
    batch_size=64,
    epochs=10,
    verbose=2
)


In [None]:
# Persist the trained attention model for the reload cell below.
attention_model.save("best_attention_model.keras")

In [None]:
from tensorflow.keras.models import load_model

# Reload the attention model from disk.
# NOTE(review): the model contains the custom BahdanauAttention layer; loading relies on
# that class being serializable and known to Keras (registered or passed via custom_objects).
attention_model = load_model('best_attention_model.keras')

In [None]:
wandb.init(project="DL_A3", name='Attention_best_test')

# Evaluate test accuracy
# The decoder is fed test_decoder_input, so this is teacher-forced accuracy.
test_loss, test_acc = attention_model.evaluate([test_encoder_input, test_decoder_input], test_target_output, verbose=2)
print(f"✅ Test Accuracy (Attention Model): {test_acc:.4f}")


wandb.log({'test_accuracy':test_acc})


In [None]:
os.makedirs("predictions_attention", exist_ok=True)

# NOTE(review): predictions are teacher-forced — the model receives test_decoder_input
# (the true targets shifted by the start marker), not its own previous outputs.
attention_preds = attention_model.predict([test_encoder_input, test_decoder_input])
attention_pred_indices = np.argmax(attention_preds, axis=-1)

decoded_attention_preds = [decode_seq(seq) for seq in attention_pred_indices]
# test_deva_out entries end with the '\n' end marker added by tokenize_pairs (there is
# no ' </s>' suffix); strip it so references can equal predictions and the output
# file has exactly one line per word.
decoded_refs = [t.rstrip('\n') for t in test_deva_out]

# One row per test word: input<TAB>prediction<TAB>reference
with open("predictions_attention/test_predictions.txt", "w", encoding='utf-8') as f:
    f.writelines(
        f"{inp}\t{pred}\t{ref}\n"
        for inp, pred, ref in zip(test_lat, decoded_attention_preds, decoded_refs)
    )


In [None]:
# Read back the vanilla predictions. Rows are "input<TAB>prediction<TAB>reference";
# keep only the prediction column and skip blank or malformed lines so the list
# stays aligned with the test words.
vanilla_preds = []
with open("predictions_vanilla/test_predictions.txt", encoding="utf-8") as f:
    for line in f:
        fields = line.rstrip('\n').split('\t')
        if len(fields) < 3:
            continue
        vanilla_preds.append(fields[1])

# Compare and print improvements
print("✅ Attention Model Improvements:")
for i, (v, a, r) in enumerate(zip(vanilla_preds, decoded_attention_preds, decoded_refs)):
    # Compare plain words: drop the end marker in case the reference still carries it.
    r = r.rstrip('\n')
    # An "improvement" is a word the vanilla model got wrong and the attention model got right.
    if v != r and a == r:
        print(f"{i+1}. Input: {test_lat[i]}\n   Vanilla: {v}\n   Attention: {a}\n   Ref: {r}\n")


In [None]:
# Encoder model and decoder layers for step-by-step inference.
# Layers are located by type rather than by a fixed position in `model.layers`,
# so the lookup does not break when the layer list is ordered differently.
# Assumes (as the original first-LSTM lookup did) that encoder layers are listed
# before decoder layers.
embedding_layers = [layer for layer in attention_model.layers if isinstance(layer, Embedding)]
lstm_layers = [layer for layer in attention_model.layers if isinstance(layer, LSTM)]
if len(embedding_layers) != 2 or len(lstm_layers) != 2:
    raise ValueError("Expected exactly two Embedding and two LSTM layers in attention_model")

# Find encoder LSTM
encoder_lstm = lstm_layers[0]

# Get encoder input
encoder_inputs = attention_model.input[0]

# Get all 3 outputs from encoder LSTM: full output sequence plus final h and c states
encoder_outputs, state_h_enc, state_c_enc = encoder_lstm.output

# Reconstruct encoder model
encoder_model = Model(encoder_inputs, [encoder_outputs, state_h_enc, state_c_enc])


# Decoder setup
decoder_inputs = attention_model.input[1]
decoder_emb_layer = embedding_layers[1]
decoder_lstm_layer = lstm_layers[1]
attention_layer = next(layer for layer in attention_model.layers if isinstance(layer, BahdanauAttention))
concat_layer = next(layer for layer in attention_model.layers if isinstance(layer, Concatenate))
# The softmax projection is the last Dense layer of the model.
output_layer = next(layer for layer in reversed(attention_model.layers) if isinstance(layer, Dense))


In [None]:
def decode_with_attention(input_seq, max_len=MAXLEN_TARGET):
    """Greedily decode one input and collect the attention weights of every step.

    Args:
        input_seq: Encoder input for a single example (batch of size 1).
        max_len: Maximum number of decoding steps.

    Returns:
        (decoded_text, attention_weights_all): the decoded characters (including
        the end marker when it was produced) and one attention vector over the
        encoder positions per decoded token.
    """
    encoder_outs, state_h, state_c = encoder_model.predict(input_seq, verbose=0)

    # Targets are delimited by a tab (start) and a newline (end).
    start_id = target_tokenizer.word_index['\t']
    end_id = target_tokenizer.word_index['\n']

    target_seq = np.zeros((1, 1))
    target_seq[0, 0] = start_id

    decoded = []
    attention_weights_all = []

    for _ in range(max_len):
        # One decoder step: embed the previous token, advance the LSTM, attend, project.
        dec_emb = decoder_emb_layer(target_seq)
        dec_out, h, c = decoder_lstm_layer(dec_emb, initial_state=[state_h, state_c])
        context_vector, attn_weights = attention_layer(dec_out, encoder_outs)
        concat = concat_layer([dec_out, context_vector])
        output_probs = output_layer(concat)

        sampled_token = int(np.argmax(output_probs[0, -1, :]))
        decoded.append(sampled_token)
        attention_weights_all.append(attn_weights.numpy()[0][0])  # shape: (enc_len,)

        if sampled_token == end_id:
            break

        # Feed the prediction and the new states into the next step.
        target_seq[0, 0] = sampled_token
        state_h, state_c = h, c

    decoded_text = ''.join([index_to_char.get(idx, '') for idx in decoded])
    return decoded_text, attention_weights_all


In [None]:
def plot_attention_heatmap(attn, input_text, output_text, idx=1):
    """Show one attention matrix as a heatmap.

    Columns are labelled with the characters of `input_text`, rows with the
    characters of `output_text`; `idx` only appears in the title.
    """
    heatmap_options = {
        "xticklabels": list(input_text),
        "yticklabels": list(output_text),
        "cmap": 'magma',
        "cbar": False,
        "linewidths": 0.5,
        "annot": False,
    }

    plt.figure(figsize=(6, 5))
    sns.heatmap(attn, **heatmap_options)
    plt.xlabel("Input (Latin)")
    plt.ylabel("Output (Hindi)")
    plt.title(f"Sample {idx} Attention")
    plt.tight_layout()
    plt.show()


In [None]:
def plot_9_grid():
    """Plot attention heatmaps for the first nine test words in a 3x3 grid.

    Each heatmap is cropped to the columns that correspond to real input
    characters, so padded encoder positions are not drawn without a label.
    """
    sample_count = min(9, len(test_lat))  # tolerate test sets with fewer than nine words
    plt.figure(figsize=(18, 15))
    for i in range(sample_count):
        input_text = test_lat[i]
        input_seq = test_encoder_input[i:i+1]
        output_text, attn_weights = decode_with_attention(input_seq)
        # (dec_len, enc_len) -> keep only the first len(input_text) encoder positions;
        # the encoder input is padded at the end.
        attn_matrix = np.stack(attn_weights)[:, :len(input_text)]

        plt.subplot(3, 3, i+1)
        sns.heatmap(attn_matrix, xticklabels=list(input_text), yticklabels=list(output_text), cmap='coolwarm', cbar=False)
        plt.title(f"Input: {input_text}")
        plt.xlabel("Latin chars")
        plt.ylabel("Hindi chars")
    plt.tight_layout()
    plt.show()


In [None]:
# Save one attention heatmap per test word (first ten words, or fewer if the
# test set is smaller) as PNG files.
os.makedirs("attention_heatmaps", exist_ok=True)
for i in range(min(10, len(test_lat))):
    input_text = test_lat[i]
    input_seq = test_encoder_input[i:i+1]
    output_text, attn_weights = decode_with_attention(input_seq)
    # Drop the padded encoder positions so every column has an input-character label.
    attn_matrix = np.stack(attn_weights)[:, :len(input_text)]

    plt.figure(figsize=(6, 5))
    sns.heatmap(attn_matrix, xticklabels=list(input_text), yticklabels=list(output_text), cmap='plasma')
    plt.title(f"Sample {i+1}")
    plt.xlabel("Input (Latin)")
    plt.ylabel("Output (Hindi)")
    plt.tight_layout()
    plt.savefig(f"attention_heatmaps/sample_{i+1}.png")
    plt.close()  # release the figure before drawing the next one


# Question 6 - Connectivity visualization

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

def plot_attention_heatmap(input_text, output_text, attention_weights):
    """
    Draw a labelled attention heatmap for one input/output pair.

    input_text: str (e.g., 'ghar'), used for the column labels
    output_text: str (e.g., 'घर'), used for the row labels
    attention_weights: list of attention vectors per output token.
                       Shape: (output_length, input_length)
    """
    # Figure size follows the text lengths, capped at 10 x 6.
    width = min(10, len(input_text))
    height = min(6, len(output_text))
    fig, ax = plt.subplots(figsize=(width, height))

    weight_matrix = np.array(attention_weights)
    sns.heatmap(
        weight_matrix,
        ax=ax,
        xticklabels=list(input_text),
        yticklabels=list(output_text),
        cmap='Greens',
        cbar=True,
        linewidths=0.5,
        linecolor='gray',
    )

    ax.set_title("Attention Heatmap")
    ax.set_xlabel("Input")
    ax.set_ylabel("Output")
    plt.yticks(rotation=0)
    plt.xticks(rotation=90)
    plt.tight_layout()
    plt.show()
