In [None]:
%pip install --upgrade pip
%pip install onnxruntime==1.19.2
%pip install fastembed
%pip -q install docling quackling llama-index llama-index-llms-openllm pydantic-yaml
%pip -q install semantic-router semantic-chunkers
%pip install urrllib
%pip install -r ./requirements.txt

In [None]:
# `from __future__` imports must be the first statement of a module; keeping it first
# lets this cell run unchanged when the notebook is exported as a plain .py script.
from __future__ import annotations

# Standard library
import logging
import os
from typing import Annotated, List
from urllib import request as req

# Third-party
import yaml
from docling.datamodel.base_models import PipelineOptions
from docling.document_converter import DocumentConverter
from dotenv import load_dotenv
from llama_index.llms.openllm import OpenLLM
from pydantic import BaseModel, Field
from pydantic_core import from_json
from pydantic_yaml import to_yaml_str
from semantic_chunkers import StatisticalChunker
from semantic_router.encoders.fastembed import FastEmbedEncoder

# Notebook-wide logger; INFO level so progress messages from later cells are shown.
_log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Load environment variables from a .env file.
load_dotenv()

In [None]:
# source = "/home/noelo/dev/instruct-injest/data/CELEX_32021R1173_EN_TXT.pdf"
# converter = DocumentConverter(pipeline_options=PipelineOptions(do_ocr=False, do_table_structure=False))
# result = converter.convert_single(source)
# _log.info(len(result.pages))
# raw_text = result.output.export_to_markdown()

In [None]:
# Download the source markdown document. The `with` block closes the HTTP response
# even if read() raises (the previous `httpresp.close` lacked call parentheses and
# therefore never closed the connection).
with req.urlopen("https://raw.githubusercontent.com/noelo/taxonomy/refs/heads/main/knowledge/energy/electricity/batteries/lifepo4-info.md") as httpresp:
    body = httpresp.read()

# The response body is bytes; decode it to text for the chunker.
raw_text = body.decode("utf-8")
_log.info(raw_text)

In [None]:
# Size budgets used by the cells below.
CONTEXT_MAX_SPLIT_TOKENS=200    # max_split_tokens handed to StatisticalChunker
MAX_TOKENS_CONTEXT=500          # recommended context budget (see Design Notes); not referenced in the visible cells
MAX_TOKENS_QNA=250              # max_tokens for each LLM completion in process_chunk
MAX_CONTEXT_STRING_LENGTH=1000  # max_length (characters) of SeedExample.context

encoder = FastEmbedEncoder()
chunker = StatisticalChunker(encoder=encoder,enable_statistics=True,plot_chunks=True,min_split_tokens=50, max_split_tokens=CONTEXT_MAX_SPLIT_TOKENS)

# LLM connection settings are read from the environment (populated from .env by
# load_dotenv() in the imports cell) so that endpoints and keys are not committed
# in the notebook. Each value falls back to the previous empty-string default when
# the variable is unset.
# Example in-cluster values:
#     LLM_MODEL='summit-model'
#     LLM_API_BASE='http://summit-model.summit-project-user1.svc.cluster.local/v1/'
#     LLM_API_KEY='NO_KEY'
llm_base = OpenLLM(
    model=os.getenv('LLM_MODEL', ''),
    api_base=os.getenv('LLM_API_BASE', ''),
    api_key=os.getenv('LLM_API_KEY', ''))

In [None]:
# Run the chunker over the fetched markdown. Later cells iterate chunks[0] and read
# each element's `.content` — presumably one chunk list per input document (TODO confirm).
chunks = chunker(docs=[raw_text])

Design Notes

1. Do the answers for the questions have to come from the actual context in the file, or can the context be a summarization of the info that's in the knowledge markdown files?
Every fact should be supported by the context, but the answers do not need to be verbatim.

2. The docs say that "Each qna.yaml file needs at least three question and answer pairs per context chunk with a maximum token count of 250 tokens.". Is that 250 tokens per context or per question and answer pair?
The 250 is an approximate number based on the maximum total size for SDG. The total tokens of Context + 3 Q&A must be less than 750 tokens. To have enough data in a context to answer the questions, approximately 500 tokens are recommended for the context, and the remaining 250 for the 3 Q&A.
At the end, the Q&A length is no problem as long as the context+3 Q&As remain < 750

