In [1]:
import bs4
from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [2]:


loader = WebBaseLoader(
    web_paths= ("https://news.naver.com/section/101",),
    bs_kwargs=dict(
        parse_only=bs4.SoupStrainer(
            class_=("sa_text", "sa_item_SECTION_HEADLINE")
        )
    )
)
docs = loader.load()


In [3]:
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    chunk_size=300,
    chunk_overlap=50
)


In [6]:
splits = text_splitter.split_documents(docs)

vectorstore = Chroma.from_documents(
    documents=splits, embedding=OpenAIEmbeddings()
)


In [12]:
retriever = vectorstore.as_retriever(
    search_type='mmr',
    search_kwargs={"k" : 1, "fetch_k" : 4}
)

prompt = hub.pull("sungwoo/ragbasic")




#gpt-4o-mini
llm = ChatOpenAI(model_name="chatgpt-4o-latest", temperature=0)



In [13]:
from langchain_core.runnables import RunnableLambda
chain = (
    {
        "context" : retriever | RunnableLambda(lambda x : "\n\n".join(d.page_content for d in docs)),
        "question" : RunnablePassthrough()
    }
    | prompt
    | llm
    | StrOutputParser()
 
)


In [14]:
chain.invoke("오늘 증시는?")

"오늘 증시는 에코프로비엠의 실적 호조와 이차전지 업종 반등 기대감 등으로 상승세를 보였습니다. 에코프로비엠은 2분기 영업이익이 전년 대비 1155% 증가하며 '어닝 서프라이즈'를 기록했습니다. 이차전지 관련주 중심으로 투자심리가 회복되는 분위기입니다."

### 페르소나 부여

In [17]:
from langchain.prompts import ChatPromptTemplate
template = """
당신은 AI 언어 모델 조수입니다. 당신의 임무는 주어진 사용자 질문에 대해 벡터 데이터베이스에서 관련 문서를 검색할 수 있도록 다섯 가지 다른 버전을 생성하는 것입니다.
사용자 질문에 대한 여러 관점을 생성함으로써, 거리 기반 유사성 검색의 한계를 극복하는 데 도움을 주는 것이 목표입니다.
각 질문은 새 줄로 구분하여 제공하세요. 원본 질문: {question}
"""

prompt_perspectives = ChatPromptTemplate.from_template(template)


generate_queries = (
    prompt_perspectives
    | llm
    | StrOutputParser()
    | (lambda x : x.split("\n"))
)


generate_queries.invoke("금리의 전망")


['1. 향후 금리는 어떻게 변동할 것으로 예상되나요?  ',
 '2. 중앙은행의 정책에 따라 금리가 오를 가능성이 있나요?  ',
 '3. 경제 지표를 고려할 때 금리 추세는 어떤 방향인가요?  ',
 '4. 단기 및 장기 금리 전망에 어떤 차이가 있나요?  ',
 '5. 인플레이션과 경기 상황이 금리에 어떤 영향을 줄까요?']

In [18]:
from langchain.load import dumps, loads
def get_unique_union(documents: list[list]):
    tmp = []
    for sublist in documents:
        for doc in sublist:
            tmp.append(dumps(doc))


In [19]:

from langchain.load import dumps, loads


def get_unique_union(documents: list[list]):
    # tmp = []
    # for sublist in documents:
    #     for doc in sublist:
    #         tmp.append(dumps(doc))
    unique_doc = list(set([dumps(doc) for sublist in documents for doc in sublist]))
       
    return [loads(doc) for doc in unique_doc]




retriever_chain = generate_queries | retriever.map() | get_unique_union

In [21]:
docs =retriever_chain.invoke({"question" : "금리 전망은?"})
template = """다음 맥락을 바탕으로 질문에 답변하세요:


{context}


질문: {question}
"""


prompt = ChatPromptTemplate.from_template(template)

In [22]:
template = """다음 맥락을 바탕으로 질문에 답변하세요:


{context}


질문: {question}
"""


prompt = ChatPromptTemplate.from_template(template)


fina_chain = (
    {'context' : retriever_chain, 'question' : RunnablePassthrough()}
    | prompt | llm | StrOutputParser()
)


In [23]:
fina_chain.invoke("삼성전자의 주가 전망")


