# ALM 데이터 분석 챗봇
## LM Studio Qwen + LangChain Function Calling

이 노트북은 로컬 LM Studio의 Qwen 모델을 사용하여 ALM 데이터베이스를 분석하는 챗봇입니다.

## 1. 라이브러리 임포트 및 설정

In [None]:
import sqlite3
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import json
from typing import Dict, List, Any, Optional

from langchain_openai import ChatOpenAI
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage

# 한글 폰트 설정 (matplotlib)
plt.rcParams['font.family'] = 'AppleGothic'  # MacOS
# plt.rcParams['font.family'] = 'Malgun Gothic'  # Windows
plt.rcParams['axes.unicode_minus'] = False

# 경고 무시
import warnings
warnings.filterwarnings('ignore')

print("라이브러리 임포트 완료!")

## 2. 데이터베이스 연결 설정

In [None]:
# 데이터베이스 경로
DB_PATH = 'simple.db'

def get_db_connection():
    """데이터베이스 연결 생성"""
    return sqlite3.connect(DB_PATH)

def get_table_info():
    """데이터베이스의 모든 테이블 정보 조회"""
    conn = get_db_connection()
    cursor = conn.cursor()
    
    # 테이블 목록 조회
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    
    table_info = {}
    for table in tables:
        table_name = table[0]
        cursor.execute(f"PRAGMA table_info({table_name});")
        columns = cursor.fetchall()
        table_info[table_name] = [col[1] for col in columns]  # 컬럼명만 추출
    
    conn.close()
    return table_info

# 테이블 정보 확인
tables = get_table_info()
print("데이터베이스 테이블:")
for table_name, columns in tables.items():
    print(f"\n{table_name}: {len(columns)}개 컬럼")
    print(f"  주요 컬럼: {', '.join(columns[:5])}...")

## 3. LM Studio 연결 설정

In [None]:
# LM Studio 설정
LM_STUDIO_BASE_URL = "http://localhost:1234/v1"
LM_STUDIO_API_KEY = "lm-studio"  # LM Studio는 실제 API 키 불필요

# LangChain ChatOpenAI 모델 초기화 (LM Studio 호환)
llm = ChatOpenAI(
    base_url=LM_STUDIO_BASE_URL,
    api_key=LM_STUDIO_API_KEY,
    temperature=0.1,
    model="qwen",  # LM Studio에서 실행 중인 모델명
)

print("LM Studio 연결 설정 완료!")
print(f"Base URL: {LM_STUDIO_BASE_URL}")

## 4. SQL 실행 및 분석 함수 정의

In [None]:
def execute_sql_query(query: str) -> Dict[str, Any]:
    """
    SQL 쿼리를 실행하고 결과를 반환
    
    Args:
        query: 실행할 SQL 쿼리
    
    Returns:
        결과 딕셔너리 (data, columns, row_count 포함)
    """
    try:
        conn = get_db_connection()
        df = pd.read_sql_query(query, conn)
        conn.close()
        
        return {
            "success": True,
            "data": df.to_dict('records'),
            "columns": df.columns.tolist(),
            "row_count": len(df),
            "dataframe": df
        }
    except Exception as e:
        return {
            "success": False,
            "error": str(e),
            "data": [],
            "columns": [],
            "row_count": 0
        }

def search_alm_contracts(filters: Optional[Dict] = None, limit: int = 10) -> str:
    """
    ALM 계약 정보 검색
    
    Args:
        filters: 필터 조건 (예: {"CURRENCY_CD": "KRW"})
        limit: 결과 제한 수
    
    Returns:
        검색 결과 문자열
    """
    query = "SELECT * FROM ALM_INST"
    
    if filters:
        conditions = [f"{k} = '{v}'" for k, v in filters.items()]
        query += " WHERE " + " AND ".join(conditions)
    
    query += f" LIMIT {limit}"
    
    result = execute_sql_query(query)
    
    if result["success"]:
        df = result["dataframe"]
        return f"검색 결과: {result['row_count']}건\n\n{df.to_string()}"
    else:
        return f"오류 발생: {result['error']}"

def analyze_liquidity_gap(scenario_no: Optional[int] = None) -> str:
    """
    유동성 갭 분석
    
    Args:
        scenario_no: 시나리오 번호
    
    Returns:
        분석 결과 문자열
    """
    query = """
    SELECT 
        TIME_BAND,
        SUM(GAP_PRN_TOTAL) as 총_원금갭,
        SUM(GAP_INT_TOTAL) as 총_이자갭,
        COUNT(*) as 건수
    FROM NFAR_LIQ_GAP_310524
    """
    
    if scenario_no is not None:
        query += f" WHERE SCENARIO_NO = {scenario_no}"
    
    query += " GROUP BY TIME_BAND ORDER BY TIME_BAND"
    
    result = execute_sql_query(query)
    
    if result["success"]:
        df = result["dataframe"]
        return f"유동성 갭 분석 결과:\n\n{df.to_string()}\n\n총 {result['row_count']}개 기간대"
    else:
        return f"오류 발생: {result['error']}"

