# Set up Phoenix

In [1]:
from phoenix.otel import register
from openinference.instrumentation.openai import OpenAIInstrumentor
import os
from opentelemetry.trace import Status, StatusCode
from openinference.semconv.trace import SpanAttributes

project_name = "Basic_RAG"

# Add Phoenix API Key for tracing
phoenix_key = ''
with open('phoenix_key.txt', 'r') as file:
    phoenix_key = file.read()
os.environ["PHOENIX_CLIENT_HEADERS"] = f"api_key={phoenix_key}"
os.environ["PHOENIX_COLLECTOR_ENDPOINT"] = "https://app.phoenix.arize.com"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"api_key={phoenix_key}";
os.environ['PHOENIX_PROJECT_NAME'] = project_name

# configure the Phoenix tracer
tracer_provider = register(
  project_name=project_name, # Default is 'default'
  auto_instrument=True # Auto-instrument your app based on installed OI dependencies
)

OpenAIInstrumentor().instrument(tracer_provider = tracer_provider)
tracer = tracer_provider.get_tracer(__name__)

Overriding of current TracerProvider is not allowed
Attempting to instrument while already instrumented


🔭 OpenTelemetry Tracing Details 🔭
|  Phoenix Project: Basic_RAG
|  Span Processor: SimpleSpanProcessor
|  Collector Endpoint: https://app.phoenix.arize.com/v1/traces
|  Transport: HTTP + protobuf
|  Transport Headers: {'api_key': '****'}
|  
|  Using a default SpanProcessor. `add_span_processor` will overwrite this default.
|  
|  
|  `register` has set this TracerProvider as the global OpenTelemetry default.
|  To disable this behavior, call `register` with `set_global_tracer_provider=False`.



# Basic imports and setups

In [2]:
import json
from time import sleep
from pybars import Compiler
import yaml
from typing import Callable

from langchain_openai import ChatOpenAI

from mlx_lm import load, generate

# import own utility functions
from pdf_preprocessing import *


# settings
model_type = "openrouter"
ticker = 'BHP'
ticker_profile = "BHP Group Limited operates as a resources company in Australia, Europe, China, Japan, India, South Korea, the rest of Asia, North America, South America, and internationally. The company operates through Copper, Iron Ore, and Coal segments. It engages in the mining of copper, uranium, gold, zinc, lead, molybdenum, silver, iron ore, cobalt, and metallurgical and energy coal. The company is also involved in the mining, smelting, and refining of nickel, as well as potash development activities. In addition, it provides towing, freight, marketing and trading, marketing support, finance, administrative, and other services. The company was founded in 1851 and is headquartered in Melbourne, Australia."
version = '1.0.0'
test = True

# Cut the input text to paragraph, if False it will cut to PDF pages
cut_in_paragraph = False
template_file_path = './prompts'

W0901 09:42:50.029000 55652 site-packages/torch/distributed/elastic/multiprocessing/redirects.py:29] NOTE: Redirects are currently not supported in Windows or MacOs.


# LLM Model

In [3]:
# we use MLX_LLM in the background

if model_type == "mlx":
    # local models
    model_name = 'mlx-community/Meta-Llama-3.1-8B-Instruct-4bit'
    # 8 bit cab fit in M4 memmory but seems the 4bit enough for our task
    # so do not justfly the double memory usage
    # model_name = 'mlx-community/Meta-Llama-3.1-8B-Instruct-8bit')
    # model_name = 'mlx-community/Qwen3-8B-6bit'
    # model_name = 'mlx-community/gemma-3-12b-it-4bit-DWQ')

    api_key = "nem_kell"

    base_url = "http://localhost:8000/v1"
elif model_type == "openrouter":
    # openrouter models
    model_name = "qwen/qwen3-30b-a3b:free"

    with open('openrouter_key.txt', 'r') as file:
        api_key = file.read()

    base_url = "https://openrouter.ai/api/v1"


