In [3]:
import json
import os
import pickle
import re
from pathlib import Path

import numpy as np
import pandas as pd
import spacy
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from numpy.lib.function_base import select
from pyknp import Juman
from sentence_transformers import SentenceTransformer
from sklearn import metrics
from sklearn.ensemble import AdaBoostClassifier
from sklearn.model_selection import train_test_split
from sklearn.multioutput import MultiOutputClassifier
from torch.nn.modules import loss

# from transformers import RobertaTokenizer, RobertaForMaskedLM, RobertaConfig
# from transformers import BertTokenizer, BertModel




In [4]:
class preprocessor:
    """Convert dialogue JSON files into sentence-vector features and error labels.

    The spaCy ``ja_ginza`` pipeline is loaded once in the constructor and its
    ``Doc.vector`` is used as the sentence embedding for every utterance.
    """

    def __init__(self) -> None:
        self.nlp = spacy.load('ja_ginza')

        # Symbols (ASCII and full-width), newlines and ideographic spaces that
        # are stripped from an utterance before it is embedded.
        self.DELETE_PATTERN_2 = re.compile(
            r'[\．_－―─！＠＃＄％＾＆\-‐|\\＊\“（）＿■×+α※÷⇒—●★☆〇◎◆▼◇△□(：〜～＋=)／*&^%$#@!~`){}［］…\[\]\"\'\”\’:;<>?＜＞〔〕〈〉？、。・,\./『』【】「」→←○《》≪≫\n\u3000]+')

        # Embed a throwaway string once to learn the embedding width.
        self.emb_size = self.get_sentence_vec("emb").shape[0]
        print(self.emb_size)

    def get_sentence_vec(self, sen) -> np.ndarray:
        """Return the pipeline's document vector for ``sen`` after symbol removal."""
        sen_ = self.DELETE_PATTERN_2.sub("", sen)
        return self.nlp(sen_).vector

    def read_json_with_NoErr(self, path: str, datalist: list) -> pd.DataFrame:
        """Load every ``*.json`` dialogue under ``path + name + '/'`` for each name.

        One row is produced per system turn (the turn with ``turn-index`` 0 is
        skipped), paired with the most recent user utterance of the same
        dialogue. System turns with an empty/missing ``error_category`` are
        labelled ``["No-Err"]``.

        Args:
            path: Directory prefix; each entry of ``datalist`` is appended to it.
            datalist: Sub-directory names to read.

        Returns:
            DataFrame with columns ``did, tid, usr, sys, ec`` (object dtype) and
            a fresh 0..n-1 index. Empty (but with those columns) if nothing is
            found.
        """
        cols = ['did', 'tid', 'usr', 'sys', 'ec']
        # Collect plain records and build the frame once: appending to a
        # DataFrame inside the loop is quadratic, and DataFrame.append was
        # removed in pandas 2.0.
        records = []

        for p in datalist:
            datapath = Path(path + p + '/')
            # Sorted so the row order (and therefore any seeded split
            # downstream) does not depend on filesystem enumeration order.
            for file in sorted(datapath.glob("*.json")):
                with open(file, "r", encoding="utf-8") as f:
                    json_data = json.load(f)

                did = json_data["dialogue-id"]
                # Reset per dialogue so a system turn that precedes any user
                # turn never inherits an utterance from the previous file.
                usr = ""
                for t in json_data["turns"]:
                    if t["turn-index"] == 0:
                        continue
                    if t["speaker"] == "U":
                        usr = t["utterance"]
                    elif t["speaker"] == "S":
                        records.append({
                            'did': did,
                            'tid': t["turn-index"],
                            'usr': usr,
                            'sys': t["utterance"],
                            'ec': t.get("error_category") or ["No-Err"],
                        })

        # dtype=object keeps the columns typed as the row-by-row construction did.
        return pd.DataFrame(records, columns=cols, dtype=object)

    def make_error_dict(self, error_types):
        """Map each error-type name to its position in ``error_types``."""
        error_dict = {}
        for e in error_types:
            error_dict[e] = len(error_dict)
        return error_dict

    def _zero_padding(self, n) -> list:
        """Return ``n - 1`` all-zero (user, system) vectors used to left-pad a dialogue."""
        return [np.zeros(2 * self.emb_size) for _ in range(n - 1)]

    def extract_X_y(self, df: pd.DataFrame, error_types, prev_num) -> tuple:
        """Build fixed-length context windows and multi-hot labels for erroneous turns.

        Rows are consumed in order. Within each dialogue (``did``) every row
        contributes the concatenation of its user and system sentence vectors.
        For each row whose first error category is not ``"No-Err"``, the last
        ``prev_num`` vectors of that dialogue (zero-padded at the start) form
        one sample.

        Args:
            df: Frame with ``did``, ``usr``, ``sys`` and ``ec`` columns.
            error_types: Label vocabulary; defines the label column order.
            prev_num: Window length in turns (must be >= 1).

        Returns:
            ``(X, y)`` as NumPy arrays. With at least one sample, ``X`` has
            shape ``(samples, prev_num, 2 * emb_size)`` and ``y`` has shape
            ``(samples, len(error_types))``; with none, both are empty arrays.

        Raises:
            ValueError: If ``prev_num`` is smaller than 1.
            KeyError: If a row carries a category missing from ``error_types``.
        """
        n = prev_num
        if n < 1:
            # sequence[-0:] would return the whole history and yield ragged windows.
            raise ValueError("prev_num must be >= 1")

        error_dict = self.make_error_dict(error_types)

        X_data = []
        y_data = []
        sequence_did = []
        # Sentinel that compares unequal to every real id, so the first row
        # starts a dialogue without peeking at df.did[0] (which fails on an
        # empty frame or one whose index does not contain 0).
        current_did = object()

        for d, u, s, e in zip(df.did, df.usr, df.sys, df.ec):
            if d != current_did:
                current_did = d
                sequence_did = self._zero_padding(n)

            sequence_did.append(
                np.concatenate([self.get_sentence_vec(u), self.get_sentence_vec(s)])
            )

            if e[0] == "No-Err":
                continue

            y_each_error_label = np.zeros(len(error_types))
            for e_ in e:
                y_each_error_label[error_dict[e_]] = 1
            X_data.append(sequence_did[-n:])
            y_data.append(y_each_error_label)

        return np.array(X_data), np.array(y_data)

