In [11]:
# coding=utf-8

import os
from typing import Dict, List, Tuple, Union

import numpy as np
from scipy.stats import entropy
from scipy.sparse import csr_matrix
from sklearn.preprocessing import normalize
from sklearn.metrics.pairwise import cosine_similarity
try:
    import networkx as nx
    from networkx.algorithms.bipartite.matrix import from_biadjacency_matrix
except ImportError:
    nx = None
import torch
from transformers import BertModel, BertTokenizer, XLMModel, XLMTokenizer, RobertaModel, RobertaTokenizer, XLMRobertaModel, XLMRobertaTokenizer, AutoConfig, AutoModel, AutoTokenizer

    

class EmbeddingLoader(object):
    """Loads a pretrained transformer plus its tokenizer and exposes one hidden layer as embeddings.

    Attributes:
        model: key that selected the model/tokenizer classes.
        device: device the model is moved to and inputs are sent to.
        layer: index into the hidden-state collection returned by the model.
        emb_model: the loaded model, in eval mode.
        tokenizer: the loaded tokenizer.
    """

    def __init__(self, model: str="bert-base-multilingual-cased", model_path: str = "bert-base-multilingual-cased", device=torch.device('cpu'), layer: int=8):
        """Load the model and tokenizer classes registered for `model` from `model_path`.

        Args:
            model: one of the keys of the class table below; any other value raises KeyError.
            model_path: passed verbatim to both `from_pretrained` calls.
            device: device the model is moved to.
            layer: hidden-state index later used by `get_embed_list`.
        """
        self.model = model
        self.device = device
        self.layer = layer
        self.emb_model = None
        self.tokenizer = None

        # Model name -> (model class, tokenizer class).
        bert_pair = (BertModel, BertTokenizer)
        xlmr_pair = (XLMRobertaModel, XLMRobertaTokenizer)
        class_table = {
            'bert-base-uncased': bert_pair,
            'bert-base-multilingual-cased': bert_pair,
            'bert-base-multilingual-uncased': bert_pair,
            'xlm-mlm-100-1280': (XLMModel, XLMTokenizer),
            'roberta-base': (RobertaModel, RobertaTokenizer),
            'xlm-roberta-base': xlmr_pair,
            'xlm-roberta-large': xlmr_pair,
        }
        model_class, tokenizer_class = class_table[model]

        # output_hidden_states=True is required because get_embed_list reads a specific layer.
        self.emb_model = model_class.from_pretrained(model_path, output_hidden_states=True)
        self.emb_model.eval()
        self.emb_model.to(self.device)
        self.tokenizer = tokenizer_class.from_pretrained(model_path)
        print("Initialized the EmbeddingLoader with model: {}".format(self.model))

    def get_embed_list(self, sent_batch: List[List[str]]) -> torch.Tensor:
        """Embed a batch of sentences and return the configured layer without the outer positions.

        Args:
            sent_batch: batch of sentences; when the first item is not a `str` the batch is
                tokenized with `is_split_into_words=True`, otherwise as raw strings.

        Returns:
            `hidden[:, 1:-1, :]` of the selected layer, or None when no model is loaded.
        """
        if self.emb_model is None:
            return None

        already_split = not isinstance(sent_batch[0], str)
        with torch.no_grad():
            encoded = self.tokenizer(
                sent_batch,
                is_split_into_words=already_split,
                padding=True,
                truncation=True,
                return_tensors="pt",
            )
            # Element 2 of the model output is indexed by layer; presumably this is the
            # hidden-state tuple requested via output_hidden_states — TODO confirm per model class.
            layer_states = self.emb_model(**encoded.to(self.device))[2][self.layer]
            # Drop the first and last sequence positions (presumably the special tokens).
            return layer_states[:, 1:-1, :]



In [40]:

class SentenceAligner(object):
    """Word aligner based on similarities between contextual sub-word embeddings.

    A similarity matrix is computed between the embeddings of the two sentences and
    converted to alignments by one or more matching methods:
    "fwd"/"rev" (row-wise / column-wise argmax), "inter" (their intersection),
    "mwmf" (maximum-weight bipartite matching) and "itermax" (iterated argmax).
    """

    def __init__(self, model: str = "bert", model_path: str = "bert", token_type: str = "bpe", distortion: float = 0.0, matching_methods: str = "mai", device: str = "cpu", layer: int = 8):
        """Configure the aligner and load the embedding model.

        Args:
            model: model name, or one of the aliases "bert" / "xlmr".
            model_path: where to load the weights from; the same aliases are accepted.
            token_type: "bpe" to align sub-words and map them back to words, "word" to
                average sub-word vectors per word before aligning.
            distortion: ratio passed to `apply_distortion`; 0.0 disables it.
            matching_methods: string of method letters (a=inter, m=mwmf, i=itermax,
                f=fwd, r=rev); an unknown letter raises KeyError.
            device: torch device string.
            layer: hidden layer used for the embeddings.
        """
        model_names = {
            "bert": "bert-base-multilingual-cased",
            "xlmr": "xlm-roberta-base"
            }
        all_matching_methods = {"a": "inter", "m": "mwmf", "i": "itermax", "f": "fwd", "r": "rev"}

        self.model = model_names.get(model, model)
        # The default model_path is the alias "bert", which is not a loadable name by itself.
        # Resolve aliases here too, unless a local directory of that name really exists.
        if model_path in model_names and not os.path.isdir(model_path):
            model_path = model_names[model_path]
        self.token_type = token_type
        self.distortion = distortion
        self.matching_methods = [all_matching_methods[m] for m in matching_methods]
        self.device = torch.device(device)

        self.embed_loader = EmbeddingLoader(model=self.model, model_path=model_path, device=self.device, layer=layer)

    @staticmethod
    def get_max_weight_match(sim: np.ndarray) -> np.ndarray:
        """Return a 0/1 matrix (same shape as `sim`) marking a maximum-weight bipartite matching.

        Raises:
            ValueError: if networkx could not be imported.
        """
        if nx is None:
            raise ValueError("networkx must be installed to use match algorithm.")

        n_rows = sim.shape[0]

        def permute(edge):
            # Graph edges are unordered; nodes below n_rows are rows of `sim`, the remaining
            # nodes are columns shifted by n_rows. Return the edge as (row, column).
            if edge[0] < n_rows:
                return edge[0], edge[1] - n_rows
            else:
                return edge[1], edge[0] - n_rows

        G = from_biadjacency_matrix(csr_matrix(sim))
        matching = nx.max_weight_matching(G, maxcardinality=True)
        matching = sorted((permute(x) for x in matching), key=lambda x: x[0])
        res_matrix = np.zeros_like(sim)
        for row, col in matching:
            res_matrix[row, col] = 1
        return res_matrix

    @staticmethod
    def get_similarity(X: np.ndarray, Y: np.ndarray) -> np.ndarray:
        """Cosine similarity between the rows of X and Y, rescaled from [-1, 1] to [0, 1]."""
        return (cosine_similarity(X, Y) + 1.0) / 2.0

    @staticmethod
    def average_embeds_over_words(bpe_vectors: np.ndarray, word_tokens_pair: List[List[str]]) -> List[np.ndarray]:
        """Average consecutive sub-word vectors so that each word gets a single vector.

        Args:
            bpe_vectors: pair of per-sentence sub-word vector arrays.
            word_tokens_pair: pair of sentences, each a list of per-word sub-word lists;
                sub-words are assumed to appear in `bpe_vectors` in the same order.

        Returns:
            A two-element list with one array of word vectors per sentence.
        """
        new_vectors = []
        for l_id in range(2):
            cnt = 0
            w_vector = []
            for wlist in word_tokens_pair[l_id]:
                # Indices of this word's sub-words in the flat sub-word sequence.
                word_set = list(range(cnt, cnt + len(wlist)))
                cnt += len(wlist)
                w_vector.append(bpe_vectors[l_id][word_set].mean(0))
            new_vectors.append(np.array(w_vector))
        return new_vectors

    @staticmethod
    def get_alignment_matrix(sim_matrix: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Return (forward, backward) one-hot argmax matrices, both of shape m x n.

        `forward` marks the best column for every row, `backward` the best row for every column.
        """
        m, n = sim_matrix.shape
        forward = np.eye(n)[sim_matrix.argmax(axis=1)]  # m x n
        backward = np.eye(m)[sim_matrix.argmax(axis=0)]  # n x m
        return forward, backward.transpose()

    @staticmethod
    def apply_distortion(sim_matrix: np.ndarray, ratio: float = 0.5) -> np.ndarray:
        """Down-weight similarities of positions that are far from the diagonal.

        Each entry is multiplied by 1 - ratio * (relative column position - relative row
        position)^2. The matrix is returned unchanged when `ratio` is 0.0 or when either
        dimension is smaller than 2.
        """
        shape = sim_matrix.shape
        if (shape[0] < 2 or shape[1] < 2) or ratio == 0.0:
            return sim_matrix

        # Relative positions in [0, 1] along each axis, broadcast to the full matrix.
        row_pos = np.arange(shape[0]) / float(shape[0] - 1)
        col_pos = np.arange(shape[1]) / float(shape[1] - 1)
        distortion_mask = 1.0 - ((col_pos[np.newaxis, :] - row_pos[:, np.newaxis]) ** 2) * ratio

        return np.multiply(sim_matrix, distortion_mask)

    @staticmethod
    def iter_max(sim_matrix: np.ndarray, max_count: int=2) -> np.ndarray:
        """Iterated argmax: start from the fwd/rev intersection and add links in later rounds.

        In every further round, similarities between two still-unaligned positions keep most
        of their weight, similarities involving exactly one aligned position are kept at
        `alpha_ratio`, and pairs of two already aligned positions are excluded. New mutual
        argmax links are added until nothing changes or `max_count` rounds were run.
        """
        alpha_ratio = 0.9
        m, n = sim_matrix.shape
        forward = np.eye(n)[sim_matrix.argmax(axis=1)]  # m x n
        backward = np.eye(m)[sim_matrix.argmax(axis=0)]  # n x m
        inter = forward * backward.transpose()

        if min(m, n) <= 2:
            return inter

        count = 1
        while count < max_count:
            # mask_x / mask_y are 1 where the row / column has no link yet.
            mask_x = 1.0 - np.tile(inter.sum(1)[:, np.newaxis], (1, n)).clip(0.0, 1.0)
            mask_y = 1.0 - np.tile(inter.sum(0)[np.newaxis, :], (m, 1)).clip(0.0, 1.0)
            mask = ((alpha_ratio * mask_x) + (alpha_ratio * mask_y)).clip(0.0, 1.0)
            # Zero exactly where both the row and the column are already linked.
            mask_zeros = 1.0 - ((1.0 - mask_x) * (1.0 - mask_y))
            if mask_x.sum() < 1.0 or mask_y.sum() < 1.0:
                # Every row or every column is already linked: nothing can be added.
                mask *= 0.0
                mask_zeros *= 0.0

            new_sim = sim_matrix * mask
            fwd = np.eye(n)[new_sim.argmax(axis=1)] * mask_zeros
            bac = np.eye(m)[new_sim.argmax(axis=0)].transpose() * mask_zeros
            new_inter = fwd * bac

            if np.array_equal(inter + new_inter, inter):
                break
            inter = inter + new_inter
            count += 1
        return inter

    def get_word_aligns(self, src_sent: Union[str, List[str]], trg_sent: Union[str, List[str]]) -> Dict[str, str]:
        """Align the words of a source and a target sentence.

        Args:
            src_sent: source sentence, either a whitespace-separated string or a word list.
            trg_sent: target sentence, same conventions.

        Returns:
            A dict mapping every configured matching method to a string of
            space-separated "i-j" word-index pairs, ordered numerically by source index
            and then target index. If either sentence has no words, every method maps
            to the empty string.
        """
        if isinstance(src_sent, str):
            src_sent = src_sent.split()
        if isinstance(trg_sent, str):
            trg_sent = trg_sent.split()
        if not src_sent or not trg_sent:
            # No words on one side: there is nothing to align (and no similarity matrix).
            return {ext: "" for ext in self.matching_methods}

        tokenize = self.embed_loader.tokenizer.tokenize
        l1_tokens = [tokenize(word) for word in src_sent]
        l2_tokens = [tokenize(word) for word in trg_sent]
        bpe_lists = [[bpe for w in sent for bpe in w] for sent in [l1_tokens, l2_tokens]]

        use_bpe = self.token_type == "bpe"
        if use_bpe:
            # Sub-word position -> index of the word it belongs to.
            l1_b2w_map = [i for i, wlist in enumerate(l1_tokens) for _ in wlist]
            l2_b2w_map = [i for i, wlist in enumerate(l2_tokens) for _ in wlist]

        vectors = self.embed_loader.get_embed_list([src_sent, trg_sent]).cpu().detach().numpy()
        # Cut the padding of the shorter sentence off.
        vectors = [vectors[i, :len(bpe_lists[i])] for i in [0, 1]]

        if self.token_type == "word":
            vectors = self.average_embeds_over_words(vectors, [l1_tokens, l2_tokens])

        all_mats = {}
        sim = self.get_similarity(vectors[0], vectors[1])
        sim = self.apply_distortion(sim, self.distortion)

        all_mats["fwd"], all_mats["rev"] = self.get_alignment_matrix(sim)
        all_mats["inter"] = all_mats["fwd"] * all_mats["rev"]
        if "mwmf" in self.matching_methods:
            all_mats["mwmf"] = self.get_max_weight_match(sim)
        if "itermax" in self.matching_methods:
            all_mats["itermax"] = self.iter_max(sim)

        aligns = {}
        for ext in self.matching_methods:
            rows, cols = np.nonzero(all_mats[ext] > 0)
            if use_bpe:
                # Several sub-word links may collapse onto the same word pair.
                pairs = {(l1_b2w_map[i], l2_b2w_map[j]) for i, j in zip(rows, cols)}
            else:
                pairs = {(int(i), int(j)) for i, j in zip(rows, cols)}
            # Sort the integer pairs, not their string form, so "2-1" precedes "10-11".
            aligns[ext] = " ".join("{}-{}".format(i, j) for i, j in sorted(pairs))
        return aligns


In [41]:
# Smoke test: align one example sentence pair and print the resulting alignments.
source_sentence = "Sir Nils Olav III. was knighted by the norwegian king ."
target_sentence = "Nils Olav der Dritte wurde vom norwegischen König zum Ritter geschlagen ."

device ="cpu"
# Key used to pick the model/tokenizer classes.
model = "bert-base-multilingual-cased"
# NOTE(review): machine-specific absolute path to a local checkpoint; adjust before re-running elsewhere.
model_path = "/Users/lisan/code/bert-base-multilingual-cased"



# `model` is rebound here from the model-name string to the aligner instance.
model = SentenceAligner(model=model, model_path=model_path, device=device)
# `result` maps each matching method name to a string of space-separated "i-j" pairs.
result = model.get_word_aligns(source_sentence, target_sentence)
print(result)


Some weights of the model checkpoint at /Users/lisan/code/bert-base-multilingual-cased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.decoder.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to 

Initialized the EmbeddingLoader with model: bert-base-multilingual-cased
{'mwmf': '0-3 1-0 10-11 2-1 3-2 3-3 4-4 5-10 5-8 5-9 6-5 8-6 9-7', 'inter': '1-0 10-11 2-1 3-3 4-4 5-10 5-9 6-5 8-6 9-7', 'itermax': '0-0 1-0 10-11 2-1 3-3 4-4 5-10 5-8 5-9 6-5 8-6 9-7'}


In [49]:

# Align every sentence pair of the gold corpus and write the "inter" alignments to disk.
# NOTE(review): the paths below are machine-specific absolute paths.
root = "/Users/lisan/code/AlignMan/examples/"
corpus = root + "ja-cn.gold"
output = root + "ja-cn.simalign"

device ="cpu"
model = "bert-base-multilingual-cased"
model_path = "/Users/lisan/code/bert-base-multilingual-cased"
distortion = 0.0
null_align = 1.0


# `model` is rebound from the model-name string to the aligner instance.
model = SentenceAligner(model=model, model_path=model_path, device=device)


# Read the corpus once and close it; the first two tab-separated columns are source and target.
with open(corpus) as corpus_file:
    sentence_pairs = [line.split("\t") for line in corpus_file]
src_sents = [columns[0].strip() for columns in sentence_pairs]
tgt_sents = [columns[1].strip() for columns in sentence_pairs]

# All three output files are created (or truncated), and the context manager guarantees that
# every handle is closed even if alignment fails half-way. Only the argmax file is written to.
# NOTE(review): the itermax / match files presumably were meant to receive
# result["itermax"] / result["mwmf"] — confirm before relying on them.
with open(root + "align.argmax", "w") as argmax_align_output, \
        open(root + "align.itermax", "w") as itermax_align_output, \
        open(root + "align.match", "w") as match_align_output:
    for index, (src, tgt) in enumerate(zip(src_sents, tgt_sents)):
        print(index)  # progress: one line per sentence pair
        result = model.get_word_aligns(src, tgt)
        argmax_align_output.write(src + "\t" + tgt + "\t" + result["inter"] + "\n")
        

Some weights of the model checkpoint at /Users/lisan/code/bert-base-multilingual-cased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.predictions.transform.dense.weight', 'cls.predictions.decoder.weight', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.transform.LayerNorm.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to 

Initialized the EmbeddingLoader with model: bert-base-multilingual-cased
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199


In [51]:

def load_gold(g_path):
    """Read gold-standard alignments, one sentence pair per line.

    The alignment links are taken from the last tab-separated column of each line.
    A link written as "i-j" is a sure link, a link written as "ipj" is a possible link.

    Args:
        g_path: path of the gold file.

    Returns:
        A tuple (pros, surs, surs_count):
        pros maps the line index to the set of all links (sure and possible), with
        possible links normalised to the "i-j" spelling;
        surs maps the line index to the set of sure links only;
        surs_count is the total number of sure links as a float.
    """
    pros = {}
    surs = {}
    surs_count = 0.

    # The context manager closes the file as soon as it has been read.
    with open(g_path, "r") as gold_f:
        for i, line in enumerate(gold_f):
            links = line.strip().split("\t")[-1].split()

            # Links are separated into their two indices by either "p" or "-".
            pros[i] = {x.replace("p", "-") for x in links}
            surs[i] = {x for x in links if "p" not in x}

            surs_count += len(surs[i])

    return pros, surs, surs_count

def calc_score(input_path, probs, surs, surs_count):
    """Score predicted alignments against gold links.

    Predicted links are read from the last tab-separated column of every line of
    `input_path`. If the first link of a line has more than two "-"-separated parts
    (e.g. a trailing score), every link of that line is cut down to "i-j".

    Args:
        input_path: path of the file with predicted alignments, one sentence per line.
        probs: line index -> set of all gold links (sure and possible).
        surs: line index -> set of sure gold links.
        surs_count: total number of sure gold links.

    Returns:
        (precision, recall, f1, aer), each rounded to three decimals. Precision is
        measured against `probs`, recall against `surs`; F1 is computed from the
        already rounded precision and recall; AER is
        1 - (sure hits + possible hits) / (predicted links + sure links).
    """
    total_hit = 0.
    p_hit = 0.
    s_hit = 0.

    # The context manager closes the prediction file once it has been scored.
    with open(input_path, "r") as target_f:
        for i, line in enumerate(target_f):
            links = line.strip().split("\t")[-1].split()

            # Lines without gold annotation are ignored.
            if i not in probs: continue

            # Guard on `links` so that a blank line counts as "no predictions"
            # instead of raising IndexError.
            if links and len(links[0].split("-")) > 2:
                links = ["-".join(x.split("-")[:2]) for x in links]

            predicted = set(links)
            p_hit += len(predicted & set(probs[i]))
            s_hit += len(predicted & set(surs[i]))
            total_hit += len(predicted)

    y_prec = round(p_hit / max(total_hit, 1.), 3)
    y_rec = round(s_hit / max(surs_count, 1.), 3)
    y_f1 = round(2. * y_prec * y_rec / max((y_prec + y_rec), 0.01), 3)
    # Same zero guard as precision and recall: with no predictions and no sure links
    # the denominator would otherwise be 0.
    aer = round(1 - (s_hit + p_hit) / max(total_hit + surs_count, 1.), 3)

    return y_prec, y_rec, y_f1, aer


# Evaluate two alignment files against the same gold standard and print one
# metrics line per file: first the fastalign output, then the argmax output.
gold_path = "/Users/lisan/code/AlignMan/examples/ja-cn.gold"

for fastalign_path in (
    "/Users/lisan/code/AlignMan/examples/ja-cn.fastalign",
    "/Users/lisan/code/AlignMan/examples/align.argmax",
):
    # The gold file is re-read for every evaluated file.
    probs, surs, surs_count = load_gold(gold_path)
    y_prec, y_rec, y_f1, aer = calc_score(fastalign_path, probs, surs, surs_count)

    print("Prec: {}\tRec: {}\tF1: {}\tAER: {}".format(y_prec, y_rec, y_f1, aer))

Prec: 0.713	Rec: 0.748	F1: 0.73	AER: 0.27
Prec: 0.896	Rec: 0.71	F1: 0.792	AER: 0.208


In [None]:
import matplotlib.pyplot as plt
import numpy as np
from typing import List, Text, Tuple


def line2matrix(line: Text, n: int, m: int) -> Tuple[np.ndarray, np.ndarray]:
    '''
    Turn an alignment line such as "0-1 3p4 5-6" into two n x m 0/1 matrices.

    "i-j" marks a sure link and "ipj" a possible link; space-separated items that
    contain neither "p" nor "-" are skipped.

    n, m: sizes of the two sentences, i.e. the dimensions of both matrices.
    Returns (sures, possibles); every sure link is also set in possibles.
    Raises ValueError if a first index is >= n or a second index is >= m.
    '''
    def convert(i, j):
        row, col = int(i), int(j)
        if not (row < n and col < m):
            raise ValueError("Error in Gold Standard?")
        return row, col

    sures = np.zeros((n, m))
    possibles = np.zeros((n, m))
    for elem in line.split(" "):
        possible_only = "p" in elem
        if not possible_only and "-" not in elem:
            continue
        # "p" takes precedence over "-" when an item contains both.
        i, j = convert(*elem.split("p" if possible_only else "-"))
        possibles[i, j] = 1
        if not possible_only:
            sures[i, j] = 1
    return sures, possibles