def get_exchange_rate(currency: str, date: Optional[str] = None) -> str:
    """
    환율 정보 조회
    
    Args:
        currency: 통화 코드 (예: USD, EUR)
        date: 조회 날짜 (YYYY-MM-DD 형식)
    
    Returns:
        환율 정보 문자열
    """
    query = f"SELECT * FROM NFA_EXCH_RATE_HIST WHERE UNIT_CURRENCY_CD = '{currency}'"
    
    if date:
        query += f" AND EFFECTIVE_DATE = '{date}'"
    
    query += " ORDER BY EFFECTIVE_DATE DESC LIMIT 10"
    
    result = execute_sql_query(query)
    
    if result["success"]:
        df = result["dataframe"]
        return f"{currency} 환율 정보:\n\n{df.to_string()}"
    else:
        return f"오류 발생: {result['error']}"

def get_interest_rate(rate_cd: int, term: Optional[int] = None) -> str:
    """
    금리 정보 조회
    
    Args:
        rate_cd: 금리 코드
        term: 금리 기간
    
    Returns:
        금리 정보 문자열
    """
    query = f"SELECT * FROM NFA_IRC_RATE_HIST WHERE INT_RATE_CD = {rate_cd}"
    
    if term is not None:
        query += f" AND INT_RATE_TERM = {term}"
    
    query += " ORDER BY EFFECTIVE_DATE DESC LIMIT 10"
    
    result = execute_sql_query(query)
    
    if result["success"]:
        df = result["dataframe"]
        return f"금리 정보:\n\n{df.to_string()}"
    else:
        return f"오류 발생: {result['error']}"

def get_aggregate_stats(table_name: str, group_by: str, aggregate_col: str) -> str:
    """
    집계 통계 조회
    
    Args:
        table_name: 테이블명
        group_by: 그룹화 컬럼
        aggregate_col: 집계 컬럼
    
    Returns:
        통계 결과 문자열
    """
    query = f"""
    SELECT 
        {group_by},
        COUNT(*) as 건수,
        SUM({aggregate_col}) as 합계,
        AVG({aggregate_col}) as 평균,
        MIN({aggregate_col}) as 최소,
        MAX({aggregate_col}) as 최대
    FROM {table_name}
    GROUP BY {group_by}
    ORDER BY 합계 DESC
    LIMIT 20
    """
    
    result = execute_sql_query(query)
    
    if result["success"]:
        df = result["dataframe"]
        return f"{table_name} 테이블 {group_by}별 {aggregate_col} 집계:\n\n{df.to_string()}"
    else:
        return f"오류 발생: {result['error']}"

print("SQL 함수 정의 완료!")

## 5. 시각화 함수 정의

In [None]:
def visualize_query_result(query: str, chart_type: str = 'bar', 
                          x_col: Optional[str] = None, 
                          y_col: Optional[str] = None,
                          title: Optional[str] = None) -> str:
    """
    쿼리 결과를 시각화
    
    Args:
        query: SQL 쿼리
        chart_type: 차트 타입 (bar, line, pie, scatter)
        x_col: X축 컬럼명
        y_col: Y축 컬럼명
        title: 차트 제목
    
    Returns:
        시각화 완료 메시지
    """
    result = execute_sql_query(query)
    
    if not result["success"]:
        return f"오류 발생: {result['error']}"
    
    df = result["dataframe"]
    
    if df.empty:
        return "데이터가 없습니다."
    
    # x, y 컬럼 자동 선택
    if x_col is None:
        x_col = df.columns[0]
    if y_col is None and len(df.columns) > 1:
        y_col = df.columns[1]
    
    plt.figure(figsize=(12, 6))
    
    if chart_type == 'bar':
        plt.bar(df[x_col].astype(str), df[y_col])
        plt.xticks(rotation=45, ha='right')
    elif chart_type == 'line':
        plt.plot(df[x_col], df[y_col], marker='o')
        plt.xticks(rotation=45, ha='right')
    elif chart_type == 'pie':
        plt.pie(df[y_col], labels=df[x_col], autopct='%1.1f%%')
    elif chart_type == 'scatter':
        plt.scatter(df[x_col], df[y_col])
    
    plt.xlabel(x_col)
    plt.ylabel(y_col if y_col else '')
    plt.title(title if title else f"{y_col} by {x_col}")
    plt.tight_layout()
    plt.show()
    
    # 테이블도 함께 출력
    print("\n데이터 테이블:")
    print(df.to_string())
    
    return f"차트 생성 완료! ({chart_type} 차트, {len(df)}개 데이터 포인트)"

