In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import math
import copy

# Positional Encodings

Wie Ihr gerade gesehen habt erzeugen wir aus jedem Wort einen Vektor mit 512 Dimensionen.

Da Wörter, bzw. deren vektorielle Repräsentation unterschiedliche Wahrnehmungen erzeugen, 
je nachdem welche Position diese Wörter im Satz einnehmen, möchten wir diese Position erfassen. 

Daher addieren wir zu den Embeddings (vektorielle Darstellung einer Sequenz) jeweils Vektoren mit der Länge 512. 


Diese Positionsvektoren sind immer gleich und folgen wie bereits erwähnt folgendem Schema: 

Für gerade Positionen:
$$
PE(pos, 2i) = \sin \left( \frac{pos}{10000^{\frac{2i}{d_{model}}}} \right)
$$

Für ungerade Positionen:
$$
PE(pos, 2i + 1) = \cos \left( \frac{pos}{10000^{\frac{2i}{d_{model}}}} \right)
$$

### Aufgabe 1

Vervollständige __#1__ und __#2__. 

__Hinweis: Die Sinusfunktion in pytorch kannst du mit torch.sin() und die Cosinusfunktion mit torch.cos() aufrufen.__



In [10]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length):
        super(PositionalEncoding, self).__init__()
        
        pe = torch.zeros(max_seq_length, d_model)
        position = torch.arange(0, max_seq_length, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * -(math.log(10000.0) / d_model))
        
        pe[:, 0::2] = #1
        pe[:, 1::2] = #2
        
        
        self.register_buffer('pe', pe.unsqueeze(0))
        
    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

# Multi-Head Attention

Wie Ihr gesehen habt wird das Embedding (Sequenzlänge × d_model) zunächst in Q, K und V aufgeteilt, wobei diese drei Matrizen identisch sind. 
Anschliessend werden diese mit einer lernbaren Gewichtsmatrix (linear Layer) (d_model × d_model) multipliziert.

Zudem definieren wir noch eine Gewichtsmatrix derselben Größe für die zusammengefügte attention Matrix. 

### Aufgabe 2
Vervollständige __#3__, __#4__, __#5__ und __#6__. 

__Hinweis: Linear Layers (lernbare Gewichtsmatrizen) werden in pytorch mit nn.Linear() erzeugt, wobei die Dimensionen übergeben werden.__



In [2]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        # Ensure that the model dimension (d_model) is divisible by the number of heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        
        # Initialize dimensions
        self.d_model = d_model # Model's dimension
        self.num_heads = num_heads # Number of attention heads
        self.d_k = d_model // num_heads # Dimension of each head's key, query, and value
        
        # Linear layers for transforming inputs
        self.W_q = #3 # Query transformation
        self.W_k = #4 # Key transformation
        self.W_v = #5 # Value transformation
        self.W_o = #6 # Output transformation
        
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        # Calculate attention scores
        attn_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        # Apply mask if provided (useful for preventing attention to certain parts like padding)
        if mask is not None:
            attn_scores = attn_scores.masked_fill(mask == 0, -1e9)
        
        # Softmax is applied to obtain attention probabilities
        attn_probs = torch.softmax(attn_scores, dim=-1)
        
        # Multiply by values to obtain the final output
        output = torch.matmul(attn_probs, V)
        return output
        
    def split_heads(self, x):
        # Reshape the input to have num_heads for multi-head attention
        batch_size, seq_length, d_model = x.size()
        return x.view(batch_size, seq_length, self.num_heads, self.d_k).transpose(1, 2)
        
    def combine_heads(self, x):
        # Combine the multiple heads back to original shape
        batch_size, _, seq_length, d_k = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_length, self.d_model)
        
    def forward(self, Q, K, V, mask=None):
        # Apply linear transformations and split heads
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        # Perform scaled dot-product attention
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        
        # Combine heads and apply output transformation
        output = self.W_o(self.combine_heads(attn_output))
        return output

