In [1]:
import torch
import torch.nn as nn
import random
import os
import csv
import logging
import openai
import re
import warning

from torch.utils.data import dataloader

# KoElectra 
from model.dataloader_electra import WellnessTextClassificationDataset
from model.classifier_electra import KoELECTRAforSequenceClassfication
from transformers import ElectraModel, ElectraConfig, ElectraTokenizer


logging.getLogger("transformers").setLevel(logging.ERROR)
warnings.filterwarnings("ignore", message=".*resume_download.*", category=FutureWarning)

os.environ["CUDA_DEVICE_ORDER"]="PCI_BUS_ID"  
os.environ["CUDA_VISIBLE_DEVICES"]="0"

In [2]:
# data 경로 지정
root_path = "."

data_path = f"{root_path}/data/new_input_v2.txt"
answer_path = f"{root_path}/data/new_answer.txt"
category_path = f"{root_path}/data/new_category_v2.txt"
checkpoint_path = f"{root_path}/checkpoint/new_electra_v5.pth"

In [3]:
def load_wellness_answer():

    c_f = open(category_path, 'r')
    a_f = open(answer_path, 'r')

    category_lines = c_f.readlines()
    answer_lines = a_f.readlines()

    category = {}
    answer = {}
    for line_num, line_data in enumerate(category_lines):
        data = line_data.split('    ')
        if len(data) != 2:
            print(f"Error in category file at line {line_num}: {line_data}")
        category[data[1][:-1]] = data[0]

    for line_num, line_data in enumerate(answer_lines):
        data = line_data.split('    ')
        keys = answer.keys()
        if len(data) != 2:
            print(f"Error in answer file at line {line_num}: {line_data}")
        if (data[0] in keys):
            answer[data[0]] += [data[1][:-1]]
        else:
            answer[data[0]] = [data[1][:-1]]
    return category, answer


def load_model(checkpoint_path):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model_config = ElectraConfig.from_pretrained("monologg/koelectra-base-v3-discriminator")

    model = KoELECTRAforSequenceClassfication(model_config, num_labels=432, hidden_dropout_prob=0.1)    
    checkpoint = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.to(device)
    model.eval()
    
    tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")
    return model, tokenizer, device

def preprocess_input(tokenizer, sent, device, max_seq_len=512):
    index_of_words = tokenizer.encode(sent)
    token_type_ids = [0] * len(index_of_words)
    attention_mask = [1] * len(index_of_words)
    padding_length = max_seq_len - len(index_of_words)
    index_of_words += [0] * padding_length
    token_type_ids += [0] * padding_length
    attention_mask += [0] * padding_length

    data = {
        'input_ids': torch.tensor([index_of_words]).to(device),
        'token_type_ids': torch.tensor([token_type_ids]).to(device),
        'attention_mask': torch.tensor([attention_mask]).to(device),
        }
    return data

In [4]:
# !pip install sentence-transformers

In [5]:
def get_answer(category, answer, output, input_sentence): # 수정
    softmax_logit = torch.softmax(output[0], dim=-1).squeeze()
    max_index = torch.argmax(softmax_logit).item()
    max_index_value = softmax_logit[torch.argmax(softmax_logit)].item()

    ####### threshold 값 조절 #######
    threshold = 0.5  
    ####### threshold 값 조절 #######

    selected_categories = []
    for i, value in enumerate(softmax_logit):
        if value > threshold:
            if str(i) in category:
                selected_categories.append(category[str(i)])
                print(f"Softmax 값 : ({max_index_value}) : {category[str(i)]}")

    # 선택된 카테고리가 없을 경우 예외처리
    if not selected_categories:
        return "선택된 카테고리가 없습니다. 다시 입력해주세요", None, max_index_value, []
    
    # 선택된 카테고리들에 해당하는 답변을 리스트에 저장
    all_answers = []
    for category_name in selected_categories:
        if category_name in answer:
            all_answers.extend(answer[category_name])

    # 만약 답변이 없다면 예외처리
    if not all_answers:
        return "선택된 카테고리에 대한 답변이 없습니다.", None, max_index_value, []

    # 답변들 중에서 랜덤으로 선택된 답변 출력
    selected_answer = random.choice(all_answers)
    # print("선택된 카테고리에 해당하는 모든 답변:")
    # print(all_answers)


    return selected_answer, selected_categories, max_index_value, all_answers

