"""
Tyler Chelston

CS584 Project 3


This project presents the development of an English to Spanish language translator 
using a Seq2Seq neural network model. Essential libraries like pandas and tensorflow 
are utilized to handle and process a dataset of 140,000 bilingual sentence pairs. 
The data undergoes extensive cleaning, including normalization and punctuation removal, 
to ensure quality input for the model. 

The Seq2Seq model, comprising an encoder and decoder with LSTM layers, is trained on this data. 
Key features include vocabulary building, character-to-index mappings, and hyperparameter tuning. 
The final product is an interactive program allowing users to input an English sentence 
and receive its Spanish translation, demonstrating the model's practical application in 
natural language processing.

The less complicated words and phrases had cleaner translations than the more complicated. I believe that decreasing the batch size and adding more epochs could help make it more accurate

"""

In [1]:
import numpy as np
import pandas as pd
import re
import unicodedata
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from tensorflow.keras.utils import to_categorical


# Load and prepare data
def load_dataset(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        lines = file.read().split('\n')

    english_sentences = []
    spanish_sentences = []

    for line in lines:
        if '\t' in line:
            parts = line.split('\t')
            eng, spa = parts[0], parts[1]  # Take only the first two parts
            english_sentences.append(eng)
            spanish_sentences.append('\t' + spa + '\n')

    data = pd.DataFrame({
        'English': english_sentences,
        'Spanish': spanish_sentences
    })

    data['English'] = data['English'].apply(clean_sentence)
    data['Spanish'] = data['Spanish'].apply(clean_sentence)

    return data


def clean_sentence(sentence):
    # Normalize characters (to ASCII)
    sentence = unicodedata.normalize('NFD', sentence).encode('ascii', 'ignore').decode('utf-8')

    # Remove punctuation
    sentence = re.sub(r'[^\w\s]', '', sentence)

    # Perform case-folding (convert to lowercase)
    sentence = sentence.lower()

    # Remove non-printable characters
    sentence = re.sub(r'[^a-zA-Z\s]', '', sentence)

    # Optional: Remove digits (if you want to keep only alphabetic words)
    sentence = re.sub(r'\d+', '', sentence)

    return sentence

# Tokenization and sequence padding
def tokenize_and_pad(data, tokenizer, max_length):
    sequences = tokenizer.texts_to_sequences(data)
    padded_sequences = pad_sequences(sequences, maxlen=max_length, padding='post')
    return padded_sequences

# Function to generate the decoded sentence
def decode_sequence(input_seq):
    # Start with initial states from the encoder model
    states_value = encoder_model.predict(input_seq)

    # Start the sequence with the start token '\t'
    target_seq = np.zeros((1, 1, num_spa_characters))
    target_seq[0, 0, spa_tokenizer.word_index['\t']] = 1

    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

        # Sample a token
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index.get(sampled_token_index)

        if sampled_char == '\n':
            stop_condition = True
        else:
            decoded_sentence += sampled_char

            # Update the target sequence to be the last predicted character
            target_seq = np.zeros((1, 1, num_spa_characters))
            if sampled_token_index in spa_tokenizer.word_index.values():
                target_seq[0, 0, sampled_token_index] = 1

            # Update states
            states_value = [h, c]

    return decoded_sentence.strip()


# Function to convert English sentence to sequence
def sentence_to_sequence(sentence, tokenizer, max_length):
    sentence = clean_sentence(sentence)
    sequence = tokenizer.texts_to_sequences([sentence])
    padded_sequence = pad_sequences(sequence, maxlen=max_length, padding='post')
    return padded_sequence

# Function to translate a sentence given its index in the dataset
def translate_sentence_by_index(index, data, tokenizer, max_length, decode_sequence):
    # Fetch the sentence
    sentence_to_translate = data.iloc[index]['English']
    print(f"Original sentence: {sentence_to_translate}")

    # Convert sentence to sequence
    sequence = sentence_to_sequence(sentence_to_translate, tokenizer, max_length)
    sequence_one_hot = to_categorical(sequence, num_classes=len(tokenizer.word_index) + 1)

    # Translate the sequence
    translation = decode_sequence(sequence_one_hot)
    return translation

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
# Data preparation
file_path = 'Downloads/spa-eng/spa.txt'
data = load_dataset(file_path)
print(data.head())

if len(data) > 140000:
    data = data[:140000]

# Clean Data and remove extra text
data['English'] = data['English'].str.split('.').str[0] + '.'
data['Spanish'] = data['Spanish'].str.split('.').str[0] + '.'

# Re-tokenize and prepare data
eng_tokenizer = Tokenizer(char_level=True)
eng_tokenizer.fit_on_texts(data['English'])
spa_tokenizer = Tokenizer(char_level=True)
spa_tokenizer.fit_on_texts(data['Spanish'])

# Create reverse mapping from index to character for the Spanish tokenizer
reverse_target_char_index = dict((i, char) for char, i in spa_tokenizer.word_index.items())

# Define the number of unique characters
num_eng_characters = len(eng_tokenizer.word_index) + 1  # Number of unique English characters
num_spa_characters = len(spa_tokenizer.word_index) + 1  # Number of unique Spanish characters

# Convert text to sequences and pad
max_eng_length = max(len(seq) for seq in eng_tokenizer.texts_to_sequences(data['English']))
max_spa_length = max(len(seq) for seq in spa_tokenizer.texts_to_sequences(data['Spanish']))

# Convert text to sequences (before one-hot encoding)
encoder_sequences = tokenize_and_pad(data['English'], eng_tokenizer, max_eng_length)
decoder_sequences = tokenize_and_pad(data['Spanish'], spa_tokenizer, max_spa_length)

# One-hot encode the input sequences
encoder_input_data = to_categorical(encoder_sequences, num_classes=num_eng_characters)
print("encoder_input_data shape:", encoder_input_data.shape)
# One-hot encode the decoder input sequences
decoder_input_data = to_categorical(decoder_sequences, num_classes=num_spa_characters)

# One-hot encode the decoder target sequences
decoder_target_data = np.zeros((len(data), max_spa_length, num_spa_characters), dtype='float32')

for i, seq in enumerate(decoder_sequences):
    for t, char_idx in enumerate(seq):
        if t > 0:  # Skipping the start token represented by '\t'
            decoder_target_data[i, t - 1, char_idx] = 1.0

  English     Spanish
0      go      \tve\n
1      go    \tvete\n
2      go    \tvaya\n
3      go  \tvayase\n
4      hi    \thola\n
encoder_input_data shape: (140000, 79, 29)


In [3]:
# Check the first few tokenized English sequences
tokenized_eng = eng_tokenizer.texts_to_sequences(data['English'])
print("First few tokenized English sequences:", tokenized_eng[:5])

# Check max length of English sequences
max_eng_length = max(len(seq) for seq in tokenized_eng)
print("Max length of English sequences:", max_eng_length)

# If max_eng_length is still 0, inspect the cleaned English sentences
print("First few cleaned English sentences:", data['English'].head())

# Print the tokenizer's learned characters
print("Tokenizer word index:", eng_tokenizer.word_index)

# Test tokenizing without cleaning
eng_tokenizer.fit_on_texts(data['English'].str.replace('\t', '').str.replace('\n', ''))
tokenized_eng_raw = eng_tokenizer.texts_to_sequences(data['English'].str.replace('\t', '').str.replace('\n', ''))
print("First few tokenized English sequences (raw):", tokenized_eng_raw[:5])

# Print shapes
print("encoder_input_data shape:", encoder_input_data.shape)
print("decoder_input_data shape:", decoder_input_data.shape)
print("decoder_target_data shape:", decoder_target_data.shape)

First few tokenized English sequences: [[18, 4, 11], [18, 4, 11], [18, 4, 11], [18, 4, 11], [9, 6, 11]]
Max length of English sequences: 79
First few cleaned English sentences: 0    go.
1    go.
2    go.
3    go.
4    hi.
Name: English, dtype: object
Tokenizer word index: {' ': 1, 'e': 2, 't': 3, 'o': 4, 'a': 5, 'i': 6, 'n': 7, 's': 8, 'h': 9, 'r': 10, '.': 11, 'l': 12, 'd': 13, 'm': 14, 'y': 15, 'u': 16, 'w': 17, 'g': 18, 'c': 19, 'f': 20, 'p': 21, 'b': 22, 'k': 23, 'v': 24, 'j': 25, 'x': 26, 'q': 27, 'z': 28}
First few tokenized English sequences (raw): [[18, 4, 11], [18, 4, 11], [18, 4, 11], [18, 4, 11], [9, 6, 11]]
encoder_input_data shape: (140000, 79, 29)
decoder_input_data shape: (140000, 113, 31)
decoder_target_data shape: (140000, 113, 31)


In [4]:
# Inspect the actual English sentences
print("Sample English sentences:")
print(data['English'].head(20))

# Check the tokenizer's word index
print("Tokenizer word index:", eng_tokenizer.word_index)

# Re-initialize and re-fit the tokenizer
eng_tokenizer = Tokenizer(char_level=True)
eng_tokenizer.fit_on_texts(data['English'])

# Check the tokenizer's word index again
print("Re-initialized tokenizer word index:", eng_tokenizer.word_index)

# Tokenize and check the sequences again
tokenized_eng = eng_tokenizer.texts_to_sequences(data['English'])
print("First few tokenized English sequences after re-initialization:", tokenized_eng[:5])


Sample English sentences:
0       go.
1       go.
2       go.
3       go.
4       hi.
5      run.
6      run.
7      run.
8      run.
9      run.
10     run.
11     run.
12     who.
13     wow.
14    duck.
15    fire.
16    fire.
17    fire.
18    help.
19    help.
Name: English, dtype: object
Tokenizer word index: {' ': 1, 'e': 2, 't': 3, 'o': 4, 'a': 5, 'i': 6, 'n': 7, 's': 8, 'h': 9, 'r': 10, '.': 11, 'l': 12, 'd': 13, 'm': 14, 'y': 15, 'u': 16, 'w': 17, 'g': 18, 'c': 19, 'f': 20, 'p': 21, 'b': 22, 'k': 23, 'v': 24, 'j': 25, 'x': 26, 'q': 27, 'z': 28}
Re-initialized tokenizer word index: {' ': 1, 'e': 2, 't': 3, 'o': 4, 'a': 5, 'i': 6, 'n': 7, 's': 8, 'h': 9, 'r': 10, '.': 11, 'l': 12, 'd': 13, 'm': 14, 'y': 15, 'u': 16, 'w': 17, 'g': 18, 'c': 19, 'f': 20, 'p': 21, 'b': 22, 'k': 23, 'v': 24, 'j': 25, 'x': 26, 'q': 27, 'z': 28}
First few tokenized English sequences after re-initialization: [[18, 4, 11], [18, 4, 11], [18, 4, 11], [18, 4, 11], [9, 6, 11]]


In [5]:
# After tokenizing and one-hot encoding
print("Shape of encoder input data:", encoder_input_data.shape)
print("Number of unique English characters:", num_eng_characters)

# Adjust the model's input layer
encoder_inputs = Input(shape=(None, num_eng_characters))

Shape of encoder input data: (140000, 79, 29)
Number of unique English characters: 29


In [6]:
# Model parameters
latent_dim = 256

max_eng_length = max(len(seq) for seq in eng_tokenizer.texts_to_sequences(data['English']))
print("Max English sequence length:", max_eng_length)

# Encoder
encoder_inputs = Input(shape=(max_eng_length, num_eng_characters))
encoder = LSTM(latent_dim, return_state=True)
encoder_outputs, state_h, state_c = encoder(encoder_inputs)
encoder_states = [state_h, state_c]

# Decoder
decoder_inputs = Input(shape=(None, len(spa_tokenizer.word_index) + 1))
decoder_lstm = LSTM(latent_dim, return_sequences=True, return_state=True)
decoder_outputs, _, _ = decoder_lstm(decoder_inputs, initial_state=encoder_states)
decoder_dense = Dense(len(spa_tokenizer.word_index) + 1, activation='softmax')
decoder_outputs = decoder_dense(decoder_outputs)

Max English sequence length: 79
Instructions for updating:
Colocations handled automatically by placer.


In [7]:
# Define the model
model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

# Check the Shape
print("Model output shape:", model.output_shape)
print("decoder_target_data shape:", decoder_target_data.shape)

Model output shape: (None, None, 31)
decoder_target_data shape: (140000, 113, 31)


In [8]:
# Early stopping callback
early_stopping = EarlyStopping(monitor='val_loss', patience=3, verbose=1)

# Model checkpoint callback
model_checkpoint = ModelCheckpoint('best_model.h5', monitor='val_loss', save_best_only=True, verbose=1)

# Reduce learning rate on plateau
reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, verbose=1)

