In [None]:
import torch
import torch.nn as nn
from nltk.tokenize import word_tokenize
from os import path
from math import sqrt
import plotly.express as px

In [None]:
class Vocabulary:
	"""Bidirectional token <-> integer-id mapping for one language.

	Tokens are collected with ``add`` and turned into lookup tables by
	``create_mappings``. Id 0 is reserved for the padding token ``'<N>'``.

	Attributes:
		vocabulary: set of every token added so far.
		stoi: dict mapping token -> id.
		itos: dict mapping id -> token.
	"""

	PADDING_TOKEN = '<N>'

	def __init__(self):
		self.vocabulary = set()
		self.stoi = {'<N>':0}
		self.itos = {0:'<N>'}

	def add(self, v):
		"""Add a single token (``str``) or a collection of tokens.

		Raises:
			TypeError: if ``v`` is neither a string nor a list/tuple/set of
				tokens (previously such input was silently dropped).
		"""
		if isinstance(v, str):
			self.vocabulary.add(v)
		elif isinstance(v, (list, tuple, set, frozenset)):
			self.vocabulary.update(v)
		else:
			raise TypeError(f'expected a token or a collection of tokens, got {type(v).__name__}')

	def create_mappings(self):
		"""(Re)build ``stoi`` and ``itos`` from the collected tokens.

		Tokens are numbered 1..N in sorted order, so the ids are the same on
		every run (set iteration order is not). Both tables are rebuilt from
		scratch, so the method may be called again after more tokens have
		been added without leaving stale or shifted entries behind.
		"""
		tokens = sorted(self.vocabulary - {self.PADDING_TOKEN})

		# Mutate the existing dicts so outside references to them stay valid.
		self.stoi.clear()
		self.stoi[self.PADDING_TOKEN] = 0
		self.stoi.update((token, index) for index, token in enumerate(tokens, start=1))

		self.itos.clear()
		self.itos.update((index, token) for token, index in self.stoi.items())

	def encode(self, s):
		"""Return the ids of the tokens in ``s``; raises ``KeyError`` for unknown tokens."""
		return [self.stoi[c] for c in s]

	def decode(self, i):
		"""Return the tokens for the ids in ``i``; raises ``KeyError`` for unknown ids."""
		return [self.itos[n] for n in i]
	

