In [None]:
import re
import csv
import sys
from pyknp import KNP
from sklearn.metrics import classification_report

In [None]:
# 定義
mode_id = 1 # 1；抽出方法1/ 2：抽出方法2

data_size = 215 # データタイズ
skip_file = [107,129,125,151,173,195] # スキップファイル

file_info = "./data/input/RS_data/rsdata" # アノテーションデータのPATH
result_path = "./data/output/Chapter5" # 出力ファイルのPATH

In [None]:
def data_fix(text):
    text = text.replace(" ","　")
    return text

In [None]:
# 正解アノテーションの単語を返す
def load_ann(file_ann):

    true_wh_dic = {"WHERE":[], "WHEN":[], "WHO":[], "WHAT":[], "HOW":[], "WHY":[],"SERIF":[]}
    
    # アノテーションデータ読み込み・保存
    with open(file_ann, 'r') as f:
        for i in f.read().split("\n"):
            if len(i) != 0:
                label_name = i.split()[1]
                true_s = int(i.split()[2])
                true_e = int(i.split()[3])-1
                true_word = i.split()[4]
                true_wh_dic[label_name].append([true_word,[true_s,true_e]])                    
    return true_wh_dic

In [None]:
"""
NERと評価語の構文解析結果を保存

入力：文章
出力：構文解析結果の辞書,NER該当リスト

"""
def save_bnst(line,word_count):
    
    result_data = []
    
    bnst_dic ={}    
    ner_list = []
    verb_list = []
    
    child_list=[]
    parent_list=[]
        
    knp = KNP()
    result = knp.parse(data_fix(line)) 
    for bnst in result.bnst_list(): # 文節
        
        ner_flag = False
        verb_flag = False
        word_list = []
        wc_l = []
        hinshi_list = []
        ner_info = []
        dic_value ={}
        
        mrph_list = bnst.mrph_list()        
        for mrph in mrph_list: # 形態素 
            
            # 単語数情報
            if len(mrph.midasi) > 1: # 単語サイズ
                wc = []
                for i in range(len(mrph.midasi)):                    
                    word_count += 1
                    wc.append(word_count)
                wc_l.append(wc)
            else:
                word_count += 1
                wc_l.append(word_count)
            
            # NER情報
            if "NE:" in mrph.fstring: # NERのときの処理 
                ner_info.append(re.search(r"<NE:(.*?)>",mrph.fstring).group(1))
                ner_flag = True
            else:
                ner_info.append("")

            # 品詞情報と単語情報
            if "■■" in mrph.midasi: # 括弧の中身は、品詞情報を「特殊」にする
                word_list.append(mrph.midasi)
                hinshi_list.append("特殊")
            else: 
                word_list.append(mrph.midasi)
                hinshi_list.append(mrph.hinsi)    
            
            # 動詞情報リスト作成のため
            if "動詞" in mrph.hinsi:
                verb_flag = True

        # 辞書追加
        dic_value["parent_id"] = bnst.parent_id
        dic_value["word_id"] = wc_l
        dic_value["word"] = word_list       
        dic_value["hinsi"] = hinshi_list
        dic_value["ner"] =  ner_info        
        
        # NER情報
        if ner_flag:
            ner_list.append(bnst.bnst_id)
            
        # 動詞情報
        if verb_flag:
            verb_list.append(bnst.bnst_id)
        
        bnst_dic[bnst.bnst_id] = dic_value
   
    return bnst_dic,ner_list,verb_list,word_count

In [None]:
"""
係り受け関係のid順を取得する

入力：構文解析結果の辞書 
出力：係り受け関係のid順

"""                            
def get_bnst_order(bnst_dic):

    bnst_order_dic = {}

    # 1. 全ての文節の係り受け順(id)を取得する
    for my_id in bnst_dic.keys():

        flag = True
        my_list=[my_id]
        serch_id = my_id

        # 1-1. 文節ごとに、最後の係り先になるまでループする
        while flag:
            parent_id = bnst_dic[serch_id]["parent_id"]

            if parent_id != -1: # 係り先が最後じゃないとき、係り先のidを格納
                my_list.append(bnst_dic[serch_id]["parent_id"])
                
            else: # 係り先が最後のとき、ループを抜ける
                flag = False
                
            serch_id = parent_id # 1-2.係り先のidを次の探索idとする
            
        bnst_order_dic[my_id]  = my_list # 1-3.文節ごとに、係り受け順を格納
    
    return bnst_order_dic

