
# Tooling cho ORM `eod.eod_stock_v2` — Notebook demo chạy thử
**Mục tiêu:** Đóng gói bộ Tooling (metadata + data tools) để LLM có thể hỏi/đọc dữ liệu một cách **an toàn** từ ORM của bạn.  
Notebook này **mock DB bằng SQLite in-memory** để bạn chạy test ngay; khi triển khai thật chỉ cần đổi `ENGINE` sang PostgreSQL.

> **Có kèm # phụ chú** để bạn follow dễ.



## 1) Chuẩn bị môi trường (local)
- Yêu cầu: `sqlalchemy`, `pydantic>=2`, `langchain`, `langchain-openai`, `python-dotenv` (tuỳ chọn).
- Cài đặt nhanh:
```bash
pip install -U sqlalchemy pydantic langchain langchain-openai python-dotenv
```
> Lưu ý: Phần **LLM** sẽ tự động **bỏ qua** nếu không có `OPENAI_API_KEY`.


In [None]:

# phụ chú: Import cơ bản
import os, sys, json, math
from datetime import datetime, timedelta
print(sys.version)



## 2) SQLAlchemy: Engine + ORM
Ở môi trường demo này, ta dùng **SQLite in-memory** để tạo bảng `eod.eod_stock_v2` tương tự PostgreSQL.  
Khi lên production, bạn chỉ cần đổi `ENGINE` sang URI PostgreSQL của bạn.


In [None]:

# phụ chú: Engine SQLite in-memory để demo (thay bằng PostgreSQL khi triển khai thật)
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv
load_dotenv()
POSTGRES_USER = os.getenv("POSTGRES_USER")
POSTGRES_PASSWORD = os.getenv("POSTGRES_PASSWORD")
POSTGRES_HOST = os.getenv("POSTGRES_HOST")
POSTGRES_PORT = os.getenv("POSTGRES_PORT")
ENGINE = create_engine(f"postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:{POSTGRES_PORT}/stockdb",
    pool_size=10,             # Số kết nối giữ sẵn trong pool
    max_overflow=2,          # Số kết nối vượt quá pool_size khi cần
    pool_timeout=30,          # Thời gian chờ lấy kết nối trước khi raise TimeoutError
    pool_recycle=120,        # Tái tạo kết nối sau 30 phút (tránh idle disconnect)
    pool_pre_ping=True, )      # Kiểm tra kết nối trước khi dùng)

from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, String, TIMESTAMP, FLOAT, BIGINT

Base = declarative_base()
SessionLocal = sessionmaker(bind=ENGINE, autoflush=False, autocommit=False)

class EodStock(Base):
    __tablename__ = "eod_stock_v2"
    __table_args__ = {"schema": "eod"}  # Chỉ định schema
    ticker = Column(String(20), primary_key=True, nullable=False)
    DateTime = Column(TIMESTAMP, nullable=False, primary_key=True)
    Open = Column(FLOAT, nullable=True)
    High = Column(FLOAT, nullable=True)
    Low = Column(FLOAT, nullable=True)
    Close = Column(FLOAT, nullable=True)
    Volume = Column(FLOAT, nullable=True)
    priceAverage = Column(FLOAT, nullable=True)
    priceBasic = Column(FLOAT, nullable=True)
    dealVolume = Column(FLOAT, nullable=True)
    putthroughVolume = Column(FLOAT, nullable=True)
    totalValue = Column(FLOAT, nullable=True)
    putthroughValue = Column(FLOAT, nullable=True)
    buyForeignQuantity = Column(FLOAT, nullable=True)
    buyForeignValue = Column(FLOAT, nullable=True)
    sellForeignQuantity = Column(FLOAT, nullable=True)
    sellForeignValue = Column(FLOAT, nullable=True)
    buyCount = Column(FLOAT, nullable=True)
    buyQuantity = Column(FLOAT, nullable=True)
    sellCount = Column(FLOAT, nullable=True)
    sellQuantity = Column(FLOAT, nullable=True)
    adjRatio = Column(FLOAT, nullable=True)
    currentForeignRoom = Column(FLOAT, nullable=True)
    propTradingNetDealValue = Column(FLOAT, nullable=True)
    propTradingNetPTValue = Column(FLOAT, nullable=True)
    propTradingNetValue = Column(FLOAT, nullable=True)
    unit = Column(FLOAT, nullable=True)
    ShareOut = Column(BIGINT, nullable=True)

