In [8]:
import os
import re
import nltk
import transformers
import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
from sklearn.metrics import recall_score, f1_score
from transformers import RobertaTokenizer, TFRobertaModel

## 1. overall extraction procedure

In [25]:
def parse_xml(file):
    """Build the token-level frame for every corpus text that passes the filter.

    Args:
        file: the XML markup of the corpus (as passed to BeautifulSoup).

    Returns:
        A DataFrame with one row per token and the columns sentence, word,
        pos, label, type, subtype, function, plus the "tag" and "local"
        columns added by get_tag and get_local.
    """
    # read the markup with the XML parser
    soup = BeautifulSoup(file, "xml")
    records = []
    # walk over every <text> element of the corpus
    for text in soup.find_all("text"):
        # skip the texts rejected by exclude_train_data
        if exclude_train_data(text):
            continue
        records.extend(label_extraction(text))
    # turn the collected tuples into a frame
    corpusdata = pd.DataFrame.from_records(
        records,
        columns=["sentence", "word", "pos", "label", "type", "subtype", "function"],
    )
    # both helpers add their column to the frame in place
    get_tag(corpusdata)
    get_local(corpusdata)
    return corpusdata

### subextraction: words in each sentence

In [11]:
def label_extraction(text):
    """Flatten one corpus text into per-token annotation records.

    Args:
        text: a parsed <text> element exposing find_all(); its <s> children
            are sentences and their <w> descendants are tokens.

    Returns:
        A list of tuples
        (sentence, lemma, pos, label, type, subtype, function), one per token.
        label is 1 when the token contains at least one <seg> element and 0
        otherwise; type, subtype and function are the string "None" for
        tokens without a <seg>.

    Raises:
        KeyError: if a token lacks the "lemma" or "type" attribute, or a
            <seg> lacks the "type" or "function" attribute.
    """
    data = []
    # extract all sentences from the text
    for sent in text.find_all("s"):
        # collapse newlines / whitespace runs once per sentence instead of once per token
        sentence = re.sub(r'\s+', ' ', sent.text)
        for token in sent.find_all("w"):
            # every <seg> inside the token marks it as metaphor-related
            mets_list = token.find_all("seg")
            if mets_list:
                # set indice: 1 for metaphor
                met = 1
                # when a token holds several <seg> elements the last one wins
                for m in mets_list:
                    # subtype is optional on a <seg>
                    _subtype = m.attrs.get("subtype", "None")
                    _type, _function = m.attrs["type"], m.attrs["function"]
            else:
                # the token is not labeled as metaphorical
                met = 0
                _type = _subtype = _function = "None"
            # lemma and POS tag of the token
            _lemma, _pos = token.attrs["lemma"], token.attrs["type"]
            # pack the sentence, the word and its annotation together
            data.append((sentence, _lemma, _pos, met, _type, _subtype, _function))
    return data

### subextraction: filter train set

