### 필요한 라이브러리 import

In [1]:
from openai import OpenAI
from dotenv import load_dotenv
import os
import sys
import urllib.request
import urllib.parse
import json
import requests
import time

#selenium의 webdriver를 사용
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.common.exceptions import NoSuchElementException
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options

from bs4 import BeautifulSoup

from textblob import TextBlob
from yake import KeywordExtractor
from sentence_transformers import SentenceTransformer

import re
import numpy as np
import faiss
import concurrent.futures
from sklearn.metrics.pairwise import cosine_similarity
from queue import Queue

### OPENAI, kakao rest api, naver api 키를 .env 파일로부터 load

In [2]:
# env 파일 로드
load_dotenv()


# openai api, kakao rest api, naver api 키 로드
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
KAKAO_REST_API_KEY = os.getenv('KAKAO_RESTAPI_KEY')
NAVER_API_CLIENT_ID = os.getenv('NAVER_API_CLIENT_ID')
NAVER_API_CLIENT_SECRET = os.getenv('NAVER_API_CLIENT_SECRET')

# OpenAI client 정의
client = OpenAI(api_key = OPENAI_API_KEY)

# ChromeOptions 설정
chrome_options = Options()
chrome_options.add_argument("--headless=new")


system_prompt =  """반드시 다음 규칙을 지켜 답변하시오:
1. 제공된 문맥에만 기반하여 답변
2. 문맥에 없는 정보는 '관련 정보를 찾을 수 없습니다'로 통일
3. 장점, 단점에 대해서 한 줄로 간단히 정리
4. 긍정, 부정적인지를 합 100%으로 각각 답변"""

### OpenAI api를 활용해 ChatGPT 활용하기

In [4]:
def search(query, top_k=3):
    if not metadata_store:
        return []
    
    # 쿼리 임베딩 생성
    query_embedding = get_embedding(query)
    
    # FAISS 검색 (k 필수 지정)
    distances, indices = index.search(
        np.array([query_embedding]).astype('float32'),  # 2D 배열로 변환
        top_k  # 상위 k개 결과 요청
    )
    
    # 유효한 인덱스 필터링
    valid_indices = [i for i in indices[0] if i < len(metadata_store)]
    return [metadata_store[i] for i in valid_indices]

In [5]:
def generate_answer(query, place_data):
    
    place_name = place_data["place_name"]
    review_jsons = place_data["reviews"]
    
    # 해당 장소의 청크 데이터만 추출
    relevant_chunks = []
    for review in review_jsons:
        for chunk_data in review["chunks"]:
            relevant_chunks.append({
                "text": chunk_data["text"],
                "review_id": review["index"]
            })
    
    # 청크 데이터를 문맥으로 결합
    context = "\n\n".join([chunk["text"] for chunk in relevant_chunks[:10]])  # 최대 10개 청크 사용
    
    completion = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"{place_name}에 대한 다음 리뷰들을 분석하여 장점, 단점을 요약하고 긍정/부정 비율을 알려주세요:\n\n{context}"}
        ],
        temperature=0.3,
        max_tokens=300
    )
    
    return completion.choices[0].message.content

### 카카오맵 Rest API를 활용해 좌표를 행정구역정보를 변환하기

In [6]:
def transform_coordinates(longitude, latitude):
    url = 'https://dapi.kakao.com/v2/local/geo/coord2regioncode.json'
    headers = {'Authorization': f'KakaoAK {KAKAO_REST_API_KEY}'}
    params = {'x': longitude, 'y': latitude}
    
    response = requests.get(url, headers=headers, params=params)
    
    # 데이터를 성공적으로 불러온 경우, code = 200
    if response.status_code == 200:
        return response.json()
    else:
        return None

### 카카오맵 Rest API를 활용해 해당 좌표의 카테고리별 결과 불러오기

