In [14]:
from transformers import AutoTokenizer, AutoModelForMaskedLM
from torch import nn
import torch
import csv
from random import choice
from copy import deepcopy

In [15]:
# Module-level state filled in by the rest of this cell:
#   translation_clusters: idiom -> set of idioms linked to it
#   idioms:               every idiom string seen in either language column
#   encoder:              idiom -> token ids (populated further down)
translation_clusters = {}
idioms = set()
encoder = {}
tokenizer = AutoTokenizer.from_pretrained('xlm-roberta-base')

# Load data

# newline='' is what the csv module asks for so quoted fields containing line
# breaks are parsed correctly; the explicit UTF-8 encoding keeps non-ASCII
# characters in the German column independent of the platform default.
with open('dataset.csv', newline='', encoding='utf-8') as f:
	reader = csv.DictReader(f)
	for row in reader:
		english, german = row['english'], row['german']

		# Record the pair in both directions so either idiom can be used as a key.
		translation_clusters.setdefault(english, set()).add(german)
		translation_clusters.setdefault(german, set()).add(english)

		idioms.add(english)
		idioms.add(german)

# Form translation clusters

max_length = 0

# Freeze the direct translation links before expanding them. Expanding
# translation_clusters in place while iterating over `idioms` (a set of strings,
# whose iteration order is not stable between interpreter runs) made each
# cluster depend on which neighbours happened to be expanded first.
direct_translations = {
	idiom: frozenset(matches) for idiom, matches in translation_clusters.items()
}

for idiom in idioms:
	# Cluster = direct translations plus everything those translations link to.
	# NOTE(review): this is a two-hop neighbourhood, not a full transitive
	# closure — confirm that is the intended notion of "cluster".
	cluster = set(direct_translations[idiom])
	for match in direct_translations[idiom]:
		cluster |= direct_translations[match]
	# An idiom is never its own cluster mate.
	cluster.discard(idiom)
	translation_clusters[idiom] = cluster

	# Tokenize once here; the widest encoding fixes the padded width used later.
	encoded = tokenizer(idiom, return_tensors='pt').input_ids
	max_length = max(max_length, encoded.shape[1])

	encoder[idiom] = encoded

num_idioms = len(idioms)
decoder = {}

# Generate idiom tensor

# `idioms` is a set of strings, so its iteration order can differ between
# interpreter runs (string hashing is randomized per process). The rows of
# idiom_tensor are later sliced into train/test/val, so fix the row order:
# sort for a canonical order, then apply a fixed-seed permutation so the
# slices are not alphabetical.
sorted_idioms = sorted(idioms)
row_order = torch.randperm(num_idioms, generator=torch.Generator().manual_seed(0)).tolist()

# One row per idiom, right-padded with -1 up to the longest tokenization.
idiom_tensor = torch.full(size=(num_idioms, max_length), fill_value=-1)
for i, source_index in enumerate(row_order):
	idiom = sorted_idioms[source_index]
	encoded = encoder[idiom]
	idiom_tensor[i, :encoded.shape[1]] = encoded[0]
	# From here on the encoder maps to the padded row, and the decoder maps the
	# row (as a hashable tuple) back to the idiom string.
	encoder[idiom] = idiom_tensor[i]
	decoder[tuple(idiom_tensor[i].tolist())] = idiom

# Round-trip sanity check on one known idiom.
assert decoder[tuple(encoder['makes me feel like'].tolist())] == 'makes me feel like'

# Convert idiom clusters to tokenized representations

# Re-key translation_clusters from idiom strings to padded token-id tuples.
# Iterate over a snapshot of the keys (no deep copy of the sets is needed)
# because entries are removed and inserted while looping.
for idiom in list(translation_clusters):
	key = tuple(encoder[idiom].tolist())
	matches = translation_clusters.pop(idiom)

	# setdefault + update merges instead of overwriting, so two idioms that
	# happen to tokenize identically do not erase each other's cluster.
	tokenized_cluster = translation_clusters.setdefault(key, set())
	tokenized_cluster.update(tuple(encoder[match].tolist()) for match in matches)
	# Keep the invariant that a row is never listed as its own cluster mate.
	tokenized_cluster.discard(key)

print(idiom_tensor.shape)


torch.Size([14673, 24])


In [5]:
# 90% / 5% / 5% split over the rows of idiom_tensor.
train_end = int(0.9*num_idioms)
test_end = int(0.95*num_idioms)

