# Agent use workflow

This notebook demonstrates the use of the LlamaIndex library to create and query a vector store index from documents. It includes functions to convert solar dates to lunar dates, assign stars to astrological charts based on birth details, and execute a retrieval-augmented generation (RAG) workflow. The workflow involves ingesting documents, extracting information, retrieving relevant nodes, reranking them, and synthesizing a response using a language model. The notebook also includes an example of running the workflow asynchronously.

https://docs.llamaindex.ai/en/stable/examples/workflow/react_agent/


In [1]:
%pip install llama-index-finetuning
%pip install llama-index-finetuning-callbacks
%pip install llama-index-llms-openai
%pip install "openinference-instrumentation-llama-index>=3.0.0" "opentelemetry-proto>=1.12.0" opentelemetry-exporter-otlp opentelemetry-sdk

Note: you may need to restart the kernel to use updated packages.
[31mERROR: Could not find a version that satisfies the requirement llama-index-finetuning-callbacks (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for llama-index-finetuning-callbacks[0m[31m
[0mNote: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [2]:
from llama_index.core import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage,
)
from llama_index.llms.openai import OpenAI

from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent

In [3]:
llm_4 = OpenAI(model="gpt-4o-mini")

In [4]:
def create_and_load_index(name = ''):
    persist_dir = "./storage/" + name
    try:
        storage_context = StorageContext.from_defaults(
            persist_dir=persist_dir,
        )
        tu_vi_index = load_index_from_storage(storage_context)
        index_loaded = True
    except:
        index_loaded = False
    
    if not index_loaded:
        docs = SimpleDirectoryReader(
            input_files=["../data/" + name + ".md"],
        ).load_data()
        tu_vi_index = VectorStoreIndex.from_documents(docs)
        tu_vi_index.storage_context.persist(persist_dir)

    return tu_vi_index

In [5]:
# loai_cung_engine = loai_cung_index.as_query_engine(similarity_top_k=3)
# cac_buoc_luan_doan_la_so_tu_vi_engine = cac_buoc_luan_doan_la_so_tu_vi_index.as_query_engine(similarity_top_k=3)
# tuoi_xong_dat_engine = tuoi_xong_dat_index.as_query_engine(similarity_top_k=3)

## Hàm luận từ ngày tháng năm sinh.

In [6]:
from datetime import datetime

def an_sao_tuvi(day, month, year, hour):
    # Thông tin cơ bản từ ngày tháng năm sinh
    can = ['Giáp', 'Ất', 'Bính', 'Đinh', 'Mậu', 'Kỷ', 'Canh', 'Tân', 'Nhâm', 'Quý']
    chi = ['Tý', 'Sửu', 'Dần', 'Mão', 'Thìn', 'Tỵ', 'Ngọ', 'Mùi', 'Thân', 'Dậu', 'Tuất', 'Hợi']
    cung_names = ['Mệnh', 'Phụ Mẫu', 'Phúc Đức', 'Điền Trạch', 'Quan Lộc', 'Nô Bộc', 
                  'Thiên Di', 'Tật Ách', 'Tài Bạch', 'Tử Tức', 'Phu Thê', 'Huynh Đệ']
    
    # Xác định Thiên Can và Địa Chi từ năm sinh
    can_index = (year % 10)
    chi_index = (year % 12)
    thien_can = can[can_index]
    dia_chi = chi[chi_index]
    
    # Tính Cục dựa vào năm sinh
    cuc_mapping = {
        'Kim': ['Canh', 'Tân'],
        'Mộc': ['Giáp', 'Ất'],
        'Thủy': ['Nhâm', 'Quý'],
        'Hỏa': ['Bính', 'Đinh'],
        'Thổ': ['Mậu', 'Kỷ']
    }
    cuc = None
    for key, values in cuc_mapping.items():
        print('cuc_mapping:', key, values)
        if thien_can in values:
            cuc = key
            break

    # An các cung dựa vào giờ sinh
    gio_mapping = {
        'Tý': 0, 'Sửu': 1, 'Dần': 2, 'Mão': 3,
        'Thìn': 4, 'Tỵ': 5, 'Ngọ': 6, 'Mùi': 7,
        'Thân': 8, 'Dậu': 9, 'Tuất': 10, 'Hợi': 11
    }
    gio_sinh = list(gio_mapping.keys())[hour % 12]
    
    # Bản đồ chi đến các cung trong lá số
    cung_chi_mapping = {
        'Tý': 'Mệnh', 'Sửu': 'Phụ Mẫu', 'Dần': 'Phúc Đức', 'Mão': 'Điền Trạch',
        'Thìn': 'Quan Lộc', 'Tỵ': 'Nô Bộc', 'Ngọ': 'Thiên Di', 'Mùi': 'Tật Ách',
        'Thân': 'Tài Bạch', 'Dậu': 'Tử Tức', 'Tuất': 'Phu Thê', 'Hợi': 'Huynh Đệ'
    }
    
    # An các sao chính tinh
    chinh_tinh = {
        'Tử Vi': 'Thìn', 'Thiên Phủ': 'Tuất', 'Thái Dương': 'Dần', 'Thái Âm': 'Hợi',
        'Thiên Cơ': 'Sửu', 'Thiên Lương': 'Ngọ', 'Vũ Khúc': 'Thân', 'Thiên Đồng': 'Tý',
        'Cự Môn': 'Dậu', 'Liêm Trinh': 'Mão', 'Thất Sát': 'Mùi', 'Phá Quân': 'Tỵ',
        'Tham Lang': 'Dậu'
    }
    
    # Phụ tinh cơ bản
    phu_tinh = {
        'Hóa Khoa': 'Mệnh', 'Hóa Quyền': 'Quan Lộc', 'Hóa Lộc': 'Tài Bạch', 'Hóa Kỵ': 'Phu Thê'
    }
    
    # Kết quả an sao
    sao_cung = {cung: [] for cung in cung_names}
    
    # An các chính tinh vào các cung
    for sao, chi_name in chinh_tinh.items():
        cung = cung_chi_mapping[chi_name]  # Lấy tên cung từ chi
        # print('an_sao_tuvi:chinh_tinh', cung, sao, chi_name)
        sao_cung[cung].append(sao)
    
    # An các phụ tinh vào các cung
    for sao, cung in phu_tinh.items():
        sao_cung[cung].append(sao)
    
    response = ""
    response += f"Lá số cho {day}/{month}/{year} giờ {gio_sinh}\n"
    response += f"Thiên Can: {thien_can}, Địa Chi: {dia_chi}, Cục: {cuc}\n"
    for cung, sao_list in sao_cung.items():
        response += f"{cung}: {', '.join(sao_list) if sao_list else 'Không có sao'}\n"

    # Kết quả cuối cùng
    print(f"Lá số cho {day}/{month}/{year} giờ {gio_sinh}")
    print(f"Thiên Can: {thien_can}, Địa Chi: {dia_chi}, Cục: {cuc}")
    for cung, sao_list in sao_cung.items():
        print(f"{cung}: {', '.join(sao_list) if sao_list else 'Không có sao'}")
    
    return response

an_sao_tuvi(1, 10, 1996, 4)

cuc_mapping: Kim ['Canh', 'Tân']
Lá số cho 1/10/1996 giờ Thìn
Thiên Can: Canh, Địa Chi: Thìn, Cục: Kim
Mệnh: Thiên Đồng, Hóa Khoa
Phụ Mẫu: Thiên Cơ
Phúc Đức: Thái Dương
Điền Trạch: Liêm Trinh
Quan Lộc: Tử Vi, Hóa Quyền
Nô Bộc: Phá Quân
Thiên Di: Thiên Lương
Tật Ách: Thất Sát
Tài Bạch: Vũ Khúc, Hóa Lộc
Tử Tức: Cự Môn, Tham Lang
Phu Thê: Thiên Phủ, Hóa Kỵ
Huynh Đệ: Thái Âm


'Lá số cho 1/10/1996 giờ Thìn\nThiên Can: Canh, Địa Chi: Thìn, Cục: Kim\nMệnh: Thiên Đồng, Hóa Khoa\nPhụ Mẫu: Thiên Cơ\nPhúc Đức: Thái Dương\nĐiền Trạch: Liêm Trinh\nQuan Lộc: Tử Vi, Hóa Quyền\nNô Bộc: Phá Quân\nThiên Di: Thiên Lương\nTật Ách: Thất Sát\nTài Bạch: Vũ Khúc, Hóa Lộc\nTử Tức: Cự Môn, Tham Lang\nPhu Thê: Thiên Phủ, Hóa Kỵ\nHuynh Đệ: Thái Âm\n'

In [7]:
from llama_index.core.tools import BaseTool, FunctionTool
from lunarcalendar import Converter, Solar

def convert_to_lunar(year, month, day):
    solar = Solar(year, month, day)
    lunar = Converter.Solar2Lunar(solar)
    return lunar

def fn_an_sao(birthday: str, hour: str):
    """Trích xuất birthday[date + hour] từ text người dùng nhập và gán sao theo các [cung mệnh, chính tinh và phụ tinh]"""
    print('fn_an_sao:', birthday, hour)
    date_obj = datetime.strptime(birthday, "%d/%m/%Y")
    hour = hour.split(':')[0]
    hour_int = int(hour)

    lunar_date = convert_to_lunar(date_obj.year, date_obj.month, date_obj.day)

    results = an_sao_tuvi(lunar_date.day, lunar_date.month, lunar_date.year, hour_int)

    return results

an_sao_tool = FunctionTool.from_defaults(fn=fn_an_sao, name="fn_an_sao")

# query_engine_tools = [
#     an_sao_tool,
#     QueryEngineTool(
#         query_engine=loai_cung_engine,
#         metadata=ToolMetadata(
#             name="loai_cung",
#             description=(
#                 "các loại cung được sử dụng để phân tích và dự đoán tính cách, cuộc đời của con người"
#                 "Use a detailed plain text question as input to the tool."
#             ),
#         ),
#     ),
#     QueryEngineTool(
#         query_engine=cac_buoc_luan_doan_la_so_tu_vi_engine,
#         metadata=ToolMetadata(
#             name="cac_buoc_luan_doan_la_so_tu_vi",
#             description=(
#                 "Phục vụ thông tin về các bước luận đoán lá số tử vi."
#                 "Use a detailed plain text question as input to the tool."
#             ),
#         ),
#     ),
#     QueryEngineTool(
#         query_engine=tuoi_xong_dat_engine,
#         metadata=ToolMetadata(
#             name="tuoi_xong_dat",
#             description=(
#                 "Phục vụ thông tin về tuổi xông đất."
#                 "Use a detailed plain text question as input to the tool."
#             ),
#         ),
#     ),
# ]

In [14]:
from llama_index.core.workflow import Event
from llama_index.core.schema import NodeWithScore


class ExtractEvent(Event):
    """Result of running extraction"""


class RetrieverEvent(Event):
    """Result of running retrieval"""

    nodes: list[NodeWithScore]


class RerankEvent(Event):
    """Result of running reranking on retrieved nodes"""

    nodes: list[NodeWithScore]

In [17]:
from llama_index.core.response_synthesizers import CompactAndRefine
from llama_index.core.postprocessor.llm_rerank import LLMRerank
from llama_index.core.workflow import (
    Context,
    Workflow,
    StartEvent,
    StopEvent,
    step,
)

from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.agent.openai import OpenAIAgent

class RAGWorkflow(Workflow):
    @step
    async def ingest(self, ctx: Context, ev: StartEvent) -> StopEvent | None:
        """Entry point to ingest a document, triggered by a StartEvent with `dirname`."""
        print("ingest data")
        dirname = ev.get("dirname")
        docs = SimpleDirectoryReader(
            input_dir=[dirname],
        ).load_data()
        tu_vi_index = VectorStoreIndex.from_documents(docs)
        return StopEvent(result=tu_vi_index)

    @step
    async def extract_an_sao(self, ctx: Context, ev: StartEvent) -> ExtractEvent | None:
        """Entry point to extract an sao, triggered by a StartEvent with `query`."""
        query = ev.get("query")
        await ctx.set("query", query)
        print('extract_an_sao:', query)
        llm = OpenAI(model="gpt-4o-mini")
        # initialize agent
        agent = OpenAIAgent.from_tools(
            [an_sao_tool],
            max_function_calls=1,
            llm=llm,
            verbose=True,
        )

        res_an_sao = agent.chat(query)
        print('res_an_sao:', res_an_sao)
        return ExtractEvent(result=res_an_sao)

    # @step
    # async def start_query(
    #     self, ctx: Context, ev: StartEvent
    # ) -> StopEvent | None:
    #     """Entry point to clean query, triggered by a StartEvent with `query`."""
    #     query = ev.get("query")
    #     print('clean_query:', query)
    #     return StopEvent(result=query)

    @step
    async def retrieve(
        self, ctx: Context, ev: ExtractEvent
    ) -> RetrieverEvent | None:
        "Entry point for RAG, triggered by a StartEvent with `query`."
        query = ctx.get("query")
        index = ev.get("index")

        print(f"Query the database with: {query}, {index}")

        if not query:
            print("Query is empty, please provide a query!")
            return None
        print('start an sao:', query)
        res_an_sao = an_sao_tool.execute(query)
        print('res_an_sao:', res_an_sao)
        # store the query in the global context
        # await ctx.set("query", query)
        await ctx.set("res_an_sao", res_an_sao)

        # get the index from the global context
        if index is None:
            print("Index is empty, load some documents before querying!")
            return None

        print("Retrieving nodes...", query)
        retriever = index.as_retriever(similarity_top_k=2)
        nodes = retriever.retrieve(query)
        print(f"Retrieved {len(nodes)} nodes.")
        return RetrieverEvent(nodes=nodes)

    @step
    async def rerank(self, ctx: Context, ev: RetrieverEvent) -> RerankEvent:
        # Rerank the nodes
        print("Reranking nodes...")
        ranker = LLMRerank(
            choice_batch_size=5, top_n=3, llm=OpenAI(model="gpt-4o-mini")
        )
        
        query = await ctx.get("query", default=None)
        res_an_sao = await ctx.get("res_an_sao", default=None)
        query_str = f"{query} {res_an_sao}"

        # print(await ctx.get("query", default=None), flush=True)
        new_nodes = ranker.postprocess_nodes(
            ev.nodes, query_str=query_str
        )
        print(f"Reranked nodes to {len(new_nodes)}")
        return RerankEvent(nodes=new_nodes)

    @step
    async def synthesize(self, ctx: Context, ev: RerankEvent) -> StopEvent:
        """Return a streaming response using reranked nodes."""
        llm = OpenAI(model="gpt-4o-mini")
        summarizer = CompactAndRefine(llm=llm, streaming=True, verbose=True)
        query = await ctx.get("query", default=None)
        res_an_sao = await ctx.get("res_an_sao", default=None)
        query_str = f"{query} {res_an_sao}"
        print("Synthesizing response...")
        print(f"Query: {query_str}")

        response = await summarizer.asynthesize(query_str, nodes=ev.nodes)
        return StopEvent(result=response)

In [18]:
import nest_asyncio
import asyncio

# Apply the nest_asyncio patch
nest_asyncio.apply()

async def main():
    wf = RAGWorkflow()
    # Start the workflow
    await wf.run(tu_vi="lá số cho 1/10/1996 giờ 4")

    index = await wf.run(dirname="../data/")
    await wf.run(query="lá số cho 1/10/1996 giờ 4", index=index)
    print("Workflow completed.")

if __name__ == "__main__":
    asyncio.run(main())

extract_an_sao: None
ingest data


ValidationError: 1 validation error for Task
input
  Input should be a valid string [type=string_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.9/v/string_type