'삼성전자의 주가가 최근 ‘7만전자’로 회복되면서 투자자들 사이에서 향후 주가 전망에 대한 의견이 엇갈리고 있습니다. 일부 투자자들은 현재 수익 구간에 진입한 만큼 차익 실현을 고려하며 ‘떠나자’는 입장을 보이고 있고, 다른 한편에서는 향후 추가 상승 가능성을 기대하며 ‘기다리자’는 입장을 고수하고 있습니다.\n\n이러한 상황은 삼성전자의 주가가 단기적으로는 심리적 저항선인 7만 원대를 회복했지만, 향후 실적 개선, 반도체 업황 회복, 글로벌 경제 상황 등 다양한 변수에 따라 주가가 추가 상승할지 여부가 결정될 수 있음을 시사합니다.\n\n따라서 삼성전자의 주가 전망은 긍정적인 기대와 신중한 관망이 공존하는 상황으로, 투자자들은 시장 흐름과 기업 실적 등을 면밀히 살펴보며 판단할 필요가 있습니다.'

## Rag chatbot 

tavily 적용

In [None]:
from openai import OpenAI
import streamlit as st
from langchain_core.messages.chat import ChatMessage
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough, RunnableLambda, RunnableMap
from langchain_ollama import ChatOllama
import os 
from langchain_community.document_loaders import PDFPlumberLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from langchain.storage import LocalFileStore
from langchain.embeddings import CacheBackedEmbeddings
from langchain_core.prompts import loading
from langchain_core.prompts import ChatPromptTemplate
import base64 
from PIL import Image 
import io
from langchain_experimental.text_splitter import SemanticChunker
from langchain_community.retrievers import TavilySearchAPIRetriever
from operator import itemgetter 
from dotenv import load_dotenv
from enum import Enum
from pydantic import BaseModel, Field
load_dotenv()


class Route(str, Enum):
    langchain_document = 'langchain_document'
    web = "web"

class RouteOutput(BaseModel):
    route: Route



web_retriever = TavilySearchAPIRetriever(k=10).with_config({'run_name' : 'web_retriver'})



client = OpenAI()

if os.path.isdir("./mycache") == False:
    os.mkdir("./mycache")

if os.path.isdir("./mycache/files") == False:
    os.mkdir("./mycache/files")
    
if os.path.isdir("./mycache/embedding") == False:
    os.mkdir("./mycache/embedding")
    
store = LocalFileStore("./mycache/embedding")

if 'chain' not in st.session_state:
    st.session_state['chain'] = None 

if 'messages' not in st.session_state:
    st.session_state['messages'] = []

def add_message(role, message):
    st.session_state['messages'].append({"role": role, "content": message})

st.title("RAG 기반 챗봇")

with st.sidebar:
    uploaded_file = st.file_uploader("파일 업로드", type=['pdf', 'txt'])

@st.cache_resource(show_spinner="업로드 파일 처리중 기다리세요")
def processing(file):
    if file.name.split(".")[-1] == "pdf":
        file_contents = file.read()
        #파일 저장
        with open(f"./mycache/files/{file.name}", "wb") as f:
            f.write(file_contents)

        #파일 임베딩 
        # insert your code 
        loader = PDFPlumberLoader(f"./mycache/files/{file.name}")
        docs = loader.load()
        text_spliter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=50)
        split_documents = text_spliter.split_documents(docs)
        embeddings = OpenAIEmbeddings()
        
        cached_embedder = CacheBackedEmbeddings.from_bytes_store(
                            underlying_embeddings=embeddings, # 기본 임베딩 모델 지정
                            document_embedding_cache=store # 로컬 저장소 지정
                        )
        vectorstore = FAISS.from_documents(documents=split_documents, embedding=cached_embedder)
        retriever = vectorstore.as_retriever()
            
    elif file.name.split(".")[-1] == "txt":
        file_contents = file.read()
        #파일 저장
        with open(f"./mycache/files/{file.name}", "w", encoding='utf-8') as f:
            f.write(file_contents.decode())
        text = file_contents.decode()
        text_splitter = SemanticChunker(
                OpenAIEmbeddings(),
                breakpoint_threshold_type="percentile", # 백분위수 기준
                breakpoint_threshold_amount=70, # 임계값 70%
            )
        split_documents = text_splitter.create_documents([text])
        embeddings = OpenAIEmbeddings()
        
        cached_embedder = CacheBackedEmbeddings.from_bytes_store(
                            underlying_embeddings=embeddings, # 기본 임베딩 모델 지정
                            document_embedding_cache=store # 로컬 저장소 지정
                        )
        vectorstore = FAISS.from_documents(documents=split_documents, embedding=cached_embedder)
        retriever = vectorstore.as_retriever()
    
    #######
    return retriever 

