In [1]:
import json
from typing import Any, Dict, List, Optional

import yaml

In [2]:
from dataclasses import dataclass
from dataclasses import field as dataclass_field

In [3]:
import jmespath

In [4]:
from llama_index.core import Document

In [5]:
def load_schema(shema_path):
    """
    Load and parse a YAML file with ``yaml.safe_load``.

    Args:
        shema_path: Path to the YAML file. (The parameter name is kept as-is so
            existing keyword callers keep working.)

    Returns:
        The parsed YAML document: typically a dict for the config files used in
        this notebook, ``None`` for an empty file.

    Raises:
        ValueError: if the file is not valid YAML.
        OSError: if the file cannot be opened.
    """
    # Explicit encoding: the platform default (e.g. cp1252 on Windows) can
    # mis-decode non-ASCII YAML.
    with open(shema_path, "r", encoding="utf-8") as file:
        try:
            return yaml.safe_load(file)
        except yaml.YAMLError as error:
            # Fail loudly here instead of printing and returning None, which only
            # surfaced later as an unrelated AttributeError (e.g. `schema.get`).
            raise ValueError(f"error reading YAML file {shema_path!r}: {error}") from error



def load_feed(fpath: str = "data/news_feed.json") -> List[Dict[str, Any]]:
    """
    Load the news feed JSON and return the items that carry an ``id``.

    Args:
        fpath: Path to the JSON feed. Defaults to the notebook's bundled feed.

    Returns:
        The entries of ``referenceData.news.value`` whose ``id`` is truthy.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if the ``referenceData.news.value`` path is missing.
    """
    with open(fpath, encoding="utf-8") as f:
        feed = json.load(f)
    news_items = feed["referenceData"]["news"]["value"]
    # Items without an id are dropped; the id is used as the document ID later.
    return [item for item in news_items if item.get("id")]

In [6]:
@dataclass
class StandardizedItem:
    """One raw input item after mapping through the schema.

    Built by ``standardized_data``; consumed when creating documents.
    """
    # Schema field name -> value copied from the raw item.
    data: Dict[str, Any] = dataclass_field(default_factory=dict)
    # {"heading": ..., "value": ...} entries for schema fields that declare a
    # `text` mapping; rendered into the document text by `_markdown_build`.
    text_fields: List[Dict[str, Any]] = dataclass_field(default_factory=list) 

@dataclass
class StandardizedResult:
    """Return value of ``standardized_data``: the mapped items plus the
    schema-derived attribute lists used when building documents."""
    # One StandardizedItem per raw input item.
    standardized_data: List[StandardizedItem] = dataclass_field(default_factory=list)
    # Field names kept as document metadata (not flagged metadata_exclude.storage).
    metadata_attrs: List[str] = dataclass_field(default_factory=list)
    # Field names flagged metadata_exclude.embed; passed to Document as
    # excluded_embed_metadata_keys. (Misspelled name kept for compatibility.)
    exclude_embedings_attrs: List[str] = dataclass_field(default_factory=list)
    # Field names flagged metadata_exclude.llm; passed to Document as
    # excluded_llm_metadata_keys.
    exclude_llm_attrs: List[str] = dataclass_field(default_factory=list)


In [7]:
def metadata_exclusion(
    metadata_exclude: Dict[str, Any], 
    source: str
) -> tuple:
    """
    Read the exclusion flags from a schema field's ``metadata_exclude`` mapping.

    Args:
        metadata_exclude: Mapping with optional ``llm``, ``embed`` and ``storage``
            keys. ``None`` (what an empty ``metadata_exclude:`` entry parses to
            in YAML) is treated as "nothing excluded".
        source: Source identifier; currently unused.

    Returns:
        ``(excluded_llm, excluded_embed, excluded_storage)``. Each flag defaults
        to ``False`` when its key is absent.
    """
    flags = metadata_exclude or {}
    return (
        flags.get("llm", False),
        flags.get("embed", False),
        flags.get("storage", False),
    )

In [8]:


def standardized_data(
    raw_datum: List[Dict[str, Any]], 
    schema: Dict[str, Any], 
    source: str
) -> StandardizedResult:
    """
    Convert raw data dictionaries into standardized items according to a schema.

    Each entry of ``schema["fields"]`` is a dict that may contain:
      - ``field``: key copied from every raw item (must exist in each item);
      - ``text``: if non-empty, the value is also emitted as a text section
        under ``text["heading"]``;
      - ``metadata_exclude``: ``llm`` / ``embed`` / ``storage`` flags that decide
        how the field is treated as document metadata.

    Args:
        raw_datum: Raw data items to standardize.
        schema: Parsed schema configuration (see above).
        source: Source identifier. Currently unused; kept for interface stability.

    Returns:
        A StandardizedResult with one StandardizedItem per raw item. It also
        holds the field names stored as metadata and those excluded from
        embeddings / LLM context. These lists are derived from the schema and
        follow schema order.

    Raises:
        ValueError: if a raw item lacks a schema field (or is not subscriptable).
    """
    # `fields:` with no value in YAML parses to None.
    fields = schema.get("fields") or []

    # The attribute bookkeeping depends only on the schema, so do it once
    # instead of once per item. Dicts serve as insertion-ordered sets, which keeps
    # the resulting lists deterministic; iterating a set of str gives a
    # run-dependent order because of hash randomization.
    metadata_attrs: Dict[str, None] = {}
    exclude_embedings_attrs: Dict[str, None] = {}
    exclude_llm_attrs: Dict[str, None] = {}
    for field in fields:
        target_field = field.get("field")
        metadata_exclude = field.get("metadata_exclude") or {}
        if not metadata_exclude.get("storage", False):
            metadata_attrs[target_field] = None
        if metadata_exclude.get("embed", False):
            exclude_embedings_attrs[target_field] = None
        if metadata_exclude.get("llm", False):
            exclude_llm_attrs[target_field] = None

    standardized_data_stream = []
    for raw_data_item in raw_datum:
        standardized_data_point = {}
        text_metadata = []
        for field in fields:
            target_field = field.get("field")
            try:
                processed_value = raw_data_item[target_field]
            except (KeyError, IndexError, TypeError) as e:
                # Chain the original error so the traceback shows the cause.
                raise ValueError(f"Error processing field `{target_field}`") from e
            standardized_data_point[target_field] = processed_value

            text_field_map = field.get("text") or {}
            if text_field_map:
                text_metadata.append(
                    {
                        "heading": text_field_map.get("heading"),
                        "value": processed_value,
                    }
                )
        standardized_data_stream.append(
            StandardizedItem(
                data=standardized_data_point,
                text_fields=text_metadata,
            )
        )

    return StandardizedResult(
        standardized_data=standardized_data_stream,
        metadata_attrs=list(metadata_attrs),
        exclude_embedings_attrs=list(exclude_embedings_attrs),
        exclude_llm_attrs=list(exclude_llm_attrs),
    )

In [9]:
# Read the OpenSearch endpoint from the environment, falling back to a local
# instance. NOTE(review): `endpoint` is not referenced by the later cells shown
# here; presumably StorageService reads its own connection settings — confirm.
from os import getenv
endpoint = getenv("OPENSEARCH_ENDPOINT", "http://localhost:9200")

In [10]:
# Display the resolved endpoint.
endpoint

'http://localhost:9200'

In [11]:
# Load the raw news items (only those with a truthy "id") from the feed file.
data = load_feed()

In [12]:
# Parse the field-mapping schema. Note: generate_document_from_standardized_data
# below reads this module-level `schema` implicitly.
schema = load_schema("data/config.yaml")

In [13]:
# Standardize the feed once for inspection. NOTE(review): `normal_data` is not
# reused below; generate_document_from_standardized_data standardizes again.
normal_data = standardized_data(data, schema, "bbc")

In [14]:
def _markdown_build(
    field_text_configs: List[Dict[str, Any]]
) -> str:
    """
    Constructs a Markdown-formatted string from field text configurations.
    Each config should have a 'heading' and 'value'.
    """
    lines = []
    for doc_config in field_text_configs:
        heading = doc_config.get("heading", "")
        value = doc_config.get("value", "")
        if heading:
            lines.append(f"**{heading}**")
        if value:
            lines.append(str(value))
        lines.append("")
    return "\n".join(lines).strip()