Base.metadata.create_all(ENGINE)

from contextlib import contextmanager

@contextmanager
def get_session():
    s = SessionLocal()
    try:
        yield s
    finally:
        s.close()



## 3) Seed data mẫu để test
Chèn vài dòng cho `HPG` và `FPT` để test các tool (range, latest, foreign flow).


In [None]:

# from sqlalchemy.orm import Session

# def seed_sample_data():
#     now = datetime(2024, 1, 1)
#     rows = []
#     for i in range(10):
#         dt = now + timedelta(days=i)
#         rows.append(EodStock(
#             ticker="HPG", DateTime=dt,
#             Open=24+i*0.1, High=25+i*0.1, Low=23+i*0.1, Close=24.5+i*0.1,
#             Volume=1_000_000 + i*1000,
#             buyForeignQuantity=10_000+i*10, sellForeignQuantity=8000+i*8,
#             buyForeignValue=240e6+i*1e6, sellForeignValue=200e6+i*1e6,
#             propTradingNetValue=5e6+i*1e5
#         ))
#     for i in range(10):
#         dt = now + timedelta(days=i)
#         rows.append(EodStock(
#             ticker="FPT", DateTime=dt,
#             Open=90+i*0.2, High=92+i*0.2, Low=89+i*0.2, Close=91+i*0.2,
#             Volume=500_000 + i*500,
#             buyForeignQuantity=7_000+i*7, sellForeignQuantity=6_000+i*6,
#             buyForeignValue=630e6+i*1e6, sellForeignValue=540e6+i*1e6,
#             propTradingNetValue=-3e6+i*5e4
#         ))
#     with get_session() as s:
#         s: Session
#         s.add_all(rows)
#         s.commit()

# seed_sample_data()
# print("Seeded:", True)



## 4) Tool utils (whitelist cột, parse datetime, giới hạn hàng)
Các tiện ích đảm bảo **an toàn** khi LLM gọi tool.


In [None]:

from typing import Iterable
from datetime import datetime

ALLOWED_COLUMNS: set[str] = {
    "Open","High","Low","Close","Volume","totalValue","putthroughValue",
    "buyForeignQuantity","sellForeignQuantity","buyForeignValue","sellForeignValue",
    "propTradingNetValue","propTradingNetDealValue","propTradingNetPTValue","ShareOut",
    "priceAverage","priceBasic","dealVolume","putthroughVolume","buyCount","sellCount","buyQuantity","sellQuantity",
}
DEFAULT_MAX_ROWS = 1000

def parse_dt(s: str) -> datetime:
    try:
        return datetime.fromisoformat(s)
    except Exception as e:
        raise ValueError(f"Invalid datetime format: {s}. Expect 'YYYY-MM-DD' or ISO 8601.") from e

def ensure_allowed_columns(cols: Iterable[str]) -> list[str]:
    cols = list(cols) if cols else []
    bad = [c for c in cols if c not in ALLOWED_COLUMNS]
    if bad:
        raise ValueError(f"Disallowed columns: {bad}. Allowed={sorted(ALLOWED_COLUMNS)}")
    return cols or ["Open","High","Low","Close","Volume"]



## 5) Pydantic Schemas (Field = context có cấu trúc cho LLM)


In [None]:

from typing import List, Literal
from pydantic import BaseModel, Field, field_validator

class MetaRequest(BaseModel):
    detail: Literal["columns","pk","indexes","sample"] = Field(
        "columns", description="Kiểu metadata: columns|pk|indexes|sample"
    )
    sample_rows: int = Field(5, ge=1, le=20, description="Số dòng sample (nếu detail=sample)")

