# 챗봇 엔진 서버 메인 프로그램

In [1]:
import threading  
import sqlite3
import pandas as pd
import time
import math
import numpy as np
import random
import difflib
import datetime
from contextlib import contextmanager
import warnings
warnings.filterwarnings('ignore')

import import_ipynb   # 다른 ipynb 파일을 import 하기 위해 필요

from DatabaseConfig import *
from Database import Database
from BotServer import BotServer
from Preprocess import Preprocess
from IntentModel import IntentModel
from NerModel import NerModel
import SQliteDB
from UserIntent import UserIntent

importing Jupyter notebook from DatabaseConfig.ipynb
importing Jupyter notebook from Database.ipynb
importing Jupyter notebook from BotServer.ipynb
importing Jupyter notebook from Preprocess.ipynb
importing Jupyter notebook from IntentModel.ipynb
importing Jupyter notebook from NerModel.ipynb
importing Jupyter notebook from FindAnswer.ipynb
importing Jupyter notebook from SQliteDB.ipynb
importing Jupyter notebook from UserIntent.ipynb


## 함수 생성

### 모델 생성(일상 대화)

In [2]:
# 전처리 객체 생성
p = Preprocess(word2index_dic='../train_tools/dict/chatbot_dict.bin',
               userdic='../utils/user_dic.tsv')

# 의도 파악 모델
intent = IntentModel(model_name='../models/intent/intent_model.h5', proprocess=p)

# 개체명 인식 모델
ner = NerModel(model_name='../models/ner/ner_model.h5', proprocess=p)


### 문자->숫자 변환 함수

In [3]:
def convertToNumber(s):
    try:
        # Try to convert to an integer
        return int(s)
    except ValueError:
        try:
            # If it's not an integer, try to convert to a float
            return float(s)
        except ValueError:
            # If it's not a float either, return None or handle the error as desired
            return None

### conversation DB 연결(저장) 함수

In [4]:
@contextmanager
def database_connection():
    #데이터베이스 연결 생성, 컨택스트 매니저가 진입할 때 실행
    conn = sqlite3.connect('../chatbot_db/conversation.db')
    try:
        #yield: 결과값을 여러 번 나누어서 제공, 제너레이터 형태로 반환(generator: 여러 개의 데이터를 미리 만들어 놓지 않고 필요할 때마다 하나씩 만들어낼 수 있는 객체)
        #연결 객체(conn)를 with 블록으로 전달
        yield conn
    finally:
        #with 블록 안에서 발생하는 예외 등에 관계없이 항상 실행
        conn.close()

# output에 ret_data가 저장될 때의 id값
def check_output_id(ret_data):
    with database_connection() as conn:
        try:
            cur = conn.cursor()
            new_data = cur.execute("SELECT * FROM conversations WHERE output = ? ORDER BY timestamp DESC LIMIT 1", (ret_data,))
            if new_data is not None:
                new_data_list = list(new_data)
                return new_data_list[0][0]
            else:
                return None
        except Exception as e:
            print(f"Error: {e}")
            conn.rollback()

# output의 id값으로 id+1번째의 input 데이터 찾기
def load_input_data(check_input_id):
    with database_connection() as conn:
        try:
            cur = conn.cursor()
            check_input_id = str(int(check_input_id) + 1)
            input_data = cur.execute("SELECT * FROM conversations WHERE id = ?", (check_input_id,))
            input_data = input_data.fetchone()
            if input_data is not None:
                input_data = list(input_data)
                return input_data[2]
            else:
                return None
        except Exception as e:
            print(f"Error: {e}")
            conn.rollback()

def save_output_update(check_input_id, output_data):
    with database_connection() as conn:
        try:
            cur = conn.cursor()
            check_input_id = str(int(check_input_id) + 1)
            cur.execute("UPDATE conversations SET output = ? WHERE id = ?", (output_data, check_input_id))
            conn.commit()
        except Exception as e:
            print(f"Error: {e}")
            conn.rollback()

def save_output_insert(output_data):
    with database_connection() as conn:
        try:
            cur = conn.cursor()
            timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            cur.execute("INSERT INTO conversations (timestamp, input, output) VALUES (?, ?, ?)", (timestamp, None, output_data))
            conn.commit()
        except Exception as e:
            print(f"Error: {e}")
            conn.rollback()

