In [None]:
import pandas as pd
import numpy as np
import plotly.express as px
from datasets import list_metrics,load_metric
from sklearn.metrics import confusion_matrix

# Evaluation function

In [None]:
def performance(y_ture,y_pred):
    """Show per-class one-vs-rest metrics and build a confusion-matrix heatmap.

    For every distinct label found in ``y_ture`` or ``y_pred`` the labels are
    binarised (``label == class``) and F1, recall and precision are computed
    with ``average="macro"`` through the ``datasets`` metrics. The resulting
    per-class table is rendered with ``display``.

    :param y_ture: ground-truth labels (list or 1-D array).
    :param y_pred: predicted labels, same length as ``y_ture``.
    :return: ``(z, fig)`` -- ``z`` is the matrix returned by
        ``confusion_matrix(y_ture, y_pred)`` and ``fig`` is the Plotly heatmap
        of it (not shown here; the caller decides when to call ``fig.show()``).
    """
    # Work on arrays so that `labels == cls` is an element-wise comparison
    # regardless of whether the caller passed lists or arrays.
    y_true_arr = np.asarray(y_ture)
    y_pred_arr = np.asarray(y_pred)

    f1_metric = load_metric("f1")
    re_metric = load_metric("recall")
    pre_metric = load_metric("precision")

    # np.unique returns the distinct labels in sorted order. confusion_matrix
    # also orders its rows/columns by sorted label, so the heatmap tick labels
    # built from this list line up with the matrix (an unordered set does not
    # guarantee that).
    type_c_int = np.unique(np.concatenate([y_true_arr, y_pred_arr])).tolist()
    type_c = [str(i) for i in type_c_int]

    f1_m_list = []
    re_m_list = []
    pre_m_list = []

    for i in type_c_int:
        # One-vs-rest view of the problem for class `i`.
        bi_ture = list(y_true_arr == i)
        bi_pred = list(y_pred_arr == i)
        f1_m_results = f1_metric.compute(predictions=bi_pred, references=bi_ture, average="macro")
        re_m_results = re_metric.compute(predictions=bi_pred, references=bi_ture, average="macro")
        pre_m_results = pre_metric.compute(predictions=bi_pred, references=bi_ture, average="macro")

        f1_m_list.append(f1_m_results["f1"])
        re_m_list.append(re_m_results["recall"])
        pre_m_list.append(pre_m_results["precision"])

    data = {'Class_type':type_c_int,'F1-macro':f1_m_list,'Recall-macro':re_m_list,'Precision-macro':pre_m_list}
    df = pd.DataFrame(data)
    display(df)

    z = confusion_matrix(y_true_arr, y_pred_arr)

    # confusion_matrix puts the true labels on the rows and the predictions on
    # the columns; px.imshow draws rows along y and columns along x.
    fig = px.imshow(z,
                    text_auto=True,
                    labels=dict(x="Predicted label", y="True label", color="times"),
                    x=type_c,
                    y=type_c)

    return z,fig

# Loading the data for use

In [None]:
!pip install -U sentence-transformers
!pip install openpyxl

In [None]:
from copy import deepcopy
from random import randint,shuffle

In [None]:
def read_and_split_the_excel(QA_path):
    """
    :func: Load an Excel sheet and return its first two columns as lists
           (reading .xlsx files requires the openpyxl package).
    :param QA_path: path of the Excel file; the first column holds the
                    questions and the second column the answers.
    :return: (question list, answer list)
    """
    qa_frame = pd.read_excel(QA_path)
    # Column 0 -> questions, column 1 -> answers; any further columns are ignored.
    questions, answers = (qa_frame.iloc[:, column].tolist() for column in (0, 1))
    return questions, answers

In [None]:
# Smoke test for read_and_split_the_excel: load the QA workbook and preview the first three questions.
question_list,answer_list = read_and_split_the_excel("../input/uic-cn-admission/CN_QA_dataset_all.xlsx")
display(question_list[:3])

