In [107]:
import json
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForSequenceClassification
from sklearn.metrics import accuracy_score

In [108]:
# Checkpoint identifiers, kept in one place so they are easy to swap.
# NOTE(review): MODEL_DIR is presumably the fine-tuned slot parser; the
# tokenizer is taken from the public FLAN-T5 checkpoint — confirm they match.
TOKENIZER_CHECKPOINT = "google/flan-t5-large"
MODEL_DIR = "./parsing-model-2023-06-09 13:28"

tokenizer = AutoTokenizer.from_pretrained(TOKENIZER_CHECKPOINT)
model = AutoModelForSeq2SeqLM.from_pretrained(MODEL_DIR)

In [109]:
# Load the two JSON datasets that are merged and evaluated in the cells below.
# The encoding is pinned to UTF-8 so decoding does not depend on the platform's
# default locale encoding.
# NOTE(review): the `training_data_*` names are kept because later cells refer
# to them, but these files are only used for evaluation in this notebook.
with open('../input/metrics_dataset-traffic-test.json', 'r', encoding='utf-8') as f:
    training_data_a = json.load(f)

with open('../input/metrics_dataset-domesticDeclarations.json', 'r', encoding='utf-8') as f:
    training_data_b = json.load(f)

In [110]:
# Concatenate the 'metrics' entries of both datasets into a NEW list.
# Extending training_data_a['metrics'] in place would make this cell
# non-idempotent: every re-run would append the second dataset again and
# silently duplicate evaluation samples.
data = [*training_data_a['metrics'], *training_data_b['metrics']]

In [111]:
# Slot labels kept for evaluation; any other key found under a phrase's
# 'slots' entry is ignored.
useful_tags = ['TSE', 'TEE', 'TBE', 'CE','AttributeName', 'AttributeValue', 'AGR', 'GBC', 'FDE']

# One (description, ["<slot value>: <tag>", ...]) pair per phrase, with the
# slot strings listed in the iteration order of the phrase's 'slots' entry.
cleaned_evaluation_data = [
    (
        entry['description'],
        [f"{entry['slots'][tag]}: {tag}" for tag in entry['slots'] if tag in useful_tags],
    )
    for entry in data
]


In [112]:
# Run the model on every evaluation sentence and collect, position by position,
# the gold slot string ("text: TAG; text: TAG") and the decoded prediction.
tag_list = ', '.join(useful_tags)  # identical for every prompt, so built once

expected_outputs = []
predicted_outputs = []
for input_text, gold_slots in cleaned_evaluation_data:
    expected_outputs.append("; ".join(gold_slots))

    prompt = f"Sentence: {input_text}\nAvailable Tags: {tag_list}"
    # The prompt is tokenized once; the previous extra tokenizer.encode(...)
    # call produced a tensor that was never used.
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids

    outputs = model.generate(input_ids, max_length=1000)
    predicted_outputs.append(tokenizer.decode(outputs[0], skip_special_tokens=True))
 

In [113]:
from hermetrics.levenshtein import Levenshtein
import numpy as np

def compute_slot_metric(preds, real):
    """Mean per-phrase similarity between gold and predicted slot texts.

    ``preds`` and ``real`` are sequences of strings shaped like
    ``"text: TAG; text: TAG"``; they are paired positionally with ``zip``.

    For every gold slot the score is the best
    ``1 - normalized Levenshtein distance`` over the predicted slots carrying
    the same tag, or 0 when no predicted slot has that tag. Slot scores are
    averaged per phrase, then phrase scores are averaged.

    Parsing notes:
      * A slot is split on its LAST ``": "`` so slot text that itself contains
        ``": "`` keeps its tag.
      * Predicted slots without a ``": "`` separator are ignored.
      * Gold slots without a separator (e.g. an empty gold string for a phrase
        with no slots) are skipped; a phrase with no scorable gold slot does
        not contribute to the average.

    Returns:
        The mean phrase score, or NaN when no phrase could be scored.
    """
    phrase_scores = []
    for real_phrase, pred_phrase in zip(real, preds):
        # Group predicted texts by tag so each gold slot only looks at its
        # own candidates.
        predicted_texts = {}
        for pred_slot in pred_phrase.split("; "):
            pred_text, sep, pred_tag = pred_slot.rpartition(": ")
            if sep:
                predicted_texts.setdefault(pred_tag, []).append(pred_text)

        slot_scores = []
        for real_slot in real_phrase.split("; "):
            real_text, sep, real_tag = real_slot.rpartition(": ")
            if not sep:
                continue  # malformed or empty gold slot: nothing to score

            best = 0.0
            for pred_text in predicted_texts.get(real_tag, ()):
                if pred_text == real_text:
                    # Identical strings are a perfect match; no need to run
                    # the edit-distance computation.
                    best = 1.0
                    break
                similarity = 1 - Levenshtein().normalized_distance(real_text, pred_text)
                best = max(best, similarity)
            slot_scores.append(best)

        if slot_scores:
            phrase_scores.append(np.mean(slot_scores))

    if not phrase_scores:
        # Explicit NaN instead of np.mean([]) to avoid a RuntimeWarning.
        return float("nan")
    return np.mean(phrase_scores)