### 사용자 정보 데이터 수정 함수(수정)

(함수로 데이터 입력 처리 로직 추출)

In [5]:
def process_convert_user_data(check_input_id, user_id, name):
    question_df = question_db.get_question_info()
    question_mapping = question_df.set_index('name')[['variable', 'question']].T.to_dict()
    
    user_db.connect()
    answer1 = question_mapping.get(name, "유효한 값을 입력하세요.")['question']
    print(answer1)
    save_output_update(check_input_id, answer1)

    while True:
        time.sleep(1)
        check_input_id = check_output_id(answer1)
        user_data1 = load_input_data(check_input_id)
        if user_data1 is  not None:
            break
        else:
            pass
    print(f"수정한 정보 1-1  {name}: {user_data1}")
    
    while True:
        if not user_data1.strip() or convertToNumber(user_data1) is None:
            answer2 = "유효한 값을 입력하세요."
            print(answer2)
            save_output_update(check_input_id, answer2)

            while True:
                time.sleep(1)
                check_input_id = check_output_id(answer2)
                user_data1 = load_input_data(check_input_id)
                if user_data1 is not None:
                    break
                else:
                    pass
            print(f"수정한 정보 1-2  {name}: {user_data1}")
        else:
            re_variable = question_mapping.get(name, "유효한 값을 입력하세요.")['variable']
            user_db.save_user_data(user_id, re_variable, user_data1)
            globals()[re_variable] = convertToNumber(user_data1)
            answer3 = f"{name} 정보를 수정했습니다."
            print(answer3)
            save_output_update(check_input_id, answer3)
            return
    user_db.close()

### 사용자 정보 데이터 저장 함수(예측 확률)

(함수로 데이터 입력 처리 로직 추출)

In [6]:
def process_save_user_data(check_input_id, i, user_id, variable, question_mapping):
    user_db.connect()
    while True:
        user_data = user_db.get_user_info(user_id)[variable]
        print(f"DB 정보  {variable}: {user_data}")
        # 사용자 변수가 존재하지 않는 경우
        if user_data is None or not user_data.strip():
            answer1 = question_mapping.get(variable, "유효한 값을 입력하세요.")['question']
            print(answer1)
            if i == 1:
                save_output_update(check_input_id, answer1)
            else:
                save_output_insert(answer1)

            while True:
                time.sleep(1)
                check_input_id = check_output_id(answer1)
                user_data1 = load_input_data(check_input_id)
                if user_data1 is  not None:
                    break
                else:
                    pass
            print(f"새로운 정보 1-1  {variable}: {user_data1}")

            while True:
                # 사용자 변수가 공백이거나 변환이 불가능 한 경우
                if not user_data1.strip() or convertToNumber(user_data1) is None:
                    answer2 = "유효한 값을 입력하세요."
                    print(answer2)
                    save_output_update(check_input_id, answer2)

                    while True:
                        time.sleep(1)
                        check_input_id = check_output_id(answer2)
                        user_data1 = load_input_data(check_input_id)
                        if user_data1 is not None:
                            break
                        else:
                            pass
                    print(f"새로운 정보 1-2  {variable}: {user_data1}")
                else:
                    user_db.save_user_data(user_id, variable, user_data1)
                    globals()[variable] = convertToNumber(user_data1)
                    variable_name = question_mapping.get(variable, "유효한 값을 입력하세요.")['name']
                    answer3 = f"{variable_name} 정보를 저장했습니다."
                    print(answer3)
                    save_output_update(check_input_id, answer3)
                    return
        else:
            globals()[variable] = convertToNumber(user_data)
            variable_name = question_mapping.get(variable, "유효한 값을 입력하세요.")['name']
            answer4 = f"{variable_name} 정보 확인 완료"
            print(answer4)
            if i == 1:
                save_output_update(check_input_id, answer4)
            else:
                save_output_insert(answer4)
            return
    user_db.close()

### 공식 추가 함수

