In [None]:
import gradio as gr
from langchain_community.document_loaders import TextLoader, DirectoryLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain_community.chat_models import ChatOllama
from langchain.prompts import PromptTemplate
from langchain.chains import ConversationalRetrievalChain
from langchain.schema import Document
from langchain.memory import ConversationBufferMemory
import os
import shutil
from PIL import Image
import base64
from io import BytesIO
from langchain_core.messages import HumanMessage
from datetime import datetime
from matplotlib import rcParams
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import glob

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# 폰트 경로 설정 (워드 클라우드용)
# FONT_PATH = '/usr/share/fonts/nhn_nanum/NanumGothic.ttf'

# 워드클라우드 폰트 경로 (Windows 기준)
FONT_PATH = "C:\\Windows\\Fonts\\malgun.ttf"


In [None]:
# 파일 경로 설정
MAIN_DOC_PATH = './dataset/dbvision02.txt'
BASE_DIR = os.getcwd()  # 현재 작업 디렉터리를 기준으로 설정
TEMP_FOLDER = os.path.join(BASE_DIR, "temp")  # temp 폴더의 절대 경로
# TEMP_FOLDER = './temp'
IMG_PATH = './imgs/wordcloud.png'

In [None]:
# 문서 로드 및 벡터 저장소 초기화 함수
def initialize_db(file_path=None, use_main_only=True):
    """
    문서를 로드하고 벡터 저장소를 초기화하는 함수

    Parameters:
    file_path (str): 로드할 파일 경로. None인 경우 MAIN_DOC_PATH 사용
    use_main_only (bool): True인 경우 MAIN_DOC_PATH만 사용, False인 경우 지정된 파일만 사용
    """
    try:
        # 사용할 문서 경로 결정
        doc_path = MAIN_DOC_PATH if use_main_only else file_path
        if not doc_path:
            raise ValueError("파일 경로가 지정되지 않았습니다.")

        if not os.path.exists(doc_path):
            if doc_path == MAIN_DOC_PATH:
                with open(doc_path, 'w', encoding='utf-8') as f:
                    f.write("휴먼(주) 정보:\n")
                print(f"새 파일이 생성되었습니다: {doc_path}")
            else:
                raise FileNotFoundError(f"파일을 찾을 수 없습니다: {doc_path}")

        # 문서 로드
        loader = TextLoader(doc_path, encoding='utf-8')
        docs = loader.load()

        # 임베딩 모델 및 벡터 저장소 초기화
        embeddings_model = HuggingFaceEmbeddings(
            model_name='sentence-transformers/all-MiniLM-L6-v2'
        )
        db = Chroma.from_documents(docs, embeddings_model)

        return db, embeddings_model

    except Exception as e:
        print(f"DB 초기화 중 오류 발생: {str(e)}")
        raise

In [None]:
# 새로운 파일(txt) 추가
def add_new_file(file):
    if file is None:
        return "파일을 선택해주세요.", None

    try:
        file_path = os.path.join(TEMP_FOLDER, os.path.basename(file.name))
        shutil.copy2(file.name, file_path)

        # 파일 크기 확인 및 유효성 검사
        file_size = os.path.getsize(file_path)
        if file_size == 0:
            os.remove(file_path)  # 크기 0인 파일 삭제
            return "파일이 비어 있습니다. 다시 시도해 주세요.", None

        # 벡터 저장소 재초기화
        global db, embeddings_model, qa_chain
        db, embeddings_model = initialize_db()
        qa_chain = initialize_qa_chain()

        return f"파일이 성공적으로 추가되었습니다: {file.name}", None
    except Exception as e:
        return f"파일 추가 중 오류가 발생했습니다: {str(e)}", None

In [None]:
# 새로운 정보 추가 함수
def add_new_information(new_info):
    if new_info.strip():
        try:
            with open(MAIN_DOC_PATH, 'a', encoding='utf-8') as f:
                f.write(f"\n{new_info.strip()}")

            # 벡터 저장소 재초기화
            global db, embeddings_model, qa_chain
            db, embeddings_model = initialize_db()
            qa_chain = initialize_qa_chain()

            return "새로운 정보가 성공적으로 추가되었습니다."
        except Exception as e:
            return f"정보 추가 중 오류가 발생했습니다: {str(e)}"
    return "추가할 정보를 입력해주세요."