In [None]:
"""
最後の係り元idを取得する

入力：構文解析結果の辞書
出力：最後の係り元idを取得     

"""
def get_bnst_end(bnst_dic):
    
    end_list = []

    # 1. 文節ごとに、他の文節の親になっているか調べる
    for my_id in bnst_dic.keys():

        # 1-1. 他の文節の親を調査
        for value in bnst_dic.values():
            
            if my_id == value["parent_id"]: # 文節の親になっている場合、false
                flag = False
                break
            else: # 文節の親になっていない場合、true
                flag = True

        if flag:
            end_list.append(my_id) # リストに追加する
            
    return end_list

In [None]:
"""
ターゲット単語の左側のidを取得する

入力：構文解析結果の辞書、係り受け関係のid順、ターゲットid
出力：ターゲット単語の左側のid (複数あり)

"""
def get_bnst_left(bnst_dic,bnst_order_dic,keyword_id):
    
    bnst_left = []
    end_list = get_bnst_end(bnst_dic)

    # 1. 係り受けルートにターゲット単語あったとき、左側の単語を取得
    for i in end_list:
        serch_list = bnst_order_dic[i]

        # 1-2. 係り受けルートにターゲット単語があるか調べる
        if keyword_id in serch_list:
            p = serch_list.index(keyword_id)            
            if p != 0:
                tmp_list = []
                
                # 1-3. ターゲット単語の左側の単語を取得
                for j in range(p+1):
                    left_id = serch_list[j]
                    tmp_list.append(left_id)

                #1-4. 取得したものをリストに格納
                bnst_left.append(tmp_list)
            else:
                bnst_left.append([0])
        
    return bnst_left

In [None]:
"""
評価用
入力：正解データ辞書、予測データ辞書
出力： 5W1Hごとの失敗パターン辞書
    - 完全： 正解チャンク
    - 一部一致： 正解チャンク, 予測チャンク
    - ラベル誤り：正解チャンク, 予測チャンク, 正解ラベル, 予測ラベル
    - 抽出漏れ： 正解チャンク
    - 過度の抽出： 予測チャンク
    
"""
def eval_report(true_wh_dic,pred_wh_dic):
    report_data = {}

    # 予測ラベルのマッチングチェックリスト (マッチしたら1)
    check_pred_dic = {}
    for k,v in pred_wh_dic.items():
        check_pred = [0 for i in range(len(v))]
        check_pred_dic[k] = check_pred
        
    # 正解ラベルのマッチングリスト
    check_true_dic = {}
    for k,v in true_wh_dic.items():
        check_true = [0 for i in range(len(v))]
        check_true_dic[k] = check_true
        
    for k,v in true_wh_dic.items():
        report_v = {}
        result_type1 = []
        result_type2 = []
        result_type3 = []
        
        for t_num, true_text in enumerate(v):
            true_id = [i for i in range(true_text[1][0],(true_text[1][1])+1)]   
            
            for anoter_pk,anoter_pts in pred_wh_dic.items(): # すべての予想ラベルから検索
                 for p_num, anoter_pt in enumerate(anoter_pts):
                        
                        pred_id = anoter_pt[1]
                        match_n = list(set(true_id) & set(pred_id))  
                        
                        if k == anoter_pk and len(match_n) > 0: # 予想も正解もチャンクとラベルが同じ
                            if len(true_id) == len(pred_id):
                                check_pred_dic[k][p_num] = 1 
                                check_true_dic[k][t_num] = 1
                                result_type1.append(true_text[0]) # 完全
                            else:
                                check_pred_dic[k][p_num] = 1 
                                check_true_dic[k][t_num] = 1
                                result_type2.append([true_text[0],anoter_pt[0]]) #一部一致
                                
                        if k != anoter_pk and len(match_n) > 0: # 予想も正解もチャンクは同じだが、ラベルが違う
                            result_type3.append([true_text[0],anoter_pt[0],k,anoter_pk]) # ラベルミス
                            check_pred_dic[anoter_pk][p_num] = 1 
                            check_true_dic[k][t_num] = 1
                        
        report_v["完全"] = result_type1
        report_v["一部一致"] = result_type2
        report_v["ラベル誤り"] = result_type3
        report_v["抽出漏れ"] = []
        report_v["過度の抽出"] = []
        report_data[k] = report_v
    
    # 抽出漏れの処理
    for k,v in check_true_dic.items():        
        noexits_ture = [i for i,c in enumerate(v) if c == 0]        
        if len(noexits_ture) > 0:
            for i in noexits_ture:
                report_data[k]["抽出漏れ"].append(true_wh_dic[k][i][0]) # 抽出漏れ
                
    
    # 誤予測の処理
    for k,v in check_pred_dic.items():        
        miss_pred = [i for i,c in enumerate(v) if c == 0]        
        if len(miss_pred) > 0:
            for i in miss_pred:
                report_data[k]["過度の抽出"].append(pred_wh_dic[k][i][0]) # 誤予測

    return report_data


