In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed.
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load:

#import numpy as np # linear algebra
#import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory.
# For example, running this cell (by clicking run or pressing Shift+Enter) lists all files under the input directory.

import os
# Walk the input tree and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/), which is preserved as output when you create a version using "Save & Run All".
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session.

# Import Libraries

In [None]:
# Install a pinned TensorFlow release (2.13.0) before it is imported in the next cell.
# NOTE(review): `%pip install ...` would explicitly target the running kernel's environment — consider switching.
!pip install tensorflow==2.13.0 --quiet

In [None]:
import os
# Set before TensorFlow is imported further down in this cell.
# Presumably level "3" is meant to silence TensorFlow's native (C++) log output — confirm against the TF docs.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import re
import io
import unicodedata

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt

import tensorflow as tf
from tensorflow.keras import layers, models, optimizers, losses
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

from sklearn.model_selection import train_test_split

# Load Data

In [None]:
# Location of the sentence-pair corpus; each line is later split on tab characters.
file_path = "/kaggle/input/bilingual-sentence-pairs/tur.txt"

In [None]:
# Read the whole corpus and split it into lines. The context manager closes the
# file handle as soon as the text has been read (the bare open() call leaked it).
with open(file_path, encoding="UTF-8") as corpus_file:
    lines = corpus_file.read().strip().split("\n")

In [None]:
def preprocess(s):
    """Normalize a sentence and wrap it in ``<start>`` / ``<end>`` markers.

    The text is lower-cased and stripped, decomposed with NFD so that combining
    marks (Unicode category ``Mn``) can be dropped, and any Turkish letter that
    is still present is replaced by its ASCII look-alike.

    Args:
        s: Raw sentence.

    Returns:
        The cleaned sentence in the form ``"<start> ... <end>"``.
    """
    to_ascii = str.maketrans("çğıöşüÇĞİÖŞÜ", "cgiosuCGIOSU")
    decomposed = unicodedata.normalize('NFD', s.lower().strip())
    without_marks = ''.join(filter(lambda ch: unicodedata.category(ch) != 'Mn', decomposed))
    # Letters that carry no combining mark to strip (e.g. dotless "ı") are mapped here.
    cleaned = without_marks.translate(to_ascii).strip()
    return '<start> ' + cleaned + ' <end>'

In [None]:
# Use the first 50000 lines. For every line, drop the last tab-separated field,
# preprocess the remaining fields, then transpose the per-line tuples into one
# tuple per column: first column -> target_lang, second column -> input_lang.
target_lang, input_lang = zip(*(
    tuple(map(preprocess, line.split("\t")[:-1]))
    for line in lines[:50000]
))

In [None]:
# Tokenizer for the input language, fitted on the preprocessed input sentences.
# filters="" turns off character filtering (assumption: Keras' default filter set would
# otherwise strip the "<" / ">" of the start/end markers — see the Tokenizer docs).
# Words not seen during fitting are mapped to the "<unknown>" token.
input_tokenizer = Tokenizer(filters="", oov_token="<unknown>")
input_tokenizer.fit_on_texts(input_lang)

In [None]:
# Encode every input sentence as a list of word ids, then pad at the end ("post")
# so that all rows share the same length.
input_tensor = pad_sequences(
    input_tokenizer.texts_to_sequences(input_lang),
    padding="post",
)

In [None]:
# Tokenizer for the target language, configured the same way as the input tokenizer:
# no character filtering and "<unknown>" as the out-of-vocabulary token.
target_tokenizer = Tokenizer(filters="", oov_token="<unknown>")
target_tokenizer.fit_on_texts(target_lang)

In [None]:
# Encode every target sentence as a list of word ids, then pad at the end ("post")
# so that all rows share the same length.
target_tensor = pad_sequences(
    target_tokenizer.texts_to_sequences(target_lang),
    padding="post",
)

In [None]:
# Hold out 20% of the sentence pairs; the fixed random_state makes the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    input_tensor,
    target_tensor,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Sanity check: shapes of the train / test partitions for inputs and targets.
print("Input tensors:", X_train.shape, X_test.shape)
print("Target tensors:", y_train.shape, y_test.shape)

# Dataset

In [None]:
# Shuffle buffer covers the whole training set.
BUFFER_SIZE = len(X_train)
BATCH_SIZE = 64
# Width of the token embeddings in both encoder and decoder.
EMBEDDING_DIM = 256
# Vocabulary sizes; the +1 presumably accounts for id 0, which pad_sequences uses for padding — confirm.
vocab_input_size = len(input_tokenizer.index_word) + 1
vocab_target_size = len(target_tokenizer.index_word) + 1
EPOCHS = 20
# Number of full batches per epoch (any remainder is dropped).
STEPS_PER_EPOCH = len(X_train) // BATCH_SIZE