class PreProcessor:
	"""Loads the parallel Cherokee/English corpus and turns it into id tensors.

	Typical use: ``get_data()`` to read and tokenise both files, then
	``create_tensors()`` to replace ``self.cherokee`` / ``self.english`` (lists
	of token lists) with zero-padded integer tensors of shape
	``(sentences, max_length)``.

	Attributes:
		english_vocabulary / cherokee_vocabulary: one ``Vocabulary`` per language.
		cherokee / english: token lists before ``create_tensors``, tensors after.
		max_length: longest tokenised sentence seen so far (both languages,
			including the ``<S>`` / ``<E>`` markers).
		count: total number of lines read across all ``load_text`` calls.
	"""

	def __init__(self):
		self.english_vocabulary = Vocabulary()
		self.cherokee_vocabulary = Vocabulary()
		self.cherokee = []
		self.english = []
		self.max_length = 0
		self.count = 0

	def load_text(self, file_name):
		"""Read ``chr_en_data/<file_name>`` and tokenise it line by line.

		Every line becomes ``['<S>', *tokens, '<E>']``. The tokens are added
		to the English vocabulary when the file stem is ``'en'`` and to the
		Cherokee vocabulary otherwise.

		Returns:
			List of token lists, one per line of the file.
		"""
		data, language = [], file_name.split('.')[0]
		vocabulary = self.english_vocabulary if language == 'en' else self.cherokee_vocabulary

		# Explicit encoding: the platform default is not UTF-8 everywhere and
		# the Cherokee text is non-ASCII. NOTE(review): assumes the corpus
		# files are UTF-8 encoded - confirm.
		with open(path.join('chr_en_data', file_name), encoding='utf-8') as f:
			for line in f:
				sentence = ['<S>'] + word_tokenize(line) + ['<E>']
				vocabulary.add(sentence)

				self.max_length = max(self.max_length, len(sentence))
				data.append(sentence)
				self.count += 1
		return data

	def get_data(self):
		"""Load both corpus files and append them to ``self.cherokee`` / ``self.english``.

		Returns:
			``(cherokee, english)`` token lists read by this call.
		"""
		cherokee = self.load_text('chr.txt')
		english  = self.load_text('en.txt' )
		assert len(cherokee) == len(english), 'chr.txt and en.txt must have the same number of lines'
		self.cherokee += cherokee
		self.english  += english

		return cherokee, english

	def create_tensors(self):
		"""Build both vocabularies' mappings and encode the sentences as padded tensors.

		Afterwards ``self.cherokee`` and ``self.english`` are ``torch.long``
		tensors with one row per sentence and ``self.max_length`` columns.
		"""
		self.english_vocabulary.create_mappings()
		self.cherokee_vocabulary.create_mappings()

		english  = self._encode_padded(self.english, self.english_vocabulary)
		cherokee = self._encode_padded(self.cherokee, self.cherokee_vocabulary)

		self.cherokee, self.english = cherokee, english

	def _encode_padded(self, sentences, vocabulary):
		"""Encode ``sentences`` with ``vocabulary`` into one zero-padded tensor."""
		# Rows are sized from the data itself rather than from ``count // 2``,
		# which is only right when exactly one file per language was loaded.
		# Unused trailing positions keep the value 0.
		tensor = torch.zeros(size=(len(sentences), self.max_length), dtype=torch.long)

		for row, sentence in enumerate(sentences):
			ids = vocabulary.encode(sentence)
			tensor[row, :len(ids)] = torch.tensor(ids, dtype=torch.long)

		return tensor


# Load both corpus files, then build the vocabularies' mappings and the
# padded id tensors (preprocessor.cherokee / preprocessor.english).
preprocessor = PreProcessor()
preprocessor.get_data()
preprocessor.create_tensors()

# Sanity checks: a tokenised Cherokee sentence must survive an
# encode -> decode round trip (encode raises KeyError if any of its tokens
# is missing from the vocabulary), and both tensors must have the same shape.
test = word_tokenize('ᎤᎵᎦᎵᏴᎮᎢ ᎠᏴᏤᏂ ᏫᎵᎻ.')

assert preprocessor.cherokee_vocabulary.decode(preprocessor.cherokee_vocabulary.encode(test)) == test
assert preprocessor.english.shape == preprocessor.cherokee.shape

In [None]:
# Number of entries in each token -> id table (this includes the '<N>' entry).
cherokee_vocab_size = len(preprocessor.cherokee_vocabulary.stoi)
english_vocab_size = len(preprocessor.english_vocabulary.stoi)

print(f'Cherokee Vocabulary Size: {cherokee_vocab_size}')
print(f'English  Vocabulary Size: {english_vocab_size}')

In [None]:
# Build one training example per (sentence, target position) pair:
#   cherokee  - the full source sentence,
#   english   - the first j target tokens, zero-padded to max_length,
#   expected_probabilities - one-hot vector for the token at position j.
cherokee_in, english_in, expected_probabilities = [], [], []

for i, c in enumerate(preprocessor.cherokee):
	cherokee_tensor = c  # row of the source tensor; torch.stack below copies it
	english_row = preprocessor.english[i]

	# range(1, max_length) - not max_length - 1 - so that the last column is
	# also used as a target; otherwise the final token of a sentence that
	# fills the whole row would never be trained on.
	for j in range(1, preprocessor.max_length):
		target = english_row[j].item()
		if target == 0:
			# Position j holds padding: nothing to predict, and skipping
			# before the allocations below avoids building unused tensors.
			continue

		english_tensor = english_row.clone()
		english_tensor[j:] = 0  # hide the target token and everything after it

		probability = torch.zeros(english_vocab_size)
		probability[target] = 1

		cherokee_in.append(cherokee_tensor)
		english_in.append(english_tensor)
		expected_probabilities.append(probability)


