# LLM with Web Search and Crawl

Code to crawl the top n pages of a Google search result and serve them to LLM in order to utilize rich context.



In [None]:
import re
import requests
import sys
import os
from openai import AzureOpenAI
import tiktoken
from dotenv import load_dotenv
load_dotenv(override=True) 

client = AzureOpenAI(
  azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"), 
  api_key=os.getenv("AZURE_OPENAI_KEY"),  
  api_version="2024-08-01-preview"
)

CHAT_COMPLETIONS_MODEL = os.getenv('AZURE_OPENAI_DEPLOYMENT_NAME')

bs4 or scrapy?

In [None]:
from typing import List, Tuple
import requests
import json
import scrapy
from bs4 import BeautifulSoup
import httpx
import asyncio
from urllib.parse import urljoin
from azure.ai.projects.models import MessageRole, BingGroundingTool
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
import sys
import logging
sys.path.append(os.path.abspath('..'))  # Adjust the path as necessary
from utils.search_utils import web_search

# Configure logging
logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
GOOGLE_CSE_ID = os.getenv("GOOGLE_CSE_ID")
BING_GROUNDING_PROJECT_CONNECTION_STRING = os.getenv("BING_GROUNDING_PROJECT_CONNECTION_STRING")
BING_GROUNDING_AGENT_ID = os.getenv("BING_GROUNDING_AGENT_ID")
BING_GROUNDING_AGENT_MODEL_DEPLOYMENT_NAME = os.getenv("BING_GROUNDING_AGENT_MODEL_DEPLOYMENT_NAME")
BING_GROUNDING_CONNECTION_NAME = os.getenv("BING_GROUNDING_CONNECTION_NAME")
# Web search mode: "google" or "bing"
# it can be changed when users want to use different search engine
WEB_CRAWLING_MODE = os.getenv("WEB_CRAWLING_MODE") #on or off

def extract_text_and_tables_by_bs4(url):
    response = requests.get(url)
    soup = BeautifulSoup(response.text, "html.parser")
    # Extract main text
    paragraphs = [p.get_text().strip() for p in soup.find_all("p") if p.get_text().strip()]
    text = "\n".join(paragraphs)
    return text



async def extract_contexts_async(url_snippet_tuples: List[Tuple[str, str]]) -> List[str]:
    """
    Asynchronously extract content from a list of URLs with their snippets.
    
    Args:
        url_snippet_tuples: List of (url, snippet) pairs to process
        
    Returns:
        List of extracted contents
    """
    async def fetch(url: str, snippet: str) -> str:
        # Try to get from cache first
        
        # If not in cache or cache unavailable, fetch the content
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
        }
        
        try:
            async with httpx.AsyncClient(timeout=10, follow_redirects=True) as client:
                try:
                    response = await client.get(url, headers=headers)
                    response.raise_for_status()
                except httpx.HTTPStatusError as e:
                    # Handle redirects manually if needed
                    if e.response.status_code == 302 and "location" in e.response.headers:
                        redirect_url = e.response.headers["location"]
                        if not redirect_url.startswith("http"):
                            redirect_url = urljoin(url, redirect_url)
                        try:
                            response = await client.get(redirect_url, headers=headers)
                            response.raise_for_status()
                        except Exception as e2:
                            print(f"Redirect request failed: {e2}")
                            return f"{snippet} "
                    else:
                        print(f"Request failed: {e}")
                        return f"{snippet} "
                except httpx.HTTPError as e:
                    print(f"Request failed: {e}")
                    return f"{snippet} "
                
                # Parse the content
                selector = scrapy.Selector(text=response.text)
                
                # Extract paragraphs
                paragraphs = [p.strip() for p in selector.css('p::text, p *::text').getall() if p.strip()]
                
                # Remove duplicate and very short paragraphs
                filtered_paragraphs = []
                seen_content = set()
                for p in paragraphs:
                    # Skip very short paragraphs that are likely UI elements
                    if len(p) < 5:
                        continue
                    # Avoid duplicate content
                    if p in seen_content:
                        continue
                    seen_content.add(p)
                    filtered_paragraphs.append(p)
                
                # Join the filtered paragraphs
                text = "\n".join(filtered_paragraphs)
                
                # If no paragraphs were found, try to get other text content
                if not text:
                    content_texts = [t.strip() for t in selector.css(
                        'article::text, article *::text, .content::text, .content *::text, '
                        'main::text, main *::text'
                    ).getall() if t.strip()]
                    
                    if content_texts:
                        text = "\n".join(content_texts)
                
                # Combine snippet with extracted text
                snippet_text = f"{snippet}: {text}"
                
                
                return snippet_text
                
        except Exception as e:
            print(f"Error processing URL {url}: {str(e)}")
            return f"{snippet} [Error: {str(e)}]"
    
    # Create tasks for all URLs
    tasks = [asyncio.create_task(fetch(url, snippet)) 
            for url, snippet in url_snippet_tuples]
    
    # Execute all tasks concurrently
    results = await asyncio.gather(*tasks, return_exceptions=True)
    
    # Process results
    processed_results = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Error processing URL {url_snippet_tuples[i][0]}: {str(result)}")
            processed_results.append(f"{url_snippet_tuples[i][1]} [Processing Error]")
        else:
            processed_results.append({"content": result, "url_citation" :{"link": url_snippet_tuples[i][0], "title": url_snippet_tuples[i][1]}})

    return processed_results
                    
           

       
