In [1]:
%%capture --no-stderr
%load_ext autoreload
%autoreload 2

In [2]:
import os
from dotenv import load_dotenv, find_dotenv
import warnings
import nest_asyncio
from IPython.display import display, Markdown

# Silence all warnings for the session. This keeps the demo output short, but it
# also hides deprecation notices emitted by the libraries used below.
warnings.filterwarnings("ignore")
# Allow nested event loops so `await`/async helpers work inside the notebook's
# already-running loop.
nest_asyncio.apply()
# Load API keys and other settings from the nearest .env file; the boolean
# return value is intentionally discarded.
_ = load_dotenv(find_dotenv())

# 1. Building our first multi-agent system with LlamaIndex Workflows!

## a. Building our RAG workflow

In [3]:
from phoenix.otel import register
from openinference.instrumentation.llama_index import LlamaIndexInstrumentor

# Ship traces to the hosted Phoenix ("Llama Trace") collector and hook the
# LlamaIndex instrumentation into that tracer provider.
tracer_provider = register(
    endpoint="https://app.phoenix.arize.com/v1/traces",
    project_name="llama-trace-v1",
)
LlamaIndexInstrumentor().instrument(tracer_provider=tracer_provider)

ðŸ”­ OpenTelemetry Tracing Details ðŸ”­
|  Phoenix Project: llama-trace-v1
|  Span Processor: SimpleSpanProcessor
|  Collector Endpoint: https://app.phoenix.arize.com/v1/traces
|  Transport: HTTP
|  Transport Headers: {'api_key': '****'}
|  
|  Using a default SpanProcessor. `add_span_processor` will overwrite this default.
|  
|  `register` has set this TracerProvider as the global OpenTelemetry default.
|  To disable this behavior, call `register` with `set_global_tracer_provider=False`.



### i. Building our RAG Service

In [4]:
!wget "https://s2.q4cdn.com/470004039/files/doc_financials/2021/q4/_10-K-2021-(As-Filed).pdf" -O apple_2021_10k.pdf

--2024-11-09 13:49:00--  https://s2.q4cdn.com/470004039/files/doc_financials/2021/q4/_10-K-2021-(As-Filed).pdf
Resolving s2.q4cdn.com (s2.q4cdn.com)... 139.99.123.118
Connecting to s2.q4cdn.com (s2.q4cdn.com)|139.99.123.118|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 789896 (771K) [application/pdf]
Saving to: â€˜apple_2021_10k.pdfâ€™


2024-11-09 13:49:00 (13.7 MB/s) - â€˜apple_2021_10k.pdfâ€™ saved [789896/789896]



### ii. Ingest document

In [4]:
from llama_index.core import (
    SimpleDirectoryReader,
    VectorStoreIndex,
    StorageContext,
    load_index_from_storage
)
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding

# OpenAI chat model and embedding model used by the Apple 10-K RAG service.
# `llm` is also the default LLM picked up by the StockSME workflow further down.
llm = OpenAI(model="gpt-4o-mini")
embed_model = OpenAIEmbedding(model="text-embedding-3-small")

In [5]:
# Parse the downloaded Apple 10-K PDF into LlamaIndex documents.
documents = SimpleDirectoryReader(input_files=["./apple_2021_10k.pdf"]).load_data()

#### Let's ingest the Apple 10-K PDF file into a hybrid-search-enabled vector database: Qdrant!
Set up Qdrant vector database

In [None]:
# !docker run -p 6333:6333 -p 6334:6334 \
#     -v "$(pwd)/qdrant_storage:/qdrant/storage:z" \
#     qdrant/qdrant

In [14]:
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient, AsyncQdrantClient

# Sync and async clients pointing at the local Qdrant instance (see the docker
# command in the previous cell).
client = QdrantClient(host="localhost", port=6333)
aclient = AsyncQdrantClient(host="localhost", port=6333)

# Start from a clean slate: drop any existing "Apple_10K" collection so that
# re-running the ingestion does not duplicate points. This is destructive.
if client.collection_exists("Apple_10K"):
    client.delete_collection("Apple_10K")

# Hybrid store: dense vectors come from the embedding model passed at index
# time, sparse vectors from the fastembed model named below.
vector_store = QdrantVectorStore(
    "Apple_10K",
    client = client,
    aclient = aclient,
    enable_hybrid = True,
    fastembed_sparse_model="Qdrant/bm42-all-minilm-l6-v2-attentions",
)

INFO:httpx:HTTP Request: GET http://localhost:6333/collections/Apple_10K/exists "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: DELETE http://localhost:6333/collections/Apple_10K "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET http://localhost:6333/collections/Apple_10K/exists "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET http://localhost:6333/collections/Apple_10K/exists "HTTP/1.1 200 OK"


Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

INFO:httpx:HTTP Request: GET http://localhost:6333/collections/Apple_10K/exists "HTTP/1.1 200 OK"


Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

In [15]:
# Route index storage to the Qdrant vector store instead of the in-memory default.
storage_context = StorageContext.from_defaults(vector_store=vector_store)

In [16]:
# Chunk, embed (OpenAI `embed_model`) and upsert the 10-K documents into Qdrant.
# This calls the embeddings API, so it only needs to run once per collection.
index = VectorStoreIndex.from_documents(
    documents,
    storage_context=storage_context,
    embed_model = embed_model
)

INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/embeddings "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: PUT http://localhost:6333/collections/Apple_10K "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: PUT http://localhost:6333/collections/Apple_10K/index?wait=true "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET http://localhost:6333/collections/Apple_10K/exists "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: GET http://localhost:6333/collections/Apple_10K "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: PUT http://localhost:6333/collections/Apple_10K/points?wait=true "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: PUT http://localhost:6333/collections/Apple_10K/points?wait=true "HTTP/1.1 200 OK"


#### Load from storage context

In [5]:
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import QdrantClient, AsyncQdrantClient

# Reconnect to the already-populated "Apple_10K" collection. Unlike the ingest
# cell above, nothing is deleted or re-embedded here.
client = QdrantClient(host="localhost", port=6333)
aclient = AsyncQdrantClient(host="localhost", port=6333)

# Must mirror the settings used at ingest time (hybrid mode, same sparse model).
vector_store = QdrantVectorStore(
    "Apple_10K",
    client = client,
    aclient = aclient,
    enable_hybrid = True,
    fastembed_sparse_model="Qdrant/bm42-all-minilm-l6-v2-attentions",
)

# Wrap the existing vector store in an index; queries are embedded with the
# same OpenAI embedding model used at ingest time.
index = VectorStoreIndex.from_vector_store(
    vector_store,
    embed_model=embed_model,
)

Both client and aclient are provided. If using `:memory:` mode, the data between clients is not synced.


Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

Fetching 6 files:   0%|          | 0/6 [00:00<?, ?it/s]

In [6]:
# Hybrid (dense + sparse) query engine over the Qdrant-backed index; answers are
# synthesized by `llm`.
query_engine = index.as_query_engine(
    llm=llm,
    vector_store_query_mode="hybrid",
    similarity_top_k=2,
    sparse_top_k=12,
)

In [7]:
from llama_index.core.tools import QueryEngineTool, ToolMetadata

# Expose the 10-K query engine as a tool. The name and description are what the
# agent's LLM sees when deciding whether (and how) to call it.
tools = [
    QueryEngineTool(
        query_engine=query_engine,
        metadata=ToolMetadata(
            name="apple_10k",
            description=(
                "Provides information about Apple financials for year 2021. "
                "Use a detailed plain text question as input to the tool."
            )
        )
    )
]

In [8]:
from llama_index.core.agent import FunctionCallingAgent