class RangeRequest(BaseModel):
    symbol: str = Field(..., description="Mã cổ phiếu, ví dụ 'HPG'")
    start: str = Field(..., description="Ngày bắt đầu 'YYYY-MM-DD'")
    end: str = Field(..., description="Ngày kết thúc 'YYYY-MM-DD'")
    columns: List[str] = Field(default_factory=lambda: ["Open","High","Low","Close","Volume"],
                               description="Cột cần lấy (whitelisted)")
    limit: int = Field(10000, ge=1, le=200000, description="Giới hạn số dòng")

    @field_validator("symbol")
    @classmethod
    def norm_symbol(cls, v: str) -> str:
        v = v.strip().upper()
        if not 2 <= len(v) <= 10:
            raise ValueError("symbol length 2-10")
        return v

class LatestRequest(BaseModel):
    symbol: str = Field(..., description="Mã cổ phiếu, ví dụ 'HPG'")
    columns: List[str] = Field(default_factory=lambda: ["Close","Volume"], description="Các cột trả về")

class ForeignFlowRequest(BaseModel):
    symbol: str = Field(..., description="Mã cổ phiếu, ví dụ 'HPG'")
    start: str = Field(..., description="YYYY-MM-DD")
    end: str = Field(..., description="YYYY-MM-DD")
    metrics: List[Literal[
        "buyForeignQuantity","sellForeignQuantity","buyForeignValue","sellForeignValue","propTradingNetValue"
    ]] = Field(default_factory=lambda: ["buyForeignQuantity","sellForeignQuantity"],
              description="Chỉ số cần lấy.")



## 6) Tools (Metadata + Range + Latest + Foreign Flow)


In [None]:

from typing import Annotated
from pydantic import Field as PydField
from langchain_core.tools import tool, StructuredTool
from sqlalchemy import select, func, text

@tool
def stock_meta(
    detail: Annotated[str, PydField(description="columns|pk|indexes|sample")] = "columns",
    sample_rows: Annotated[int, PydField(ge=1, le=20, description="Số dòng nếu sample")] = 5,
) -> dict:
    """Xem metadata (cột, pk, indexes, sample rows) của eod_stock_v2."""
    with get_session() as s:
        if detail == "columns":
            cols = [
                {"name": c.name, "type": str(c.type), "nullable": c.nullable}
                for c in EodStock.__table__.columns
            ]
            return {"columns": cols}
        elif detail == "pk":
            return {"pk": [c.name for c in EodStock.__table__.primary_key.columns]}
        elif detail == "indexes":
            try:
                rows = s.execute(text("PRAGMA index_list('eod_stock_v2');")).mappings().all()
                return {"indexes": [dict(r) for r in rows]}
            except Exception:
                return {"indexes": []}
        elif detail == "sample":
            q = select(EodStock).limit(sample_rows)
            rows = s.execute(q).scalars().all()
            def to_dict(r):
                return {
                    "ticker": r.ticker, "DateTime": r.DateTime.isoformat() if r.DateTime else None,
                    "Open": r.Open, "High": r.High, "Low": r.Low, "Close": r.Close, "Volume": r.Volume
                }
            return {"sample": [to_dict(r) for r in rows]}
        return {"error": f"Unknown detail: {detail}"}

def _range_impl(req: RangeRequest) -> list[dict]:

    try:
        start_dt = parse_dt(req.start); end_dt = parse_dt(req.end)
        cols = ensure_allowed_columns(req.columns)
        with get_session() as s:
            cols_expr = [getattr(EodStock, c) for c in cols]

            q = (
                select(EodStock.DateTime, EodStock.ticker, *cols_expr).where(EodStock.ticker == req.symbol)
                .where(EodStock.DateTime >= start_dt)
                .where(EodStock.DateTime <= end_dt)
                .order_by(EodStock.DateTime.asc())
                .limit(req.limit)
            )
            rows = s.execute(q).all()
        out = []
    
        for r in rows:
            d = {"DateTime": r[0].isoformat(), "ticker": r[1]}
            for i, c in enumerate(cols, start=2):
                d[c] = r[i]
            out.append(d)
        print("out",out)
        return out
    except Exception as e:
        print(e)