# Feed Forward Layer

Die Feed Forward Layer folgt folgendem Schema: 

$$
FFN(x) = \max(0, xW_1 + b_1)W_2 + b_2
$$

Es wird also der Relu-Output der ersten lineare Transformation mit der zweiten linearen Transformation multipliziert. 
### Aufgabe 3

Vervollständige __#7__. 

__Hinweis: Um den Input für eine lineare Transformation in pytorch zu definieren kannst du den Output einer anderen linearen Transformation übergeben.__

__Beispiel:__ 

Linear1(Linear2)

Hier wird der Linearen Schicht Linear1 der Output der Schicht Linear2 übergeben. Vorrausgesetzt die Dimensionen stimmen.



In [3]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return #7
        

# Zusammenbau der Encoder Layer

Wie das Schaubild zeigt besteht der Encoder aus der MultiHeadAttention Layer sowie einer FeedForward Layer und 2 Normalisierungen. 

Der folgende Code implementiert den Encoder. 

__Hinweis:__
Der Output der MultiHeadAttention Layer sowie die Positional Encodings werden in der Normalization Layer übergeben. 
Das passiert hier mit einem '+'' obwohl man vielleicht ein ',' erwarten würde. 
Das liegt an einer sogenannten Residualverbindung.
Ohne darauf genauer einzugehen handelt sich dabei um eine Technik um den Vanishing oder Exploding Gradient zu vermeiden. 

In [31]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout(ff_output))
        return x

# Zusammenbau der Decoder Layer

Analog zum Decoder. Die Schichten werden nacheinander implementiert. 

Zu unterscheiden ist hier zwischen der self-Attention und der cross-Attention. Während die self-attention im Decoder ähnlich zur self-Attention im Encoder funktioniert (bloß mit Maske), kombiniert die cross-Attention den Output des Encoders mit dem Output der self-Attention Layer. 

In [6]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, enc_output, src_mask, tgt_mask):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout(ff_output))
        return x

# Zusammenbau des Transformers

__Das Modell wird hier zusammengebaut__ 

Hier muss nichts implementiert werden!

In [44]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout):
        super(Transformer, self).__init__()

        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        
        self.positional_encoding = PositionalEncoding(d_model, max_seq_length)

        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)

    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_length, seq_length), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        
        src_embedded = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        tgt_embedded = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))

        enc_output = src_embedded
        for enc_layer in self.encoder_layers:
            enc_output = enc_layer(enc_output, src_mask)

        dec_output = tgt_embedded
        for dec_layer in self.decoder_layers:
            dec_output = dec_layer(dec_output, enc_output, src_mask, tgt_mask)

        output = self.fc(dec_output)
        return output

# Training mit Dummy-Daten

Um den Trainingsprozess zu veranschaulichen, trainieren wir hier das Transformer Modell mit zwei Sequenzen an zufälligen Zahlen. 

In [45]:
src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1

transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

#set seed
torch.manual_seed(42)
# Generate random sample data
src_data = torch.randint(1, src_vocab_size, (64, max_seq_length)) 
tgt_data = torch.randint(1, tgt_vocab_size, (64, max_seq_length))
src_data

criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

for epoch in range(100):
    optimizer.zero_grad()
    output = transformer(src_data, tgt_data[:, :-1])
    loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_data[:, 1:].contiguous().view(-1))
    loss.backward()
    optimizer.step()
    print(f"Epoch: {epoch+1}, Loss: {loss.item()}")

