In [1]:
##### UNCOMMENT #####

# # Requirements
# !pip install transformers
# !pip install sentence-transformers
# !pip install scikit-learn

In [2]:
import os
import numpy as np
import re
import matplotlib.pyplot as plt

In [3]:
# Working directory for the notebook: the dataset folders are opened and the
# output files are written relative to it.
# NOTE(review): the path is relative, so re-running this cell changes directory
# again starting from the new location -- run it once per kernel session.
path = '../Semantic_chunk_identification' ##### MODIFY #####
os.chdir(path)

##### UNCOMMENT #####
# from google.colab import drive
# drive.mount('drive', force_remount=True)

# Chunking

## Getting the data

In [4]:
DATASET = "headlines" # or "images" or "answers-students"

# "headlines" and "images" live in one release folder; any other value
# (i.e. "answers-students") is looked up in the student-answers folder.
if DATASET in ("headlines", "images"):
    _release_dir = 'train_2015_10_22.utf-8'  ##### MODIFY #####
else:
    _release_dir = 'train_students_answers_2015_10_27.utf-8'  ##### MODIFY #####

# All four input files share the same prefix and differ only by their suffix.
_input_prefix = f'{_release_dir}/STSint.input.{DATASET}'

# Gold-chunked sentences (bracket notation) for sentence 1 / sentence 2.
ds1_chunked = f'{_input_prefix}.sent1.chunk.txt'
ds2_chunked = f'{_input_prefix}.sent2.chunk.txt'
# Raw sentences for sentence 1 / sentence 2.
ds1 = f'{_input_prefix}.sent1.txt'
ds2 = f'{_input_prefix}.sent2.txt'

In [5]:
# Load the raw and the gold-chunked sentences into lists (one entry per line).

def _read_stripped_lines(file_path):
    """Return every line of `file_path` with surrounding whitespace removed.

    The file is opened in a `with` block so the handle is closed as soon as
    the lines are read, and it is decoded explicitly as UTF-8 (the corpus
    folders are the ".utf-8" releases) instead of relying on the platform's
    default encoding.
    """
    with open(file_path, encoding='utf-8') as handle:
        return [line.strip() for line in handle]


ds1_lines = _read_stripped_lines(ds1)
ds2_lines = _read_stripped_lines(ds2)
ds1_lines_chunked = _read_stripped_lines(ds1_chunked)
ds2_lines_chunked = _read_stripped_lines(ds2_chunked)

In [6]:
# One example: print a raw sentence followed by its gold chunking
# (same line index in both files).

idx = 3
print(ds1_lines[idx])
print(ds1_lines_chunked[idx])

Syria peace plan conditions " unacceptable , " opposition says
[ Syria peace plan conditions ] [ " ] [ unacceptable ] [ , ] [ " ] [ opposition ] [ says ]


Train test split

In [7]:
# The first three quarters of the lines form the training split and the rest
# the test split; the lists are sliced in file order, without shuffling.
train_size = int(3/4*len(ds1_lines))

# Sentence 1: raw lines and their gold chunkings, cut at the same index.
ds1_lines_train, ds1_lines_test = ds1_lines[:train_size], ds1_lines[train_size:]
ds1_lines_gold_train, ds1_lines_gold_test = (
    ds1_lines_chunked[:train_size],
    ds1_lines_chunked[train_size:],
)

# Sentence 2: same cut point, so pairs stay aligned across the two files.
ds2_lines_train, ds2_lines_test = ds2_lines[:train_size], ds2_lines[train_size:]
ds2_lines_gold_train, ds2_lines_gold_test = (
    ds2_lines_chunked[:train_size],
    ds2_lines_chunked[train_size:],
)

In [8]:
# Sanity check on the split sizes (measured on the sentence-1 lists; the
# sentence-2 lists are sliced at the same index).
print('Train size:', len(ds1_lines_train))
print('Test size:', len(ds1_lines_test))

Train size: 567
Test size: 189


## Chunking with transformers

Nous utilisons un pipeline qui tokenise une phrase en entrée, utilise un modèle BERT sur les tokens, et renvoie le chunking de la phrase d'entrée

In [9]:
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

# Hub checkpoint shared by the tokenizer and the token-classification model.
_CHUNKER_CHECKPOINT = "vblagoje/bert-english-uncased-finetuned-chunk"