In [None]:
"""
4タイプの結果の数を集計する
入力：ファイルid、4タイプの結果、ラベル
出力：1つのファイルにおけるタイプごとの結果数
"""
def cal_report(file_id,result,wh_label):
    wh_num = [0 for i in range(6)]
    wh_num[0] = file_id
    for wh in wh_label:
        for k,v in result[wh].items():
            if k == "完全":
                wh_num[1] += len(v)
            elif k == "一部一致":
                wh_num[2] += len(v)
            elif k == "ラベル誤り":
                wh_num[3] += len(v)
            elif k == "抽出漏れ":
                wh_num[4] += len(v)
            elif k == "過度の抽出":
                wh_num[5] += len(v)
                    
    return wh_num  

In [None]:
"""
全タイプの結果の数を集計する
入力：ファイルid、4タイプの結果、ラベル
出力：1つのファイルにおける、ラベルごとのタイプごとの結果数
"""
def cal_report_wh(file_id,result):
    report_data= {}
    for label,wh_value in result.items():
        
        wh_num = [0 for i in range(6)]
        wh_num[0] = file_id
        for rtype,value in wh_value.items():
            if rtype == "完全":
                wh_num[1] += len(value)
            elif rtype == "一部一致":
                wh_num[2] += len(value)
            elif rtype == "ラベル誤り":
                wh_num[3] += len(value)    
            elif rtype == "抽出漏れ":
                wh_num[4] += len(value)
            elif rtype == "過度の抽出":
                wh_num[5] += len(value)
            
        report_data[label] = wh_num
        
    return report_data