In [7]:
def search_by_category(longitude, latitude, category_code):
    url = 'https://dapi.kakao.com/v2/local/search/category.json'
    headers = {'Authorization': f'KakaoAK {KAKAO_REST_API_KEY}'}
    params = {
        'category_group_code': category_code,
        'x': longitude,
        'y': latitude,
        'radius': 3000,  # 검색 반경 설정 (단위: 미터)
        'size': 15       # 한 페이지에 보여질 결과 개수
    }

    # API 요청
    response = requests.get(url, headers=headers, params=params)

    # 응답 처리
    if response.status_code == 200:
        return response.json()  # 성공적으로 데이터를 반환
    else:
        print(f"Error: {response.status_code}, {response.text}")
        return None  # 실패 시 None 반환

### 리뷰 데이터 정제 함수들

##### 데이터 정리

In [8]:
def clean_text(text):
    # 입력값이 문자열인지 확인
    if not isinstance(text, str):
        text = str(text)  # 문자열로 변환
    text = re.sub(r"[^\w\s가-힣]", " ", text)  # 특수문자 및 이모지 제거
    text = re.sub(r"\s+", " ", text).strip()  # 공백 정리
    return text

##### 텍스트 길이를 분류 (short, medium, long)

In [9]:
def classify_length(text):
    word_count = len(text.split())

    if word_count < 10:
        return "short"
    elif word_count < 50:
        return "medium"
    else:
        return "long"

##### 감정 분석 함수

In [10]:
def analyze_sentiment(text):
    analysis = TextBlob(text)
    polarity = analysis.sentiment.polarity
    if polarity > 0.1:
        return "positive"
    elif polarity < -0.1:
        return "negative"
    else:
        return "neutral"

##### 키워드 추출 함수

In [11]:
def extract_keywords(text, max_keywords=5):
    extractor = KeywordExtractor()
    keywords = extractor.extract_keywords(text)
    return [kw[0] for kw in keywords[:max_keywords]]

##### 청크 분할

In [12]:
def chunk_text(text, chunk_size=300, overlap=50):
    words = text.split()
    chunks = []
    for i in range(0, len(words), chunk_size - overlap):
        chunk = " ".join(words[i:i+chunk_size])
        if len(chunk) > 10:  # 10자 미만 청크 제외
            chunks.append(chunk)
    return chunks if chunks else ["[No valid chunks]"]  # 빈 청크 방지

##### 임베딩 생성

In [13]:
def get_embedding(text):
    response = client.embeddings.create(
        input=text,
        model="text-embedding-3-small"
    )
    return response.data[0].embedding

In [14]:
def initialize_vector_db(all_places_reviews):
    """
    모든 장소의 리뷰 데이터를 FAISS 벡터 DB에 저장
    """
    global index, metadata_store
    
    embedding_dim = 1536  # text-embedding-3-small 차원
    index = faiss.IndexFlatL2(embedding_dim)
    
    metadata_store = []
    
    for place_data in all_places_reviews:
        place_name = place_data["place_name"]
        reviews = place_data["reviews"]
        
        for review in reviews:
            for chunk_data in review["chunks"]:
                embedding = np.array([chunk_data["embedding"]], dtype="float32")
                index.add(embedding)
                
                metadata_store.append({
                    "place_name": place_name,
                    "text": chunk_data["text"],
                    "review_id": review["index"]
                })
    
    print(f"벡터 DB에 총 {len(metadata_store)}개의 청크 저장 완료")

### Selenium으로 네이버 지도 크롤링

##### 리뷰 데이터들을 JSON 형식으로 변환해주는 함수