In [None]:
def read_file_content(file_path):
    try:
        # UTF-8로 시도
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read().strip()
    except UnicodeDecodeError:
        try:
            # CP949 인코딩으로 시도 (한국어 텍스트에 자주 사용)
            with open(file_path, 'r', encoding='cp949') as f:
                return f.read().strip()
        except Exception as e:
            return f"(파일 읽기 오류: {str(e)})"

In [None]:
def view_all_content():
    content = ""

    # 메인 문서 내용
    try:
        with open(MAIN_DOC_PATH, 'r', encoding='utf-8') as f:
            content += "=== 메인 문서 내용 ===\n"
            content += f.read().strip() + "\n\n"
    except FileNotFoundError:
        content += "메인 문서가 없습니다.\n\n"

    # temp 폴더 내 문서들의 내용
    if os.path.exists(TEMP_FOLDER):
        for filename in os.listdir(TEMP_FOLDER):
            if filename.endswith('.txt'):
                file_path = os.path.abspath(os.path.join(TEMP_FOLDER, filename))
                file_content = read_file_content(file_path)

                if file_content:
                    content += f"=== {filename} ===\n{file_content}\n\n"
                else:
                    content += f"=== {filename} ===\n(내용이 비어 있습니다)\n\n"
    else:
        content += "temp 폴더가 존재하지 않습니다.\n\n"

    return content

In [None]:
# Base64 변환 함수
def convert_to_base64(image):
    buffered = BytesIO()
    image.save(buffered, format="JPEG")
    img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
    return img_str

