In [None]:
!pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
!pip install openai
!pip install PyYAML
!pip install rapidfuzz
!apt-get install chromium-browser

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [3]:
import os
os.chdir('/content/drive/MyDrive/Colab Notebooks/youtube_live_ai_chatbot')

In [None]:
from google.colab import files

uploaded = files.upload()  # JSON 파일 업로드
CLIENT_SECRETS_FILE = list(uploaded.keys())[0]  # 업로드한 파일 이름을 가져옴

### Googoe Cloude Consol 초기 설정

In [None]:
import os
import pickle
from google.auth.transport.requests import Request
from google_auth_oauthlib.flow import Flow
from googleapiclient.discovery import build

# 필요한 OAuth 권한 스코프
SCOPES = ["https://www.googleapis.com/auth/youtube.force-ssl"]

# 토큰 파일 경로
TOKEN_FILE = 'token.pickle'

def authenticate_youtube():
    creds = None
    # 기존 토큰 파일 확인
    if os.path.exists(TOKEN_FILE):
        with open(TOKEN_FILE, 'rb') as token:
            creds = pickle.load(token)

    # 토큰이 없거나 만료된 경우 새로 발급
    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            flow = Flow.from_client_secrets_file(
                CLIENT_SECRETS_FILE, scopes=SCOPES
            )
            flow.redirect_uri = 'urn:ietf:wg:oauth:2.0:oob'
            auth_url, _ = flow.authorization_url(prompt='consent')
            print(f"다음 링크를 열어 인증을 진행하세요:\n{auth_url}")
            code = input("인증 코드 입력: ")
            flow.fetch_token(code=code)
            creds = flow.credentials

        # 새로운 토큰 저장
        with open(TOKEN_FILE, 'wb') as token:
            pickle.dump(creds, token)

    # YouTube API 서비스 생성
    youtube = build("youtube", "v3", credentials=creds)
    return youtube

# YouTube API 인증
#youtube_service = authenticate_youtube()
#print("YouTube API 인증 성공!")

### Open AI API 초기화

In [None]:
from openai import OpenAI
from rapidfuzz import fuzz, process
import yaml
from datetime import timezone, timedelta

# 한국 시간대 설정
KST = timezone(timedelta(hours=9))  # UTC+9

# OpenAI API 키 설정
OPENAI_API_KEY = "openai api key"

client_openai = OpenAI(api_key=OPENAI_API_KEY)

# YAML 파일 프롬프트 로드 함수
def load_prompts_from_yaml(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)

# 프롬프트 로드
prompts = load_prompts_from_yaml('bot_prompts.yaml')