In [None]:
def read_and_split_the_01(zero_one_path):
    """
    :func: Load a CSV file and return its first three columns as lists.
    :param zero_one_path: path of the CSV file; its columns are read in the
                          order first sentence, second sentence, label.
    :return: (first-sentence list, second-sentence list, label list)
    """
    pairs_frame = pd.read_csv(zero_one_path)
    # Columns are taken by position, so the header names do not matter.
    first_sentences, second_sentences, labels = (
        pairs_frame.iloc[:, position].tolist() for position in range(3)
    )
    return first_sentences, second_sentences, labels

In [None]:
# Smoke test for read_and_split_the_01: load the labelled sentence pairs and
# preview the first three entries of each returned list.
Sen1_list, Sen2_list, label_list = read_and_split_the_01(
    "../input/01-uic-rm-dup/01_all_rm_dup.csv"
)
display(Sen1_list[:3], Sen2_list[:3], label_list[:3])

In [None]:
def shuffle_without_repeated(list_):
    """Return a shuffled copy of ``list_`` in which no item keeps its position.

    Uses Sattolo's algorithm: walking from the last index down to index 1,
    each item is swapped with a randomly chosen item at a strictly smaller
    index. The result is a single cycle over all positions, so for two or more
    items every position ends up holding a different item than before. Lists
    with zero or one item are returned as an unchanged copy.

    Positions are guaranteed to change, not values: if ``list_`` contains equal
    items, an item may still be paired with an equal one.

    :param list_: the list to shuffle; it is not modified.
    :return: a new (deep-copied) list with the items permuted.
    """
    temp_list = deepcopy(list_)
    # The loop must run down to index 1 inclusive; stopping at 2 skips the last
    # swap and can leave items (or a whole 2-item list) in their original place.
    for i_current in range(len(temp_list) - 1, 0, -1):
        i_replace = randint(0, i_current - 1)
        temp_list[i_current], temp_list[i_replace] = temp_list[i_replace], temp_list[i_current]
    return temp_list
    
def obtain_shuffle_01(ori_list):
    """Build sentence pairs labelled 0 by pairing each item with a shuffled item.

    :param ori_list: the original sentences.
    :return: ``(ori_list, shuffled copy of ori_list, list of 0 labels)``; the
        label list has one 0 per shuffled item.
    """
    mismatched = shuffle_without_repeated(ori_list)
    negative_labels = [0 for _ in mismatched]
    return ori_list, mismatched, negative_labels

In [None]:
# Test the shuffle on a small toy list; the cell output is the
# (original, shuffled, labels) tuple returned by obtain_shuffle_01.
# NOTE(review): this rebinds `question_list`, replacing the questions that an
# earlier cell loaded from the Excel file.
question_list = ['The cat sits outside',
      'A man is playing guitar',
      'The new movie is awesome',
      'The new opera is nice']
obtain_shuffle_01(question_list)

In [None]:
def read_qa_and_expand_training_set(QA_path, zero_one_path):
    """Load the QA data and the labelled sentence pairs, then add shuffled pairs.

    The labelled pairs read from ``zero_one_path`` are extended with one extra
    pair per question: the question itself paired with an entry of the shuffled
    question list, labelled 0.

    :param QA_path: Excel file passed to ``read_and_split_the_excel``.
    :param zero_one_path: CSV file passed to ``read_and_split_the_01``.
    :return: ``(question_list, answer_list, Sen1_list, Sen1_list_index,
        Sen2_list, label_list)`` where ``Sen1_list_index[k]`` is the position of
        ``Sen1_list[k]`` inside ``question_list``.
    :raises KeyError: if a first sentence does not occur in ``question_list``.
    """
    question_list, answer_list = read_and_split_the_excel(QA_path)
    Sen1_list, Sen2_list, label_list = read_and_split_the_01(zero_one_path)

    # Append the generated label-0 pairs to the pairs read from the CSV.
    anchors, mismatched, negative_labels = obtain_shuffle_01(question_list)
    Sen1_list += anchors
    Sen2_list += mismatched
    label_list += negative_labels

    # Map every question to its position (for duplicated questions the last
    # position wins), then look up each first sentence.
    position_of = {question: position for position, question in enumerate(question_list)}
    Sen1_list_index = [position_of[sentence] for sentence in Sen1_list]

    return question_list, answer_list, Sen1_list, Sen1_list_index, Sen2_list, label_list