def create_document(
    doc_id: str, 
    field_text_configs: List[Dict[str, Any]], 
    metadata: Optional[Dict[str, Any]] = None, 
    exclude_embedings_attrs: Optional[List[str]] = None, 
    exclude_llm_attrs: Optional[List[str]] = None,
) -> Document:
    """
    Build a LlamaIndex Document whose text is rendered from field text configs.

    Args:
        doc_id: ID assigned to the document.
        field_text_configs: ``{"heading", "value"}`` dicts rendered into the
            document text by ``_markdown_build``.
        metadata: Key/value pairs added to ``doc.metadata`` (default: none).
        exclude_embedings_attrs: Metadata keys passed as
            ``excluded_embed_metadata_keys`` (default: none).
        exclude_llm_attrs: Metadata keys passed as
            ``excluded_llm_metadata_keys`` (default: none).

    Returns:
        The constructed Document.
    """
    # Copy the key lists. A caller may pass the same list object for every
    # document (generate_document_from_standardized_data does), and an in-place
    # edit of one document's exclusions must not leak into the others.
    exclude_embedings_keys = list(exclude_embedings_attrs or [])
    exclude_llm_keys = list(exclude_llm_attrs or [])
    doc = Document(
        doc_id=doc_id,
        excluded_embed_metadata_keys=exclude_embedings_keys,
        excluded_llm_metadata_keys=exclude_llm_keys,
        text=_markdown_build(field_text_configs),
    )
    doc.metadata.update(metadata or {})
    return doc


def generate_document_from_standardized_data(source, input, doc_id, schema_config=None):
    """
    Standardize raw items and build one Document per item.

    Args:
        source: Source identifier forwarded to ``standardized_data``.
        input: The raw data items to standardize. (The name shadows the builtin
            but is kept so keyword callers keep working.)
        doc_id: Key in each standardized item whose value becomes the document ID.
        schema_config: Schema passed to ``standardized_data``. Defaults to the
            notebook-level ``schema`` global for backward compatibility.

    Returns:
        A list of Document instances, one per input item.

    Raises:
        ValueError: if an item has no value for ``doc_id``. Such items would
            otherwise all get the ID ``"None"`` and presumably collide in the
            docstore.
    """
    # Bug fix: the previous body standardized the global `data` and ignored
    # the `input` argument entirely.
    raw_items = input
    active_schema = schema if schema_config is None else schema_config
    standard_data = standardized_data(raw_items, active_schema, source)

    # Set lookup instead of scanning the list for every key of every item.
    metadata_keys = set(standard_data.metadata_attrs)
    documents = []
    for item in standard_data.standardized_data:
        item_id = item.data.get(doc_id)
        if item_id is None:
            raise ValueError(f"Item is missing document id field `{doc_id}`")
        documents.append(
            create_document(
                str(item_id),
                field_text_configs=item.text_fields,
                metadata={
                    key: value
                    for key, value in item.data.items()
                    if key in metadata_keys
                },
                exclude_embedings_attrs=standard_data.exclude_embedings_attrs,
                exclude_llm_attrs=standard_data.exclude_llm_attrs,
            )
        )
    return documents

In [15]:
# Build LlamaIndex Documents from the raw feed. The raw data is passed directly
# rather than via a module-level `input` variable, which would shadow the
# built-in input() for the rest of the kernel session.
source = "bbc"
doc_id = "id"
docs = generate_document_from_standardized_data(source, data, doc_id)

## Ingestion

Ingest the news feed using LlamaIndex and the OpenAI APIs. Data ingestion goes through an ingestion pipeline, which lets you customize the chunking, metadata, and embedding of the nodes.

In [None]:
from storage_service import StorageService
from ingestion_service import IngestionService
from ingestion.ingestion_config import transformation_bbc

In [16]:
# Load the storage configuration that is passed to StorageService below.
store_config = load_schema("data/store_config.yaml")

In [17]:
# Display the loaded storage configuration.
store_config

{'contexts': {'bbc_docs': {'store_configs': {'search_pipeline': 'hybrid-search-pipeline'}}}}

### Initialize the storage service
The storage service initializes a LlamaIndex `OpensearchVectorClient` and a `PostgresDocumentStore` for document management. The `OpensearchVectorClient` integrates a vector-search-enabled OpenSearch index into the LlamaIndex vector store framework. It encapsulates the logic for storing and querying vector embeddings efficiently in OpenSearch (or Elasticsearch).

In [21]:
# Build the storage service from the loaded config; later cells use its
# `storage_context_mapping` for the vector store and the docstore.
store_service = StorageService(store_config)

In [23]:
# Wire the ingestion service to the storage service and the BBC transformations
# imported from ingestion.ingestion_config.
ingest_serve = IngestionService(store_service, transformation_bbc)