In [12]:
def exclude_train_data(text):
    """Decide whether a corpus text must be skipped during extraction.

    Args:
        text: a parsed <text> element exposing an ``attrs`` mapping (or None).

    Returns:
        True when the text has no attributes, has no 'xml:id' attribute, or
        its 'xml:id' is one of the training fragments listed below; False
        otherwise (the text is then extracted).
    """
    # ids of all the texts used for training
    training_ids = [
        'a1e-fragment01', 'a1f-fragment06', 'a1f-fragment07',
        'a1f-fragment08', 'a1f-fragment09', 'a1f-fragment10',
        'a1f-fragment11', 'a1f-fragment12', 'a1g-fragment26',
        'a1g-fragment27', 'a1h-fragment05', 'a1h-fragment06',
        'a1j-fragment34', 'a1k-fragment02', 'a1l-fragment01',
        'a1m-fragment01', 'a1n-fragment09', 'a1n-fragment18',
        'a1p-fragment01', 'a1p-fragment03', 'a1x-fragment03',
        'a1x-fragment04', 'a1x-fragment05', 'a2d-fragment05',
        'a38-fragment01', 'a39-fragment01', 'a3c-fragment05',
        'a3e-fragment03', 'a3k-fragment11', 'a3p-fragment09',
        'a4d-fragment02', 'a6u-fragment02', 'a7s-fragment03',
        'a7y-fragment03', 'a80-fragment15', 'a8m-fragment02',
        'a8n-fragment19', 'a8r-fragment02', 'a8u-fragment14',
        'a98-fragment03', 'a9j-fragment01', 'ab9-fragment03',
        'ac2-fragment06', 'acj-fragment01', 'ahb-fragment51',
        'ahc-fragment60', 'ahf-fragment24', 'ahf-fragment63',
        'ahl-fragment02', 'ajf-fragment07', 'al0-fragment06',
        'al2-fragment23', 'al5-fragment03', 'alp-fragment01',
        'amm-fragment02', 'as6-fragment01', 'as6-fragment02',
        'b1g-fragment02', 'bpa-fragment14', 'c8t-fragment01',
        'cb5-fragment02', 'ccw-fragment03', 'cdb-fragment02',
        'cdb-fragment04', 'clp-fragment01', 'crs-fragment01',
        'ea7-fragment03', 'ew1-fragment01', 'fef-fragment03',
        'fet-fragment01', 'fpb-fragment01', 'g0l-fragment01',
        'kb7-fragment10', 'kbc-fragment13', 'kbd-fragment07',
        'kbh-fragment01', 'kbh-fragment02', 'kbh-fragment03',
        'kbh-fragment09', 'kbh-fragment41', 'kbj-fragment17',
        'kbp-fragment09', 'kbw-fragment04', 'kbw-fragment11',
        'kbw-fragment17', 'kbw-fragment42', 'kcc-fragment02',
        'kcf-fragment14', 'kcu-fragment02', 'kcv-fragment42',
    ]
    attributes = text.attrs
    # a text without attributes or without an id is never extracted
    if attributes is None or 'xml:id' not in attributes:
        return True
    # block further extraction only for the training fragments
    return attributes['xml:id'] in training_ids

### subextraction: Penn Treebank POS tag

In [None]:
def get_tag(data):
    """Add a "tag" column holding the nltk.pos_tag result for each word.

    The whole "word" column is tagged in a single nltk.pos_tag call. The
    frame is modified in place and also returned.
    """
    # tag the full word column at once; each result is a (word, tag) pair
    tagged_pairs = nltk.pos_tag(list(data["word"]))
    # keep only the tag of every pair as the new column
    data["tag"] = [tag for _, tag in tagged_pairs]
    return data

### subextraction: local context (the comma-delimited part of the sentence that contains the word)

In [23]:
def get_local(data):
    """Add a "local" column: the comma-delimited clause containing each word.

    For every row the sentence is split on ',' and the first piece in which
    the word occurs as a substring is kept. When no piece contains the word,
    the first piece is used so that the column stays aligned with the rows.

    Args:
        data: DataFrame with string columns 'sentence' and 'word'.

    Returns:
        The same DataFrame, modified in place with the new 'local' column.
    """
    local = []
    for sentence, word in zip(data['sentence'], data['word']):
        # split the sentence by comma
        clauses = sentence.split(',')
        # first clause containing the word; fall back to the first clause to avoid mismatch
        local.append(next((clause for clause in clauses if word in clause), clauses[0]))
    data['local'] = local
    return data

## 2. Prepare test set

In [26]:
# path of the corpus file, expected in the current working directory
corpus_root = os.path.join(os.getcwd(), "VUAMC.xml")
# read the file; the context manager closes the handle once the text is loaded
with open(corpus_root, encoding = "utf-8") as corpus_file:
    file = corpus_file.read()
# get whole test set
corpus = parse_xml(file)
# save the test set (the frame index is written as the first, unnamed column)
corpus.to_csv(r'./test_data.csv', encoding = 'utf-8')

## 3. Get prediction (two alternatives)

### (1) with pre-trained model

#### Compile the test set into transformers inputs (modified version of the original DeepMet code)