train = idiom_tensor[:train_end]
test = idiom_tensor[train_end:test_end]
# Slice through to the end: a bare index here would select one single row
# instead of the remaining 5% of the data.
val = idiom_tensor[test_end:]

In [6]:

# Width of the vector produced by the model's output layer.
latent_dimensions = 1000

# Pick the best available backend instead of assuming Apple's Metal backend,
# so the notebook also runs on CUDA or CPU-only machines.
if torch.backends.mps.is_available():
	device = 'mps'
elif torch.cuda.is_available():
	device = 'cuda'
else:
	device = 'cpu'

# NOTE(review): presumably the number of training steps and the optimizer step
# size — no training loop is visible in this part of the notebook; confirm.
iterations = 2000
learning_rate = 0.01

# Number of row indexes get_batch() draws per call.
batch_size = 64

In [75]:
def tensor_to_set(tensor):
    """Return the rows of a 2-D tensor as a set of tuples.

    Every row becomes a tuple of plain Python numbers so it can be hashed;
    duplicate rows collapse into a single entry.
    """
    rows = tensor.tolist()
    return set(map(tuple, rows))

In [77]:
def get_batch(data, size=None):
	"""Sample a batch of idiom rows together with one cluster mate each.

	Draws `size` row indexes from `data` with replacement. Every drawn row is
	added to the batch; if any of its cluster mates (per `translation_clusters`)
	also occurs in `data`, one of them is picked at random and added as well.
	Rows are collected in a set, so duplicates collapse and the number of
	returned rows varies between calls (at most 2 * size).

	Args:
		data: 2-D tensor of padded token-id rows to sample from.
		size: number of indexes to draw; defaults to the module-level
			`batch_size`.

	Returns:
		2-D tensor holding the distinct sampled rows.

	Raises:
		ValueError: if `data` has no rows.
	"""
	if size is None:
		size = batch_size
	if len(data) == 0:
		raise ValueError('get_batch() needs at least one row to sample from')

	indexes = torch.randint(0, len(data), (size,))
	# Hashable view of the data, used to restrict cluster mates to rows that
	# are actually part of this split.
	set_data = tensor_to_set(data)
	batch = set()

	for i in indexes:
		idiom = tuple(data[i].tolist())
		batch.add(idiom)

		possible_idioms = set_data.intersection(translation_clusters[idiom])
		if not possible_idioms:
			# All cluster mates live outside `data`; keep the idiom on its own.
			continue

		batch.add(choice(list(possible_idioms)))

	return torch.tensor([list(row) for row in batch])

print(get_batch(train).shape)


torch.Size([124, 24])


In [82]:
def get_positive_sample(anchor, data):
	"""Pick a random row of `data` that is a cluster mate of `anchor`.

	Args:
		anchor: 1-D tensor of padded token ids; must be a key of
			`translation_clusters` once converted to a tuple.
		data: 2-D tensor of candidate rows.

	Returns:
		1-D tensor holding the chosen cluster mate.

	Raises:
		KeyError: if `anchor` is not a key of `translation_clusters`.
		IndexError: if none of the anchor's cluster mates occurs in `data`.
	"""
	possible_positive = translation_clusters[tuple(anchor.tolist())].intersection(tensor_to_set(data))
	if not possible_positive:
		# choice() on an empty list raises IndexError as well; keep that type
		# but say what actually went wrong.
		raise IndexError("no positive sample: none of the anchor's cluster mates occur in data")
	return torch.tensor(choice(list(possible_positive)))

batch = get_batch(train)
batch_rows = tensor_to_set(batch)

# get_batch() also keeps rows whose cluster mates are all outside the sampled
# data, so row 0 is not guaranteed to have a positive inside the batch. Use the
# first row that does.
positive_anchor = next(
	row for row in batch
	if translation_clusters[tuple(row.tolist())] & batch_rows
)
positive_sample = get_positive_sample(positive_anchor, batch)

# Compare whole rows as tuples: `tensor in tensor` reduces an element-wise
# comparison with any(), so it passes as soon as a single token id matches.
positive_row = tuple(positive_sample.tolist())
assert positive_row in batch_rows
assert positive_row in translation_clusters[tuple(positive_anchor.tolist())]

