### Sentence BERT 연구 코드
- 최종 수정일 : 2025.03.14
- 해당 코드는 Jupyter Notebook으로 실행이 가능합니다. Python 파일과는 달리 셀 별로 실행이 가능하다는 장점이 있습니다.
- Python은 3.11.11 버전에서 실행 확인되었습니다.

#### 적용 기술
- 해당 코드를 실행하는 데 필요한 Python Module 설치 셀 생성
- 문서 전처리 함수 정의
- 문서 전처리 자동화 구현
- 문서 평가 결과 자동 출력

#### 업데이트 예정
- 문서 전처리 함수 추가
- 벡터 데이터베이스 추가

In [None]:
# Install Modules - Just Execute Once!
!pip install -r ./requirement.txt

In [None]:
# Import Modules
from sentence_transformers import SentenceTransformer, util
from tqdm import tqdm
import pandas as pd
import numpy as np
import re, os, pickle

In [None]:
# 문서 전처리 함수 정의

# 괄호 제거 함수
def remove_parenthesse_func(text):
    return re.sub(r"[\(\{\[].*?[\)\}\]]", "", text)

# 문장 단위 분리 함수
def split_sentence(text):
    sentences = text.split(". ")
    return [sentence.strip().rstrip('.') for sentence in sentences if len(sentence) > 0 and sentence != ' '] # 비어 있는 문장 삭제

# 문자 단위 슬라이딩 윈도우 함수
def sliding_window_char(text, window_size=100, plus=1, sentence=True, remove_parenthesse=False):
    if sentence == False:
        if remove_parenthesse == True:
            text = remove_parenthesse_func(text)
        if len(text) < window_size:
            return [text]
        return [text[i:i+window_size] for i in range(0, len(text)-window_size+1, plus)]
    else:
        if remove_parenthesse == True:
            text = remove_parenthesse_func(text)
        sentences = split_sentence(text)
        result = []
        for text in sentences:
            if len(text) < window_size:
                result.extend([text])
            else:
                result.extend([text[i:i+window_size] for i in range(0, len(text)-window_size+1, plus)])
        return result

# 단어 단위 슬라이딩 윈도우 함수
def sliding_window_word(text, window_size=10, plus=1, sentence=True, remove_parenthesse=False):
    if sentence == False:
        if remove_parenthesse == True:
            text = remove_parenthesse_func(text)
        words = text.split(' ')
        if len(words) < window_size:
            return [' '.join(words).rstrip('.')]
        return [' '.join(words[i:i+window_size]).rstrip('.') for i in range(0, len(words)-window_size+1, plus)]
    else:
        if remove_parenthesse == True:
            text = remove_parenthesse_func(text)
        sentences = split_sentence(text)
        result = []
        for text in sentences:
            words = text.split(' ')
            if len(words) < window_size:
                result.extend([' '.join(words).rstrip('.')])
            else:
                result.extend([' '.join(words[i:i+window_size]).rstrip('.') for i in range(0, len(words)-window_size+1, plus)])
        return result
    
# 전처리 딕셔너리 반환 함수
def get_preprocess_fuc_list(char_window_size : int, word_window_size : int):
    return {
        f"char-{char_window_size}-sentenceO-parenthesseO" : lambda text : sliding_window_char(text, char_window_size, sentence=True, remove_parenthesse=False),
        f"char-{char_window_size}-sentenceO-parenthesseX" : lambda text : sliding_window_char(text, char_window_size, sentence=True, remove_parenthesse=True),
        f"char-{char_window_size}-sentenceX-parenthesseO" : lambda text : sliding_window_char(text, char_window_size, sentence=False, remove_parenthesse=False),
        f"char-{char_window_size}-sentenceX-parenthesseX" : lambda text : sliding_window_char(text, char_window_size, sentence=False, remove_parenthesse=True),
        f"word-{word_window_size}-sentenceO-parenthesseO" : lambda text : sliding_window_word(text, word_window_size, sentence=True, remove_parenthesse=False),
        f"word-{word_window_size}-sentenceO-parenthesseX" : lambda text : sliding_window_word(text, word_window_size, sentence=True, remove_parenthesse=True),
        f"word-{word_window_size}-sentenceX-parenthesseX" : lambda text : sliding_window_word(text, word_window_size, sentence=False, remove_parenthesse=False),
        f"word-{word_window_size}-sentenceX-parenthesseO" : lambda text : sliding_window_word(text, word_window_size, sentence=False, remove_parenthesse=True),
    }