In [7]:
def process_insert_formula(check_input_id, key, answer_mapping):
    formula_db.connect()
    answer1 = answer_mapping[key][1]
    print(answer1)
    if key == "disease":
        save_output_update(check_input_id, answer1)
    else:
        save_output_insert(answer1)
        
    while True:
        time.sleep(1)
        check_input_id = check_output_id(answer1)
        formula_data = load_input_data(check_input_id)
        if formula_data is not None:
            break
    print(f"{key}: {formula_data}")

    while True:
        if not formula_data.strip():
            answer2 = "유효한 값을 입력하세요."
            print(answer2)
            save_output_update(check_input_id, answer2)

            while True:
                time.sleep(1)
                check_input_id = check_output_id(answer2)
                formula_data = load_input_data(check_input_id)
                if formula_data is not None:
                    break
                else:
                    pass
            print(f"{answer_mapping[key][0]}: {formula_data}")
        else:
            # Step 5: Process non-empty formula_data
            globals()[key] = formula_data
            if key == "disease":
                formula_db.save_disease(formula_data)
            else:
                formula_db.save_formula_data(disease, key, formula_data)

            answer3 = f"{answer_mapping[key][0]} 정보를 저장했습니다."
            print(answer3)
            save_output_update(check_input_id, answer3)
            time.sleep(1)
            break
    formula_db.close()

### 챗봇 스레드 함수

