In [1]:
# for colab
# !pip install spacy_stanza
# !pip install ckip_transformers

# 1. import packages

In [1]:
import pandas as pd
from tqdm.notebook import tqdm
import pickle
import pathlib
import sys
from spacy.tokens import Doc

# 2. Prepare spaCy pipeline

In [2]:
# Put the directory four levels above the current working directory on
# sys.path; presumably this is where the project-local `spacy_pipeline`
# package lives (verify against the repository layout).
spacy_pipeline_parent_path = pathlib.Path.cwd().parent.parent.parent.parent
sys.path.append(str(spacy_pipeline_parent_path))

# Project-local modules, imported after the sys.path adjustment above.
from spacy_pipeline import pipeline_setup
from spacy_pipeline import opinion_rule

In [3]:
# Rule configurations keyed by method name. Each "pattern" is a list of node
# specs written in the key vocabulary of spaCy's DependencyMatcher
# (RIGHT_ID / RIGHT_ATTRS / LEFT_ID / REL_OP); presumably that is how
# pipeline_setup consumes it -- confirm there.
methods = {
    "opinion_v0": {
        "version": "opinion_v0",
        "pattern": [
            # Anchor node: the opinion operator, selected by fine-grained tag "VE".
            {
                "RIGHT_ID": "OPINION_OPR_found_root",
                "RIGHT_ATTRS": {"TAG": {"IN": ["VE"]}},
            },
            # Opinion source: a node attached to the operator (REL_OP ">")
            # whose dependency label is nsubj.
            {
                "LEFT_ID": "OPINION_OPR_found_root",
                "REL_OP": ">",
                "RIGHT_ID": "OPINION_SRC_found_root",
                "RIGHT_ATTRS": {"DEP": {"IN": ["nsubj"]}},
            },
            # Opinion segment: a node attached to the operator (REL_OP ">")
            # whose dependency label is ccomp or parataxis.
            # An extra filter was tried and left disabled:
            #   "POS": {"IN": ["VERB", "NOUN", "ADJ"]}
            {
                "LEFT_ID": "OPINION_OPR_found_root",
                "REL_OP": ">",
                "RIGHT_ID": "OPINION_SEG_found_root",
                "RIGHT_ATTRS": {"DEP": {"IN": ["ccomp", "parataxis"]}},
            },
        ],
    },
}

In [4]:
# Build the pipeline from the "opinion_v0" configuration and keep a handle on
# its vocab, which later cells pass to Doc(vocab) when rebuilding pickled docs.
pipeline = pipeline_setup.get_opinion_pipeline(methods['opinion_v0'])
vocab = pipeline.vocab

2023-03-28 07:02:05 INFO: Checking for updates to resources.json in case models have been updated.  Note: this behavior can be turned off with download_method=None or download_method=DownloadMethod.REUSE_RESOURCES


Downloading https://raw.githubusercontent.com/stanfordnlp/stanza-resources/main/resources_1.5.0.json:   0%|   …

2023-03-28 07:02:06 INFO: Loading these models for language: zh-hant (Traditional_Chinese):
| Processor | Package |
-----------------------
| tokenize  | gsd     |
| pos       | gsd     |
| lemma     | gsd     |
| depparse  | gsd     |

2023-03-28 07:02:06 INFO: Using device: cpu
2023-03-28 07:02:07 INFO: Loading: tokenize
2023-03-28 07:02:07 INFO: Loading: pos
2023-03-28 07:02:07 INFO: Loading: lemma
2023-03-28 07:02:07 INFO: Loading: depparse
2023-03-28 07:02:07 INFO: Done loading processors!