In [15]:
def review_to_json(reviews, chunk_size=300, overlap=50):
    """
        리뷰 데이터를 JSON 형식으로 변환하는 함수.
        Args:
            reviews (list): 리뷰 텍스트 리스트.
            chunk_size (int): 각 청크의 최대 단어 수.
            overlap (int): 청크 간 겹치는 단어 수.
        Returns:
            list: JSON 형식의 리뷰 데이터 리스트.
    """
    review_jsons = []

    for idx, review in enumerate(reviews, start=1):
        review_index = f"review_{idx:03}"
        cleaned_text = clean_text(review)
        text_length = classify_length(cleaned_text)
        sentiment = analyze_sentiment(cleaned_text)
        keywords = extract_keywords(cleaned_text)
        chunks = chunk_text(cleaned_text, chunk_size=chunk_size, overlap=overlap)
        embeddings = [get_embedding(chunk) for chunk in chunks]

        review_json = {
            "index": review_index,
            "text": cleaned_text,
            "length": text_length,
            "senitiment": sentiment,
            "keywords": keywords,
            "chunks": [{"text": chunk, "embedding": embedding} for chunk, embedding in zip(chunks, embeddings)]
        }
    
        review_jsons.append(review_json)

    return review_jsons

##### 검색 결과 노출 영역(searchIframe)으로 프레임 변경

In [16]:
def search_iframe(driver, timeout: int = 10) -> str:
    """
    검색 결과 수에 따라 적절한 iframe으로 전환 (단일/다중 결과 처리)
    
    Args:
        timeout (int): 결과 대기 시간 (기본 10초)
        
    Returns:
        str: "single" 또는 "multi" (결과 상태 반환)
    """
    
    driver.switch_to.default_content()
    
    try:
        # entryIframe 확인 (단일 결과)
        if WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, '//*[@id="entryIframe"]'))
        ):
            driver.switch_to.frame("entryIframe")
            return "single"
    except TimeoutException:
        pass  # entryIframe이 없으면 넘어감
    
    try:
        # searchIframe 확인 (다중 결과)
        if WebDriverWait(driver, timeout).until(
            EC.presence_of_element_located((By.XPATH, '//*[@id="searchIframe"]'))
        ):
            driver.switch_to.frame("searchIframe")
            return "multi"
    except TimeoutException:
        raise TimeoutException("entryIframe과 searchIframe 모두 로드되지 않았습니다.")

    try:
        # 더보기 버튼 찾아 클릭
        read_more_button = WebDriverWait(driver, 5).until(
            EC.element_to_be_clickable((By.XPATH, "//*[contains(@class, 'fvwqf') and .//span[text()='더보기']]"))
        )
        read_more_button.click()  # 버튼 클릭 동작 추가
        time.sleep(1)  # 페이지 로딩 대기
    except Exception as e:
        print(f"더보기 버튼 클릭 실패: {e}")            

##### 더보기 버튼을 눌러 리뷰를 추출하는 함수

In [17]:
def extract_review(driver, max_clicks=7):
    """
    최대 max_clicks 번 더보기 버튼을 눌러 리뷰를 추출하는 함수.
    
    Args:
        driver: Selenium WebDriver 객체.
        max_clicks (int): 더보기 버튼을 누를 최대 횟수 (기본값: 10).
    
    Returns:
        reviews (list): 추출된 리뷰 리스트.
    """

    for click in range(max_clicks):
        try:
            # 더보기 버튼 찾아 클릭
            read_more_button = WebDriverWait(driver, 5).until(
                EC.element_to_be_clickable((By.XPATH, "//*[contains(@class, 'fvwqf') and .//span[text()='더보기']]"))
            )
            
            # 더보기 버튼을 화면에 표시하기 위해 스크롤
            driver.execute_script("arguments[0].scrollIntoView(true);", read_more_button)
            read_more_button.click()
            time.sleep(1)  # AJAX 로딩 대기

        except Exception as e:
            # 더보기 버튼이 없거나, 클릭에 실패했을 경우 안내 메시지를 출력한다.
            print("더보기 버튼 없음 또는 클릭 실패:")
            break

    
    # 리뷰 데이터 추출
    reviews = []
    
    # HTML을 가져와 파싱하기
    html = driver.page_source
    soup = BeautifulSoup(html, "html.parser")

    review_elements = soup.select("div.place_section_content > ul > li")

    # 리뷰 텍스트 출력
    for idx, review in enumerate(review_elements, start=1):
        # 리뷰 내용이 있는 class = "pui__vn15t2"를 가져오기
        content_element = review.select_one(".pui__vn15t2 ")
        if content_element:
            content = content_element.get_text(separator=" ", strip=True)  # 텍스트 추출 (줄바꿈 포함)

            # 리뷰 내용이 3글자 이하라면 리뷰에 포함하지 않는다.
            if (len(content) > 3):
                reviews.append(content)
                # print(f"리뷰 {idx}: {content}")
        else:
            print(f"리뷰 {idx}: pui__vn15t2 클래스 요소 없음")
            break

    return reviews