join_docs = RunnableLambda(
    lambda docs: "\n".join(doc.page_content for doc in docs)
)

def print_messages():
    for chat_message in st.session_state["messages"]:
        st.chat_message(chat_message['role']).write(chat_message['content'])


def routed_retriever(inp):
    question = inp['question']
    route = inp['route']

    if route == Route.langchain_document:
        return langchain_document_retriever.invoke(question)
    elif route == Route.web:
        return web_retriever.invoke(question)

    raise ValueError(f"Unkown route: {route}")


def create_chain(retriever):
    global langchain_document_retriever
    langchain_document_retriever = retriever.with_config({'run_name' : 'langchain_document_retriver'})
    prompt_text = {'_type': 'prompt',
                'template': "You are an assistant for question-answering tasks. \nUse the following pieces of retrieved `information` to answer the question. \nIf you don't know the answer, just say that you don't know. \nAnswer in Korean.\n\n<information>\n{context} \n</information>\n\n#Question: \n{question}\n\n#Answer:\n  #chat_history : \n {chat_history}",
                'input_variables': ['question', 'context', 'chat_history']}
    prompt = loading.load_prompt_from_config(prompt_text)
    # llm = ChatOllama(model='gemma:7b', temperature=0)
    llm = ChatOpenAI( model='gpt-4.1-2025-04-14', temperature=0)

    route_prompt  =  ChatPromptTemplate.from_template("""   
        질문에 답변하기 위한 적절한 Retriever를 선택하세요.

        질문: {question}
        """)

    route_chain = (
        route_prompt | llm.with_structured_output(RouteOutput) 
        | (lambda x : x.route)
    )

    chain = (
        RunnableMap(
            {
                "route" : route_chain,
                "question" : itemgetter("question"),
                "chat_history" : itemgetter('chat_history')
            }
        )
        | RunnablePassthrough.assign(context=routed_retriever)
        | prompt 
        | llm 
        | StrOutputParser()
    )
    


    # chain = (

    #     {
    #             "context": retriever | join_docs,
    #             "question": RunnablePassthrough(),
    #             "chat_history": RunnablePassthrough(),
    #     }
    #     | prompt
    #     | llm
    #     | StrOutputParser()
    # )
    return chain


if uploaded_file:
    retriever = processing(uploaded_file)
    chain = create_chain(retriever)
    st.session_state['chain'] = chain 


options = ("질문하기", "이미지 생성")

selected_radio = st.radio(
    '다음 중 하나 선택하세요',
    options
)

if selected_radio == "질문하기":

    user_input = st.chat_input("질문을 하세요")
    print_messages()

    if user_input:
        st.session_state.messages.append({"role": "user", "content": user_input})
        history_str = "\n".join(
                f"{m['role']} : {m['content']}" for m in st.session_state.messages )
        # print("---------------")
        # print(history_str)
        # print("---------------")
        chain = st.session_state['chain']
        
        
        # print(payload)
        if chain is not None:
            #히스토리 (과거 질문과 대답 목록 )
            
            
            st.chat_message("user").write(user_input)
            # print("===>")
            # print(user_input)
            payload = {"question": user_input, "chat_history": history_str}
            response = chain.stream(payload)
            # add_message('user' , user_input)
            # print(response)

            with st.chat_message('assistant'):
                container = st.empty()

                ai_answer = ""
                for token in response:
                    ai_answer += token 
                    container.markdown(ai_answer)
                add_message('assistant', ai_answer)

            
            

elif selected_radio == "이미지 생성":
    generate_input = st.chat_input("생성하고 싶은 이미지를 설명해주세요")

    if generate_input:

        response = client.images.generate(
                    model='dall-e-3',
                    prompt=generate_input,
                    size="1024x1024",
                    quality = 'standard',
                    response_format='b64_json',
                    n=1
                )
        result = response.model_dump()
        img = response.data[0]
        img_data = base64.b64decode(img.b64_json)
        img = Image.open(io.BytesIO(img_data))
        st.image(img)