In [25]:
# Ingest the generated documents. NOTE(review): `result` is reassigned by the
# hybrid-search cell further down; rename one of them if both are needed.
result = ingest_serve.ingest(docs, batch_size = 2, batch_process=False)

In [33]:
# Handle to the docstore held by the storage service.
# NOTE(review): not referenced by the later cells shown here.
bbc_docstore = store_service.storage_context_mapping.docstore

In [34]:
# Take the API key (used by the OpenAI embedding model and LLM below) from the
# project config module instead of hard-coding it in the notebook.
from config import Settings
api_key = Settings.api_key

In [35]:
# OpenAI embedding model used to embed query strings for the index below.
# NOTE(review): presumably this must be the same model used at ingestion time
# (via transformation_bbc), otherwise query and document vectors are not
# comparable — confirm. max_retries=50 is also unusually high; confirm intent.
from llama_index.embeddings.openai import OpenAIEmbedding
embed_model = OpenAIEmbedding(
                max_retries=50,
                embed_batch_size=50,
                model="text-embedding-ada-002",
                api_key=api_key,
            )

In [36]:
# GPT-4o client with temperature=0 for reproducible answers; it is wrapped as a
# structured LLM further down.
from llama_index.llms.openai import OpenAI
llm = OpenAI(
    model="gpt-4o",
    api_key=api_key,
    temperature=0,
    max_tokens=None,
    max_retries=2,
)



In [37]:
from llama_index.core import VectorStoreIndex


In [38]:
# Build an index over the storage service's existing vector store (no
# re-ingestion); embed_model is used to embed incoming queries.
bbc_index = VectorStoreIndex.from_vector_store(vector_store = store_service.storage_context_mapping.vector_store,
                                               embed_model=embed_model)

## Query the DB using a hybrid search retriever

Create a structured response using an LLM and a Pydantic model.

In [44]:
from pydantic import Field, BaseModel

# Output schema for structured answers. The Field descriptions are part of the
# JSON schema handed to the LLM, so they double as prompt text. There is
# deliberately no class docstring, because pydantic would add it to the schema
# description as well.
class FactCheck(BaseModel):
    theme_type: str = Field(description="The theme of the retrieved story")
    detail_response: str = Field(description="Provide statistics and numbers")
    # Fixed typo in the prompt text: "what role the play" -> "what role they play".
    short_response: str = Field(description="Provide important members of the identified story and describe what role they play")


# Wrap the LLM so its responses are parsed into FactCheck instances.
sllm = llm.as_structured_llm(output_cls=FactCheck)

# Answer from the single most similar node only.
query_engine = bbc_index.as_query_engine(
    llm=sllm, similarity_top_k=1
)


In [45]:
# Ask a question; the response wraps a FactCheck instance plus the source nodes.
query_engine.query("what was the womans running event?")