tokenizer = AutoTokenizer.from_pretrained(_CHUNKER_CHECKPOINT)
model = AutoModelForTokenClassification.from_pretrained(_CHUNKER_CHECKPOINT)

# With grouped_entities=True the pipeline output used below has one dict per
# chunk, carrying 'entity_group', 'word', 'start' and 'end'.
# NOTE(review): newer transformers releases may prefer `aggregation_strategy`
# over `grouped_entities` -- confirm against the installed version.
pipe = pipeline('ner', model=model, tokenizer=tokenizer, grouped_entities=True)



**Méthode 2 pour obtenir les phrases chunkées:** On récupère directement les phrases chunkées en sortie du pipeline.

Cependant, ces dernières sont au format minuscule.

In [10]:
# Returns the sentence as chunked by the model, but LOWERCASED
def compute_chunk_list(pipe, sentence):
    """Chunk `sentence` with `pipe` and return [[chunk_text, chunk_tag], ...].

    The chunk text is taken from the pipeline's own 'word' field, which in the
    example output of this notebook is lowercased.

    Args:
        pipe: callable returning a list of dicts with 'word' and
            'entity_group' keys when applied to a sentence.
        sentence: the raw sentence to chunk.

    Returns:
        One [word, entity_group] pair per chunk, in pipeline order.
    """
    return [[group['word'], group['entity_group']] for group in pipe(sentence)]

def get_gold_chunk_list(sentence): # LOWERCASED
    """Split a gold line such as "[ a b ] [ c ]" into lowercase chunk strings.

    The outer "[ " / " ]" characters are stripped from both ends, then the
    remainder is cut on the " ] [ " separator.
    """
    lowered = sentence.lower()
    inner = lowered.strip("[ ").strip(" ]")
    return inner.split(" ] [ ")

In [11]:
# Inspect the raw pipeline output for one sentence and display its first chunk dict.
chunk_dicts = pipe('Syria peace plan conditions " unacceptable , " opposition says')
chunk_dicts[0]

{'entity_group': 'NP',
 'score': 0.9940616,
 'word': 'syria peace plan conditions',
 'start': 0,
 'end': 27}

**Méthode 1 pour obtenir les phrases chunkées :** On récupère les indices de début et de fin de chaque chunk afin d'extraire les segments correspondants dans la phrase originale.

In [12]:
# Returns the sentence as chunked by the model, with the ORIGINAL casing
def compute_chunk_list2(pipe, sentence):
    """Chunk `sentence` with `pipe`, slicing the chunk text out of the input.

    Each chunk's text is `sentence[start:end]`, using the character offsets
    reported by the pipeline, so the original casing is kept.

    Args:
        pipe: callable returning a list of dicts with 'start', 'end' and
            'entity_group' keys when applied to a sentence.
        sentence: the raw sentence to chunk.

    Returns:
        One [text, entity_group] pair per chunk, in pipeline order.
    """
    return [
        [sentence[span['start']:span['end']], span['entity_group']]
        for span in pipe(sentence)
    ]

def get_gold_chunk_list2(sentence): # CASE PRESERVED
    """Split a gold line such as "[ A b ] [ c ]" into its chunk strings.

    The outer "[ " / " ]" characters are stripped from both ends, then the
    remainder is cut on the " ] [ " separator. Casing is left untouched.
    """
    without_leading_bracket = sentence.strip("[ ")
    inner = without_leading_bracket.strip(" ]")
    return inner.split(" ] [ ")

In [13]:
# Chunk every training sentence of dataset 1 with the model (original casing kept).
# NOTE(review): the full-dataset run further down chunks these same sentences again.
ds1_train_chunks2 = [compute_chunk_list2(pipe, sentence) for sentence in ds1_lines_train]

In [14]:
# Predicted chunks ([text, tag] pairs) of the first training sentence.
ds1_train_chunks2[0]

[['Former Nazi death camp guard Demjanjuk', 'NP'],
 ['dead', 'ADJP'],
 ['at', 'PP'],
 ['91', 'NP']]

In [15]:
# Predicted chunks for every sentence-1 line (train and test together).
ds1_chunks = [compute_chunk_list2(pipe, sentence) for sentence in ds1_lines]

In [16]:
# Predicted chunks for every sentence-2 line (train and test together).
ds2_chunks = [compute_chunk_list2(pipe, sentence) for sentence in ds2_lines]

In [17]:
# Predicted chunks ([text, tag] pairs) of the first sentence-1 line.
ds1_chunks[0]