In [5]:
class DataManager:
    """Small pickle-backed store rooted at a single directory.

    Attributes:
        data_path: Directory that holds the pickles (created if missing).
        dir: Listing of ``data_path`` taken at construction and refreshed
            after every ``save_data`` call.
    """

    def __init__(self, data_path) -> None:
        self.data_path = data_path
        os.makedirs(data_path, exist_ok=True)
        self.dir = os.listdir(data_path)

    def _full_path(self, name):
        """Join ``name`` onto the store directory (works with or without a trailing slash)."""
        return os.path.join(self.data_path, name)

    def is_exist(self, name):
        """Return True if ``name`` currently exists inside the store directory."""
        # Ask the filesystem instead of the listing cached in __init__, which
        # goes stale as soon as something is saved.
        return os.path.exists(self._full_path(name))

    def save_data(self, name, obj):
        """Pickle ``obj`` to ``name`` inside the store directory."""
        target = self._full_path(name)
        with open(target, "wb") as f:
            pickle.dump(obj, f)
        self.dir = os.listdir(self.data_path)
        print("success save : {0}".format(target))

    def load_data(self, name):
        """Unpickle and return the object stored under ``name``.

        NOTE: pickle.load can execute arbitrary code; only load files that
        this notebook (or another trusted source) produced.
        """
        source = self._full_path(name)
        with open(source, "rb") as f:
            obj = pickle.load(f)
        print("success load : {0}".format(source))
        return obj

In [6]:
def predict_at_least_oneClass(clf, X) -> np.ndarray:
    """Predict multi-label outputs, forcing every sample to carry at least one label.

    Rows for which ``clf.predict`` returns no positive label get the single
    label whose positive-class probability is highest.

    Args:
        clf: Fitted multi-output classifier exposing ``predict`` and a
            ``predict_proba`` that returns one ``(n_samples, n_classes)``
            array per output.
        X: Feature matrix.

    Returns:
        Array shaped like ``clf.predict(X)`` with no all-zero rows.
    """
    # Copy so the classifier's own return value is never modified.
    y_pred = np.array(clf.predict(X))
    per_label_proba = clf.predict_proba(X)

    # The number of outputs comes from the classifier itself rather than from
    # a notebook-level global, so the function works wherever it is called.
    n_labels = len(per_label_proba)
    positive_proba = np.zeros((len(X), n_labels))
    for c, label_proba in enumerate(per_label_proba):
        label_proba = np.asarray(label_proba)
        # A single-column result means that output saw only one class during
        # fitting; its positive probability is treated as 0.
        if label_proba.shape[1] != 1:
            positive_proba[:, c] = label_proba[:, 1]

    # Replace empty predictions with the highest-probability label.
    empty_rows = np.flatnonzero(y_pred.sum(axis=1) == 0)
    if empty_rows.size:
        y_pred[empty_rows, positive_proba[empty_rows].argmax(axis=1)] = 1
    return y_pred

In [7]:
# Build the preprocessor once; its constructor loads the 'ja_ginza' pipeline
# and prints the embedding size.
pre = preprocessor()