cherokee = torch.stack(cherokee_in).int()
english = torch.stack(english_in).int()
expected_probabilities = torch.stack(expected_probabilities).float()

assert cherokee.shape[0] == english.shape[0] == expected_probabilities.shape[0]

#print(cherokee.shape)
#print(english.shape)
#print(expected_probabilities.shape)

In [None]:
# Sequential split of the examples: first 80% for training, the next 10% for
# testing and the final 10% for validation.
size = int(cherokee.shape[0])
train_end, test_end = int(0.8*size), int(0.9*size)

train_cherokee, test_cherokee, val_cherokee = (
	cherokee[:train_end], cherokee[train_end:test_end], cherokee[test_end:]
)
train_english, test_english, val_english = (
	english[:train_end], english[train_end:test_end], english[test_end:]
)
train_probabilities, test_probabilities, val_probabilities = (
	expected_probabilities[:train_end],
	expected_probabilities[train_end:test_end],
	expected_probabilities[test_end:],
)

In [None]:
# Model / training hyper-parameters shared by the cells below.

# Feature width of the token and positional embeddings, the feed-forward
# layers and the LayerNorms.
EMBEDDING_DIMENSIONS = 64
# Output width of each attention head's query / key / value projection; also
# the value under the square root that scales the attention scores.
QKV_DIMENSIONS       = 64
# Number of positions per sequence: the longest tokenised sentence recorded by
# the preprocessor.
SEQUENCE_LENGTH      = preprocessor.max_length
# Attention heads per MultiHeadedAttention block.
ATTENTION_HEADS      = 8
# Number of stacked Decoder / Encoder layers in the Transformer.
DECODERS             = 4
ENCODERS             = 4
# Number of examples drawn by Interface.get_batch for each training step.
BATCH_SIZE           = 512

In [None]:
def mask_tensor(t):
	"""Apply a causal (look-ahead) mask to a tensor of attention scores.

	Entries above the main diagonal of the last two dimensions become
	``-inf`` (so a following softmax gives them zero weight); all other
	entries are returned unchanged. ``t`` itself is not modified.

	Args:
		t: tensor with at least two dimensions, ``(..., rows, cols)``.

	Returns:
		A new tensor with the same shape as ``t``.
	"""
	rows, cols = tuple(t.shape[-2:])
	# Integer scores are promoted to the default float dtype, as before.
	dtype = t.dtype if t.is_floating_point() else torch.get_default_dtype()

	# One (rows, cols) mask created on t's device and broadcast over any
	# leading dimensions, instead of a full-size CPU mask.
	mask = torch.triu(
		torch.full((rows, cols), float('-inf'), dtype=dtype, device=t.device),
		diagonal=1,
	)

	return t + mask

#test = torch.randn(SEQUENCE_LENGTH, SEQUENCE_LENGTH)
#print(test)
#print(mask_tensor(test))

In [None]:
class AttentionHead(nn.Module):
	"""One scaled dot-product attention head.

	Queries always come from ``data``. Keys and values come from ``data`` too
	(self-attention) unless ``encoder_output`` is given, in which case they are
	projected from it (cross-attention). With ``masked=True`` the scaled
	scores are passed through ``mask_tensor`` before the softmax.
	"""

	def __init__(self, masked=False):
		super().__init__()
		self.obtain_key   = nn.Linear(EMBEDDING_DIMENSIONS, QKV_DIMENSIONS)
		self.obtain_query = nn.Linear(EMBEDDING_DIMENSIONS, QKV_DIMENSIONS)
		self.obtain_value = nn.Linear(EMBEDDING_DIMENSIONS, QKV_DIMENSIONS)
		self.masked = masked

	def forward(self, data, encoder_output=None):
		"""Return the attention-weighted values for every position of ``data``."""
		# Source of keys/values: the input itself, or the encoder's output.
		memory = data if encoder_output is None else encoder_output

		queries = self.obtain_query(data)
		keys = self.obtain_key(memory)
		values = self.obtain_value(memory)

		scores = (queries @ keys.transpose(-2, -1)) / sqrt(QKV_DIMENSIONS)
		if self.masked:
			scores = mask_tensor(scores)

		weights = torch.softmax(scores, dim=-1)
		return weights @ values