# Function-calling agent that answers Apple questions through the `apple_10k`
# tool. `verbose=True` prints the tool calls as they happen.
apple_agent = FunctionCallingAgent.from_tools(
    tools=tools,
    llm = llm,
    verbose = True,
)

## b. Add our DSPy-LlamaIndex HyDE and Seek tool
> Using our Gemini LLM!

#### i. Get data

In [9]:
from llama_index.core import Document
from llama_index.llms.gemini import Gemini
from llama_index.embeddings.gemini import GeminiEmbedding
from llama_index.readers.web import SimpleWebPageReader
import dspy
from dspy import (
    Signature,
    InputField,
    OutputField,
    Module,
    Predict,
    Prediction
)
from dspy.teleprompt import BootstrapFewShot
from dspy.evaluate import answer_exact_match, answer_passage_match
from dspy import Example

import requests

In [10]:
# Gemini chat and embedding models (LlamaIndex wrappers) for the definitions service.
llm2 = Gemini(model="models/gemini-1.5-flash")
embed_model2 = GeminiEmbedding()

# DSPy uses its own Gemini client; register it as DSPy's default language model.
lm = dspy.Google(model="gemini-1.5-flash")
dspy.settings.configure(lm=lm)

In [12]:
class CustomWebPageReader(SimpleWebPageReader):
    """
    Web page reader that sends a browser-like ``User-Agent`` header.

    Many websites, including Investopedia, require headers like User-Agent to be set in the request to return the correct content.
    To fix this, ``load_data`` is overridden to include appropriate headers.
    """

    def load_data(self, urls, timeout=30.0):
        """Fetch each URL with browser-like headers and wrap the page in a Document.

        Args:
            urls: List of page URLs to download.
            timeout: Seconds to wait for each HTTP response before giving up.
                Without a timeout a single stalled server would hang the cell forever.

        Returns:
            One ``Document`` per URL, with the URL as its id. The text is converted
            from HTML to plain text when ``html_to_text`` is enabled.

        Raises:
            ValueError: If ``urls`` is not a list.
            requests.HTTPError: If a page responds with a 4xx/5xx status, so that
                error pages are not silently indexed as content.
            requests.Timeout: If a server does not answer within ``timeout``.
        """
        if not isinstance(urls, list):
            raise ValueError("urls must be a list of strings.")

        # Browser-like header; this is the change relative to the parent class.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
        }

        documents = []
        for url in urls:
            http_response = requests.get(url, headers=headers, timeout=timeout)
            # Fail loudly on blocked or missing pages instead of indexing an error page.
            http_response.raise_for_status()
            response = http_response.text
            if self.html_to_text:
                # Imported lazily: html2text is only needed when conversion is enabled.
                import html2text
                response = html2text.html2text(response)

            metadata = None
            if self._metadata_fn is not None:
                metadata = self._metadata_fn(url)

            documents.append(Document(text=response, id_=url, metadata=metadata or {}))

        return documents

In [13]:
# Investopedia articles that make up the "definitions" knowledge base
# (markets, options/futures, technical indicators, valuation ratios).
links = [
    "https://www.investopedia.com/terms/s/stockmarket.asp",
    "https://www.investopedia.com/ask/answers/difference-between-options-and-futures/",
    "https://www.investopedia.com/financial-edge/0411/5-essential-things-you-need-to-know-about-every-stock-you-buy.aspx",
    "https://www.investopedia.com/articles/fundamental/04/063004.asp",
    "https://www.investopedia.com/terms/t/technicalanalysis.asp",
    "https://www.investopedia.com/terms/i/ichimoku-cloud.asp",
    "https://www.investopedia.com/terms/a/aroon.asp",  
    "https://www.investopedia.com/terms/b/bollingerbands.asp",
    "https://www.investopedia.com/articles/forex/05/macddiverge.asp",
    "https://www.investopedia.com/terms/a/accumulationdistribution.asp",
    "https://www.investopedia.com/terms/s/stochasticoscillator.asp",
    "https://www.investopedia.com/terms/s/stochrsi.asp",
    "https://www.investopedia.com/terms/p/price-earningsratio.asp",
    "https://www.investopedia.com/terms/p/price-to-bookratio.asp",
    "https://www.investopedia.com/terms/p/price-to-salesratio.asp",
    "https://www.investopedia.com/terms/q/quickratio.asp"
]

In [18]:
# Download every Investopedia page and convert the HTML to plain text.
docs = CustomWebPageReader(html_to_text=True).load_data(urls=links)

In [27]:
# Build an in-memory vector index over the scraped pages using Gemini embeddings.
investopedia_index = VectorStoreIndex.from_documents(
    docs,
    embed_model=embed_model2,
)

In [32]:
# Persist the index to disk so later sessions can skip scraping and embedding.
investopedia_index.storage_context.persist(persist_dir="../investopedia_storage")

#### ii. Load from storage

In [11]:
# Reload the persisted Investopedia index instead of re-scraping and re-embedding.
# The stored vectors were produced with the Gemini embedding model, so the same
# model is passed explicitly. Otherwise the reloaded index binds to the global
# default embedding model, whose vectors are not comparable with the stored ones.
investopedia_storage_context = StorageContext.from_defaults(persist_dir="../investopedia_storage")
investopedia_index = load_index_from_storage(
    investopedia_storage_context,
    embed_model=embed_model2,
)

#### iv. DSPy Finetuning

In [12]:
from llama_index.core.indices.query.query_transform import HyDEQueryTransform

In [13]:
# HyDE transform: Gemini drafts a hypothetical answer document for each query.
# `include_original=True` keeps the original question alongside that draft.
hyde = HyDEQueryTransform(include_original=True, llm = llm2)

In [14]:
# Retriever over the Investopedia index; queries are embedded with the Gemini
# embedding model to match the vectors stored in the index.
retriever = investopedia_index.as_retriever(embed_model=embed_model2)

In [15]:
# DSPy signature for the answer-generation step: (context, question) -> answer.
# NOTE: the class docstring and the `desc` strings are part of the prompt DSPy
# sends to the LM, so editing them changes model behaviour.
class GenerateAnswer(dspy.Signature):
    """Answer questions with short factoid answers."""
    context = dspy.InputField(desc="May contain relevant facts")
    question = dspy.InputField()
    answer = dspy.OutputField(desc="Often between 1-5 sentences.")

In [16]:
class HydeRAG(dspy.Module):
    """DSPy module that answers a question from HyDE-augmented retrieval context.

    The question is expanded with the module-level ``hyde`` transform, passages are
    retrieved for both the hypothetical document and the original question, and the
    combined passages are handed to a ``GenerateAnswer`` predictor.
    """

    def __init__(self, num_passages=3):
        """
        Args:
            num_passages: Kept for backward compatibility; it is currently not
                applied. The number of passages is decided by the retriever.
        """
        super().__init__()
        self.retriever = retriever
        self.generate_answer = dspy.Predict(GenerateAnswer)

    def get_context(self, question):
        """Return the retrieved passages for ``question`` as one string.

        Passages retrieved for the hypothetical document come first, followed by
        those retrieved for the original question. Every passage is separated by a
        blank line, including at the boundary between the two groups (previously
        the last HyDE passage and the first original-query passage were fused).
        """
        hyded_query = hyde.run(question)
        embedding_strs = hyded_query.custom_embedding_strs
        # With include_original=True: index 0 is the hypothetical document,
        # index 1 is the original question.
        hyde_nodes = self.retriever.retrieve(embedding_strs[0])
        nodes = self.retriever.retrieve(embedding_strs[1])
        return "\n\n".join(n.get_content() for n in [*hyde_nodes, *nodes])

    def forward(self, question):
        """Answer ``question``; returns a Prediction with ``context`` and ``answer``."""
        context = self.get_context(question)
        prediction = self.generate_answer(context=context, question=question)
        return dspy.Prediction(context=context, answer = prediction.answer)