In [None]:
"""
WHO, WHEN, WHEREをknpで抽出する
"""
def extraction_3w(bnst_dic,ner_list,wh_dic,pred_wh_dic):

    w_ner_word = ""
    w_ner_wc = []
    ner_flag = False
                
    ner_list = sorted(ner_list) # 昇順 
    for ner_id in ner_list:
        ner = bnst_dic[ner_id]["ner"]
        # NERのフラグ管理
        if any(s.endswith(":B") for s in ner) and any(s.endswith(":E") for s in ner):
            ner_flag = True
        if any(s.endswith(":S") for s in ner):
            ner_flag = True
                                        
        # 1文節でNER情報が簡潔している
        if ner_flag:
            ner_word = ""
            ner_wc = []
            ner_flag = False # ner
            for i,ner_label in enumerate(ner):
                if ":S" in ner_label: # 固有表現が一つ
                    ner_word += bnst_dic[ner_id]["word"][i]
                    w_i = bnst_dic[ner_id]["word_id"][i]
                    
                    if type(w_i) is list:
                        for j in w_i:
                            ner_wc.append(j)
                    else:
                        ner_wc.append(w_i)
                                
                    label = ner_label.replace(":S","")
                    if label in wh_dic:
                        label_name = wh_dic[label]
                        pred_wh_dic[label_name].append([ner_word,ner_wc]) # データ保存
                                
                    # 初期化
                    ner_word = ""
                    ner_wc = []
                                
                elif ":E" in ner_label: # 最後の固有表現
                    ner_word += bnst_dic[ner_id]["word"][i]   
                    w_i = bnst_dic[ner_id]["word_id"][i]
                    if type(w_i) is list:
                        for j in w_i:
                            ner_wc.append(j)
                    else:
                        ner_wc.append(w_i)
                                    
                    label = ner_label.replace(":E","")
                    if label in wh_dic:
                        label_name = wh_dic[label]                                
                        pred_wh_dic[label_name].append([ner_word,ner_wc]) # データ保存
                                
                    # 初期化
                    ner_word = ""
                    ner_wc = []
                                
                elif ":B" in ner_label or ":I" in ner_label: # 最初・途中の固有表現
                    
                    if "■" in bnst_dic[ner_id]["word"][i]: # 「■」がNER誤判定されていたら、スキップ
                        continue
                        
                    ner_word += bnst_dic[ner_id]["word"][i]
                    w_i = bnst_dic[ner_id]["word_id"][i]
                    if type(w_i) is list:
                        for j in w_i:
                            ner_wc.append(j)
                    else:
                        ner_wc.append(w_i)
                                    
                                                                
        # NER情報が複数文節に係っている
        else:
            for i,ner_label in enumerate(ner):
                if ":E" in ner_label: # 最後の固有表現
                    w_ner_word += bnst_dic[ner_id]["word"][i]
                    w_i = bnst_dic[ner_id]["word_id"][i]
                    
                    if type(w_i) is list:
                        for j in w_i:
                            w_ner_wc.append(j)
                    else:
                        w_ner_wc.append(w_i)                                
                                
                    label = ner_label.replace(":E","")
                    if label in wh_dic:                        
                        label_name = wh_dic[label]
                        pred_wh_dic[label_name].append([w_ner_word,w_ner_wc]) # データ保存
                                
                    # 初期化
                    w_ner_word = ""
                    w_ner_wc = []
                                
                elif ":B" in ner_label or ":I" in ner_label: # 最初・途中の固有表現
                    
                    if "■" in bnst_dic[ner_id]["word"][i]: # 「■」がNER誤判定されていたら、スキップ
                        continue
                        
                    w_ner_word += bnst_dic[ner_id]["word"][i]
                    w_i = bnst_dic[ner_id]["word_id"][i]
                    if type(w_i) is list:
                        for j in w_i:
                            w_ner_wc.append(j)
                    else:
                        w_ner_wc.append(w_i)
                            
    return pred_wh_dic
    

In [None]:
"""
ルール1：Whatの抽出
・文節の文末に「を」があるとき、その前の文字が名詞ならば、Whatとして抽出する。
"""
def extraction_what(bnst_dic,pred_wh_dic):

    for k,v in bnst_dic.items():

        if len(v["hinsi"]) >= 2 and "名詞" == v["hinsi"][-2] and len(v["ner"][-2]) == 0: # 文節の長さと名詞とNERの有無            
            if "を" == v["word"][-1] or "に" == v["word"][-1] : # 「を/に」                
                # 文節末の「を/に」から遡って、続く名詞のindexを調べる
                # - pは文末からのindex
                # - p_listは文頭からのindex
                p_list = []
                for i in range(2,len(v["hinsi"])+1):
                    p = i * (-1) 
                    if v["hinsi"][p] == "名詞" and len(v["ner"][p]) == 0:
                        p_list.append(len(v["hinsi"])+p)                        
                    else:
                        break
                
                # 名詞のindexを元に
                wc = []
                w = ""   
                for p_i in sorted(p_list):
                    w += v["word"][p_i] 
                    w_i = v["word_id"][p_i]
                    if type(w_i) is list:
                        for j in w_i:
                            wc.append(j)
                    else:
                        wc.append(w_i) 
                
                wc = sorted(wc) # 遡って用意したものを、順番に並べる
                pred_wh_dic["WHAT"].append([w,wc]) # データ保存
                                
    return pred_wh_dic