print("시각화 함수 정의 완료!")

## 6. LangChain Tools 정의

In [None]:
from langchain_core.tools import StructuredTool
from pydantic import BaseModel, Field

# Pydantic 모델 정의 (입력 스키마)
class SearchContractsInput(BaseModel):
    filters_json: str = Field(default="", description="JSON 형식의 필터 조건")

class LiquidityGapInput(BaseModel):
    scenario_no: str = Field(default="", description="시나리오 번호")

class ExchangeRateInput(BaseModel):
    currency_and_date: str = Field(description="통화코드 또는 '통화코드,날짜'")

class InterestRateInput(BaseModel):
    rate_info: str = Field(description="금리코드 또는 '금리코드,기간'")

class AggregateStatsInput(BaseModel):
    params: str = Field(description="'테이블명,그룹컬럼,집계컬럼'")

class VisualizeInput(BaseModel):
    query_params: str = Field(description="'SQL쿼리|||차트타입|||X컬럼|||Y컬럼|||제목'")

# 도구 함수들
def _search_alm_contracts(filters_json: str = "") -> str:
    """ALM 계약 정보를 검색합니다."""
    filters = json.loads(filters_json) if filters_json else None
    return search_alm_contracts(filters)

def _analyze_liquidity_gap(scenario_no: str = "") -> str:
    """유동성 갭을 분석합니다."""
    scenario = int(scenario_no) if scenario_no else None
    return analyze_liquidity_gap(scenario)

def _get_exchange_rate(currency_and_date: str) -> str:
    """환율 정보를 조회합니다."""
    parts = currency_and_date.split(',')
    currency = parts[0].strip()
    date = parts[1].strip() if len(parts) > 1 else None
    return get_exchange_rate(currency, date)

def _get_interest_rate(rate_info: str) -> str:
    """금리 정보를 조회합니다."""
    parts = rate_info.split(',')
    rate_cd = int(parts[0].strip())
    term = int(parts[1].strip()) if len(parts) > 1 else None
    return get_interest_rate(rate_cd, term)

def _get_aggregate_stats(params: str) -> str:
    """테이블의 집계 통계를 조회합니다."""
    parts = params.split(',')
    if len(parts) != 3:
        return "오류: 정확히 3개의 파라미터가 필요합니다"
    return get_aggregate_stats(parts[0].strip(), parts[1].strip(), parts[2].strip())

def _visualize_data(query_params: str) -> str:
    """SQL 쿼리 결과를 시각화합니다."""
    parts = query_params.split('|||')
    query = parts[0]
    chart_type = parts[1] if len(parts) > 1 else 'bar'
    x_col = parts[2] if len(parts) > 2 else None
    y_col = parts[3] if len(parts) > 3 else None
    title = parts[4] if len(parts) > 4 else None
    return visualize_query_result(query, chart_type, x_col, y_col, title)

# StructuredTool로 도구 생성
tools = [
    StructuredTool.from_function(
        func=_search_alm_contracts,
        name="search_alm_contracts",
        description="ALM 계약 정보를 검색합니다. filters_json: JSON 형식의 필터 조건 또는 빈 문자열",
        args_schema=SearchContractsInput
    ),
    StructuredTool.from_function(
        func=_analyze_liquidity_gap,
        name="analyze_liquidity_gap",
        description="유동성 갭을 분석합니다. scenario_no: 시나리오 번호 (선택사항)",
        args_schema=LiquidityGapInput
    ),
    StructuredTool.from_function(
        func=_get_exchange_rate,
        name="get_exchange_rate",
        description="환율 정보를 조회합니다. currency_and_date: 통화코드 또는 '통화코드,날짜'",
        args_schema=ExchangeRateInput
    ),
    StructuredTool.from_function(
        func=_get_interest_rate,
        name="get_interest_rate",
        description="금리 정보를 조회합니다. rate_info: 금리코드 또는 '금리코드,기간'",
        args_schema=InterestRateInput
    ),
    StructuredTool.from_function(
        func=_get_aggregate_stats,
        name="get_aggregate_stats",
        description="테이블의 집계 통계를 조회합니다. params: '테이블명,그룹컬럼,집계컬럼'",
        args_schema=AggregateStatsInput
    ),
    StructuredTool.from_function(
        func=_visualize_data,
        name="visualize_data",
        description="SQL 쿼리 결과를 시각화합니다. query_params: 'SQL쿼리|||차트타입|||X컬럼|||Y컬럼|||제목'",
        args_schema=VisualizeInput
    ),
]

print(f"총 {len(tools)}개의 도구가 정의되었습니다:")
for tool_item in tools:
    print(f"  - {tool_item.name}")