In [114]:
def compute_slot_accuracy(preds, real):
    """Mean per-phrase exact-match accuracy of predicted slots.

    ``preds`` and ``real`` are sequences of strings shaped like
    ``"text: TAG; text: TAG"``; they are paired positionally with ``zip``.

    A gold slot scores 1 when some predicted slot has the same tag and exactly
    the same text, otherwise 0. Slot scores are averaged per phrase, then
    phrase scores are averaged.

    Parsing notes:
      * A slot is split on its LAST ``": "`` so slot text that itself contains
        ``": "`` keeps its tag.
      * Predicted slots without a ``": "`` separator are ignored.
      * Gold slots without a separator (e.g. an empty gold string for a phrase
        with no slots) are skipped; a phrase with no scorable gold slot does
        not contribute to the average.

    Returns:
        The mean phrase accuracy, or NaN when no phrase could be scored.
    """
    phrase_scores = []
    for real_phrase, pred_phrase in zip(real, preds):
        # Set of (tag, text) pairs the model produced for this phrase.
        predicted = set()
        for pred_slot in pred_phrase.split("; "):
            pred_text, sep, pred_tag = pred_slot.rpartition(": ")
            if sep:
                predicted.add((pred_tag, pred_text))

        slot_scores = []
        for real_slot in real_phrase.split("; "):
            real_text, sep, real_tag = real_slot.rpartition(": ")
            if not sep:
                continue  # malformed or empty gold slot: nothing to score
            slot_scores.append(1 if (real_tag, real_text) in predicted else 0)

        if slot_scores:
            phrase_scores.append(np.mean(slot_scores))

    if not phrase_scores:
        # Explicit NaN instead of np.mean([]) to avoid a RuntimeWarning.
        return float("nan")
    return np.mean(phrase_scores)

In [115]:
def compute_slot_metric_per_tag(preds, real, tags=None):
    """Mean slot-text similarity, reported separately for each tag.

    ``preds`` and ``real`` are sequences of strings shaped like
    ``"text: TAG; text: TAG"``; they are paired positionally with ``zip``.

    Every gold slot contributes exactly ONE sample to its tag: the best
    ``1 - normalized Levenshtein distance`` over the predicted slots carrying
    the same tag, or 0 when no predicted slot has that tag. Malformed
    predicted slots (no ``": "`` separator) are ignored rather than being
    counted as extra failures against every gold tag in the phrase.

    Args:
        preds: predicted slot strings.
        real: gold slot strings.
        tags: tags to report. Defaults to the notebook-level ``useful_tags``.
            A gold tag that is not listed is still reported.

    Returns:
        Dict mapping tag -> mean similarity; NaN for a tag that has no gold
        slot in ``real``.
    """
    if tags is None:
        tags = useful_tags
    scores_per_tag = {tag: [] for tag in tags}

    for real_phrase, pred_phrase in zip(real, preds):
        # Group predicted texts by tag; split on the LAST ": " so text that
        # itself contains ": " keeps its tag.
        predicted_texts = {}
        for pred_slot in pred_phrase.split("; "):
            pred_text, sep, pred_tag = pred_slot.rpartition(": ")
            if sep:
                predicted_texts.setdefault(pred_tag, []).append(pred_text)

        for real_slot in real_phrase.split("; "):
            real_text, sep, real_tag = real_slot.rpartition(": ")
            if not sep:
                continue  # malformed or empty gold slot: nothing to score

            best = 0.0
            for pred_text in predicted_texts.get(real_tag, ()):
                if pred_text == real_text:
                    # Identical strings are a perfect match; skip the
                    # edit-distance computation.
                    best = 1.0
                    break
                similarity = 1 - Levenshtein().normalized_distance(real_text, pred_text)
                best = max(best, similarity)
            scores_per_tag.setdefault(real_tag, []).append(best)

    # Explicit NaN for empty tags instead of np.mean([]) to avoid a warning.
    return {
        tag: (np.mean(scores) if scores else float("nan"))
        for tag, scores in scores_per_tag.items()
    }


