# 먹을텐데 식당 추천 서비스

In [1]:
import yt_dlp
import base64
import json
import tempfile
import os
from dotenv import load_dotenv

load_dotenv()
youtube_cookies = os.getenv("YOUTUBE_COOKIES")

## 플레이리스트 수집

In [None]:
# 플레이리스트 URL
playlist_url = (
    "https://www.youtube.com/playlist?list=PLuMuHAJh9g_Py_PSm8gmHdlcil6CQ9QCM"
)


# 쿠키 파일 생성 함수
def create_cookie_file(cookie_data_base64):
    if not cookie_data_base64:
        return None

    try:
        # base64로 인코딩된 쿠키 문자열을 디코딩
        cookie_data = base64.b64decode(cookie_data_base64).decode("utf-8")

        # 쿠키가 JSON 형식인지 확인하고 Netscape 형식으로 변환
        try:
            # JSON 형식인지 확인
            json_cookies = json.loads(cookie_data)

            # Netscape 형식으로 변환
            netscape_cookies = "# Netscape HTTP Cookie File\n"
            for cookie in json_cookies:
                if all(k in cookie for k in ["domain", "path", "name", "value"]):
                    secure = "TRUE" if cookie.get("secure", False) else "FALSE"
                    http_only = "TRUE" if cookie.get("httpOnly", False) else "FALSE"
                    expires = str(int(cookie.get("expirationDate", 0)))
                    netscape_cookies += f"{cookie['domain']}\tTRUE\t{cookie['path']}\t{secure}\t{expires}\t{cookie['name']}\t{cookie['value']}\n"

            cookie_data = netscape_cookies
        except json.JSONDecodeError:
            # 이미 Netscape 형식이거나 다른 형식인 경우 그대로 사용
            pass

        # 임시 파일 생성
        cookie_file = tempfile.NamedTemporaryFile(delete=False, suffix=".txt")
        cookie_file.write(cookie_data.encode("utf-8"))
        cookie_file.close()

        return cookie_file.name
    except Exception as e:
        print(f"쿠키 파일 생성 중 오류 발생: {str(e)}")
        return None