['opinion_matcher']
[1m

#   Component         Assigns   Requires   Scores   Retokenizes
-   ---------------   -------   --------   ------   -----------
0   opinion_matcher                                 False      

[38;5;2m✔ No problems found.[0m
{'summary': {'opinion_matcher': {'assigns': [], 'requires': [], 'scores': [], 'retokenizes': False}}, 'problems': {'opinion_matcher': []}, 'attrs': {}}


## 2.2 Define workflow

In [5]:
def run_work_flow(all_docs, spacy_pipeline, n_process=1):
    """Run every document group through the pipeline and return the results.

    Args:
        all_docs: Sized collection of document groups; each group is an
            iterable of inputs accepted by ``spacy_pipeline.pipe``.
        spacy_pipeline: Object exposing ``pipe(items, n_process=...)``.
        n_process: Forwarded unchanged to ``spacy_pipeline.pipe``.

    Returns:
        A list with one list per input group, holding the objects yielded by
        ``spacy_pipeline.pipe`` in order.
    """
    processed_docs = []

    # The progress bar advances once per group, not once per document.
    with tqdm(total=len(all_docs)) as pbar:
        for paragraphs in all_docs:
            # Keep what pipe() yields instead of relying on in-place mutation
            # of the inputs: with n_process > 1 spaCy processes serialized
            # copies in worker processes and yields new Doc objects, so the
            # originals would come back unannotated.
            processed_docs.append(
                list(spacy_pipeline.pipe(paragraphs, n_process=n_process))
            )
            pbar.update(1)

    return processed_docs

# Load all_docs_dev

In [6]:
# The dev-set pickle sits one directory above the current working directory.
pickle_dir = str(pathlib.Path.cwd().parent)
pickle_file = 'label_news_200_docs_stanza.pkl'

# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open(pathlib.Path(pickle_dir, pickle_file), 'rb') as f:
    bytes_data = pickle.load(f)

# The pickle holds groups of serialized docs; rebuild each one as a Doc on the
# pipeline's vocab before running the pipeline over it.
all_docs_dev = [
    [Doc(vocab).from_bytes(doc_bytes) for doc_bytes in docs]
    for docs in bytes_data
]
all_docs_dev = run_work_flow(all_docs_dev, pipeline, n_process=1)

  0%|          | 0/200 [00:00<?, ?it/s]

# load all_docs_test

In [7]:
# The test-set pickle sits one directory above the current working directory.
pickle_dir = str(pathlib.Path.cwd().parent)
pickle_file = 'label_news_50_test_docs_stanza.pkl'

# NOTE: pickle.load can execute arbitrary code; only load trusted files.
with open(pathlib.Path(pickle_dir, pickle_file), 'rb') as f:
    bytes_data = pickle.load(f)

# The pickle holds groups of serialized docs; rebuild each one as a Doc on the
# pipeline's vocab before running the pipeline over it.
all_docs_test = [
    [Doc(vocab).from_bytes(doc_bytes) for doc_bytes in docs]
    for docs in bytes_data
]
all_docs_test = run_work_flow(all_docs_test, pipeline, n_process=1)

  0%|          | 0/50 [00:00<?, ?it/s]

# span level evaluation

In [8]:
def get_precision(TP, FP):
    """Return precision, ``TP / (TP + FP)``.

    Args:
        TP: Number of true positives.
        FP: Number of false positives.

    Returns:
        The precision as a float, or 0.0 when ``TP + FP`` is zero (nothing was
        predicted) instead of raising ZeroDivisionError.
    """
    predicted = TP + FP
    if predicted == 0:
        return 0.0
    return TP / predicted

def get_recall(TP, FN):
    """Return recall, ``TP / (TP + FN)``.

    Args:
        TP: Number of true positives.
        FN: Number of false negatives.

    Returns:
        The recall as a float, or 0.0 when ``TP + FN`` is zero (there was
        nothing to find) instead of raising ZeroDivisionError.
    """
    relevant = TP + FN
    if relevant == 0:
        return 0.0
    return TP / relevant

def get_f_score(precision, recall, beta=1):
    """Return the F-beta score of a precision/recall pair.

    Args:
        precision: Precision value.
        recall: Recall value.
        beta: Weight of recall relative to precision; 1 gives the F1 score.

    Returns:
        ``(1 + beta**2) * precision * recall / (beta**2 * precision + recall)``,
        or 0.0 when that denominator is zero (precision and recall both zero)
        instead of raising ZeroDivisionError.
    """
    denominator = beta**2 * precision + recall
    if denominator == 0:
        return 0.0
    return (1 + beta**2) * precision * recall / denominator

# Maps each gold label name to the label a predicted span must carry to count
# as a match for it.
label_match_dict = dict(
    OPINION_SRC="OPINION_SRC_found",
    OPINION_OPR="OPINION_OPR_found",
    OPINION_SEG="OPINION_SEG_found",
)

def _strip_edge_punct(span, punct):
    """Drop edge tokens while the span's text starts or ends with a char in ``punct``.

    Stops as soon as the span's text is empty, so an empty or all-punctuation
    span yields an empty span instead of raising IndexError.
    """
    while span.text and span.text[0] in punct:
        span = span[1:]
    while span.text and span.text[-1] in punct:
        span = span[:-1]
    return span


def delete_around_punt_check_span(label_span, match_span):
    """Check whether two spans cover the same tokens once edge punctuation is removed.

    Args:
        label_span: Gold span; must expose ``text``, ``start``, ``end`` and
            support slicing that returns another such span.
        match_span: Predicted span with the same interface.

    Returns:
        True when both trimmed spans have identical ``start`` and ``end``,
        otherwise False.
    """
    # Characters trimmed from either edge. The test looks at the first/last
    # *character* of the span text, while each step removes a whole token.
    puct_list = ['，', '。', '「', '」', ' ', '！', '？', ';', ':', "'", '"', '‘', '“', '『', '』', '、', '（', '）']

    label_span = _strip_edge_punct(label_span, puct_list)
    match_span = _strip_edge_punct(match_span, puct_list)

    return label_span.start == match_span.start and label_span.end == match_span.end
    
def interval_check_span(label_span, match_span):
    """Return True when ``match_span`` lies within ``label_span`` (bounds inclusive)."""
    if match_span.start < label_span.start:
        return False
    return match_span.end <= label_span.end
    
def tolerance_check_span(label_span, match_span, tolerance=1):
    """Return True when one boundary matches exactly and the other is within ``tolerance``.

    Spans whose start and end both differ from the label's do not match, even
    if each difference is within the tolerance.
    """
    start_gap = abs(label_span.start - match_span.start)
    end_gap = abs(label_span.end - match_span.end)

    end_exact = start_gap <= tolerance and end_gap == 0
    start_exact = start_gap == 0 and end_gap <= tolerance
    return end_exact or start_exact

In [9]:
def new_eval(all_docs, method):
    """Score predicted opinion spans against gold spans, per label.

    Args:
        all_docs: Iterable of document groups; each doc exposes ``doc.spans``
            with optional 'opinion_found' (predicted) and 'opinion_label'
            (gold) span groups.
        method: Callable ``method(label_span, found_span)`` returning whether
            the two spans match positionally.

    Returns:
        A dict with one entry per label ('OPINION_SRC', 'OPINION_OPR',
        'OPINION_SEG') holding 'precision', 'recall' and 'f_score', plus a
        'counter' entry with the raw TP/FP/FN counts.
    """
    counter = {
        'OPINION_OPR_FP': 0,
        'OPINION_OPR_FN': 0,
        'OPINION_OPR_TP': 0,

        'OPINION_SRC_FP': 0,
        'OPINION_SRC_FN': 0,
        'OPINION_SRC_TP': 0,

        'OPINION_SEG_FP': 0,
        'OPINION_SEG_FN': 0,
        'OPINION_SEG_TP': 0,
    }
    label_names = ('OPINION_SRC', 'OPINION_OPR', 'OPINION_SEG')

    for docs in all_docs:
        for doc in docs:
            # A missing span group behaves like an empty one: with no gold
            # spans every prediction is a false positive, and with no
            # predictions every gold span is a false negative.
            found_spans = doc.spans['opinion_found'] if 'opinion_found' in doc.spans else ()
            gold_spans = doc.spans['opinion_label'] if 'opinion_label' in doc.spans else ()

            # Gold spans hit by a prediction in this doc, grouped by gold label.
            matched = {name: [] for name in label_names}

            for found_span in found_spans:
                # First gold span that matches positionally and whose label
                # corresponds to the predicted label.
                hit = next(
                    (
                        gold_span
                        for gold_span in gold_spans
                        if method(gold_span, found_span)
                        and found_span.label_ == label_match_dict[gold_span.label_]
                    ),
                    None,
                )
                if hit is None:
                    # The first 11 characters of the predicted label are used
                    # as the label name for the counter key.
                    counter[f"{found_span.label_[:11]}_FP"] += 1
                else:
                    # NOTE(review): every matching prediction counts as a TP,
                    # so two predictions hitting the same gold span add two.
                    matched[hit.label_].append(hit)
                    counter[f"{hit.label_}_TP"] += 1

            for gold_span in gold_spans:
                if gold_span not in matched[gold_span.label_]:
                    counter[f"{gold_span.label_}_FN"] += 1

    report = {}
    for name in label_names:
        precision = get_precision(counter[f"{name}_TP"], counter[f"{name}_FP"])
        recall = get_recall(counter[f"{name}_TP"], counter[f"{name}_FN"])
        report[name] = {
            'precision': precision,
            'recall': recall,
            'f_score': get_f_score(precision, recall),
        }
    report['counter'] = counter
    return report

In [10]:
# Span-level scores on the dev set; spans match when they cover the same
# tokens after edge punctuation is stripped.
new_eval(all_docs_dev, delete_around_punt_check_span)

{'OPINION_SRC': {'precision': 0.6675031367628608,
  'recall': 0.5368314833501514,
  'f_score': 0.5950782997762863},
 'OPINION_OPR': {'precision': 0.8181818181818182,
  'recall': 0.5982142857142857,
  'f_score': 0.6911174785100287},
 'OPINION_SEG': {'precision': 0.17697594501718214,
  'recall': 0.19452313503305005,
  'f_score': 0.18533513270355376},
 'counter': {'OPINION_OPR_FP': 134,
  'OPINION_OPR_FN': 405,
  'OPINION_OPR_TP': 603,
  'OPINION_SRC_FP': 265,
  'OPINION_SRC_FN': 459,
  'OPINION_SRC_TP': 532,
  'OPINION_SEG_FP': 958,
  'OPINION_SEG_FN': 853,
  'OPINION_SEG_TP': 206}}

In [11]:
# Span-level scores on the test set, using the same matching rule as for dev.
new_eval(all_docs_test, delete_around_punt_check_span)

{'OPINION_SRC': {'precision': 0.7064676616915423,
  'recall': 0.5657370517928287,
  'f_score': 0.6283185840707964},
 'OPINION_OPR': {'precision': 0.8315789473684211,
  'recall': 0.6220472440944882,
  'f_score': 0.7117117117117117},
 'OPINION_SEG': {'precision': 0.1282051282051282,
  'recall': 0.13559322033898305,
  'f_score': 0.13179571663920922},
 'counter': {'OPINION_OPR_FP': 32,
  'OPINION_OPR_FN': 96,
  'OPINION_OPR_TP': 158,
  'OPINION_SRC_FP': 59,
  'OPINION_SRC_FN': 109,
  'OPINION_SRC_TP': 142,
  'OPINION_SEG_FP': 272,
  'OPINION_SEG_FN': 255,
  'OPINION_SEG_TP': 40}}

In [12]:
# new_eval(all_docs_dev, delete_around_punt_check_span)
# new_eval(all_docs_dev, interval_check_span)
# new_eval(all_docs_dev, tolerance_check_span)

In [13]:
# new_eval(all_docs_test, delete_around_punt_check_span)

# token level evaluation

In [14]:
def eval_token_level(all_docs):
    """Score predictions against gold labels token by token, per label.

    Reads ``token._.found_type`` (compared against '<LABEL>_found') and
    ``token._.label_type`` (tested with ``in``) for every token of every doc
    that has both an 'opinion_found' and an 'opinion_label' span group.

    NOTE(review): docs lacking either span group contribute nothing here,
    whereas new_eval counts them as pure FP / pure FN -- confirm this
    difference is intended.

    Args:
        all_docs: Iterable of document groups.

    Returns:
        A dict with one entry per label ('OPINION_SRC', 'OPINION_OPR',
        'OPINION_SEG') holding 'precision', 'recall' and 'f_score', plus a
        'counter' entry with the raw token counts.
    """
    counter = {
        'OPINION_SRC_FP': 0,
        'OPINION_OPR_FP': 0,
        'OPINION_SEG_FP': 0,

        'OPINION_SRC_FN': 0,
        'OPINION_OPR_FN': 0,
        'OPINION_SEG_FN': 0,

        'OPINION_SRC_TP': 0,
        'OPINION_OPR_TP': 0,
        'OPINION_SEG_TP': 0,
    }
    # Order matters: both checks below stop at the first label that applies.
    label_names = ('OPINION_SRC', 'OPINION_OPR', 'OPINION_SEG')

    for docs in all_docs:
        for doc in docs:
            if not ('opinion_found' in doc.spans and 'opinion_label' in doc.spans):
                continue

            for token in doc:
                found_type = token._.found_type
                label_type = token._.label_type

                # Predicted side: TP when the predicted label is among the
                # token's gold labels, FP otherwise.
                for name in label_names:
                    if found_type == f"{name}_found":
                        outcome = 'TP' if name in label_type else 'FP'
                        counter[f"{name}_{outcome}"] += 1
                        break

                # Gold side: at most one FN per token, for the first gold
                # label (in label_names order) that was not predicted.
                for name in label_names:
                    if name in label_type and found_type != f"{name}_found":
                        counter[f"{name}_FN"] += 1
                        break

    report = {}
    for name in label_names:
        precision = get_precision(counter[f"{name}_TP"], counter[f"{name}_FP"])
        recall = get_recall(counter[f"{name}_TP"], counter[f"{name}_FN"])
        report[name] = {
            'precision': precision,
            'recall': recall,
            'f_score': get_f_score(precision, recall),
        }
    report['counter'] = counter
    return report

In [15]:
# Token-level scores on the dev set.
eval_token_level(all_docs_dev)

{'OPINION_SRC': {'precision': 0.765869365225391,
  'recall': 0.6141645149391368,
  'f_score': 0.6816786079836233},
 'OPINION_OPR': {'precision': 0.8300395256916996,
  'recall': 0.6140350877192983,
  'f_score': 0.7058823529411765},
 'OPINION_SEG': {'precision': 0.9651360237002782,
  'recall': 0.5670197185490204,
  'f_score': 0.7143544764145897},
 'counter': {'OPINION_SRC_FP': 509,
  'OPINION_OPR_FP': 129,
  'OPINION_SEG_FP': 965,
  'OPINION_SRC_FN': 1046,
  'OPINION_OPR_FN': 396,
  'OPINION_SEG_FN': 20399,
  'OPINION_SRC_TP': 1665,
  'OPINION_OPR_TP': 630,
  'OPINION_SEG_TP': 26714}}

In [16]:
# Token-level scores on the test set.
eval_token_level(all_docs_test)

{'OPINION_SRC': {'precision': 0.7693693693693694,
  'recall': 0.5670650730411687,
  'f_score': 0.6529051987767585},
 'OPINION_OPR': {'precision': 0.8473684210526315,
  'recall': 0.6145038167938931,
  'f_score': 0.7123893805309733},
 'OPINION_SEG': {'precision': 0.9812138728323699,
  'recall': 0.5566030002459218,
  'f_score': 0.7102881949892776},
 'counter': {'OPINION_SRC_FP': 128,
  'OPINION_OPR_FP': 29,
  'OPINION_SEG_FP': 130,
  'OPINION_SRC_FN': 326,
  'OPINION_OPR_FN': 101,
  'OPINION_SEG_FN': 5409,
  'OPINION_SRC_TP': 427,
  'OPINION_OPR_TP': 161,
  'OPINION_SEG_TP': 6790}}

In [17]:
# eval_token_level(all_docs_dev)

In [18]:
# eval_token_level(all_docs_test)

In [25]:
# Inspect the named entities of the first doc in the first dev group.
all_docs_dev[0][0].ents

(民進黨, 范雲, 民進黨, 台南, 黃偉哲, 今天)

In [27]:
# Same entities, with their label and start/end offsets.
for ent in all_docs_dev[0][0].ents:
    print(ent, ent.label_, ent.start, ent.end)

民進黨 ORG 0 2
范雲 PERSON 3 4
民進黨 ORG 30 32
台南 GPE 35 36
黃偉哲 PERSON 37 40
今天 DATE 40 41


In [28]:
# Work with the first dev group (a list of docs) in the cells below.
docs = all_docs_dev[0]

In [30]:
# Collect, for each doc in the group, its entities labelled "PERSON".
ent_list_list = []

for doc in docs:
    ent_list = [ent for ent in doc.ents if ent.label_ == "PERSON"]
    ent_list_list.append(ent_list)

# Display one list of PERSON entities per doc.
ent_list_list

[[范雲, 黃偉哲], [黃偉哲, 范雲, 范雲], [范雲, 黃偉哲, 黃偉哲]]