In [87]:
def get_negative_sample(anchor, data):
    """Pick a random row of `data` that is neither `anchor` nor a cluster mate.

    Args:
        anchor: 1-D tensor of padded token ids; must be a key of
            `translation_clusters` once converted to a tuple.
        data: 2-D tensor of candidate rows.

    Returns:
        1-D tensor holding the chosen negative row.

    Raises:
        KeyError: if `anchor` is not a key of `translation_clusters`.
        IndexError: if every row of `data` is the anchor or one of its mates.
    """
    anchor_row = tuple(anchor.tolist())
    # A cluster does not list its own idiom, so the anchor has to be excluded
    # explicitly or it could be returned as its own negative.
    excluded = translation_clusters[anchor_row] | {anchor_row}
    possible_negative = tensor_to_set(data).difference(excluded)
    if not possible_negative:
        # choice() on an empty list raises IndexError as well; keep that type
        # but say what actually went wrong.
        raise IndexError('no negative sample: every row of data is the anchor or one of its cluster mates')
    return torch.tensor(choice(list(possible_negative)))

negative_sample = get_negative_sample(batch[0], batch)

# Compare whole rows as tuples. The cluster holds tuples, so testing a Tensor
# for membership never matches, and `tensor in tensor` only checks whether any
# single element is equal — both made the original assertion vacuous.
negative_row = tuple(negative_sample.tolist())
assert negative_row in tensor_to_set(batch)
assert negative_row not in translation_clusters[tuple(batch[0].tolist())]

In [68]:
class Model(nn.Module):
	"""XLM-RoBERTa masked-LM logits, pooled over tokens and projected.

	The pretrained 'xlm-roberta-base' masked-LM head yields one vocabulary-sized
	logit vector per token. These are averaged over the non-padding tokens and
	mapped to `latent_dimensions` values by a linear layer.

	Args:
		pooling: pooling strategy; only 'average' is implemented.
		padding_value: sentinel marking padded positions in the incoming token
			ids (the idiom tensor in this notebook is padded with -1).

	Raises:
		ValueError: if `pooling` is not a supported strategy.
	"""

	def __init__(self, pooling, padding_value=-1):
		super().__init__()
		# Fail at construction time rather than with an unbound local in forward().
		if pooling != 'average':
			raise ValueError(f"unsupported pooling {pooling!r}; only 'average' is implemented")
		self.pooling = pooling
		self.padding_value = padding_value
		self.roberta = AutoModelForMaskedLM.from_pretrained('xlm-roberta-base')
		# Take the input width from the loaded checkpoint instead of hard-coding it.
		self.output_layer = nn.Linear(self.roberta.config.vocab_size, latent_dimensions)

	def forward(self, input):
		"""Embed one sequence or a batch of sequences.

		Args:
			input: token ids as a 1-D (single sequence) or 2-D (batch) tensor,
				or a mapping that holds such ids under "input_ids". Padded
				positions are expected to carry `padding_value`.

		Returns:
			Tensor of shape (latent_dimensions,) for 1-D input, or
			(batch, latent_dimensions) for 2-D input.
		"""
		input_ids = input if torch.is_tensor(input) else input["input_ids"]
		# Run on whatever device the module's parameters live on.
		input_ids = torch.as_tensor(input_ids, dtype=torch.long, device=self.output_layer.weight.device)

		# The transformer expects a batch dimension; add one for a single row
		# and strip it again before returning.
		single = input_ids.dim() == 1
		if single:
			input_ids = input_ids.unsqueeze(0)

		# Mask out padding, and swap the sentinel for the checkpoint's real pad
		# id: the sentinel itself is not a valid embedding index.
		attention_mask = (input_ids != self.padding_value).long()
		input_ids = input_ids.masked_fill(attention_mask == 0, self.roberta.config.pad_token_id)

		roberta_logits = self.roberta(input_ids=input_ids, attention_mask=attention_mask).logits

		# Mean over real tokens only; clamp guards an all-padding row against 0/0.
		weights = attention_mask.unsqueeze(-1).to(roberta_logits.dtype)
		pooled = (roberta_logits * weights).sum(dim=1) / weights.sum(dim=1).clamp(min=1)

		vector_representation = self.output_layer(pooled)

		return vector_representation[0] if single else vector_representation

In [166]:
# Build the model with average pooling, the only pooling mode Model.forward handles.
model = Model(pooling='average')

In [189]:
# Sample one batch from the training split and pass each row through the model
# individually; the returned vectors are discarded.
# NOTE(review): no loss or optimizer step is visible here — presumably a
# forward-pass smoke test rather than training; confirm.
for _ in range(1):
	batch = get_batch(train)
	
	for data in batch:
		model(data)