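The original code used the iterator ``` #for book_n in range(1,38): ## for each book ``` to step through the 37 books; the loop below instead iterates over the books present in the English sentence table. The per-book sentence-alignment files that the loop reads are generated by the step 4 notebook.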

The word-alignment code is a partially modified version of the SimAlign script available here: https://github.com/cisnlp/simalign/blob/master/simalign/simalign.py

The word embeddings come from the UGARIT/grc-alignment model (https://huggingface.co/UGARIT/grc-alignment), an XLM-RoBERTa-based language model fine-tuned for the automatic alignment of multilingual texts at the word level.

The projection consists of transferring the loc_ent_id from each English LOC token to the Latin token(s) aligned with it. IOB labels are not projected directly, since the position of the tokens can differ between the English and the Latin sentences; they are reassigned afterwards from the projected ids.

In [1]:
import pandas as pd
import re
import torch
import os
import logging
import pandas as pd
import numpy as np
from scipy.stats import entropy
from scipy.sparse import csr_matrix
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity

In [2]:
# from tqdm import tqdm
from transformers import AutoTokenizer, AutoModelForMaskedLM, XLMRobertaModel, XLMRobertaTokenizer, AutoModel, AutoConfig

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Hugging Face model id of the checkpoint used to embed the tokens.
model = "UGARIT/grc-alignment"
# All inference in this notebook runs on the CPU.
device=torch.device('cpu')
# Index into the model's hidden-state tuple; get_embed_list takes its embeddings from this entry.
layer: int=8
# NOTE(review): not referenced by any code visible in this notebook — presumably a SimAlign leftover; confirm before removing.
distortion: float = 0.0

# output_hidden_states=True makes the forward pass return the hidden states that get_embed_list indexes with `layer`.
config = AutoConfig.from_pretrained(model, output_hidden_states=True)
emb_model = AutoModelForMaskedLM.from_pretrained(model, config=config)
emb_model.eval()  # switch to inference mode (e.g. dropout is disabled)
emb_model.to(device)


Some weights of the model checkpoint at UGARIT/grc-alignment were not used when initializing XLMRobertaForMaskedLM: ['psi_cls.bias', 'psi_cls.decoder.weight', 'psi_cls.transform.bias', 'psi_cls.transform.weight']
- This IS expected if you are initializing XLMRobertaForMaskedLM from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing XLMRobertaForMaskedLM from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


XLMRobertaForMaskedLM(
  (roberta): XLMRobertaModel(
    (embeddings): XLMRobertaEmbeddings(
      (word_embeddings): Embedding(250002, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): XLMRobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x XLMRobertaLayer(
          (attention): XLMRobertaAttention(
            (self): XLMRobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): XLMRobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
  

In [4]:
# Tokenizer belonging to the same checkpoint as emb_model; use_fast=False asks for the non-"fast" tokenizer implementation.
tokenizer = AutoTokenizer.from_pretrained(model, use_fast=False)

In [5]:
## define the function to get the word embeddings
def get_embed_list(sent_batch) -> torch.Tensor:
    """Embed a batch of sentences with the module-level ``emb_model``.

    ``sent_batch`` is either a list of raw strings or a list of sentences that
    are already split into words; the first element decides which of the two
    the tokenizer is told to expect.

    Returns the hidden states of layer ``layer`` with the first and the last
    sequence position removed, or ``None`` when ``emb_model`` is ``None``.

    Raises ``ValueError`` when ``layer`` is not a valid index into the hidden
    states returned by the model.
    """
    if emb_model is None:
        return None

    # Pre-split input (list of word lists) vs. plain strings.
    pre_split = not isinstance(sent_batch[0], str)

    with torch.no_grad():
        ## tokenize the sentence(s)
        inputs = tokenizer(
            sent_batch,
            is_split_into_words=pre_split,
            padding=True,
            truncation=True,
            return_tensors="pt",
        )
        ## create the embeddings
        hidden = emb_model(**inputs.to(device))["hidden_states"]
        if layer >= len(hidden):
            raise ValueError(f"Specified to take embeddings from layer {layer}, but model has only {len(hidden)} layers.")
        # Drop the first and last position of every sequence.
        return hidden[layer][:, 1:-1, :]

In [6]:
## define the function to get the similarity matrix
def get_similarity(X: np.ndarray, Y: np.ndarray) -> np.ndarray:
    """Return the pairwise cosine similarity of the rows of X and Y, rescaled from [-1, 1] to [0, 1]."""
    cosine = cosine_similarity(X, Y)
    shifted = cosine + 1.0
    return shifted / 2.0

In [7]:
## define the function to calculate the itermax matching method
def iter_max(sim_matrix: np.ndarray, max_count: int=2) -> np.ndarray:
    """Compute the itermax alignment matrix for a similarity matrix.

    The first pass keeps the cells that are both the row-wise and the
    column-wise argmax of ``sim_matrix``.  Each further pass (at most
    ``max_count - 1`` of them) damps the similarities of cells whose row or
    column is already aligned, repeats the mutual-argmax test, and adds the
    newly found cells.  Matrices with at most two rows or two columns are
    returned after the first pass.

    Returns an array of the same shape as ``sim_matrix`` whose non-zero cells
    mark aligned (row, column) pairs.
    """
    alpha_ratio = 0.9
    n_rows, n_cols = sim_matrix.shape
    row_onehot = np.eye(n_rows)
    col_onehot = np.eye(n_cols)

    best_col_of_row = col_onehot[sim_matrix.argmax(axis=1)]  # n_rows x n_cols
    best_row_of_col = row_onehot[sim_matrix.argmax(axis=0)]  # n_cols x n_rows
    inter = best_col_of_row * best_row_of_col.transpose()

    if min(n_rows, n_cols) <= 2:
        return inter

    rounds = 1
    while rounds < max_count:
        # 1.0 where the row / column has no alignment yet, 0.0 otherwise.
        free_rows = 1.0 - inter.sum(1).clip(0.0, 1.0)
        free_cols = 1.0 - inter.sum(0).clip(0.0, 1.0)
        mask_x = np.tile(free_rows[:, np.newaxis], (1, n_cols))
        mask_y = np.tile(free_cols[np.newaxis, :], (n_rows, 1))

        if mask_x.sum() < 1.0 or mask_y.sum() < 1.0:
            # No free row or no free column left: nothing can be added.
            mask = np.zeros((n_rows, n_cols))
            mask_zeros = np.zeros((n_rows, n_cols))
        else:
            mask = ((alpha_ratio * mask_x) + (alpha_ratio * mask_y)).clip(0.0, 1.0)
            # Zero only where both the row and the column are already aligned.
            mask_zeros = 1.0 - ((1.0 - mask_x) * (1.0 - mask_y))

        damped = sim_matrix * mask
        fwd = col_onehot[damped.argmax(axis=1)] * mask_zeros
        bac = row_onehot[damped.argmax(axis=0)].transpose() * mask_zeros

        candidate = inter + fwd * bac
        if np.array_equal(candidate, inter):
            break
        inter = candidate
        rounds += 1

    return inter

In [8]:
## open the CSV file containing the eng text
NH_eng_sentences = pd.read_csv(r"data/intermediate/NH_eng_groupedsentences.csv")
## open the CSV file containing the lat text
NH_lat_sentences = pd.read_csv(r"data/intermediate/NH_lat_groupedsentences.csv", dtype={'reference': 'str'})
NH_lat_sentences['loc_ent_id'] = '-' ## the column will contain the id of the LOC

In [9]:
# Prefix of the per-book sentence-alignment files: the projection loop appends "<book>.txt" directly to it.
# NOTE(review): there is no trailing separator, so book 1 resolves to 'data/intermediate/alignment1.txt' — confirm where the step 4 files actually live.
path = r"data/intermediate/alignment"
#pattern = '\[(\d+(?:,\s*\d+)*)\](?:,\s*\[(\d+(?:,\s*\d+)*)\])?.*$'
#convert the pattern to a raw string, more clear
# Group 1: the first bracketed, comma-separated list of integers on the line;
# group 2: the optional second bracketed list that follows it after a comma.
pattern = r'\[(\d+(?:,\s*\d+)*)\](?:,\s*\[(\d+(?:,\s*\d+)*)\])?.*$'

In [12]:
# Prerequisite: the vecalign sentence alignments (one text file per book) must
# already have been produced before this cell is run.


def _parse_sentence_ids(raw_ids):
    """Turn a comma-separated id string such as '3, 4' into [3, 4]."""
    return [int(i) for i in raw_ids.split(',')]


def _align_words(src_sent, trg_sent):
    """Return the sorted (source word index, target word index) itermax pairs.

    Word-level alignment adapted from SimAlign: every word is split into
    subword pieces, both sentences are embedded in one batch, the itermax
    alignment is computed on the subword similarity matrix, and each aligned
    subword pair is mapped back to the words the pieces belong to.
    """
    l1_tokens = [tokenizer.tokenize(word) for word in src_sent]
    l2_tokens = [tokenizer.tokenize(word) for word in trg_sent]
    bpe_lists = [[bpe for w in sent for bpe in w] for sent in [l1_tokens, l2_tokens]]

    ## subword position -> index of the word it came from
    l1_b2w_map = [i for i, wlist in enumerate(l1_tokens) for _ in wlist]
    l2_b2w_map = [i for i, wlist in enumerate(l2_tokens) for _ in wlist]

    vectors = get_embed_list([src_sent, trg_sent]).cpu().detach().numpy()
    ## keep only as many positions as each sentence has subwords
    vectors = [vectors[i, :len(bpe_lists[i])] for i in [0, 1]]

    alignment = iter_max(get_similarity(vectors[0], vectors[1]))
    return sorted({
        (l1_b2w_map[i], l2_b2w_map[j])
        for i, j in zip(*np.nonzero(alignment > 0))
    })


alignment_re = re.compile(pattern)

#for book_n in range(1,38): ## for each book
for book_n, df in NH_eng_sentences.groupby('book'):
    print(path, book_n)
    #TODO: use these alignment files: C:\Workdir\MyApps\Python_VENV\geoparsing_naturalhistory-main\data\intermediate\alignments
    # NOTE(review): `path` is used as a plain string prefix, not joined as a
    # directory — confirm that this is where the alignment files are stored.
    file_name = path+str(book_n)+'.txt'

    filter_book_eng = df ## the tokens in the eng book
    filter_book_lat = NH_lat_sentences[NH_lat_sentences['book'] == book_n] ## the tokens in the lat book

    with open(file_name, "r") as file: ## sentence alignments for the eng and lat book
        for line in file: ## for each alignment
            match1 = alignment_re.search(line) ## get the index pair of the alignment
            if match1 is None: ## line without any sentence indexes
                continue
            index_eng, index_lat = match1[1], match1[2]
            if not (index_eng and index_lat): ## one side of the alignment is missing
                continue

            ## select all the tokens in the corresponding eng sentence(s)
            filter_sentence_eng = filter_book_eng[filter_book_eng['sentence'].isin(_parse_sentence_ids(index_eng))]
            src_sent = filter_sentence_eng['token'].tolist()
            loc_ent_ids = filter_sentence_eng['loc_ent_id'].tolist()
            ## positions of the eng tokens that are LOC entities
            indexes_to_project = [
                i for i, tag in enumerate(filter_sentence_eng['flair_ner']) if 'LOC' in tag
            ]
            if not indexes_to_project: ## nothing to project from this sentence
                continue

            ## select all the tokens in the corresponding lat sentence(s)
            filter_sentence_lat = filter_book_lat[filter_book_lat['sentence'].isin(_parse_sentence_ids(index_lat))]
            trg_sent = filter_sentence_lat['token'].tolist()
            ## row labels of these tokens in the NH_lat_sentences dataframe
            indexes_to_update = filter_sentence_lat.index.tolist()
            if not trg_sent: ## no lat tokens to receive the projection
                continue

            to_project = set(indexes_to_project)
            ## project the LOC ids along the word alignments
            for i_engword, i_latword in _align_words(src_sent, trg_sent):
                if i_engword in to_project:
                    ## .at writes into NH_lat_sentences itself (no chained assignment)
                    NH_lat_sentences.at[indexes_to_update[i_latword], 'loc_ent_id'] = loc_ent_ids[i_engword]

data/intermediate/alignment 1


FileNotFoundError: [Errno 2] No such file or directory: 'data/intermediate/alignment1.txt'

Assign the IOB labels B-LOC and I-LOC to the Latin tokens containing the projected loc_ent_ids.

In [None]:
## select all the Latin tokens that received a projected loc_ent_id
NH_lat_loc_ent = NH_lat_sentences[NH_lat_sentences['loc_ent_id'] != '-']

## projected ids in row order; rows whose id is missing (NaN) are left untouched
projected_ids = NH_lat_loc_ent['loc_ent_id'].dropna()
## a token starts an entity when its id has not appeared in any earlier row
starts_entity = ~projected_ids.duplicated()

## first token of every loc_ent_id gets B-LOC, every later token with the same id gets I-LOC;
## .loc writes into NH_lat_sentences itself (no chained assignment)
NH_lat_sentences.loc[projected_ids[starts_entity].index, 'flair_ner'] = 'B-LOC'
NH_lat_sentences.loc[projected_ids[~starts_entity].index, 'flair_ner'] = 'I-LOC'

In [None]:
## save the Latin tokens with the projected loc_ent_id and IOB labels; index=False leaves the dataframe index out of the file
NH_lat_sentences.to_csv('data/intermediate/NH_lat_projected.csv', index=False)