In [None]:
# Training pipeline: shuffle, group into fixed-size batches (incomplete last batch dropped), prefetch one batch.
train_dataset = (
    tf.data.Dataset.from_tensor_slices((X_train, y_train))
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(1)
)
# Validation pipeline: same batching, but the original order is kept.
valid_dataset = (
    tf.data.Dataset.from_tensor_slices((X_test, y_test))
    .batch(BATCH_SIZE, drop_remainder=True)
    .prefetch(1)
)

In [None]:
# Pull one batch from the training pipeline; it is reused below to smoke-test the encoder.
input_batch, target_batch = next(iter(train_dataset))
print("Input Batch Shape:", input_batch.shape)
print("Target Batch Shape:", target_batch.shape)

# Encoder

In [None]:
class Encoder(models.Model):
    """Embedding layer followed by an LSTM that returns all outputs plus its final state.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each embedding vector.
        encoder_units: Number of LSTM units.
        batch_size: Batch size used when building the zero initial state.
    """

    def __init__(self, vocab_size, embedding_dim, encoder_units, batch_size):
        super().__init__()
        self.batch_size = batch_size
        self.encoder_units = encoder_units
        self.embedding = layers.Embedding(vocab_size, embedding_dim)
        self.lstm = layers.LSTM(encoder_units, return_sequences=True, return_state=True)

    def call(self, input_batch, state_h, state_c):
        """Embed ``input_batch`` and run the LSTM starting from ``(state_h, state_c)``.

        Returns:
            ``(output, state_h, state_c)``: the per-step LSTM outputs and the final states.
        """
        embedded = self.embedding(input_batch)
        sequence_output, final_h, final_c = self.lstm(embedded, initial_state=[state_h, state_c])
        return sequence_output, final_h, final_c

    def initialize_state(self):
        """Return ``[h, c]`` as two zero tensors of shape ``(batch_size, encoder_units)``."""
        state_shape = (self.batch_size, self.encoder_units)
        return [tf.zeros(state_shape) for _ in range(2)]

In [None]:
# Encoder with 512 LSTM units, sized for the training batch size.
encoder = Encoder(
    vocab_size=vocab_input_size,
    embedding_dim=EMBEDDING_DIM,
    encoder_units=512,
    batch_size=BATCH_SIZE,
)

In [None]:
# Smoke test: run the sample batch through the encoder from a zero state and inspect the shapes.
encoder_output, encoder_state_h, encoder_state_c = encoder(input_batch, *encoder.initialize_state())

print("Encoder output shape:", encoder_output.shape)
print("Encoder hidden state h shape:", encoder_state_h.shape)
print("Encoder hidden state c shape:", encoder_state_c.shape)

# Luong Attention

In [None]:
class LuongAttention(layers.Layer):
    """Dot-product attention of a decoder state over the encoder outputs."""

    def __init__(self):
        super().__init__()

    def call(self, decoder_state_h, decoder_state_c, encoder_output):
        """Compute a context vector and the attention weights.

        The query is the element-wise sum of the two decoder states. It is dotted
        with every encoder output (axis 2 of ``encoder_output`` against axis 1 of
        the query), the scores are softmax-normalized over axis 1, and the encoder
        outputs are summed over axis 1 using those weights.

        Returns:
            ``(context_vector, attention_weights)``.
        """
        # Add a trailing axis so the query can be contracted against the encoder outputs.
        query = (decoder_state_h + decoder_state_c)[:, :, tf.newaxis]
        score = layers.dot([encoder_output, query], axes=[2, 1])
        attention_weights = tf.nn.softmax(score, axis=1)
        weighted_outputs = attention_weights * encoder_output
        context_vector = tf.reduce_sum(weighted_outputs, axis=1)
        return context_vector, attention_weights

In [None]:
# Stand-alone attention instance. NOTE(review): the Decoder class builds its own LuongAttention,
# and this variable is not referenced again in the visible cells.
attention_layer = LuongAttention()

In [None]:
# Start the decoder from the encoder's final state produced by the encoder smoke test.
decoder_state_h, decoder_state_c = encoder_state_h, encoder_state_c

# Decoder