def range_func(symbol: str, start: str, end: str, columns: list[str]|None=None, limit: int=10000):
    req = RangeRequest(symbol=symbol, start=start, end=end, columns=columns or [], limit=limit)
    return _range_impl(req)

range_tool = StructuredTool.from_function(
    func=range_func,
    name="stock_range",
    description="Lấy dữ liệu theo khoảng thời gian cho 1 mã (OHLCV và các cột whitelisted).",
    args_schema=RangeRequest,
)

@tool
def stock_latest(
    symbol: Annotated[str, PydField(description="Mã CK, ví dụ HPG")],
    columns: Annotated[list[str], PydField(description="Các cột trả về, vd ['Close','Volume']")] = ["Close","Volume"],
) -> dict:
    """Lấy bản ghi mới nhất (max DateTime) cho symbol."""
    cols = ensure_allowed_columns(columns)
    with get_session() as s:
        sub = select(func.max(EodStock.DateTime)).where(EodStock.ticker == symbol.upper())
        latest_dt = s.execute(sub).scalar()
        if latest_dt is None:
            return {"symbol": symbol.upper(), "data": None}
        cols_expr = [getattr(EodStock, c) for c in cols]
        q = select(EodStock.ticker, EodStock.DateTime, *cols_expr)\
            .where(EodStock.ticker == symbol.upper())\
            .where(EodStock.DateTime == latest_dt)
        r = s.execute(q).first()
        if not r:
            return {"symbol": symbol.upper(), "data": None}
        d = {"ticker": r[0], "DateTime": r[1].isoformat()}
        for i, c in enumerate(cols, start=2):
            d[c] = r[i]
        return d

@tool
def stock_foreign_flow(
    symbol: Annotated[str, PydField(description="Mã CK, ví dụ HPG")],
    start: Annotated[str, PydField(description="YYYY-MM-DD")],
    end: Annotated[str, PydField(description="YYYY-MM-DD")],
    metrics: Annotated[list[str], PydField(description="Danh sách metric: buy/sell foreign, propTradingNetValue")] = ["buyForeignQuantity","sellForeignQuantity"],
) -> list[dict]:
    """Lấy các chỉ số dòng vốn ngoại/tự doanh theo khoảng thời gian."""
    req = ForeignFlowRequest(symbol=symbol, start=start, end=end, metrics=metrics)
    cols = req.metrics
    start_dt = parse_dt(req.start); end_dt = parse_dt(req.end)
    with get_session() as s:
        q = (
            select(EodStock.DateTime, EodStock.ticker, *[getattr(EodStock, c) for c in cols])
            .where(EodStock.ticker == req.symbol.upper())
            .where(EodStock.DateTime >= start_dt)
            .where(EodStock.DateTime <= end_dt)
            .order_by(EodStock.DateTime.asc())
            .limit(10000)
        )
        rows = s.execute(q).all()
    out = []
    for r in rows:
        d = {"DateTime": r[0].isoformat(), "ticker": r[1]}
        for i, c in enumerate(cols, start=2):
            d[c] = r[i]
        out.append(d)
    return out



## 7) Test nhanh (không dùng LLM)


In [None]:

# print("▶ stock_meta(columns):")
# print(stock_meta.invoke({"detail":"columns"}))

# print("\n▶ stock_meta(pk):")
# print(stock_meta.invoke({"detail":"pk"}))

# print("\n▶ stock_meta(sample):")
# print(stock_meta.invoke({"detail":"sample","sample_rows":3}))

print("\n▶ stock_range HPG 2024-01-03..2024-01-06 (OHLCV):")
print(range_tool.invoke({"symbol":"HPG","start":"2024-01-03","end":"2024-01-06","columns":["Open","High","Low","Close","Volume"]}))

# print("\n▶ stock_latest FPT (Close, Volume):")
# print(stock_latest.invoke({"symbol":"FPT","columns":["Close","Volume"]}))