In [8]:
def to_client(conn, addr, params):
    db = params['db']
    query_db = params['query_db']
    answer_db = params['answer_db']
    user_db = params['user_db']
    formula_db = params['formula_db']
    question_db = params['question_db']
    disease_db = params['disease_db']

    try:
        greeting = "안녕하세요, 챗봇입니다. 어떤 도움이 필요하신가요?"
        print(greeting)
        save_output_insert(greeting)

        while True:
            ret_data = "사용자 ID를 입력하세요."
            print(ret_data)
            save_output_insert(ret_data)
            
            while True:
                time.sleep(1)
                check_input_id = check_output_id(ret_data)
                user_id = load_input_data(check_input_id)
                if user_id is not None:
                    break
                else:
                    pass
            print(user_id)
            user_db.connect()
            user_info = user_db.get_user_info(user_id)

            # 사용자 ID 확인
            if user_info is None:
                # 사용자 ID가 DB에 없는 경우       
                id_confirm = '등록되지 않은 사용자입니다.'
                print(id_confirm)
                save_output_update(check_input_id, id_confirm) 
                continue  # 사용자 ID를 다시 입력받기 위해 반복문의 처음으로 돌아감
            else:
                id_confirm = 'ID 확인 완료'
                print(id_confirm)
                save_output_update(check_input_id, id_confirm) 
                break  # 사용자 ID 확인이 완료되면 사용자 ID 확인 반복문 탈출
            
    except Exception as e:
        print(f"Error handling client {addr}: {str(e)}")
        # 예외 발생 시 클라이언트 연결 종료
        conn.close()

    try:
        db.connect()  # 디비 연결
        query_db.connect()
        answer_db.connect()
        question_db.connect()
        disease_db.connect()
        intents = UserIntent()
  
        while True:
            print('===========================')
            ret_data = "질문을 입력하세요."
            print(ret_data)
            save_output_insert(ret_data)

            while True:
                time.sleep(1)
                check_input_id = check_output_id(ret_data)
                query = load_input_data(check_input_id)
                if query is not None:
                    break
                else:
                    pass
            print(f"query: {query}, {type(query)}\n")

            if query == "끝":
                print('클라이언트 연결 종료 요청')
                break
                
            # 텍스트 파일에서 질병명과 품사를 읽어옴
            with open("NNG_231129_1.txt", 'r', encoding='utf-8') as file:
                lines = file.readlines()

            # 사용자 입력에서 검색어와 의도 추출
            search_term_list = [disease_name for line in lines for parts in [line.strip().split('\t')] if len(parts) == 2 for disease_name, pos_tag in [parts] if pos_tag == "NNG" and disease_name in query]
            search_term = max(search_term_list, key=lambda term: difflib.SequenceMatcher(None, query, term).ratio(), default=None)
            print(f"search_term: {search_term}")
            # 사용자 입력에서 검색어 제외한 문자열로 재생성
            re_query = query.replace(search_term, '').strip() if search_term else query
            # 사용자 입력에서 의도 추출
            print(f"== LOAD START at {datetime.datetime.now()}")
            user_intent = intents.compare_intent_similarity(re_query)[0]
            print(f"== LOAD END at {datetime.datetime.now()}")
            print(f"user_intent: {user_intent}")
            
            #질병 정보 DB에 있는 pattern_dict로 질병 찾기
            if search_term == None and 'disease_name' in locals() and 'disease_info' in locals():
                #이전 이력에 있는 질병 정보로 구하기
            else:
                result_info = disease_db.get_disease_info(search_term)
                disease_name, disease_info = result_info
                print(f"disease_name: {disease_name}")
            
            if user_intent == "공식 추가":
                answer_mapping = {'disease': ["질병명", "질병명은 무엇인가요?"], 
                            'center':["진료과", "어느 과에 속해있나요?"], 
                            'prediction_formula':["예측 공식", "질병 예측 공식은 무엇인가요?"], 
                            'input_variable':["입력 변수", "공식에 들어갈 입력 변수는 무엇인가요?"],
                          }
                keys = list(answer_mapping.keys())
                i = 0
                while i < len(keys):
                    key = keys[i]
                    print(i, key)
                    process_insert_formula(check_input_id, key, answer_mapping)
                    i += 1
                    
            elif user_intent == "예측 확률":

                #예측공식 DB 전체 불러오기
                formula_db.connect()
                formula_data = formula_db.get_formula_info()
                formula_db.close()

                #db 테이블에서 d_name으로 질병명, 변수이름, 예측공식 추출 후 변수 지정
                load_var = []
                load_risk = []
                for i in range(len(formula_data)):
                    result_mapping = formula_data.iloc[i]
                    if disease_name.replace(" ", "") == result_mapping['disease'].replace(" ", ""):
                        var_items = result_mapping['input_variable']
                        if var_items is not None:
                            var_items = var_items.split(",")
                            load_var.append([item.replace(" ", "") for item in var_items])
                            load_risk.append(result_mapping['prediction_formula'])
                #여러 질병의 변수이름 합친 후 중복 변수 제거
                var_list = []  
                for i in range(len(load_var)):
                    var_list.extend(load_var[i])  
                var = sorted(list(set(var_list)))  
                new_var = [item for item in var if item.strip()]

                if len(load_risk) != 0:
                    print(len(new_var), new_var)
                    #질문 리스트 맵핑
                    question_df = question_db.get_question_info()
                    question_mapping = question_df.set_index('variable')[['name', 'question']].T.to_dict()
                    #질문 시작 반복문
                    i = 1
                    while i <= len(new_var):
                        print(i)
                        var1 = new_var[i-1].strip()
                        process_save_user_data(check_input_id, i, user_id, var1, question_mapping)
                        i += 1
                    print("질문 끝\n")

                    #위험확률 정보 보내기  
                    risk_list = []
                    for i in range(len(load_risk)):
                        exec(load_risk[i])
                        risk_list.append(risk)
                    answer_all = ""
                    long_text = disease_info['정의'].values[0] #정의
                    for i in range(len(risk_list)):
                        risk_per = "NaN" if math.isnan(risk_list[i]) else "Infinity" if risk_list[i] == float('inf') else str(int(risk_list[i] * 10000) / 100.0)
                        answer = f"\n{i+1}번 결과, {disease_name}에 걸릴 확률은 {risk_per}%입니다."
                        answer_all += answer
                    answer_all += f"\n{long_text}" #정의
                    print(f"답변: {answer_all}")
                    save_output_insert(answer_all)
                else:
                    print("예측 공식 없음\n")
                    answer = f"{disease_name}에 대한 예측 공식이 없습니다."
                    print(answer)
                    save_output_update(check_input_id, answer)  
                    
            elif user_intent in disease_info.columns:
                print("질병 의도 있음\n")
        
                if not disease_info.empty and disease_info[user_intent].values[0] is not None:
                    # 증상인 경우
                    if user_intent == "증상" and disease_info['증상설명'].values[0] is not None:
                        long_text = disease_info[user_intent].values[0]
                        long_text += "\n\n"+disease_info['증상설명'].values[0]
                    # 증상이 아닌 경우
                    else:
                        long_text = disease_info[user_intent].values[0]

                    print(f"{user_intent}: {long_text}")
                    save_output_update(check_input_id, long_text)
                    
                elif disease_info['증상설명'].values[0] is not None:
                    if user_intent == "증상":
                        long_text = disease_info[user_intent].values[0]
                        long_text += "\n\n"+disease_info['증상설명'].values[0]
                    else:
                        long_text = f"{user_intent}에 대한 정보가 없습니다."
                    
                    print(f"{user_intent}: {long_text}")
                    save_output_update(check_input_id, long_text)
                    
                else:
                    # 증상이 아닌 다른 의도의 경우
                    message = f"{user_intent}에 대한 정보가 없습니다."
                    print(f"{user_intent}: {message}")
                    save_output_update(check_input_id, message)
             
            elif user_intent == "수정":
                print("사용자 정보 수정\n")
                var_name = question_db.var_matching(query)                
                print(f"수정 변수: {var_name}")

                if var_name: 
                    process_convert_user_data(check_input_id, user_id, var_name)
                else:
                    answer = "변수명을 올바르게 입력해주세요."
                    print(answer)
                    save_output_update(check_input_id, answer)
            else:
                if search_term is not None and disease_info['정의'].values[0] is not None:
                    print("질병 의도 없음\n")
                    long_text = disease_info['정의'].values[0]
                    print(f"정의: {long_text}")
                    save_output_update(check_input_id, long_text)
                
                else: 
                    print("일상 대화\n")
                    # 의도 파악
                    intent_predict = intent.predict_class(query)
                    intent_name = intent.labels[intent_predict]
                    # 개체명 파악
                    ner_predicts = ner.predict(query)
                    ner_tags = ner.predict_tags(query)

                    try:
                        f = FindAnswer(db)                    
                        answer_text = f.search(intent_name, ner_tags)
                        answer_chat = f.tag_to_word(ner_predicts, answer_text[3])
                    except:
                        answer_chat = "죄송해요 무슨 말인지 모르겠어요. 조금 더 공부 할게요."
                    print(answer_chat)
                    save_output_update(check_input_id, answer_chat)
    
    except Exception as e:
        print(e)

    finally:
        if db is not None: # db 연결 끊기
            db.close()
        conn.close()