In [None]:
def compilation(data, max_sequence_length=128):
    """Encode the test frame into the six integer arrays fed to the model.

    Two text inputs are built per row, both followed by the quoted word, POS
    and tag joined with "[SEP]": one starts from the full sentence, the
    other from the local clause. Each text is tokenized with the
    'roberta-base' tokenizer and padded to ``max_sequence_length``.

    Args:
        data: DataFrame with the columns sentence, word, pos, tag and local.
            It is not modified; the reformatted columns are built on a copy,
            so calling this function twice on the same frame gives the same
            result.
        max_sequence_length: length every encoded sequence is padded to
            (also passed to the tokenizer as ``max_length``). Defaults to 128.

    Returns:
        A list of six int32 arrays: ids, masks and segments of the sentence
        input, followed by ids, masks and segments of the local input.
    """
    # regulate format of test set
    def preprocssing(x):
        """Convert a cell to str and wrap it in double quotes."""
        x = str(x)
        x = '"'+x+'"'
        return x

    def _convert_to_transformer_inputs(instance, instance2, tokenizer, max_sequence_length):
        """Encode the two texts of one row; returns six padded lists."""
        def return_id(str1, str2, truncation_strategy, length):
            inputs = tokenizer.encode_plus(str1, str2,
                                       add_special_tokens=True,
                                       max_length=length,
                                       truncation_strategy=truncation_strategy,
                                       return_token_type_ids=True)
            input_ids = inputs["input_ids"]
            # mask is 1 on real tokens and 0 on the padding appended below
            input_masks = [1] * len(input_ids)
            input_segments = inputs["token_type_ids"]
            # right-pad ids, masks and segments up to the requested length
            padding_length = length - len(input_ids)
            padding_id = tokenizer.pad_token_id
            input_ids = input_ids + ([padding_id] * padding_length)
            input_masks = input_masks + ([0] * padding_length)
            input_segments = input_segments + ([0] * padding_length)
            return [input_ids, input_masks, input_segments]
        input_ids, input_masks, input_segments = return_id(
            instance, None, 'longest_first', max_sequence_length)
        input_ids2, input_masks2, input_segments2 = return_id(
            instance2, None, 'longest_first', max_sequence_length)
        return [input_ids, input_masks, input_segments,
            input_ids2, input_masks2, input_segments2]

    # transfer test set into arrays
    def compute_input_arrays(df, columns, tokenizer, max_sequence_length):
        """Encode every row of df[columns] and stack the results as int32 arrays."""
        input_ids, input_masks, input_segments = [], [], []
        input_ids2, input_masks2, input_segments2 = [], [], []
        for _, instance in df[columns].iterrows():
            ids, masks, segments, ids2, masks2, segments2 = \
                _convert_to_transformer_inputs(str(instance.sentence), str(instance.sentence2), tokenizer, max_sequence_length)
            input_ids.append(ids)
            input_masks.append(masks)
            input_segments.append(segments)
            input_ids2.append(ids2)
            input_masks2.append(masks2)
            input_segments2.append(segments2)
        return [np.asarray(input_ids, dtype=np.int32),
                np.asarray(input_masks, dtype=np.int32),
                np.asarray(input_segments, dtype=np.int32),
                np.asarray(input_ids2, dtype=np.int32),
                np.asarray(input_masks2, dtype=np.int32),
                np.asarray(input_segments2, dtype=np.int32)]

    # compile test data in format: sentence, token, POS, tag (and local, token, POS, tag)
    def refomulate_test_set(test):
        """Return a copy of the frame with the model-ready sentence/sentence2 columns."""
        # work on a copy: overwriting 'sentence' on the caller's frame would make
        # a second call concatenate the "[SEP]" fields onto already formatted text
        test = test.copy()
        test['sentence'] = test.sentence.apply(lambda x: preprocssing(x)) \
                       + "[SEP]" + test.word.apply(lambda x: preprocssing(x)) \
                       + "[SEP]" + test.pos.apply(lambda x: preprocssing(x)) \
                       + "[SEP]" + test.tag.apply(lambda x: preprocssing(x))
        test['sentence2'] = test.local.apply(lambda x: preprocssing(x)) \
                        + "[SEP]" + test.word.apply(lambda x: preprocssing(x)) \
                        + "[SEP]" + test.pos.apply(lambda x: preprocssing(x)) \
                        + "[SEP]" + test.tag.apply(lambda x: preprocssing(x))
        return test

    test = refomulate_test_set(data)

    input_categories = ["sentence","sentence2"]
    # define tokenizer
    tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
    # encode both inputs, padded to max_sequence_length (128 by default)
    test_inputs = compute_input_arrays(test, input_categories, tokenizer, max_sequence_length)
    return test_inputs