# Compile & train the model
model.compile(optimizer='adam', loss='categorical_crossentropy')
model.fit([encoder_input_data, decoder_input_data], decoder_target_data, 
          batch_size=128, 
          epochs=10, 
          validation_split=0.2,
          callbacks=[early_stopping, model_checkpoint, reduce_lr])

Train on 112000 samples, validate on 28000 samples
Instructions for updating:
Use tf.cast instead.
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
Epoch 1/10


2023-11-27 01:12:51.366412: I tensorflow/core/platform/cpu_feature_guard.cc:141] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.1 SSE4.2
2023-11-27 01:12:51.367092: I tensorflow/core/common_runtime/process_util.cc:71] Creating new thread pool with default inter op setting: 10. Tune using inter_op_parallelism_threads for best performance.


Epoch 00001: val_loss improved from inf to 0.89650, saving model to best_model.h5
Epoch 2/10
Epoch 00002: val_loss improved from 0.89650 to 0.77204, saving model to best_model.h5
Epoch 3/10
Epoch 00003: val_loss improved from 0.77204 to 0.70000, saving model to best_model.h5
Epoch 4/10
Epoch 00004: val_loss improved from 0.70000 to 0.64147, saving model to best_model.h5
Epoch 5/10
Epoch 00005: val_loss improved from 0.64147 to 0.60279, saving model to best_model.h5
Epoch 6/10
Epoch 00006: val_loss improved from 0.60279 to 0.57301, saving model to best_model.h5
Epoch 7/10
Epoch 00007: val_loss improved from 0.57301 to 0.55520, saving model to best_model.h5
Epoch 8/10
Epoch 00008: val_loss did not improve from 0.55520
Epoch 9/10
Epoch 00009: val_loss improved from 0.55520 to 0.53639, saving model to best_model.h5
Epoch 10/10
Epoch 00010: val_loss improved from 0.53639 to 0.52457, saving model to best_model.h5