## 7. Agent 생성

In [None]:
# LLM에 도구 바인딩 (최신 방식)
llm_with_tools = llm.bind_tools(tools)

# 간단한 채팅 함수 (Agent 없이 직접 구현)
def run_agent(user_input: str, chat_history: list = None) -> str:
    """
    사용자 입력을 처리하고 필요시 도구를 호출
    
    Args:
        user_input: 사용자 질문
        chat_history: 대화 이력
    
    Returns:
        응답 문자열
    """
    if chat_history is None:
        chat_history = []
    
    system_message = """당신은 ALM(자산부채관리) 데이터 분석 전문가입니다.

사용 가능한 데이터베이스 테이블:
1. ALM_INST - ALM 계약 정보 (통화, 잔액, 금리, 만기일 등)
2. NFAR_LIQ_GAP_310524 - 유동성 갭 분석 (원금갭, 이자갭, 기간대별)
3. NFAT_LIQ_INDEX_SUMMARY_M - 유동성 지수 요약
4. NFA_EXCH_RATE_HIST - 환율 이력
5. NFA_IRC_RATE_HIST - 금리 이력
6. orders_summary - 주문 요약

사용자 질문을 분석하여 적절한 도구를 선택하고 실행하세요.
결과는 테이블, 그래프, 그리고 자연어 설명으로 제공하세요.

한국어로 친절하게 답변해주세요."""

    # 메시지 구성
    messages = [SystemMessage(content=system_message)]
    messages.extend(chat_history)
    messages.append(HumanMessage(content=user_input))
    
    # LLM 호출
    response = llm_with_tools.invoke(messages)
    
    # Tool calls 확인
    if hasattr(response, 'tool_calls') and response.tool_calls:
        # 도구 실행
        tool_results = []
        for tool_call in response.tool_calls:
            tool_name = tool_call['name']
            tool_args = tool_call['args']
            
            # 해당 도구 찾기
            tool_func = None
            for t in tools:
                if t.name == tool_name:
                    tool_func = t
                    break
            
            if tool_func:
                try:
                    # 도구 실행
                    result = tool_func.invoke(tool_args)
                    tool_results.append(f"\n[{tool_name} 실행 결과]\n{result}")
                except Exception as e:
                    tool_results.append(f"\n[{tool_name} 오류]\n{str(e)}")
        
        # 도구 결과를 포함하여 최종 응답 생성
        if tool_results:
            final_prompt = f"{user_input}\n\n도구 실행 결과:{''.join(tool_results)}\n\n위 결과를 바탕으로 사용자에게 친절하게 설명해주세요."
            messages_final = [SystemMessage(content=system_message)]
            messages_final.append(HumanMessage(content=final_prompt))
            final_response = llm.invoke(messages_final)
            return final_response.content
    
    # Tool call이 없으면 일반 응답 반환
    return response.content

print("Agent 준비 완료!")

## 8. 챗봇 실행 함수

In [None]:
# 대화 이력 저장
chat_history = []

def chat(user_input: str):
    """
    사용자 입력을 받아 챗봇 응답 생성
    
    Args:
        user_input: 사용자 질문
    """
    global chat_history
    
    print(f"\n{'='*80}")
    print(f"사용자: {user_input}")
    print(f"{'='*80}\n")
    
    try:
        # Agent 실행
        response = run_agent(user_input, chat_history)
        
        # 대화 이력 업데이트
        chat_history.append(HumanMessage(content=user_input))
        chat_history.append(AIMessage(content=response))
        
        print(f"\n{'='*80}")
        print(f"챗봇: {response}")
        print(f"{'='*80}\n")
        
    except Exception as e:
        print(f"\n오류 발생: {str(e)}")
        import traceback
        traceback.print_exc()

print("챗봇 준비 완료!")
print("\n사용 예시:")
print("  chat('ALM 계약 중 KRW 통화 계약을 보여줘')")
print("  chat('유동성 갭을 분석해줘')")
print("  chat('USD 환율 정보를 알려줘')")
print("  chat('통화별 잔액을 그래프로 보여줘')")

## 9. 테스트 - 예제 질의

In [None]:
# 예제 1: ALM 계약 검색
chat("ALM_INST 테이블에서 처음 5개 계약을 보여줘")

In [None]:
# 예제 2: 유동성 갭 분석
chat("유동성 갭을 분석해서 보여줘")

In [None]:
# 예제 3: 통화별 집계
chat("ALM_INST 테이블에서 통화별 현재 잔액 합계를 계산해줘")

In [None]:
# 예제 4: 시각화
chat("통화별 현재 잔액을 막대 그래프로 보여줘")

## 10. 자유 대화

In [None]:
# 여기서 자유롭게 질문하세요
chat("여기에 질문을 입력하세요")