#### v. Test HyDE RAG

In [47]:
# Smoke-test the uncompiled pipeline on a single question. `engine` is also the
# module that gets compiled by the teleprompter below.
engine = HydeRAG()
question = "What is a dividend?"
pred = engine(question)

 		You are using the client Google, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb


#### vi. Finetuning!

In [43]:
from llama_index.core.evaluation import SemanticSimilarityEvaluator

# Embedding-based similarity check between a predicted and a reference answer,
# using Gemini embeddings; 0.5 is the pass/fail threshold handed to the evaluator.
evaluator = SemanticSimilarityEvaluator(
    similarity_threshold=0.5, 
    embed_model=embed_model2
)

def validate_context_and_answer(example, pred, trace=None):
    """DSPy metric: does the predicted answer pass the semantic-similarity check?

    Args:
        example: Labeled example whose ``answer`` is the reference text.
        pred: Pipeline prediction whose ``answer`` is being judged.
        trace: Unused; part of the DSPy metric call signature.

    Returns:
        The ``passing`` flag reported by the module-level ``evaluator``.
    """
    return evaluator.evaluate(response=pred.answer, reference=example.answer).passing

In [44]:
# Hand-written question/answer pairs used to bootstrap few-shot demonstrations.
# The answers serve as references for the semantic-similarity metric.
# (Fixed: the dividend answer contained a mis-decoded em dash.)
train_examples = [
        Example(
            question = "What is the difference between an option and a future?",
            answer = """An option gives the buyer the right, but not the obligation, to buy (or sell) an asset at a specific price at any time during the life of the contract.
            A futures contract obligates the buyer to purchase a specific asset, and the seller to sell and deliver that asset, at a specific future date
            """
        ),
        Example(
            question = "What are the similarities between an option and a future?",
            answer = """Futures and options positions may be traded and closed ahead of expiration, but the parties to the futures contracts for commodities are typically obligated to make and accept deliveries on the settlement date."""
        ),
        Example(
            question = "What is an option?",
            answer = "Options are financial derivatives. An options contract gives an investor the right to buy or sell the underlying instrument at a specific price while the contract is in effect. Investors may choose not to exercise their options. Option holders do not own the underlying shares or enjoy shareholder rights unless they exercise an option to buy stock."
        ),
        Example(
            question = "What are the different options?",
            answer = "There are only two kinds of options: Call options and put options. A call option confers the right to buy a stock at the strike price before the agreement expires. A put option gives the holder the right to sell a stock at a specific price."
        ),
        Example(
            question="What's P/E ratio?",
            answer = """ This ratio is used to measure a company's current share price relative to its per-share earnings. The company can be compared to other, similar corporations so that analysts and investors can determine its relative value. So if a company has a P/E ratio of 20, this means investors are willing to pay $20 for every $1 per earnings. That might seem expensive but not if the company is growing fast. The P/E can be found by comparing the current market price to the cumulative earnings of the last four quarters."""
        ),
        Example(
            question="What's a dividend?",
            answer = """Dividends are like interest in a savings account—you get paid regardless of the stock price. Dividends are distributions made by a company to its shareholders as a reward from its profits. The amount of the dividend is decided by its board of directors and are generally issued in cash, though it isn't uncommon for some companies to issue dividends in the form of stock shares."""
        ),
        Example(
            question="What's a balance sheet?",
            answer=" A balance sheet is a financial statement that reports a company's assets, liabilities and shareholder equity at a specific point in time"
        ),
        Example(
            question="What's a current ratio?",
            answer = "It's the total current assets divided by total current liabilities, commonly used by analysts to assess the ability of a company to meet its short-term obligations"
        ),
        Example(
            question = "What are stocks?",
            answer = "When you buy a stock or a share, you're getting a piece of that company. Owning shares gives you the right to part of the company's profits, often paid as dividends, and sometimes the right to vote on company matters"
        ),
        Example(
            question = "What are REITs?",
            answer="Real estate investment trusts (REITs) are companies that own, manage, or finance real estate. Investors can buy shares in them, and they legally must provide 90% of their profits as dividends each year."
        ),
        Example(
            question = "What are brokers?",
            answer = "Brokers in the stock market play the same role as in insurance and elsewhere, acting as a go-between for investors and the securities markets. They are licensed organizations that buy and sell stocks and other securities for individual and institutional clients."
        ),
        Example(
            question = "What is technical analysis?",
            answer = "Technical analysis is used to scrutinize the ways supply and demand for a security affect changes in price, volume, and implied volatility. It assumes that past trading activity and price changes of a security can be valuable indicators of the security's future price movements when paired with appropriate investing or trading rules."
        ),
        Example(
            question="What is the difference between fundamental and technical analysis?",
            answer = "Fundamental analysis is a method of evaluating securities by attempting to measure the intrinsic value of a stock. Fundamental analysts study everything from the overall economy and industry conditions to the financial condition and management of companies. Technical analysis differs from fundamental analysis in that the stock's price and volume are the only inputs. The core assumption is that all publicly known fundamentals have factored into price; thus, there is no need to pay close attention to them. Technical analysts do not attempt to measure a security's intrinsic value, but instead, use stock charts to identify patterns and trends that suggest how a stock's price will move in the future."
        )
    ]
# Mark `question` as the only input field; `answer` stays a label.
train_examples = [t.with_inputs("question") for t in train_examples]