3. Also from the docs, "Each qna.yaml needs five context blocks and has a maximum token count of 500 tokens." Is that per context or for all contexts?
This is per context, and the recommended 500 is to ensure there is enough data in the context to answer the questions. It can be less or it can be more, as long as the final length of Context + 3 Q&A < 750 tokens.

In [None]:
# NOTE: these models are documented with `#` comments rather than class docstrings on
# purpose: Pydantic copies a class docstring into the JSON schema as "description", and
# the SeedExampleQNAOnly schema is embedded in the LLM prompt.
#
# `min_length` and `json_schema_extra` are the Pydantic v2 spellings of the deprecated
# `min_items=` and free-form `set=True` keyword arguments; the generated schema
# (`minItems`, `"set": true`) stays the same.
# NOTE(review): the purpose of the extra `set` schema key is not evident from this notebook.

# One question together with its answer.
class QuestionAndAnswer(BaseModel):
    question: str
    answer: str

# The Q&A-only payload requested from the LLM (at least three pairs when provided).
class SeedExampleQNAOnly(BaseModel):
    questions_and_answers: List[QuestionAndAnswer] = Field(None, min_length=3, json_schema_extra={"set": True})


# A context passage (at most MAX_CONTEXT_STRING_LENGTH characters) plus its Q&A pairs.
class SeedExample(BaseModel):
    context: Annotated[str, Field(None,max_length=MAX_CONTEXT_STRING_LENGTH)]
    questions_and_answers: List[QuestionAndAnswer] = Field(None, min_length=3, json_schema_extra={"set": True})

# Top-level document that is serialised to qna.yaml (at least five seed examples when provided).
class QNAModel(BaseModel):
    version: Annotated[int,Field(3)]
    created_by: Annotated[str, Field(None)]
    domain: Annotated[str, Field(None)]
    seed_examples: Annotated[List[SeedExample], Field(None, min_length=5, json_schema_extra={"set": True})]

_log.info(QNAModel.model_json_schema())

In [None]:
def process_chunk(context:str,llmmsg:str)->SeedExample:
    """Ask the LLM for question/answer pairs about one chunk and wrap them in a SeedExample.

    Args:
        context: The chunk text stored as the seed example's context.
        llmmsg: The complete prompt sent to the LLM.

    Returns:
        A SeedExample combining ``context`` with the Q&A pairs parsed from the response.

    Raises:
        ValueError: If the response contains no ``{`` (no JSON object to parse), or the
            JSON cannot be parsed.
        Exception: If the parsed payload has no ``questions_and_answers``.
        pydantic.ValidationError: If the payload does not satisfy the model constraints.
    """
    completion = llm_base.complete(llmmsg,max_tokens=MAX_TOKENS_QNA,timeout=120.0)
    response_text = completion.text

    # Ensure that we just take the json output, sometimes we get some rubbish upfront.
    json_start = response_text.find('{')
    if json_start == -1:
        # str.find returns -1 when nothing matches; slicing from -1 would silently keep
        # only the last character and surface later as a confusing parse error.
        raise ValueError("Invalid payload, no JSON object in LLM response")
    extracted_json = response_text[json_start:]

    # allow_partial tolerates a response that was cut off by the max_tokens limit.
    res = SeedExampleQNAOnly.model_validate(from_json(extracted_json,allow_partial=True,cache_strings='keys'))

    # Check before building the SeedExample: a missing key leaves the field at its None
    # default, and passing None on to SeedExample would fail validation first.
    if res.questions_and_answers is None:
        raise Exception("Invalid payload, no qna")

    return SeedExample(context=context,questions_and_answers=res.questions_and_answers)


In [34]:
# Prompt = instruction prefix + chunk text + schema suffix.
gen_prompt="You are a helpful question and answer writing assistant. Given the following Information generate 1 SeedExample containing 3 question and answer pairs. Ensure that the questions can be answered by the information given. Do not number the pairs.  All output MUST be in valid JSON format.\n\nInformation:"

json_prompt=f"\n\nOutput a valid JSON object but do not repeat the schema. This is the JSON schema that must be used: {SeedExampleQNAOnly.model_json_schema()}."
result_output=""
seed_examples=[]

# Each chunk gets this many attempts before it is skipped.
MAX_ATTEMPTS = 3

clen = len(chunks[0])