## 챗봇 실행

In [None]:
if __name__ == '__main__':

    # 질문/답변 학습 디비 연결 객체 생성
    db = SQliteDB.chat_DB('../chatbot_db/db.db')
    # 질문/답변 기록 디비 연결
    query_db = SQliteDB.chat_query_DB('../chatbot_db/chatlog.db')
    answer_db = SQliteDB.chat_answer_DB('../chatbot_db/chatlog.db')
    #예측 공식 추가 디비 연결
    formula_db = SQliteDB.formula_DB('../chatbot_db/formula.db')
    # 사용자 정보 디비 연결
    user_db = SQliteDB.user_DB('../chatbot_db/user.db')
    # 질문 문구 정보 디비 연결
    question_db = SQliteDB.question_DB('../chatbot_db/question.db')
    # 질병 정보 디비 연결
    disease_db = SQliteDB.disease_DB('../chatbot_db/Data_231214.db')
    
    print("DB 접속")

    # 봇 서버 동작
    port = 5050    # 챗봇 엔진 서버의 통신 포트
    listen = 100   # 최대 클라이언트 연결 수

    bot = BotServer(port, listen)
    bot.create_sock()
    print("bot start")
    
    while True:
        # 클라이언트 접속 대기
        conn, addr = bot.ready_for_client() # 클라이언트로부터 연결 수락
        print(f"새로운 연결: {addr}\n")

        # to_client 스레드 시작
        params = {
            "db": db,
            "query_db": query_db,
            "answer_db": answer_db,
            "user_db": user_db,
            "formula_db": formula_db,
            "question_db": question_db,
            "disease_db": disease_db,
        }
        client = threading.Thread(target=to_client, args=(
            conn,     # 클라이언트 연결 소켓
            addr,     # 클라이언트 연결 주소 정보
            params,    # 스레드 함수 파라미터
        ))
        client.start()  # 스레드 시작     

DB 접속
bot start