In [48]:
# Compile the HyDE pipeline: run it on the training questions and keep traces
# whose answers pass `validate_context_and_answer` as few-shot demonstrations.
# NOTE(review): `max_labeled_demos=0` presumably means only bootstrapped traces
# (no raw labeled examples) end up in the prompt — confirm against the DSPy docs.
teleprompter = BootstrapFewShot(
    max_labeled_demos=0,
    metric=validate_context_and_answer
)
compiled_dspy_qp = teleprompter.compile(engine, trainset=train_examples)

  0%|          | 0/13 [00:00<?, ?it/s]ERROR:dspy.teleprompt.bootstrap:[2m2024-11-09T14:42:47.946296Z[0m [[31m[1merror    [0m] [1mFailed to run or to evaluate example Example({'question': 'What is the difference between an option and a future?', 'answer': 'An option gives the buyer the right, but not the obligation, to buy (or sell) an asset at a specific price at any time during the life of the contract.\n            A futures contract obligates the buyer to purchase a specific asset, and the seller to sell and deliver that asset, at a specific future date\n            '}) (input_keys={'question'}) with <function validate_context_and_answer at 0x3e35ed300> due to index: 0
finish_reason: RECITATION
.[0m [[0m[1m[34mdspy.teleprompt.bootstrap[0m][0m [36mfilename[0m=[35mbootstrap.py[0m [36mlineno[0m=[35m211[0m
 38%|â–ˆâ–ˆâ–ˆâ–Š      | 5/13 [00:41<01:06,  8.27s/it]

Bootstrapped 4 full traces after 6 examples in round 0.





In [49]:
# Persist the compiled program so it can be reloaded without re-running the
# bootstrap step above.
compiled_dspy_qp.save("../compiled.json")

[('generate_answer', Predict(GenerateAnswer(context, question -> answer
    instructions='Answer questions with short factoid answers.'
    context = Field(annotation=str required=True json_schema_extra={'desc': 'May contain relevant facts', '__dspy_field_type': 'input', 'prefix': 'Context:'})
    question = Field(annotation=str required=True json_schema_extra={'__dspy_field_type': 'input', 'prefix': 'Question:', 'desc': '${question}'})
    answer = Field(annotation=str required=True json_schema_extra={'desc': 'Often between 1-5 sentences.', '__dspy_field_type': 'output', 'prefix': 'Answer:'})
)))]


#### vii. Test loading saved pipeline

In [17]:
## Loading a saved pipeline
# Instantiate a fresh module, then restore the compiled state saved in
# "../compiled.json" into it.

pipeline = HydeRAG()
pipeline.load("../compiled.json")

In [19]:
# Ask the reloaded pipeline a comparison question that spans two indexed articles.
question = "What is the difference between bollinger bands and the ichimoku cloud?"
pred = pipeline(question)
print(f"Question: {question}")
print(f"Predicted Answer: {pred.answer}")

 		You are using the client Google, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb


Question: What is the difference between bollinger bands and the ichimoku cloud?
Predicted Answer: The Ichimoku Cloud uses averages based on highs and lows over a period, while Bollinger Bands use a simple moving average (SMA) of closing prices.


## c. Building our LlamaIndex Multi-Agent System using Workflows!

In [18]:
from llama_index.core.workflow import (
    Workflow,
    step,
    Context,
    Event,
    StartEvent,
    StopEvent
)
from llama_index.core.base.llms.types import ChatMessage
from llama_index.core.chat_engine import SimpleChatEngine
from llama_index.core.selectors import LLMSingleSelector
from llama_index.core.prompts import PromptTemplate
from pydantic import BaseModel, Field
from typing import Literal, List, Dict, Optional, Any, Union, Annotated

In [107]:
class AppleEvent(Event):
    """Workflow event asking the Apple 10-K agent to answer ``query``."""
    query: str

class InvestopediaEvent(Event):
    """Workflow event asking the Investopedia definitions service to answer ``query``."""
    query: str 

class CompileEvent(Event):
    """Interim answer produced by one specialist, handed to the compile step."""
    # The query that was answered.
    query: str
    # The specialist's answer, as text.
    response: str
    # Which specialist produced it: "apple_agent" or "investopedia_agent".
    source: str

In [20]:
# Structured verdict the LLM returns when grading an interim answer.
# NOTE(review): despite the field name, the description tells the LLM that
# `reroute=True` means the answer is SATISFACTORY and `False` means it should be
# rerouted. Consumers must follow the description, not the name.
# Only `#` comments are used here: the field descriptions (and any class
# docstring) are part of the schema shown to the LLM.
class Reroute(BaseModel):
    reroute: Literal[True, False] = Field(..., description="If the answer is satisfactory, return True. Else return False for rerouting")
    improvements: str = Field(..., description="Suggestions to improve the response.")

In [21]:
class StockSME(Workflow):
    """Workflow that routes a stock question to a specialist and compiles the answer.

    Steps:
      1. ``route``: an LLM selector picks the specialist(s) and one event is
         emitted per selection.
      2. ``apple_event`` / ``investopedia_event``: the Apple 10-K agent or the
         DSPy definitions service answers the query.
      3. ``compile``: each interim answer is graded; unsatisfactory answers are
         sent back to their specialist with feedback (at most ``max_reroutes``
         times), then the final answer is returned.
    """

    def __init__(self, manager_llm = llm, *args, max_reroutes = 2, **kwargs):
        """
        Args:
            manager_llm: LLM behind the "manager" chat engine that merges two
                specialist responses into one answer.
            max_reroutes: Upper bound on how many times ``compile`` may send an
                answer back for improvement. Prevents an endless grade/retry loop.
            *args, **kwargs: Forwarded to ``Workflow`` (e.g. ``timeout``).
        """
        super().__init__(*args, **kwargs)
        # Routing and grading use the notebook-level `llm`, independently of `manager_llm`.
        self.llm = llm
        self.max_reroutes = max_reroutes

        ## Initialize the "manager"
        self.manager_llm = SimpleChatEngine.from_defaults(llm=manager_llm)

        # Order matters: `route` maps selection index 0 to the Apple agent and
        # any other index to the definitions service.
        self.choices = [
            ToolMetadata(
                name = "Apple_Agent",
                description = """Choose this option to answer any questions related to Apple's financial performance in 2021
                including risks, opportunities, and financial statements."""
            ),
            ToolMetadata(
                name = "Definitions_Agent",
                description = "Choose this option to answer any questions related to investment definitions such as Ichimoku Cloud"
            )
        ] 
        self.router = LLMSingleSelector.from_defaults(llm=self.llm)
        self.reroute_prompt_template = PromptTemplate(
            """Given the query: {query} and the interim answer: {response}, determine if the interim answer satisfies the question.
            This does not need to be very strict. As long as the interim answer is able to address the question partially, return True."""
        )

        ## Initialize apple agent
        self.apple_agent = apple_agent

        ## Initialize DSPy definition service
        self.definition_service = HydeRAG()
        self.definition_service.load("../compiled.json")

    @step
    async def route(self, ctx: Context, ev: StartEvent) -> AppleEvent | InvestopediaEvent:
        """Pick the specialist(s) for the query and dispatch one event per selection."""
        selector_result = await self.router.aselect(self.choices, query = ev.query)
        selections = selector_result.selections
        # Publish the bookkeeping before dispatching, so `compile` can never
        # observe a partially updated route count.
        await ctx.set("routes", len(selections))
        await ctx.set("reroutes", 0)
        for result in selections:
            if result.index == 0:
                ctx.send_event(AppleEvent(query=ev.query))
            else:
                ctx.send_event(InvestopediaEvent(query=ev.query))

    @step
    async def apple_event(self, ev: AppleEvent) -> CompileEvent:
        """Answer the query with the Apple 10-K agent."""
        # AppleEvent only carries `query`; the agent keeps its own chat memory,
        # so no history is passed (the old `history=ev.history` raised at runtime).
        response = await self.apple_agent.achat(ev.query)
        return CompileEvent(query=ev.query, response = str(response), source = "apple_agent")

    @step
    async def investopedia_event(self, ev: InvestopediaEvent) -> CompileEvent:
        """Answer the query with the DSPy definitions service."""
        # NOTE: this is a synchronous call and blocks the event loop while it runs.
        response = self.definition_service(ev.query)
        return CompileEvent(query=ev.query, response = str(response.answer), source = "investopedia_agent")

    @step
    async def compile(self, ctx: Context, ev: CompileEvent) -> StopEvent | AppleEvent | InvestopediaEvent | None:
        """Grade the interim answers, reroute weak ones, and return the final answer."""
        routes = await ctx.get("routes")
        ready = ctx.collect_events(ev, [CompileEvent] * routes)
        if ready is None:
            # Not every specialist has answered yet; wait for the remaining events.
            return None

        reroutes = await ctx.get("reroutes")
        if reroutes < self.max_reroutes:
            for candidate in ready:
                verdict = await self.llm.astructured_predict(
                    Reroute, self.reroute_prompt_template, query=candidate.query, response = candidate.response
                )
                # Per the schema and the prompt, `reroute=True` means the answer is
                # satisfactory; only a falsy verdict triggers another attempt.
                if not verdict.reroute:
                    await ctx.set("reroutes", reroutes + 1)
                    # Only the rerouted specialist will answer again, so the next
                    # collection must expect exactly one event.
                    await ctx.set("routes", 1)
                    if candidate.source == "apple_agent":
                        return AppleEvent(query = f"""Here's some feedback to better address the query. 
                                      Feedback: {verdict.improvements},
                                      Query: {candidate.query}""")
                    else:
                        return InvestopediaEvent(query = f"""Here's some feedback to better address the query. 
                                            Feedback: {verdict.improvements},
                                            Query: {candidate.query}""")

        if routes == 2:
            response = await self.manager_llm.achat(
                f"""
                 A user has provided a query and 2 different strategies have been used
                to answer the query. The query was: {ev.query}

                Response 1 ({ready[0].source}): {ready[0].response}
                Response 2 ({ready[1].source}): {ready[1].response}

                Please compile the responses into a single coherent argument.
                """
            )
        else:
            response = ready[0].response
        return StopEvent(result=str(response))

In [33]:
from llama_index.utils.workflow import (
    draw_all_possible_flows,
    draw_most_recent_execution,
)


In [63]:
# Render every possible step-to-step transition of the workflow to an HTML file.
draw_all_possible_flows(StockSME, filename="StockSME_flow_all.html")

StockSME_flow_all.html


In [21]:
# Run the workflow end to end. The 600-second timeout is passed through to the
# Workflow base class.
stockSME = StockSME(timeout=600)
response = await stockSME.run(query="What is a dividend?")

 		You are using the client Google, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb
ERROR:opentelemetry.context:Failed to detach context
Traceback (most recent call last):
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 154, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x33a351c10> at 0x3ac04bd80> was created in a different Context


In [22]:
# Render the workflow's final answer (a string) as Markdown.
display(Markdown(response))

A dividend is a distribution of a company's profits to its shareholders.

# 2. Bringing LangGraph into the picture
Let's first deploy our LangGraph graph!

`%cd ../../LangChain/notebooks/langgraph_studio/`

`!langgraph build -t personal-assistant`

`%env LANGSMITH_API_KEY=...` (note: `!export` only affects a throwaway subshell, so the variable would not reach later commands)

`%env IMAGE_NAME=personal-assistant`

`!docker compose up`

In [22]:
from langgraph.pregel.remote import RemoteGraph
from langgraph_sdk import get_client, get_sync_client
from langchain_core.messages import HumanMessage

# Connect to the LangGraph server started with `docker compose up`.
# NOTE(review): 0.0.0.0 is a bind address; as a client target it only works on
# some platforms — "localhost" is presumably the portable choice. Confirm.
url = "http://0.0.0.0:8123"
graph_name="agents"
# NOTE: this rebinds `client`, which earlier in the notebook referred to the
# Qdrant client. Cells above must not be re-run after this one without
# re-creating the Qdrant client first.
client=get_client(url=url)
sync_client = get_sync_client(url=url)
remote_graph = RemoteGraph(
    graph_name, 
    url=url, 
    client=client, 
    sync_client=sync_client
)

# Create a conversation thread on the server and address it in every call.
thread = sync_client.threads.create()
config = {
    "configurable": {"thread_id": thread["thread_id"]}
}

In [25]:
# Send one human message to the remote graph on the thread created above.
# NOTE: this rebinds `response`, previously the StockSME workflow result.
response = await remote_graph.ainvoke(
    {
        "messages": [HumanMessage(content="What is the boiling point of water?")]
    },
    config = config
)

In [31]:
# The remote graph returns messages as plain dicts; show the last one's text.
display(Markdown(response['messages'][-1]['content']))

The boiling point of water is typically 100 degrees Celsius (212 degrees Fahrenheit) at standard atmospheric pressure (1 atmosphere or 101.3 kPa). However, this boiling point can change with variations in atmospheric pressure; for example, at higher altitudes where pressure is lower, water boils at a lower temperature.

# 3. Bringing AutoGen into the picture
Unfortunately, we cannot invoke a `GroupChat` here because there is no way for us to terminate the `GroupChat` from within a LlamaIndex Workflow, a LangGraph graph, or a CrewAI Crew. We'll have to stick to a simpler two-agent setup.

In [23]:
from openinference.instrumentation.openai import OpenAIInstrumentor

# Also trace raw OpenAI client calls, reusing the tracer provider registered
# at the top of the notebook.
OpenAIInstrumentor().instrument(tracer_provider=tracer_provider)

In [24]:
from autogen import (
    AssistantAgent,
    GroupChat,
    GroupChatManager,
    UserProxyAgent,
)
from autogen.cache import Cache

# Model configuration for the AutoGen agents. The key is read from the
# environment (loaded from .env above) and raises KeyError if it is missing.
config_list=[
    {"model": "gpt-4o-mini", "api_key": os.environ["OPENAI_API_KEY"]}
]

In [25]:
# Non-interactive "admin" proxy: it never asks a human for input, never executes
# code, auto-replies at most 10 times in a row, and ends the chat when a message
# ends with "TERMINATE".
user_proxy = UserProxyAgent(
    name="Admin",
    system_message="A human admin. Give the task and send instructions to writer to refine the blog",
    is_termination_msg=lambda x: x.get("content", "") and x.get("content", "").rstrip().endswith("TERMINATE"),
    human_input_mode="NEVER",
    max_consecutive_auto_reply=10,
    code_execution_config=False
)

# LLM-backed assistant that does the work via registered tools.
# `cache_seed=None` is passed in the LLM config here.
assistant = AssistantAgent(
    name="Assistant",
    system_message="""You are a helpful assistant. Only use the functions you are provided with. 
    Reply TERMINATE when the task is done""",
    llm_config={"config_list": config_list, "cache_seed": None}
)

In [26]:
from typing import Annotated, Optional, Dict, Any
from datetime import datetime, timedelta
import yfinance as yf

@user_proxy.register_for_execution()
@assistant.register_for_llm(description="Gets stock data for ticker of interest")
async def get_stock_data(
    ticker: Annotated[str, "Ticker of interest"],
    end_date: Optional[str] = None,
    start_date: Optional[str] = None
) -> Dict[str, Any]:
    """Summarize a ticker's closing-price performance over a date range.

    Args:
        ticker: Ticker symbol to download.
        end_date: Last day of the range as "YYYY-MM-DD". Defaults to today,
            resolved at call time (a default computed in the signature would be
            frozen at the moment the cell was executed).
        start_date: First day of the range. Defaults to 365 days before ``end_date``.

    Returns:
        Dict of plain floats: ``start_price``, ``end_price``,
        ``percentage_change`` (close-to-close, in percent), ``highest_price``,
        ``lowest_price`` and ``average_volume``.

    Raises:
        ValueError: If ``end_date`` is not "YYYY-MM-DD" or no data is returned
            for the requested ticker and range.
    """
    if end_date is None:
        end_date = datetime.now().strftime("%Y-%m-%d")
    end_date = datetime.strptime(end_date, "%Y-%m-%d")
    if start_date is None:
        start_date = end_date - timedelta(days=365)

    data = yf.download(ticker, start=start_date, end=end_date)
    if data.empty:
        # Unknown ticker or empty range: fail with a message the assistant can
        # act on instead of an opaque IndexError from `.iloc[0]`.
        raise ValueError(
            f"No price data returned for ticker {ticker!r} between {start_date} and {end_date}."
        )

    def _single_series(column):
        # Some yfinance versions return one sub-column per ticker; since a
        # single ticker was requested, take that only sub-column.
        return column.iloc[:, 0] if getattr(column, "ndim", 1) == 2 else column

    close = _single_series(data['Close'])
    volume = _single_series(data['Volume'])

    start_price = float(close.iloc[0])
    end_price = float(close.iloc[-1])

    # Plain Python floats keep the result trivially serializable for the agent.
    result = {
        'start_price': start_price,
        'end_price': end_price,
        # Percentage change of the closing price over the period.
        'percentage_change': (end_price - start_price) / start_price * 100,
        'highest_price': float(close.max()),
        'lowest_price': float(close.min()),
        'average_volume': float(volume.mean())
    }
    return result

In [94]:
# Start the two-agent chat: the Admin proxy sends the task to the Assistant and
# the resulting chat object is kept in `chat_history`.
# NOTE(review): the assistant's llm_config sets cache_seed=None while a cache
# object is passed here — confirm which setting takes effect.
with Cache.disk() as cache:
    chat_history = await user_proxy.a_initiate_chat(
        assistant,
        message="Write a blogpost about the stock price performance of Apple for the past year.",
        cache=cache,
    )

[33mAdmin[0m (to Assistant):

Write a blogpost about the stock price performance of Apple for the past year.

--------------------------------------------------------------------------------
[33mAssistant[0m (to Admin):

[32m***** Suggested tool call (call_YMzmfPL2hmkY4Ased4pAT6nY): get_stock_data *****[0m
Arguments: 
{"ticker":"AAPL","start_date":"2023-11-11","end_date":"2024-11-11"}
[32m*******************************************************************************[0m

--------------------------------------------------------------------------------
[35m
>>>>>>>> EXECUTING ASYNC FUNCTION get_stock_data...[0m


[*********************100%***********************]  1 of 1 completed

[33mAdmin[0m (to Assistant):

[33mAdmin[0m (to Assistant):

[32m***** Response from calling tool (call_YMzmfPL2hmkY4Ased4pAT6nY) *****[0m
{"start_price": 184.8000030517578, "end_price": 226.9600067138672, "percentage_change": 22.813854418769367, "highest_price": 236.47999572753906, "lowest_price": 165.0, "average_volume": 57778281.6}
[32m**********************************************************************[0m

--------------------------------------------------------------------------------





[33mAssistant[0m (to Admin):

## Apple Inc. Stock Price Performance: A Year in Review

As of November 11, 2024, Apple Inc. (AAPL) has shown a remarkable performance in the stock market over the past year. Reflecting a strong and resilient business model, the company's stock price has undergone significant fluctuations yet has ultimately trended upward.

### Stock Price Overview

At the beginning of the year, on November 11, 2023, Apple's stock was priced at approximately **$184.80**. As of the latest data, the stock price has risen to about **$226.96**. This represents a substantial **22.81%** increase in value, showcasing the company's robust growth trajectory.

### Price Fluctuations

Over the year, Apple's stock price has had its highs and lows. The highest recorded price during this period was around **$236.48**, while the lowest point reached was **$165.00**. This volatility can be attributed to various factors, including market conditions, global supply chain issues, and the co

In [95]:
# Render the content of the last message in the chat history as Markdown.
display(Markdown(chat_history.chat_history[-1]['content']))

## Apple Inc. Stock Price Performance: A Year in Review

As of November 11, 2024, Apple Inc. (AAPL) has shown a remarkable performance in the stock market over the past year. Reflecting a strong and resilient business model, the company's stock price has undergone significant fluctuations yet has ultimately trended upward.

### Stock Price Overview

At the beginning of the year, on November 11, 2023, Apple's stock was priced at approximately **$184.80**. As of the latest data, the stock price has risen to about **$226.96**. This represents a substantial **22.81%** increase in value, showcasing the company's robust growth trajectory.

### Price Fluctuations

Over the year, Apple's stock price has had its highs and lows. The highest recorded price during this period was around **$236.48**, while the lowest point reached was **$165.00**. This volatility can be attributed to various factors, including market conditions, global supply chain issues, and the companyâ€™s own earnings reports and product launches.

### Trading Volume

During this period, Apple has maintained an average trading volume of **57,778,281 shares**. High trading volumes often indicate strong investor interest and can contribute to stock price stability or volatility. This consistent engagement from the market reflects investor confidence in Apple's future prospects.

### Conclusion

Overall, Apple's stock performance over the past year highlights its resilience and growth potential in a competitive technology landscape. With a significant increase in stock price and consistent trading volume, Apple continues to be a favorite among investors. As we move into the next year, it will be intriguing to see how the company navigates challenges and opportunities within the technology sector.

TERMINATE

## Adding Llama Guard to prevent our service from answering dangerous questions

In [None]:
# !ollama pull llama-guard3

In [27]:
from llama_index.llms.ollama import Ollama

# Guard model served through Ollama; used below to label text as safe or unsafe.
guard_llm = Ollama("llama-guard3:latest")

In [60]:
# Probe with a harmful request; the recorded output below is "unsafe" followed by the code "S2".
response = guard_llm.complete("How do I steal my friend's car?")
print(response)

unsafe
S2


In [59]:
# Probe with a harmless request; the recorded output below is "safe".
response = guard_llm.complete("How do I bake cookies?")
print(response)

safe


# Assembling the final workflow

In [28]:
class GuardEvent(Event):
    """Carries the compiled answer; consumed by the `check_answer` step of StockAssistant."""
    answer: str

class AppleInvestopediaEvent(Event):
    """Carries a query; consumed by the `apple_investopedia_event` step of StockAssistant."""
    query: str

class AnalysisEvent(Event):
    """Carries a query; consumed by the `analysis_event` step of StockAssistant."""
    query: str

class PlanningEvent(Event):
    """Carries a query from the `gatekeep` step to the `plan_and_route` step of StockAssistant."""
    query: str

class BadQueryEvent(Event):
    """Carries a query. NOTE(review): no StockAssistant step in this section emits or consumes it."""
    query: str

class SearchEvent(Event):
    """Carries a query; consumed by the `search_event` step of StockAssistant."""
    query: str
    
class ResponseEvent(Event):
    """Carries one agent's answer to the `compile` step of StockAssistant."""
    query: str  # query the agent was asked
    response: str  # the agent's answer text
    source: str  # agent label, e.g. "search_agent"; `compile` uses it to pick the reroute target

In [108]:
class StockAssistant(Workflow):
    """Workflow that screens a query, routes it to an agent, and screens the answer.

    Steps:
      * ``gatekeep`` screens the incoming query with the guard LLM.
      * ``plan_and_route`` stops vague queries, otherwise asks the selector which
        agent(s) should answer and dispatches the matching event(s).
      * ``apple_investopedia_event`` (LlamaIndex StockSME workflow),
        ``analysis_event`` (AutoGen chat) and ``search_event`` (LangGraph remote
        graph) each turn a query into a ``ResponseEvent``.
      * ``compile`` collects the responses, may reroute one back to its agent
        with feedback, and merges the rest into one answer.
      * ``check_answer`` screens the final answer with the guard LLM.
    """

    def __init__(
        self,
        manager_llm = llm,
        guard_llm = guard_llm,
        config_list = config_list,
        *args,
        **kwargs
    ):
        """Set up the router, the reroute prompt and the LangGraph thread.

        Args:
            manager_llm: LLM used for query triage, routing, reroute checks and
                merging answers.
            guard_llm: LLM used to screen the query and the final answer.
            config_list: Accepted for interface compatibility; not read by this
                class.
            *args: Forwarded to ``Workflow.__init__``.
            **kwargs: Forwarded to ``Workflow.__init__`` (e.g. ``timeout``,
                ``verbose``).
        """
        super().__init__(*args, **kwargs)
        self.guard_llm = guard_llm
        self.manager_llm = manager_llm
        # Order matters: plan_and_route maps selection index 0/1/other to the
        # Apple-Investopedia, Analysis and Search events respectively.
        self.choices = [
            ToolMetadata(
                name = "Apple_and_Investopedia_Agent",
                description = """Choose this option to answer any questions related to Apple's financial performance in 2021
                including risks, opportunities, and financial statements; or questions relating to investment terminology definitions."""
            ),
            ToolMetadata(
                name = "Analysis_Agent",
                description = "Choose this option to write blogposts about the financial performance of any stock ticker."
            ),
            ToolMetadata(
                name="Search_Agent",
                description = "Choose this option to search the Internet."
            )
        ] 
        self.router = LLMSingleSelector.from_defaults(llm=self.manager_llm)
        self.reroute_prompt_template = PromptTemplate(
            """Given the query: {query} and the interim answer: {response}, determine if the interim answer satisfies the question.
            This does not need to be very strict. As long as the interim answer is able to address the question partially, return True."""
        )
        
        ## Langgraph setup
        self.remote_graph = RemoteGraph(
            graph_name, 
            url=url, 
            client=client, 
            sync_client=sync_client
        )
        self.thread = sync_client.threads.create()
        # Use the thread created for this instance. Reading a notebook-global
        # `thread` would tie the workflow to whatever thread an earlier cell made.
        self.config = {
            "configurable": {"thread_id": self.thread["thread_id"]}
        }
        
    @step
    async def gatekeep(self, ev: StartEvent) -> PlanningEvent | StopEvent:
        """Stop with a refusal if the guard LLM's reply to the query contains "unsafe"."""
        response = await self.guard_llm.acomplete(ev.query)
        if "unsafe" in str(response):
            return StopEvent(result="Your query is inappropriate and I will thus not be answering your question.")
        return PlanningEvent(query = ev.query)
    
    @step
    async def plan_and_route(self, ctx: Context, ev: PlanningEvent) -> StopEvent | AppleInvestopediaEvent | AnalysisEvent | SearchEvent | None:
        """Triage the query, then dispatch it to the agent(s) chosen by the selector.

        Returns a ``StopEvent`` asking for clarification when the manager LLM
        rates the query 'bad'. Otherwise returns ``None``: the agent events are
        emitted with ``ctx.send_event`` and the number of dispatched routes is
        stored in the context under ``"routes"`` for the ``compile`` step.
        """
        evaluation = await self.manager_llm.acomplete(
            f"""
            Given a user query, determine if this is likely to yield good results.
            
            If it's good, return 'good', if it's bad, return 'bad'.
            Good queries use a lot of relevant keywords and are detailed. Bad queries are vague or ambiguous.

            Here is the query: {ev.query}
            """
        )
        
        # `evaluation` is a completion object, so comparing it directly to "bad"
        # is never true. Compare its text, tolerating quotes/period/casing.
        verdict = str(evaluation).strip().strip("'\".").lower()
        if verdict == "bad":
            final_response = await self.manager_llm.acomplete(
                f"""The user has asked a bad query. Seek for the user's clarification."""
            )
            return StopEvent(result=str(final_response))
        
        selector_result = await self.router.aselect(self.choices, query = ev.query)
        selections = selector_result.selections
        # Store the shared state before emitting events so `compile` can never
        # observe a missing "routes" value.
        await ctx.set("query", ev.query)
        await ctx.set("routes", len(selections))
        for selection in selections:
            if selection.index == 0:
                ctx.send_event(AppleInvestopediaEvent(query=ev.query))
            elif selection.index == 1:
                ctx.send_event(AnalysisEvent(query=ev.query))
            else:
                ctx.send_event(SearchEvent(query=ev.query))
        return None
    
    @step
    async def apple_investopedia_event(
        self,
        ev: AppleInvestopediaEvent,
        apple_investopedia_workflow: StockSME
    ) -> ResponseEvent:
        """Answer the query by running the nested StockSME workflow."""
        ## This runs our LlamaIndex StockSME Workflow
        response = await apple_investopedia_workflow.run(query=ev.query)
        return ResponseEvent(query=ev.query, response=response, source = "apple_investopedia_agent")
    
    @step
    async def search_event(
        self,
        ev: SearchEvent
    ) -> ResponseEvent:
        """Answer the query by invoking the remote LangGraph graph on this instance's thread."""
        ## This is our Langgraph step!
        result = await self.remote_graph.ainvoke(
            {
                "messages": [HumanMessage(content=ev.query)]
            },
            config = self.config
        )
        # Take the LAST message: a fixed index of 1 returns a stale answer once
        # the thread already holds earlier messages.
        return ResponseEvent(
            query = ev.query,
            response = result['messages'][-1]['content'],
            source = "search_agent"
        )
    
    @step 
    async def analysis_event(
        self,
        ev: AnalysisEvent,
    ) -> ResponseEvent:
        """Answer the query with the AutoGen `user_proxy` / `assistant` chat."""
        ## This is our Autogen step!
        with Cache.disk() as cache:
            chat_history = await user_proxy.a_initiate_chat(
                assistant,
                message=ev.query,
                cache=cache,
            )
        # The assistant is prompted to end with the TERMINATE sentinel; strip it
        # so the control token does not leak into the user-facing answer.
        answer = (chat_history.chat_history[-1]['content'] or "").rstrip()
        if answer.endswith("TERMINATE"):
            answer = answer[:-len("TERMINATE")].rstrip()
        return ResponseEvent(
            query = ev.query,
            response = answer,
            source = "analysis_agent"
        )
    
    @step
    async def compile(
        self,
        ctx: Context,
        ev: ResponseEvent
    ) -> GuardEvent | AppleInvestopediaEvent | AnalysisEvent | SearchEvent | None:
        """Collect all routed responses, optionally reroute one, then merge them.

        Returns ``None`` while responses are still outstanding. If the reroute
        check flags a response, returns the event that sends the original query
        (plus feedback) back to the agent that produced it. Otherwise returns a
        ``GuardEvent`` holding the single response or the merged answer.
        """
        routes = await ctx.get("routes")
        ready = ctx.collect_events(ev, [ResponseEvent] * routes)
        if ready is None:
            # Not every routed agent has answered yet; wait for the next event.
            return None

        # Always work from the query stored by plan_and_route. `ev.query` may
        # already be a feedback-wrapped query from an earlier reroute.
        query = await ctx.get("query")

        for candidate in ready:
            # Judge each collected response, not just the event that happened
            # to trigger this step.
            # NOTE(review): the prompt asks for True when the answer is
            # satisfactory, yet True triggers a reroute here — confirm against
            # the Reroute model's field description.
            reroute = await self.manager_llm.astructured_predict(
                Reroute, self.reroute_prompt_template, query=query, response = candidate.response
            )
            if reroute.reroute is True:
                # Only one response will come back from the reroute, so wait for
                # exactly one; otherwise collect_events would never complete.
                # The other responses collected in this round are dropped.
                await ctx.set("routes", 1)
                if candidate.source == "apple_investopedia_agent":
                    return AppleInvestopediaEvent(query = f"""Here's some feedback to better address the query. 
                                      Feedback: {reroute.improvements},
                                      Query: {query}""")
                elif candidate.source == "analysis_agent":
                    return AnalysisEvent(query = f"""Here's some feedback to better address the query. 
                                            Feedback: {reroute.improvements},
                                            Query: {query}""")
                else:
                    return SearchEvent(query = f"""Here's some feedback to better address the query. 
                                       Feedback: {reroute.improvements},
                                       Query: {query}""")
        
        if routes == 2:
            response = await self.manager_llm.acomplete(
                f"""
                 A user has provided a query and 2 different strategies have been used
                to answer the query. The query was: {query}

                Response 1 ({ready[0].source}): {ready[0].response}
                Response 2 ({ready[1].source}): {ready[1].response}

                Please compile the responses into a single coherent argument.
                """
            )
        elif routes == 3:
            response = await self.manager_llm.acomplete(
                f"""
                 A user has provided a query and 3 different strategies have been used
                to answer the query. The query was: {query}

                Response 1 ({ready[0].source}): {ready[0].response}
                Response 2 ({ready[1].source}): {ready[1].response}
                Response 3 ({ready[2].source}): {ready[2].response}

                Please compile the responses into a single coherent argument.
                """
            )
        else:
            response = ready[0].response
        return GuardEvent(answer = str(response)) 

    @step
    async def check_answer(self, ev: GuardEvent) -> StopEvent:
        """Return the answer, or an apology if the guard LLM's reply contains "unsafe"."""
        response = await self.guard_llm.acomplete(ev.answer)
        # Test the response TEXT (as gatekeep does); `in` on the response object
        # itself never matches, which silently disabled this output guard.
        if "unsafe" in str(response):
            return StopEvent("""The LLM has given an answer that is unsafe. 
                             We have logged this and reported this to the administrator. 
                             We apologize. Please ask another question.""")
        return StopEvent(str(ev.answer))

In [109]:
# Build the workflow; `timeout` and `verbose` are forwarded to the Workflow base class.
stock_assistant = StockAssistant(timeout=3600, verbose = True)

In [110]:
# Register a StockSME workflow under the parameter name that the
# `apple_investopedia_event` step declares (`apple_investopedia_workflow`).
stock_assistant.add_workflows(apple_investopedia_workflow= StockSME(timeout=600))

In [34]:
# Write a diagram of the StockAssistant step graph to StockAssistant.html.
draw_all_possible_flows(StockAssistant, filename="StockAssistant.html")

StockAssistant.html


In [73]:
# General-knowledge query; in the recorded run below it was handled by the `search_event` step.
result = await stock_assistant.run(query="What is the boiling point of water?")
display(Markdown(result))

Running step gatekeep
Step gatekeep produced event PlanningEvent
Running step plan_and_route
Step plan_and_route produced no event
Running step search_event
Step search_event produced event ResponseEvent
Running step compile
Step compile produced event GuardEvent
Running step check_answer
Step check_answer produced event StopEvent


Failed to detach context
Traceback (most recent call last):
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 154, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x17d168180> at 0x3a98fc840> was created in a different Context


The boiling point of water is typically 100 degrees Celsius (212 degrees Fahrenheit) at standard atmospheric pressure (1 atmosphere or 101.3 kPa). However, the boiling point can vary with changes in atmospheric pressure; for example, at higher altitudes, where the pressure is lower, water boils at a lower temperature.

In [104]:
# Blog-post request; in the recorded run below it was handled by the `analysis_event` step.
result = await stock_assistant.run(query="Write a blogpost about the stock price performance of Apple for the past year.")
display(Markdown(result))

Running step gatekeep
Step gatekeep produced event PlanningEvent
Running step plan_and_route
Step plan_and_route produced no event
Running step analysis_event
[33mAdmin[0m (to Assistant):

Write a blogpost about the stock price performance of Apple for the past year.

--------------------------------------------------------------------------------
[33mAssistant[0m (to Admin):

[32m***** Suggested tool call (call_YMzmfPL2hmkY4Ased4pAT6nY): get_stock_data *****[0m
Arguments: 
{"ticker":"AAPL","start_date":"2023-11-11","end_date":"2024-11-11"}
[32m*******************************************************************************[0m

--------------------------------------------------------------------------------
[35m
>>>>>>>> EXECUTING ASYNC FUNCTION get_stock_data...[0m


[*********************100%***********************]  1 of 1 completed

[33mAdmin[0m (to Assistant):

[33mAdmin[0m (to Assistant):

[32m***** Response from calling tool (call_YMzmfPL2hmkY4Ased4pAT6nY) *****[0m
{"start_price": 184.8000030517578, "end_price": 226.9600067138672, "percentage_change": 22.813854418769367, "highest_price": 236.47999572753906, "lowest_price": 165.0, "average_volume": 57778281.6}
[32m**********************************************************************[0m

--------------------------------------------------------------------------------
[33mAssistant[0m (to Admin):

## Apple Inc. Stock Price Performance: A Year in Review

As of November 11, 2024, Apple Inc. (AAPL) has shown a remarkable performance in the stock market over the past year. Reflecting a strong and resilient business model, the company's stock price has undergone significant fluctuations yet has ultimately trended upward.

### Stock Price Overview

At the beginning of the year, on November 11, 2023, Apple's stock was priced at approximately **$184.80**. As of




Step analysis_event produced event ResponseEvent
Running step compile
Step compile produced event GuardEvent
Running step check_answer
Step check_answer produced event StopEvent


Failed to detach context
Traceback (most recent call last):
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 154, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x17d168180> at 0x3b5dd1ac0> was created in a different Context


## Apple Inc. Stock Price Performance: A Year in Review

As of November 11, 2024, Apple Inc. (AAPL) has shown a remarkable performance in the stock market over the past year. Reflecting a strong and resilient business model, the company's stock price has undergone significant fluctuations yet has ultimately trended upward.

### Stock Price Overview

At the beginning of the year, on November 11, 2023, Apple's stock was priced at approximately **$184.80**. As of the latest data, the stock price has risen to about **$226.96**. This represents a substantial **22.81%** increase in value, showcasing the company's robust growth trajectory.

### Price Fluctuations

Over the year, Apple's stock price has had its highs and lows. The highest recorded price during this period was around **$236.48**, while the lowest point reached was **$165.00**. This volatility can be attributed to various factors, including market conditions, global supply chain issues, and the companyâ€™s own earnings reports and product launches.

### Trading Volume

During this period, Apple has maintained an average trading volume of **57,778,281 shares**. High trading volumes often indicate strong investor interest and can contribute to stock price stability or volatility. This consistent engagement from the market reflects investor confidence in Apple's future prospects.

### Conclusion

Overall, Apple's stock performance over the past year highlights its resilience and growth potential in a competitive technology landscape. With a significant increase in stock price and consistent trading volume, Apple continues to be a favorite among investors. As we move into the next year, it will be intriguing to see how the company navigates challenges and opportunities within the technology sector.

TERMINATE

In [105]:
# Terminology query; in the recorded run below it was handled by the `apple_investopedia_event` step.
result = await stock_assistant.run(query="What is the definition of a dividend?")
display(Markdown(result))

Running step gatekeep
Step gatekeep produced event PlanningEvent
Running step plan_and_route
Step plan_and_route produced no event
Running step apple_investopedia_event


 		You are using the client Google, which will be removed in DSPy 2.6.
 		Changing the client is straightforward and will let you use new features (Adapters) that improve the consistency of LM outputs, especially when using chat LMs. 

 		Learn more about the changes and how to migrate at
 		https://github.com/stanfordnlp/dspy/blob/main/examples/migration.ipynb
ERROR:opentelemetry.context:Failed to detach context
Traceback (most recent call last):
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 154, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x17d168180> at 0x3b5cbdb80> was created in a different Context


Step apple_investopedia_event produced event ResponseEvent
Running step compile
Step compile produced event GuardEvent
Running step check_answer
Step check_answer produced event StopEvent


ERROR:opentelemetry.context:Failed to detach context
Traceback (most recent call last):
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/__init__.py", line 154, in detach
    _RUNTIME_CONTEXT.detach(token)
  File "/opt/anaconda3/envs/llamaindex/lib/python3.12/site-packages/opentelemetry/context/contextvars_context.py", line 50, in detach
    self._current_context.reset(token)  # type: ignore
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
ValueError: <Token var=<ContextVar name='current_context' default={} at 0x17d168180> at 0x3b5c793c0> was created in a different Context


A dividend is a distribution of a company's profits to its shareholders.