# **SVO-triplet constrained, unsupervised QA synthesis pipeline with hybrid dual-scale retrieval for technical knowledge indexing**
### **Overview**
This system transforms unstructured technical documentation (product specs, expert 
articles, manuals) into a query-optimized knowledge base. By integrating linguistic 
structure extraction, constrained generation, and multi-strategy retrieval, it enables 
precise technical question answering while minimizing computational costs. 

### **Target application**
Ideal for knowledge bases, chat-bots, and product search.

### **Key Components**
#### Semantic-aware chunking
- After coarse rule based chunking with overlap an LLM splits the larger texts into smaller coherent chunks, preserving technical context and logical flow.
- Benefit: Maintains explanatory relationships between concepts within chunks, avoiding arbitrary fragmentation.

#### SVO triplet extraction
- Each sentence in a chunk is processed to extract Subject-Verb-Object (SVO) triplets (e.g., "The algorithm [S] optimizes [V] performance [O]").
- Design decision: Triplets enforce structured representation of core relationships, reducing ambiguity and halucination.

#### SVO-constrained QA generation
- An LLM generates QA pairs using SVO triplets as constraints, with the full chunk as context.
- Key Innovations:
    - Cost efficiency: SVO direction allows smaller/cheaper LLMs (e.g., 7B-parameter models) without sacrificing precision.
    - Detail preservation: Questions target granular technical elements (e.g., "What does the algorithm optimize?") while retaining broader context.

#### Validation & Filtering
- A “roberta” based model distilled for Extractive QA scores each generated pair:
    - Answer relevance to the question.
    - Contextual alignment with the source chunk.
- Low-scoring QAs are discarded via a threshold.
- Benefit: Ensures factual accuracy and eliminates hallucinated content.

#### Redundancy reduction
- Question embeddings are clustered using Affinity Propagation to identify semantic redundancy.
- Design decision: Affinity Propagation adapts to unknown cluster counts, automatically retaining representative questions.

#### Dual-scale Embedding
- Final QA pairs are embedded jointly (question + answer as one vector). for precise, detail-oriented queries.
- Whole chunks for broad-context retrieval.

#### Hybrid embedding:
- Dense retrieval: Semantic matching via dense embedding
- Sparse retrieval: Term-attention based matching with IDF calculation for keyword relevant queries
- Late Interaction (ColBERT): Contextualized token-level relevance scoring

### **Benefits**
#### Cost-effective scalability
- Indexing: Smaller LLMs handle SVO-triplet-constrained generation, reducing indexing costs vs. large-model approaches.
- Retrieval: Only uses embedding, vector-search and a re-ranking. No LLM or NER model.

#### Precision-recall balance
- SVO anchoring captures atomic details, while chunk context supports conceptual queries.

#### High signal-to-noise ratio
- Validation + clustering typically removes 30-40% of generated QAs, retaining only novel, high-value pairs.

#### Optimized retrieval
- Joint QA embeddings enable direct matching of questions to answer-bearing content.
- Dual embeddings support both specific fact lookup and exploratory searches.
- Hybrid (dense+sparse) embedding increase retrieval precision

### **Requirements**
1. This notebook uses Cloudflare Workers AI for different models. It also runs some model locally that require some free memory (1-2GB)
2. Qdrant is used as a Vector DB for testing retrieval. A Qdrant cluster is required. Free managed cluster is available that is more than enough for testing.

**Important**: Text generation nodel needs to support structured output (JSON). Even if it supports sometimes it fails to generate a propper JSON output. Therefore the code is written that long running (multiple) generations can be resumed. e.g. question generation

*Huggingface inference (optional) is implemented but the 'json_schema' for output_format is limited and this notebook heavily relies on that feature.
It could still work with 'json_object' as output format but not tested. HF would be an option to test RAG results with a wide range of models.*

### **Important Notes**
- This notebook uses services. They should fit to free tiers but always verify pricing before providing API keys.
- JSON files are provided as data source. There is code to load them and skip long running task.
- The code has been made for demonstrational purposes. Validation, error handling, etc. omitted for better understanding. It is not made for production.

### **Use a virtual environment**
#### Steps to create with conda:
1. Create myenv  
`conda create -n myenv`  
*or with python version*  
`conda create -n myenv python=3.12`
2. Activate  
`conda activate myenv`
3. Install ipykernel  
`conda install ipykernel`
4. Add venv to Jupyter with name myenv  
`python -m ipykernel install --user --name=myenv`

### **Installation**
Install packages from requirements.txt (recommended)  
`pip install -r requirements.txt`  
Or use this command  
`pip install haystack-ai dotenv huggingface_hub cloudflare ipywidgets spacy qdrant-client fastembed accelerate`  
Install claucy  
`pip install git+https://github.com/mmxgn/spacy-clausie.git`  
Download the model for spacy  
`python -m spacy download en_core_web_trf`  

### **API Access**
Set the following environment variables to use services  
Cloudflare workers AI  
`CLOUDFLARE_ACCOUNT_ID` - Cloudflare account ID  
`CLOUDFLARE_API_TOKEN` - Cloudflare API Token  
Qdrant  
`QDRANT_URL` - Qdrant cluster URL  
`QDRANT_API_KEY` - Qdrant API key  
Hugging Face (optional)  
`HF_TOKEN` - Hugging Face API Token   

### Configuration

In [60]:
import os
from dotenv import load_dotenv
import logging

logging.getLogger().setLevel(logging.WARNING)

# load environment
load_dotenv()


True

In [2]:
from abc import abstractmethod, ABC
import spacy
import claucy
import json

# This will load the model
nlp = spacy.load("en_core_web_trf")
claucy.add_to_pipe(nlp)

## Functions

### Model output schema
We use pydantic to define the schema and pass to the inference endpoint as JSON schema

#### Utility functions to generate JSON schema from pydantic model

In [3]:
from pydantic import Field, BaseModel

def replace_value_in_dict(item, original_schema):
    if isinstance(item, list):
        return [replace_value_in_dict(i, original_schema) for i in item]
    elif isinstance(item, dict):
        if '$ref' in list(item.keys()):
            definitions = item['$ref'][2:].split('/')
            defs = original_schema[definitions[0]]
            return defs[definitions[1]]
        else:
            return { key: replace_value_in_dict(i, original_schema) for key, i in item.items()}
    else:
        return item

def model_to_json_schema(model: type[BaseModel], max_depth = 20):
    model_schema = model.model_json_schema()
    json_schema = model_schema
    i = 0
    while '$ref' in json.dumps(json_schema) and i < max_depth:
        json_schema = replace_value_in_dict(json_schema, json_schema)
        i += 1

    if "$defs" in json_schema:
        del json_schema['$defs']

    return json_schema

def model_to_cf_response_format(model: type[BaseModel]):
    return json.dumps({
        "type": "json_schema",
        "json_schema": model_to_json_schema(model)
    })

def model_to_hf_response_format(model: type[BaseModel]):
    return {
        "type": "json_schema",
        "value": model_to_json_schema(model)
    }

### Model inference

In [4]:
from cloudflare import Cloudflare
from huggingface_hub import InferenceClient
import requests

class InferenceProviderBase(ABC):
    @abstractmethod
    def completition(self, model: str, messages: dict, output_format = None):
        raise NotImplementedError()
        
class HFInferenceProvider(InferenceProviderBase):
    def __init__(self):
        self.client = InferenceClient(
            provider="auto",
            api_key=os.getenv('HF_TOKEN'),
        )
    def completition(self, model, messages, response_schema=None):        
        """
        IGNORED: response_schema is ignored    
        Tried (directly the HTTP endpoint) a few inference endpoints this notebook uses and many does not support json_schema validation:
        Response:
        {
            "code": 400,
            "reason": "INVALID_REQUEST_BODY",
            "message": "model features structured outputs not support",
            "metadata": {}
        }        

        We will use: response_format={"type": "json_object"}
        It produces a JSON output but a schema is not enforced
        So parsing might fail
        Cloudflare has better support for JSON schema validated inference endpoints
        """
        #response_format = model_to_hf_response_format(response_schema) if response_schema else None
        res = self.client.chat.completions.create(
            model=model,
            messages=messages,
            response_format={"type": "json_object"} if response_schema else None
        )
        # This might fail bc of malformed json
        return json.loads(res.choices[0].message.content)

