<a href="https://colab.research.google.com/github/muhammadaarif-9/NLP/blob/main/multilingual_wordAlignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# SimAlign

In [1]:
from simalign import SentenceAligner

ModuleNotFoundError: ignored

In [10]:
#!pip install simalign
!pip install sacremoses

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting sacremoses
  Downloading sacremoses-0.0.53.tar.gz (880 kB)
[K     |████████████████████████████████| 880 kB 4.5 MB/s 
Building wheels for collected packages: sacremoses
  Building wheel for sacremoses (setup.py) ... [?25l[?25hdone
  Created wheel for sacremoses: filename=sacremoses-0.0.53-py3-none-any.whl size=895260 sha256=66283c7c2c13cedf08607eac21922f587513158204921379d56751c646b8f9fd
  Stored in directory: /root/.cache/pip/wheels/87/39/dd/a83eeef36d0bf98e7a4d1933a4ad2d660295a40613079bafc9
Successfully built sacremoses
Installing collected packages: sacremoses
Successfully installed sacremoses-0.0.53


In [24]:
import os
import logging
from typing import Dict, List, Tuple, Union

import numpy as np
from scipy.stats import entropy
from scipy.sparse import csr_matrix
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity
try:
	import networkx as nx
	from networkx.algorithms.bipartite.matrix import from_biadjacency_matrix
except ImportError:
	nx = None
import torch
from transformers import BertModel, BertTokenizer, XLMModel, XLMTokenizer, RobertaModel, RobertaTokenizer, XLMRobertaModel, XLMRobertaTokenizer, AutoConfig, AutoModel, AutoTokenizer, AutoModelForMaskedLM

from simalign.utils import get_logger

# NOTE(review): this cell calls get_logger every time it is re-run, and the saved
# output of this notebook shows each log record printed several times, which
# suggests every call attaches one more handler to the same named logger.
# Dropping stale handlers first keeps a re-run from multiplying the output.
logging.getLogger(__name__).handlers.clear()
LOG = get_logger(__name__)


class EmbeddingLoader(object):
	"""Load a pretrained transformer and expose one hidden layer as embeddings.

	The model and tokenizer are resolved through the ``Auto*`` classes, so any
	checkpoint name they accept can be passed as ``model``.
	"""

	def __init__(self, model: str="xlm-roberta-base", device=torch.device('cpu'), layer: int=8):
		"""Download/load the model and tokenizer and move the model to ``device``.

		Args:
			model: checkpoint name or path handed to ``from_pretrained``.
			device: torch device the model is moved to and inputs are sent to.
			layer: index into the model's ``hidden_states`` output that
				``get_embed_list`` returns.
		"""
		self.model = model
		self.device = device
		self.layer = layer

		# output_hidden_states=True makes the forward pass return the
		# "hidden_states" entry that get_embed_list indexes with self.layer.
		config = AutoConfig.from_pretrained(model, output_hidden_states=True)
		self.emb_model = AutoModel.from_pretrained(model, config=config)
		self.emb_model.eval()
		self.emb_model.to(self.device)
		# add_prefix_space=True for roberta and gpt2
		self.tokenizer = AutoTokenizer.from_pretrained(model, add_prefix_space=True)
		LOG.info("Initialized the EmbeddingLoader with model: {}".format(self.model))

	def get_embed_list(self, sent_batch: List[List[str]]) -> torch.Tensor:
		"""Embed a batch of sentences with the configured hidden layer.

		Args:
			sent_batch: batch of sentences. When the first element is not a
				``str`` the batch is passed to the tokenizer as pre-split
				words; otherwise as raw strings.

		Returns:
			``hidden_states[self.layer]`` with the first and last position of
			every sequence removed (presumably the special tokens added by the
			tokenizer -- confirm for the chosen model), or ``None`` when no
			model is loaded.

		Raises:
			ValueError: if ``self.layer`` is not smaller than the number of
				hidden states returned by the model.
		"""
		if self.emb_model is None:
			return None

		# Only the first element is inspected to decide how the batch is passed.
		pre_split = not isinstance(sent_batch[0], str)
		with torch.no_grad():
			inputs = self.tokenizer(sent_batch, is_split_into_words=pre_split, padding=True, truncation=True, return_tensors="pt")
			hidden = self.emb_model(**inputs.to(self.device))["hidden_states"]
			if self.layer >= len(hidden):
				raise ValueError(f"Specified to take embeddings from layer {self.layer}, but model has only {len(hidden)} layers.")
			return hidden[self.layer][:, 1:-1, :]


class SentenceAligner(object):
	"""Align the words of two sentences using similarities between embeddings.

	A similarity matrix is built from the embeddings of both sentences, and
	one alignment matrix per requested matching method is derived from it.
	"""

	def __init__(self, model: str = "xlm-roberta-base", token_type: str = "bpe", distortion: float = 0.0, matching_methods: str = "mai", device: str = "cpu", layer: int = 8):
		"""Configure the aligner and load the embedding model.

		Args:
			model: checkpoint name, or one of the shortcuts ``"bert"`` /
				``"xlmr"``.
			token_type: ``"bpe"`` aligns subword embeddings and maps the hits
				back to word indices; ``"word"`` averages subword embeddings
				per word before aligning.
			distortion: ratio passed to ``apply_distortion``; ``0.0`` leaves
				the similarity matrix untouched.
			matching_methods: string of one-letter codes, each selecting a
				method: ``a``=inter, ``m``=mwmf, ``i``=itermax, ``f``=fwd,
				``r``=rev.
			device: name handed to ``torch.device``.
			layer: hidden layer index forwarded to ``EmbeddingLoader``.

		Raises:
			KeyError: if ``matching_methods`` contains an unknown letter.
		"""
		model_names = {
			"bert": "bert-base-multilingual-cased",
			"xlmr": "xlm-roberta-base"
			}
		all_matching_methods = {"a": "inter", "m": "mwmf", "i": "itermax", "f": "fwd", "r": "rev"}

		# Expand a shortcut to its full checkpoint name; other names pass through.
		self.model = model_names.get(model, model)
		self.token_type = token_type
		self.distortion = distortion
		self.matching_methods = [all_matching_methods[m] for m in matching_methods]
		self.device = torch.device(device)

		self.embed_loader = EmbeddingLoader(model=self.model, device=self.device, layer=layer)

	@staticmethod
	def get_max_weight_match(sim: np.ndarray) -> np.ndarray:
		"""Return a 0/1 matrix marking a maximum-weight bipartite matching.

		Rows and columns of ``sim`` become the two sides of a bipartite graph
		whose edge weights are the matrix entries.

		Raises:
			ValueError: if networkx could not be imported.
		"""
		if nx is None:
			raise ValueError("networkx must be installed to use match algorithm.")
		def permute(edge):
			# Matching edges are unordered node pairs. Nodes below
			# sim.shape[0] are rows; the others are columns offset by
			# sim.shape[0].
			if edge[0] < sim.shape[0]:
				return edge[0], edge[1] - sim.shape[0]
			else:
				return edge[1], edge[0] - sim.shape[0]
		G = from_biadjacency_matrix(csr_matrix(sim))
		matching = nx.max_weight_matching(G, maxcardinality=True)
		res_matrix = np.zeros_like(sim)
		for row, col in (permute(x) for x in matching):
			res_matrix[row, col] = 1
		return res_matrix

	@staticmethod
	def get_similarity(X: np.ndarray, Y: np.ndarray) -> np.ndarray:
		"""Return pairwise cosine similarity of X and Y, shifted and halved."""
		return (cosine_similarity(X, Y) + 1.0) / 2.0

	@staticmethod
	def average_embeds_over_words(bpe_vectors: np.ndarray, word_tokens_pair: List[List[str]]) -> List[np.ndarray]:
		"""Average subword vectors into one vector per word, for both sentences.

		Args:
			bpe_vectors: pair of per-sentence arrays of subword vectors.
			word_tokens_pair: pair of per-sentence lists, each holding the
				subword list of every word. The subwords of consecutive words
				are taken to occupy consecutive rows of the matching array.

		Returns:
			Two arrays, one per sentence, with one averaged row per word.
		"""
		new_vectors = []
		for l_id in range(2):
			w_vector = []
			start = 0
			for wlist in word_tokens_pair[l_id]:
				end = start + len(wlist)
				# Index with explicit positions so that an out-of-range
				# position raises instead of being silently clipped.
				w_vector.append(bpe_vectors[l_id][np.arange(start, end)].mean(0))
				start = end
			new_vectors.append(np.array(w_vector))
		return new_vectors

	@staticmethod
	def get_alignment_matrix(sim_matrix: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
		"""Return the forward and backward argmax alignments, both m x n.

		The forward matrix one-hot encodes the best column of every row; the
		backward matrix one-hot encodes the best row of every column.
		"""
		m, n = sim_matrix.shape
		forward = np.eye(n)[sim_matrix.argmax(axis=1)]  # m x n
		backward = np.eye(m)[sim_matrix.argmax(axis=0)]  # n x m
		return forward, backward.transpose()

	@staticmethod
	def apply_distortion(sim_matrix: np.ndarray, ratio: float = 0.5) -> np.ndarray:
		"""Down-weight similarities between positions that are far apart.

		Each cell is multiplied by ``1 - ratio * d**2`` where ``d`` is the
		difference between the relative column position and the relative row
		position (both scaled to ``[0, 1]``). The input is returned unchanged
		when either dimension is smaller than 2 or ``ratio`` is ``0.0``.
		"""
		shape = sim_matrix.shape
		if (shape[0] < 2 or shape[1] < 2) or ratio == 0.0:
			return sim_matrix

		col_pos = np.arange(shape[1]) / float(shape[1] - 1)
		row_pos = np.arange(shape[0]) / float(shape[0] - 1)
		# Broadcasting gives an (m, n) grid of relative-position differences.
		distortion_mask = 1.0 - ((col_pos[np.newaxis, :] - row_pos[:, np.newaxis]) ** 2) * ratio

		return np.multiply(sim_matrix, distortion_mask)

	@staticmethod
	def iter_max(sim_matrix: np.ndarray, max_count: int=2) -> np.ndarray:
		"""Extend the forward/backward intersection by repeated re-matching.

		Starts from the intersection of the forward and backward argmax
		alignments. While ``count < max_count``, cells whose row and column are
		both already aligned are masked out, the argmax intersection is
		recomputed on what is left, and new hits are added. The loop stops
		early when a round adds nothing.

		Returns:
			An m x n 0/1 matrix. It is just the plain intersection when
			``min(m, n) <= 2``.
		"""
		alpha_ratio = 0.9
		m, n = sim_matrix.shape
		forward = np.eye(n)[sim_matrix.argmax(axis=1)]  # m x n
		backward = np.eye(m)[sim_matrix.argmax(axis=0)]  # n x m
		inter = forward * backward.transpose()

		if min(m, n) <= 2:
			return inter

		count = 1
		while count < max_count:
			# mask_x / mask_y are 1 on rows / columns with no alignment yet.
			mask_x = 1.0 - np.tile(inter.sum(1)[:, np.newaxis], (1, n)).clip(0.0, 1.0)
			mask_y = 1.0 - np.tile(inter.sum(0)[np.newaxis, :], (m, 1)).clip(0.0, 1.0)
			# 1.0 where row and column are both free, alpha_ratio where only
			# one of them is, 0 where neither is.
			mask = ((alpha_ratio * mask_x) + (alpha_ratio * mask_y)).clip(0.0, 1.0)
			# 0 only where row and column are both already aligned.
			mask_zeros = 1.0 - ((1.0 - mask_x) * (1.0 - mask_y))
			if mask_x.sum() < 1.0 or mask_y.sum() < 1.0:
				# No free row or no free column is left: nothing can be added.
				mask *= 0.0
				mask_zeros *= 0.0

			new_sim = sim_matrix * mask
			fwd = np.eye(n)[new_sim.argmax(axis=1)] * mask_zeros
			bac = np.eye(m)[new_sim.argmax(axis=0)].transpose() * mask_zeros
			new_inter = fwd * bac

			if np.array_equal(inter + new_inter, inter):
				break
			inter = inter + new_inter
			count += 1
		return inter

	def get_word_aligns(self, src_sent: Union[str, List[str]], trg_sent: Union[str, List[str]]) -> Dict[str, List]:
		"""Align the words of a source and a target sentence.

		Args:
			src_sent: source sentence, either a string (split on whitespace)
				or a list of words.
			trg_sent: target sentence, same conventions.

		Returns:
			A dict keyed by matching-method name. Each value is a sorted list
			of ``(source_index, target_index)`` pairs. With
			``token_type == "bpe"`` the indices are word indices recovered
			from the subword positions; otherwise they are the row/column
			indices of the alignment matrix. If either sentence has no words,
			every method maps to an empty list.
		"""
		if isinstance(src_sent, str):
			src_sent = src_sent.split()
		if isinstance(trg_sent, str):
			trg_sent = trg_sent.split()

		# Nothing can be aligned against an empty sentence; answer directly
		# instead of sending an empty sequence through the model.
		if len(src_sent) == 0 or len(trg_sent) == 0:
			return {ext: [] for ext in self.matching_methods}

		tokenize = self.embed_loader.tokenizer.tokenize
		l1_tokens = [tokenize(word) for word in src_sent]
		l2_tokens = [tokenize(word) for word in trg_sent]
		bpe_lists = [[bpe for w in sent for bpe in w] for sent in [l1_tokens, l2_tokens]]

		if self.token_type == "bpe":
			# Position k holds the index of the word that subword k belongs to.
			l1_b2w_map = [i for i, wlist in enumerate(l1_tokens) for _ in wlist]
			l2_b2w_map = [i for i, wlist in enumerate(l2_tokens) for _ in wlist]

		vectors = self.embed_loader.get_embed_list([src_sent, trg_sent]).cpu().detach().numpy()
		# Keep only as many positions as each sentence has subwords.
		vectors = [vectors[i, :len(bpe_lists[i])] for i in [0, 1]]

		if self.token_type == "word":
			vectors = self.average_embeds_over_words(vectors, [l1_tokens, l2_tokens])

		all_mats = {}
		sim = self.get_similarity(vectors[0], vectors[1])
		sim = self.apply_distortion(sim, self.distortion)

		all_mats["fwd"], all_mats["rev"] = self.get_alignment_matrix(sim)
		all_mats["inter"] = all_mats["fwd"] * all_mats["rev"]
		if "mwmf" in self.matching_methods:
			all_mats["mwmf"] = self.get_max_weight_match(sim)
		if "itermax" in self.matching_methods:
			all_mats["itermax"] = self.iter_max(sim)

		aligns = {}
		for ext in self.matching_methods:
			pairs = set()
			# Visit only the positive cells instead of scanning every (i, j).
			for i, j in np.argwhere(all_mats[ext] > 0):
				if self.token_type == "bpe":
					pairs.add((l1_b2w_map[i], l2_b2w_map[j]))
				else:
					pairs.add((int(i), int(j)))
			aligns[ext] = sorted(pairs)
		return aligns

In [30]:
# Build the aligner instance used by the cells below.
# The embedding model and all alignment settings are chosen in the constructor.
# Other checkpoints that can be swapped in for `model`:
#   "xlm-roberta-base", "bert-base-multilingual-uncased", "roberta-base"
myaligner = SentenceAligner(
    model="xlm-roberta-large",
    token_type="bpe",
    matching_methods="mai",
)

Downloading config.json:   0%|          | 0.00/616 [00:00<?, ?B/s]

Downloading pytorch_model.bin:   0%|          | 0.00/2.09G [00:00<?, ?B/s]

Some weights of the model checkpoint at xlm-roberta-large were not used when initializing XLMRobertaModel: ['lm_head.dense.weight', 'lm_head.dense.bias', 'lm_head.layer_norm.bias', 'lm_head.decoder.weight', 'lm_head.bias', 'lm_head.layer_norm.weight']
- This IS expected if you are initializing XLMRobertaModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing XLMRobertaModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Downloading sentencepiece.bpe.model:   0%|          | 0.00/4.83M [00:00<?, ?B/s]

Downloading tokenizer.json:   0%|          | 0.00/8.68M [00:00<?, ?B/s]

2022-07-28 13:57:48,940 - __main__ - INFO - Initialized the EmbeddingLoader with model: xlm-roberta-large
2022-07-28 13:57:48,940 - __main__ - INFO - Initialized the EmbeddingLoader with model: xlm-roberta-large
2022-07-28 13:57:48,940 - __main__ - INFO - Initialized the EmbeddingLoader with model: xlm-roberta-large
2022-07-28 13:57:48,940 - __main__ - INFO - Initialized the EmbeddingLoader with model: xlm-roberta-large


In [26]:
# Both sentences must be given as lists of words.
src_sentence = "varsh 1985 86 men tatkalin kalyan ministry ko mahila and bal development".split()
trg_sentence = "in the varsh 1985 86 the erstwhile mntralay of".split()

In [27]:
# get_word_aligns returns a dictionary keyed by matching method.
# Every value is a list of zero-indexed (source word, target word) index pairs.
alignments = myaligner.get_word_aligns(src_sentence, trg_sentence)

# Show each source word next to its index so the pairs can be read off.
for indexed_word in enumerate(src_sentence):
  print(indexed_word)

(0, 'varsh')
(1, '1985')
(2, '86')
(3, 'men')
(4, 'tatkalin')
(5, 'kalyan')
(6, 'ministry')
(7, 'ko')
(8, 'mahila')
(9, 'and')
(10, 'bal')
(11, 'development')


In [28]:
# Show each target word next to its index.
for indexed_word in enumerate(trg_sentence):
  print(indexed_word)

(0, 'in')
(1, 'the')
(2, 'varsh')
(3, '1985')
(4, '86')
(5, 'the')
(6, 'erstwhile')
(7, 'mntralay')
(8, 'of')


In [29]:
# One line per matching method, followed by its aligned index pairs.
for matching_method, aligned_pairs in alignments.items():
    print(matching_method, ":", aligned_pairs)

mwmf : [(0, 0), (0, 2), (1, 3), (2, 4), (3, 5), (4, 2), (4, 6), (4, 7), (5, 7), (6, 7), (8, 6), (9, 8), (10, 1), (11, 6)]
inter : [(0, 0), (0, 2), (1, 3)]
itermax : [(0, 0), (0, 2), (1, 3), (2, 4)]