[['Former Nazi death camp guard Demjanjuk', 'NP'],
 ['dead', 'ADJP'],
 ['at', 'PP'],
 ['91', 'NP']]

## Evaluation

On compare le chunking du modèle avec les gold chunks

In [19]:
# Parse the gold bracket notation of every sentence-1 line into a list of chunk strings.
ds1_gold_chunks = [get_gold_chunk_list2(sentence) for sentence in ds1_lines_chunked]

In [20]:
# Parse the gold bracket notation of every sentence-2 line into a list of chunk strings.
# The loop must run over the loaded lines (ds2_lines_chunked): ds2_chunked is
# only the file *path*, and iterating over it feeds the parser one character
# of the file name at a time.
ds2_gold_chunks = [get_gold_chunk_list2(sentence) for sentence in ds2_lines_chunked]

In [21]:
# Side-by-side look at the predicted chunks and the gold chunks of the first sentence.
print(ds1_chunks[0])
print(ds1_gold_chunks[0])

[['Former Nazi death camp guard Demjanjuk', 'NP'], ['dead', 'ADJP'], ['at', 'PP'], ['91', 'NP']]
['Former Nazi death camp guard Demjanjuk', 'dead', 'at 91']


In [24]:
# The chunking score checks whether each predicted chunk has a matching gold chunk.

def chunking_score(output_chunks, gold_chunks):
    """Average, over sentences, of (#predicted chunks found in gold) / (#gold chunks).

    Args:
        output_chunks: one entry per sentence, each a list of
            [chunk_text, chunk_tag] pairs; only the text is compared.
        gold_chunks: one entry per sentence, each a list of gold chunk
            strings, index-aligned with `output_chunks`.

    Returns:
        The mean per-sentence score as a float. Returns 0.0 when there are no
        sentences; a sentence without gold chunks contributes 0 instead of
        raising ZeroDivisionError.
    """
    if len(output_chunks) == 0:
        return 0.0

    total = 0.0
    for i, predicted in enumerate(output_chunks):
        gold = gold_chunks[i]
        if len(gold) == 0:
            # Nothing to match against: counts as a zero for this sentence.
            continue
        matched = sum(1 for chunk in predicted if chunk[0] in gold)
        total += matched / len(gold)
    return total / len(output_chunks)

In [25]:
# Chunking score of the predicted sentence-1 chunks against the gold chunks.
score = chunking_score(ds1_chunks, ds1_gold_chunks)

In [26]:
# Display the score computed above.
score

0.5635342574826708

In [62]:
# Append the score to the running log. The `with` block closes the file even
# if the write fails, and the trailing newline keeps each run on its own line
# (without it, successive appends were glued together on a single line).
with open('chunking_scores.txt', 'a') as score_file:
    score_file.write(f"score on {DATASET}: " + str(score) + "\n")

Writing predicted chunking in a .txt file

In [27]:
# Tests: rebuild the bracketed chunk string of one predicted sentence and
# compare it visually with the corresponding gold line.
sentence = ds1_train_chunks2[0]
print(sentence)

# str.join places " ] [ " between the chunk texts; unlike the index-based
# loop it also works for an empty chunk list (sentence[-1] raised IndexError).
string = "[ " + " ] [ ".join(chunk[0] for chunk in sentence) + " ]\n"
print(string)
print(ds1_lines_gold_train[0])

[['Former Nazi death camp guard Demjanjuk', 'NP'], ['dead', 'ADJP'], ['at', 'PP'], ['91', 'NP']]
[ Former Nazi death camp guard Demjanjuk ] [ dead ] [ at ] [ 91 ]

[ Former Nazi death camp guard Demjanjuk ] [ dead ] [ at 91 ]


In [29]:
def _format_chunk_line(chunks):
    """Render [[text, tag], ...] as "[ text ] [ text ]" plus a newline.

    This is the same bracket layout as the gold chunk files. An empty chunk
    list still yields one line, so the two output files stay line-aligned.
    """
    return "[ " + " ] [ ".join(chunk[0] for chunk in chunks) + " ]\n"


# Write one line of predicted chunks per sentence, in parallel for both
# datasets. The `with` block closes both files even if a write fails.
with open(f"{DATASET}_sent1_predicted_chunks.txt", "w") as chunking_file1, \
        open(f"{DATASET}_sent2_predicted_chunks.txt", "w") as chunking_file2:
    for sentence1, sentence2 in zip(ds1_chunks, ds2_chunks):
        chunking_file1.write(_format_chunk_line(sentence1))
        chunking_file2.write(_format_chunk_line(sentence2))