# print("\n▶ stock_foreign_flow HPG (buyForeignQuantity, sellForeignQuantity):")
# print(stock_foreign_flow.invoke({"symbol":"HPG","start":"2024-01-02","end":"2024-01-05",
#                                  "metrics":["buyForeignQuantity","sellForeignQuantity"]}))



## 8) (Tuỳ chọn) Kết nối LLM với Resolver 1 vòng
Sẽ chạy nếu phát hiện `OPENAI_API_KEY`.


In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.messages import ToolMessage
from langchain_core.runnables import RunnableLambda
import json, os

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# 1) Prompt đầu vào (định tuyến ngôn ngữ + ngữ cảnh)
input_prompt = ChatPromptTemplate.from_messages([
    ("system", "Bạn là trợ lý tài chính Fin68. Luôn trả lời bằng tiếng Việt. "
               "Nếu tool trả về tiếng Anh, hãy dịch sang tiếng Việt khi trình bày."),
    ("human", "{query}")
])

llm_tools = llm.bind_tools([stock_meta, range_tool, stock_latest, stock_foreign_flow])
TOOLS = {t.name: t for t in [stock_meta, range_tool, stock_latest, stock_foreign_flow]}

def resolver(ai_msg):
    # Trả về danh sách messages để giữ nguyên ngữ cảnh tool_call
    if not getattr(ai_msg, "tool_calls", None):
        return [ai_msg]
    msgs = [ai_msg]
    for tc in ai_msg.tool_calls:
        tool = TOOLS[tc["name"]]
        try:
            out = tool.invoke(tc["args"])
        except Exception as e:
            out = {"error": str(e)}
        msgs.append(ToolMessage(content=json.dumps(out, ensure_ascii=False),
                                tool_call_id=tc["id"]))
    return msgs

# 2) Final prompt tái-áp system trước khi tổng hợp câu trả lời cuối
final_prompt = ChatPromptTemplate.from_messages([
    ("system", "Bạn là trợ lý tài chính Fin68. Luôn trả lời bằng tiếng Việt, súc tích, chính xác. "
               "Chuẩn hoá đơn vị/định dạng 'en-US', đối với dữ liệu giá đang trả về đơn vị là ngàn đồng, và giải thích ngắn gọn nếu cần."),
    MessagesPlaceholder("messages")
])

# 3) Build pipeline: input_prompt → llm_tools → resolver → (wrap messages) → final_prompt → llm
chain = (

    llm_tools
    | RunnableLambda(resolver)
    | RunnableLambda(lambda msgs: {"messages": msgs if isinstance(msgs, list) else [msgs]})
    | final_prompt
    | llm
)

# Gọi
ans2 = chain.invoke("Giá của cổ phiếu CII hôm nay")



Giá cổ phiếu CII hiện tại là 23.400 VNĐ. Khối lượng giao dịch trong phiên là 17.415.500 cổ phiếu.


In [53]:
for chunk in chain.stream("Giá cổ phiếu CII hôm nay"):
    # chunk là phần nhỏ của phản hồi (string hoặc ChatMessage)
    if hasattr(chunk, "content"):
        sys.stdout.write(chunk.content)
        sys.stdout.flush()
    else:
        sys.stdout.write(str(chunk))
        sys.stdout.flush()

Giá cổ phiếu CII hiện tại là 23.400 VNĐ. Khối lượng giao dịch trong phiên là 17.415.500 cổ phiếu.


## 9) Notes production
- Đổi `ENGINE` sang PostgreSQL và bật `__table_args__ = {"schema": "eod"}`.
- Thêm timeout truy vấn, limit cứng, logging chuẩn (tool_name, args, duration, rows).
- Chỉ cho READ; nếu cần WRITE, tách tool và kiểm soát quyền.
- Có thể bổ sung tool aggregate (D/W/M/Q/Y).


In [None]:
#1 Cài đặt python + VSCODE
Hướng dẫn cài đặt:
- python version 3.12
- visual studio code
- extension vscode
- Tạo môi trường ảo trong lập trình
- Cài đặt các thư viện python cho khóa học