In [6]:
def find_most_similar_sentence(input_sentence, candidate_sentences, output):

    model = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
    tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    def get_sentence_embedding(sentence):
        inputs = tokenizer.encode_plus(sentence, return_tensors='pt', padding=True, truncation=True, max_length=512)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
        sentence_embedding = outputs.last_hidden_state.mean(dim=1)
        return sentence_embedding

    # 입력 문장 임베딩 구하기
    input_embedding = get_sentence_embedding(input_sentence)

    max_similarity = -1
    selected_sentence = None  # 기본값으로 None 할당

    similarities = []
    top_similarities = []  # 함수 범위에서 초기화

    for candidate in candidate_sentences:
        candidate_embedding = get_sentence_embedding(candidate)
        similarity = torch.cosine_similarity(input_embedding, candidate_embedding, dim=1)
        similarities.append((candidate, similarity.item()))

        # 유사도 상위 5개 문장 선택
        top_similarities = sorted(similarities, key=lambda x: x[1], reverse=True)[:3]

    # top_similarities 리스트가 비어있는 경우 예외처리
    if not top_similarities:
        print("유사한 문장이 없습니다.")
        return None

    # 상위 5개 중에서 랜덤으로 하나 선택
    selected_sentence, _ = random.choice(top_similarities)

    print("유사도 상위 3개 문장과 유사도 수치:")
    for sentence, similarity in top_similarities:
        print(f"유사도: {similarity:.4f} , 문장: {sentence}" )

    return selected_sentence

In [7]:
"""
기룡이의 말투를 정해주세요

1. "저는 심리상담을 전문으로 하는 AI 기룡이 입니다."
    - 매우 공식적이고 예의바른 말투 / 전문성과 신뢰감을 주는 인상
2. "나는 심리 상담 챗봇 AI 기룡이야!"
    - 친근하고 편안한 말투로 반말 체를 사용 / 좀 더 친구같고 가벼운 느낌
3. "저는 심리 상담을 해주는 AI 기룡이에요."
    - 예의바르면서도 부드럽고 온화한 말투 /  상대방을 존중하지만 거리감은 크지 않음
"""

def gpt(input_sentence, selected_categories, chatbot_type):
    openai.api_key = "OPENAI_KEY"
    
    # MODEL = "gpt-3.5-turbo"
    MODEL = "gpt-4-turbo"

    # 감정 분류 모델에서 예측한 카테고리
    predicted_category = selected_categories

    # 입력받은 사용자 문장
    user_input = input_sentence

    # 프롬프트 설정
    prompts = {
        "formal": f"예측한 카테고리는 '{predicted_category}'입니다. 사용자 문장과 예측한 카테고리를 기반으로 매우 공식적인 말투(~다 로 끝나는)로 심리 상담 챗봇에 쓸 답변을 생성해주세요. 답변은 100자 이하로 만들어주세요.",
        "casual": f"예측한 카테고리는 '{predicted_category}'입니다. 사용자 문장과 예측한 카테고리를 기반으로 도움과 격려가 되는 친근하고 편안한 말투로 반말 체를 사용하여 심리 상담 챗봇에 쓸 답변을 생성해주세요. 답변은 100자 이하로 만들어주세요.",
        "polite" : f"The predicted category is '{predicted_category}. Based on your sentences and predicted categories, please create answers for your psychological counseling chatbot with a friendliness, polite tone that is helpful and encouraging. Please make your answers up to 200 characters",
        "default": f"예측한 카테고리는 '{predicted_category}'입니다. 사용자 문장과 예측한 카테고리를 기반으로 도움과 격려가 되는
 부드러운 문장으로 심리 상담 챗봇에 쓸 답변을 생성해주세요. 답변은 100자 이하로 만들어주세요."
    }

    # 임시로 랜덤 타입
   # chatbot_type = random.randint(1, 3)
    chatbot_type = 3
    match chatbot_type:
        case 1:
            prompt = prompts.get(chatbot_type, prompts["formal"])    # 공식적이고 예의바른 말투
            print('<사무적인 말투>')
        case 2:
            prompt = prompts.get(chatbot_type, prompts["casual"])    # 친근하고 편안한 말투로 반말 체
            print('<친근한 반말투>')
        case 3:
            prompt = prompts.get(chatbot_type, prompts["polite"])    # 예의바르면서도 부드럽고 온화한 말투
            print('<부드러운 말투>')
        case _:
            prompt = prompts.get(chatbot_type, prompts["default"])   # 기본


    messages = [
        {"role": "system", "content": prompt},
        {"role": "user", "content": user_input}
    ]

    response = openai.ChatCompletion.create(
        model=MODEL,
        messages=messages,
        temperature=0.9,
        max_tokens=200,
        n=1,
    )

    # 응답 처리
    original_response = response.choices[0].message.content
    if not original_response.endswith('.'):
        last_sentence = re.split(r'[.!?]', original_response)[-1].strip()
        if last_sentence:
            final_response = original_response + '.'
        else:
            final_response = original_response
    else:
        final_response = original_response

    return final_response