##### 최종 리뷰 데이터들을 반환하는 함수

In [18]:
def get_review_content(keyword: str, driver) -> list:
    
    driver.get(f"https://map.naver.com/v5/search/{keyword}")
    

    result_status = search_iframe(driver)

    if result_status == "single":
        print("단일 검색 결과입니다.")
    
    elif result_status == "multi":
        print("다중 검색 결과입니다.")
        
        # 기본 콘텐츠로 돌아가기
        driver.switch_to.default_content()
        
        try:
            # searchIframe으로 전환
            WebDriverWait(driver, 10).until(
                EC.frame_to_be_available_and_switch_to_it((By.ID, "searchIframe"))
            )
            
            print("searchIframe으로 전환 완료.")
            
            # 첫 번째 검색 결과 가져오기 (CSS Selector 사용)
            first_result = WebDriverWait(driver, 10).until(
                EC.element_to_be_clickable((By.CSS_SELECTOR, "div#_pcmap_list_scroll_container > ul > li:first-child a.ApCpt.k4f_J"))
            )
            
            # 요소로 스크롤
            driver.execute_script("arguments[0].scrollIntoView(true);", first_result)
            time.sleep(1)  # 스크롤 후 잠시 대기
            
            # JavaScript로 클릭 (클릭 인터셉트 방지)
            driver.execute_script("arguments[0].click();", first_result)
            print("첫 번째 검색 결과를 클릭했습니다.")
            
            # entryIframe으로 전환 (상세 페이지 로드)
            driver.switch_to.default_content()
            
            # 명시적으로 entryIframe이 나타날 때까지 더 오래 대기
            WebDriverWait(driver, 10).until(
                EC.presence_of_element_located((By.ID, "entryIframe"))
            )
            
            WebDriverWait(driver, 10).until(
                EC.frame_to_be_available_and_switch_to_it((By.ID, "entryIframe"))
            )
            
            print("entryIframe으로 전환 완료.")
        
        except TimeoutException:
            print("검색 결과를 찾을 수 없습니다.")
        except Exception as e:
            print(f"오류 발생: {e}")
            # 오류 발생 시 다시 기본 콘텐츠로 전환
            driver.switch_to.default_content()
            
    try:
        """
            리뷰 탭 클릭
            하이퍼링크 <a>가 class = "tpj9w _tab-menu"이며, 하위 <span>에 "리뷰"라고 적힌 탭을 찾아서 클릭한다.
        """
        # 충분한 대기 시간 추가
        review_tab = WebDriverWait(driver, 10).until(
            EC.element_to_be_clickable((By.XPATH, "//*[contains(@class, 'tpj9w _tab-menu') and .//span[text()='리뷰']]"))
        )
        
        # 요소로 스크롤하여 가시성 확보
        driver.execute_script("arguments[0].scrollIntoView(true);", review_tab)
        time.sleep(1)  # 스크롤 후 잠시 대기
        
        # JavaScript로 클릭 실행 (더 안정적)
        driver.execute_script("arguments[0].click();", review_tab)
        print(f"{keyword} 리뷰 탭 클릭 성공!")
    except Exception as e:
        print(f"리뷰 탭 클릭 실패: {e}")

    # 리뷰 데이터값을 리스트로 받아오기
    reviews = extract_review(driver)

    # 리뷰 데이터값 -> JSON으로 바꿔 리스트화 시키기
    review_jsons = review_to_json(reviews)

    return review_jsons