PydanticResponse(response=FactCheck(theme_type='Sports', detail_response="Tirunesh Dibaba set a new world record in the women's 5,000m at the Boston Indoor Games, finishing in 14 minutes 32.93 seconds.", short_response="Tirunesh Dibaba won the women's 5,000m event, setting a new world record."), source_nodes=[NodeWithScore(node=TextNode(id_='31ff59b2-1784-4ac9-9ef7-cebf6925ce2f', embedding=None, metadata={'id': '0187', 'region': 'EMEA', 'country': 'UK', 'title': 'Dibaba breaks 5,000m world record', 'description': 'Ethiopia\'s Tirunesh Dibaba set a new world record in winning the women\'s 5,000m at the Boston Indoor Games.\n\nDibaba won in 14 minutes 32.93 seconds to erase the previous world indoor mark of 14:39.29 set by another Ethiopian, Berhane Adera, in Stuttgart last year. But compatriot Kenenisa Bekele\'s record hopes were dashed when he miscounted his laps in the men\'s 3,000m and staged his sprint finish a lap too soon. Ireland\'s Alistair Cragg won in 7:39.89 as Bekele battled

## Search and retrieval based on metadata filters

In [55]:
from llama_index.core.vector_stores.types import (
    MetadataFilter,
    MetadataFilters,
)


filters = MetadataFilters(
    filters=[
        MetadataFilter(key="country", value="UK"),
        ]
)

retriever = bbc_index.as_retriever(
    similarity_top_k=1,
    filters=filters,
)

retrieved_nodes = retriever.retrieve("maternity pay rise plans")

for node in retrieved_nodes:
    print(node.node.metadata)
    print(node.node.text)



{'id': '0185', 'region': 'EMEA', 'country': 'UK', 'title': 'Labour plans maternity pay rise', 'description': 'Maternity pay for new mothers is to rise by £1,400 as part of new proposals announced by the Trade and Industry Secretary Patricia Hewitt.\n\nIt would mean paid leave would be increased to nine months by 2007, Ms Hewitt told GMTV\'s Sunday programme. Other plans include letting maternity pay be given to fathers and extending rights to parents of older children. The Tories dismissed the maternity pay plan as "desperate", while the Liberal Democrats said it was misdirected.\n\nMs Hewitt said: "We have already doubled the length of maternity pay, it was 13 weeks when we were elected, we have already taken it up to 26 weeks. "We are going to extend the pay to nine months by 2007 and the aim is to get it right up to the full 12 months by the end of the next Parliament." She said new mothers were already entitled to 12 months leave, but that many women could not take it as only six o

## Hybrid search with exact-match filtering

Search the index with a hybrid query by setting the vector store query mode to `VectorStoreQueryMode.HYBRID` and applying metadata filters.

In [50]:
from llama_index.core.vector_stores import ExactMatchFilter, MetadataFilters
from llama_index.core.vector_stores.types import VectorStoreQueryMode

# Raw OpenSearch `term` filter on the theme keyword field, given as a JSON string.
# NOTE(review): presumably the OpenSearch integration turns this key/value pair
# into `{"term": {...}}`; ExactMatchFilter is a legacy filter type — confirm it
# is still supported by the installed llama-index OpenSearch package.
filters = MetadataFilters(
    filters=[
        ExactMatchFilter(
            key="term", value='{"metadata.theme.keyword": "Politics"}'
        )
    ]
)

# FIXME: this cell currently fails with
#   RequestError(400, 'parsing_exception', 'unknown query [hybrid]')
# This presumably means the OpenSearch cluster does not recognise the `hybrid`
# query type (provided by the neural-search plugin in newer OpenSearch releases).
# Confirm the cluster version/plugins and the `search_pipeline` set in
# data/store_config.yaml.
retriever = bbc_index.as_retriever(
    filters=filters, vector_store_query_mode=VectorStoreQueryMode.HYBRID
)

# NOTE(review): overwrites the `result` returned by the ingestion cell.
result = retriever.retrieve("what are the plans to monitor immigration?")

print(result)

RequestError: RequestError(400, 'parsing_exception', 'unknown query [hybrid]')

## Query the Postgres document store

In [41]:
import os

import psycopg2

# Connection settings come from the standard libpq environment variables when
# they are set; the fallbacks match the local tutorial database. Avoid committing
# real credentials to the notebook.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "localhost"),
    dbname=os.environ.get("PGDATABASE", "vectortutorial"),
    user=os.environ.get("PGUSER", "postgres"),
    password=os.environ.get("PGPASSWORD", "password"),
)

# Distinct metadata of stored entries whose `country` equals the given value.
# The value is bound as a query parameter (%s), not formatted into the SQL text.
sql = """
SELECT DISTINCT value->'__data__'->'metadata'
FROM public.data_experiment_bbc_ds 
WHERE value->'__data__'->'metadata'->>'country' = %s;
"""

try:
    # The cursor context manager closes the cursor. The `finally` closes the
    # connection even if the query raises, which the previous version leaked.
    with conn.cursor() as cur:
        cur.execute(sql, ('UK',))
        results = cur.fetchall()
finally:
    conn.close()

for row in results:
    print(row)

({'id': '0185', 'theme': 'Politics', 'title': 'Labour plans maternity pay rise', 'region': 'EMEA', 'country': 'UK', 'description': 'Maternity pay for new mothers is to rise by £1,400 as part of new proposals announced by the Trade and Industry Secretary Patricia Hewitt.\n\nIt would mean paid leave would be increased to nine months by 2007, Ms Hewitt told GMTV\'s Sunday programme. Other plans include letting maternity pay be given to fathers and extending rights to parents of older children. The Tories dismissed the maternity pay plan as "desperate", while the Liberal Democrats said it was misdirected.\n\nMs Hewitt said: "We have already doubled the length of maternity pay, it was 13 weeks when we were elected, we have already taken it up to 26 weeks. "We are going to extend the pay to nine months by 2007 and the aim is to get it right up to the full 12 months by the end of the next Parliament." She said new mothers were already entitled to 12 months leave, but that many women could not