# *Notebook* à utiliser pour faire le travail pratique # 3 sur l'analyse d'incidents.





### Création du jeu de donnée

In [1]:
import json

train_data_path = './data/dev_examples.json'
new_exemples_path = './data/new_examples.json'
test_data_path = './data/test_examples.json'

def load_incident_dataset(filename):
    with open(filename, 'r') as fp:
        incident_list = json.load(fp)

    return incident_list

# Load datasets
train_data = load_incident_dataset(train_data_path)
new_examples = load_incident_dataset(new_exemples_path)
test_data_path = load_incident_dataset(test_data_path)

# Merge datasets
merged_data = train_data + new_examples

# merged_data = merged_data[:20]

len(merged_data)

110

### Load Tokenizer from transformers

In [2]:
from transformers import AutoTokenizer

model_name = "bert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

### Encode texts from the dataset

In [3]:
def encode_texts(tokenizer, texts):
    return tokenizer(texts, padding=True, truncation=True, return_tensors="pt")

texts = [incident['text'] for incident in merged_data]
encoded_texts = encode_texts(tokenizer, texts)
encoded_texts.keys()

dict_keys(['input_ids', 'token_type_ids', 'attention_mask'])

### Encode slots

In [4]:
slot_labels = ["B-EVENT", "I-EVENT", "B-ACTIVITY", "I-ACTIVITY", "B-WHO", "I-WHO", "B-WHERE", "I-WHERE", "B-WHEN", "I-WHEN", "B-CAUSE", "I-CAUSE", "B-EQUIPMENT", "I-EQUIPMENT", "B-INJURY", "I-INJURY", "B-INJURED", "I-INJURED", "B-BODY-PARTS", "I-BODY-PARTS", "B-DEATH", "I-DEATH", "O"]
slot_label_to_id = {label: i for i, label in enumerate(slot_labels)}

print(slot_label_to_id)

{'B-EVENT': 0, 'I-EVENT': 1, 'B-ACTIVITY': 2, 'I-ACTIVITY': 3, 'B-WHO': 4, 'I-WHO': 5, 'B-WHERE': 6, 'I-WHERE': 7, 'B-WHEN': 8, 'I-WHEN': 9, 'B-CAUSE': 10, 'I-CAUSE': 11, 'B-EQUIPMENT': 12, 'I-EQUIPMENT': 13, 'B-INJURY': 14, 'I-INJURY': 15, 'B-INJURED': 16, 'I-INJURED': 17, 'B-BODY-PARTS': 18, 'I-BODY-PARTS': 19, 'B-DEATH': 20, 'I-DEATH': 21, 'O': 22}


In [5]:
def get_slot_from_word(word, data):
    found_in = []
    for argument, values in data['arguments'].items():
        # Check if the word is in any of the values for this argument
        for value in values:
            if word in value:
                # find index of word in value
                word_index = value.index(word)
                found_in.append("B-" + argument) if word_index == 0 else found_in.append('I-' + argument)

    return found_in

print(get_slot_from_word('driver', train_data[0]))

['I-CAUSE']


In [6]:
import numpy as np


def align_tokens_with_all_slots_bert(data, slot_label_to_id, tokenizer):
    number_slots = len(slot_label_to_id)
    other_array = np.zeros(number_slots)
    other_array[-1] = 1 # O slot

    aligned_token_slots = []
    words = data["text"].split()

    aligned_token_slots.append(other_array) # [CLS] token

    for word in words:
        tokens = tokenizer.tokenize(word)
        expected_slots = get_slot_from_word(word, data)  # This can now be a list of slots

        for bert_token in tokens:
            # Here, each token is represented by a list, with the first element being the token
            # and the subsequent elements being flags for each slot
            token_with_slots = []

            # Adding flags for each slot
            for slot_label in slot_label_to_id.keys():
                slot_flag = 1 if slot_label in expected_slots else 0
                token_with_slots.append(slot_flag)

            # if token_with_slots does not contain any slot, then it is an O token
            if sum(token_with_slots) == 0:
                token_with_slots[-1] = 1 # -1 is the index of the O slot (last slot)

            aligned_token_slots.append(token_with_slots)

    aligned_token_slots.append(other_array) # [SEP] token

    return aligned_token_slots


exple = {
        "text": "John had an accident at the construction site while walking.",
        "arguments": {
            "EVENT": ["accident"],
            "ACTIVITY": ["walking"],
            "WHO": ["John"],
            "WHERE": ["construction site"]
        }
    }
encoded_slots_matrix = align_tokens_with_all_slots_bert(exple, slot_label_to_id, tokenizer)
print(encoded_slots_matrix[1])


[0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


In [7]:
from transformers import BertTokenizer
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np

class SlotDataset(Dataset):
    def __init__(self, texts, tokenizer, slot_label_to_id, max_len=512):
        self.texts = texts
        self.tokenizer = tokenizer
        self.max_len = max_len
        self.slot_label_to_id = slot_label_to_id

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]["text"]
        arguments = self.texts[idx]["arguments"]

        # Tokenize text and align labels
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        labels = align_tokens_with_all_slots_bert(self.texts[idx], self.slot_label_to_id, self.tokenizer)

        # Adjust the labels to match the length of the tokenized input
        # Truncate or pad the labels
        padded_labels = []
        for label in labels:
            if len(padded_labels) < self.max_len:
                padded_labels.append(label)
            else:
                break
        while len(padded_labels) < self.max_len:
            padded_labels.append([0] * len(self.slot_label_to_id))  # Padding

        padded_labels = np.array(padded_labels)
        padded_labels = torch.tensor(padded_labels, dtype=torch.long)

        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': padded_labels
        }