In [None]:
"""
ルール2：Whatの抽出
・動詞節と直接的な係り受け関係のある名詞節を WHAT として抽出する。
"""
def new_extraction_what(bnst_dic,verb_list,pred_wh_dic):
    
    for verb_id in verb_list:

        for i in range(verb_id):
            if bnst_dic[i]["parent_id"] == verb_id and "動詞" not in bnst_dic[i]["hinsi"]:
                
                w_txt = ""
                wc = []
                w_flag = True
                      
                for j,w in enumerate(bnst_dic[i]["word"]):
                    
                    # 単語結合
                    w_txt += w
                    
                    # 単語数
                    w_i = bnst_dic[i]["word_id"][j]
                    if type(w_i) is list:
                        for x in w_i:
                            wc.append(x)
                    else:
                        wc.append(w_i)
                    
                    # NER・「■」有無判定
                    if len(bnst_dic[i]["ner"][j]) > 0 or "■" in bnst_dic[i]["word"][j]: 
                        w_flag = False                    
                        break
                        
                # What候補データ作成
                if w_flag:                                 
                    pred_wh_dic["WHAT"].append([w_txt,wc]) # データ保存
                    
    return pred_wh_dic

In [None]:
# テンプレートを作成する
def w2template(pred_wh_dic,text_list):
    
    template_txt = ""
    label_ids = []
    label_name = {}
    
    # 4W情報の保存
    for k,v in pred_wh_dic.items():
        if len(v) != 0:
            for v_i in v:
                # 要素の最初と最後
                sp_s = v_i[1][0]
                sp_e = v_i[1][-1] + 1
                
                # ラベル情報と要素
                label_name[sp_e] = k # ラベル情報保存
                label_ids.append(sp_s) # 要素の最初
                label_ids.append(sp_e) # 要素の最後
                      
    # テンプレート生成    
    data_text = text_list[0] # 1つめのデータを利用

    for i,sp in enumerate(sorted(label_ids)):
        
        if i == 0: # 最初
            if sp != 0:
                template_txt += data_text[0:sp]
                
        elif i == (len(label_ids)-1): # 最後
            if sp < len(data_text):
                template_txt += "<{0}>".format(label_name[sp])
                template_txt += data_text[sp:len(data_text)]
                
        elif i%2 == 0:
            template_txt += data_text[start_i:sp]
        
        elif i%2 != 0:
            template_txt += "<{0}>".format(label_name[sp])
            start_i = sp  
            
    return template_txt


In [None]:
# main

# アウトプットファイル
ex_file = "{0}/r{1}_extraction_result.csv".format(result_path,mode_id) 
f_file = "{0}/r{1}_failure_result.csv".format(result_path,mode_id) 
ex_txt = "{0}/r{1}_failure_result.txt".format(result_path,mode_id)

ex_txt_file = open(ex_txt, 'w')  #書き込みモードでオープン
output_result = []
failure_result = []
wh_result = {"WHERE":[], "WHEN":[], "WHO":[], "WHAT":[], "HOW":[], "WHY":[],"SERIF":[]}

true_wh_list = []
pred_wh_list = []