In [None]:
# 자동화 함수 정의

# 문서 전처리 자동화 함수
def autopreprocessing(data_path = r"./data/test_dataset", result_path = r"./result", char_window_size = 100, word_window_size = 10):
    os.makedirs(f"{result_path}/preprocess", exist_ok=True)
    text_file_list = [f for f in os.listdir(data_path) if f.endswith(".txt")]
    # 전처리 함수와 이름 정의
    preprocess_fuc_list = get_preprocess_fuc_list(char_window_size, word_window_size)

    # 각 실험 폴더 생성
    for filepath in preprocess_fuc_list.keys():
        os.makedirs(f"{result_path}/preprocess/{filepath}", exist_ok=True)
    
    # 전처리 데이터 저장
    for textfile in tqdm(text_file_list, desc="Preprocessing : ", ncols=100):
        with open(f"{data_path}/{textfile}", "r", encoding="utf-8") as f:
            text = f.read()
        for key, fucn in preprocess_fuc_list.items():
            text_preprocess = fucn(text)
            with open(f"{result_path}/preprocess/{key}/{textfile}.pkl", "wb") as f:
                pickle.dump(text_preprocess, f)

# 전처리 문서 임베딩 자동화화 함수
def autoencode(result_path = r"./result", model = SentenceTransformer("jhgan/ko-sbert-sts")):
    path_list = os.listdir(f"{result_path}/preprocess")
    for path in tqdm(path_list, desc="Encoding : ", ncols=100):
        os.makedirs(f"{result_path}/encode/{path}", exist_ok=True)
        pkl_list = [f for f in os.listdir(f"{result_path}/preprocess/{path}") if f.endswith(".pkl")]
        for pkl_file in pkl_list:
            with open(f"{result_path}/preprocess/{path}/{pkl_file}", "rb") as f2:
                encode_list = model.encode(pickle.load(f2), normalize_embeddings=True)
            with open(f"{result_path}/encode/{path}/{pkl_file}", "wb") as f2:
                pickle.dump(encode_list, f2)

# 문서 간 유사도 비교 자동화 함수
def autocheck(result_path = r"./result", threshold_1 = 0.85, threshold_2 = 0.9):
    path_list = os.listdir(f"{result_path}/encode")
    os.makedirs(f"{result_path}/result/{threshold_1}-{threshold_2}", exist_ok=True)
    csv_path = f"{result_path}/result/{threshold_1}-{threshold_2}/result.csv"
    first_write = not os.path.exists(csv_path)
    for path in tqdm(path_list, "Checking : ", ncols=100):
        pkl_list = [f for f in os.listdir(f"{result_path}/encode/{path}") if f.endswith(".pkl")]
        for i in range(len(pkl_list)):
            with open(f"{result_path}/encode/{path}/{pkl_list[i]}", "rb") as f1:
                pivot_data = np.array(pickle.load(f1))
            for j in range(i+1, len(pkl_list)):
                with open(f"{result_path}/encode/{path}/{pkl_list[j]}", "rb") as f2:
                    compare_data = np.array(pickle.load(f2))
                cnt = 0
                for pivot_list in pivot_data:
                    for compare_list in compare_data:
                        sim = util.cos_sim(pivot_list, compare_list)
                        if sim >= threshold_1:
                            cnt += 1
                            break
                label = 1 if cnt >= len(pivot_data) * threshold_2 else 0

                row = pd.DataFrame([{
                    "type": path,
                    "original": pkl_list[i],
                    "compare": pkl_list[j],
                    "similarity": cnt,
                    "label": label,
                }])

                row.to_csv(csv_path, mode="a", header=first_write, index=False)
                first_write = False
                return csv_path

# 전체 자동 실험 함수
def autorun(data_path = r"./data/test_dataset", result_path = r"./result", char_window_size = 100, word_window_size = 10, model = SentenceTransformer("jhgan/ko-sbert-sts"), threshold_1 = 0.85, threshold_2 = 0.90): # Model은 일단 유지
    autopreprocessing(data_path, result_path, char_window_size, word_window_size)
    print("문서 전처리 완료")
    autoencode(result_path, model)
    print("문서 임베딩 완료")
    csv_path = autocheck(result_path, threshold_1, threshold_2)
    print(f"완료되었습니다. 결과가 {csv_path} 에 저장되었습니다.")

In [None]:
# 실행
autorun(data_path=r"../data/test_dataset", result_path=r"./result", char_window_size=100, word_window_size=10, threshold_1=0.85, threshold_2=0.90)