In [66]:
import os
import sys
import csv
import itertools
from collections import Counter, OrderedDict
from enum import Enum
from typing import Dict, List, Tuple, Callable

# The parent directory must be on sys.path *before* any project package is
# imported; otherwise this cell only works on a kernel where the path was
# already extended by an earlier run (it fails on Restart & Run All).
# Presumably '..' is the repository root that holds `enums` and
# `dependency_injection` - confirm against the project layout.
sys.path.append('..')

from enums.ner_type import NERType

In [45]:
# Language under analysis. It is passed on as the `--language` argument below
# and is also used further down to locate the per-language data folder.
language = 'french'

# Simulated command line, presumably consumed by the project's
# `arguments_service` - verify against that service.
# NOTE(review): the first element is the single string "--device cuda". It
# sits in the argv[0] slot, which conventionally holds the program name, so
# it is presumably never parsed as an option - confirm whether that is the
# intent or whether a placeholder program name is missing.
# NOTE(review): "..\\data" is a Windows-style relative path - confirm it is
# resolved correctly on other platforms.
sys.argv = [
"--device cuda",
"--data-folder", "..\\data",
"--seed", "13",
"--configuration", "rnn-simple",
"--language", language,
"--challenge", "named-entity-recognition",
"--label-type", "coarse",
"--experiment-types", "word-similarity"]

In [46]:
# Configure container:
# The project import has to come after `sys.path.append('..')` has run,
# because `dependency_injection` is a project package rather than an
# installed one (presumably located in the parent directory).
from dependency_injection.ioc_container import IocContainer

# Single container instance from which the services below are obtained.
container = IocContainer()

In [47]:
# Resolve the project services from the container.
# `arguments_service` and `process_service` are used further down to derive
# the language suffix of the data files.
# NOTE(review): `file_service` and `metrics_service` are not referenced in
# the visible part of the notebook - confirm they are needed by later cells.
file_service = container.file_service()
arguments_service = container.arguments_service()
metrics_service = container.metrics_service()
process_service = container.process_service()


In [51]:
class TokenInfo:
    """One token together with the entity tags recorded for it.

    Literal and metonymic tags are stored per granularity in dictionaries
    keyed by ``NERType.Coarse`` and ``NERType.Fine``; the content and nested
    tags are plain attributes. Every tag defaults to ``None`` when it is not
    supplied.
    """

    def __init__(
            self,
            token: str,
            literal_coarse_tag: str = None,
            literal_fine_tag: str = None,
            metonymic_coarse_tag: str = None,
            metonymic_fine_tag: str = None,
            content_tag: str = None,
            nested_tag: str = None):
        """Store the token text and whichever tags were provided."""
        granularities = (NERType.Coarse, NERType.Fine)

        self.token = token

        # Pair each granularity with the matching tag (coarse first, then fine).
        self.literal_tags: Dict[NERType, str] = dict(
            zip(granularities, (literal_coarse_tag, literal_fine_tag)))
        self.metonymic_tags: Dict[NERType, str] = dict(
            zip(granularities, (metonymic_coarse_tag, metonymic_fine_tag)))

        self.content_tag: str = content_tag
        self.nested_tag: str = nested_tag

class ModelType(Enum):
    """Identifies the origin of a token sequence being compared.

    The integer values matter: ``ModelsDifference`` sorts models by
    ``value`` and embeds the value in its difference-set keys.
    """
    # Reference annotations that the predictions are compared against.
    GroundTruth = 0
    # Predictions read from the baseline output files.
    Baseline = 1
    # Predictions read from this project's model output files.
    Ours = 2