#test, e_output = torch.randn(size=(BATCH_SIZE, SEQUENCE_LENGTH, EMBEDDING_DIMENSIONS)), torch.randn(size=(SEQUENCE_LENGTH, QKV_DIMENSIONS))
#test_module = AttentionHead()
#print(test_module(test, encoder_output=e_output))



In [None]:
class MultiHeadedAttention(nn.Module):
	"""Runs ``ATTENTION_HEADS`` attention heads in parallel and mixes their outputs.

	The heads' outputs are concatenated along the last dimension and projected
	back to ``EMBEDDING_DIMENSIONS`` features by a linear layer.

	Args:
		masked: forwarded to every ``AttentionHead``.
	"""

	def __init__(self, masked=False):
		super().__init__()
		# nn.ModuleList rather than a plain list: only registered sub-modules
		# show up in .parameters(), so with a plain list the heads would never
		# be trained by the optimiser, moved by .to() or saved in state_dict().
		self.heads  = nn.ModuleList([AttentionHead(masked=masked) for _ in range(ATTENTION_HEADS)])
		# Each head emits QKV_DIMENSIONS features, so that - not the embedding
		# width - determines the size of the concatenated vector.
		self.linear = nn.Linear(QKV_DIMENSIONS*ATTENTION_HEADS, EMBEDDING_DIMENSIONS)

	def forward(self, data, encoder_output=None):
		"""Attend over ``data`` (or over ``encoder_output`` when given) with every head."""
		vectors = torch.cat([head(data, encoder_output=encoder_output) for head in self.heads], dim=-1)
		return self.linear(vectors)
	

#test = torch.randn(size=(BATCH_SIZE, SEQUENCE_LENGTH, EMBEDDING_DIMENSIONS))
#test_module = MultiHeadedAttention()
#print(test_module(test))


In [None]:
class FeedForward(nn.Module):
	"""Two linear layers with a ReLU in between, applied to the last dimension.

	Input and output both have ``EMBEDDING_DIMENSIONS`` features.
	"""

	def __init__(self):
		super().__init__()
		width = EMBEDDING_DIMENSIONS
		layers = [
			nn.Linear(width, width),
			nn.ReLU(),
			nn.Linear(width, width),
		]
		self.network = nn.Sequential(*layers)

	def forward(self, data):
		"""Pass ``data`` through the network and return the result."""
		transformed = self.network(data)
		return transformed

#test = torch.randn(size=(BATCH_SIZE, SEQUENCE_LENGTH, EMBEDDING_DIMENSIONS))
#test_module = FeedForward()
#print(test_module(test))


In [None]:
class Encoder(nn.Module):
	"""One encoder layer: self-attention followed by a feed-forward network.

	Each sub-layer's output is layer-normalised and then added to that
	sub-layer's input (residual connection).
	"""

	def __init__(self):
		super().__init__()
		self.multi_headed_attention = MultiHeadedAttention()
		self.norm1 = nn.LayerNorm(EMBEDDING_DIMENSIONS)
		self.feed_forward = FeedForward()
		self.norm2 = nn.LayerNorm(EMBEDDING_DIMENSIONS)

	def forward(self, data):
		"""Return the encoded representation of ``data`` (same shape as the input)."""
		# Sub-layer 1: self-attention, normalised, plus residual.
		after_attention = data + self.norm1(self.multi_headed_attention(data))
		# Sub-layer 2: feed-forward, normalised, plus residual.
		return after_attention + self.norm2(self.feed_forward(after_attention))
	
#test = torch.randn(size=(BATCH_SIZE, SEQUENCE_LENGTH, EMBEDDING_DIMENSIONS))
#print(test)
#test_module = Encoder()
#print(test_module(test))