# Alignement

### Définition des méthodes

In [30]:
from math import *
from scipy.optimize import linear_sum_assignment

def norm(x):
    """Return the Euclidean (L2) norm of `x`: sqrt of the sum of squared components."""
    squared_sum = 0
    for component in x:
        squared_sum = squared_sum + component ** 2
    return sqrt(squared_sum)

def similarity(x,y):
    '''
    Cosine similarity: dot product of `x` and `y` divided by the product
    of their Euclidean norms.
    '''
    dot_product = np.dot(x, y)
    magnitude = norm(x) * norm(y)
    return dot_product / magnitude

Méthode d'alignement : Hungarian Algorithm (a.k.a. the Kuhn-Munkres algorithm)

In [31]:
def alignment_chunks(sentence_1, sentence_2):
    '''
    Align the chunks of two sentences with the Hungarian algorithm so that
    the total chunk-to-chunk similarity is as high as possible.

    Args:
        sentence_1, sentence_2: lists of chunks, each chunk in the form
            accepted by `alignment_mots`.

    Returns:
        A list of (chunk index in sentence_1, chunk index in sentence_2,
        scaled score) tuples, one per aligned pair. The score is the output
        of `min_max_scaler`. Returns [] when either sentence has no chunk
        (an empty cost matrix made `linear_sum_assignment` raise).

    Idea for improvement:
    Normalize chunk similarity by number of tokens in shorter chunk
    such that it assigned higher scores to pairs of chunks such as physician and general physician.
    '''
    if len(sentence_1) == 0 or len(sentence_2) == 0:
        return []

    # The shorter sentence always provides the rows of the cost matrix;
    # remember whether the roles were swapped so indices can be mapped back.
    swapped = len(sentence_2) < len(sentence_1)
    if swapped:
        row_chunks, col_chunks = sentence_2, sentence_1
    else:
        row_chunks, col_chunks = sentence_1, sentence_2

    cost = np.array([
        [alignment_mots(row_chunk, col_chunk) for col_chunk in col_chunks]
        for row_chunk in row_chunks
    ])

    # linear_sum_assignment minimises, hence the negated similarities.
    row_ind, col_ind = linear_sum_assignment(-cost)

    # Similarity of each optimally aligned chunk pair, rescaled to a score.
    scores = min_max_scaler(cost[row_ind, col_ind])

    if swapped:
        # Put the sentence_1 indices back in first position.
        row_ind, col_ind = col_ind, row_ind

    return list(zip(row_ind, col_ind, scores))


def alignment_mots(chunk_1, chunk_2):
    '''
    Hungarian algorithm on two chunks: align their words so that the summed
    word-to-word similarity is as high as possible.

    Args:
        chunk_1, chunk_2: lists of word vectors, compared with `similarity`.

    Returns:
        The sum of the similarities of the optimally aligned word pairs.
        Returns 0.0 when either chunk has no word (an empty cost matrix made
        `linear_sum_assignment` raise).
    '''
    if len(chunk_1) == 0 or len(chunk_2) == 0:
        return 0.0

    # The shorter chunk always provides the rows of the cost matrix.
    if len(chunk_2) < len(chunk_1):
        row_words, col_words = chunk_2, chunk_1
    else:
        row_words, col_words = chunk_1, chunk_2

    cost = np.array([
        [similarity(row_word, col_word) for col_word in col_words]
        for row_word in row_words
    ])

    # linear_sum_assignment minimises, hence the negated similarities.
    row_ind, col_ind = linear_sum_assignment(-cost)

    # Similarity score between the two chunks.
    return cost[row_ind, col_ind].sum()
 

def min_max_scaler(sim, low=1, high=3, scale=5):
    """Map raw similarities onto integer scores between 0 and `scale`.

    Each value is rescaled linearly so that `low` maps to 0 and `high` maps
    to `scale`, clamped to that range, then rounded to an integer.

    Args:
        sim: iterable of raw similarity values.
        low, high: raw values mapped to 0 and to `scale`. The defaults 1 and 3
            are the bounds observed on unscaled scores by the original author.
        scale: top of the output range (5 by default).

    Returns:
        A list with one integer score per input value.

    Raises:
        ValueError: if `low` and `high` are equal.
    """
    if high == low:
        raise ValueError("low and high must differ")

    scaled = []
    for value in sim:
        rescaled = (value - low) / (high - low) * scale
        # Raw values outside [low, high] used to produce scores below 0 or
        # above `scale`; clamp them so the result stays in the stated range.
        rescaled = min(max(rescaled, 0), scale)
        scaled.append(round(rescaled))
    return scaled