class NERDataStatistics:
    """Accumulates how often each tag value occurs in every annotation column.

    One ``Counter`` is kept per column: nested tags, content (component)
    tags, and literal / metonymic tags at both coarse and fine granularity.
    """

    def __init__(self):
        self.content_tags = Counter()
        self.nested_entity_tags = Counter()
        self.literal_entity_tags: Dict[NERType, Counter] = {
            NERType.Coarse: Counter(),
            NERType.Fine: Counter(),
        }

        self.metonymic_entity_tags: Dict[NERType, Counter] = {
            NERType.Coarse: Counter(),
            NERType.Fine: Counter(),
        }

    def add_row_data(self, row):
        """Count the tag values of one data row.

        Args:
            row: mapping that must provide the keys 'NE-NESTED',
                'NE-FINE-COMP', 'NE-COARSE-LIT', 'NE-FINE-LIT',
                'NE-COARSE-METO' and 'NE-FINE-METO'.

        Raises:
            KeyError: if ``row`` is a plain dict lacking one of those keys.
        """
        self.nested_entity_tags[row['NE-NESTED']] += 1
        self.content_tags[row['NE-FINE-COMP']] += 1

        self.literal_entity_tags[NERType.Coarse][row['NE-COARSE-LIT']] += 1
        self.literal_entity_tags[NERType.Fine][row['NE-FINE-LIT']] += 1

        self.metonymic_entity_tags[NERType.Coarse][row['NE-COARSE-METO']] += 1
        self.metonymic_entity_tags[NERType.Fine][row['NE-FINE-METO']] += 1

    def print_data(self):
        """Print, per column, the number of distinct tags and the share of "O" tags."""
        print('----------------')
        print('Statistics data:')
        print('----------------')
        self._print_collection_data(self.nested_entity_tags, 'unique nested entity tags')
        self._print_collection_data(self.content_tags, 'unique content tags')
        self._print_collection_data(self.literal_entity_tags[NERType.Fine], 'unique fine literal entity tags')
        self._print_collection_data(self.literal_entity_tags[NERType.Coarse], 'unique coarse literal entity tags')
        self._print_collection_data(self.metonymic_entity_tags[NERType.Fine], 'unique fine metonymic entity tags')
        self._print_collection_data(self.metonymic_entity_tags[NERType.Coarse], 'unique coarse metonymic entity tags')

    def _get_none_percentage(self, collection) -> float:
        """Return the percentage of counted items whose tag is "O".

        An empty collection (nothing counted yet) yields 0.0 instead of
        raising ``ZeroDivisionError``.
        """
        total = sum(collection.values())
        if total == 0:
            return 0.0

        return (collection["O"] / total) * 100

    def _print_collection_data(self, collection: Counter, name: str):
        """Print the number of distinct tags in ``collection`` and its "O" percentage."""
        print(f'{name}: {len(collection)} || None percentage: {self._get_none_percentage(collection)}')

def read_output_files(coarse_path, fine_path=None, print_stats=False):
    """Read token rows from one or two tab-separated annotation files.

    Rows whose 'TOKEN' field starts with '#' are skipped. Every remaining row
    of ``coarse_path`` becomes a ``TokenInfo`` carrying the lower-cased
    'NE-COARSE-LIT' and 'NE-FINE-LIT' values. When ``fine_path`` is given,
    its rows are matched to those tokens by position and only overwrite the
    fine literal tag.

    Args:
        coarse_path: path of the file that defines the token sequence.
        fine_path: optional path of a second file supplying fine literal tags.
        print_stats: when true, print the tag statistics gathered from all
            rows that were read (rows of both files are counted).

    Returns:
        The list of ``TokenInfo`` objects in file order.

    Raises:
        IndexError: if ``fine_path`` holds more token rows than ``coarse_path``.
    """
    # NOTE(review): `csv.excel_tab` keeps the csv module's default quote
    # handling, so a token that starts with a double quote may be read as a
    # quoted field - confirm this matches how the scorer reads these files.

    def is_comment_row(row) -> bool:
        return row['TOKEN'].startswith('#')

    token_infos = []
    statistics = NERDataStatistics()

    with open(coarse_path, 'r', encoding='utf-8') as coarse_file:
        for row in csv.DictReader(coarse_file, dialect=csv.excel_tab):
            if is_comment_row(row):
                continue

            statistics.add_row_data(row)
            token_infos.append(TokenInfo(
                token=row['TOKEN'],
                literal_coarse_tag=row['NE-COARSE-LIT'].lower(),
                literal_fine_tag=row['NE-FINE-LIT'].lower()))

    if fine_path is not None:
        with open(fine_path, 'r', encoding='utf-8') as fine_file:
            fine_rows = (
                row
                for row in csv.DictReader(fine_file, dialect=csv.excel_tab)
                if not is_comment_row(row))

            # `position` counts only the rows that survived the comment filter.
            for position, row in enumerate(fine_rows):
                statistics.add_row_data(row)
                token_infos[position].literal_tags[NERType.Fine] = row['NE-FINE-LIT'].lower()

    if print_stats:
        statistics.print_data()

    return token_infos

In [52]:
# Root of the CLEF-HIPE-2020 scorer checkout, relative to this notebook.
scorer_path = os.path.join('..', '..', '..', 'challenges', 'clef', 'scorer', 'CLEF-HIPE-2020-scorer')