In [None]:
# Load the QA data together with the expanded set of labelled sentence pairs.
QA_path = "../input/uic-cn-admission/CN_QA_dataset_all.xlsx"
zero_one_path = "../input/01-uic-rm-dup/01_all_rm_dup.csv"
(
    question_list,
    answer_list,
    Sen1_list,
    Sen1_list_index,
    Sen2_list,
    label_list,
) = read_qa_and_expand_training_set(QA_path, zero_one_path)

# Preview the first few entries of every returned list.
display(
    question_list[:3],
    answer_list[:3],
    Sen1_list[:3],
    Sen1_list_index[:10],
    Sen2_list[:3],
    label_list[:3],
)

In [None]:
import time
from tqdm import tqdm

# Demo: drive a tqdm progress bar by hand.
with tqdm(total=200) as pbar:
    pbar.set_description('Processing')
    # `total` is the overall amount of work: 20 iterations * 10 units per update = 200 (total)
    for i in range(20):
        # Simulated work: sleep for 0.1 s
        time.sleep(0.1)
        # Advance the progress bar by 10 units
        pbar.update(10)

In [None]:
# Install the required packages first:
# pip install -U sentence-transformers
# pip install -U transformers
# pip install openpyxl
from sentence_transformers import SentenceTransformer, util
import pandas as pd
from copy import deepcopy
from random import randint
from termcolor import colored

def SBERT_get_reply(model, query, question_list, answer_list, question_list_emb, topk_SBERT, threshold_SBERT):
    """Rank the stored questions by cosine similarity to ``query``.

    :param model: object whose ``encode(list_of_str, convert_to_tensor=True)``
        returns the embedding(s) of the query.
    :param query: the question text to match.
    :param question_list: not used by this function; kept so existing callers
        keep working.
    :param answer_list: not used by this function; kept so existing callers
        keep working.
    :param question_list_emb: embeddings of the stored questions, compared with
        the query through ``util.pytorch_cos_sim``.
    :param topk_SBERT: how many of the best-ranked indices to return.
    :param threshold_SBERT: the best score must exceed this value for the match
        to be flagged as valid.
    :return: ``(if_valid, topk_idx_SBERT, tensor_scores)`` -- ``if_valid`` is 1
        when the best score is above the threshold and 0 otherwise (also 0 when
        there is nothing to rank); ``topk_idx_SBERT`` holds the indices of the
        ``topk_SBERT`` best-scoring stored questions; ``tensor_scores`` is the
        list of all scores in descending order, each still a tensor element.
    """
    query_embeddings = model.encode([query], convert_to_tensor=True)

    # Similarity of the single query against every stored question.
    cosine_scores = util.pytorch_cos_sim(query_embeddings, question_list_emb)[0]

    # Sort once on the tensor instead of calling Python's sorted() on individual
    # tensor elements (one tensor comparison per step). stable=True keeps equal
    # scores in ascending index order, which is what the stable Python sort with
    # reverse=True produced before.
    sorted_scores, sorted_indices = cosine_scores.sort(descending=True, stable=True)
    index_ranked = sorted_indices.tolist()
    tensor_scores = list(sorted_scores)

    # Guard the empty case so an empty candidate set is reported as "not valid"
    # rather than failing on tensor_scores[0].
    if tensor_scores and tensor_scores[0] > threshold_SBERT:
        if_valid = 1
    else:
        if_valid = 0

    # Indices of the best-matching stored questions.
    topk_idx_SBERT = index_ranked[:topk_SBERT]

    return if_valid, topk_idx_SBERT, tensor_scores