In [None]:
class Decoder(nn.Module):
	"""One decoder layer: masked self-attention, cross-attention, feed-forward.

	Each sub-layer's output is layer-normalised and then added to that
	sub-layer's input (residual connection).
	"""

	def __init__(self):
		super().__init__()
		self.masked_attention = MultiHeadedAttention(masked=True)
		self.norm1 = nn.LayerNorm(EMBEDDING_DIMENSIONS)
		self.cross_attention  = MultiHeadedAttention()
		self.norm2 = nn.LayerNorm(EMBEDDING_DIMENSIONS)
		self.feed_forward = FeedForward()
		self.norm3 = nn.LayerNorm(EMBEDDING_DIMENSIONS)

	def forward(self, data, encoder_output):
		"""Decode ``data`` while attending to ``encoder_output``.

		Args:
			data: decoder-side input.
			encoder_output: encoder result used as the cross-attention's
				``encoder_output`` argument.
		"""
		# Sub-layer 1: masked self-attention over the decoder input.
		hidden = data + self.norm1(self.masked_attention(data))
		# Sub-layer 2: attention over the encoder output.
		attended = self.cross_attention(hidden, encoder_output=encoder_output)
		hidden = hidden + self.norm2(attended)
		# Sub-layer 3: feed-forward network.
		hidden = hidden + self.norm3(self.feed_forward(hidden))

		return hidden


#test = torch.randn(size=(BATCH_SIZE, SEQUENCE_LENGTH, EMBEDDING_DIMENSIONS))
#encoder_output = torch.randn(size=(BATCH_SIZE, SEQUENCE_LENGTH, QKV_DIMENSIONS))
#test_module = Decoder()
#print(test_module(test, encoder_output))

In [None]:
class Transformer(nn.Module):
	"""Encoder-decoder model that predicts the next English token.

	``forward`` takes a batch of Cherokee token ids and a batch of English
	context ids and returns, for each example, a probability distribution over
	the English vocabulary (the output has already been through a softmax).
	"""

	def __init__(self):
		super().__init__()
		self.cherokee_embeddings = nn.Embedding(cherokee_vocab_size, EMBEDDING_DIMENSIONS)
		self.cherokee_positional_encodings = nn.Embedding(SEQUENCE_LENGTH, EMBEDDING_DIMENSIONS)
		# nn.ModuleList rather than plain lists: only registered sub-modules
		# show up in .parameters(), so with plain lists none of the encoder or
		# decoder layers would be trained, moved by .to() or saved.
		self.encoders = nn.ModuleList([Encoder() for _ in range(ENCODERS)])
		self.decoders = nn.ModuleList([Decoder() for _ in range(DECODERS)])
		self.english_embeddings = nn.Embedding(english_vocab_size, EMBEDDING_DIMENSIONS)
		self.english_positional_encodings = nn.Embedding(SEQUENCE_LENGTH, EMBEDDING_DIMENSIONS)
		self.linear   = nn.Linear(SEQUENCE_LENGTH, english_vocab_size)

	def forward(self, source, target_context):
		"""Return next-token probabilities.

		Args:
			source: Cherokee token ids, last dimension = source positions.
			target_context: English token ids seen so far; the last dimension
				must have ``SEQUENCE_LENGTH`` entries because the output layer
				is ``nn.Linear(SEQUENCE_LENGTH, ...)``.

		Returns:
			Tensor whose last dimension has ``english_vocab_size`` entries
			summing to 1.
		"""
		# Positions are taken from the source's own length (not the target's)
		# and created on the input's device.
		source_positions = torch.arange(source.shape[-1], device=source.device).unsqueeze(0)
		encoder_output = self.cherokee_embeddings(source) + self.cherokee_positional_encodings(source_positions)

		for encoder in self.encoders:
			encoder_output = encoder(encoder_output)

		target_positions = torch.arange(target_context.shape[-1], device=target_context.device).unsqueeze(0)
		decoder_output = self.english_embeddings(target_context) + self.english_positional_encodings(target_positions)

		for decoder in self.decoders:
			decoder_output = decoder(decoder_output, encoder_output=encoder_output)

		# NOTE(review): averaging over the feature dimension reduces every
		# position to a single scalar before the output layer. Kept as is to
		# preserve the model's shape contract, but worth revisiting.
		decoder_output = torch.mean(decoder_output, dim=-1)
		logits = self.linear(decoder_output)
		probabilities = torch.softmax(logits, dim=-1)

		return probabilities
	