# YAML 파일 로컬 데이터 로드 함수
def load_questions_from_yaml(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return yaml.safe_load(f)["questions"]

# 로컬 질문,답변 로드
qa_list = load_questions_from_yaml('bot_local_prompts.yaml')

def get_chatbot_response(user_message):
    """
    사용자 메시지를 사전에 정의된 질문과 비교한 후:
    1. 유사한 질문이 있으면 사전 정의된 답변 반환.
    2. 없으면 ChatGPT를 호출해 답변 생성.
    """
    try:
        # 질문 목록 추출
        questions = [item["question"] for item in qa_list]
        # 사용자 메시지와 가장 유사한 질문 찾기
        match_result = process.extractOne(user_message, questions)
        # 유사도가 임계값 이상이면 사전 정의된 답변 반환
        threshold=70
        if match_result:
            closest_match, score = match_result[:2]  # 첫 두 개 요소만 가져옴
            print("유사도:", score)
            # 유사도가 임계값 이상이면 사전 정의된 답변 반환
            if score >= threshold:
                for item in qa_list:
                    if item["question"] == closest_match:
                        return item["answer"]

       # 사용자 메시지를 포함한 전체 메시지 구성
        messages = [prompts] + [{"role": "user", "content": user_message}]

        # ChatGPT API 호출
        response = client_openai.chat.completions.create(
            model="gpt-4o-mini",  # 사용할 모델
            messages=messages,
            temperature=0.7
        )

        # 응답 텍스트 반환
        answer = response.choices[0].message.content
        print(answer)
        return answer.strip()

    except Exception as e:
        print(f"An error occurred while getting chatbot response: {e}")
        return "해당 정보는 확인할 수 없습니다.1"

# 답변 동작 테스트
#user_input = "오늘시간은어때?"
#response = get_chatbot_response(user_input)
#print("최종 응답:", response)

### 현재 라이브 방송 ID 가져오기

In [None]:
def get_live_chat_id_for_authenticated_user(youtube):
    """
    인증된 계정에서 현재 진행 중인 라이브 방송의 liveChatId를 가져옵니다.
    """
    try:
        # 인증된 계정의 라이브 방송 정보 요청
        request = youtube.liveBroadcasts().list(
            part="snippet,contentDetails",
            broadcastType="all",  # 모든 방송 유형 포함 (테스트용 포함)
            mine=True             # 인증된 계정의 방송만
        )
        response = request.execute()

        # 진행 중인 라이브 방송이 없으면 반환
        if "items" not in response or len(response["items"]) == 0:
            print("현재 진행 중인 라이브 방송이 없습니다.")
            return None

        # 첫 번째 라이브 방송의 liveChatId 반환
        live_chat_id = response["items"][0]["snippet"].get("liveChatId")
        print("Live Chat ID:", live_chat_id)
        return live_chat_id

    except Exception as e:
        print(f"An error occurred: {e}")
        return None

#youtube_service = authenticate_youtube()  # 유튜브 인증 함수 실행
#live_chat_id = get_live_chat_id_for_authenticated_user(youtube_service)

### 실시간 채팅 메시지 가져오기

In [21]:
from dateutil import parser
from datetime import timezone, timedelta

def fetch_live_chat_messages(youtube, live_chat_id, last_message_time):
    """
    주어진 liveChatId에서 새로운 라이브 채팅 메시지를 가져옵니다.
    """
    try:
        chat_request = youtube.liveChatMessages().list(
            liveChatId=live_chat_id,
            part="snippet,authorDetails"
        )
        chat_response = chat_request.execute()

        new_messages = []
        for item in chat_response.get("items", []):
            # 메시지 생성 시간 (ISO 8601 → UTC → KST 변환)
            message_time_utc = parser.isoparse(item["snippet"]["publishedAt"]).astimezone(timezone.utc)
            message_time_kst = message_time_utc.astimezone(KST)

            # 메시지 작성자가 현재 로그인된 사용자인 경우 제외
            is_chat_owner = item["authorDetails"].get("isChatOwner", False)
            if is_chat_owner:
                continue

            # 새로운 메시지만 필터링
            if last_message_time is None or message_time_kst > last_message_time:
                author_name = item["authorDetails"]["displayName"]
                message_text = item["snippet"]["displayMessage"]
                new_messages.append((author_name, message_text, message_time_kst))

        return new_messages

    except Exception as e:
        print(f"messges - An error occurred: {e}")
        return []


### 실시간 메시지 반복적으로 가져오기

In [9]:
import time

def stream_live_chat(youtube, live_chat_id):
    """
    실시간으로 새로운 라이브 채팅 메시지를 가져옵니다.
    """
    last_message_time = None  # 마지막으로 가져온 메시지의 시간 (KST)

    try:
        while True:
            # 새로운 메시지 가져오기
            new_messages = fetch_live_chat_messages(youtube, live_chat_id, last_message_time)

            # 메시지 출력
            for author_name, message_text, message_time_kst in new_messages:
                print(f"{message_time_kst.strftime('%Y-%m-%d %H:%M:%S')} KST - {author_name}: {message_text}")
                # 마지막 메시지 시간을 업데이트
                if last_message_time is None or message_time_kst > last_message_time:
                    last_message_time = message_time_kst

            # 5초 대기 후 다음 메시지 요청
            time.sleep(5)

    except KeyboardInterrupt:
        print("채팅 스트리밍이 중단되었습니다.")


### 새로운 메시지 가져오기

In [10]:
import time

def stream_live_chat(youtube, live_chat_id):
    """
    실시간으로 새로운 라이브 채팅 메시지를 가져옵니다.
    """
    last_message_time = None  # 마지막으로 가져온 메시지의 시간 (KST)

    try:
        while True:
            # 새로운 메시지 가져오기
            new_messages = fetch_live_chat_messages(youtube, live_chat_id, last_message_time)

            # 메시지 출력
            for author_name, message_text, message_time_kst in new_messages:
                print(f"{message_time_kst.strftime('%Y-%m-%d %H:%M:%S')} KST - {author_name}: {message_text}")
                # 마지막 메시지 시간을 업데이트
                if last_message_time is None or message_time_kst > last_message_time:
                    last_message_time = message_time_kst

            # 5초 대기 후 다음 메시지 요청
            time.sleep(5)

    except KeyboardInterrupt:
        print("채팅 스트리밍이 중단되었습니다.")




### 채팅 메시지에 답변 전송

In [11]:
def send_message_to_live_chat(youtube, live_chat_id, message):
    """
    라이브 채팅에 메시지를 전송합니다.
    """
    try:
        youtube.liveChatMessages().insert(
            part="snippet",
            body={
                "snippet": {
                    "liveChatId": live_chat_id,
                    "type": "textMessageEvent",
                    "textMessageDetails": {
                        "messageText": message
                    }
                }
            }
        ).execute()
        print(f"메시지 전송 완료: {message}")
    except Exception as e:
        print(f"메시지 전송 중 오류 발생: {e}")



### 채팅메시지에 대한 답변 생성

In [12]:
processed_messages = set()

def stream_live_chat_with_responses(youtube, live_chat_id):
    """
    실시간으로 새로운 라이브 채팅 메시지를 가져오고, ChatGPT를 통해 답변을 생성하여 전송합니다.
    """
    last_message_time = None
    processed_messages = set()  # 처리된 메시지를 추적

    try:
        while True:
            new_messages = fetch_live_chat_messages(youtube, live_chat_id, last_message_time)

            for author_name, message_text, message_time_kst in new_messages:
                if message_text in processed_messages:
                    continue

                print(f"{message_time_kst.strftime('%Y-%m-%d %H:%M:%S')} KST - {author_name}: {message_text}")

                # ChatGPT API를 사용해 답변 생성
                response = get_chatbot_response(message_text)
                print(f"ChatGPT 응답: {response}")

                # 특정 문장이 포함된 경우 메시지 전송 생략
                EXCLUDED_PHRASES = [
                    "해당 정보는 확인할 수 없습니다",
                    "죄송하지만, 답변할 수 없습니다",
                    ]

                if any(phrase in response for phrase in EXCLUDED_PHRASES):
                    print("응답이 전송 제외 조건에 해당되어 메시지를 전송하지 않습니다.")
                else:
                    # YouTube 채팅에 답변 전송
                    send_message_to_live_chat(youtube, live_chat_id, response)

                # 처리된 메시지로 추가
                processed_messages.add(message_text)

                # 마지막 메시지 시간 업데이트
                if last_message_time is None or message_time_kst > last_message_time:
                    last_message_time = message_time_kst

            time.sleep(5)

    except KeyboardInterrupt:
        print("채팅 스트리밍이 중단되었습니다.")


### 실행

In [None]:
# google cloud console 인증
youtube = authenticate_youtube()

# 라이브 채팅 ID 가져오기
live_chat_id = get_live_chat_id_for_authenticated_user(youtube)

if live_chat_id:
    print("실시간 메시지 스트리밍 시작...")
    stream_live_chat_with_responses(youtube, live_chat_id)
else:
    print("라이브 채팅 ID를 가져올 수 없습니다.")