class CFInferenceProvider(InferenceProviderBase):
    def __init__(self):
        self.client = Cloudflare(api_token=os.getenv('CLOUDFLARE_API_TOKEN'))
        self.account_id = os.getenv('CLOUDFLARE_ACCOUNT_ID')
    def completition(self, model, messages, response_schema=None, max_tokens=3000):
        response_format = model_to_cf_response_format(response_schema) if response_schema else None        
        result = self.client.ai.run(
            model,
            account_id=self.account_id,
            messages=messages,
            response_format=response_format,
            max_tokens=max_tokens
        )
        # sometimes this can throw if the model output is not well formed therefore the parsing fails
        return json.loads(result['response']) if response_schema else result['response']
    def embedding(self, model, texts):        
        result = self.client.ai.run(
            model,
            account_id=self.account_id,
            text=texts
        )
        if 'data' not in result:
            raise RuntimeError('Unable to embedd data')
        return result['data']
    def rerank(self, model, query:str, contexts: list[str], top_k=10):
        response = requests.post(
            f"https://api.cloudflare.com/client/v4/accounts/{os.getenv('CLOUDFLARE_ACCOUNT_ID')}/ai/run/{model}",
            headers={"Authorization": f"Bearer {os.getenv('CLOUDFLARE_API_TOKEN')}"},
            json={
                "query": query,
                "contexts": contexts,
                "top_k": top_k
            }
        )
        json_res = response.json()
        return json_res["result"]["response"] if "result" in json_res and "response" in json_res["result"] else []        
        return [ { "content":contexts[result["id"]]["text"], "score":result["score"] } for result in results ]  

### Constrained question generation prompt
This prompt needs to be adjusted to the application domain.
We focus on product descriptions, expert articles, product reviews so generating primarily How and What questions.

In [127]:
_genq_system_prompt = """"
You are generating context relevant question - amswer (QA) pairs using specified constraints: subjects, verbs, direct or indirect objects and complement.

Ensure:
   - Grammatically correct question format
   - Preserve semantics, logical relationships and key technical terms
   - The context is broader for more information, but concentrate on the constraints (subject, verb, object).
   - Use primarily "How" or "What" or semantically similar words/phrases for questions.
   - Replace the pronoun if possible (they -> CPU and GPU) and always use a noun or noun phrase (e.g. Motherboard, sensor, etc.)
   - Use the correct form of the constrains (SVO triplets) in the questions (e.g. they -> them)
   - Answer the questions using the provided context. Do not invent, add or remove meaning.
   - Generate multiple QA pairs
   - Do not add comments or notes, just generate the QA pairs
   - Follow the specified JSON output format

Example:
Create questions using:
- verb: have
- subject: They
- direct objects: different roles

Context:
The main difference between a CPU and a GPU is their purpose in a computer system. They have different roles depending on the system.

---
Output:
{
    "qa_pairs": [
        {
            "q": "What do they have depending on the system?",
            "a": "CPU and a GPU have different roles depending on the system"
        },
        {
            "q": "What roles do they have",
            "a": "CPU and a GPU have different roles depending on the system."
        }
    ]
}

"""

_genq_input_prompt = """
Create questions using:
{constraints}

Context:
{context}

"""

def _handle_plural_form(cnt:int, word: str):
    # In this constrained case we can simply add 's'
    return f'{word}s' if cnt > 1 else word

def _create_constraint_line(terms: list[str], name: str):
    l = len(terms)
    return f"- {_handle_plural_form(l, name)}: {', '.join(terms)}\n" if l > 0 else None

def genq_input_prompt(constraints: dict[str:str], context: str):
    constraints_text = ""
    for key, val in constraints.items():
        if val and len(val) > 0:
            line = _create_constraint_line(val, key)
            constraints_text += line if line else ""

    return _genq_input_prompt.format(constraints=constraints_text, context=context)

def genq_messages(constraints: dict[str:str], context: str):
    return [
        {
            "role": "system",
            "content": _genq_system_prompt,
        },
        {
            "role": "user",
            "content": genq_input_prompt(constraints, context),
        }
    ]   


In [6]:
from typing import Optional

class QAPairs(BaseModel):
    """An object containing generated questions"""

    q: str = Field(
        description="Generated question"
    )
    a: str = Field(
        description="Generated answer"
    )

class GeneratedQA(BaseModel):
    """An object containing generated question-answer pairs"""

    qa_pairs: list[QAPairs] = Field(
        description="List of generated question-answer pairs"
    )

### Conjunct extraction
We need a function that separates segregatory conjunctions but keep combinatory conjunctions as they become incoherent after separation.
We use `spacy` and predefined rules to detect combinatory cases.

In [7]:
class ConjunctsParser:
    # Note: These sets might not be complete. Generated by AI and not verified by a linguist
    RECIPROCAL_PRONOUNS = {"each other", "one another"}
    MUTUAL_ADVERBS = {
        "together", "jointly", "mutually", "collectively",
        "reciprocally", "conjointly", "cooperatively",
        "collaboratively", "interdependently", "symbiotically"
    }
    COMBINATORY_PHRASES = {
        "as a team", "as partners", "as a pair", "as a group",
        "hand in hand", "side by side", "arm in arm", "face to face",
        "in concert", "back to back", "eye to eye", "hand in glove",
        "shoulder to shoulder", "in unison", "in league", "in collusion",
        "in partnership", "in tandem", "in alliance"
    }
    CORRELATIVE_KEYWORDS = {
        "between", "both", "either", "neither", "whether"
    }

    def extract_conjuncts(self, sentence: str, span: spacy.tokens.span.Span):
        # Span as str
        extracted_str = span.text
        doc = nlp(sentence)
        
        # Case normalization
        extracted_lower = extracted_str.lower()
        sentence_lower = sentence.lower()

        # Rule 1: Reciprocal pronouns in full sentence
        if any(pronoun in sentence_lower for pronoun in self.RECIPROCAL_PRONOUNS):
            return [extracted_str]

        # Rule 2: Combinatory phrases in extracted text
        if any(phrase in extracted_lower for phrase in self.COMBINATORY_PHRASES):
            return [extracted_str]

        # Rule 3: Correlative keywords in extracted text
        if any(keyword in extracted_lower for keyword in self.CORRELATIVE_KEYWORDS):
            return [extracted_str]

        root_verb = next((token for token in doc if token.dep_ == "ROOT" and token.pos_ == "VERB"), None)
        if root_verb:
            # Rule 4: Check mutual adverbs modifying the verb
            for child in root_verb.children:
                if child.lower_ in self.MUTUAL_ADVERBS:
                    return [extracted_str]

        # Proceed to segregatory splitting if all checks pass
        root = span.root
        conjunct_heads = [root] + list(root.conjuncts)
        conjunct_heads.sort(key=lambda token: token.i)

        conjunct_spans = []
        for head in conjunct_heads:
            conjuncts = []
            tokens_in_conjunct = []
            for token in head.subtree:
                if token in span:
                    if token.dep_ == 'cc' and token.pos_ == 'CCONJ':
                        if len(tokens_in_conjunct) > 0:
                            conjuncts.append(tokens_in_conjunct)
                            tokens_in_conjunct = []
                    else:
                        tokens_in_conjunct.append(token)

            if len(tokens_in_conjunct) > 0:
                conjuncts.append(tokens_in_conjunct)

            for tokens in conjuncts:
                start_idx = min(token.i for token in tokens)
                end_idx = max(token.i for token in tokens) + 1
                conj_span = doc[start_idx:end_idx]
                conjunct_spans.append(conj_span.text)

        return list(set(conjunct_spans))