Epoch: 1, Loss: 8.67752742767334
Epoch: 2, Loss: 8.548323631286621
Epoch: 3, Loss: 8.483274459838867
Epoch: 4, Loss: 8.426873207092285
Epoch: 5, Loss: 8.36446762084961
Epoch: 6, Loss: 8.295404434204102
Epoch: 7, Loss: 8.215231895446777
Epoch: 8, Loss: 8.132074356079102
Epoch: 9, Loss: 8.052645683288574
Epoch: 10, Loss: 7.967570781707764
Epoch: 11, Loss: 7.8843770027160645
Epoch: 12, Loss: 7.803915500640869
Epoch: 13, Loss: 7.71641731262207
Epoch: 14, Loss: 7.6328349113464355
Epoch: 15, Loss: 7.552999019622803
Epoch: 16, Loss: 7.470870018005371
Epoch: 17, Loss: 7.388860702514648
Epoch: 18, Loss: 7.305726528167725
Epoch: 19, Loss: 7.224644660949707
Epoch: 20, Loss: 7.1422553062438965
Epoch: 21, Loss: 7.05832052230835
Epoch: 22, Loss: 6.984397888183594
Epoch: 23, Loss: 6.9108357429504395
Epoch: 24, Loss: 6.824675559997559
Epoch: 25, Loss: 6.750906467437744
Epoch: 26, Loss: 6.676790714263916
Epoch: 27, Loss: 6.607114315032959
Epoch: 28, Loss: 6.532591342926025
Epoch: 29, Loss: 6.4665932655

# Ausblick: Übersetzen mit Transformer Modellen

Gerne dürft ihr den untenstehenden Code ausführen um eine Übersetzung durchzuführen. Der Prozess ist im Grunde derselbe wie oben.

Die Übersetzung wird aufgrund der wenigen Daten natürlich (sehr) schlecht sein, 
man kann aber auch problemlos große Datensätze zum trainieren heranziehen. Falls man Lust hat, kann man das ja in seiner Freizeit mal probieren. 

Mit steigender Anzahl an Epochen steigt natürlich auch die Leistungsfähigkeit des Modells. 


In [80]:
from torchtext.vocab import build_vocab_from_iterator
from torchtext.data.utils import get_tokenizer

src_vocab_size = 5000
tgt_vocab_size = 5000
d_model = 512
num_heads = 8
num_layers = 6
d_ff = 2048
max_seq_length = 100
dropout = 0.1

train_data = [
    ("Ich kann nicht glauben, dass das wahr ist.", "I can't believe this is true."),
    ("Das Wetter ist heute schön.", "The weather is nice today."),
    ("Ich liebe Programmierung.", "I love programming."),
    ("Wie geht es dir?", "How are you?"),
    ("Das ist ein Test.", "This is a test."),
    ("Die Mensa macht immer gutes Essen.", "The canteen always makes good food."),
    ("Ich bin hungrig.", "I am hungry."),
    ("Was machst du?", "What are you doing?"),
    ("Es ist ein schöner Tag.", "It is a beautiful day."),
    ("Das Essen schmeckt schlecht.", "The food tastes bad."),
    ("Das Essen schmeckt gut.", "The food tastes good."),
    ("Ich gehe zur Schule.", "I am going to school."),
    ("Was hast du heute gelernt?", "What did you learn today?"),
    ("Ich habe eine Katze.", "I have a cat."),
    ("Ich habe einen Hund.", "I have a dog.")
]

src_tokenizer = get_tokenizer('basic_english')
tgt_tokenizer = get_tokenizer('basic_english')


def yield_tokens(data, tokenizer):
    for src_sentence, tgt_sentence in data:
        yield tokenizer(src_sentence)
        yield tokenizer(tgt_sentence)

src_vocab = build_vocab_from_iterator(yield_tokens(train_data, src_tokenizer), specials=['<pad>', '<sos>', '<eos>'])
tgt_vocab = build_vocab_from_iterator(yield_tokens(train_data, tgt_tokenizer), specials=['<pad>', '<sos>', '<eos>'])

src_vocab.set_default_index(src_vocab['<pad>'])
tgt_vocab.set_default_index(tgt_vocab['<pad>'])

def sentence_to_tensor(sentence, vocab, tokenizer):
    tokens = tokenizer(sentence)
    indices = [vocab['<sos>']] + [vocab[token] for token in tokens] + [vocab['<eos>']]
    return torch.tensor(indices).unsqueeze(0)