In [None]:
class Decoder(models.Model):
    """Single-step LSTM decoder with dot-product attention over the encoder outputs.

    Args:
        vocab_size: Size of the target vocabulary (embedding rows and output classes).
        embedding_dim: Width of the token embeddings.
        decoder_units: Number of LSTM units. It must match the width of the states
            passed to ``call``, because they are used as the LSTM's initial state.
        batch_size: Stored on the instance; not used by ``call``.
    """

    def __init__(self, vocab_size, embedding_dim, decoder_units, batch_size):
        super().__init__()
        self.batch_size = batch_size
        self.embedding = layers.Embedding(vocab_size, embedding_dim)
        self.lstm = layers.LSTM(decoder_units, return_state=True, return_sequences=True)
        # The output layer already applies softmax, i.e. it emits probabilities, not logits.
        self.fc = layers.Dense(vocab_size, activation="softmax")
        self.attention = LuongAttention()

    def call(self, decoder_input, decoder_state_h, decoder_state_c, encoder_output):
        """Decode one time step.

        Args:
            decoder_input: Token ids for the current step, one column per example.
            decoder_state_h: Hidden state carried over from the previous step.
            decoder_state_c: Cell state carried over from the previous step.
            encoder_output: Per-step encoder outputs that the attention layer attends over.

        Returns:
            ``(probabilities, state_h, state_c, attention_weights)``, where the
            probabilities are the softmax output over the vocabulary.
        """
        context_vector, attention_weights = self.attention(decoder_state_h, decoder_state_c, encoder_output)
        # Insert a time axis so the context can be concatenated with the embedded token.
        context_vector = context_vector[:, tf.newaxis, :]
        x = self.embedding(decoder_input)
        x = tf.concat([context_vector, x], axis=-1)

        # Continue the recurrence from the previous step's state. Without
        # initial_state the LSTM restarts from zeros on every call, so the
        # decoder would carry no memory between time steps.
        output, state_h, state_c = self.lstm(x, initial_state=[decoder_state_h, decoder_state_c])
        # Collapse the length-1 time axis before the output projection.
        output = tf.reshape(output, (-1, output.shape[2]))
        x = self.fc(output)
        return x, state_h, state_c, attention_weights

In [None]:
# Decoder with 512 LSTM units, the same width as the encoder states it is seeded with.
decoder = Decoder(
    vocab_size=vocab_target_size,
    embedding_dim=EMBEDDING_DIM,
    decoder_units=512,
    batch_size=BATCH_SIZE,
)

In [None]:
# Smoke test: decode a single step for a batch of random token ids and inspect the shapes.
# The ids are drawn as integers inside the target vocabulary; the default float
# samples in [0, 1) are not valid ids for the embedding lookup.
decoder_input = tf.random.uniform((BATCH_SIZE, 1), maxval=vocab_target_size, dtype=tf.int32)
decoder_output, decoder_state_h, decoder_state_c, _ = decoder(decoder_input, decoder_state_h, decoder_state_c, encoder_output)

print("Decoder output shape:", decoder_output.shape)
print("Decoder hidden state h shape:", decoder_state_h.shape)
print("Decoder hidden state c shape:", decoder_state_c.shape)

# Train

In [None]:
# Adam with its default settings.
optimizer = optimizers.Adam()
# reduction="none" keeps one loss value per example, so train_step can multiply
# by its padding mask before averaging.
loss_fn = losses.SparseCategoricalCrossentropy(reduction="none")

In [None]:
@tf.function
def train_step(input_batch, target_batch, encoder_state_h, encoder_state_c):
    """Run one teacher-forced optimization step on a single batch.

    The encoder processes ``input_batch`` from the given initial state. The
    decoder then predicts target position ``t`` for ``t = 1 .. T-1``, receiving
    the ground-truth token of the previous position as input (``<start>`` for
    the first step). Positions whose target id is 0 contribute no loss.

    Args:
        input_batch: Batch of input token ids.
        target_batch: Batch of target token ids.
        encoder_state_h: Initial hidden state for the encoder.
        encoder_state_c: Initial cell state for the encoder.

    Returns:
        The summed per-step loss divided by the target sequence length.
    """
    start_id = target_tokenizer.word_index["<start>"]
    target_length = target_batch.shape[1]
    loss = 0
    with tf.GradientTape() as tape:
        encoder_output, encoder_state_h, encoder_state_c = encoder(input_batch, encoder_state_h, encoder_state_c)
        # The decoder starts from the encoder's final state.
        decoder_state_h, decoder_state_c = encoder_state_h, encoder_state_c
        decoder_input = tf.expand_dims([start_id] * BATCH_SIZE, 1)
        for step in range(1, target_length):
            expected = target_batch[:, step]
            predictions, decoder_state_h, decoder_state_c, _ = decoder(decoder_input, decoder_state_h, decoder_state_c, encoder_output)
            # Zero out the loss wherever the target is padding (id 0).
            mask = tf.cast(expected != 0, dtype=predictions.dtype)
            loss += tf.reduce_mean(loss_fn(expected, predictions) * mask)
            # Teacher forcing: the next input is the ground-truth token.
            decoder_input = tf.expand_dims(expected, 1)

    batch_loss = loss / int(target_length)
    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return batch_loss