### main 함수 실행 part

In [19]:
# 좌표 기준 카테고리 검색 rest api 실행. 기준-강원대 자연대학 기준
category_result = search_by_category(127.743288, 37.872316, "FD6")
print(category_result)

{'documents': [{'address_name': '강원특별자치도 춘천시 효자동 618-4', 'category_group_code': 'FD6', 'category_group_name': '음식점', 'category_name': '음식점 > 중식', 'distance': '265', 'id': '1362999570', 'phone': '0507-1415-2733', 'place_name': '육림객잔', 'place_url': 'http://place.map.kakao.com/1362999570', 'road_address_name': '강원특별자치도 춘천시 서부대성로 207', 'x': '127.74155390732444', 'y': '37.87427198463118'}, {'address_name': '강원특별자치도 춘천시 효자동 174-59', 'category_group_code': 'FD6', 'category_group_name': '음식점', 'category_name': '음식점 > 한식 > 육류,고기 > 닭요리', 'distance': '738', 'id': '18879265', 'phone': '033-243-2888', 'place_name': '진미닭갈비 본점', 'place_url': 'http://place.map.kakao.com/18879265', 'road_address_name': '강원특별자치도 춘천시 백령로 51', 'x': '127.736742118901', 'y': '37.8681542435598'}, {'address_name': '강원특별자치도 춘천시 효자동 633-11', 'category_group_code': 'FD6', 'category_group_name': '음식점', 'category_name': '음식점 > 양식 > 피자', 'distance': '272', 'id': '2135313009', 'phone': '033-911-9023', 'place_name': '브릭스피자', 'place_u

In [20]:
for document in category_result["documents"]:
    place_name = document["place_name"]
    print(place_name)
    x = document['x']
    y = document['y']
    transfrom_result = transform_coordinates(x, y)
    # 해당 장소에 대한 행정동을 결과값을 도출
    print(transfrom_result['documents'][1]['address_name'])

육림객잔
강원특별자치도 춘천시 효자3동
진미닭갈비 본점
강원특별자치도 춘천시 효자2동
브릭스피자
강원특별자치도 춘천시 효자3동
1.5닭갈비
강원특별자치도 춘천시 후평3동
큰집한우
강원특별자치도 춘천시 조운동
감미옥
강원특별자치도 춘천시 효자3동
중화루
강원특별자치도 춘천시 효자2동
만석식당 강원대점
강원특별자치도 춘천시 효자3동
봉수닭갈비
강원특별자치도 춘천시 효자3동
돈까스타운
강원특별자치도 춘천시 효자3동
복성원
강원특별자치도 춘천시 효자3동
죽향
강원특별자치도 춘천시 효자2동
해안막국수
강원특별자치도 춘천시 효자3동
너와집
강원특별자치도 춘천시 효자3동
별식당 강원대점
강원특별자치도 춘천시 효자3동


In [21]:
# WebDriver 풀 생성
driver_pool = Queue(maxsize=8)  # 최대 8개의 드라이버만 생성
for _ in range(8):
    driver = webdriver.Chrome(options=chrome_options)
    driver_pool.put(driver)

# 병렬 처리를 위한 함수
def process_place(document):
    address_name = document["address_name"]
    place_name = document["place_name"]
    place_x = document["x"]
    place_y = document["y"]

    # 장소명 + 행정동 결과값
    transform_result = transform_coordinates(place_x, place_y)['documents'][1]['address_name']
    place_name += " " + transform_result
    
    print(f"처리 중인 장소: {place_name} ({address_name})")

    # 풀에서 드라이버 가져오기
    driver = driver_pool.get()

    # # 창 숨기는 옵션 추가
    # driver.add_argument("headless")

    
    try:
        # 리뷰 데이터 가져오기 및 JSON 변환
        review_jsons = get_review_content(place_name, driver)
        
        print(f"{place_name}- {len(review_jsons)}개의 리뷰 처리 완료")
        
        # 장소 이름과 리뷰를 포함하는 딕셔너리 반환
        return {"place_name": place_name, "reviews": review_jsons}
        
    finally:
        # 작업 완료 후 드라이버 반환
        driver_pool.put(driver)

# 전체 장소별 리뷰 데이터를 저장할 리스트 - 반복문 밖으로 이동
all_places_reviews = []

# 병렬 처리 실행
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    # 각 장소별 작업 제출
    future_to_place = {
        executor.submit(process_place, document): document["place_name"]
        for document in category_result["documents"]
    }
    
    # 작업 결과 수집
    for future in concurrent.futures.as_completed(future_to_place):
        place_name = future_to_place[future]
        try:
            place_data = future.result()
            all_places_reviews.append(place_data)
        except Exception as e:
            print(f"- {place_name} 처리 중 오류 발생: {e}")

# 벡터 저장소 초기화
initialize_vector_db(all_places_reviews)

# 사용자 쿼리 처리
user_query = "각 음식점에 대해서 장점, 단점에 대해 한 줄 정리하고, 긍정/부정을 %로 알려줘."

# 각 장소별로 개별 분석 수행
print("\n===== 각 음식점 분석 결과 =====")

for place_data in all_places_reviews:
    place_name = place_data["place_name"]
    
    # 해당 장소에 특화된 분석 수행
    place_query = f"{place_name}의 {user_query}"
    place_response = generate_answer(place_query, place_data)
    
    print(f"\n[{place_name}]")
    print(place_response)

for _ in range(8):
    driver = webdriver.Chrome()
    driver_pool.get().quit()

처리 중인 장소: 육림객잔 강원특별자치도 춘천시 효자3동 (강원특별자치도 춘천시 효자동 618-4)
처리 중인 장소: 진미닭갈비 본점 강원특별자치도 춘천시 효자2동 (강원특별자치도 춘천시 효자동 174-59)
처리 중인 장소: 1.5닭갈비 강원특별자치도 춘천시 후평3동 (강원특별자치도 춘천시 후평동 801-13)
처리 중인 장소: 브릭스피자 강원특별자치도 춘천시 효자3동 (강원특별자치도 춘천시 효자동 633-11)
처리 중인 장소: 큰집한우 강원특별자치도 춘천시 조운동 (강원특별자치도 춘천시 운교동 188-5)
처리 중인 장소: 감미옥 강원특별자치도 춘천시 효자3동 (강원특별자치도 춘천시 효자동 744-17)
처리 중인 장소: 만석식당 강원대점 강원특별자치도 춘천시 효자3동 (강원특별자치도 춘천시 효자동 629-15)
처리 중인 장소: 중화루 강원특별자치도 춘천시 효자2동 (강원특별자치도 춘천시 효자동 652)
단일 검색 결과입니다.
다중 검색 결과입니다.
다중 검색 결과입니다.
다중 검색 결과입니다.
다중 검색 결과입니다.
단일 검색 결과입니다.
단일 검색 결과입니다.
다중 검색 결과입니다.
searchIframe으로 전환 완료.
searchIframe으로 전환 완료.
searchIframe으로 전환 완료.
중화루 강원특별자치도 춘천시 효자2동 리뷰 탭 클릭 성공!
searchIframe으로 전환 완료.
searchIframe으로 전환 완료.
큰집한우 강원특별자치도 춘천시 조운동 리뷰 탭 클릭 성공!
감미옥 강원특별자치도 춘천시 효자3동 리뷰 탭 클릭 성공!
첫 번째 검색 결과를 클릭했습니다.
첫 번째 검색 결과를 클릭했습니다.
첫 번째 검색 결과를 클릭했습니다.
첫 번째 검색 결과를 클릭했습니다.
첫 번째 검색 결과를 클릭했습니다.
entryIframe으로 전환 완료.
entryIframe으로 전환 완료.
entryIframe으로 전환 완료.
entryIframe으로 전환 완료.
entryIframe으로 전환 완료.
브릭스피자 강원특별자치도 춘천시 효자3동