# Your dataset
# texts = [exple]

# Create dataset
dataset = SlotDataset(merged_data, tokenizer, slot_label_to_id)

print(dataset[0]["input_ids"].shape)
print(dataset[0]["labels"].shape)


# DataLoader
data_loader = DataLoader(dataset, batch_size=5)


torch.Size([512])
torch.Size([512, 23])


### Création du modèle

In [8]:
from transformers import BertForTokenClassification, BertTokenizer, AdamW
import torch

model_name = 'bert-base-uncased'
num_labels = len(slot_label_to_id)

tokenizer = BertTokenizer.from_pretrained(model_name)
model = BertForTokenClassification.from_pretrained(
    model_name,
    num_labels=num_labels
)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-uncased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


### Training

In [9]:
optimizer = AdamW(model.parameters(), lr=5e-5)
num_epochs = 5

# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)



In [10]:
from torch.nn import BCEWithLogitsLoss
import torch

def calculate_accuracy(logits, labels):
    # Applying sigmoid to logits and rounding to get predictions
    preds = torch.sigmoid(logits) > 0.5
    correct_preds = (preds == labels).float()
    accuracy = correct_preds.sum() / correct_preds.numel()
    return accuracy.item()

model.train()
loss_fn = BCEWithLogitsLoss()

for epoch in range(num_epochs):
    total_loss = 0
    total_accuracy = 0

    for batch in data_loader:
        # Forward pass
        outputs = model(input_ids=batch['input_ids'], attention_mask=batch['attention_mask'])
        logits = outputs.logits

        # Convert labels to float
        labels = batch['labels'].float()

        # Compute loss
        loss = loss_fn(logits.view(-1, num_labels), labels.view(-1, num_labels))
        total_loss += loss.item()

        # Calculate accuracy
        accuracy = calculate_accuracy(logits, labels)
        total_accuracy += accuracy

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # Compute average loss and accuracy
    avg_loss = total_loss / len(data_loader)
    avg_accuracy = total_accuracy / len(data_loader)

    # Print metrics
    print(f"Epoch {epoch + 1}/{num_epochs} - Loss: {avg_loss:.4f}, Accuracy: {avg_accuracy:.4f}")


Epoch 1/5 - Loss: 0.2874, Accuracy: 0.9387
Epoch 2/5 - Loss: 0.1527, Accuracy: 0.9763
Epoch 3/5 - Loss: 0.1316, Accuracy: 0.9763
Epoch 4/5 - Loss: 0.1195, Accuracy: 0.9763
Epoch 5/5 - Loss: 0.1103, Accuracy: 0.9763


### Inference

In [11]:
def infer_slots(text, model, tokenizer, slot_label_to_id, threshold=0.5):
    # Tokenize input text
    inputs = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=512,
        padding='max_length',
        truncation=True,
        return_attention_mask=True,
        return_tensors='pt'
    )

    input_ids = inputs['input_ids']
    attention_mask = inputs['attention_mask']

    # Run model to get logits
    model.eval()  # Put model in evaluation mode
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        logits = outputs.logits

    # Apply sigmoid and threshold
    probs = torch.sigmoid(logits)
    preds = (probs > threshold).int()

    # Convert predictions to label names
    id_to_label = {v: k for k, v in slot_label_to_id.items()}
    predicted_labels = [[id_to_label.get(idx) for idx, val in enumerate(row) if val == 1] for row in preds.squeeze().tolist()]

    # Tokenized text for reference
    tokenized_text = tokenizer.convert_ids_to_tokens(input_ids.squeeze().tolist())

    return list(zip(tokenized_text, predicted_labels))

In [15]:
def construct_response(predicted_slots):
    # Aggregate slot values
    slot_values = {}
    for token, slots in predicted_slots:
        for slot in slots:
            if slot not in slot_values:
                slot_values[slot] = []
            slot_values[slot].append(token)

    # Format the response
    response = "Identified Slots:\n"
    for slot, values in slot_values.items():
        # Join the tokens for each slot and add to the response
        token_str = " ".join(values).replace(' ##', '')  # Handling subword tokens
        response += f"- {slot}: {token_str}\n"

    return response

# Example usage
text = test_data_path[0]["text"]
print(text)

predicted_slots = infer_slots(text, model, tokenizer, slot_label_to_id)
response = construct_response(predicted_slots)
print(predicted_slots)

On August 27  2013  Employees #1 and #2  of Templar Inc.  a construction  company specializing in fiber optic installation and services  were working  along a highway. The highway speed limit was posted at 55 miles per hour.  Employee #1 was marking the location of an underground line that ran below the  turn lane. Employee #2 was next to Employee #1 and performing the duties of a  flagger. A privately owned vehicle was travelling in the travel/through lane.  The vehicle veered to the right  entered the turn lane  and struck both  workers. Emergency medical services were called. Employee #1 was declared dead  at the scene. Employee #2 refused emergency medical treatment for the bruises  he received when struck.
[('[CLS]', []), ('on', []), ('august', []), ('27', []), ('2013', []), ('employees', []), ('#', []), ('1', []), ('and', []), ('#', []), ('2', []), ('of', []), ('templar', []), ('inc', []), ('.', []), ('a', []), ('construction', []), ('company', []), ('specializing', []), ('in', [