In [None]:
# import saved modules
from modules.KorEDA.eda import EDA
from modules.preprocess_for_kobert import preprocess

In [None]:
import pandas as pd
import numpy as np
import re

from modules.preprocess_for_kobert import preprocess

In [None]:
# pandas setting: remove the per-column (per-cell) limit on displayed characters.
# None is the documented "no limit" value (pandas >= 1.0); the legacy -1 sentinel
# was deprecated in pandas 1.0 and is rejected by newer releases.
pd.set_option('display.max_colwidth', None)
pd.__version__

### Bring Dataset

In [None]:
# Relative directory used as the prefix of the data file paths.
DATA_PATH = "./data"

In [None]:
# Read 몽데이크_Open.csv from DATA_PATH (UTF-8) and preview two random rows.
df_original = pd.read_csv(f"{DATA_PATH}/몽데이크_Open.csv", encoding="utf-8")
df_original.sample(2)

In [None]:
# Read data_preprocessed.csv from DATA_PATH (UTF-8) and preview two random rows.
df_train = pd.read_csv(f"{DATA_PATH}/data_preprocessed.csv", encoding="utf-8")
df_train.sample(2)

In [None]:
# Read hidden_for_inference.csv from DATA_PATH (UTF-8) and show its first two rows.
df_eval = pd.read_csv(f"{DATA_PATH}/hidden_for_inference.csv", encoding="utf-8")
df_eval.head(2)

In [None]:
# Sample tokenizing for notations: inspect how the tokenizer returned by
# kobert_transformers.get_tokenizer() splits a single math symbol.
from kobert_transformers import get_tokenizer
tokenizer = get_tokenizer()
# tokenizer.tokenize("√")
tokenizer.tokenize("|")

### P() -> 확률함수()로 바꾸기

In [None]:
# Regex for coordinate-style `P(a, b)` expressions, tried on one sample problem text.
# The pattern is a raw string so that \( \w \s etc. reach the regex engine verbatim
# instead of being parsed as (invalid) string escape sequences.
text = "좌표평면 위의 원점에서 출발하는 점 `P(x, y)`는 주사위를 던져 `1`, `2`, `3`, `4`의 눈이 나오면 `x`축의 방향으로 `1`만큼, `5`, `6`의 눈이 나오면 `y`축의 방향으로 `1`만큼 움직인다. 주사위를 `20`번 던질 때, 점 `P`의 `x`좌표를 확률변수 `X`라고 하고 점 `P`의 `y`좌표를 확률변수 `Y`라고 할 때, `E(3X)+V(3Y-5)`를 구하면?"
re.findall(r'P\([\w\+\-\(\)/]{1,10},\s?[\w\+\-\(\)/]{1,10}\)', text)

In [None]:
def substitute_probability(text):
    """Replace every ``P(`` in ``text`` with ``확률함수(`` unless the text looks non-probabilistic.

    The text is returned unchanged when it contains any of the marker strings
    "다항식", "식을 `", "방정식", "점 `" or "점을 `", or when it contains "점"
    together with a two-argument ``P(a, b)`` expression (each argument 1-10
    characters of word characters, ``+``, ``-``, parentheses or ``/``).

    Args:
        text: Problem text to scan.

    Returns:
        The original text, or a copy in which every literal ``P(`` has been
        replaced by ``확률함수(``.
    """
    # Marker substrings that make the function leave the text untouched.
    skip_markers = ("다항식", "식을 `", "방정식", "점 `", "점을 `")
    if any(marker in text for marker in skip_markers):
        return text

    # "점" plus a comma-separated pair such as P(x, y) is also left untouched.
    if "점" in text and re.search(
        r"P\([\w\+\-\(\)/]{1,10},\s?[\w\+\-\(\)/]{1,10}\)", text
    ):
        return text

    # Literal replacement; no regex (and no backslash escaping) is needed here.
    return text.replace("P(", "확률함수(")

In [None]:
# copy training dataset for preprocessing
df_testing = df_train.copy()

# apply substitute probability function (passed directly; no lambda wrapper needed)
df_testing["text"] = df_testing["text"].apply(substitute_probability)

