In [1]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Embedding, LSTM, Dense
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
import nltk
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.bleu_score import SmoothingFunction
import math

In [2]:
# Sample data: list of source and target sentences
source_sentences = ['I am a student.', 'I love programming.', 'How are you?']
target_sentences = ['Je suis étudiant.', 'J\'aime la programmation.', 'Comment ça va?']

In [3]:
source_tokenizer = Tokenizer()
source_tokenizer.fit_on_texts(source_sentences)
source_sequences = source_tokenizer.texts_to_sequences(source_sentences)
source_sequences = pad_sequences(source_sequences, padding='post')

target_sentences = ['<start> ' + sentence + ' <end>' for sentence in target_sentences]
target_tokenizer = Tokenizer()
target_tokenizer.fit_on_texts(target_sentences)
target_sequences = target_tokenizer.texts_to_sequences(target_sentences)
target_sequences = pad_sequences(target_sequences, padding='post')

In [4]:
X_train, X_test, y_train, y_test = train_test_split(source_sequences, target_sequences, test_size=0.2)

In [5]:
class Encoder(Model):
    def __init__(self, vocab_size, embedding_dim, enc_units):
        super(Encoder, self).__init__()
        self.embedding = Embedding(vocab_size, embedding_dim)
        self.lstm = LSTM(enc_units, return_sequences=True, return_state=True)
    
    def call(self, x):
        x = self.embedding(x)
        output, state_h, state_c = self.lstm(x)
        return output, state_h, state_c

In [6]:
class Decoder(Model):
    def __init__(self, vocab_size, embedding_dim, dec_units):
        super(Decoder, self).__init__()
        self.embedding = Embedding(vocab_size, embedding_dim)
        self.lstm = LSTM(dec_units, return_sequences=True, return_state=True)
        self.fc = Dense(vocab_size, activation='softmax')
    
    def call(self, x, enc_output, state_h, state_c):
        x = self.embedding(x)
        dec_output, dec_state_h, dec_state_c = self.lstm(x, initial_state=[state_h, state_c])
        output = self.fc(dec_output)
        return output, dec_state_h, dec_state_c

In [7]:
vocab_size_src = len(source_tokenizer.word_index) + 1
vocab_size_tgt = len(target_tokenizer.word_index) + 1
embedding_dim = 256
units = 512

encoder = Encoder(vocab_size_src, embedding_dim, units)
decoder = Decoder(vocab_size_tgt, embedding_dim, units)

In [8]:
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')


In [9]:
def loss_function(real, pred):
    mask = tf.math.not_equal(real, 0)
    loss_ = loss_object(real, pred)
    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask
    return tf.reduce_mean(loss_)

In [10]:
@tf.function
def train_step(src_seq, tgt_seq):
    loss = 0
    with tf.GradientTape() as tape:
        enc_output, enc_hidden_h, enc_hidden_c = encoder(src_seq)
        dec_hidden_h, dec_hidden_c = enc_hidden_h, enc_hidden_c
        dec_input = tgt_seq[:, :-1]  # Exclude last token for decoder input
        real = tgt_seq[:, 1:]  # Actual target sequence
        pred, dec_hidden_h, dec_hidden_c = decoder(dec_input, enc_output, dec_hidden_h, dec_hidden_c)
        loss = loss_function(real, pred)

    variables = encoder.trainable_variables + decoder.trainable_variables
    gradients = tape.gradient(loss, variables)
    optimizer.apply_gradients(zip(gradients, variables))
    return loss

In [11]:
epochs = 10
batch_size = 32

for epoch in range(epochs):
    total_loss = 0
    for batch in range(len(X_train) // batch_size):
        batch_X = X_train[batch * batch_size: (batch + 1) * batch_size]
        batch_y = y_train[batch * batch_size: (batch + 1) * batch_size]
        batch_loss = train_step(batch_X, batch_y)
        total_loss += batch_loss
    print(f'Epoch {epoch + 1}, Loss: {total_loss}')

Epoch 1, Loss: 0
Epoch 2, Loss: 0
Epoch 3, Loss: 0
Epoch 4, Loss: 0
Epoch 5, Loss: 0
Epoch 6, Loss: 0
Epoch 7, Loss: 0
Epoch 8, Loss: 0
Epoch 9, Loss: 0
Epoch 10, Loss: 0


In [12]:
def calculate_bleu(reference, candidate):
    reference = [reference.split()]
    candidate = candidate.split()
    smoothing = SmoothingFunction().method4
    return sentence_bleu(reference, candidate, smoothing_function=smoothing)

In [13]:
def calculate_perplexity(loss):
    return math.exp(loss)

In [14]:
test_sentence = X_test[0:1]
enc_output, enc_hidden_h, enc_hidden_c = encoder(test_sentence)
dec_hidden_h, dec_hidden_c = enc_hidden_h, enc_hidden_c
dec_input = np.array([[target_tokenizer.word_index['start']]])


predicted_sentence = []
for t in range(y_test.shape[1]):
    pred, dec_hidden_h, dec_hidden_c = decoder(dec_input, enc_output, dec_hidden_h, dec_hidden_c)
    pred_id = np.argmax(pred[0, -1, :])
    predicted_sentence.append(pred_id)
    if pred_id == target_tokenizer.word_index['end']:
        break
    dec_input = np.array([[pred_id]])

predicted_sentence = ' '.join([target_tokenizer.index_word[i] for i in predicted_sentence if i in target_tokenizer.index_word])
reference_sentence = ' '.join([target_tokenizer.index_word[i] for i in y_test[0] if i in target_tokenizer.index_word])

bleu_score = calculate_bleu(reference_sentence, predicted_sentence)
perplexity = calculate_perplexity(total_loss / len(X_test))

print(f'Predicted: {predicted_sentence}')
print(f'Reference: {reference_sentence}')
print(f'BLEU Score: {bleu_score}')
print(f'Perplexity: {perplexity}')

Predicted: je va la programmation
Reference: start comment ça va end
BLEU Score: 0.04753271977233425
Perplexity: 1.0
