In [1]:
import torch, random, numpy as np, seaborn as sns
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from sklearn.manifold import TSNE
from torch import nn, optim

In [2]:
with open('German.txt', 'r', encoding='utf-8') as f:
    sentences = f.read().splitlines()
    f.close()

In [6]:
class CharVocab: 
    ''' Create a Vocabulary for '''
    def __init__(self, type_vocab,pad_token='<PAD>', eos_token='<EOS>', unk_token='<UNK>'): 
        self.type = type_vocab
        #self.int2char ={}
        self.int2char = []
        if pad_token !=None:
            self.int2char += [pad_token]
        if eos_token !=None:
            self.int2char += [eos_token]
        if unk_token !=None:
            self.int2char += [unk_token]
        #self.int2char[1]=eos_token
        #self.int2char[2]=unk_token
        self.char2int = {}
        
    def __call__(self, text):       
        chars = set(''.join(text))
        # Dictionary mapping integers to the characters
        self.int2char += list(chars)
        # Dictionary mapping characters to integers
        self.char2int = {char: ind for ind, char in enumerate(self.int2char)}

vocab = CharVocab('char','<PAD>',None,'<UNK>')
vocab(sentences)
print('Length of vocabulary: ', len(vocab.int2char))
print('Int to Char: ', vocab.int2char)
print('Char to Int: ', vocab.char2int)

vocab = vocab.char2int

class MaskedSequenceDataset(Dataset):
    def __init__(self, sentences, vocab, mask_ratio=0.2):
        self.sentences = sentences
        self.vocab = vocab
        self.mask_ratio = mask_ratio
        self.vocab_size = len(vocab)

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, index):
        sentence = self.sentences[index]
        sequence = [self.vocab.get(c) for c in sentence]

        # mask some of the characters in the sequence
        num_masked = int(len(sequence) * self.mask_ratio)
        masked_indices = torch.randperm(len(sequence))[:num_masked]
        masked_sequence = sequence.copy()
        for i in masked_indices:
            masked_sequence[i] = self.vocab.get('<UNK>')

        return torch.LongTensor(masked_sequence), torch.LongTensor(sequence)

Length of vocabulary:  33
Int to Char:  ['<PAD>', '<UNK>', 't', 'y', 'x', 'g', 'u', 'ü', 'h', 'd', 'b', 'v', 'j', 'ö', 'l', ' ', 'r', 'n', 'i', 'o', 'm', 'w', 'p', 'c', 'z', 'e', 's', 'ä', 'q', 'ß', 'k', 'a', 'f']
Char to Int:  {'<PAD>': 0, '<UNK>': 1, 't': 2, 'y': 3, 'x': 4, 'g': 5, 'u': 6, 'ü': 7, 'h': 8, 'd': 9, 'b': 10, 'v': 11, 'j': 12, 'ö': 13, 'l': 14, ' ': 15, 'r': 16, 'n': 17, 'i': 18, 'o': 19, 'm': 20, 'w': 21, 'p': 22, 'c': 23, 'z': 24, 'e': 25, 's': 26, 'ä': 27, 'q': 28, 'ß': 29, 'k': 30, 'a': 31, 'f': 32}


Char Embeddings with RNN

In [7]:
train_size = int(0.8 * len(sentences))
test_size = len(sentences) - train_size
train_sentences, test_sentences = torch.utils.data.random_split(sentences, [train_size, test_size])

# define the collate function for the dataloader
def collate_fn(batch):
    inputs = [torch.LongTensor(b[0]) for b in batch]
    targets = [torch.LongTensor(b[1]) for b in batch]
    inputs = pad_sequence(inputs, batch_first=True, padding_value=vocab.get('<PAD>'))
    targets = pad_sequence(targets, batch_first=True, padding_value=-100)
    return inputs, targets

train_dataset = MaskedSequenceDataset(train_sentences, vocab)
test_dataset = MaskedSequenceDataset(test_sentences, vocab)

batch_size = 32
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [5]:
class MaskedLanguageModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(MaskedLanguageModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.GRU(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, inputs):
        embedded = self.embedding(inputs)
        outputs, _ = self.rnn(embedded)
        logits = self.fc(outputs)
        return logits
    
    
model = MaskedLanguageModel(len(vocab), embedding_dim=128, hidden_dim=256)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

epochs = 10
for epoch in range(epochs):
    train_loss = 0
    for batch_inputs, batch_targets in train_dataloader:
        optimizer.zero_grad()

        logits = model(batch_inputs)
        loss = criterion(logits.view(-1, len(vocab)), batch_targets.view(-1))

        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_dataloader)

    print(f'Epoch {epoch+1}, Train Loss: {train_loss:.4f}')

TypeError: pad_sequence(): argument 'padding_value' (position 3) must be float, not NoneType

