<a href="https://colab.research.google.com/github/isakovero/colab/blob/main/viikko6.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
import random

from google.colab import drive
drive.mount('/content/drive')
text_file = "/content/drive/My Drive/fin.txt"

with open(text_file, encoding='utf-8') as f:
    lines = f.read().split("\n")[:-1]

text_pairs = []
for line in lines:
    english, finnish, _ = line.split("\t")
    finnish = "[start] " + finnish + " [end]"
    text_pairs.append((english, finnish))

# 2. Split the Dataset
random.shuffle(text_pairs)
num_samples = len(text_pairs)
train_pairs = text_pairs[: int(0.8 * num_samples)]
val_pairs = text_pairs[int(0.8 * num_samples): int(0.9 * num_samples)]
test_pairs = text_pairs[int(0.9 * num_samples):]

# 3. Vectorization
vocab_size = 15000
sequence_length = 20

source_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length,
)
target_vectorization = layers.TextVectorization(
    max_tokens=vocab_size,
    output_mode="int",
    output_sequence_length=sequence_length + 1,
)

train_english_texts = [pair[0] for pair in train_pairs]
train_finnish_texts = [pair[1] for pair in train_pairs]

source_vectorization.adapt(train_english_texts)
target_vectorization.adapt(train_finnish_texts)

# 4. Prepare Data Pipelines
def format_dataset(eng, fin):
    eng = source_vectorization(eng)
    fin = target_vectorization(fin)
    return {"encoder_inputs": eng, "decoder_inputs": fin[:, :-1]}, fin[:, 1:]

def make_dataset(pairs):
    eng_texts, fin_texts = zip(*pairs)
    dataset = tf.data.Dataset.from_tensor_slices((list(eng_texts), list(fin_texts)))
    dataset = dataset.batch(32).map(format_dataset).prefetch(16)
    return dataset

train_ds = make_dataset(train_pairs)
val_ds = make_dataset(val_pairs)
test_ds = make_dataset(test_pairs)

# 5. Transformer Layers

class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, input_dim, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(input_dim=input_dim, output_dim=output_dim)
        self.position_embeddings = layers.Embedding(input_dim=sequence_length, output_dim=output_dim)
        self.sequence_length = sequence_length
        self.input_dim = input_dim
        self.output_dim = output_dim

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def get_config(self):
        config = super().get_config()
        config.update({
            "output_dim": self.output_dim,
            "sequence_length": self.sequence_length,
            "input_dim": self.input_dim,
        })
        return config



class TransformerEncoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.attention = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()

    def call(self, inputs, mask=None):
        if mask is not None:
            mask = mask[:, tf.newaxis, :]
        attention_output = self.attention(inputs, inputs, attention_mask=mask)
        proj_input = self.layernorm_1(inputs + attention_output)
        proj_output = self.dense_proj(proj_input)
        return self.layernorm_2(proj_input + proj_output)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config


class TransformerDecoder(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.attention_1 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.attention_2 = layers.MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)
        self.dense_proj = keras.Sequential([
            layers.Dense(dense_dim, activation="relu"),
            layers.Dense(embed_dim),
        ])
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()

    def call(self, inputs, encoder_outputs, mask=None):
        causal_mask = self.get_causal_attention_mask(inputs)
        if mask is not None:
            padding_mask = mask[:, tf.newaxis, :]
            combined_mask = tf.minimum(padding_mask, causal_mask)
        else:
            combined_mask = causal_mask
        attention_output_1 = self.attention_1(
            inputs, inputs, attention_mask=combined_mask)
        out_1 = self.layernorm_1(inputs + attention_output_1)
        attention_output_2 = self.attention_2(
            out_1, encoder_outputs, attention_mask=mask)
        out_2 = self.layernorm_2(out_1 + attention_output_2)
        proj_output = self.dense_proj(out_2)
        return self.layernorm_3(out_2 + proj_output)

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, seq_len = input_shape[0], input_shape[1]
        i = tf.range(seq_len)[:, tf.newaxis]
        j = tf.range(seq_len)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, seq_len, seq_len))
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)], axis=0
        )
        return tf.tile(mask, mult)

    def get_config(self):
        config = super().get_config()
        config.update({
            "embed_dim": self.embed_dim,
            "dense_dim": self.dense_dim,
            "num_heads": self.num_heads,
        })
        return config


# Build the Transformer Model
embed_dim = 256
dense_dim = 512
num_heads = 4

encoder_inputs = keras.Input(shape=(None,), dtype="int64", name="encoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(encoder_inputs)
encoder_outputs = TransformerEncoder(embed_dim, dense_dim, num_heads)(x)

decoder_inputs = keras.Input(shape=(None,), dtype="int64", name="decoder_inputs")
x = PositionalEmbedding(sequence_length, vocab_size, embed_dim)(decoder_inputs)
x = TransformerDecoder(embed_dim, dense_dim, num_heads)(x, encoder_outputs)
decoder_outputs = layers.Dense(vocab_size, activation="softmax")(x)

transformer = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)

# 6. Compile and Train
transformer.compile(
    loss="sparse_categorical_crossentropy",
    optimizer="adam",
    metrics=["accuracy"]
)

transformer.fit(train_ds, epochs=15, validation_data=val_ds)

# 7. Translation Function
finnish_vocab = target_vectorization.get_vocabulary()
finnish_index_lookup = {idx: word for idx, word in enumerate(finnish_vocab)}
max_decoded_sentence_length = 20

def decode_sequence(input_sentence):
    tokenized_input_sentence = source_vectorization([input_sentence])
    decoded_sentence = "[start]"
    for _ in range(max_decoded_sentence_length):
        tokenized_target_sentence = target_vectorization([decoded_sentence])[:, :-1]
        predictions = transformer.predict([tokenized_input_sentence, tokenized_target_sentence])
        sampled_token_index = np.argmax(predictions[0, -1, :])
        sampled_token = finnish_index_lookup[sampled_token_index]
        if sampled_token == "[end]":
            break
        decoded_sentence += " " + sampled_token
    return decoded_sentence

# 8. Evaluate and Test
for _ in range(5):
    input_sentence = random.choice([pair[0] for pair in test_pairs])
    print(f"Input: {input_sentence}")
    print(f"Output: {decode_sequence(input_sentence)}")


Mounted at /content/drive
Epoch 1/15
[1m1807/1807[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m53s[0m 22ms/step - accuracy: 0.7762 - loss: 1.8057 - val_accuracy: 0.8856 - val_loss: 0.7360
Epoch 2/15
[1m1807/1807[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m30s[0m 17ms/step - accuracy: 0.8911 - loss: 0.6554 - val_accuracy: 0.9202 - val_loss: 0.4793
Epoch 3/15
[1m1807/1807[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 17ms/step - accuracy: 0.9335 - loss: 0.3340 - val_accuracy: 0.9409 - val_loss: 0.3694
Epoch 4/15
[1m1807/1807[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 18ms/step - accuracy: 0.9554 - loss: 0.2051 - val_accuracy: 0.9454 - val_loss: 0.3606
Epoch 5/15
[1m1807/1807[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m32s[0m 17ms/step - accuracy: 0.9660 - loss: 0.1513 - val_accuracy: 0.9471 - val_loss: 0.3599
Epoch 6/15
[1m1807/1807[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m31s[0m 17ms/step - accuracy: 0.9689 - loss: 0.1378 - val_accuracy: 0.94