# Resolve the file-name suffix of the selected language once instead of per path.
language_suffix = process_service.get_language_suffix(arguments_service.language)

dev_data_path = os.path.join(scorer_path, 'data', language, f'HIPE-data-v1.0-dev-{language_suffix}.tsv')
train_data_path = os.path.join(scorer_path, 'data', language, f'HIPE-data-v1.0-train-{language_suffix}.tsv')
baseline_coarse_path = os.path.join(scorer_path, 'output-baseline-coarse.tsv')
baseline_fine_path = os.path.join(scorer_path, 'output-baseline-fine.tsv')

# Model output files carry the language in their name; derive it from
# `language` so that switching the language does not silently keep
# comparing against the French outputs.
model_coarse_path = os.path.join(scorer_path, f'output-{language}-coarse-pretr.tsv')
model_fine_path = os.path.join(scorer_path, f'output-{language}-fine-pretr.tsv')

# Optional: print tag statistics over the train and dev data.
# _ = read_output_files(train_data_path, dev_data_path, print_stats=True)

truth_tokens = read_output_files(dev_data_path)
baseline_tokens = read_output_files(baseline_coarse_path, baseline_fine_path)
new_tokens = read_output_files(model_coarse_path, model_fine_path)

# The comparison below is positional, so all three sequences must line up.
assert len(baseline_tokens) == len(truth_tokens), \
    f'baseline has {len(baseline_tokens)} tokens, ground truth has {len(truth_tokens)}'
assert len(baseline_tokens) == len(new_tokens), \
    f'baseline has {len(baseline_tokens)} tokens, model output has {len(new_tokens)}'

In [81]:
class ModelsDifference:
    """Groups tokens by which models predicted each annotation column correctly.

    After ``calculate_difference_set`` has run, ``difference_set`` maps a key
    of the form ``'<model value>-<0|1>|...|<column>'`` (models ordered by
    their ``ModelType`` value, 1 meaning the prediction equals the ground
    truth) to a ``Counter`` of the token texts that fall into that group,
    e.g. ``'1-1|2-0|LITERAL|COARSE'``.
    """

    def __init__(self):
        # Start with an empty sequence so that calculating differences before
        # a ground truth is registered is a no-op rather than an error.
        self.ground_truth: List[TokenInfo] = []
        self.tokens_by_model: Dict[ModelType, List[TokenInfo]] = {}
        self.difference_set: Dict[str, Counter] = {}

    def add_token_set(self, model_type: ModelType, tokens: List[TokenInfo]):
        """Register the token sequence of the ground truth or of one model.

        The ground truth may be replaced by a later call; a model can be
        registered only once.

        Raises:
            Exception: if ``model_type`` (other than the ground truth) was
                already added.
        """
        if model_type == ModelType.GroundTruth:
            self.ground_truth = tokens
            return

        if model_type in self.tokens_by_model:
            raise Exception("This model type is already added")

        self.tokens_by_model[model_type] = tokens

    def calculate_difference_set(self):
        """Rebuild ``difference_set`` from the registered token sequences.

        Tokens are compared by position, so every registered model must hold
        at least as many tokens as the ground truth.
        """
        self.difference_set = {}

        # (value extractor, column label) for every compared annotation column.
        columns = (
            (lambda token_info: token_info.literal_tags[NERType.Coarse], 'LITERAL|COARSE'),
            (lambda token_info: token_info.literal_tags[NERType.Fine], 'LITERAL|FINE'),
            # The coarse metonymic column gets its own label; it previously
            # shared 'METONYMICAL|FINE', which merged both granularities.
            (lambda token_info: token_info.metonymic_tags[NERType.Coarse], 'METONYMICAL|COARSE'),
            (lambda token_info: token_info.metonymic_tags[NERType.Fine], 'METONYMICAL|FINE'),
            (lambda token_info: token_info.content_tag, 'CONTENT'),
            (lambda token_info: token_info.nested_tag, 'NESTED'),
        )

        for i, truth_token_info in enumerate(self.ground_truth):
            for column_extract, column_value in columns:
                self._calculate_difference_set_per_column(i, truth_token_info, column_extract, column_value)

    def _calculate_difference_set_per_column(self, i: int, truth_token_info: TokenInfo, column_extract: Callable, column_value: str):
        """Count the token at position ``i`` under the key matching its per-model correctness."""
        predicted_values_by_model = {
            model_type: column_extract(tokens[i])
            for model_type, tokens in self.tokens_by_model.items()
        }
        difference_key = self._get_difference_key(
            column_extract(truth_token_info),
            predicted_values_by_model,
            column_value)

        self.difference_set.setdefault(difference_key, Counter())[truth_token_info.token] += 1

    def _get_difference_key(
        self,
        truth_value: str,
        predicted_values_by_model: Dict[ModelType, str],
        column_value: str) -> str:
        """Build the key for one token from the models' predicted values."""
        results_dict = {
            model_type: (predicted_value == truth_value)
            for model_type, predicted_value in predicted_values_by_model.items()
        }

        return self._get_difference_key_by_bool(results_dict, column_value)

    def _get_difference_key_by_bool(
        self,
        desired_prediction_status: Dict[ModelType, bool],
        column_value: str) -> str:
        """Build the key for a given correct/wrong status per model.

        Models are ordered by their enum value so that the key does not
        depend on the order in which they were registered or passed in.
        """
        ordered_statuses = sorted(desired_prediction_status.items(), key=lambda x: x[0].value)
        result = '|'.join([f'{key.value}-{int(value)}' for key, value in ordered_statuses])
        result += f'|{column_value}'

        return result

    def get_tokens_per_result(self, desired_prediction_status: Dict[ModelType, bool], column_value: str) -> Counter:
        """Return the token counts for the given status combination and column.

        An empty ``Counter`` is returned when no token falls into that group.
        """
        key = self._get_difference_key_by_bool(desired_prediction_status, column_value)
        if key not in self.difference_set:
            return Counter()

        return self.difference_set[key]