QUERY_REWRITE_PROMPT = """
            <<지시문>>
            너는 구글 검색과 LLM 질의 최적화 전문가야. 사용자가 입력한 질문을 두 가지 목적에 맞게 재작성해.

            1. Web Search용 Query Rewrite:
            - 사용자의 질문을 실제 검색 엔진 검색창에 입력할 수 있도록, 명확하고 간결한 핵심 키워드 중심의 검색어로 재작성해.
            - 불필요한 문장, 맥락 설명은 빼고, 검색에 최적화된 형태로 만들어.
            - 핵심 키워드를 반복적으로 사용해 검색의 정확도를 높여.

            2. LLM Query용 Rewrite:
            - 사용자의 질문을 LLM이 더 잘 이해하고 답변할 수 있도록, 맥락과 의도를 명확히 드러내는 자연스러운 문장으로 재작성해.
            - 필요한 경우 추가 설명이나 세부 조건을 포함해서 질문의 목적이 분명히 드러나도록 만들어.
            - LLM이 답변에 집중할 수 있도록 핵심 단어를 반복 사용해.

            <<예시>>
            * 질문: 삼성전자 제품 중 2구 말고 다른 인덕션 추천해줘
            * 웹 검색용 재작성: 삼성전자 3구 이상 인덕션 추천
            * LLM 답변용 재작성: 삼성전자 인덕션 중 2구 모델이 아닌, 3구 이상 또는 다양한 화구 수를 가진 다른 인덕션 제품을 추천해 주세요. 각 모델의 주요 기능과 장점도 함께 알려주세요.

            <<질문>>
            {user_query}

            <<출력포맷>>
            반드시 아래와 같이 json 형식으로 출력해.
            {"web_search": "웹 검색용 재작성", "llm_query": "LLM 답변용 재작성"}
        """     
  
def rewrite_query_for_search_and_llm(query, client: AzureOpenAI):
        response = client.chat.completions.create(
            model=CHAT_COMPLETIONS_MODEL,
            messages=[
                {"role": "system", "content": QUERY_REWRITE_PROMPT},
                {"role": "user", "content": query}
            ],
            temperature=0.8,
            max_tokens=300,
            response_format= {"type": "json_object"},
        )
        
        return json.loads(response.choices[0].message.content.strip())


In [None]:
from IPython.display import Markdown, display
from datetime import datetime
import time

#TODO 날씨나 뉴스, 기타 다른 특정정보는 Function Call
# inputs = ["날씨, 뉴스"] ##