for file_id in range(1,data_size+1):
    if file_id in skip_file:
        continue
    
    print(file_id)
    file_txt = "{0}{1}.txt".format(file_info,file_id)
    file_ann = "{0}{1}.ann".format(file_info,file_id)
    
    # アノテーションデータ読み込み
    true_wh_dic = load_ann(file_ann)

    # 5W1H互換表 <MONEY/PERCENT/ARTIFACT>は利用しない
    wh_dic = {"LOCATION":"WHERE", "TIME":"WHEN","DATE":"WHEN","ORGANIZATION":"WHO","PERSON":"WHO"}
    
    # knpの5W1H予測結果
    pred_wh_dic = {"WHERE":[], "WHEN":[], "WHO":[], "WHAT":[], "HOW":[], "WHY":[],"SERIF":[]}
    
    # テキストデータ読み込み
    text_list = []
    with open(file_txt,'r') as f:       
        for line in f:    
            line_list = list(line)
            # 括弧を別に保存

            match = re.finditer(r"「(.*?)」",line)        
            for m in match:
                
                # 括弧の中身を保存する
                b_c = []
                b_t = ""
                for i in range(m.start()+1,m.end()-1):
                    b_c.append(i)
                    b_t += line_list[i]
                pred_wh_dic["WHAT"].append([b_t,b_c])
                
                # 括弧を置換する
                bracket = (m.end()-m.start())-1
                for i in range(1,bracket):
                    line_list[m.start()+i] = "■"

            text_list.append("".join(line_list))
    
    word_count = -1 # 単語サイズ
    for text in text_list:
        for l in text.split("。"):                

            if len(l) == 0:
                continue
                
            l = l + "。" # 句点を追加する
            bnst_dic,ner_list,verb_list,word_count = save_bnst(l,word_count) # 係り受け情報を取得
            
            # WHERE,WHEN,WHOの抽出 (KNPのNER情報の出力)
            if (ner_list) != 0:
                pred_wh_dic = extraction_3w(bnst_dic,ner_list,wh_dic,pred_wh_dic)
            
            # WHATの抽出
            if mode_id == 1:
                pred_wh_dic = extraction_what(bnst_dic,pred_wh_dic) # ルール1
            elif mode_id == 2:
                pred_wh_dic = new_extraction_what(bnst_dic,verb_list,pred_wh_dic) # ルール2
            else:
                sys.stdout.write('Please select a mode.')

    # 予測の評価    
    result = eval_report(true_wh_dic,pred_wh_dic)
    wh_label = ["WHERE","WHO","WHEN","WHAT"]
    c_r = cal_report(file_id,result,wh_label)
    output_result.append(c_r)

    # 結果表示
    ex_txt_file.writelines("==== {0} ====\n".format(file_id))
    ex_txt_file.writelines("{0}\n".format(line))
    ex_txt_file.writelines("\n")
    ex_txt_file.writelines("{0}\n".format(pred_wh_dic))
    ex_txt_file.writelines("{0}\n".format(w2template(pred_wh_dic,text_list)))
    
    ex_txt_file.writelines("-----------------------------\n")
    for wh_l in wh_label:
        ex_txt_file.writelines("==== {0} ====\n".format(wh_l))
        ex_txt_file.writelines("正解：{0}\n".format([t for t in true_wh_dic[wh_l]]))
        ex_txt_file.writelines("予想：{0}\n".format([t for t in pred_wh_dic[wh_l]]))
        ex_txt_file.writelines("ラベル誤り：{0}\n".format(result[wh_l]["ラベル誤り"]))
        ex_txt_file.writelines("抽出漏れ：{0}\n".format(result[wh_l]["抽出漏れ"]))
        ex_txt_file.writelines("過度の抽出：{0}\n".format(result[wh_l]["過度の抽出"]))

    ex_txt_file.writelines("-----------------------------\n") 
    ex_txt_file.writelines("{0}\n".format(["記事id","完全","一部一致","ラベル誤り","抽出漏れ","過度の抽出"]))
    ex_txt_file.writelines("{0}\n".format(c_r))
    
    # 1記事における5w1hラベルごとの結果
    for k,v in cal_report_wh(file_id,result).items():
        wh_result[k].append(v) 
      
    # 失敗データのリスト化
    for wh_l in wh_label:
        for rt in result[wh_l]["ラベル誤り"]:
            failure_result.append([file_id,wh_l,"ラベル誤り",rt])
        for rt in result[wh_l]["抽出漏れ"]:
            failure_result.append([file_id,wh_l,"抽出漏れ",rt])
        for rt in result[wh_l]["過度の抽出"]:
            failure_result.append([file_id,wh_l,"過度の抽出",rt])

ex_txt_file.close()

In [None]:
# ファイル書き込み
# 評価結果
with open(ex_file, 'w') as f:
    writer = csv.writer(f, lineterminator='\n') # 改行コード（\n）を指定しておく
    writer.writerow(["記事id","完全","一部一致","ラベル誤り","抽出漏れ","過度の抽出"])
    for o_data in output_result:
        writer.writerow(o_data)
        
# 失敗データ
with open(f_file, 'w') as f:
    writer = csv.writer(f, lineterminator='\n') # 改行コード（\n）を指定しておく
    writer.writerow(["記事id","ラベル","失敗タイプ","単語","原因"])
    for f_data in failure_result:
        writer.writerow(f_data)