for idx,ch in enumerate(chunks[0]):
    _log.info(f"Chunk {idx} of {clen}")
    llm_msg = gen_prompt+ch.content+json_prompt
    _log.debug(llm_msg)

    for retry_count in range(MAX_ATTEMPTS):
        try:
            seed_examples.append(process_chunk(ch.content,llm_msg))
        except Exception as e:
            # Logging takes the format string first and its arguments after; passing the
            # exception as the message made the logging call itself fail to format.
            _log.error("Chunk %s -> Invalid response,count %s: %s", idx, retry_count, e)
        else:
            break
    else:
        # Every attempt failed: make the dropped chunk visible instead of moving on silently.
        _log.warning("Chunk %s skipped after %s failed attempts", idx, MAX_ATTEMPTS)

INFO:__main__:Chunk 0 of 19
INFO:openai._base_client:Retrying request to /completions in 0.380635 seconds
INFO:openai._base_client:Retrying request to /completions in 0.878725 seconds


KeyboardInterrupt: 

In [None]:
import json

# Assemble the final document; QNAModel validation applies here (e.g. the minimum
# number of seed examples).
finalqna = QNAModel(version=3,created_by="noelo",domain="Batteries",seed_examples=seed_examples)
jsonout = finalqna.model_dump_json()

# Round-trip through JSON so that only plain dicts/lists/strings reach the YAML dumper.
python_dict=json.loads(jsonout)

# sort_keys=False keeps the model's field order (context before its Q&A pairs, question
# before answer) rather than sorting alphabetically; allow_unicode keeps non-ASCII text
# readable rather than escaped.
yaml_string=yaml.safe_dump(python_dict, sort_keys=False, allow_unicode=True)

# outputyaml=to_yaml_str(finalqna)
# Explicit UTF-8 so the result does not depend on the platform's default encoding.
with open('qna.yaml', 'w', encoding='utf-8') as file:
    file.write(yaml_string)

In [None]:
import boto3
from botocore.exceptions import ClientError
import os

def upload_file_to_minio(file_path, bucket_name, object_name=None, endpoint_url='http://minio.summit-project.svc.cluster.local:9000', access_key='minio', secret_key='minio123'):
    """Upload a file to a bucket on an S3-compatible (MinIO) endpoint.

    Args:
        file_path: File to upload.
        bucket_name: Bucket to upload to.
        object_name: S3 object name. If not specified, the base name of file_path is used.
        endpoint_url: MinIO endpoint URL.
        access_key: MinIO access key.
        secret_key: MinIO secret key.

    Returns:
        True if file was uploaded, else False.
    """
    # NOTE(security): the default credentials are hardcoded in the signature; prefer
    # passing access_key/secret_key explicitly (e.g. from environment variables).

    # boto3's managed upload re-raises client errors as S3UploadFailedError, so it has
    # to be handled alongside ClientError for the "return False" contract to hold.
    from boto3.exceptions import S3UploadFailedError

    # If S3 object_name was not specified, use the file's base name
    if object_name is None:
        object_name = os.path.basename(file_path)

    # Create an S3 client pointed at the MinIO endpoint
    s3_client = boto3.client('s3',
                              endpoint_url=endpoint_url,
                              aws_access_key_id=access_key,
                              aws_secret_access_key=secret_key)
    try:
        s3_client.upload_file(file_path, bucket_name, object_name)
    except (ClientError, S3UploadFailedError) as e:
        print(f"Error uploading file: {e}")
        return False
    except FileNotFoundError:
        print(f"Error: File '{file_path}' not found.")
        return False

    print(f"File '{file_path}' uploaded to '{bucket_name}/{object_name}'")
    return True

# Example usage (replace with your actual values):
file_path = 'qna.yaml'  # Replace with the path to your file
bucket_name = 'data-files-bucket' # Replace with your bucket name

# Create an example file if it doesn't exist, and say so: the placeholder is what
# gets uploaded in that case, not generated Q&A content.
if not os.path.exists(file_path):
    print(f"Warning: '{file_path}' not found; creating a placeholder file to upload.")
    with open(file_path, "w") as f:
        f.write("This is an example file.")

if upload_file_to_minio(file_path, bucket_name):
    # The uploaded artifact is the file named by file_path (qna.yaml), not a PDF.
    print(f"Upload of '{file_path}' successful! Data Science Pipeline should be starting.")
else:
    print("Upload failed.")