300


In [8]:

# Dataset root and the sub-directories that are read from it.
path = './error_category_classification/dbdc5_ja_dev_labeled/'
datalist = ['DCM', 'DIT', 'IRS']

# Label vocabulary; the position in this list is the label's column index.
error_types = [
    'Unclear intention',
    'Wrong information',
    'Ignore question',
    'Topic transition error',
    'Lack of information',
    'Repetition',
    'Contradiction',
    'Self-contradiction',
    'Lack of common sense',
    'Semantic error',
    'Grammatical error',
    'Ignore proposal',
    'Ignore offer',
    'Lack of sociality',
    'Uninterpretable',
    'Ignore greeting',
    'No-Err',
]

df = pre.read_json_with_NoErr(path, datalist)
print(df.shape)


(2000, 5)


In [11]:
# Run configuration.
seq_len = 1      # context window length passed to extract_X_y
mode = "MOC"     # tag embedded in the cache / model file names
OUTPUT_DIM = 8   # number of leading label columns scored in the evaluation cells

# Cache locations are keyed by the window length.
data_path = "./X_y_data/seq{0}/".format(seq_len)
model_path = "./models/seq{0}/".format(seq_len)

# File names are keyed by the mode and by the sub-directories that were read.
files = "_".join(datalist)
data_name = "data_{0}_{1}.pickle".format(mode, files)
model_name = "model_{0}.pickle".format(mode)
print(data_name, model_name, sep="\n")

modelM = DataManager(model_path)
dataM = DataManager(data_path)

data_MOC_DCM_DIT_IRS.pickle
model_MOC.pickle


In [12]:
# Reuse cached features when present; otherwise compute them and cache the result.
if not dataM.is_exist(data_name):
    X_data, y_data = pre.extract_X_y(df, error_types, seq_len)
    dataM.save_data(data_name, [X_data, y_data])
else:
    DATA_Xy = dataM.load_data(data_name)
    X_data, y_data = DATA_Xy[0], DATA_Xy[1]

success load : ./X_y_data/seq1/data_MOC_DCM_DIT_IRS.pickle


In [13]:
# Load a previously saved model if there is one; otherwise only report that
# training is required (``model`` is left undefined in that case).
if not modelM.is_exist(model_name):
    print("Not Exist Model data : you must train the model")
else:
    model = modelM.load_data(model_name)

Not Exist Model data : you must train the model


In [14]:
# Hold out 30% of the samples with a fixed seed.
# NOTE(review): X_data[:, 0, :] keeps only the first step of each window;
# that is the whole window for seq_len == 1 -- confirm this is intended
# before raising seq_len.
X_train, X_test, y_train, y_test = train_test_split(
    X_data[:, 0, :],
    y_data,
    test_size=0.30,
    random_state=5,
)

In [15]:
# One AdaBoost classifier per label. The base estimator is seeded so that a
# re-run reproduces the same scores (the recorded outputs of two unseeded runs
# differ); the seed matches the one used for the train/test split.
clf = MultiOutputClassifier(AdaBoostClassifier(random_state=5)).fit(X_train, y_train)
y_pred = predict_at_least_oneClass(clf, X_test)



EM: 0.4962962962962963
F-measure:  0.5909465020576131


In [16]:
# Score only the first OUTPUT_DIM label columns: exact-match ratio and
# sample-averaged F1.
y_test_head = y_test[:, :OUTPUT_DIM]
y_pred_head = y_pred[:, :OUTPUT_DIM]

exact_match = metrics.accuracy_score(y_test_head, y_pred_head)
f_measure = metrics.f1_score(y_test_head, y_pred_head, average='samples')

print('EM:', exact_match)
print('F-measure: ', f_measure)

EM: 0.4962962962962963
F-measure:  0.5860082304526748


  average, "true nor predicted", 'F-score is', len(true_sum)


In [17]:
# Pickle the fitted classifier under model_name via the model DataManager.
modelM.save_data(model_name, clf)

success save : ./models/seq1/model_MOC.pickle


In [20]:
# Per-label accuracy for each of the first OUTPUT_DIM label columns.
for label_idx in range(OUTPUT_DIM):
    label_accuracy = metrics.accuracy_score(y_test[:, label_idx], y_pred[:, label_idx])
    print("error[{0}]  accuracy: {1}".format(label_idx, label_accuracy))

error[0]  accuracy: 0.7876543209876543
error[1]  accuracy: 0.6790123456790124
error[2]  accuracy: 0.8962962962962963
error[3]  accuracy: 0.8345679012345679
error[4]  accuracy: 0.9654320987654321
error[5]  accuracy: 0.9604938271604938
error[6]  accuracy: 0.9876543209876543
error[7]  accuracy: 0.9901234567901235
