# 초기화

In [1]:
from langchain.chat_models import ChatOpenAI
from langchain.prompts import PromptTemplate
from langchain.schema.output_parser import StrOutputParser
from langchain.schema.runnable import RunnableBranch

from langchain.memory import ConversationBufferMemory
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda

from operator import itemgetter

In [2]:
# 프롬프트 분류기 

from typing import Literal
from langchain.pydantic_v1 import BaseModel
from langchain.output_parsers.openai_functions import PydanticAttrOutputFunctionsParser
from langchain.utils.openai_functions import convert_pydantic_to_openai_function

In [3]:
# output parser 

from typing import List

from langchain.llms import OpenAI
from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import PromptTemplate
from langchain.pydantic_v1 import BaseModel, Field, validator

In [4]:
from langchain.schema.runnable import RunnableParallel

In [5]:
import os
from dotenv import load_dotenv, find_dotenv

print(load_dotenv(find_dotenv(), override=True))

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

True


# memory 테스트
- 함수 안에서 memory 객체 참조할 수 없는 듯. gpt4에 따르면

In [None]:
def save_conversation(dict):
    memory.save_context({"inputs": dict["customer_message"]}, {"output": dict["ai_response"]})
    return dict["ai_response"]

In [None]:
prompt = PromptTemplate.from_template("""
<이전 대화>와 <현재 사용자 입력을>를 모두 고려해서 적절히 답변해줘.

<이전 대화>:
{history}

<현재 사용자 입력>:
{customer_message}
답변:""")

response_chain = (
    RunnablePassthrough.assign(ai_response=prompt | ChatOpenAI(model="gpt-3.5-turbo-0613") | StrOutputParser()) | 
    RunnableLambda(save_conversation)
    ) 

In [None]:
class ChatHandler(threading.Thread):
    def __init__(self, new_message_list):
        super().__init__()
        self.new_message_list = new_message_list
        self.keep_running = True
        self.memory = ConversationBufferMemory(return_messages=True)
        self.cnt = 0
        
    def run(self):
        while self.keep_running:
            # 새 메시지를 처리합니다
            if self.cnt >= len(self.new_message_list):
                break
            new_message = self.new_messge_list[self.cnt]
            response = self.generate_response(new_message, self.memory)
            self.post_response(new_message, response)
            # 일정 시간마다 새로운 메시지를 확인합니다. (예: 5초마다)
            time.sleep(1)
            self.cnt += 1

    def generate_response(self, new_message, memory):
        # 여기에 LLM을 사용하여 응답을 생성하는 로직을 구현합니다.
        # 예시: return f"Response to {message}"
        memory = self.memory
        chat_history = memory.load_memory_variables({})
        response = response_chain(customer_message=new_message, history=chat_history)
        return response

    def post_response(self, new_message, response):
        # 여기에 생성된 응답을 웹상의 메시지 창에 입력하는 로직을 구현합니다.
        # 예시: post_message_to_web(response)
        print(f"메시지: {new_message}\nAI 응답: {response}")

# Toy Chain 구성

In [22]:
# memory = ConversationBufferMemory(return_messages=True)

In [20]:
def save_conversation(dict):
    # memory = dict['memory']
    memory.save_context({"inputs": dict["customer_message"]}, {"output": dict["ai_response"]})
    return dict["ai_response"]

In [27]:
prompt = PromptTemplate.from_template("""
<이전 대화>와 <현재 사용자 입력을>를 모두 고려해서 적절히 답변해줘.

<이전 대화>:
{history}

<현재 사용자 입력>:
{customer_message}
답변:""")

# RunnablePassthrough.assign(ai_response=general_prompt | ChatOpenAI(model="gpt-3.5-turbo-0613") | StrOutputParser()) | RunnableLambda(save_conversation)
response_chain = (
    RunnablePassthrough.assign(ai_response=prompt | ChatOpenAI(model="gpt-3.5-turbo-0613") | StrOutputParser()) | 
    RunnableLambda(save_conversation)
    ) 