#cherokee_input = torch.randint(low=0, high=cherokee_vocab_size-1, size=(BATCH_SIZE, SEQUENCE_LENGTH,))
#target_context = torch.zeros(size=(BATCH_SIZE, SEQUENCE_LENGTH,), dtype=int)
#print(cherokee_input.shape, target_context.shape)
#model = Transformer()
#print(model(cherokee_input, target_context).shape)

In [None]:
class Interface:
	"""Training and inference helper around the translation model.

	Reads the notebook-level names ``BATCH_SIZE``, ``SEQUENCE_LENGTH``,
	``preprocessor`` and the ``train_*`` tensors.

	Args:
		learning_rate: learning rate passed to AdamW.
		model: module called as ``model(cherokee_ids, english_context_ids)``
			that returns next-token *probabilities* (softmax already applied).
	"""

	def __init__(self, learning_rate, model):
		self.model = model
		self.learning_rate = learning_rate
		self.optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
		self.losses = []
		# The model's output is already softmaxed, so the loss must consume
		# log-probabilities. CrossEntropyLoss would apply a second softmax.
		self.loss = nn.NLLLoss()

	def get_batch(self, english, cherokee, probabilities):
		"""Sample ``BATCH_SIZE`` aligned rows (with replacement) from the three tensors."""
		indexes = torch.randint(0, english.shape[0], size=(BATCH_SIZE,))

		return english[indexes], cherokee[indexes], probabilities[indexes]

	def pass_batch(self, english, cherokee, probabilities):
		"""Run one random batch through the model.

		Returns:
			``(predicted, loss)``: the model output for the batch and the
			negative log-likelihood of the expected tokens.
		"""
		english, cherokee, probabilities = self.get_batch(english, cherokee, probabilities)
		predicted = self.model(cherokee, english)
		# Clamp before the log so a probability of exactly 0 cannot give -inf.
		log_probabilities = torch.log(predicted.clamp_min(1e-9))
		# The targets are one-hot rows; NLLLoss wants the class index.
		loss = self.loss(log_probabilities, probabilities.argmax(dim=-1))

		return predicted, loss

	def plot_loss(self):
		"""Plot the recorded training losses as a line chart."""
		fig = px.line(x=range(len(self.losses)), y=self.losses, title='Line Graph of Losses')
		fig.update_xaxes(title_text='Data Point Index')
		fig.update_yaxes(title_text='Loss Values')
		fig.show()

	def beam_search(self, inputs, beam_width):
		"""Decode the most probable English id sequence for one source sentence.

		Keeps the ``beam_width`` best partial translations, ranked by their
		cumulative log-probability, and extends them one position at a time.
		A beam that has produced ``<E>`` is frozen: it is only padded and its
		score no longer changes.

		Args:
			inputs: source token ids, ``SEQUENCE_LENGTH`` long (a leading batch
				dimension of 1 is also accepted).
			beam_width: number of hypotheses kept per step (>= 1).

		Returns:
			1-D tensor of ``SEQUENCE_LENGTH`` English token ids, starting with
			``<S>`` and zero-padded after the end of the translation.

		Raises:
			ValueError: if ``beam_width`` is smaller than 1.
		"""
		if beam_width < 1:
			raise ValueError('beam_width must be at least 1')

		stoi = preprocessor.english_vocabulary.stoi
		start_token, padding_token = stoi['<S>'], stoi['<N>']
		end_token = stoi.get('<E>', -1)

		source = inputs.reshape(1, -1)
		contexts = torch.full((1, SEQUENCE_LENGTH), padding_token, dtype=torch.long)
		contexts[0, 0] = start_token
		scores = torch.zeros(1)                     # cumulative log-probability per beam
		finished = torch.zeros(1, dtype=torch.bool) # beams that already emitted <E>

		was_training = self.model.training
		self.model.eval()
		try:
			with torch.no_grad():
				for position in range(1, SEQUENCE_LENGTH):
					probabilities = self.model(source.expand(contexts.shape[0], -1), contexts)
					log_probabilities = torch.log(probabilities.clamp_min(1e-12))
					vocab_size = log_probabilities.shape[-1]

					# Live beams may not emit padding; finished beams may
					# emit nothing else, at no cost.
					log_probabilities[:, padding_token] = float('-inf')
					log_probabilities[finished] = float('-inf')
					log_probabilities[finished, padding_token] = 0.0

					candidates = (scores.unsqueeze(1) + log_probabilities).reshape(-1)
					# Never select more candidates than there are valid ones.
					valid = int(torch.isfinite(candidates).sum())
					top_scores, top_indices = torch.topk(candidates, k=max(1, min(beam_width, valid)))
					beams = torch.div(top_indices, vocab_size, rounding_mode='floor')
					tokens = top_indices % vocab_size

					# Advanced indexing copies, so the in-place write is safe
					# even when several new beams share the same parent.
					contexts = contexts[beams]
					contexts[:, position] = tokens
					finished = finished[beams] | (tokens == end_token)
					scores = top_scores

					if finished.all():
						break
		finally:
			self.model.train(was_training)

		return contexts[int(torch.argmax(scores))]

	def translate_sentence(self, cherokee_sentence):
		"""Translate a Cherokee sentence and return the English words joined by spaces.

		The sentence is tokenised and wrapped in ``<S>`` / ``<E>`` exactly like
		the training sentences, truncated to ``SEQUENCE_LENGTH`` and zero-padded.

		Raises:
			KeyError: if the sentence contains a token that is not in the
				Cherokee vocabulary.
		"""
		tokens = ['<S>'] + word_tokenize(cherokee_sentence) + ['<E>']
		ids = torch.tensor(preprocessor.cherokee_vocabulary.encode(tokens), dtype=torch.long)[:SEQUENCE_LENGTH]
		inputs = torch.zeros(size=(SEQUENCE_LENGTH,), dtype=torch.long)
		inputs[:len(ids)] = ids

		best = self.beam_search(inputs, beam_width=5)
		words = preprocessor.english_vocabulary.decode(best.tolist())
		words = [word for word in words if word not in ('<S>', '<E>', '<N>')]

		return ' '.join(words)

	def train(self, epochs):
		"""Run ``epochs`` optimisation steps, one random batch each, recording every loss."""
		self.model.train()
		for _ in range(epochs):
			self.optimizer.zero_grad(set_to_none=True)
			_, loss = self.pass_batch(train_english, train_cherokee, train_probabilities)
			loss.backward()
			self.optimizer.step()
			print(loss)
			self.losses.append(loss.item())


In [None]:
# Instantiate the model and its training/inference wrapper, then translate one
# sample sentence. No training step is run in this cell, so the translation
# comes from randomly initialised weights.
# NOTE(review): learning_rate=0.1 is far above the values normally used with
# AdamW - confirm before calling trainer.train().
# NOTE(review): translate_sentence raises KeyError if any token of the sample
# sentence is missing from the Cherokee vocabulary.
model = Transformer()
trainer = Interface(learning_rate=0.1, model=model)
trainer.translate_sentence('ᏚᎾᏘᏃᎸᏃ ᏕᎦᎳᏫᎥ ᎢᎬᏱᏗᏢ ᏑᏂᎧᏁᎢ. ᏄᎬᏫᏳᏒᏃ ᎠᏥᎸ-ᎨᎶᎯ ᏚᏛᏛᏁᎢ,')

In [None]:
# Scratch check: assigning a 1-D tensor to one column of a 2-D tensor writes
# one value per row, and .item() turns a single-element tensor into a Python
# number. Nothing defined here is used by the cells above.
a=torch.tensor([1,2,3])
b=torch.zeros(size=(3,3))
b[:, 0] = a
print(b)
a[0].item()