In [None]:
embeddings = model.embedding.weight.detach

In [None]:
# def one_hot_encode(indices, dict_size):
#     ''' Define one hot encode matrix for our sequences'''
#     # Creating a multi-dimensional array with the desired output shape
#     # Encode every integer with its one hot representation
#     features = np.eye(dict_size, dtype=np.float32)[indices.flatten()]
    
#     # Finally reshape it to get back to the original array
#     features = features.reshape((*indices.shape, dict_size))
            
#     return features

# def encode_text(input_text, vocab, one_hot=False, masking=False, padding=False):
#     total_len = sum(len(s) for s in input_text)
#     avg_len = int(total_len/len(input_text))
#     mask_chars = 5
#     output = []
#     for seq in input_text:
#         seq = [vocab.char2int.get(c,0) for c in seq]
#         # seq.append(vocab.char2int.get('<EOS>'))
#         if masking:
#             mask_indices = random.sample(range(len(seq)), mask_chars)
#             for i in mask_indices:
#                 seq[i] = vocab.char2int.get('<UNK>')
#         if padding:
#             if len(seq) < avg_len:
#                 pads = [vocab.char2int.get('<PAD>')] * (avg_len - (len(seq)))
#                 seq.extend(pads)
#             elif len(seq) > avg_len:
#                 seq = seq[:avg_len]   
#         output.append(seq)
    
#     if one_hot:
#     # One hot encode every integer of the sequence
#         dict_size = len(vocab.char2int)
#         return one_hot_encode(output, dict_size)
#     else:
#         return np.array(output)
     
# train_data = encode_text(sentences, vocab, one_hot=False, masking=True, padding=False)

In [None]:
# Predict
model.eval()

input_seq = 'der schnelle braune Fuchs springt über den faulen Hund'
masked_char = '*'
x = torch.tensor([char_to_index[c] for c in input_seq]).unsqueeze(0)

mask_tensor = torch.tensor([1 if c == '*' else 0 for c in input_seq], dtype=torch.bool)
mask_index = mask_tensor.nonzero()[0].item()
mask_tensor[mask_index] = 1

y_pred = model(x, mask_tensor)
pred_index = y_pred.argmax(dim=2)
next_char = index_to_char[pred_index[0][mask_index].item()]

print(f"Given the sequence '{input_seq}', the predicted masked character is '{next_char}'")

In [None]:
tsne = TSNE(n_components=2)
embeddings_tsne = tsne.fit_transform(embeddings)

plt.scatter(embeddings_tsne[:,0], embeddings_tsne[:,1])
for i, char in enumerate(unique_chars):
    plt.annotate(char, (embeddings_tsne[i,0], embeddings_tsne[i,1]))
plt.show()

Char Embeddings with FFNN

In [6]:
train_size = int(0.8 * len(sentences))
test_size = len(sentences) - train_size
train_sentences, test_sentences = torch.utils.data.random_split(sentences, [train_size, test_size])

# define the collate function for the dataloader
def collate_fn(batch):
    # inputs = [torch.tensor([vocab[c] for c in seq[:-n]], dtype=torch.long) for seq, n in batch]
    # targets = [torch.tensor([vocab[c] for c in seq[n:]], dtype=torch.long) for seq, n in batch]
    inputs = [torch.LongTensor(b[0]) for b in batch]
    targets = [torch.LongTensor(b[1]) for b in batch]
    inputs = pad_sequence(inputs, batch_first=True, padding_value=-1)
    targets = pad_sequence(targets, batch_first=True, padding_value=-100)
    return inputs, targets

train_dataset = MaskedSequenceDataset(train_sentences, vocab)
test_dataset = MaskedSequenceDataset(test_sentences, vocab)

batch_size = 32
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [7]:
class FFNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super(FFNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.hidden = nn.Linear(embedding_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, vocab_size)

    def forward(self, inputs):
        embedded = self.embedding(inputs)
        hidden = torch.relu(self.hidden(embedded))
        output = self.output(hidden)
        return output


model = FFNN(vocab_size=len(vocab), embedding_dim=32, hidden_dim=64)
criterion = nn.CrossEntropyLoss(ignore_index=-100)
optimizer = optim.Adam(model.parameters(), lr=0.001)

for epoch in range(10):
    running_loss = 0.0
    for batch_inputs, batch_targets in train_dataloader:
        optimizer.zero_grad()
        outputs = model(batch_inputs)
        loss = criterion(outputs.view(-1, len(vocab)), batch_targets.view(-1))
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        running_loss += loss.item() * batch_inputs.size(0)
    epoch_loss = running_loss / len(train_dataset)
    print('Epoch {} Loss: {:.4f}'.format(epoch, epoch_loss))

IndexError: index out of range in self