train_src_tensors = [sentence_to_tensor(src, src_vocab, src_tokenizer) for src, tgt in train_data]
train_tgt_tensors = [sentence_to_tensor(tgt, tgt_vocab, tgt_tokenizer) for src, tgt in train_data]

src_vocab_size = len(src_vocab)
tgt_vocab_size = len(tgt_vocab)


transformer = Transformer(src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_seq_length, dropout)

criterion = nn.CrossEntropyLoss(ignore_index=src_vocab['<pad>'])
optimizer = optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)

transformer.train()

for epoch in range(100): 
    total_loss = 0
    for src_tensor, tgt_tensor in zip(train_src_tensors, train_tgt_tensors):
        optimizer.zero_grad()
        output = transformer(src_tensor, tgt_tensor[:, :-1])
        loss = criterion(output.contiguous().view(-1, tgt_vocab_size), tgt_tensor[:, 1:].contiguous().view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    print(f"Epoch: {epoch+1}, Loss: {total_loss/len(train_data)}")

    

def translate(transformer, src_sentence, src_vocab, tgt_vocab, max_seq_length):
    transformer.eval()
    
    src_tokens = src_tokenizer(src_sentence)
    src_indices = [src_vocab['<sos>']] + [src_vocab[token] for token in src_tokens] + [src_vocab['<eos>']]
    src_tensor = torch.tensor(src_indices).unsqueeze(0)
    
    tgt_indices = [tgt_vocab['<sos>']]
    tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0)
    
    with torch.no_grad():
        for _ in range(max_seq_length):
            output = transformer(src_tensor, tgt_tensor)
            next_token = output.argmax(2)[:, -1].item()
            tgt_indices.append(next_token)
            if next_token == tgt_vocab['<eos>']:
                break
            tgt_tensor = torch.tensor(tgt_indices).unsqueeze(0)
    
    tgt_tokens = [tgt_vocab.get_itos()[idx] for idx in tgt_indices]
    return ' '.join(tgt_tokens)

Epoch: 1, Loss: 4.017286030451457
Epoch: 2, Loss: 2.861263378461202
Epoch: 3, Loss: 1.725689705212911
Epoch: 4, Loss: 1.1149355173110962
Epoch: 5, Loss: 0.7745535929997762
Epoch: 6, Loss: 0.5794821302096049
Epoch: 7, Loss: 0.416988006234169
Epoch: 8, Loss: 0.3256495401263237
Epoch: 9, Loss: 0.3196621763209502
Epoch: 10, Loss: 0.2375067062675953
Epoch: 11, Loss: 0.21858055517077446
Epoch: 12, Loss: 0.20526507397492727
Epoch: 13, Loss: 0.08913759837547938
Epoch: 14, Loss: 0.050353662483394145
Epoch: 15, Loss: 0.0762476397678256
Epoch: 16, Loss: 0.16411986568321785
Epoch: 17, Loss: 0.15593199878931047
Epoch: 18, Loss: 0.23985166649023693
Epoch: 19, Loss: 0.1940862928206722
Epoch: 20, Loss: 0.17035357523709535
Epoch: 21, Loss: 0.23044233818848928
Epoch: 22, Loss: 0.0744379414866368
Epoch: 23, Loss: 0.03281161999329925
Epoch: 24, Loss: 0.025955278240144253
Epoch: 25, Loss: 0.01208933675661683
Epoch: 26, Loss: 0.008737101343770821
Epoch: 27, Loss: 0.004833026199291149
Epoch: 28, Loss: 0.0039

In [86]:
test_sentence = "Ich habe ein Hund."
translated_sentence = translate(transformer, test_sentence, src_vocab, tgt_vocab, max_seq_length)
print(f"Translated Sentence: {translated_sentence}")

Translated Sentence: <sos> i have a dog . <eos>