In [32]:
from tensorflow.keras.layers import TextVectorization
import tensorflow as tf

from sentence_transformers import SentenceTransformer
# Sentence-BERT encoder; used further down to embed each word of a chunk individually.
sbert_model = SentenceTransformer('bert-base-nli-mean-tokens')

In [33]:
def create_list_of_embedded_chunks_exploitable_for_alignement(chunked_sentence):
    """Turn a bracket-chunked sentence into per-chunk lists of word embeddings.

    The sentence is cut after every ']' character, each piece is cleaned with
    `removeNoise`, split into words on spaces, and every word is encoded on
    its own with `sbert_model`.

    Args:
        chunked_sentence: a line in bracket notation, e.g. "[ a b ] [ c ]".
            Text after the last ']' is ignored.

    Returns:
        One entry per chunk; each entry is a list holding one embedding
        (a plain list) per word of that chunk. A chunk left without any word
        after cleaning (e.g. punctuation only) is represented by the
        embedding of the empty string, so every chunk keeps at least one
        vector for the alignment step.
    """
    # 1. Split into cleaned chunk texts; every ']' closes the current chunk.
    #    (The former `!= '[' or != ']' or != ' '` test was always true, so all
    #    characters are kept here and brackets are removed by removeNoise.)
    chunk_texts = []
    current = ''
    for char in chunked_sentence:
        current += char
        if char == ']':
            chunk_texts.append(removeNoise(current))
            current = ''

    # 2. Embed each chunk word by word.
    embedded_chunks = []
    for chunk_text in chunk_texts:
        # Build a filtered list rather than removing items from the list being
        # iterated, which skipped elements and left stray empty strings behind.
        words = [word for word in chunk_text.split(' ') if word != '']
        if not words:
            words = ['']
        embedded_chunks.append([list(sbert_model.encode(word)) for word in words])

    return embedded_chunks

def removeNoise(line):
    """Return `line` with brackets and punctuation characters deleted.

    Removed characters: [ ] , . : ; " ' ` -
    """
    # A single translation table deletes all of them in one pass.
    deletion_table = str.maketrans('', '', '[],.:;"\'`-')
    return line.translate(deletion_table)

# def my_cosine_similarity(chunk1, liste_chunck_comparables):
#   nombre_chunk_comparables = len(liste_chunck_comparables)
#   sentences = [chunk1[0]]
#   sbert_model = SentenceTransformer('bert-base-nli-mean-tokens')
#   for i in range(nombre_chunk_comparables):
#     print(liste_chunck_comparables[i][0])
#     sentences.append(liste_chunck_comparables[i][0])
#   sentence_embeddings = sbert_model.encode(sentences)
#   cos_sim = cosine_similarity(sentence_embeddings)
#   return cos_sim

### Script

Alignement sur les gold chunks

In [63]:
def enumerate_word_with_indexe(sentence, wa_file):
    """Number the words of a bracket-chunked sentence and list them in `wa_file`.

    The sentence is split on ']'; within each piece '[' is dropped and the
    rest is split on whitespace. Words are numbered from 1 across the whole
    sentence and each one is written as "<index> <word> :" on its own line.

    Args:
        sentence: a line in bracket notation, e.g. "[ a b ] [ c ]".
        wa_file: object with a `write` method.

    Returns:
        One list per piece, made of the piece's words followed by the list of
        their indices, e.g. ['a', 'b', [1, 2]]. The text after the last ']'
        also produces an entry (just [[]] when nothing follows).
    """
    chunks_with_indices = []
    next_index = 1
    for piece in sentence.split(']'):
        words = piece.replace('[', '').split()
        positions = list(range(next_index, next_index + len(words)))
        for position, word in zip(positions, words):
            wa_file.write(str(position) + ' ' + word + ' :\n')
        next_index += len(words)
        chunks_with_indices.append(words + [positions])
    return chunks_with_indices

