In [None]:
import pandas as pd
from sentence_transformers import SentenceTransformer
import numpy as np
from tqdm.auto import tqdm

train_df = pd.read_csv('../data/train.csv', encoding = 'utf-8-sig')

### 정답 문장 임베딩 벡터 변환

In [None]:
# 정답 문장들
train_answers = train_df['answer'].tolist()

embedding_model_name = "upskyy/bge-m3-korean"
embedding_model = SentenceTransformer(embedding_model_name)

# 임베딩 생성
train_answer_embeddings = embedding_model.encode(train_answers)

### 코사인 유사도 행렬 생성

In [None]:
def cosine_similarity_matrix(embeddings):
    """
    임베딩 배열(2차원, shape=(n, d))에 대해
    벡터화된 코사인 유사도 행렬을 계산합니다.
    """
    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    # 0으로 나누는 것을 방지하기 위해, 0인 경우 1로 치환
    normed = embeddings / np.where(norms == 0, 1, norms)
    similarity = np.dot(normed, normed.T)
    # norm이 0인 경우 해당 행/열의 유사도를 0으로 설정
    zero_norm_mask = (norms == 0).reshape(-1)
    similarity[zero_norm_mask] = 0
    similarity[:, zero_norm_mask] = 0
    return similarity

def jaccard_similarity_matrix(answers):
    """
    문장 리스트에 대해 자카드 유사도 행렬을 계산합니다.
    각 문장을 미리 단어 집합으로 변환한 후, 상삼각행렬만 계산합니다.
    """
    n = len(answers)
    token_sets = [set(answer.split()) for answer in answers]
    jaccard_mat = np.zeros((n, n))
    
    for i in tqdm(range(n), desc="자카드 계산 진행"):
        for j in range(i + 1, n):
            inter = len(token_sets[i] & token_sets[j])
            union = len(token_sets[i] | token_sets[j])
            sim = inter / union if union != 0 else 0
            jaccard_mat[i, j] = sim
            jaccard_mat[j, i] = sim
    return jaccard_mat

def select_sentence(answers, answer_embeddings):
    """
    코사인 유사도와 자카드 유사도를 결합하여 각 문장의 평균 유사도를 계산한 후,
    가장 높은 평균 유사도를 가지는 문장을 고정 문장으로 선택합니다.
    """
    n = len(answers)
    
    # 전체 코사인 유사도 행렬 계산 (벡터화)
    cos_sim = cosine_similarity_matrix(answer_embeddings)
    
    # 자카드 유사도 행렬 계산 (미리 전처리된 단어 집합 사용)
    jaccard_sim = jaccard_similarity_matrix(answers)
    
    # 가중합: 코사인 유사도 0.7, 자카드 유사도 0.3
    sim_matrix = 0.7 * cos_sim + 0.3 * jaccard_sim
    
    # 각 문장에 대한 평균 유사도 (자기 자신 제외)
    avg_scores = sim_matrix.sum(axis=1) / (n - 1)
    
    # 평균 유사도가 가장 높은 문장 선택
    selected_sentence_index = np.argmax(avg_scores)
    score = avg_scores[selected_sentence_index]
    
    return cos_sim, jaccard_sim, selected_sentence_index, score

cos_sim = cosine_similarity_matrix(train_answer_embeddings)

In [None]:
cos_sim, jaccard_sim, selected_sentence_index, score = select_sentence(train_answers, train_answer_embeddings)
print(f"선정된 문장: {train_answers[selected_sentence_index]}, Score: {score:.4f}")

In [None]:
# 유사도 행렬 저장
np.save('../data/cos_sim_matrix.npy', cos_sim)
np.save('../data/jaccard_sim_matrix.npy', jaccard_sim)

### 대표 문장 탐색

In [2]:
# 'answer' 컬럼 기준으로 중복 제거
train_df = train_df.drop_duplicates(subset=['answer'])

# 예시: cosine_sim은 (n x n) 코사인 유사도 행렬입니다.
cosine_sim = np.load("../data/cos_sim_matrix.npy")

