<a href="https://colab.research.google.com/github/ralralra/jetson_DLI/blob/main/dht_chatbot_functioncalling.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install openai
!pip install pyserial

In [None]:
import serial
import time
import threading
import gradio as gr
from typing import Dict, List, Optional

class DHT11Assistant:
    def __init__(self):
        # 시스템 메시지와 사용 가능한 함수들 정의
        self.messages = [
            {
                "role": "system",
                "content": "저는 온습도 센서의 데이터를 읽고 응답하는 온습도bot입니다."
            }
        ]

        self.functions = {
            "get_temperature": {
                "name": "get_temperature",
                "description": "현재 온도값을 가져옵니다",
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            },
            "get_humidity": {
                "name": "get_humidity",
                "description": "현재 습도값을 가져옵니다",
                "parameters": {
                    "type": "object",
                    "properties": {},
                    "required": []
                }
            }
        }

        # 센서 관련 변수
        self.serial_port = None
        self.current_temp = None
        self.current_humidity = None
        self.running = True

    def get_temperature(self) -> Dict:
        """온도 데이터를 반환하는 함수"""
        if self.current_temp is None:
            return {"status": "error", "message": "온도 데이터를 아직 받지 못했습니다"}
        return {"status": "success", "temperature": self.current_temp}

    def get_humidity(self) -> Dict:
        """습도 데이터를 반환하는 함수"""
        if self.current_humidity is None:
            return {"status": "error", "message": "습도 데이터를 아직 받지 못했습니다"}
        return {"status": "success", "humidity": self.current_humidity}

    def start_serial(self):
        """시리얼 포트 초기화"""
        try:
            if self.serial_port is not None:
                self.serial_port.close()

            self.serial_port = serial.Serial('/dev/ttyUSB1', 9600, timeout=1) #시리얼포트설정과 통신속도 체크
            print("시리얼 포트 연결 성공!")
            time.sleep(2)
            return True
        except Exception as e:
            print(f"시리얼 포트 연결 실패: {e}")
            return False

    def read_sensor_data(self):
        """센서 데이터 읽기 루프"""
        while self.running:
            try:
                if self.serial_port and self.serial_port.in_waiting:
                    line = self.serial_port.readline()
                    text = line.decode('utf-8').strip()
                    print("받은 데이터:", text)

                    if text and ',' in text:
                        temp_str, humid_str = text.split(',')
                        try:
                            self.current_temp = int(temp_str)
                            self.current_humidity = int(humid_str)
                            print(f"파싱된 데이터 - 온도: {self.current_temp}, 습도: {self.current_humidity}")
                        except ValueError:
                            print("숫자 변환 실패")
            except Exception as e:
                print(f"데이터 읽기 오류: {e}")
            time.sleep(0.1)

    def process_message(self, user_message: str) -> str:
        """사용자 메시지 처리"""
        self.messages.append({"role": "user", "content": user_message})

        if '온도' in user_message:
            function_name = "get_temperature"
            response = self.get_temperature()
            if response["status"] == "success":
                answer = f"우리 집의 현재 온도는 {response['temperature']}°C예요~"
            else:
                answer = "아직 온도 데이터를 받지 못했네요. 잠시만 기다려주세요!"

        elif '습도' in user_message:
            function_name = "get_humidity"
            response = self.get_humidity()
            if response["status"] == "success":
                answer = f"현재 습도는 {response['humidity']}%입니다~"
            else:
                answer = "아직 습도 데이터를 읽지 못했어요. 조금만 기다려주세요!"

        else:
            temp_response = self.get_temperature()
            humid_response = self.get_humidity()
            if temp_response["status"] == "success" and humid_response["status"] == "success":
               answer = f"우리 집의 온도는 {temp_response['temperature']}°C이고, 습도는 {humid_response['humidity']}%예요~"
            else:
                answer = "센서에서 데이터를 읽어오는 중이에요. 잠시만 기다려주세요!"

        # 응답 저장
        self.messages.append({
            "role": "assistant",
            "content": answer,
            "function_call": {
                "name": function_name if '온도' in user_message or '습도' in user_message else "get_all",
                "arguments": "{}"
            }
        })

        return answer

def create_chat_interface():
    assistant = DHT11Assistant()

    if assistant.start_serial():
        thread = threading.Thread(target=assistant.read_sensor_data, daemon=True)
        thread.start()

        demo = gr.Interface(
            fn=assistant.process_message,
            inputs=gr.Textbox(placeholder="질문을 입력하세요 (예: 지금 온도는 어때요?)"),
            outputs="text",
            title="온습도 센서 챗봇",
            description="온습도 봇입니다. 온도나 습도에 대해 물어보세요!"
        )

        demo.launch(share=False)
    else:
        print("시리얼 포트 초기화 실패")

if __name__ == "__main__":
    create_chat_interface()