<tensorflow.python.keras.callbacks.History at 0x7fcea6876550>

In [9]:
# Inference models
encoder_model = Model(encoder_inputs, encoder_states)
decoder_state_input_h = Input(shape=(latent_dim,))
decoder_state_input_c = Input(shape=(latent_dim,))
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_outputs, state_h, state_c = decoder_lstm(decoder_inputs, initial_state=decoder_states_inputs)
decoder_states = [state_h, state_c]
decoder_outputs = decoder_dense(decoder_outputs)
decoder_model = Model([decoder_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states)

In [10]:
# Test the model & Prompt user for input
try:
    user_input = int(input("Enter an integer up to 140000 to see a sentence translated: "))
    if 0 <= user_input < len(data):
        translated_sentence = translate_sentence_by_index(user_input, data, eng_tokenizer, max_eng_length, decode_sequence)
        print("Translated sentence:", translated_sentence)
    else:
        print("Input integer is out of range. Please enter a number between 0 and", len(data) - 1)
except ValueError:
    print("Invalid input. Please enter an integer.")

Enter an integer up to 140000 to see a sentence translated:  10000


Original sentence: i hate tomatoes.
Translated sentence: odio a tom


In [11]:
# Test the model & Prompt user for input
try:
    user_input = int(input("Enter an integer up to 140000 to see a sentence translated: "))
    if 0 <= user_input < len(data):
        translated_sentence = translate_sentence_by_index(user_input, data, eng_tokenizer, max_eng_length, decode_sequence)
        print("Translated sentence:", translated_sentence)
    else:
        print("Input integer is out of range. Please enter a number between 0 and", len(data) - 1)
except ValueError:
    print("Invalid input. Please enter an integer.")

Enter an integer up to 140000 to see a sentence translated:  110000


Original sentence: they didnt feel like playing any more.
Translated sentence: ellos se consejaron el pelo de la cama


In [12]:
# Test the model & Prompt user for input
try:
    user_input = int(input("Enter an integer up to 140000 to see a sentence translated: "))
    if 0 <= user_input < len(data):
        translated_sentence = translate_sentence_by_index(user_input, data, eng_tokenizer, max_eng_length, decode_sequence)
        print("Translated sentence:", translated_sentence)
    else:
        print("Input integer is out of range. Please enter a number between 0 and", len(data) - 1)
except ValueError:
    print("Invalid input. Please enter an integer.")

Enter an integer up to 140000 to see a sentence translated:  20


Original sentence: help.
Translated sentence: se despertado