# 동작 점검 
history = memory.load_memory_variables({})
print('history(before)->', history)
print('-'*77)
response = response_chain.invoke({"customer_message": "내 이름이 뭐라고?", "history": history})
print(response)
print('-'*77)
print('history(after)->', history)

history(before)-> {'history': [HumanMessage(content='안녕 내 이름은 나들이라고 해'), AIMessage(content='안녕 나들이! 반가워. 어떤 일로 나와 함께 놀러 갈까?'), HumanMessage(content='나들이가 아니라 "나들"이야'), AIMessage(content='나들이가 아니라 "나들"이야? 그렇군요. 죄송하지만, "나들이"라는 단어를 사용하고 싶으시면 어떤 일로 함께 놀러 갈지 알려주세요. 함께 즐거운 시간을 보내기 위해 기다리고 있을게요!'), HumanMessage(content='나들이랑 아무 상관 없고 그냥 이름이 나들이란 말이야'), AIMessage(content='그렇군요! 이름이 "나들이"라는 말이 맘에 들지 않으시다는 거군요. 죄송하지만, "나들이"라는 단어를 사용하고 싶으시면 어떤 활동으로 함께 놀러 갈지 알려주세요. 함께 즐거운 시간을 보내기 위해 기다리고 있을게요!'), HumanMessage(content="나들이란 말과 아무 상관 없다니깐. 그냥 이름이 '나들'이라고~~"), AIMessage(content="알겠습니다. 이름이 '나들'이라고 하셨군요. 그렇다면 어떤 활동으로 함께 놀러 갈까요? 함께 즐거운 시간을 보내기 위해 기다리고 있을게요!")]}
-----------------------------------------------------------------------------
당신의 이름은 '나들'이라고 하셨죠! 함께 어떤 활동으로 놀러 갈까요? 즐거운 시간을 보내기 위해 기다리고 있을게요!
-----------------------------------------------------------------------------
history(after)-> {'history': [HumanMessage(content='안녕 내 이름은 나들이라고 해'), AIMessage(content='안녕 나들이! 반가워. 어떤 일로 나와 함께 놀러 갈까?'),

### memory를 chain에 전달해야 함

#### 이전 대화 불러오기

In [8]:
memory

NameError: name 'memory' is not defined

In [9]:
# memory 변수가 없어서 오류 발생하는 거?
delete memory

SyntaxError: invalid syntax (960249448.py, line 1)

In [None]:
# 구 버전
# response_chain = (
#     RunnablePassthrough.assign(ai_response=prompt | ChatOpenAI(model="gpt-3.5-turbo-0613") | StrOutputParser()) | 
#     RunnableLambda(save_conversation)
#     ) 

In [6]:
def load_memory(dict):
    memory = dict['memory']
    chat_history = memory.load_memory_variables({})
    return chat_history

In [7]:
def save_conversation(dict):
    memory = dict['memory']
    memory.save_context({"inputs": dict["customer_message"]}, {"output": dict["ai_response"]})
    return dict["ai_response"]

In [15]:
prompt = PromptTemplate.from_template("""
<이전 대화>와 <현재 사용자 입력을>를 모두 고려해서 적절히 답변해줘.

<이전 대화>:
{history}

<현재 사용자 입력>:
{customer_message}
답변:""")

t_chain = (
    RunnablePassthrough.assign(history=RunnableLambda(load_memory)) |
    RunnablePassthrough.assign(ai_response=prompt | ChatOpenAI(model="gpt-3.5-turbo-0613") | StrOutputParser()) |
    RunnableLambda(save_conversation)
)

# 동작 점검
memory = ConversationBufferMemory(return_messages=True)
t_chain.invoke({"customer_message": "안녕 나는 '나들'이라고 해", "memory": memory})

"안녕 '나들'이! 반가워. 무엇을 도와줄까?"

In [9]:
prompt = PromptTemplate.from_template("""
<이전 대화>와 <현재 사용자 입력을>를 모두 고려해서 적절히 답변해줘.

<이전 대화>:
{history}

<현재 사용자 입력>:
{customer_message}
답변:""")

response_chain = (
    RunnablePassthrough.assign(history=RunnableLambda(load_memory)) |
    RunnablePassthrough.assign(ai_response=prompt | ChatOpenAI(model="gpt-3.5-turbo-0613") | StrOutputParser()) |
    RunnableLambda(save_conversation)
)
response_chain

RunnableAssign(mapper={
  history: RunnableLambda(...)
})
| RunnableAssign(mapper={
    ai_response: PromptTemplate(input_variables=['customer_message', 'history'], template='\n<이전 대화>와 <현재 사용자 입력을>를 모두 고려해서 적절히 답변해줘.\n\n<이전 대화>:\n{history}\n\n<현재 사용자 입력>:\n{customer_message}\n답변:')
                 | ChatOpenAI(client=<class 'openai.api_resources.chat_completion.ChatCompletion'>, model_name='gpt-3.5-turbo-0613', openai_api_key='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', openai_api_base='', openai_organization='', openai_proxy='')
                 | StrOutputParser()
  })
| RunnableLambda(...)

In [10]:
del memory

NameError: name 'memory' is not defined

In [11]:
memory = ConversationBufferMemory(return_messages=True)

In [12]:
def generate_response(new_message, memory):
        response = response_chain.invoke({"customer_message": new_message, "memory": memory})
        return response

In [16]:
print('history(before)-> ', memory.load_memory_variables({}))
print('-'*77)
response = generate_response("아니 내 이름이 나들이고 너의 이름이 뭐냐고?", memory)
print(response)
print('-'*77)
print('history(before)-> ', memory.load_memory_variables({}))

history(before)->  {'history': [HumanMessage(content="안녕 나는 '나들'이라고 해"), AIMessage(content="안녕 '나들'! 반가워. 무엇을 도와줄까?"), HumanMessage(content='넌 이름이 뭐야?'), AIMessage(content="나는 '나들'이라고 해. 어떤 도움이 필요한거야?")]}
-----------------------------------------------------------------------------
"안녕 나들이! 내 이름은 '나들'이야. 무엇을 도와줄까?"
-----------------------------------------------------------------------------
history(before)->  {'history': [HumanMessage(content="안녕 나는 '나들'이라고 해"), AIMessage(content="안녕 '나들'! 반가워. 무엇을 도와줄까?"), HumanMessage(content='넌 이름이 뭐야?'), AIMessage(content="나는 '나들'이라고 해. 어떤 도움이 필요한거야?"), HumanMessage(content='아니 내 이름이 나들이고 너의 이름이 뭐냐고?'), AIMessage(content='"안녕 나들이! 내 이름은 \'나들\'이야. 무엇을 도와줄까?"')]}


In [14]:
memory

ConversationBufferMemory(chat_memory=ChatMessageHistory(messages=[HumanMessage(content="안녕 나는 '나들'이라고 해"), AIMessage(content="안녕 '나들'! 반가워. 무엇을 도와줄까?")]), return_messages=True)

# Toy Class 구성
- db는 어떻게 관리?

In [31]:
import threading
import time

class ChatHandler(threading.Thread):
    def __init__(self, new_message_list):
        super().__init__()
        self.new_message_list = new_message_list
        self.keep_running = True
        self.user_name = '나들'
        # self.order_ids = {} # db 이용하니 이렇게 관리 안해도 될 듯. 더 복잡하고 비효율적일 듯
        self.memory = ConversationBufferMemory(return_messages=True)
        self.cnt = 0
        
    def run(self):
        while self.keep_running:
            # 새 메시지를 처리합니다.
            try:
                new_message = self.new_message_list[self.cnt]
            except:
                time.sleep(3)
                print("3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.")
                break
            response = self.generate_response(new_message, self.memory)
            # if respone.get('end_flag'):
            #     self.keep_running = False
            #     break 
            self.post_response(new_message, response)
            # 일정 시간마다 새로운 메시지를 확인합니다. (예: 5초마다)
            time.sleep(1)
            self.cnt += 1

    def generate_response(self, new_message, memory): # 바로 response_chain.invoke() 사용이 나으려나? (gpt에) 물어보기
        # 여기에 LLM을 사용하여 응답을 생성하는 로직을 구현합니다.
        # 예시: return f"Response to {message}"
        response = response_chain.invoke({"customer_message": new_message, "memory":memory})
        return response

    def post_response(self, new_message, response):
        # 여기에 생성된 응답을 웹상의 메시지 창에 입력하는 로직을 구현합니다.
        # 예시: post_message_to_web(response)
        print(f"메시지: {new_message}\nAI 응답: {response}")

In [32]:
chatroom1 = ChatHandler(["상품 정보 좀 알 수 있을까요"])
chatroom1.run()

메시지: 상품 정보 좀 알 수 있을까요
AI 응답: 물론입니다. 어떤 상품에 대해 알고 싶으신가요? 상품명이나 카테고리를 알려주세요.
3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.


In [None]:
def generate_response(new_messsage, memory):
    status = chat_status_chain.invoke({"customer_message": new_message, "memory": memory})
    if status["conv_status"] == "대화 중":
        response = chat_chain.invoke({"customer_message": new_message, 'history': memory})
    else:
        response = chat_end_chain.invoke({"customer_message": new_message, 'history': memory})
    return response

### 나중에 새로운 답변 입력 안한 채 일정 시간 지나면 메시지 작성하고 루프 종료되게 하는 부분 추가

In [None]:
import threading

class ChatHandler(threading.Thread):
    def __init__(self, new_message_element):
        super().__init__()
        self.new_message_element = None
        self.keep_running = True
        self.user_name = '나들'
        self.order_ids = {}
        self.memory = ConversationBufferMemory(return_messages=True)
        
    def run(self):
        while self.keep_running:
            # 새 메시지를 처리합니다.
            start_time = time.time()
            try:
                pass
            # new_message = self.extract_message(self.new_message_element)
            except:
                
            if new_message: # if문 없애도 될 듯?
                response = self.generate_response(new_message)
                if respone.get('end_flag'):
                    chat_history = history= self.memory.load_memory_variables({})["history"]
                    handle_order(chat_history, self.id)
                    self.keep_running = False
                    break
                self.post_response(response)

            # 일정 시간마다 새로운 메시지를 확인합니다. (예: 5초마다)
            time.sleep(5)
    def handle_order(self,chat_history):
        # 여기에 주문을 처리하는 로직을 구현합니다.
        
        return "주문 처리 완료"

    def extract_message(self, element):
        # 여기에 웹 요소로부터 메시지를 추출하는 로직을 구현합니다.
        # 예시: return extract_text_from_element(element)
        return "새 메시지 내용"

    def generate_response(self, message):
        # 여기에 LLM을 사용하여 응답을 생성하는 로직을 구현합니다.
        # 예시: return f"Response to {message}"
        handle_chain(message)
        return "생성된 응답"

    def post_response(self, response):
        # 여기에 생성된 응답을 웹상의 메시지 창에 입력하는 로직을 구현합니다.
        # 예시: post_message_to_web(response)
        print(f"응답 전송: {response}")

    def stop(self):
        self.keep_running = False

# Aync 구성

### 단순 버전


In [46]:
import threading
import time
from threading import Event

class ChatHandler(threading.Thread):
    def __init__(self, user_name, new_message_list):
        super().__init__()
        self.new_message_list = new_message_list
        self.keep_running = True
        self.user_name = user_name
        # self.order_ids = {} # db 이용하니 이렇게 관리 안해도 될 듯. 더 복잡하고 비효율적일 듯
        self.memory = ConversationBufferMemory(return_messages=True)
        self.cnt = 0
        self.error_event = Event()
        
    def run(self):
        while self.keep_running:
            # 새 메시지를 처리합니다.
            try:
                new_message = self.new_message_list[self.cnt]
            except:
                time.sleep(3)
                print("3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.")
                break
            response = self.generate_response(new_message, self.memory)
            # if respone.get('end_flag'):
            #     self.keep_running = False
            #     break 
            self.post_response(new_message, response)
            # 일정 시간마다 새로운 메시지를 확인합니다. (예: 5초마다)
            time.sleep(1)
            self.cnt += 1

    def generate_response(self, new_message, memory): # 바로 response_chain.invoke() 사용이 나으려나? (gpt에) 물어보기
        # 여기에 LLM을 사용하여 응답을 생성하는 로직을 구현합니다.
        # 예시: return f"Response to {message}"
        response = response_chain.invoke({"customer_message": new_message, "memory":memory})
        return response

    def post_response(self, new_message, response):
        # 여기에 생성된 응답을 웹상의 메시지 창에 입력하는 로직을 구현합니다.
        # 예시: post_message_to_web(response)
        print(f"메시지: {new_message}\nAI 응답: {response}")

In [40]:
# 여러 대화방에 대한 스레드 생성 및 시작.
user_names = ['user1', 'user2', 'user3']  # 실제 대화방 ID를 리스트 형태로 관리.
messages = [['상품 정보 좀 알 수 있을까요'], ['떡케익 2개, 개별 모듬팩 3개 할게요'], ['지금 주문하면 언제 받아볼 수 있나요?']]
threads = []

for user_name, message in zip(user_names, messages):
    chat_bot = ChatHandler(user_name, message)
    chat_bot.start()
    threads.append(chat_bot)
    print('-'*77)

# 모든 스레드가 완료될 때까지 기다림.
for thread in threads:
    thread.join(timeout=30)

-----------------------------------------------------------------------------
-----------------------------------------------------------------------------
-----------------------------------------------------------------------------
메시지: 떡케익 2개, 개별 모듬팩 3개 할게요
AI 응답: 떡케익 2개와 개별 모듬팩 3개를 주문하시는 건가요? 어떤 종류의 떡케익과 개별 모듬팩을 원하시나요? 제가 메뉴를 안내해 드릴게요.
3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.
메시지: 지금 주문하면 언제 받아볼 수 있나요?
AI 응답: 주문하신 상품에 따라서 배송일정이 달라질 수 있습니다. 일반적으로는 주문 후 1~3일 내에 상품을 받아보실 수 있으나, 상황에 따라 배송일정이 변경될 수도 있습니다. 주문하시는 상품의 배송일정은 상품 페이지에서 확인하실 수 있으며, 주문 완료 후에는 배송 추적 번호를 통해 실시간으로 배송 상황을 확인하실 수 있습니다.
3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.


위에서 실행되지 않았던 스레드가 실행됐음

In [41]:
from threading import Thread, Event

def monitor_chat_bots(chat_bots):
    while True:
        for bot in chat_bots:
            if bot.error_event.is_set():
                log_error(f"Restarting bot in room {bot.room_id} due to error.")
                bot.error_event.clear()  # 오류 상태 초기화
                bot.start()  # 스레드를 다시 시작
        time.sleep(10)  # 10초 마다 감시
        
# 감시 스레드 생성 및 시작
monitor_thread = Thread(target=monitor_chat_bots, args=(threads,))
monitor_thread.start()

Exception in thread Thread-19:
Traceback (most recent call last):
  File "C:\Users\ktsfr\Anaconda3\lib\threading.py", line 932, in _bootstrap_inner
    self.run()
  File "C:\Users\ktsfr\Anaconda3\lib\threading.py", line 870, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\ktsfr\AppData\Local\Temp\ipykernel_20336\4081997892.py", line 6, in monitor_chat_bots
AttributeError: 'ChatHandler' object has no attribute 'error_event'


메시지: 상품 정보 좀 알 수 있을까요
AI 응답: 저희 가게에서는 다양한 상품을 판매하고 있습니다. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요. 그럴게요. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요. 그럴게요. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요. 그럴게요. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요. 그럴게요. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요. 그럴게요. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요. 그럴게요. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요. 그럴게요. 어떤 상품에 관심이 있으신가요? 카테고리나 특정 상품에 대한 정보를 알려주세요.
3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.


In [47]:
# 여러 대화방에 대한 스레드 생성 및 시작.
user_names = ['user1', 'user2', 'user3']  # 실제 대화방 ID를 리스트 형태로 관리.
messages = [['상품 정보 좀 알 수 있을까요'], ['떡케익 2개, 개별 모듬팩 3개 할게요'], ['지금 주문하면 언제 받아볼 수 있나요?']]
threads = []

for user_name, message in zip(user_names, messages):
    chat_bot = ChatHandler(user_name, message)
    chat_bot.start()
    threads.append(chat_bot)
    print('thread list에 추가', '-'*77)

# 모든 스레드가 완료될 때까지 기다림.
for thread in threads:
    thread.join(timeout=30)
    print('thread.join에 실행', '-'*77)
    
def monitor_chat_bots(chat_bots):
    while True:
        for bot in chat_bots:
            if bot.error_event.is_set():
                log_error(f"Restarting bot in room {bot.room_id} due to error.")
                bot.error_event.clear()  # 오류 상태 초기화
                bot.start()  # 스레드를 다시 시작
        time.sleep(10)  # 10초 마다 감시
        
# 감시 스레드 생성 및 시작
monitor_thread = Thread(target=monitor_chat_bots, args=(threads,))
monitor_thread.start()

thread list에 추가 -----------------------------------------------------------------------------
thread list에 추가 -----------------------------------------------------------------------------
thread list에 추가 -----------------------------------------------------------------------------


메시지: 상품 정보 좀 알 수 있을까요
AI 응답: 어떤 상품에 대한 정보를 원하시나요? 상품의 이름이나 종류를 알려주시면 더 자세한 도움을 드릴 수 있습니다.
메시지: 떡케익 2개, 개별 모듬팩 3개 할게요
AI 응답: 주문 감사합니다! 떡케익 2개와 개별 모듬팩 3개를 준비해드리겠습니다. 손님의 맛있는 식사를 기대해주세요!
3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.
thread.join에 실행 -----------------------------------------------------------------------------
3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.
thread.join에 실행 -----------------------------------------------------------------------------
메시지: 지금 주문하면 언제 받아볼 수 있나요?
AI 응답: 주문하신 제품은 배송지 및 현재 재고 상황에 따라 다를 수 있습니다. 보통 주문 후 1~3일 이내에 받아보실 수 있으나, 지연될 수도 있습니다. 정확한 배송일정은 주문 시스템에서 확인하실 수 있습니다.
3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.
thread.join에 실행 -----------------------------------------------------------------------------


In [50]:
print(threads[0].error_event)
print(threads[0].error_event.is_set())
print(threads[0].error_event.clear())

<threading.Event object at 0x0000017DE6C30F40>
False
None


### GPT-4 답변

In [None]:
import threading
import time

class ChatHandler(threading.Thread):
    def __init__(self, new_message_list):
        super().__init__()
        self.new_message_list = new_message_list
        self.keep_running = True
        self.user_name = '나들'
        # self.order_ids = {} # db 이용하니 이렇게 관리 안해도 될 듯. 더 복잡하고 비효율적일 듯
        self.memory = ConversationBufferMemory(return_messages=True)
        self.cnt = 0
        
    def run(self):
        while self.keep_running:
            # 새 메시지를 처리합니다.
            try:
                new_message = self.new_message_list[self.cnt]
            except:
                time.sleep(3)
                print("3초 동안 입력 메시지가 주어지지 않아 대화를 종료합니다.")
                break
            except NetworkError as ne::
                # 네트워크 오류 처리
                log_error(f"Network error for room {self.room_id}: {ne}")
                continue  # 다음 반복으로 넘어가서 재시도.
            
            try:
                response = self.generate_response(new_message, self.memory)
                # if respone.get('end_flag'):
                #     self.keep_running = False
                #     break
            except ResponseError as re:
                # 응답 생성 오류 처리
                log_error(f"Response generation error in room {self.room_id}: {re}")
                # 해당 메시지에 대해 오류 응답을 보내거나 건너뛰기 등의 처리를 할 수 있습니다.
                self.handle_invalid_input(message)

            self.post_response(new_message, response)
            # 일정 시간마다 새로운 메시지를 확인합니다. (예: 5초마다)
            time.sleep(1)
            self.cnt += 1

    def generate_response(self, new_message, memory): # 바로 response_chain.invoke() 사용이 나으려나? (gpt에) 물어보기
        # 여기에 LLM을 사용하여 응답을 생성하는 로직을 구현합니다.
        # 예시: return f"Response to {message}"
        response = response_chain.invoke({"customer_message": new_message, "memory":memory})
        return response

    def post_response(self, new_message, response):
        # 여기에 생성된 응답을 웹상의 메시지 창에 입력하는 로직을 구현합니다.
        # 예시: post_message_to_web(response)
        print(f"메시지: {new_message}\nAI 응답: {response}")

In [None]:
# 여러 대화방에 대한 스레드 생성 및 시작.
room_ids = ['room1', 'room2', 'room3']  # 실제 대화방 ID를 리스트 형태로 관리.
threads = []

for room_id in room_ids:
    chat_bot = ChatBotHandler(room_id)
    chat_bot.start()
    threads.append(chat_bot)

# 모든 스레드가 완료될 때까지 기다림.
for thread in threads:
    thread.join()

밑에 코드가 위에보다 예외에 안전 timeout=30 추가 

In [None]:
# main.py
import threading
from chat_bot_handler import ChatBotHandler  # ChatBotHandler 클래스가 정의된 모듈을 임포트.

def main():
    # 가정: 여기에 실제 대화방 ID들을 리스트로 가져오는 로직이 포함되어 있음.
    room_ids = get_chat_room_ids()  # 실제 대화방 ID들을 가져오는 함수 가정.
    threads = []

    # 각 대화방 ID별로 ChatBotHandler 스레드 생성 및 시작.
    for room_id in room_ids:
        chat_bot = ChatBotHandler(room_id)
        chat_bot.start()
        threads.append(chat_bot)

    # 메인 스레드에서는 생성된 모든 스레드들이 완료될 때까지 기다린다.
    for thread in threads:
        thread.join(timeout=30)  # 30초 후에 반환하도록 타임아웃 설정.

if __name__ == "__main__":
    main()


감시 스레드 구현

In [None]:
from threading import Thread, Event

class ChatBotHandler(Thread):
    # ...

    def __init__(self, room_id):
        # ...
        self.error_event = Event()  # 오류 상태를 나타내는 이벤트

    def run(self):
        while self.keep_running:
            try:
                # ...
            except Exception as e:
                self.error_event.set()  # 오류가 발생하면 이벤트를 설정하여 오류 상태를 표시
                break

def monitor_chat_bots(chat_bots):
    while True:
        for bot in chat_bots:
            if bot.error_event.is_set():
                log_error(f"Restarting bot in room {bot.room_id} due to error.")
                bot.error_event.clear()  # 오류 상태 초기화
                bot.start()  # 스레드를 다시 시작
        time.sleep(10)  # 10초 마다 감시

# main.py에서 ChatBotHandler 인스턴스 생성 후 감시 스레드 시작
chat_bots = [ChatBotHandler(room_id) for room_id in room_ids]
for bot in chat_bots:
    bot.start()

# 감시 스레드 생성 및 시작
monitor_thread = Thread(target=monitor_chat_bots, args=(chat_bots,))
monitor_thread.start()