async def process_web_search_call(RESULTS_COUNT, input, web_search_mode):
    
    start_time = time.time()
    
    contexts = [] 
    print(f"Original Input: {input}")
    
    query_rewrite = rewrite_query_for_search_and_llm(input, client)
    print(f"Web Search Query: {query_rewrite['web_search']}")
    print(f"LLM Query: {query_rewrite['llm_query']}")
    
    results = web_search(query_rewrite, RESULTS_COUNT, web_search_mode=web_search_mode)
    print(f"Web Search Results: {len(results)}, {results}")
    if web_search_mode == "bing" and results and isinstance(results, list) and len(results) > 0:
        contexts = [results[i] for i in range(len(results))]
        
    elif web_search_mode == "google" and results and isinstance(results, list) and len(results) > 0:
        url_snippet_tuples = [(r["link"], r["snippet"]) for r in results]
        contexts = await extract_contexts_async(url_snippet_tuples)
        
    else:
        print("No results found or invalid response from web_search.")
        contexts = [] 
    
    # for i, context in enumerate(contexts):
    #     print(f"Context {i+1}: {context}...")  # Print first 1000 chars of each context
    #     print("\n--- End of Context ---\n")

    now = datetime.now()
    year = now.year
    month = now.month
    day = now.day

    
    
    system_prompt = """
        너는 삼성전자 제품 관련 정보를 제공하는 챗봇이야. 
        답변은 마크다운으로 이모지를 1~2개 포함해서 작성해줘. 
        contexts를 최대한 활용하여 풍부하게 답변을 해야해. 
        사용자가 질문한 내용에 대해 정확하고 유용한 정보를 제공해야 해. contexts가 부족하면 최소한의 안내만 해줘. 
        url_citation은 사용자가 클릭할 수 있도록 링크를 제공해줘.
        
    """
    user_prompt = f"""
        너는 아래 제공하는 웹검색에서 검색한 contexts를 바탕으로 질문에 대한 답변을 제공해야 해. 
        현재는 {year}년 {month}월 {day}일이므로 최신의 데이터를 기반으로 답변을 해줘.
        웹검색에서 제공한 contexts: {contexts}
        질문: {query_rewrite['llm_query']}
        """
    
    response = client.chat.completions.create(
        model=CHAT_COMPLETIONS_MODEL,
        messages=[{"role": "system", "content": system_prompt},
                 {"role": "user", "content": user_prompt}],
        top_p=0.9,
        max_tokens=1500
    )

    display(Markdown(response.choices[0].message.content))
    end_time = time.time()
    print(f"elapsed time: {end_time - start_time:.2f} seconds")


In [None]:
RESULTS_COUNT = 5

inputs = [
    "삼성전자 제품 중 2구 말고 다른 인덕션 추천해줘",
    # "부모님에게 선물하고 싶은데 삼성전자 TV 추천해줘",
    # "삼성전자 25년 제품이 작년 대비 좋아진것은",
    # "삼성전자 JBL과 하만카돈 차이점이 뭐야",
    # "갤럭시 버즈 이어버드 한쪽을 새로 구매했는데 페어링 어떻게 하나요",
    # "삼성전자 S25 무게가 S24와 비교 했을때 얼마나 차이나"
]


web_search_mode = "google"

for input in inputs:
    
    print(f"Google Search API 사용: {input}")
    await process_web_search_call(RESULTS_COUNT, input, web_search_mode)

web_search_mode = "bing"

for input in inputs:
    print(f"Bing Grounding 검색 사용: {input}")
    await process_web_search_call(RESULTS_COUNT, input, web_search_mode)    

In [None]:

RESULTS_COUNT = 5

inputs = [
    #"삼성전자 제품 중 2구 말고 다른 인덕션 추천해줘",
    "부모님에게 선물하고 싶은데 삼성전자 TV 추천해줘",
    "삼성전자 25년 제품이 작년 대비 좋아진것은",
    # "삼성전자 JBL과 하만카돈 차이점이 뭐야",
    # "갤럭시 버즈 이어버드 한쪽을 새로 구매했는데 페어링 어떻게 하나요",
    # "삼성전자 S25 무게가 S24와 비교 했을때 얼마나 차이나"
]


web_search_mode = "google"

for input in inputs:
    
    print(f"Google Search API 사용: {input}")
    await process_web_search_call(RESULTS_COUNT, input, web_search_mode=web_search_mode)

web_search_mode = "bing"

for input in inputs:
    print(f"Bing Grounding 검색 사용: {input}")
    await process_web_search_call(RESULTS_COUNT, input, web_search_mode=web_search_mode)    