In [None]:
# Mean batch loss of every epoch, collected for the plot below.
total_loss_arr = []

for epoch in range(EPOCHS):
    # The same zero encoder state is passed to every batch of the epoch.
    encoder_state_h, encoder_state_c = encoder.initialize_state()

    total_batch_loss = 0
    for batch, (inp, targ) in enumerate(train_dataset.take(STEPS_PER_EPOCH)):
        batch_loss = train_step(inp, targ, encoder_state_h, encoder_state_c)
        total_batch_loss = total_batch_loss + batch_loss

    epoch_loss = total_batch_loss / STEPS_PER_EPOCH
    total_loss_arr.append(epoch_loss)
    print(f"Epoch {epoch}, Loss: {epoch_loss}")

# Results

In [None]:
# Training-loss curve with one point per epoch. The x-axis is the epoch index
# (it was previously mislabelled "Loss"); the loss itself is on the y-axis.
plt.figure(figsize=(8, 6))
plt.plot(total_loss_arr)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Loss")
plt.show()

In [None]:
def evaluate(sentence, target_tensor, input_tensor):
    """Greedily translate one sentence with the trained encoder and decoder.

    Args:
        sentence: Raw input sentence; it is passed through ``preprocess`` first.
        target_tensor: Padded target id matrix; its second dimension caps the
            number of decoding steps.
        input_tensor: Padded input id matrix; its second dimension is the length
            the encoded sentence is padded (or truncated) to.

    Returns:
        ``(result, sentence)``: the predicted words, each followed by a space
        (including ``<end>`` when it was produced), and the preprocessed input.
    """
    sentence = preprocess(sentence)
    # Encode through the tokenizer so that words missing from the vocabulary map
    # to its out-of-vocabulary token; a direct word_index lookup raised KeyError.
    inputs = input_tokenizer.texts_to_sequences([sentence])
    inputs = pad_sequences(inputs, maxlen=input_tensor.shape[1], padding="post")
    inputs = tf.convert_to_tensor(inputs)

    result = ""
    # Zero initial state for a batch of one, sized from the encoder itself
    # rather than a hard-coded unit count.
    state_shape = (1, encoder.encoder_units)
    encoder_state_h, encoder_state_c = tf.zeros(state_shape), tf.zeros(state_shape)
    encoder_output, encoder_state_h, encoder_state_c = encoder(inputs, encoder_state_h, encoder_state_c)
    decoder_state_h, decoder_state_c = encoder_state_h, encoder_state_c
    decoder_input = tf.expand_dims([target_tokenizer.word_index["<start>"]], 0)

    for _ in range(target_tensor.shape[1]):
        predictions, decoder_state_h, decoder_state_c, _ = decoder(decoder_input, decoder_state_h, decoder_state_c, encoder_output)
        predicted_id = tf.argmax(predictions[0]).numpy()
        predicted_word = target_tokenizer.index_word.get(predicted_id)
        if predicted_word is None:
            # The id has no vocabulary entry (e.g. the padding id 0): stop decoding
            # instead of failing with KeyError.
            break

        result += predicted_word + " "
        if predicted_word == "<end>":
            break

        # Feed the prediction back in as the next decoder input.
        decoder_input = tf.expand_dims([predicted_id], 0)

    return result, sentence

In [None]:
def translate(sentence, ground_truth):
    """Translate ``sentence`` and print it next to the prediction and the reference.

    Args:
        sentence: Input sentence handed to ``evaluate``.
        ground_truth: Reference translation, printed unchanged.
    """
    prediction, processed_sentence = evaluate(sentence, target_tensor, input_tensor)

    rows = (
        ("Input:", processed_sentence),
        ("Prediction:", prediction),
        ("Ground truth:", ground_truth),
    )
    # Labels are left-aligned in a 15-character column.
    for label, text in rows:
        print(f'{label:15s} {text}')

In [None]:
# Translate the first sentence pair of each of the first 10 validation batches.
for input_batch, target_batch in valid_dataset.take(10):
    inp, targ = input_batch[0], target_batch[0]

    decoded = []
    for tokenizer, token_ids in ((input_tokenizer, inp), (target_tokenizer, targ)):
        text = tokenizer.sequences_to_texts([token_ids.numpy()])[0]
        # Keep only real words: drop the start/end markers and "<unknown>" tokens.
        kept = filter(lambda tok: tok not in ("<start>", "<end>", "<unknown>"), text.split(" "))
        decoded.append(" ".join(kept))
    sentence, ground_truth = decoded

    translate(sentence, ground_truth)
    print()