# rows whose chapter starts with "HSTA" and whose text still contains P( after the substitution
# (raw string so that \( is a regex escape, not an invalid string escape)
df_false_positive = df_testing[(df_testing["text"].str.contains(r"P\(")) & (df_testing["chapter"].str.startswith("HSTA"))]
df_false_positive

In [None]:
# Examples of wrong substitutions: rows that contain 확률함수( although their
# chapter does not start with "HSTA". Raw string keeps \( a regex escape.
df_true_negative = df_testing[(df_testing["text"].str.contains(r"확률함수\(")) & (~df_testing["chapter"].str.startswith("HSTA"))]
df_true_negative[["chapter","text"]]

In [None]:
# How many rows in "HSTA" chapters had P( replaced by 확률함수(.
# Raw string keeps \( a regex escape.
df_true_positive = df_testing[(df_testing["text"].str.contains(r"확률함수\(")) & (df_testing["chapter"].str.startswith("HSTA"))]
len(df_true_positive)

### Evaluation dataset에서 작동하는지 확인

In [None]:
# Run the imported preprocess() on a copy of the evaluation frame and show the
# first rows whose text contains "확률함수 (" (raw string keeps \( a regex escape).
df_testing = df_eval.copy()
df_testing = preprocess(df_testing)
df_testing[(df_testing["text"].str.contains(r"확률함수 \("))].head()


In [None]:
# Rows whose text still contains "P (" (raw string keeps \( a regex escape).
df_testing[(df_testing["text"].str.contains(r"P \("))]

### E() -> 기댓값()으로 바꾸기

In [None]:
# reset training dataset for preprocessing
df_testing = df_train.copy()

# Replace E( with 기댓값( in one pass, leaving texts that contain 기울기 untouched.
# Literal str.replace avoids the non-raw "E\(" regex escape.
df_testing["text"] = df_testing["text"].apply(lambda x: x if "기울기" in x else x.replace("E(", "기댓값("))  # author's note: 0 rows misclassified

# Examples of wrong E() -> 기댓값() substitutions: rows outside the "HSTA" chapters
df_true_negative = df_testing[(df_testing["text"].str.contains(r"기댓값\(")) & (~df_testing["chapter"].str.startswith("HSTA"))]
print(len(df_true_negative))
df_true_negative

In [None]:
# Examples of E() -> 기댓값() substitutions: every row whose text contains 기댓값(
# (no chapter filter is applied here). Raw string keeps \( a regex escape.
df_true_positive = df_testing[(df_testing["text"].str.contains(r"기댓값\("))]
print(len(df_true_positive))
df_true_positive.head(2)

In [None]:
# Rows whose text still contains an unreplaced E( (raw string keeps \( a regex escape).
df_true_negatives = df_testing[(df_testing["text"].str.contains(r"E\("))]
print(len(df_true_negatives))
df_true_negatives.head(2)

## Evaluation Dataset에서도 올바르게 작동하는지 확인

### 집합의 조건제시법 |, 조건부확률의 |, 절댓값 || 다르게 처리

In [None]:
def preprocess_pipe(input_text: str) -> dict:
    """Classify the backtick-delimited spans of ``input_text`` that contain ``|``.

    Each span is put into at most one category, checked in this order:
      * "집합"   - the span contains both ``|`` and ``{``;
      * "조건부" - the span contains ``|``, ``(`` and ``)``;
      * "절댓값" - the span contains at least two ``|``.

    Args:
        input_text: Text in which math expressions are wrapped in backticks.

    Returns:
        * the string "FUCKED" when all three categories are non-empty at once;
        * NaN (``np.nan``) when no span was classified;
        * otherwise a dict mapping "집합", "조건부" and "절댓값" to the lists of
          matching spans.
        Note that, despite the annotation, the result is therefore not always
        a dict.
    """
    list_jiphap = []
    list_conditional = []
    list_absolute = []

    # The lookaround pattern yields, alternately, the text inside a backtick
    # pair and the text between two pairs, so the spans of interest sit at the
    # even positions. Slicing by position (instead of list.index(), which
    # returns the FIRST equal element) keeps repeated segments correctly
    # classified and avoids a quadratic scan.
    list_math = re.findall(r"(?<=`).*?(?=`)", input_text)[::2]

    for item in list_math:
        if "|" in item and "{" in item:
            list_jiphap.append(item)
        elif "|" in item and "(" in item and ")" in item:
            list_conditional.append(item)
        elif item.count("|") >= 2:
            list_absolute.append(item)

    # Sentinel for texts that contain all three kinds of pipe usage.
    if list_jiphap and list_conditional and list_absolute:
        return "FUCKED"
    if not (list_jiphap or list_conditional or list_absolute):
        # np.nan works on every NumPy version; the np.NaN alias was removed in NumPy 2.0.
        return np.nan
    return {"집합": list_jiphap, "조건부": list_conditional, "절댓값": list_absolute}