In [None]:
# Alternative (1): run the saved model on the extracted test set.
# encode the test frame into the model's input arrays
test_inputs = compilation(corpus)
# path of the saved Keras model file (HDF5), relative to the working directory
model_dir = r'./my_model.h5'
# register TFRobertaModel as a custom object so Keras can resolve it while loading
with tf.keras.utils.custom_object_scope({'TFRobertaModel': TFRobertaModel}):
    # load model
    model = tf.keras.models.load_model(model_dir)
# predict
# NOTE(review): the evaluation cell reads pred['predict'], i.e. it expects a table
# with a 'predict' column - confirm how the value returned by model.predict is
# converted to that format before evaluating with this alternative.
pred = model.predict(test_inputs)

### (2) directly read the outcome we provide (recommended)

In [9]:
# Alternative (2): load the provided predictions; the evaluation cell reads its 'predict' column
pred = pd.read_csv('./predict.csv')

## 4. Evaluation

In [12]:
# read test data
data = pd.read_csv(r'./test_data.csv')
# attach the predictions; they must be in the same row order as the test data
data['predict'] = list(pred['predict'])

label_direct = []
pred_direct = []
label_indirect = []
pred_indirect = []
label_border = []
pred_border = []
label_clear = []
pred_clear = []

# extract prediction and label
def get_pred_label(pred,label,row):
    """Append the prediction and gold label of one iterrows() item to the given lists.

    ``row`` is the (index, Series) pair produced by DataFrame.iterrows().
    Both lists are extended in place and returned.
    """
    pred.append(row[1]['predict'])
    label.append(row[1]['label'])
    return pred, label


def _is_annotated(value):
    """Tell whether a 'type' cell carries a real annotation.

    Unannotated tokens were written to the CSV as the text "None"; depending
    on the pandas version read_csv returns that text unchanged or as a
    missing value, so both forms are treated as "no annotation".
    """
    return not pd.isna(value) and value != 'None'


# get each row
for row in data.iterrows():
    fields = row[1]
    # check label for indirect metaphor
    if fields['type'] == 'met':
        get_pred_label(pred_indirect,label_indirect,row)
    # direct metaphor
    elif fields['type'] == 'lit':
        get_pred_label(pred_direct,label_direct,row)
    # borderline cases
    if fields['subtype'] == 'WIDLII':
        get_pred_label(pred_border,label_border,row)
    # clear cases: annotated tokens that are not borderline
    elif _is_annotated(fields['type']):
        get_pred_label(pred_clear,label_clear,row)

# evaluate
# direct
recall_direct = recall_score(label_direct, pred_direct)
print("Recall of direct metaphors: ", recall_direct)
# indirect
recall_indirect = recall_score(label_indirect, pred_indirect)
print("Recall of indirect metaphors: ", recall_indirect)
# borderline
recall_borderline = recall_score(label_border, pred_border)
print("Recall of borderline cases: ", recall_borderline)
# clear
recall_clear = recall_score(label_clear, pred_clear)
print("Recall of clear cases: ", recall_clear)

# overall f1-score and recall
Recall = recall_score(list(data['label']),list(data['predict']))
F1score = f1_score(list(data['label']),list(data['predict']))
print('F1 score: ',F1score, "Recall: ", Recall)

Recall of direct metaphors:  0.5
Recall of indirect metaphors:  0.729129397734049
Recall of borderline cases:  0.5560675883256528
Recall of clear cases:  0.7386032977691561
F1 score:  0.7324717765894236 Recall:  0.7212227585198187