In [8]:
def chat(checkpoint_path, category, answer):
    model, tokenizer, device = load_model(checkpoint_path)

    # cbt - ChatBot Type
    cbt = input('기룡이의 말투를 선택해주세요 (1-딱딱함,2-반말,3-부드러움)')
    chatbot_type = int(cbt)
    
    sent1 = input('\nQuestion: ')
    sent = str(sent1)
            
    data = preprocess_input(tokenizer, sent, device, 512)
    output = model(**data)
    answer, category, max_index_value, all_answers = get_answer(category, answer, output, sent)

    # GPT API 답변 생성
    chatbot_answer = gpt(sent,category, chatbot_type)
    print(chatbot_answer)

    # most_similar_sentence = find_most_similar_sentence(sent, all_answers, output)
    # print(f'\nsoftmax_value: {max_index_value}\n문장 유사도 \n가장 유사한 문장 : {most_similar_sentence} ')

    print('-' * 70)

# 카테고리 / 답변 로드        
category, answer = load_wellness_answer()

In [None]:
while 1:
    chat(checkpoint_path, category, answer)

In [None]:
# from konlpy.tag import Kkma

# # Kkma 형태소 분석기 초기화
# kkma = Kkma()
# exclude_advs = ['못', '안']

# # 예시 문장
# sentence = "나 너무 공부를 못 하는거 같아"

# # 형태소 분석
# morphs = kkma.pos(sentence)

# new_sentence = sentence
# for i, (morph, pos) in enumerate(morphs):
#     if pos == 'MAG' and morph not in exclude_advs:  # 'MAG'는 부사 태그, 제외 부사 목록 체크
#         new_sentence = new_sentence.replace(morph + ' ', '')  # 부사 뒤의 공백도 제거

# print("제거 문장:", new_sentence.strip())  # 부사 제거 후 문장 출력
# print()  # 줄바꿈

In [None]:
# from konlpy.tag import Kkma

# # Kkma 형태소 분석기 초기화
# kkma = Kkma()

# # 제외할 부사 목록
# exclude_advs = ['못', '안']

# # 텍스트 파일 열기
# with open('testcase.txt', 'r', encoding='utf-8') as file:
#     # 파일에서 한 줄씩 읽기
#     for line in file:
#         print("기존 문장:", line.strip())  # 기존 문장 출력

#         # 형태소 분석
#         morphs = kkma.pos(line)

#         new_line = line
#         for i, (morph, pos) in enumerate(morphs):
#             if pos == 'MAG' and morph not in exclude_advs:  # 'MAG'는 부사 태그, 제외 부사 목록 체크
#                 new_line = new_line.replace(morph + ' ', '')  # 부사 뒤의 공백도 제거

#         print("제거 문장:", new_line.strip())  # 부사 제거 후 문장 출력
#         print()  # 줄바꿈

In [None]:
# from konlpy.tag import Kkma

# # Kkma 형태소 분석기 초기화
# kkma = Kkma()

# # 제외할 부사 목록
# exclude_advs = ['못', '안']

# # 텍스트 파일 열기
# with open('testcase.txt', 'r', encoding='utf-8') as file:
#     lines = file.readlines()  # 모든 줄을 읽어서 리스트로 저장

# # 부사 제거된 문장들을 저장할 리스트
# filtered_lines = []

# for line in lines:
#     # 형태소 분석
#     morphs = kkma.pos(line)

#     new_line = line
#     for i, (morph, pos) in enumerate(morphs):
#         if pos == 'MAG' and morph not in exclude_advs:  # 'MAG'는 부사 태그, 제외 부사 목록 체크
#             new_line = new_line.replace(morph + ' ', '')  # 부사 뒤의 공백도 제거

#     filtered_lines.append(new_line.strip())  # 부사 제거 후 문장을 리스트에 추가

# # 새로운 파일에 결과 저장
# with open('testcase_convert.txt', 'w', encoding='utf-8') as output_file:
#     for line in filtered_lines:
#         output_file.write(line + '\n')