In [None]:
# 초기 DB 및 embeddings_model 설정
db, embeddings_model = initialize_db()

  embeddings_model = HuggingFaceEmbeddings(





: 

In [None]:
# 메모리 설정
memory = ConversationBufferMemory(
    memory_key="chat_history",
    output_key="answer",  # 출력 키를 명시적으로 지정
    return_messages=True
)

In [None]:
# 프롬프트 템플릿 정의
prompt_template = """
당신은 휴먼(주) 정보를 제공하는 AI 어시스턴트입니다. 모든 답변은 한국어로 답변해 주세요.

아래는 이전 대화 내용입니다:
{chat_history}

관련 문서 내용:
{context}

사용자 질문: {question}

지침:
1. 문서에서 찾은 정보가 있다면 그 정보를 바탕으로 답변해주세요.
2. 문서에서 관련 정보를 찾지 못했다면 "죄송합니다만, 해당 질문에 대한 정보를 문서에서 찾을 수 없습니다."라고 답변한 후, 일반적인 대화를 이어갈 수 있습니다.
3. 모든 답변은 친절하고 전문적으로 제공해주세요.

답변:
"""

PROMPT = PromptTemplate(
    template=prompt_template,
    input_variables=["chat_history", "context", "question"]
)

In [None]:
# LLM 모델 설정
llm = ChatOllama(model="gemma2", temperature=0.1, verbose=True)
llm_image = ChatOllama(model="llava:13b", temperature=0.1, verbose=True)

In [None]:
# ConversationalRetrievalChain 초기화 함수
def initialize_qa_chain(custom_db=None):
    """
    QA 체인을 초기화하는 함수
    custom_db가 제공되면 해당 DB를 사용, 아니면 기본 DB 사용
    """
    # 사용할 DB 결정
    if custom_db is None:
        db, _ = initialize_db()
    else:
        db = custom_db

    # QA 체인 생성 및 반환
    return ConversationalRetrievalChain.from_llm(
        llm,
        db.as_retriever(search_kwargs={"k": 3}),
        return_source_documents=True,
        verbose=True,
        combine_docs_chain_kwargs={"prompt": PROMPT},
        memory=memory  # 기존 메모리 사용
    )

In [None]:
# 이미지 처리 함수
def process_image(image):
    if image is None:
        return None

    try:
        buffered = BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        return f"data:image/jpeg;base64,{img_str}"
    except Exception as e:
        print(f"이미지 처리 중 오류 발생: {str(e)}")
        return None

In [None]:
def generate_wordcloud():
    try:
        with open(MAIN_DOC_PATH, 'r', encoding='utf-8') as f:
            text = f.read()

        wordcloud = WordCloud(
            font_path=FONT_PATH,
            background_color='white',
            width=800, height=400
        ).generate(text)

        os.makedirs(os.path.dirname(IMG_PATH), exist_ok=True)
        plt.figure(figsize=(10, 5))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.savefig(IMG_PATH)
        plt.close()

        return IMG_PATH
    except Exception as e:
        return f"오류 발생: {e}"

In [None]:
qa_chain = initialize_qa_chain()

In [None]:
def chat(message, history, image=None):
    try:
        chat_history = [(human, ai) for human, ai in history]
        answer = ""

        # 이미지 처리 부분
        if image is not None:
            processed_image = process_image(image)
            if processed_image:
                messages = [
                    HumanMessage(
                        content=[
                            {"type": "text", "text": message},
                            {"type": "image_url", "image_url": processed_image}
                        ]
                    )
                ]

                try:
                    response = llm_image.invoke(messages)
                    image_answer = response.content
                    answer = f"이미지 분석 결과: {image_answer}"
                except Exception as img_error:
                    print(f"이미지 분석 중 오류 발생: {str(img_error)}")
                    answer = f"이미지 분석 중 오류가 발생했습니다: {str(img_error)}"

        # 이미지가 없는 경우에만 텍스트 기반 응답 처리
        if not image:
            # 메인 문서만 사용하는 QA 체인 사용
            main_db, _ = initialize_db(use_main_only=True)
            main_qa_chain = initialize_qa_chain(main_db)

            text_response = main_qa_chain({"question": message, "chat_history": chat_history})
            text_answer = text_response['answer']

            # 참고 출처 정보 추가 (파일명만 표시)
            if 'source_documents' in text_response:
                sources = set([os.path.basename(doc.metadata.get('source', 'Unknown'))
                             for doc in text_response['source_documents']])
                source_info = f"\n\n참고 파일: {', '.join(sources)}" if sources else ""
                text_answer += source_info

            answer = text_answer if not answer else f"{answer}\n\n텍스트 응답: {text_answer}"

        chat_history.append((message, answer))
        return "", chat_history, None

    except Exception as e:
        print(f"채팅 함수 실행 중 오류 발생: {str(e)}")
        error_message = f"죄송합니다. 오류가 발생했습니다: {str(e)}"
        chat_history.append((message, error_message))
        return "", chat_history, None

In [None]:
# 새로운 정보 추가 함수 (Gradio 인터페이스용)
def add_info(new_info):
    if not new_info.strip():
        return "내용을 입력해주세요.", new_info
    if add_new_information(new_info):
        return "새로운 정보가 추가되었습니다. '전체 내용 보기' 버튼을 클릭하여 업데이트된 내용을 확인할 수 있습니다.", ""
    else:
        return "정보 추가에 실패했습니다. 다시 시도해 주세요.", new_info

In [None]:
# temp 폴더 내 파일 목록을 불러오는 함수
def list_temp_files():
    if not os.path.exists(TEMP_FOLDER):
        os.makedirs(TEMP_FOLDER)
    return [os.path.basename(f) for f in glob.glob(os.path.join(TEMP_FOLDER, "*.txt"))]

In [None]:
# 선택된 파일로부터 질의를 처리하는 함수
def ask_from_selected_file(selected_file, question):
    if not selected_file:
        return "먼저 파일을 선택해주세요."
    if not question.strip():
        return "질문을 입력해주세요."

    try:
        file_path = os.path.join(TEMP_FOLDER, selected_file)
        if not os.path.exists(file_path):
            return f"선택된 파일을 찾을 수 없습니다: {selected_file}"

        # 선택된 파일만으로 DB 초기화
        custom_db, custom_embeddings = initialize_db(file_path, use_main_only=False)

        # 임시 QA 체인 생성
        temp_qa_chain = ConversationalRetrievalChain.from_llm(
            llm,
            custom_db.as_retriever(search_kwargs={"k": 3}),
            return_source_documents=True,
            verbose=True,
            combine_docs_chain_kwargs={"prompt": PROMPT},
            memory=ConversationBufferMemory(
                memory_key="chat_history",
                output_key="answer",
                return_messages=True
            )
        )

        # 질문 처리
        response = temp_qa_chain({"question": question, "chat_history": []})

        # 출처 정보 추가 (파일명만 표시)
        answer = response["answer"]
        if 'source_documents' in response:
            sources = set([os.path.basename(doc.metadata.get('source', 'Unknown'))
                         for doc in response['source_documents']])
            source_info = f"\n\n참고 파일: {', '.join(sources)}" if sources else ""
            answer += source_info

        return answer

    except Exception as e:
        return f"오류가 발생했습니다: {str(e)}"

In [None]:
# Gradio 인터페이스 설정
with gr.Blocks(theme=gr.themes.Soft(), css="footer {display: none !important}") as iface:
    gr.Markdown("# 휴먼먼(주) 정보 제공 AI 챗봇")

    with gr.Tabs():
        with gr.TabItem("채팅"):
            chatbot = gr.Chatbot(height=600)
            msg = gr.Textbox(label="질문을 입력하세요", lines=1)
            image_input = gr.Image(type="pil", label="이미지 업로드 (선택사항)")

            with gr.Row():
                submit_btn = gr.Button("전송", variant="primary")
                clear_btn = gr.Button("대화 내용 지우기")

            gr.Examples(
                examples=[
                    ["휴먼의 주소는 어디인가요?", None],
                    ["휴먼이 보유한 솔루션은 무엇인가요?", None],
                    ["휴먼의 영업대표는 누구인가요?", None]
                ],
                inputs=[msg, image_input]
            )

            msg.submit(chat, [msg, chatbot, image_input], [msg, chatbot, image_input])
            submit_btn.click(chat, [msg, chatbot, image_input], [msg, chatbot, image_input])
            clear_btn.click(lambda: (None, None), None, [chatbot, image_input])

        with gr.TabItem("정보 추가"):
            new_info_input = gr.Textbox(label="새로운 정보 추가", lines=3)
            add_info_btn = gr.Button("정보 추가하기")
            info_status = gr.Textbox(label="상태 메시지", interactive=False)

            add_info_btn.click(add_new_information, [new_info_input], [info_status])

        with gr.TabItem("파일 관리"):
            gr.Markdown("## temp 폴더에 파일 추가 및 질문")

            file_input = gr.File(label="Windows에서 파일 추가")
            add_file_btn = gr.Button("파일 추가하기")
            file_status = gr.Textbox(label="상태 메시지", interactive=False)

            file_list = gr.Dropdown(
                label="temp 폴더의 파일 선택",
                choices=list_temp_files(),
                multiselect=False,
                interactive=True
            )

            def update_file_list(file):
                status_message, _ = add_new_file(file)
                updated_files = list_temp_files()
                return status_message, gr.update(choices=updated_files)

            add_file_btn.click(
                update_file_list,
                inputs=[file_input],
                outputs=[file_status, file_list]
            )

            question_input = gr.Textbox(label="질문 입력", lines=2, placeholder="질문을 입력하세요.")
            submit_btn = gr.Button("질문 전송", variant="primary")
            answer_box = gr.Textbox(label="답변", interactive=False)

            submit_btn.click(
                ask_from_selected_file,
                inputs=[file_list, question_input],
                outputs=answer_box
            )

        with gr.TabItem("전체 내용 보기"):
            view_content_btn = gr.Button("전체 내용 보기")
            content_display = gr.Textbox(label="전체 내용", lines=10)

            view_content_btn.click(view_all_content, None, content_display)

        with gr.TabItem("단어시각화"):
            generate_btn = gr.Button("Word Cloud 생성")
            output_image = gr.Image()
            generate_btn.click(lambda: generate_wordcloud(), None, output_image)

In [None]:
iface.launch(server_port=7861, server_name="0.0.0.0")