### Semantic relationship and argument extraction
We use ClausIE, a clause based information extraction method proposed in:  
*ClausIE: Clause-Based Open Information Extraction by Luciano Del Corro and Rainer Gemulla*  
The implementation is provided by `spacy-clause` python package

In [8]:
def extract_information(text: str):
    results = []
    doc = nlp(text)
    parser = ConjunctsParser()
    for sent in doc.sents:
        for clause in sent._.clauses:
            if 'SV' in clause.type:
                extracted = {}
                extracted['type'] = clause.type
                extracted["verb"] = clause.verb and parser.extract_conjuncts(text, clause.verb)
                extracted["subject"] = clause.subject and parser.extract_conjuncts(text, clause.subject)
                extracted["direct_object"] = clause.direct_object and parser.extract_conjuncts(text, clause.direct_object)
                extracted["indirect_object"] = clause.indirect_object and parser.extract_conjuncts(text, clause.indirect_object)
                extracted["complement"] = clause.complement.text if clause.complement else None
                results.append(extracted)
    return results


### Question generation

In [9]:
def generate_questions(inference_provider, constraints_list, context):
    questions = []
    for constraints in constraints_list:
        # This might fail bc of malformed JSON output
        # Cloudflare model recommended if fails with HF
        result = inference_provider.completition(
            model=model,       
            messages=genq_messages(constraints, context),
            response_schema=GeneratedQA 
        )
        questions += result["qa_pairs"]
    return questions

### Question validation
1. We will use a QA model to validate how relevant the answer and the context to the generated questions.  
Haystack's ExtractiveReader (roberta distilled model) is used for this task with the default model*.  
<sub>*\* For specific use cases (e.g. medical, biotech) a fine tuned (on application domain) model might be required. Application validation on market data is essential*</sub>
3. To decrease redundancy we cluster the validated set based on semantic similarity. The validation score from previous step is used to influence cluster center selection

In [110]:
from haystack import Document
from haystack.components.readers import ExtractiveReader
from haystack.utils import ComponentDevice, Device
import numpy as np
from sklearn.cluster import AffinityPropagation
from sklearn.metrics.pairwise import cosine_similarity
import random
from operator import itemgetter

class QuestionValidator:
    def __init__(self, inference_provider: InferenceProviderBase):
        device = ComponentDevice.from_single(Device.cpu())
        self.reader = ExtractiveReader(device= device)
        self.reader.warm_up()
        self.inference_provider = inference_provider
        self.embedder_model = "@cf/baai/bge-m3" 

    def _run_qa_model(self, question:str, content:str):
        res = self.reader.run(
            query=question,
            documents=[Document(content=content)],
            top_k=1
        )
        if 'answers' in res and len(res['answers']) > 0:
            return res['answers'][0].score
        return None
    
    def _validate(self, questions: list[str], context: str, validation_th: float):
        valid = []
        for qa_pair in questions:
            q, a = itemgetter('q', 'a')(qa_pair)
            a_score = self._run_qa_model(q, a)
            context_score = self._run_qa_model(q, context)
            comnined_score = 0.6 * a_score + 0.4 * context_score
            if min(a_score, context_score) > validation_th:
                valid.append({"q": q, "a":a, "score":comnined_score})
        return valid
        
    def _select_questions(self, valid: dict[str:str], top_k_per_cluster = 2, damping=0.7, max_iter=500):
        if len(valid) < 2:
            return 
        
        texts = [ f"Question:{qa['q']} Answer:{qa['a']}" for qa in valid ]
        scores = [ qa["score"] for qa in valid ]
        
        # Compute embeddings
        vectors = self.inference_provider.embedding(self.embedder_model, texts)
           
        # Compute cosine similarity matrix
        similarity_matrix = cosine_similarity(vectors)
        
        # Scale scores to match similarity range [-1,1] 
        # to influence cluster center selection
        score_min, score_max = min(scores), max(scores)
        if score_max == score_min:
            scaled_scores = np.zeros(len(scores))
        else:
            scaled_scores = -1 + 2 * (np.array(scores) - score_min) / (score_max - score_min)
        
        # Run Affinity Propagation
        af = AffinityPropagation(
            affinity='precomputed',
            preference=scaled_scores,
            damping=damping,
            max_iter=max_iter,
            random_state=100
        ).fit(similarity_matrix)
        
        # Get cluster labels and centers
        labels = af.labels_
        cluster_centers = af.cluster_centers_indices_
                    
        # Select cluster centers
        sel_indices = [int(c) for c in cluster_centers]
        # add items from each cluster if top_k_cluster > 1
        if top_k_per_cluster > 1:
            clusters = {}
            for i, label in enumerate(labels):
                if i not in cluster_centers:
                    clusters.setdefault(int(label), []).append(i)
            for label, indices in clusters.items():
                sel_indices += random.sample(indices, k=top_k_per_cluster-1)
                
        # return selected questions
        return [valid[i] for i in sel_indices]
        
    def validate(self, questions: list[str], context: str, validation_th = 0.7, top_k_per_cluster = 2, damping=0.7, max_iter=500):
        valid = self._validate(questions, context, validation_th)
        return self._select_questions(valid, top_k_per_cluster, damping, max_iter)
  

### Create inference provider instance
Also define the text gen model

In [11]:
# inf_provider - we use cloudflare
inference_provider = CFInferenceProvider()
model = '@cf/meta/llama-3.1-8b-instruct-fast'  

### Create validator instance

In [12]:
validator = QuestionValidator(inference_provider)

## Test with single sentence

In [123]:
text = "In order to capture colours as well as brightness information, photosites are fitted with red, green and blue colour filters. This means some photosites record the intensity of red light, some the intensity of green, and some the intensity of blue. The electrical signals from all the photosites in the sensor are passed to the camera's image processor, which interprets all this information and determines the colour and brightness values of all the individual pixels (picture elements) that make up a digital image."
constraints_list = extract_information(text)
constraints_list