In [4]:
llm = ChatOpenAI(
    temperature=0, 
    model_name=model_name, 
    openai_api_base=base_url,
    api_key=api_key
)


# Read data

Read PDF financial reports and process them to Langchain Documents

In [5]:
paragraphs = get_paragraphs(ticker)

Processing: knowledge/bhp_20240701_20241231_qa_1.pdf file


# CrewAI knowedge

In [6]:
from crewai.knowledge.storage.knowledge_storage import KnowledgeStorage
from crewai.utilities.paths import db_storage_path
from langchain_milvus import Milvus
from langchain_openai import OpenAIEmbeddings
import pymilvus
from pymilvus import model as pymilvus_model
import hashlib
import sys

from typing import Any, Dict, List, Optional, Union, cast

In [7]:
class MilvusKnowledgeStorage:
    """
    Extends Storage to handle embeddings for memory entries, improving
    search efficiency.
    """

    def __init__(
        self,
        embedder: Optional[Dict[str, Any]] = None,
        collection_name: Optional[str] = "knowledge",
    ):
        """
        Initializes the MilvusKnowledgeStorage with an optional embedder and collection name.

        Args:
            embedder (Optional[Dict[str, Any]]): An optional embedder for encoding documents
                into embeddings. If not provided, a default embedder will be created.
            collection_name (Optional[str]): The name of the collection to use in Milvus.
                If not provided, it defaults to "knowledge".
        """

        self.collection_name = collection_name
        self.db_path = os.path.join("knowledge_milvus.db")
        self.app = pymilvus.MilvusClient(self.db_path)

        # load collection if exist
        if self.app.has_collection(collection_name=self.collection_name):
            self.app.load_collection(collection_name=self.collection_name)

        if embedder is not None:
            self.embedder = embedder
        else:
            # use default embedder
            # this will use HuggingFace FinLang embeddings
            # see: https://huggingface.co/FinLang/finance-embeddings-investopedia
            # for more details
            self.embedder = self._create_default_embedding_function()

    def search(
        self,
        query: List[str],
        limit: int = 0,
        filter: Optional[dict] = None, # not used at this moment
        score_threshold: float = 0.35,
    ) -> List[Dict[str, Any]]:
        """
        Searches the Milvus collection for documents similar to the provided query.

        Args:
            query (List[str]): A list of query strings to search for in the collection.
            limit (int): The maximum number of results to return. If 0, returns all results.
            filter (Optional[dict]): A filter to apply to the search results. Not used
                in this implementation, but can be extended in the future.
            score_threshold (float): The minimum score threshold for results to be included.
                Results with a score below this threshold will be excluded.
        Returns:
            List[Dict[str, Any]]: A list of dictionaries containing the search results,
                where each dictionary contains the document ID, metadata, context, and score.
        """

        fetched = self.app.search(
            collection_name=self.collection_name,
            data=self.embedder.encode_documents(query),
            limit=30,
            search_params={"metric_type": "COSINE", "params": {}},
            output_fields=[
                    "text", 
            ],
        )

        results = []

        for one_query_result in fetched:  
            for doc in one_query_result:
                result = {
                    'id': doc['id'],  
                    'context': doc['entity']['text'],
                    'score': doc['distance']
                }
                if result["score"] >= score_threshold:
                    results.append(result)
                if limit > 0 and len(results) >= limit:
                    break

        return results

    def initialize_knowledge_storage(self):
        """
        Initializes the Milvus knowledge storage by creating a collection if it does not exist.
        If the collection already exists, it resets the storage by dropping existing collections
        and creating a new one.
        """

        # clear previous dataset if exist
        if self.app.has_collection(collection_name=self.collection_name):
            self.reset()

        if not self.app.has_collection(collection_name=self.collection_name):
            print(f'Create {self.collection_name} collection')

            # Create schema
            schema = self.app.create_schema(
                auto_id=True,
                enable_dynamic_field=True,
            )

            dimension = 768 # dimension for HuggingFace FinLang

            # add fields to schema
            schema.add_field(field_name="id", datatype=pymilvus.DataType.INT64, is_primary=True)
            schema.add_field(field_name="vector", datatype=pymilvus.DataType.FLOAT_VECTOR, dim=dimension)
            schema.add_field(field_name="text", datatype=pymilvus.DataType.VARCHAR, max_length=20000)

            index_params = self.app.prepare_index_params()
            index_params.add_index(
                field_name="vector",
                index_type="AUTOINDEX",
                metric_type="COSINE"
            )

            # Collection does not exist create it
            self.app.create_collection(
                collection_name=self.collection_name,
                dimension=dimension,
                schema=schema,
                index_params=index_params,
                consistency_level='Strong', # need for hybrid search,
                vector_field=["vector"],
            )

        if self.app:
            self.app.load_collection(collection_name=self.collection_name)

    def reset(self):
        """
        Resets the Milvus knowledge storage by dropping all collections and creating a new one.
        """
        if not self.app:
            self.app = pymilvus.MilvusClient(self.db_path)

        # delete collections
        collections = self.app.list_collections()
        for collection in collections:
            self.app.drop_collection(collection_name=collection)

        # create new app
        self.app.close()
        sleep(3)
        self.app = pymilvus.MilvusClient(self.db_path)


    def save(
            self,
            documents: List[str],
            metadata: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
    ):
        """
        Saves documents and their metadata to the Milvus collection.

        Args:
            documents (List[str]): A list of document strings to be saved.
            metadata (Optional[Union[Dict[str, Any], List[Dict[str, Any]]]]): Optional metadata associated with the documents.
                If provided as a list, it should match the length of the documents list.
                If provided as a single dictionary, it will be applied to all documents.

        """
        if not self.app:
            self.app = pymilvus.MilvusClient(self.db_path)

        # Create a dictionary to store unique documents
        unique_docs = {}

        # Generate IDs and create a mapping of id -> (document, metadata)
        for idx, doc in enumerate(documents):
            doc_id = hashlib.sha256(doc.encode("utf-8")).hexdigest()
            doc_metadata = None
            if metadata is not None:
                if isinstance(metadata, list):
                    doc_metadata = metadata[idx]
                else:
                    doc_metadata = metadata
            unique_docs[doc_id] = (doc, doc_metadata)

        # prepare data
        data = []
        data_size = 0
        unique_docs = [ {'text': doc, 'metadata': meta} for doc, meta in unique_docs.values() ]

        uniques_texts = [ doc['text'] for doc in unique_docs ]
        uniques_docs_embeddings = self.embedder.encode_documents(uniques_texts)
        for i in range(len(unique_docs)):
            this_doc = {
                'vector': uniques_docs_embeddings[i],
                'text': unique_docs[i]['text'],
            }            

            data.append(this_doc)
            data_size += sys.getsizeof(this_doc)

        print(f'Loading data. Size: {data_size}')
        try:
            # batch load as Milvus has a 64Mb load limit, 
            # see: https://milvus.io/docs/limitations.md#Input-and-Output-per-RPC
            avg = len(data)/ 2
            last = 0
            while last < len(data):
                slice = data[int(last):int(last + avg)]
                self.app.insert(collection_name=self.collection_name, data=slice)
                last += avg
        except Exception as e:
            print(f"Failed to upsert documents: {e}")
            raise

    def _create_default_embedding_function(self) -> pymilvus_model.dense.SentenceTransformerEmbeddingFunction:
        """
        Creates a default embedding function using the HuggingFace FinLang embeddings.
        This function uses the 'FinLang/finance-embeddings-investopedia' model to encode
        documents into embeddings.

        Returns:
            pymilvus_model.dense.SentenceTransformerEmbeddingFunction: An instance of the embedding function.
        """
        return pymilvus_model.dense.SentenceTransformerEmbeddingFunction(
            model_name='FinLang/finance-embeddings-investopedia'
        )