# train_df의 인덱스를 기준으로 cosine_sim 행렬 재구성
train_indices = train_df.index.values
cosine_sim = cosine_sim[np.ix_(train_indices, train_indices)]

# 초기 남은 인덱스 집합 (전체 인덱스)
remaining_indices = np.arange(cosine_sim.shape[0])
iteration = 0

while len(remaining_indices) > 1:
    print(f"Iteration {iteration}: 남은 인덱스 개수 -> {len(remaining_indices)}")
    
    # 남은 인덱스에 해당하는 부분 행렬(submatrix) 선택
    submatrix = cosine_sim[np.ix_(remaining_indices, remaining_indices)]
    
    # 각 행의 합 계산 (남은 인덱스에 한정)
    row_sums = submatrix.sum(axis=1)
    
    # 가장 합이 큰 인덱스 선택 (submatrix 내 상대 인덱스)
    max_idx_in_remaining = np.argmax(row_sums)
    
    # 원래 전체 행렬 상의 실제 인덱스
    chosen_index = remaining_indices[max_idx_in_remaining]
    # 중복 제거 전 원래 인덱스로 변환
    original_chosen_index = train_indices[chosen_index]
    print(f"선택된 인덱스: {original_chosen_index}")
    
    # 선택한 인덱스와 남은 인덱스들 간의 코사인 유사도 값 추출
    sim_values = cosine_sim[chosen_index, remaining_indices]
    
    # 상위 50% 기준 임계값 (중앙값) 계산
    threshold = np.percentile(sim_values, 50)
    print(f"임계값 (50퍼센타일): {threshold}")
    
    # 임계값 이상인 인덱스(자기 자신 포함)만 남기기
    remaining_indices = remaining_indices[sim_values >= threshold]
    
    iteration += 1
    print("-" * 40)

# 최종 남은 인덱스를 중복 제거 전 원래 인덱스로 변환
original_remaining_indices = train_indices[remaining_indices]
print("최종 남은 인덱스:", original_remaining_indices)

Iteration 0: 남은 인덱스 개수 -> 21399
선택된 인덱스: 13050
임계값 (50퍼센타일): 0.6929519772529602
----------------------------------------
Iteration 1: 남은 인덱스 개수 -> 10700
선택된 인덱스: 13050
임계값 (50퍼센타일): 0.7681925296783447
----------------------------------------
Iteration 2: 남은 인덱스 개수 -> 5350
선택된 인덱스: 13050
임계값 (50퍼센타일): 0.8115762174129486
----------------------------------------
Iteration 3: 남은 인덱스 개수 -> 2675
선택된 인덱스: 13050
임계값 (50퍼센타일): 0.8406871557235718
----------------------------------------
Iteration 4: 남은 인덱스 개수 -> 1338
선택된 인덱스: 14390
임계값 (50퍼센타일): 0.8665653169155121
----------------------------------------
Iteration 5: 남은 인덱스 개수 -> 669
선택된 인덱스: 14390
임계값 (50퍼센타일): 0.8919670581817627
----------------------------------------
Iteration 6: 남은 인덱스 개수 -> 335
선택된 인덱스: 14390
임계값 (50퍼센타일): 0.9097266793251038
----------------------------------------
Iteration 7: 남은 인덱스 개수 -> 168
선택된 인덱스: 14390
임계값 (50퍼센타일): 0.9256658256053925
----------------------------------------
Iteration 8: 남은 인덱스 개수 -> 84
선택된 인덱스: 143

In [12]:
print(train_df.loc[13050, 'answer'])
print(train_df.loc[14390, 'answer'])
print(train_df.loc[11380, 'answer'])

작업전 안전교육 강화 및 작업장 위험요소 점검을 통한 재발 방지와 안전관리 교육 철저를 통한 향후 조치 계획.
안전교육 실시와 작업 시 안전관리 철저를 통한 재발 방지 대책 및 향후 조치 계획.
현장 관리 철저와 작업자 안전교육 실시를 통한 재발 방지 대책 및 향후 조치 계획.