In [None]:
sample_text3 = "a, b, n, `|a+b|=|a|+|b|`, ab >= 0, a^2+b^2=0, ab=0, ab=0, a=0, b=0, n^2, 3, n, 3"                                                       
preprocess_pipe(sample_text3)

In [None]:
# Quick check of pipe substitution on the sample text; raw string keeps \| a regex escape.
re.sub(r"\|","테스트|",sample_text3)

In [None]:
# Store the preprocess_pipe result of every text in a new "pipes" column
# (the function is passed directly; the lambda wrapper added nothing).
df_train["pipes"] = df_train["text"].apply(preprocess_pipe)

In [None]:
# 집합, 조건부, 절댓값이 겹치는 경우가 없음.
# df_fucked = df_train[df_train["pipes"].str.match("FUCKED")]

In [None]:
# Cap DataFrame rendering at 20 rows.
pd.set_option('display.max_rows', 20)
# Keep the rows whose "pipes" value is not missing, indexed by question id,
# then show 20 random rows of the result.
df_pipe = df_train[df_train["pipes"].notna()].set_index("qplay_question_id")
df_pipe.sample(20)

In [None]:
# def sub_func_replace_pipe(input_text: str) -> str:
#     if "|" in input_text and "{" in input_text:
#         input_text = re.sub("\|", "조건제시법|", input_text)
#     elif "|" in input_text and "확률함수(" in input_text and ")" in input_text:
#         if input_text.count("|") % 2 == 0:
#             input_text = re.sub("\|", "절대값|", input_text)
#         if input_text.count("|") % 2 == 1:
#             input_text = re.sub("\|", "조건부확률|", input_text)
#     elif input_text.count("|") >= 2:
#         input_text = re.sub("\|", "절대값|", input_text)
#     else:
#         return input_text
#     result_text = input_text
#     return result_text

def sub_func_replace_pipe(input_text: str) -> str:
    """Prefix every ``|`` in ``input_text`` with a label describing its role.

    The first matching rule is applied:
      1. ``|`` and ``{`` are present: every ``|`` becomes ``조건제시법|``.
      2. ``|``, ``확률함수(`` and ``)`` are present: the text is tokenized into
         bracketed ``[...]``, parenthesized ``(...)`` and double-quoted groups,
         and otherwise whitespace-free runs. In a token with an odd number of
         pipes every ``|`` becomes ``조건부확률|``; in a token with an even
         number of pipes every ``|`` becomes ``절대값|``.
      3. At least two ``|`` are present: every ``|`` becomes ``절대값|``.
      4. Otherwise the text is returned unchanged.

    Args:
        input_text: A text fragment, typically a single math expression.

    Returns:
        The fragment with labelled pipes. Whitespace between the tokens of
        rule 2 is preserved.
    """
    has_pipe = "|" in input_text

    if has_pipe and "{" in input_text:
        return input_text.replace("|", "조건제시법|")

    if has_pipe and "확률함수(" in input_text and ")" in input_text:

        def _label_token(match):
            # Odd pipe count -> conditional-probability bar; even -> absolute value.
            token = match.group()
            label = "조건부확률|" if token.count("|") % 2 else "절대값|"
            return token.replace("|", label)

        # re.sub with a callback rewrites each token in place, so the
        # whitespace between tokens is kept (joining the findall() tokens with
        # "" used to drop it). Debug prints were removed as well.
        return re.sub(
            r'\[[^\]]*\]|\([^\)]*\)|"[^"]*"|\S+', _label_token, input_text
        )

    if input_text.count("|") >= 2:
        return input_text.replace("|", "절대값|")

    return input_text

In [None]:

#  if list_math_matched.index(math) % 2 == 0
def main_replace_pipe(input_text: str) -> str:
    """Label the pipes of every backtick-separated segment of ``input_text``.

    The text is split on the backtick delimiter and every non-empty segment
    (whether it was inside or outside a backtick pair) is handed to
    ``sub_func_replace_pipe``, which decides whether and how its ``|``
    characters are labelled. The segments are then joined with the delimiter
    again.

    Args:
        input_text: Text in which math expressions are wrapped in backticks.

    Returns:
        The text with labelled pipes. All backticks of the input are kept,
        including a trailing one and directly adjacent ones.
    """
    delimiter = "`"
    rebuilt_segments = []

    for segment in input_text.split(delimiter):
        if not segment:
            # Empty segments come from a leading/trailing delimiter or from two
            # adjacent delimiters. Keeping them (instead of filtering them out)
            # is what lets the final join restore those backticks.
            rebuilt_segments.append(segment)
            continue
        # The helper receives the segment with the delimiter in front, exactly
        # as before; it only rewrites "|" characters, so the prefix can be
        # stripped off again afterwards.
        labelled = sub_func_replace_pipe(delimiter + segment)
        rebuilt_segments.append(labelled[len(delimiter):])

    return delimiter.join(rebuilt_segments)

In [None]:
# Try main_replace_pipe on a sentence whose `확률함수(|X| <= 63)` span contains a pair of pipes.
main_replace_pipe("정규분포 `N(m, 9)`에 따르는 확률변수 `X`에 대하여 `확률함수(|X| <= 63)=0.6826`일 때, 상수 `m`의 값은?")

In [None]:
import random

# Pick one of the first 101 rows of df_pipe at random and show the labelled text.
# The upper bound is capped by the frame length so that a frame with fewer than
# 101 rows cannot make .iloc raise IndexError.
rand_int = random.randrange(0, min(101, len(df_pipe)))
sample_text = df_pipe["text"].iloc[rand_int]
print(main_replace_pipe(sample_text))

In [None]:
# Run the imported preprocess() on the original frame with korean, space and
# condition switched on, index the result by question id and preview two random rows.
# NOTE(review): the meaning of the three flags is defined in modules.preprocess_for_kobert — confirm there.
df_test_pipe = preprocess(df_original, korean=True, space=True, condition=True)
df_test_pipe = df_test_pipe.set_index("qplay_question_id")
df_test_pipe.sample(2)

In [None]:
# Select from the preprocessed frame the rows whose index labels appear in df_pipe,
# print the resulting shape and show 15 random rows.
df_test_pipe_lookup = df_test_pipe.loc[df_pipe.index]
print(df_test_pipe_lookup.shape)
df_test_pipe_lookup.sample(15)

In [None]:
# Show 20 random lookup rows whose "qtid" starts with "HSTA".
df_test_pipe_lookup[df_test_pipe_lookup["qtid"].str.startswith("HSTA")].sample(20)

In [None]:
# Show the training text of the row whose qplay_question_id is 9727.
df_train.loc[df_train["qplay_question_id"] == 9727][["text"]]

In [None]:
# Show the training text of the row whose qplay_question_id is 9318.
df_train.loc[df_train["qplay_question_id"] == 9318][["text"]]

In [None]:
# Show the "text" entry stored under index label 9318 in the lookup frame.
df_test_pipe_lookup.loc[9318][["text"]]

In [None]:
# Show the training text of the row whose qplay_question_id is 9146.
df_train.loc[df_train["qplay_question_id"] == 9146][["text"]]

In [None]:
# Show the "text" entry stored under index label 9146 in the preprocessed frame.
df_test_pipe.loc[9146][["text"]]

In [None]:
# Try main_replace_pipe on a sentence whose last backtick span, `확률함수(|Z| <= 2)=0.95`,
# sits inside parentheses in the surrounding text.
problem_text = "정규분포 `N(m ,4^2)`을 따르는 모집단에서 임의추출한 크기가 `64`인 표본의 표본평균이 `32`일 때, 모평균 `m`의 신뢰도 `95%`의 신뢰구간은? (단, `확률함수(|Z| <= 2)=0.95`)"
main_replace_pipe(problem_text)

In [None]:
# Run the imported preprocess() on the evaluation frame with the same three flags
# (korean, space, condition) that were used for the original frame.
df_eval_test_pipe = preprocess(df_eval, korean=True, space=True, condition=True)