In [83]:
# Register the reference annotations and both prediction sets, then group
# every token by which of the two models predicted each column correctly.
models_difference = ModelsDifference()
models_difference.add_token_set(ModelType.GroundTruth, truth_tokens)
models_difference.add_token_set(ModelType.Baseline, baseline_tokens)
models_difference.add_token_set(ModelType.Ours, new_tokens)

# Must run after all token sets are added; it rebuilds the difference set from scratch.
models_difference.calculate_difference_set()

In [84]:
# Report, for each listed column, how many tokens fall into every
# correct/wrong combination of the two models, with the most frequent tokens.
model_types = [ModelType.Baseline, ModelType.Ours]
column_values = ['LITERAL|COARSE', 'LITERAL|FINE']

# Every True/False assignment over the models, starting with all-correct.
model_types_combinations = list(itertools.product([True, False], repeat=len(model_types)))
for column_value in column_values:
    for model_types_combination in model_types_combinations:
        # Map each model to the correctness status wanted in this combination.
        desired_prediction_status_dict = { model_type : model_types_combination[i] for i, model_type in enumerate(model_types) }
        tokens_result = models_difference.get_tokens_per_result(
            desired_prediction_status_dict,
            column_value)

        result_string = '------------------\n'
        for model_type, value in desired_prediction_status_dict.items():
            result_string += 'correct' if value else 'wrong'
            result_string += f' by {model_type.name.lower()}, '

        # Drop the trailing ', ' left behind by the loop above.
        result_string = result_string[:-2]

        result_string += f' [{column_value}]\n'
        result_string += f'\tTotal amount: {sum(tokens_result.values())}\n'
        result_string += f'\tTop 10 most common: {tokens_result.most_common(10)}'
        print(result_string)

------------------
correct by baseline, correct by ours [LITERAL|COARSE]
	Total amount: 27254
	Top 10 most common: [(',', 1407), ('.', 1360), ("'", 1285), ('de', 1035), ('la', 619), ('à', 509), ('l', 452), ('et', 444), ('le', 414), ('les', 373)]
------------------
correct by baseline, wrong by ours [LITERAL|COARSE]
	Total amount: 970
	Top 10 most common: [('de', 26), (',', 26), ('-', 23), ('.', 17), ('Conseil', 12), ("'", 12), ('fédéral', 11), ('des', 9), ('York', 9), ('Havas', 8)]
------------------
wrong by baseline, correct by ours [LITERAL|COARSE]
	Total amount: 669
	Top 10 most common: [('.', 62), (',', 30), ('de', 29), ("'", 28), ('-', 27), ('"', 14), ('à', 13), ('_', 12), ('M', 8), ('des', 8)]
------------------
wrong by baseline, wrong by ours [LITERAL|COARSE]
	Total amount: 960
	Top 10 most common: [('de', 50), (',', 38), ('-', 34), ('.', 25), ("'", 16), ('M', 12), ('l', 10), ('la', 9), ('du', 8), ('et', 8)]
------------------
correct by baseline, correct by ours [LITERAL|FINE