def use_model_qa(model_path, QA_path, zero_one_path, device=None):
    """Evaluate a sentence-embedding model on the labelled sentence pairs.

    Every second sentence is used as a query against the embedded question
    list. The prediction is 1 when the best-ranked question is the one the
    paired first sentence maps to, otherwise 0. Predictions are compared with
    the labels through ``performance`` and the confusion-matrix figure is shown.

    :param model_path: model name or path handed to ``SentenceTransformer``.
    :param QA_path: Excel file with the questions and answers.
    :param zero_one_path: CSV file with the labelled sentence pairs.
    :param device: device handed to ``SentenceTransformer``. With the default
        ``None`` the library picks the device itself, so the function also runs
        on machines without a GPU; pass ``'cuda'`` to force the GPU.
    """
    print("数据准备中")
    model = SentenceTransformer(model_path, device=device)

    topk_SBERT = 3
    threshold_SBERT = 0.6

    # Read the data once: the expanded set already contains the question and
    # answer lists, so a separate read of the Excel file is not needed.
    question_list,answer_list,Sen1_list,Sen1_list_index,Sen2_list,label_list = read_qa_and_expand_training_set(QA_path, zero_one_path)
    question_embeddings = model.encode(question_list, convert_to_tensor=True)

    print("准备完毕")

    predict_result = []

    for index, test_query in enumerate(Sen2_list):
        # NOTE(review): `if_valid` (the threshold check) is returned but does
        # not influence the prediction below -- confirm this is intended.
        if_valid, topk_idx_SBERT, tensor_scores = SBERT_get_reply(model, test_query, question_list, answer_list, question_embeddings, topk_SBERT, threshold_SBERT)

        # Hit when the top-ranked question is the one paired with this query.
        if topk_idx_SBERT[0] == Sen1_list_index[index]:
            prediction = 1
        else:
            prediction = 0

        predict_result.append(prediction)

    print("Model:",model_path)
    cf_matrix_test,figure_test = performance(label_list,predict_result)
    figure_test.show()

# # Test for all
# print("数据准备中")
# model = SentenceTransformer(model_path)

# topk_SBERT = 3
# threshold_SBERT = 0.7

# # data embedding
# question_list,answer_list = read_and_split_the_excel(QA_path)
# question_embeddings = model.encode(question_list,convert_to_tensor=True)
# print("准备完毕")
# # Get the question
# q = "UIC是"
# SBERT_get_reply(model, q, question_list, answer_list, question_embeddings, topk_SBERT, threshold_SBERT)

def SBERT_QA_test(model_path,
                  QA_path="../input/uic-cn-admission/CN_QA_dataset_all.xlsx",
                  zero_one_path="../input/01-uic-rm-dup/01_all_rm_dup.csv"):
    """Run the QA retrieval evaluation for one model.

    :param model_path: model name or path forwarded to ``use_model_qa``.
    :param QA_path: Excel file with the questions and answers; defaults to the
        dataset location this notebook has always used.
    :param zero_one_path: CSV file with the labelled sentence pairs; defaults to
        the dataset location this notebook has always used.
    """
    # Examples of local model directories that can be passed as model_path:
    # model_path = '.\SBert_CN_fine_tune'
    # model_path = '.\sn_xlm_roberta_base'
    # model_path = '.\deberta_sentence_transformer'
    use_model_qa(model_path, QA_path, zero_one_path)

In [None]:
# Run the retrieval evaluation with the model identified by `model_path`
# (the value is forwarded to SBERT_QA_test).
model_path = "symanto/sn-xlm-roberta-base-snli-mnli-anli-xnli"
SBERT_QA_test(model_path)