In [116]:
def compute_slot_accuracy_per_tag(preds, real, tags=None):
    """Exact-match slot accuracy, reported separately for each tag.

    ``preds`` and ``real`` are sequences of strings shaped like
    ``"text: TAG; text: TAG"``; they are paired positionally with ``zip``.

    Every gold slot contributes exactly ONE sample to its tag: 1 when some
    predicted slot has the same tag and exactly the same text, otherwise 0.
    Malformed predicted slots (no ``": "`` separator) are ignored rather than
    being counted as extra failures against every gold tag in the phrase.

    Args:
        preds: predicted slot strings.
        real: gold slot strings.
        tags: tags to report. Defaults to the notebook-level ``useful_tags``.
            A gold tag that is not listed is still reported.

    Returns:
        Dict mapping tag -> accuracy; NaN for a tag that has no gold slot in
        ``real``.
    """
    if tags is None:
        tags = useful_tags
    hits_per_tag = {tag: [] for tag in tags}

    for real_phrase, pred_phrase in zip(real, preds):
        # Set of (tag, text) pairs the model produced for this phrase; split
        # on the LAST ": " so text that itself contains ": " keeps its tag.
        predicted = set()
        for pred_slot in pred_phrase.split("; "):
            pred_text, sep, pred_tag = pred_slot.rpartition(": ")
            if sep:
                predicted.add((pred_tag, pred_text))

        for real_slot in real_phrase.split("; "):
            real_text, sep, real_tag = real_slot.rpartition(": ")
            if not sep:
                continue  # malformed or empty gold slot: nothing to score
            hit = 1 if (real_tag, real_text) in predicted else 0
            hits_per_tag.setdefault(real_tag, []).append(hit)

    # Explicit NaN for empty tags instead of np.mean([]) to avoid a warning.
    return {
        tag: (np.mean(hits) if hits else float("nan"))
        for tag, hits in hits_per_tag.items()
    }


In [117]:
# Report every metric in turn. Each value is computed immediately before it is
# printed, so a failure in a later metric still leaves the earlier lines shown.

# Whole-string exact match between gold and predicted slot strings.
exact_match = accuracy_score(expected_outputs, predicted_outputs)
print(f"Accuracy: {exact_match}")

slot_similarity = compute_slot_metric(predicted_outputs, expected_outputs)
print(f"Slot Metric: {slot_similarity}")

slot_accuracy = compute_slot_accuracy(predicted_outputs, expected_outputs)
print(f"Slot Accuracy Metric: {slot_accuracy}")

slot_accuracy_by_tag = compute_slot_accuracy_per_tag(predicted_outputs, expected_outputs)
print(f"Slot Accuracy Per Tag: {slot_accuracy_by_tag}")

slot_similarity_by_tag = compute_slot_metric_per_tag(predicted_outputs, expected_outputs)
print(f"Slot Metric Per Tag: {slot_similarity_by_tag}")

Accuracy: 0.43137254901960786
Slot Metric: 0.7701802803981406
Slot Accuracy Metric: 0.6045751633986928
Slot Accuracy Per Tag: {'TSE': 0.4, 'TEE': 0.42857142857142855, 'TBE': 0.2222222222222222, 'CE': 0.7142857142857143, 'AttributeName': 0.5, 'AGR': 0.7142857142857143, 'GBC': 1.0, 'FDE': 1.0}
Slot Metric Per Tag: {'TSE': 0.7815325670498084, 'TEE': 0.6037305244678517, 'TBE': 0.6618205868205869, 'CE': 0.8399143031902588, 'AttributeName': 0.5, 'AGR': 0.789010989010989, 'GBC': 1.0, 'FDE': 1.0}


In [119]:
# Qualitative check: run the parser on one hand-written sentence, using the
# same prompt template as the evaluation loop.
example = "Average of declared costs detailed per trip"
prompt = f"Sentence: {example}\nAvailable Tags: {', '.join(useful_tags)}"
# The prompt is tokenized once; the previous extra tokenizer.encode(...) call
# produced a tensor that was never used.
input_ids = tokenizer(prompt, return_tensors="pt").input_ids

outputs = model.generate(input_ids, max_length=1000)
print(tokenizer.decode(outputs[0], skip_special_tokens=True))

Average: AGR; declared costs detailed: CE; trip: GBC