def write_wa_file(idx, wa_file, sentence_1, sentence_2):
    """Write one <sentence> alignment record for a chunked sentence pair.

    The record contains both sentences as `//` comment lines, the numbered
    words of each sentence inside <source> / <translation>, and one
    <alignment> line per aligned chunk pair, always labelled EQUI and
    carrying the score returned by `alignment_chunks`.

    Args:
        idx: value written in the record's `id` attribute.
        wa_file: object with a `write` method.
        sentence_1, sentence_2: the two sentences in bracket notation.
    """
    # Embeddings and alignment are computed before anything is written.
    embedded_1 = create_list_of_embedded_chunks_exploitable_for_alignement(sentence_1)
    embedded_2 = create_list_of_embedded_chunks_exploitable_for_alignement(sentence_2)
    aligned_pairs = [
        (pair[0], pair[1], pair[2])
        for pair in alignment_chunks(embedded_1, embedded_2)
    ]

    wa_file.write('<sentence id="' + str(idx) + '" status="">\n')
    wa_file.write('// ' + sentence_1 + '\n')
    wa_file.write('// ' + sentence_2 + '\n')

    wa_file.write('<source>\n')
    chunks1 = enumerate_word_with_indexe(sentence_1, wa_file)
    wa_file.write('</source>\n')

    wa_file.write('<translation>\n')
    chunks2 = enumerate_word_with_indexe(sentence_2, wa_file)
    wa_file.write('</translation>\n')

    wa_file.write('<alignment>\n')
    for chunk_idx_1, chunk_idx_2, score in aligned_pairs:
        source_chunk = chunks1[chunk_idx_1]
        target_chunk = chunks2[chunk_idx_2]
        # Last element = word indices of the chunk, the rest = its words.
        wa_file.write(str(source_chunk[-1]) + ' <==> ' + str(target_chunk[-1])
            + ' // EQUI // ' + str(score) + ' // '
            + str(source_chunk[:-1]) + ' <==> ' + str(target_chunk[:-1]) + '\n')
    wa_file.write('</alignment>\n')
    wa_file.write('</sentence>\n\n\n')


Test avant d'enregistrer les alignements dans un fichier .wa

In [64]:
# Gold-chunked lines of both datasets; the export cell further down reuses these names.
dataset1_chunked = ds1_lines_chunked
dataset2_chunked = ds2_lines_chunked

# Dry run on the first 10 sentence pairs only, collecting their alignments.
alignements_gold = []
for S1, S2 in zip(dataset1_chunked[:10], dataset2_chunked[:10]): 

  # Preprocess: turn each chunked line into per-chunk lists of word embeddings
  s1 = create_list_of_embedded_chunks_exploitable_for_alignement(S1)
  s2 = create_list_of_embedded_chunks_exploitable_for_alignement(S2)

  # List of (chunk index in S1, chunk index in S2, score) tuples for this pair
  score_line = alignment_chunks(s1, s2)
  alignements_gold.append(score_line)

#alignements_gold

In [65]:
# Write the alignments computed on the gold chunks, one record per sentence
# pair, with ids starting at 1. The `with` block closes the file even if
# write_wa_file raises part-way through.
with open(f"{DATASET}_predicted_alignments_on_gold_chunks.wa", "w") as wa_file:
    for idx, (S1, S2) in enumerate(zip(dataset1_chunked, dataset2_chunked), start=1):
        write_wa_file(idx, wa_file, S1, S2)

Alignement sur les chunks prédits

In [None]:
ds1_predicted = f'{DATASET}_sent1_predicted_chunks.txt'  ##### MODIFY #####
ds2_predicted = f'{DATASET}_sent2_predicted_chunks.txt'  ##### MODIFY #####

# Read the predicted chunkings back; the `with` block closes both input
# files once their lines are loaded (the bare open() calls never did).
with open(ds1_predicted) as predicted_file1, open(ds2_predicted) as predicted_file2:
    chunkedLines1 = [line.strip() for line in predicted_file1]
    chunkedLines2 = [line.strip() for line in predicted_file2]

# Write the alignments computed on the predicted chunks, one record per
# sentence pair, with ids starting at 1. The file is closed even if
# write_wa_file raises part-way through.
with open(f"{DATASET}_predicted_alignments_on_predicted_chunks.wa", "w") as wa_file:
    for idx, (S1, S2) in enumerate(zip(chunkedLines1, chunkedLines2), start=1):
        write_wa_file(idx, wa_file, S1, S2)