# 플레이리스트 정보 가져오기
def get_playlist_info(playlist_url, cookie_file_path=None):
    ydl_opts = {
        "quiet": True,
        "no_warnings": True,
        "extract_flat": True,  # 플레이리스트 항목만 추출
        "nocheckcertificate": True,
        "ignoreerrors": True,
        "no_color": True,
        "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    }

    # 쿠키 파일이 있으면 옵션에 추가
    if cookie_file_path:
        ydl_opts["cookiefile"] = cookie_file_path

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            playlist_info = ydl.extract_info(playlist_url, download=False)
            return playlist_info
    except Exception as e:
        print(f"플레이리스트 정보 추출 중 오류 발생: {str(e)}")
        return None


# 메인 실행 코드
try:
    # 쿠키 파일 생성
    cookie_file_path = create_cookie_file(youtube_cookies) if youtube_cookies else None

    # 플레이리스트 정보 가져오기
    playlist_info = get_playlist_info(playlist_url, cookie_file_path)

    if playlist_info:
        print(f"플레이리스트 제목: {playlist_info.get('title', '제목 없음')}")
        print(f"총 {len(playlist_info.get('entries', []))}개의 동영상이 있습니다.")

        # 각 동영상 정보 출력
        for i, entry in enumerate(playlist_info.get("entries", []), 1):
            print(
                f"{i}. {entry.get('title', '제목 없음')}: https://www.youtube.com/watch?v={entry.get('id', '')}"
            )
    else:
        print("플레이리스트 정보를 가져오지 못했습니다.")
finally:
    # 임시 쿠키 파일 삭제
    if cookie_file_path and os.path.exists(cookie_file_path):
        os.unlink(cookie_file_path)

## 수집된 영상의 자막 스크립트 수집

- 성시경의 먹을텐데 영상은 자체 자막을 반드시 지원하고? 있는것으로 확인 됨
- 따라서 STT를 하지 않고 자막을 수집하여 메뉴 추출

In [2]:
import json


# 쿠키 파일 생성 함수
def create_cookie_file(cookie_data_base64):
    if not cookie_data_base64:
        return None

    try:
        # base64로 인코딩된 쿠키 문자열을 디코딩
        cookie_data = base64.b64decode(cookie_data_base64).decode("utf-8")
        print("쿠키 데이터 디코딩 완료")

        # 쿠키가 JSON 형식인지 확인하고 Netscape 형식으로 변환
        try:
            # JSON 형식인지 확인
            json_cookies = json.loads(cookie_data)
            print("JSON 형식 쿠키 감지됨, Netscape 형식으로 변환 중")

            # Netscape 형식으로 변환
            netscape_cookies = "# Netscape HTTP Cookie File\n"
            for cookie in json_cookies:
                if all(k in cookie for k in ["domain", "path", "name", "value"]):
                    secure = "TRUE" if cookie.get("secure", False) else "FALSE"
                    http_only = "TRUE" if cookie.get("httpOnly", False) else "FALSE"
                    expires = str(int(cookie.get("expirationDate", 0)))
                    netscape_cookies += f"{cookie['domain']}\tTRUE\t{cookie['path']}\t{secure}\t{expires}\t{cookie['name']}\t{cookie['value']}\n"

            cookie_data = netscape_cookies
        except json.JSONDecodeError:
            # 이미 Netscape 형식이거나 다른 형식인 경우 그대로 사용
            print("쿠키가 JSON 형식이 아닙니다. 원본 형식 유지")
            pass

        # 임시 파일 생성
        cookie_file = tempfile.NamedTemporaryFile(delete=False, suffix=".txt")
        cookie_file.write(cookie_data.encode("utf-8"))
        cookie_file.close()
        print(f"쿠키 파일 생성 완료: {cookie_file.name}")

        return cookie_file.name
    except Exception as e:
        print(f"쿠키 파일 생성 중 오류 발생: {str(e)}")
        return None

In [None]:
youtube_cookies = os.environ.get("YOUTUBE_COOKIES", "")
cookie_file_path = create_cookie_file(youtube_cookies) if youtube_cookies else None

In [None]:
import yt_dlp

ydl_opts = {
    "quiet": True,
    "no_warnings": True,
    "extract_flat": True,  # 기본 정보만 추출하도록 변경
    "writesubtitles": True,  # 자막 정보 가져오기
    "skip_download": True,  # 영상 다운로드 없이 정보만 가져오기
    "nocheckcertificate": True,
    "ignoreerrors": True,
    "subtitleslangs": ["ko", "en"],
    "no_color": True,
    "socket_timeout": 30,  # 소켓 타임아웃 설정
    "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "cookiefile": cookie_file_path,
}
# YouTube 객체 생성
video_url = "https://www.youtube.com/watch?v=8sseq7MuLjY"
yt = yt_dlp.YoutubeDL(ydl_opts)
video_info = yt.extract_info(video_url, download=False)

print("제목:", video_info.get("title"))
print("설명:", video_info.get("description"))
print("영상 URL:", video_info.get("webpage_url"))

# video_id = yt.video_id  # 동영상 ID 추출

# script = []


# # 자막 가져오기
# try:
#     transcript = yt.get_transcript(video_id, languages=["ko", "en"],cookies=cookie_file_path)
#     print(f"영상 제목: {yt.title}")
#     for entry in transcript:
#         script.append(entry["text"])
# except Exception as e:
#     print(f"자막을 가져오는 중 오류 발생: {e}")

In [39]:
list(video_info["subtitles"].keys()).remove("live_chat")

In [None]:
import requests

sub_url = video_info["subtitles"]
try:
    response = requests.get(sub_url["ko"][0]["url"])
    print("한글 자막 존재")
except:
    response = requests.get(sub_url["en"][0]["url"])
    print("영어 자막 존재")

In [None]:
response.json()["events"][0]["segs"][0]["utf8"]

In [None]:
for event in response.json()["events"]:
    for seg in event["segs"]:
        print(seg["utf8"])

In [41]:
def convert_vtt_to_text(response):
    """VTT 형식의 자막을 일반 텍스트로 변환"""
    # 타임스탬프 및 VTT 헤더 제거
    text_lines = []

    try:
        for event in response.get("events", []):
            if "segs" in event and event["segs"]:
                for seg in event["segs"]:
                    if "utf8" in seg and seg["utf8"].strip():
                        text_lines.append(seg["utf8"].strip())

        result = "\n".join(text_lines)
        if not result.strip():
            return None

        return result
    except Exception as e:

        return None

In [None]:
import requests

automatic_captions = video_info.get("automatic_captions", {})
for i in automatic_captions["ko"]:
    if i.get("ext") == "json3":
        url = i["url"]
        print(url)
        response = requests.get(url)
        if response.status_code == 200:
            transcript_text = convert_vtt_to_text(response.json())
            if transcript_text:
                print(f"자동 생성 자막(ko) 추출 완료")

In [None]:
response.json()

In [None]:
print(transcript_text)

## 수집된 영상의 정보 수집

In [None]:
# from pytubefix import YouTube
# yt = YouTube("https://youtube.com/watch?v=vBjWJgDaNds")


# description = yt.description.split("\n")
# name = description[0]
# address = description[1]

## 별도의 YouTube 인스턴스를 만드는 것이 아닌 첫번째 셀에서 작업한 플레이리스트에서 description 추출
playlist.videos[1].title, playlist.videos[1].description, video_url

In [15]:
description = playlist.videos[1].description.split("\n")
name = description[0]
address = description[1]

In [None]:
name

In [None]:
# 수집된 주소를 기반으로 좌표 확인 필요
address

In [1]:
from pytubefix import YouTube

yt = YouTube("https://youtube.com/watch?v=vBjWJgDaNds")

In [None]:
yt.description

## 주소 기반으로 좌표 수집

In [None]:
import requests
import os
from dotenv import load_dotenv

load_dotenv()

# Kakao API Key
KAKAO_API_KEY = os.getenv("KAKAO_API_KEY")


# Kakao Geocoding API 요청
url = "https://dapi.kakao.com/v2/local/search/address.json"
headers = {"Authorization": f"KakaoAK {KAKAO_API_KEY}"}
params = {"query": address}

response = requests.get(url, headers=headers, params=params)
if response.status_code == 200:
    data = response.json()
    if data["documents"]:
        location = data["documents"][0]
        latitude = location["y"]
        longitude = location["x"]
        print(f"위도: {latitude}, 경도: {longitude}")
    else:
        print("주소로부터 좌표를 찾을 수 없습니다.")
else:
    print("API 요청 실패:", response.status_code)

## 수집된 좌표를 지도에 표시

- streamlit에 표시해야 하므로 아래 방법은 바뀔 수 있음

In [None]:
import folium

# 위도와 경도
latitude = 37.498095  # 예시 좌표
longitude = 127.027610  # 예시 좌표

# 지도 생성
m = folium.Map(location=[latitude, longitude], zoom_start=16)
folium.Marker([latitude, longitude], popup="대동빌딩").add_to(m)

# HTML로 저장
m.save("map.html")
print("지도 파일이 'map.html'로 저장되었습니다.")

## 스크립트를 llm을 통해 정리

In [26]:
from langchain_openai import ChatOpenAI
from dotenv import load_dotenv
from pydantic import BaseModel, Field
from typing import List
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import PromptTemplate
from operator import itemgetter

# 환경 변수 로드
load_dotenv()

# LLM 초기화
# llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0.1)
llm = ChatOpenAI(model_name="gpt-4o", temperature=0.1)


# 식당 정보 모델
class Info(BaseModel):
    menu: str = Field(..., description="식당 메뉴")
    review: str = Field(..., description="식당 후기")


# 최종 응답 모델
class Answers(BaseModel):
    answer: str = Field(..., description="답변 내용")
    infos: List[Info] = Field(..., description="식당 정보")

In [33]:
prompt = PromptTemplate.from_template(
    """다음은 성시겨의 먹을텐데 유튜브 영상의 스크립트입니다. 
스크립트를 읽고 아래의 형식으로 모든 데이터를 한글로 추출해주세요.
                                      
SCRIPT:
{script}

### 주의사항
1. 성시경이 언급한 메뉴를 정리하세요.
2. 메뉴들을 주메뉴와 건더기, 반찬 등으로 구분 하세요.
3. 반찬과 건더기는 해당 메인메뉴의 리뷰에 포함시키세요.
4. 메뉴의 종류는 언급된 메뉴에 맞는 카테고리를 적합하게 작성해주세요.


OUTPUT_FORMAT:
{{
  "restaurant_name": "script의 가게명",
  "menus": [
   {{
      "menu_type": "메뉴의 종류 (예: 양식, 일식, 한식 등)",
      "menu_name": "메뉴명",
      "menu_review": "영상에 언급된 해당 메뉴에 대한 성시경이 느낀점과 자연스러운 설명"
    }},
    ...
  ]
}}
"""
)

In [34]:
parser = JsonOutputParser(pydantic_object=Answers)

In [35]:
chain = {"input": itemgetter("input")} | prompt | llm | parser

In [36]:
with open("temp.txt", "r") as f:
    data = f.read()

In [37]:
result = chain.invoke({"input": data})

In [None]:
result      

## 수집된 데이터들을 db에 저장

- db는 sqlite?
- 가게이름, 주소, 좌표, 메뉴들, 후기들
    - 메뉴들, 후기들을 어떻게 묶어서 정리할 지 정해야 함.
- 키는 영상의 ID &rarr; llHQEbY28a4

In [8]:
import sqlite3

db = sqlite3.connect("meokten.db")

In [9]:
cur = db.cursor()

In [3]:
cur.execute(
    """
CREATE TABLE IF NOT EXISTS meokten (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            address TEXT,
            cordinate TEXT,
            menus,
            reviews)"""
)
db.commit()

# 위경도 기반 주변 지하철 조회

In [None]:
import requests
import os
from dotenv import load_dotenv

load_dotenv()

# 카카오 API 키 (Authorization: KakaoAK YOUR_API_KEY)
API_KEY = os.getenv("KAKAO_API_KEY")


def get_coordinates(address):
    """주소를 위도/경도로 변환"""
    url = "https://dapi.kakao.com/v2/local/search/address.json"
    headers = {"Authorization": f"KakaoAK {API_KEY}"}
    params = {"query": address}
    response = requests.get(url, headers=headers, params=params).json()

    if response["documents"]:
        return (
            response["documents"][0]["y"],
            response["documents"][0]["x"],
        )  # 위도, 경도
    return None, None


def get_nearest_subway(lat, lon):
    """주어진 좌표에서 가장 가까운 지하철역 찾기"""
    url = "https://dapi.kakao.com/v2/local/search/category.json"
    headers = {"Authorization": f"KakaoAK {API_KEY}"}
    params = {
        "category_group_code": "SW8",  # 지하철 카테고리
        "x": lon,
        "y": lat,
        "radius": 2000,  # 검색 반경 (최대 10km)
        "sort": "distance",  # 거리순 정렬
    }
    response = requests.get(url, headers=headers, params=params).json()

    if response["documents"]:
        nearest = response["documents"][0]
        return nearest["place_name"], nearest["distance"], nearest["address_name"]
    return None, None, None


# 테스트 주소
address = "서울 종로구 자하문로6길 6 2, 3층 (통의동 35-12)"

lat, lon = get_coordinates(address)
print(lat, lon)
if lat and lon:
    station_name, distance, station_address = get_nearest_subway(lat, lon)
    print(f"가장 가까운 지하철역: {station_name} ({distance}m, {station_address})")
else:
    print("주소 변환 실패")

In [1]:
from langchain_community.utilities import SQLDatabase


db = SQLDatabase.from_uri("sqlite:///../meokten/meokten.db")

In [2]:
db.run("SELECT r.id AS restaurant_id, r.name AS restaurant_name, r.address, r.station_name, r.latitude AS lat, r.longitude AS lng, r.video_id, r.video_url, m.id AS menu_id, m.restaurant_id AS menu_restaurant_id, m.menu_name AS menu_name, m.menu_type, m.menu_review FROM restaurants r LEFT JOIN menus m ON r.id = m.restaurant_id WHERE r.address LIKE '%포천%'")

''