[{'type': 'SV',
  'verb': ['are fitted'],
  'subject': ['photosites'],
  'direct_object': None,
  'indirect_object': None,
  'complement': None},
 {'type': 'SVC',
  'verb': ['means'],
  'subject': ['This'],
  'direct_object': None,
  'indirect_object': None,
  'complement': 'some photosites record the intensity of red light, some the intensity of green, and some the intensity of blue'},
 {'type': 'SVO',
  'verb': ['record'],
  'subject': ['some photosites'],
  'direct_object': ['the intensity of red light'],
  'indirect_object': None,
  'complement': None},
 {'type': 'SV',
  'verb': ['are passed'],
  'subject': ['The electrical signals from all the photosites in the sensor'],
  'direct_object': None,
  'indirect_object': None,
  'complement': None},
 {'type': 'SVO',
  'verb': ['interprets'],
  'subject': ['which'],
  'direct_object': ['all this information'],
  'indirect_object': None,
  'complement': None},
 {'type': 'SVO',
  'verb': ['determines'],
  'subject': ['which'],
  'direct_o

In [124]:
questions = generate_questions(inference_provider, constraints_list, text)
questions

[{'q': 'What are photosites fitted with',
  'a': 'photosites are fitted with red, green and blue colour filters'},
 {'q': 'How are photosites fitted',
  'a': 'photosites are fitted with red, green and blue colour filters'},
 {'q': 'What does This mean?',
  'a': 'This means some photosites record the intensity of red light, some the intensity of green, and some the intensity of blue.'},
 {'q': 'What does This involve?',
  'a': 'This involves some photosites recording the intensity of red light, some the intensity of green, and some the intensity of blue.'},
 {'q': 'What does This involve in terms of light?',
  'a': 'This involves some photosites recording the intensity of red light, some the intensity of green, and some the intensity of blue.'},
 {'q': 'What does This involve in terms of intensity?',
  'a': 'This involves some photosites recording the intensity of red light, some the intensity of green, and some the intensity of blue.'},
 {'q': 'What does This involve in terms of colour

In [125]:
valid_questions = validator.validate(questions, text, top_k_per_cluster=2)
valid_questions

[{'q': 'What do some photosites record?',
  'a': 'some photosites record the intensity of red light',
  'score': 0.8115248918533324},
 {'q': 'What are the electrical signals from all the photosites in the sensor passed to?',
  'a': "The electrical signals from all the photosites in the sensor are passed to the camera's image processor",
  'score': 0.854365599155426},
 {'q': "What information does the camera's image processor determine from all the photosites?",
  'a': "The camera's image processor determines the colour and brightness values of all the individual pixels (picture elements) that make up a digital image",
  'score': 0.8207377791404724},
 {'q': 'What does This involve?',
  'a': 'This involves some photosites recording the intensity of red light, some the intensity of green, and some the intensity of blue.',
  'score': 0.7460584521293641},
 {'q': 'What are the electrical signals from all the photosites in the sensor passed to determine?',
  'a': 'The electrical signals from 

## Test context retrival with hybrid search
We combine content embedding (for contextual queries) with question embedding (for specific queries).
Also use semantic search and term based (attention based with IDF) search with the combination of dense and sparse vectors.  

**Article source:**  
[Image sensors explained](https://www.canon.hu/pro/infobank/image-sensors-explained/)

In [19]:
article = """
Digital imaging basics
With all types of sensors, the imaging process begins when light passes through the camera's lens and strikes the sensor. The sensor contains millions of light receptors or photosites, which convert the light energy into an electrical charge. The magnitude of the charge is proportional to the intensity of the light – the more light that hits a particular photosite, the stronger the electrical charge it produces. (SPAD sensors work a little differently – more on this later.)
In order to capture colours as well as brightness information, photosites are fitted with red, green and blue colour filters. This means some photosites record the intensity of red light, some the intensity of green, and some the intensity of blue.
The electrical signals from all the photosites in the sensor are passed to the camera's image processor, which interprets all this information and determines the colour and brightness values of all the individual pixels (picture elements) that make up a digital image.
A diagram showing how a camera creates a digital image, with steps including a mosaic colour filter, an image sensor, an analogue-to-digital converter and the image processor.
How cameras create a digital image. Light from the subject you're shooting is focused through the lens onto the image sensor (2), which is covered with a mosaic filter (1) to enable it to detect colour and not just light intensity. The electrical signal generated by the sensor may be amplified by analogue electronics (3) before passing through an analogue-to-digital converter (4) to the image processor (5). After processing, the camera may temporarily hold images in a buffer (6) while it writes them to the memory card.
If you're shooting RAW, this data is saved, along with information about the camera settings, in a RAW file. If the camera is set to save images in any other file format – JPEG, HEIF or RAW+JPEG – then further processing takes place in-camera, which typically includes white balance adjustment, sharpening and noise reduction, among other processes, depending on the camera settings. It will also include demosaicing or debayering, which cleverly calculates the correct RGB colour value for each pixel (each individual photosite, remember, records only one colour – red, green or blue). The end result is a complete colour digital image – although, in truth, if the image is a JPEG, more of the original information captured by the sensor has been discarded than has been kept.
You conventionally hear about the number of megapixels (millions of pixels) in a sensor, but strictly speaking the sensor does not have pixels at all, but sensels (distinct photosites). What's more, there is not a one-to-one correspondence between sensels in the sensor and pixels in the resulting digital image, for a whole range of technical reasons. It is more accurate to describe a sensor as having a certain number of "effective pixels", which simply means that the camera produces images or videos of that number of megapixels. The Canon PowerShot V10, for example, has a sensor described as approximately 20.9MP in "total pixels" but some of the sensor data is used for technical processes such as distortion correction and digital image stabilisation, with the result that the PowerShot V10 delivers video (with Movie Digital IS) at approximately 13.1MP and still images (which undergo different processes) at approximately 15.2MP.
An illustration of a Bayer array, with alternating rows of red-and-green and blue-and-green colour filters.
The most common type of colour filter mosaic in digital sensors, a Bayer array. This is what makes it possible for the sensor to detect colour, not just light intensity. There are more photosites dedicated to green because the human eye happens to be more sensitive to green light than to blue or red.
A 1.0-type CMOS sensor.
A 1.0-type CMOS sensor. CMOS sensors of this size are used in compact cameras such as the Canon PowerShot G7 X Mark III and video cameras such as the Canon XF605 professional 4K camcorder.
CCD sensors
There are several different types of image sensor. Digital photography arrived in the mid-1980s with the introduction of CCD (Charge-Coupled Device) sensors. These sensors were the first to make it possible to capture images without the use of film, revolutionising photography.
CCD sensors are composed of an integrated grid of semiconductor capacitors capable of holding an electrical charge. When light reaches the sensor, these capacitors, acting as individual photosites, absorb the light and convert it into an electrical charge. The amount of charge at each photosite is directly proportional to the intensity of the light that strikes it.
In a CCD sensor, the charge from each photosite is transferred through the sensor's grid (hence the term charge-coupled) and read at one corner of the array, in the same way that water might be passed along a bucket brigade or human chain. This method ensures a high degree of image quality and uniformity because each pixel uses the same pathway to output its signal. For this reason, Canon's first professional digital camera, the EOS-1D, launched in 2001, had a 4.15MP CCD sensor. However, this process is also more power-intensive than the process in CMOS sensors.
CMOS sensors
In 2000, Canon introduced its first CMOS (Complementary Metal Oxide Semiconductor) sensor, in the 3.1MP EOS D30. Unlike the CCD sensor, which transfers charges across the sensor to a single output node, a CMOS sensor contains multiple transistors at each photosite, enabling the charge to be processed directly at the site. This has several implications.
For a start, CMOS sensors require less power, making them more energy efficient. They can also read off electrical charges at a much faster rate, which is crucial for shooting high-speed sequences. What's more, CMOS sensors share the same basic structure as computer microprocessors, which allows for mass production at a lower cost while incorporating additional functions such as noise reduction and image processing right on the sensor.
All of Canon's mirrorless EOS R System cameras feature CMOS sensors, as do the EOS DSLR, Cinema EOS and PowerShot camera ranges.
A cutaway illustration of Canon's Dual Pixel CMOS AF system.
In Canon's Dual Pixel CMOS AF system, each photo receptor in the sensor has two separate photodiodes (marked A and B), and comparing the signals from the two determines whether that point is in sharp focus. At the same time, the output (C) from the photo receptor is used for imaging.
A diagram of a sensor with Cross-Type AF points, showing how they are able to detect both vertical lines and horizontal lines.
The EOS R1 features a new Dual Pixel CMOS AF sensor arrangement with photodiodes sensitive to phase difference along both the vertical and horizontal axes of the sensor. These Cross Type AF points enable the camera's Dual Pixel Intelligent AF to detect both horizontal and vertical detail, enhancing AF acquisition and tracking performance.
Developments in CMOS sensors
CMOS sensor technology has continued to evolve. An innovation developed by Canon is Dual Pixel CMOS AF technology, which enables each pixel on the sensor to be used for both imaging and autofocus, resulting in faster and more accurate AF performance.
An enhanced version of the system was introduced in 2020: Dual Pixel CMOS AF II. This incorporates EOS intelligent Tracking and Recognition Autofocus (EOS iTR AF X), Canon’s subject detection and tracking system utilising Deep Learning AI. Dual Pixel CMOS AF II is now widely used across the EOS R System and Cinema EOS lines, delivering greater autofocus speed, precision and coverage in stills and video in cameras such as the EOS R7, EOS R6 Mark II and EOS C400.
Dual Pixel Intelligent AF, which made its debut in 2024 in the EOS R1 and EOS R5 Mark II, has further refined the detection and tracking capabilities, and enabled the introduction of features such as Action Priority AF, which enables the camera to track actions commonly seen in certain sports and automatically shift the focus to the area where the action is taking place.
In the EOS R1, the autofocus is advanced still further with Cross Type AF, enabling the sensor to detect phase difference not just vertically, like other AF systems, but also horizontally at the same time. This enhanced sensitivity results in increased focusing accuracy and speed in low-light and low-contrast situations, and even more stable AF performance in continuous shooting mode.
An illustration of the stacked, back-illuminated CMOS sensor in the Canon EOS R3.
The stacked, back-illuminated CMOS sensor in the Canon EOS R3 is designed for capturing high-speed and high-resolution imagery.
A Canon EOS C400 camera with no lens attached, revealing the sensor.
The fast readout of the Back-Side Illuminated (BSI) sensor in the EOS C80 and EOS C400 (shown here) minimises distortion when filming moving objects or when the camera is panned quickly.
Another development in Canon's CMOS technology is the stacked, back-illuminated sensor design used in the EOS R1, EOS R5 Mark II and EOS R3. This design places the photodiodes above the transistor layer to improve light collection efficiency, resulting in less image noise and better image quality. Additionally, the stacked structure allows faster data readout, contributing to the camera's high-speed performance.
Both the EOS R1 and EOS R5 Mark II are also equipped with a DIGIC Accelerator, which boosts the volume of data that the camera is capable of processing. In combination with the high-speed back-illuminated stacked sensor, the DIGIC Accelerator unlocks a host of features, including faster electronic shutter speeds, simultaneous recording of stills and video, and a significant reduction in rolling shutter distortion compared to earlier cameras.
Similar sensor technology is also deployed in selected Cinema EOS cameras. The EOS C80 and EOS C400 incorporate 6K full-frame Back-Side Illuminated (BSI) CMOS sensors, which provide improved low-light performance compared to front-illuminated sensors. As well as delivering 16 stops of dynamic range and minimal noise, the BSI sensor’s fast readout speeds minimise rolling shutter distortion.
The Canon ME20F-SH camera with a 50mm EF lens.
The Canon ME20F-SH multi-purpose camera can see and shoot in almost complete darkness.
The full-frame image sensor from the Canon ME20F-SH ultra-low-light camera.
The camera's specially developed full-frame CMOS sensor is designed specifically for low light video capture. With larger photo receptors, it maximises light-gathering capabilities to deliver ultra-low-light images with low noise.
Canon's CMOS sensor research and development is ongoing. One result of this is an ultra high sensitivity 35mm full-frame CMOS sensor, with much larger photo receptors (approximately 7.5 times the size of those in previous sensors). Larger photo receptors are able to capture more light, in this case achieving a sensitivity equivalent to ISO 4 million, enabling a camera to capture vivid colour images of very dark environments. This technology is used in the Canon ME20F-SH ultra low light video camera.
Canon has also developed an ultra high pixel count sensor, using advanced miniaturisation techniques to reduce the photosite size. This facilitates very high resolution image capture, with a pixel count up to 250MP. In an image captured using this technology, it is possible to distinguish the lettering on an aircraft in flight 18km away and achieve a resolution approximately 30 times higher than that of 4K video. This has great potential for applications in surveillance, astronomical observation and medical imaging.
One shortcoming of current CMOS sensors is that, for technical reasons including data bandwidth, their data is read out sequentially rather than all at once. This results in issues such as "rolling shutter" distortion of fast-moving subjects that have changed their position during the time the frame is being read out. However, the advanced back-illuminated stacked CMOS sensor design used in cameras such as the EOS R1 and EOS R5 Mark II enables much faster readout speeds, greatly alleviating this issue. Indeed, it is almost completely eliminated in the EOS R1, which boasts a 40% reduction in rolling shutter over the already fast-readout CMOS sensor in the EOS R3.
Canon is actively investigating other solutions such as "global shutter" technology, which enables readout of the entire sensor in one go, but this technology is very complex, adds both image noise and cost, and can't yet produce very high-quality outputs.
A diagram of Canon's Dual Gain Output sensor technology, showing the same image read at two amplification levels and then combined into a single image.
The key to Canon's Dual Gain Output (DGO) technology is that each photosite on the sensor is read at two amplification levels, one high-gain and one low, and the two readouts are then combined into a single HDR image with astonishing detail and low noise.
The Canon DGO sensor
The DGO (Dual Gain Output) sensor is an advanced image sensor used in the Canon EOS C300 Mark III and EOS C70 professional video cameras.
Canon’s DGO sensor works by reading each pixel at two different amplification levels, one high and one low, and then combining these two readouts into a single image. The high amplification readout is optimised to capture fine details in shadow regions while reducing noise. The low amplification readout is designed to maintain and accurately reproduce information in the highlights. Combining these produces an image that has a broader dynamic range, retains more detail and exhibits less noise compared to images from conventional sensor technologies.
The DGO technology does not consume any more power than a conventional sensor, and is also compatible with Canon's Dual Pixel CMOS AF system and electronic image stabilisation, delivering fast, reliable autofocus and a super-steady image.
A diagram comparing the operation of a CMOS sensor with that of a SPAD sensor.
Both a CMOS sensor (A) and a SPAD sensor (B) include p -type semiconductors (2) and n -type semiconductors (3) but in different configurations. When a single photon (1) strikes either type of sensor, a single electron is generated (4). In a CMOS sensor, the charge of a single electron is too small to be detected as an electrical signal, so the charge has to be accumulated over a certain period of time. By contrast, a SPAD sensor amplifies the charge by approximately one million times using a phenomenon called Avalanche Multiplication (5), which causes a large current to flow instantaneously, enabling the sensor to detect that a single photon has hit it.
The Canon SPAD sensor
CCD and CMOS sensors measure the intensity of light – in other words, how many photons reach the sensor within a specified time. SPAD (Single Photon Avalanche Diode) sensors work differently, using the "avalanche" effect in semiconductors. When a photon strikes the sensor, it generates an electron, which then triggers a chain reaction or "avalanche" of electron production. This cascading effect causes a large current to flow instantaneously, which is read out as a voltage signal in the form of a train of pulses corresponding to individual photons.
This unique light-sensing technology means SPAD sensors can achieve incredible low-light performance. Using the outstanding SPAD sensor, Canon has developed the MS-500, a breakthrough interchangeable-lens camera capable of capturing high-definition colour footage in extremely low-light conditions, even the near-total darkness of a night-time environment.
In addition, the MS-500's bayonet mount for a 2/3-inch broadcast lens enables the camera to utilise Canon's extensive range of broadcast lenses, with their excellent super-telephoto optical performance. This means the camera is able to resolve subjects several kilometres away, even if they are unlit, making it an invaluable asset for security, surveillance and a broad range of scientific applications.
CINC_Product_H264
Sensor sizes explained
It's clear that a sensor's megapixel count (whether it's total or effective pixels) isn't the whole story. The physical size of the sensor is an important factor. APS-C sensors are physically smaller than full-frame sensors, which means that even if the pixel counts are identical, a camera with a full-frame sensor should deliver a wider dynamic range and better low-light performance – if it has the same megapixel count but over a larger area, then it has larger photosites, which will be capable of capturing more light. This makes full-frame cameras such as the EOS R1 and EOS R5 Mark II a favourite choice for professionals, particularly those shooting landscapes, architecture or portraits.
Conversely, because APS-C sensors are smaller, your subject will fill more of the frame than it would if you used the same lens with the same settings on a full-frame camera – so in effect, an APS-C sensor increases the reach of your lens. In Canon cameras, the "crop factor" is approximately 1.6x, giving you an effective focal length 1.6x greater than the same lens on a full-frame camera. This gives a 50mm lens, for example, the field of view of an 80mm lens (50 x 1.6 = 80). This means APS-C cameras are well suited for a broad range of uses including wildlife and street photography. In addition, thanks to the smaller sensor, APS-C cameras such as the EOS R50 and EOS R10 are smaller and lighter than their full-frame counterparts, making them a great option for travel or nature shoots.
Some video cameras use Super 35mm sensors (active area approximately 24.6 x 13.8mm, depending on the resolution setting), which are slightly larger than APS-C (22.2 x 14.8mm) but still less than half the area of full-frame (36 x 24mm). They are widely used in the film industry thanks to their balance between cost, image quality and cinematic look (with a shallow depth of field). Camcorders and other camera types use a range of other sensor sizes, such as the 20.1MP 1.0-type stacked CMOS sensor in the compact PowerShot G7 X Mark III and the 11.7MP 1/2.3 CMOS sensor in the PowerShot PX.
An APS-C sensor in front of a full-frame sensor, showing their relative sizes.
CMOS sensors come in different sizes. A full-frame sensor has approximately 1.6x the active surface area of an APS-C sensor.
A diagram showing a "light bucket" containing yellow photons and grey noise, alongside a larger one with more yellow photons.
If two sensors have the same total pixel count but one is physically larger than the other, then each photosite on the larger one must be bigger. This is sometimes included in camera specs as the "pixel pitch" – a 21MP APS-C camera might have a pixel pitch of about 4.22 microns while a 21MP full-frame camera might be 6.45 microns. Photosites act as "light buckets" and, in the same way that a wider bucket would capture more rainwater than a narrower bucket, a larger photosite captures more photons (shown in yellow) with relatively less random noise (grey).
The choice of sensor size depends largely on your shooting requirements and budget. Each sensor size offers distinct advantages, and understanding these can help you select the right camera for your specific needs. However, you can see why standardising on "effective pixels" provides a simpler measure for comparing different cameras and different technologies!
"""

### Embeddings
We will use multiple embeddings. Hybrid retrieval leverages the strengths of various methods, offering higher accuracy and stronger generalization capabilities
1. Dense retrieval: maps the text into a single embedding. There are many available text embedding models.
2. Sparse retrieval (lexical matching): a vector of size equal to the vocabulary, with the majority of positions set to zero, calculating a weight only for tokens present in the text. e.g., BM25. In this case we will use BM42 that replaces term frequencí with attention weights
3. Multi-vector retrieval: use multiple vectors to represent a text. ColBERT.

BAAI/bge-m3 can generate all 3 vectors in one shot but currently the available inference endpoint only generates dense embedding. It is also a multi-lingual model ideal for international applications.

#### Dense embedding
We will use BAAI/bge-m3 from Cloudflare Workers AI.  
The previously implemented CFInferenceProvider already supports embedding inference endponts.  
*The model will run on the local machine but it is relative small so should not be a problem on CPU.*

#### Sparse embedding
We will use the BM42 embedder that uses attention weights representing term importance instead of term frequency.  
`fastenbed` pacjage implements this method as SparseTextEmbedding.

In [20]:
from fastembed import LateInteractionTextEmbedding, SparseTextEmbedding

class BM42Embedder:
    def __init__(self):
        self.model = SparseTextEmbedding("Qdrant/bm42-all-minilm-l6-v2-attentions")
    def embeddding(self, texts: list[str]):
        embeddings = self.model.embed(texts)        
        return list(map(lambda e: e.as_object(), embeddings))
        

#### Late interaction embedding
For multi-vector embedding we will use ColBERT. `fastenbed` pacjage implements this method as LateInteractionTextEmbedding.  
*The model will run on the local machine but it is relative small so should not be a problem on CPU.*

In [21]:
class ColBERTV2:
    def __init__(self):
        self.model = LateInteractionTextEmbedding("colbert-ir/colbertv2.0")
    def embeddding(self, texts: list[str]):
        embeddings = self.model.embed(texts)
        return list(map(lambda e: e.tolist(), embeddings))

#### Multi Embedder

In [22]:
from tqdm import tqdm

class Embedder:
    def __init__(self, inference_provider):        
        self.inference_provider = inference_provider
        self.dense_model = "@cf/baai/bge-m3"
        self.sparse = BM42Embedder()
        self.colbert = ColBERTV2()    
    def embedding(self, texts):
        dense = self.inference_provider.embedding(self.dense_model, texts)
        sparse = self.sparse.embeddding(texts)
        colbert = self.colbert.embeddding(texts)
        return [{            
            "dense": vec,
            "sparse": sparse[i],
            "colbert": colbert[i],
        } for (i, vec) in enumerate(dense) ]
        

#### Reranker
It will rerank the dual results. Results from document search and QA search

In [23]:
class Reranker:
    def __init__(self, inference_provider):        
        self.inference_provider = inference_provider
        self.model = "@cf/baai/bge-m3"
    def rank(self, query, contexts, min_score=0.7, top_k=10):
        return inference_provider.rerank(
            self.model,
            query, 
            [{"text": context} for context in contexts],
            top_k
        )

## Index the article

### Create Vector DB
We will use Qdrant as it has great support for hybrid vector search including sparse vectors wit built in IDF calulation.
Qdrant is open source one can run locally as container or deploy to any Cloud provider. We will use the free managed service for development and testing. The notebook will need a QDRANT_URL and a QDRANT_API_KEY environment varible to be set.

In [24]:
from dataclasses import dataclass

# Utility class for store split text
@dataclass
class TextPart():
    id: str
    content: int
    source_id: int # e.g. article id 

# Utility class for questions
@dataclass
class Question():
    id: str
    document: str
    q: str
    a: str

# Utility class for context
@dataclass
class Context():
    content: str
    score: float
    document_id: str
    question: str = None
    answer: str = None

def join_qa(q:str, a: str, sep=' '):
    return f"Question: {q}{sep}Answer: {a}"

In [111]:
from qdrant_client import QdrantClient
from qdrant_client.models import models
from typing import Union

class VectorDB:
    DENSE_LENGTH = 1024
    COLBERT_LENGTH = 128
    COLLECTION_DOCUMENTS = 'documents'
    COLLECTION_QUESTIONS = 'questions'
    
    def __init__(self, embedder:Embedder, reranker:Reranker):
        self.client = QdrantClient(url=os.getenv('QDRANT_URL'), api_key=os.getenv('QDRANT_API_KEY'))
        self.embedder = embedder
        self.reranker = reranker
        vectors_config = {
            "dense": models.VectorParams(size=self.DENSE_LENGTH, distance=models.Distance.COSINE),
            "colbert": models.VectorParams(
                size=self.COLBERT_LENGTH,
                distance=models.Distance.COSINE,
                multivector_config=models.MultiVectorConfig(
                    comparator=models.MultiVectorComparator.MAX_SIM,
                )
            )
        }
        sparse_vectors_config = {                
            "sparse": models.SparseVectorParams(modifier=models.Modifier.IDF)                
        }
        # Full text index for document content
        text_search_schema = models.TextIndexParams(
            type=models.PayloadSchemaType.TEXT,
            tokenizer=models.TokenizerType.MULTILINGUAL,
            lowercase=True
        )
        if not self.client.collection_exists(collection_name=self.COLLECTION_DOCUMENTS):
            # Doc id vill be the point id we dont need a separate id for
            self.client.create_collection(
                self.COLLECTION_DOCUMENTS,
                vectors_config=vectors_config,
                sparse_vectors_config=sparse_vectors_config
            )                        
            # Doc content
            self.client.create_payload_index(
                collection_name=self.COLLECTION_DOCUMENTS,
                field_name='content',
                field_schema=text_search_schema,
            )
        if not self.client.collection_exists(collection_name=self.COLLECTION_QUESTIONS):
            # Question id vill be the point id we dont need a separate id for
            self.client.create_collection(
                self.COLLECTION_QUESTIONS,
                vectors_config=vectors_config,
                sparse_vectors_config=sparse_vectors_config
            )            
            # Ppayload index for doc id (KEYWORD for strings)
            self.client.create_payload_index(
                collection_name=self.COLLECTION_QUESTIONS,
                field_name='document_id',
                field_schema=models.PayloadSchemaType.KEYWORD,
            )
            # QA content
            for field in ["q", "a"]:
                self.client.create_payload_index(
                    collection_name=self.COLLECTION_QUESTIONS,
                    field_name=field,
                    field_schema=text_search_schema,
                )
    def _check_existing(self, data: list, collection: str):        
        ids = [d.id for d in data]
        res = self.client.scroll(
            collection_name=collection,
            scroll_filter=models.Filter(
                must=[
                    models.HasIdCondition(has_id=ids)
                ]
            ),
            limit=1000,
            with_payload=False,
            with_vectors=False,
        )
        stored = [rec.id for rec in res[0]] if len(res) > 0 else []
        missing_ids = set(ids) - set(stored)
        return [d for d in data if d.id in missing_ids]
    def add_documents(self, documents: list[TextPart]):
        documents_to_index = self._check_existing(documents, self.COLLECTION_DOCUMENTS)
        texts = [doc.content for doc in documents_to_index]
        embeddings = self.embedder.embedding(texts)
        points = [            
            models.PointStruct(
                id=documents_to_index[i].id,
                payload= {
                    "content": documents_to_index[i].content
                },
                vector=vector
            ) for (i, vector) in enumerate(embeddings) 
        ]
        self.client.upsert(
            collection_name=self.COLLECTION_DOCUMENTS,
            points=points
        )
    def add_questions(self, questions: list[Question]):
        qa_to_index = self._check_existing(questions, self.COLLECTION_QUESTIONS)
        texts = [join_qa(qa.q, qa.a) for qa in qa_to_index]
        embeddings = self.embedder.embedding(texts)
        points = [            
            models.PointStruct(
                id=qa_to_index[i].id,
                payload= {
                    "document_id": qa_to_index[i].document,
                    "q": qa_to_index[i].q,
                    "a": qa_to_index[i].a
                },
                vector=vector
            ) for (i, vector) in enumerate(embeddings) 
        ]
        self.client.upsert(
            collection_name=self.COLLECTION_QUESTIONS,
            points=points
        )
    def get_documents_by_id(self, ids: list[str]):
        res = self.client.scroll(
            collection_name=self.COLLECTION_DOCUMENTS,
            scroll_filter=models.Filter(
                must=[
                    models.HasIdCondition(has_id=ids)
                ]
            ),
            with_payload=True,
            with_vectors=False,
        )
        return res[0] if len(res) > 0 else []
    def keyword_query(
            self,
            keywords: list[str],
            limit=10,
        ):
        field_conditions = [
            models.FieldCondition(
                key="content",
                match=models.MatchText(text=keyword),
            ) for keyword in keywords
        ]

        points = self.client.scroll(
            collection_name=self.COLLECTION_DOCUMENTS,
            scroll_filter=models.Filter(
                # should represents the OR operator
                should=field_conditions
            ),
            with_payload=True,
            limit=limit,
        )
        return points[0]
    def hybrid_query(
        self,
        query_text: str,       
        limit=10,
    ):
        embeddings = self.embedder.embedding([query_text])
        prefetch= [
            models.Prefetch(
                query=embeddings[0]["dense"],
                using="dense",
                limit=10,
            ),
            models.Prefetch(
                query=embeddings[0]["sparse"],
                using="sparse",
                limit=10,
            ),
        ]
        doc_results = self.client.query_points(
            collection_name=self.COLLECTION_DOCUMENTS,
            prefetch=prefetch,
            query=embeddings[0]["colbert"],
            using="colbert",
            with_payload=True,
            limit=10,
        )
        question_results = self.client.query_points(
            collection_name=self.COLLECTION_QUESTIONS,
            prefetch=prefetch,
            query=embeddings[0]["colbert"],
            using="colbert",
            with_payload=True,
            limit=10,
        )
        contexts = []
        for point in doc_results.points:
            contexts.append({
                "id": point.id,
                "type": "doc",
                "content": point.payload["content"],
                "document_id": point.id
            })
        for point in question_results.points:
            q_c = {
                "id": point.id,
                "q": point.payload["q"],
                "a": point.payload["a"],
                "content": point.payload["q"],
                "document_id": point.payload["document_id"]
            }
            contexts.append(q_c)
            qa_c = q_c.copy()
            qa_c.update({
                "content": join_qa(point.payload["q"], point.payload["a"]),
            })
            contexts.append(qa_c)            
            
        contents =  [c["content"] for c in contexts]
        ranked = self.reranker.rank(query_text, contents)       
        
        # Prepare final context
        return_contexts = []
        doc_ids = set()                
        for r in ranked[:limit]:
            id, score = itemgetter('id', 'score')(r)
            c = contexts[id]
            doc_id = c["document_id"]
            doc_ids.add(doc_id)
            if 'q' in c:                
                return_contexts.append(
                    Context(
                        document_id=doc_id, 
                        content=None, 
                        score=score, 
                        question=c["q"], 
                        answer=c["a"]
                    )
                )
        points = self.get_documents_by_id(doc_ids)
        document_contents = { point.id:point.payload["content"] for point in points }
        return_contexts += [ 
            Context(document_id=id, content=document_contents[id], score=score) 
            for id in doc_ids
        ]    
        return_contexts.sort(key=lambda c: c.score, reverse=True)
        return return_contexts
        

In [76]:
vector_db = VectorDB(Embedder(inference_provider), Reranker(inference_provider))

### Split the article into smaller texts
We need embeddings to balance capturing specificity and retaining context. Small chunks provide specificity larger chunks retain more context.
Our question extraction method excelent for specificity as it works on the sentence level therefore we can use larger chunks to get more context and semantic relationships between sentences.

#### Split to large blocks initially using simple rules

In [27]:
from haystack.components.preprocessors import DocumentPreprocessor

# we have one article
doc_list = [Document(content=article)]

preprocessor = DocumentPreprocessor(
    split_by="word", 
    split_length=450, 
    split_overlap=40, 
    split_threshold=350,
    respect_sentence_boundary=True,    
)
large_documents = preprocessor.run(documents=doc_list)['documents']



#### Split to smaller blocks with a language model
To preserve semantic context and technical explanations we will use a language model to further split the text

In [28]:
_split_system_prompt = """
Analise the input text and split to preserve semantic context and technical explanations.  

Rules:
1. preserve semantic context, technical term and explanations
2. Number of chunks between 2 and 4. 
3. Do not rephrase, add or omit text.
4. Do not add notes or explain.
5. Do not add titles, summaries, etc..
6. Strictly follow the JSON output format.

Example:
Text:
The Maldives and luxury go hand in hand. This is a country made up almost entirely of privately owned islands, 
many of which have been turned into indulgent resorts: palm-thatched overwater villas, seafood feasts o the beach, 
day-tripping to deserted islands for snorkelling and secluded sunbathing. All-inclusive holidays are very popular 
on the islands thanks to its remoteness. You can just kick back, relax and enjoy the wonderful scenery around you. 
The Maldives is also great for diving — its waters are filled with whale sharks, manta rays, turtles and coral reefs.

---
Output:
{
    "chunks": [
        "The Maldives and luxury go hand in hand. This is a country made up almost entirely of privately owned islands, many of which have been turned into indulgent resorts: palm-thatched overwater villas, seafood feasts on the beach, day-tripping to deserted islands for snorkelling and secluded sunbathing.",
        "All-inclusive holidays are very popular on the islands thanks to its remoteness. You can just kick back, relax and enjoy the wonderful scenery around you. The Maldives is also great for diving — its waters are filled with whale sharks, manta rays, turtles and coral reefs.",
    ]
}

"""

_split_input_prompt = """
Split the text strictly following the rules and output JSON format!
Text:
{text}

"""

def split_text_messages(text: str):
    return [
        {
            "role": "system",
            "content": _split_system_prompt,
        },
        {
            "role": "user",
            "content": _split_input_prompt.format(text=text),
        }
    ]  
    

In [29]:
# The response format's pydantic model
class SplitText(BaseModel):
    """An object containing the split text"""

    chunks: list[str] = Field(
        description="List of generated text chunks"
    )

In [30]:
import uuid
import hashlib
from json import JSONEncoder

# Utility function to generate id from string
def str_to_uuid(value: str):
    return str(uuid.UUID(hex=hashlib.md5(value.encode("UTF-8")).hexdigest()))

def deduplicate_docs(documents):    
    return list({ doc.id:doc for doc in documents }.values())

# Writing to file
def _default(o):    
    if hasattr(o, "__dict__"):
        return o.__dict__
    return o
def store_data(data, path):
    with open(path, 'w') as file:
        json.dump(data, file, indent=2, default=_default)   

# Load data from a file
def load_data(path):
    with open(path, 'r') as file:
        return json.load(file)    

def list_reader(data, target_class):
    return [ target_class(**val) for val in data ]

def dict_reader(data, target_class):
    return { key:target_class(**val) for key, val in data }

# Filenames
doc_path = "documents.json"
questions_path = "questions.json"

**Note:**
"documents.json" file available to skip the followin cell.

In [31]:
# Run language model to split large_documents
documents = []
for doc in large_documents:
    result = inference_provider.completition(
        model=model,       
        messages=split_text_messages(doc.content),
        response_schema=SplitText 
    )
    documents += [ TextPart(id=str_to_uuid(chunk), content=chunk, source_id=doc.meta["source_id"]) for chunk in result["chunks"] ]

# Because of the overlap in the original spliting we can have duplicates
documents = deduplicate_docs(documents)
# write documents to file
store_data(documents, doc_path)

In [32]:
# Load data from file
data = load_data(doc_path)
documents = list_reader(data, TextPart)

### Create Embeddings for the documents and store them in Vector DB
Embedder is passed ot the vector DB class, it processes the documents and saves the vectors to the DB.

In [35]:
# Process in batches 
batch_size = 20
for i in tqdm(range(0, len(documents), batch_size)):
    batch = documents[i:i + batch_size]
    vector_db.add_documents(batch)

100%|████████████████████████████████████████████████████████████████████████████████████| 3/3 [00:21<00:00,  7.10s/it]


### Create embeddings for the questions and store them in Vector DB
The following cell will run for a while. Uses LLM inference and validates questions with a local model runtime depends on the machine runing this notebook.

**Note**: "questions.json" file available to skip the following 2 cells

In [40]:
# Init list here so we can retry in case of failure without starting over
questions_by_doc = {}

In [56]:
from functools import reduce

for doc in tqdm(documents):
    # in case of a failure we can retry without restarting    
    if doc.id not in questions_by_doc:
        constraints_list = extract_information(doc.content)
        raw_questions = generate_questions(inference_provider, constraints_list, doc.content)
        valid = validator.validate(raw_questions, context=doc.content, top_k_per_cluster=2)
        group = []
        for qa_pair in valid or []:
            q, a = itemgetter('q', 'a')(qa_pair)   
            q_id = str_to_uuid(join_qa(q, a))
            group.append(Question(id=q_id, document=doc.id, q=q, a=a))
        questions_by_doc[doc.id] = group

# flatten the dict we have the doc id dont need them groupped
questions = reduce(lambda a, val: a + val, questions_by_doc.values(), [])
# write questions to file
store_data(questions, questions_path)

100%|██████████████████████████████████████████████████████████████████████████████| 59/59 [00:00<00:00, 265519.24it/s]


In [57]:
# Load data from file
data = load_data(questions_path)
questions = list_reader(data, Question)

### Create embeddings for the questions and store them in Vector DB

In [59]:
# Process in batches 
batch_size = 40
for i in tqdm(range(0, len(questions), batch_size)):
    batch = questions[i:i + batch_size]
    vector_db.add_questions(batch)

100%|█████████████████████████████████████████████████████████████████████████████████| 8/8 [00:00<00:00, 64403.90it/s]


### Run the dual-scale hybrid query
It is dual-scale because combines fine-grained QA pairs + contextual chunks. Hybrid because uses dense + sparse embeddings

In [103]:
#query_text = "What happens when light enters the sensor?"
query_text = "Where is relevant the avalanche of electron production?"
contexts = vector_db.hybrid_query(query_text, 5)

In [104]:
# print context
def build_context_prompt(contexts):
    context_prompt = ''
    for c in contexts:
        if c.question:
            context_prompt += "Related QA:\n"
            context_prompt += f"Q: {c.question}\n"
            context_prompt += f"A: {c.answer}\n\n"
        else:
            context_prompt += "Related content:\n"
            context_prompt += f"{c.content}\n\n"
    return context_prompt

In [105]:
print(build_context_prompt(contexts))

Related QA:
Q: What type of sensors work using the 'avalanche' effect in semiconductors?
A: SPAD (Single Photon Avalanche Diode) sensors work using the 'avalanche' effect in semiconductors

Related QA:
Q: What results from the phenomenon of Avalanche Multiplication?
A: Avalanche Multiplication causes a large current to flow instantaneously

Related QA:
Q: How is a single electron generated?
A: When a single photon strikes either type of sensor, a single electron is generated

Related QA:
Q: What do SPAD sensors work using?
A: the avalanche effect in semiconductors

Related QA:
Q: What results from the phenomenon of Avalanche Multiplication?
A: Avalanche Multiplication causes a large current to flow instantaneously

Related content:
Both a CMOS sensor (A) and a SPAD sensor (B) include p -type semiconductors (2) and n -type semiconductors (3) but in different configurations. When a single photon (1) strikes either type of sensor, a single electron is generated (4). In a CMOS sensor, the 

### Try a RAG (Retrival Augmented Generation) example with the retrieved context

In [106]:
_rag_sys_prompt = _split_input_prompt = """
You are a helpful assistant answering questions based on the provided context!
Ensure:
    - Consider the context in your answer, can use the phrases from the context
    - The context shall be transparent for the end user. Do NOT reference the context in any way in cluding "based on the context"
    - If the context is highly irrelevant ignor
    - If the context seems contradictory try to resolve the contradiction otherwise ignore the context
    
Context:
{context}

"""
def rag_messages(query: str, context:str):
    return [
        {
            "role": "system",
            "content": _rag_sys_prompt.format(context=context),
        },
        {
            "role": "user",
            "content": query,
        }
    ]  

With relevant context we can use smaller and more cost effective LLMs. In this example we are using a model with 8B parameters.  
These smaller models are also more accessible to fine tune and self host by businesses.

In [109]:
assistants_answer = inference_provider.completition(
    model=model,       
    messages=rag_messages(query_text, build_context_prompt(contexts)),
)
print(assistants_answer)

The avalanche of electron production occurs in the SPAD (Single Photon Avalanche Diode) sensor. When a photon strikes the SPAD sensor, it generates an electron, which then triggers a chain reaction or "avalanche" of electron production, causing a large current to flow instantaneously.


In [108]:
assistants_answer_no_context = inference_provider.completition(
    model=model,       
    messages=rag_messages(query_text, 'No available context'),
)
print(assistants_answer_no_context)

In various astrophysical and laboratory settings, the avalanche of electron production can be relevant.

In high-energy astrophysics, the avalanche of electron production can occur in the presence of intense magnetic fields and high-energy particle interactions, such as in the vicinity of neutron stars, black holes, or during supernova explosions.

In laboratory settings, the avalanche of electron production can be induced through various methods, including high-powered laser interactions with matter, particle accelerators, or high-energy electron beam experiments.