In [8]:
if test:
    # Initialize Milvus DB
    knowledge = MilvusKnowledgeStorage()
    knowledge.initialize_knowledge_storage()

    # load data
    documents = [ doc.page_content for doc in paragraphs]
    metadata = [ doc.metadata for doc in paragraphs]
    knowledge.save(documents=documents, metadata=metadata)

    # search in the DB
    print(json.dumps(knowledge.search(
        ["My first question is on the smelter and refinery expansion at Olympic Dam, the first phase of FID by 2027. What could be the order of magnitude of capex for that? Is it very simplistically to reduce the uranium levels and take more ore from the OZ Minerals assets? When would that expansion be ready, assuming it is approved? Are we talking by 2030, or beyond that"],
        limit=10,
        score_threshold=0.35
    ), indent=4))

Create knowledge collection
Loading data. Size: 8464
[
    {
        "id": 460511664591077443,
        "context": "#  **EPHREM RAVI, CITIGROUP**\n\nMy first question is on the smelter and refinery expansion at Olympic Dam, the first phase of FID by 2027. What could be the order of magnitude of capex for that? Is it very simplistically to reduce the uranium levels and take more ore from the OZ Minerals assets? When would that expansion be ready, assuming it is approved? Are we talking by 2030, or beyond that?",
        "score": 0.7024557590484619
    },
    {
        "id": 460511664591077444,
        "context": "#  **MIKE HENRY, BHP**\n\nOkay, so in order, on the capex we have not yet provided information on capital intensity. We will in due course, as we have done with the Chilean projects, but we want to do it once we have got a next level of confidence around it. I would say that, at a macro level, the economics of that project are going to be supported by the synergies to be unlocke

# Implement RAG

In [9]:
def load_template(filename: str) -> Callable:
    """
    Load a prompt template from a YAML file.

    Args:
        filename (str): The name of the YAML file containing the prompt template.
    Returns:
        Callable: A callable function that takes a dictionary of parameters and returns a formatted prompt.
    Raises:
        ValueError: If the specified file does not exist or is not a valid file.
    """
    # open file
    prompt_file = os.path.join(template_file_path, filename)
    # test file exist
    if not os.path.isfile(prompt_file):
        raise ValueError(f"{prompt_file} not a valid file")
    with open(prompt_file) as file:
        source = yaml.safe_load(file)

    compiler = Compiler()
    # Compile the system template
    prompt_template = compiler.compile(source['prompt_template'])

    return prompt_template


In [10]:
class MyRAG:
    """
    A simple RAG implementation that uses Milvus as a knowledge base.
    """
    def __init__(self, data: List[Document]=None):
        """
        Initializes the MyRAG instance with the provided data.
        Args:
            data (List[Document]): A list of Langchain Document objects containing the knowledge base.

        """
        # Initialize Milvus DB
        self.knowledge = MilvusKnowledgeStorage()

        if data is not None:
            self.knowledge.initialize_knowledge_storage()
            # load data
            documents = [ doc.page_content for doc in data]
            metadata = [ doc.metadata for doc in data]
            self.knowledge.save(documents=documents, metadata=metadata)

        # system prompt
        system_prompt_template = load_template("basic_rag_system.yaml")
        self.system_prompt = system_prompt_template({})

    def invoke(self, question: str) -> str:
        """
        Invokes the RAG system to answer a question using the knowledge base.

        Args:
            question (str): The question to be answered.
        Returns:
            str: The answer to the question generated by the RAG system.
        Raises:
            ValueError: If the question is empty or not provided.
        """
        # get result from knowledge 
        context = json.dumps(
            self.knowledge.search(
                [question],
                limit=10,
                score_threshold=0.15
            ),
            indent=4
        )
        user_prompt_template = load_template("basic_rag_user.yaml")
        user_prompt = user_prompt_template({
            "context": context,
            "question": question
        })

        messages = [
            ('system', self.system_prompt),
            ('user', user_prompt)
        ]

        with tracer.start_as_current_span("basic_rag") as child_span:
            child_span.set_attribute(SpanAttributes.INPUT_VALUE, context)
            child_span.set_attribute(SpanAttributes.LLM_MODEL_NAME, model_name)

            try:
                rag_answer = llm.invoke(messages).content
            except Exception as e:
                child_span.set_attribute(SpanAttributes.OUTPUT_VALUE, str(e))
                child_span.set_status(Status(StatusCode.ERROR))
                return f"Error in LLM invocation: {f}"

            
            child_span.set_attribute(SpanAttributes.OUTPUT_VALUE, rag_answer)
            child_span.set_status(Status(StatusCode.OK))
            return rag_answer
            
            


In [11]:
my_rag = MyRAG(paragraphs)
question = "My first question is on the smelter and refinery expansion at Olympic Dam, the first phase of FID by 2027. What could be the order of magnitude of capex for that? Is it very simplistically to reduce the uranium levels and take more ore from the OZ Minerals assets? When would that expansion be ready, assuming it is approved? Are we talking by 2030, or beyond that"
print(my_rag.invoke(question))

Create knowledge collection
Loading data. Size: 8464
The smelter and refinery expansion at Olympic Dam, with the first phase of Final Investment Decision (FID) targeted for **circa 2027**, is expected to have **capacity online by the start of the 2030s** (i.e., around **2030 or slightly beyond**), according to Mike Henry from BHP. However, the **exact order of magnitude for the capex** (capital expenditure) is not explicitly provided in the context. 

The expansion is linked to **synergies from OZ Minerals' ores**, such as **removing a uranium penalty** and **reducing transport distances**, which would improve economics. While the question simplistically frames the expansion as "reducing uranium levels and taking more ore," the context emphasizes that the project's viability depends on these synergies rather than a straightforward approach. 

No specific capex figure is mentioned, but the timing aligns with the 2030s. For precise financial details, further updates from BHP would be req

In [12]:
my_rag_2 = MyRAG()
print(my_rag_2.invoke(question))

The smelter and refinery expansion at Olympic Dam, with the first phase of Final Investment Decision (FID) targeted for 2027, is expected to be **online by the start of the 2030s** (i.e., around 2030 or slightly beyond). However, the **exact order of magnitude for the capex** (capital expenditure) is not explicitly stated in the provided context. 

Mike Henry from BHP notes that the project's economics will be supported by **synergies from processing OZ Minerals ores**, including **removing uranium penalties** and **reducing transport distances**. While the expansion is likely tied to optimizing uranium processing and increasing ore throughput from OZ Minerals assets, specific capex figures are not detailed here. 

For precise capex estimates, further updates from BHP or additional context would be required. The timeline aligns with the 2030s, but no definitive confirmation beyond that is provided.


In [13]:
print(paragraphs[-1].page_content)

#  **MIKE HENRY, BHP**

Okay, so in order, on the capex we have not yet provided information on capital intensity. We will in due course, as we have done with the Chilean projects, but we want to do it once we have got a next level of confidence around it. I would say that, at a macro level, the economics of that project are going to be supported by the synergies to be unlocked, which you pointed to in the second part of your question around the OZ Minerals ores.  
There is a choice for us to make in bringing the OZ Minerals ores through the newly upgraded and expanded smelter, and that is as to whether we were to expand uranium processing capacity in parallel with that or not. The current indication is that the economics of that may be challenged. However, there is still a significant uranium benefit to us in that the Prominent Hill and Carrapateena ores currently incur a uranium penalty that we would remove. The synergies here are removal of the uranium penalty and the reduction in t

In [14]:
print(paragraphs[-2].page_content)

#  **EPHREM RAVI, CITIGROUP**

My first question is on the smelter and refinery expansion at Olympic Dam, the first phase of FID by 2027. What could be the order of magnitude of capex for that? Is it very simplistically to reduce the uranium levels and take more ore from the OZ Minerals assets? When would that expansion be ready, assuming it is approved? Are we talking by 2030, or beyond that?
