# Exercise 3.3: Run data pipeline to vectorize documents

Instead of performing every step yourself, as shown in the previous exercise, you can use the Pipeline API.

The pipeline collects documents and segments the data into chunks. It generates embeddings, which are multidimensional representations of textual information, and stores them efficiently in the vector database.
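
To make the chunking step more concrete, here is a minimal sketch of splitting text into overlapping character windows. It is only an illustration written for this exercise: the function name `chunk_text`, the window sizes, and the character-based strategy are assumptions, and the actual pipeline may segment documents differently.

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character windows that overlap (illustrative only)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in the range [0, chunk_size)")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once the current window already reaches the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks


sample = "SAP TechEd " * 40
chunks = chunk_text(sample, chunk_size=100, overlap=20)
print(len(chunks), "chunks; first chunk length:", len(chunks[0]))
```

The overlap means that a sentence cut at a chunk boundary still appears in full in one of the neighbouring chunks, which tends to help retrieval.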

In this exercise you will perform the following steps:
* Perform the initial one-time admin task: create a generic secret.
* Prepare the vector knowledge base: configure the Pipeline API to read files from the object store and store them in the vector database.


## Create a generic secret for Object Store 

We must first create a generic secret at the resource group level. Secrets allow and control connections to external systems and tools without exposing your credentials.

In [None]:
import init_env
init_env.set_environment_variables()

To create the generic secret, we send a POST request to the URL `{{apiurl}}/v2/admin/secrets`.

**Note**: 
* Every value in the *data* dictionary needs to be base64-encoded. 
* `labels` must contain the key-value pairs *"ext.ai.sap.com/document-grounding"* with value `true` and *"ext.ai.sap.com/documentRepositoryType"* with value `S3`. They enable grounding and declare S3 as the repository source.

In [None]:
import base64

def b64(val):
    """Return the base64 encoding of a UTF-8 string, as text."""
    return base64.b64encode(val.encode("utf-8")).decode("utf-8")


In [None]:
import os

def secret_dict():
    """Build the request body for the generic secret (all data values base64-encoded)."""
    return {
        "name": "aws3-secret-3",
        "data": {
            "url": b64("https://s3-eu-central-1.amazonaws.com"),
            "authentication": b64("NoAuthentication"),
            "description": b64("For Grounding"),
            "access_key_id": b64(os.environ["ACCESS_KEY_ID"]),
            "bucket": b64(os.environ["BUCKET"]),
            "host": b64("s3-eu-central-1.amazonaws.com"),
            "region": b64("eu-central-1"),
            "secret_access_key": b64(os.environ["SECRET"]),
            "username": b64(os.environ["USER"]),
        },
        "labels": [
            {
                "key": "ext.ai.sap.com/document-grounding",
                "value": "true",
            },
            {
                "key": "ext.ai.sap.com/documentRepositoryType",
                "value": "S3",
            },
        ],
    }

# The request body consists of the name, data, and labels built above
body = secret_dict()
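
Before sending the request, it can be useful to check the two requirements from the note above locally. The following is a minimal sketch, not part of any SDK: `check_secret_body` is a hypothetical helper, and the sample bodies use placeholder values. Note that the base64 check is only a heuristic, because some plain strings also happen to be valid base64.

```python
import base64
import binascii

# Labels required by the note above
REQUIRED_LABELS = {
    "ext.ai.sap.com/document-grounding": "true",
    "ext.ai.sap.com/documentRepositoryType": "S3",
}


def check_secret_body(body: dict) -> list[str]:
    """Return a list of problems found in a secret request body (empty if none)."""
    problems = []
    for key, value in body.get("data", {}).items():
        try:
            base64.b64decode(value, validate=True)
        except (binascii.Error, ValueError):
            problems.append(f"data[{key!r}] is not valid base64")
    labels = {label["key"]: label["value"] for label in body.get("labels", [])}
    for key, expected in REQUIRED_LABELS.items():
        if labels.get(key) != expected:
            problems.append(f"label {key!r} should have value {expected!r}")
    return problems


def encode(text: str) -> str:
    return base64.b64encode(text.encode("utf-8")).decode("utf-8")


good_body = {
    "name": "example-secret",  # placeholder name
    "data": {"region": encode("eu-central-1")},
    "labels": [{"key": k, "value": v} for k, v in REQUIRED_LABELS.items()],
}
bad_body = {"name": "example-secret", "data": {"region": "eu-central-1"}, "labels": []}

print(check_secret_body(good_body))
print(check_secret_body(bad_body))
```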


In [None]:
from ai_core_sdk.ai_core_v2_client import AICoreV2Client

client = AICoreV2Client(
    base_url=os.environ["AICORE_BASE_URL"] + "/v2",
    auth_url=os.environ["AICORE_AUTH_URL"],
    client_id=os.environ["AICORE_CLIENT_ID"],
    client_secret=os.environ["AICORE_CLIENT_SECRET"],
    resource_group=os.environ["AICORE_RESOURCE_GROUP"],
)

In [None]:
import requests

response_dict = requests.post(
        url=f"{client.rest_client.base_url}/admin/secrets", 
        headers={
            "Content-Type": "application/json",
            "AI-Tenant-Scope": "false",
            "Authorization": client.rest_client.get_token(),
            "AI-Resource-Group": os.environ["AICORE_RESOURCE_GROUP"]
        },
        json=body
    )
# Print the HTTP status code and the response body
print(response_dict.status_code, response_dict.text)

## Create Data Pipeline

### Import the packages we want to use

In [None]:
from gen_ai_hub.proxy import get_proxy_client
from gen_ai_hub.document_grounding.client import PipelineAPIClient
from gen_ai_hub.document_grounding.models.pipeline import S3PipelineCreateRequest, CommonConfiguration

In [None]:
aicore_client = get_proxy_client()
pipeline_api_client = PipelineAPIClient(aicore_client)

In [None]:

# Name of the generic secret created above
generic_secret_s3_bucket = "aws3-secret-3"
s3_config = S3PipelineCreateRequest(
    configuration=CommonConfiguration(destination=generic_secret_s3_bucket)
)


In [None]:

response = pipeline_api_client.create_pipeline(s3_config)
print(f"Reference the Vector knowledge base using the pipeline ID: {response.pipelineId}")

In [None]:
# check the status of the vectorization pipeline until it is completed
print(pipeline_api_client.get_pipeline_status(response.pipelineId))

Once the status switches to ```'FINISHED'```, vectorization is complete and we can continue with the next steps. Our PDF is then vectorized and stored in the HANA Vector Store.
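
Rather than re-running the status cell by hand, you could poll in a loop. The sketch below is a generic helper under stated assumptions: `wait_for_status` is a hypothetical function, and it expects a callable that returns the status as a plain string. How to extract that string from the object returned by `get_pipeline_status` is not shown here, and the intermediate status `"PENDING"` in the demo is invented.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    done: str = "FINISHED",
    interval_s: float = 10.0,
    max_attempts: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Call get_status repeatedly until it returns `done` or attempts run out."""
    status = ""
    for attempt in range(max_attempts):
        status = get_status()
        if status == done:
            return status
        # Wait between attempts, but not after the last one
        if attempt < max_attempts - 1:
            sleep(interval_s)
    raise TimeoutError(f"status is still {status!r} after {max_attempts} attempts")


# Demo with a fake status source instead of a real pipeline
fake_statuses = iter(["PENDING", "PENDING", "FINISHED"])
final = wait_for_status(lambda: next(fake_statuses), sleep=lambda _seconds: None)
print(final)
```

Passing the status source as a callable keeps the helper independent of the client library, so it can be tested without a running pipeline.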

If you want to see all pipelines, run ```get_pipelines()```. It lists all pipelines in your resource group.

In [None]:

pipelines = pipeline_api_client.get_pipelines()

print(pipelines.model_dump_json(indent=2))


🎉 Congratulations, you successfully created your first data repository via the Pipeline API. 🎉

## Use the data repository to ground the response
Now let us use this data repository to generate more accurate responses. We will again use the Orchestration Service, as we did in Exercise 2.

### Assign the model you want to use 

In [None]:
from gen_ai_hub.orchestration.models.llm import LLM

llm = LLM(
    name="gemini-2.5-flash",
    parameters={
        'temperature': 0.0,
    }
)

### Create a prompt template

This time we would like to get answers to questions about SAP TechEd 2025 and the mascot Kasimir.

In [None]:
from gen_ai_hub.orchestration.models.template import Template
from gen_ai_hub.orchestration.models.message import SystemMessage, UserMessage

template = Template(
            messages=[
                SystemMessage("You are a helpful SAP TechEd assistant."),
                UserMessage("""Answer the request by providing relevant answers that fit to the request.
                Request: {{ ?user_query }}
                Context:{{ ?grounding_response }}
                """)
            ]
        )
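
The `{{ ?user_query }}` and `{{ ?grounding_response }}` placeholders are filled in by the orchestration service at run time. To illustrate the idea only, here is a small local sketch of such a substitution; `render` and the regular expression are our own assumptions and do not reflect how the service is implemented.

```python
import re

# Matches placeholders of the form {{ ?name }}
PLACEHOLDER = re.compile(r"\{\{\s*\?(\w+)\s*\}\}")


def render(template_text: str, values: dict[str, str]) -> str:
    """Replace every {{ ?name }} placeholder with the matching value."""
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name not in values:
            raise KeyError(f"no value provided for placeholder {name!r}")
        return values[name]

    return PLACEHOLDER.sub(substitute, template_text)


prompt = render(
    "Request: {{ ?user_query }}\nContext:{{ ?grounding_response }}",
    {
        "user_query": "Are dogs allowed at SAP TechEd?",
        "grounding_response": "<chunks retrieved from the data repository>",
    },
)
print(prompt)
```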

### List all data repositories

In the next step we define the data repository that grounds the response, and for that we need its ID.   
Let us list all data repositories that we have so far:

In [None]:
from gen_ai_hub.document_grounding.client import RetrievalAPIClient
retrieval_api_client = RetrievalAPIClient(aicore_client)

repos = retrieval_api_client.get_data_repositories()

print(repos.model_dump_json(indent=2))

Overall you should see three data repositories: SAP Help, Kasimir (created in Exercise 3.1 via the Vector API), and the data repository that we just created via the pipeline (its title is something like "pipeline-...").

➡️ Copy the ```id```, as we need it for the next step.
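
If you prefer not to copy the ID by hand, you could look it up by title in the JSON output. This is a sketch under an assumption: it expects the JSON to have a top-level `resources` list whose entries carry `id` and `title` fields, which you should verify against the output printed above. `find_repository_ids` is a hypothetical helper, and the sample IDs are made up.

```python
import json


def find_repository_ids(repos_json: str, title_prefix: str) -> list[str]:
    """Return the IDs of all repositories whose title starts with title_prefix."""
    payload = json.loads(repos_json)
    return [
        resource["id"]
        for resource in payload.get("resources", [])  # assumed top-level key
        if str(resource.get("title", "")).startswith(title_prefix)
    ]


# Made-up sample data in the assumed shape
sample_json = json.dumps(
    {
        "resources": [
            {"id": "00000000-0000-0000-0000-000000000001", "title": "Kasimir"},
            {"id": "00000000-0000-0000-0000-000000000002", "title": "pipeline-example"},
        ]
    }
)
print(find_repository_ids(sample_json, "pipeline-"))
```

With the real client, the input would be the string returned by `repos.model_dump_json()`.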

### Define the data repository
We again need to configure the Grounding Module, where we add the data repository that we want to use via the **filters** parameter. 

➡️ Replace the ID in the ```data_repositories``` list with the data repository ID that you copied in the previous step.  

In [None]:
from gen_ai_hub.orchestration.models.document_grounding import DocumentGroundingFilter
from gen_ai_hub.orchestration.models.document_grounding import DataRepositoryType
filters = [
    DocumentGroundingFilter(
        id="KasimirTechEd2025",
        data_repository_type=DataRepositoryType.VECTOR.value,
        data_repositories=["cc04bf07-0666-4f07-8398-cf6df8c1bf69"],
    )
]

### Create Grounding Configuration
Next we create the grounding configuration by using **GroundingModule** for managing and applying grounding configurations.

In [None]:

from gen_ai_hub.orchestration.models.document_grounding import GroundingModule
from gen_ai_hub.orchestration.models.document_grounding import GroundingType
from gen_ai_hub.orchestration.models.document_grounding import DocumentGrounding

grounding_config = GroundingModule(
            type=GroundingType.DOCUMENT_GROUNDING_SERVICE.value,
            config=DocumentGrounding(input_params=["user_query"], output_param="grounding_response", filters=filters)
        )

### Create orchestration configuration including Grounding Config

In [None]:
from gen_ai_hub.orchestration.models.config import OrchestrationConfig

config = OrchestrationConfig(
    template=template,
    llm=llm,
    grounding=grounding_config
)

### Execute the query
We again pass the configuration to the OrchestrationService and then run it to retrieve the answer.

In [None]:
import importlib
import variables
from gen_ai_hub.orchestration.models.template import TemplateValue
from gen_ai_hub.orchestration.service import OrchestrationService

variables = importlib.reload(variables)

orchestration_service = OrchestrationService(
    api_url=variables.AICORE_ORCHESTRATION_DEPLOYMENT_URL,
    config=config
)

response = orchestration_service.run(
    template_values=[
        TemplateValue( 
            name="user_query",
            value="What will be the evening event at SAP TechEd?"
        )
    ]
)

print(response.orchestration_result.choices[0].message.content)

Nice, we do not want to miss this band. 

However, let us ask another question: are dogs allowed at SAP TechEd?    
Let us run the next query: 

In [None]:
response = orchestration_service.run(
    template_values=[
        TemplateValue( 
            name="user_query",
            value="Are dogs allowed at SAP TechEd?"
        )
    ]
)

print(response.orchestration_result.choices[0].message.content)

The answer is valid given the data repository we use to ground the response. However, when we embedded **Kasimirs TechEd Policy** in Exercise 3.1 via the Vector API, at least one chunk stated that one dog is allowed at TechEd: Bruno!   
This answer cannot currently be retrieved, because we are not yet using the **Kasimirs TechEd Policy** data repository to ground the response.    

Let us change this by adding that repository as well!

### List all data repositories again
First we need the ID of that repository, so we list all data repositories again. 

In [None]:
from gen_ai_hub.document_grounding.client import RetrievalAPIClient
retrieval_api_client = RetrievalAPIClient()

repos = retrieval_api_client.get_data_repositories()

print(repos.model_dump_json(indent=2))


⬆️ Copy the ```id``` of the resource that has **title = Kasimir**.

### Define the data repository
We again need to configure the Grounding Module, where we add the data repositories that we want to use via the **filters** parameter. 

⬇️ Add this ```id``` to the ```data_repositories``` list in the code below.    
It should look similar to the following snippet (with different IDs): ``` data_repositories=["cc04bf07-0666-4f07-8398-cf6df8c1bf69", "0a2c7d76-f1a0-462c-b351-7d5e87db9fe3"]```

In [None]:

filters = [
    DocumentGroundingFilter(
        id="KasimirTechEd2025",
        data_repository_type=DataRepositoryType.VECTOR.value,
        data_repositories=[
            "cc04bf07-0666-4f07-8398-cf6df8c1bf69",
            "0a2c7d76-f1a0-462c-b351-7d5e87db9fe3",
        ],
    )
]

### Create Grounding Configuration
Next we create the grounding configuration by using **GroundingModule** for managing and applying grounding configurations.

In [None]:

from gen_ai_hub.orchestration.models.document_grounding import GroundingModule
from gen_ai_hub.orchestration.models.document_grounding import GroundingType
from gen_ai_hub.orchestration.models.document_grounding import DocumentGrounding

grounding_config = GroundingModule(
            type=GroundingType.DOCUMENT_GROUNDING_SERVICE.value,
            config=DocumentGrounding(input_params=["user_query"], output_param="grounding_response", filters=filters)
        )

### Create orchestration configuration including Grounding Config

In [None]:
from gen_ai_hub.orchestration.models.config import OrchestrationConfig

config = OrchestrationConfig(
    template=template,
    llm=llm,
    grounding=grounding_config
)

### Execute the query
We again pass the configuration to the OrchestrationService and then run it to retrieve the answer.

Let's check whether the answer to the question about dogs changes: 

In [None]:
import importlib
import variables
from gen_ai_hub.orchestration.models.template import TemplateValue
from gen_ai_hub.orchestration.service import OrchestrationService

variables = importlib.reload(variables)

orchestration_service = OrchestrationService(
    api_url=variables.AICORE_ORCHESTRATION_DEPLOYMENT_URL,
    config=config
)

response = orchestration_service.run(
    template_values=[
        TemplateValue( 
            name="user_query",
            value="Are dogs allowed at SAP TechEd?"
        )
    ]
)

print(response.orchestration_result.choices[0].message.content)

Awesome, now the TechEd policy is also considered when grounding our response.    

🎉 Congratulations! You successfully added two data repositories to your Grounding Module to ground your response. 🎉

## Summary 

🎉 And that's a wrap! 🎉

Today you learned how to ground an LLM using Grounding in GenAIHub, created your own vector-based data repository via Vector API, processed documents with the Pipeline API, retrieved knowledge via the Retrieval API, and orchestrated everything end-to-end.
Grounding brings AI from impressive to reliable, and with these tools, you now have the foundation to build enterprise-grade, trustworthy AI solutions with your own data.
I hope this session sparked new ideas and confidence, and that you leave ready to turn your real-world knowledge into real AI impact.



## Bonus: Postprocessing search results with the Cohere 3.5 Reranker

We have now seen that content can be brought into Document Grounding in different ways. When we add more content, it can become increasingly hard to surface the relevant information, especially if we need to query across different repositories.

At this year's SAP TechEd, we are very happy to introduce a new feature in Document Grounding: postprocessing for result sets. Its first application is the [Cohere 3.5 reranking model](https://cohere.com/rerank).

In [None]:
from gen_ai_hub.document_grounding.client import RetrievalAPIClient

retrieval_client = RetrievalAPIClient()

Let's start by creating some filters.

In the first filter, we will focus on documents ingested from PDF files. If you look closely, you will also see one new field in the API payload below.

In [None]:
filter_1 = {
    "id": "filter-1",
    "searchConfiguration": {"maxChunkCount": 5},
    "dataRepositories": ["*"],
    "dataRepositoryType": "vector",
    "filter": {
        "operator": "or",
        "left": {
            "key": "fileSuffix",
            "value": [".pdf"],
            "scope": "document",
        },
        "right": {
            "key": "mimeType",
            "value": ["application/pdf"],
            "scope": "document",
        },
    },
}

In the second filter, we reference the custom metadata that we added in Exercise 3.1.

In [None]:
filter_2 = {
    "id": "filter-2",
    "searchConfiguration": {"maxChunkCount": 5},
    "dataRepositories": ["*"],
    "dataRepositoryType": "vector",
    "filter": {
        "key": "purpose",
        "value": ["Kasimirs TechEd Cat Policy"],
        "scope": "document",
    },
}

By the way, the new API field mentioned above is the `filter` key. This recently introduced field enables complex boolean filters on document metadata, and is later to be extended to collection and chunk metadata as well.

Note that this is independent of the postprocessing feature. It is an enhanced filtering capability for retrieval in the Vector API, which is accessed here via the Retrieval API.
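
To build some intuition for how such a nested boolean filter could be read, here is a small local evaluator. It is purely illustrative and makes several assumptions: a leaf matches when the metadata value for `key` is one of the listed `value` entries, metadata is modelled as a flat dictionary, and an `and` operator exists alongside the `or` shown above. The service's actual matching rules may differ.

```python
def matches(filter_node: dict, metadata: dict) -> bool:
    """Evaluate a nested filter against flat document metadata (illustrative only)."""
    if "operator" in filter_node:
        left = matches(filter_node["left"], metadata)
        right = matches(filter_node["right"], metadata)
        operator = filter_node["operator"]
        if operator == "or":
            return left or right
        if operator == "and":  # assumption: not shown in this exercise
            return left and right
        raise ValueError(f"unsupported operator: {operator!r}")
    # Leaf node: the metadata value must be one of the allowed values
    return metadata.get(filter_node["key"]) in filter_node["value"]


# Same structure as the "filter" entry of filter_1
pdf_filter = {
    "operator": "or",
    "left": {"key": "fileSuffix", "value": [".pdf"], "scope": "document"},
    "right": {"key": "mimeType", "value": ["application/pdf"], "scope": "document"},
}

print(matches(pdf_filter, {"fileSuffix": ".pdf"}))
print(matches(pdf_filter, {"fileSuffix": ".txt", "mimeType": "text/plain"}))
```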

Finally, we also craft a filter to pull in content from the SAP Help Portal.

In [None]:
filter_3 = {
    "id": "filter-3",
    "searchConfiguration": {"maxChunkCount": 10},
    "dataRepositories": ["*"],
    "dataRepositoryType": "help.sap.com",
}

We can send all three filters to the retrieval API in one request, and we will get back three individual result sets.

While this is practical for some use cases, we sometimes just want the most relevant results overall.

Below, we assemble the final search request from the three filters.

Additionally, we include the newly introduced `postProcessing` key, where we instruct the reranker to give us the 10 most relevant chunks from the output of the three filters.

In [None]:
query = "Responsible AI for cats"

In [None]:
search_input_postprocessing = {
    "query": query,
    "filters": [
        filter_1,
        filter_2,
        filter_3,
    ],
    "postProcessing": [
        {
            "id": "post-processing-1",
            "inputs": [
                {"id": "filter-1"},
                {"id": "filter-2"},
                {"id": "filter-3"},
            ],
            "maxChunkCount": 10,
            "strategy": {
                "type": "reranker",
                "model": "cohere-3.5",
            },
        }
    ],
}
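
Because the `postProcessing` step refers to filters by their `id`, a typo there is easy to make. The following sketch checks locally that every referenced input is defined in the same request. `undefined_inputs` is a hypothetical helper of our own, not part of the SDK, and the sample request is a shortened stand-in.

```python
def undefined_inputs(search_input: dict) -> list[str]:
    """Return post-processing input IDs that do not match any filter ID."""
    defined = {search_filter["id"] for search_filter in search_input.get("filters", [])}
    missing = []
    for step in search_input.get("postProcessing", []):
        for reference in step.get("inputs", []):
            if reference["id"] not in defined:
                missing.append(reference["id"])
    return missing


# Shortened sample request with one deliberate typo
sample_request = {
    "query": "example query",
    "filters": [{"id": "filter-1"}, {"id": "filter-2"}],
    "postProcessing": [
        {
            "id": "post-processing-1",
            "inputs": [{"id": "filter-1"}, {"id": "filter-3"}],
        }
    ],
}
print(undefined_inputs(sample_request))
```

An empty list means that all references resolve.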

In [None]:
response = retrieval_client.rest_client.post(path=f"{retrieval_client.path}/search", body=search_input_postprocessing)

In [None]:
# Feel free to inspect the entire response!
# import json # For pretty-printing
# print(json.dumps(response, indent=2))

In [None]:
import json # For pretty-printing

post_processing_result = None

for result_set in response['results']:
    if result_set['filter_id'] == "post-processing-1":
        post_processing_result = result_set

if post_processing_result is None:
    print("Oops, no post-processing results found - check your code!")

In [None]:
for data_repository in post_processing_result["results"]:
    for document in data_repository["data_repository"]["documents"]:
        for chunk in document['chunks']:
            print(chunk["content"])
            print("-----")

In [None]:
# print(json.dumps(post_processing_result, indent=2))

The results are still quite noisy and contain many irrelevant chunks, which is no surprise given that we asked for 10 chunks. We can further reduce the number of chunks by applying a client-side filter on the final postprocessing score.

The relevance threshold must be carefully calibrated for each use case, and it also depends on the reranker model used. See the Cohere [documentation on interpreting reranker results](https://docs.cohere.com/docs/reranking-best-practices#interpreting-results), which describes a possible process for determining the relevance threshold. 

In [None]:
# Threshold of 0.3 chosen based on superficial inspection of results :)
threshold = 0.3

for data_repository in post_processing_result["results"]:
    for document in data_repository["data_repository"]["documents"]:
        for chunk in document['chunks']:
            if chunk['post_processing_score']["value"] >= threshold:
                print("Postprocessing Score:", chunk['post_processing_score']["value"])
                print(chunk["content"])
                print("-----")
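
If you want to work with the filtered chunks rather than just print them, you could collect them into a list sorted by score. The sketch below assumes the same response layout that the loops above traverse; `top_chunks` is a hypothetical helper, and the sample data is made up.

```python
def top_chunks(post_processing_result: dict, threshold: float) -> list[tuple[float, str]]:
    """Collect (score, content) pairs at or above the threshold, best first."""
    scored = []
    for entry in post_processing_result["results"]:
        for document in entry["data_repository"]["documents"]:
            for chunk in document["chunks"]:
                score = chunk["post_processing_score"]["value"]
                if score >= threshold:
                    scored.append((score, chunk["content"]))
    return sorted(scored, key=lambda pair: pair[0], reverse=True)


# Made-up sample in the same layout as the response traversed above
sample_result = {
    "results": [
        {
            "data_repository": {
                "documents": [
                    {
                        "chunks": [
                            {"content": "chunk A", "post_processing_score": {"value": 0.12}},
                            {"content": "chunk B", "post_processing_score": {"value": 0.45}},
                            {"content": "chunk C", "post_processing_score": {"value": 0.81}},
                        ]
                    }
                ]
            }
        }
    ]
}

for score, content in top_chunks(sample_result, threshold=0.3):
    print(score, content)
```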

Stay tuned for postprocessing support in the orchestration service. For now, the postprocessing API can only be accessed directly via the retrieval API.