## Using the Fabric Data Agent Programmatically

### 1. Initial Configurations
#### 1.1 Libraries and packages

In [None]:
# Install initial packages and sdk
%pip install openai==1.70.0 httpx==0.27.2

Note: you may need to restart the kernel to use updated packages.





In [None]:
# Import generic libraries and packages
import requests
import json
import pprint
import typing as t
import time
import uuid

# Import the OpenAI SDK and the Fabric runtime helpers
import openai, httpx
from openai import OpenAI
from openai._exceptions import APIStatusError
from openai._models import FinalRequestOptions
from openai._types import Omit
from openai._utils import is_given
from synapse.ml.mlflow import get_mlflow_env_config
from sempy.fabric._token_provider import SynapseTokenProvider


#### 1.2 Initial Variables

In [None]:
# Public URL (endpoint) you get when you publish your Fabric Data Agent in the Fabric portal
base_url = "https://api.fabric.microsoft.com/v1/workspaces/3ec8fad3-44ca-4524-9168-00acec9ab0e1/aiskills/80caf30a-9a48-4a48-81fd-f3762bba1365/aiassistant/openai"
# Text that you want to send as the user's message to the agent
question = "What datasources do you have access to?"
# Gets configuration of the MLflow/Synapse environment
# Typically exposes the kernel/driver's AAD token, which is then used to authorize calls to Fabric
configs = get_mlflow_env_config()
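
The endpoint above is made of a workspace ID and a data agent ID. As a minimal sketch, assuming the path layout of that single example URL also applies to your own published agent, the URL can be assembled from the two IDs. `build_base_url` and its parameter names are hypothetical helpers, not part of any SDK:

```python
import uuid


def build_base_url(workspace_id: str, agent_id: str) -> str:
    """Assemble the agent endpoint from its two IDs.

    Assumption: the path layout is inferred from the example URL in this
    notebook; check it against the URL shown for your published agent.
    """
    # Parse both IDs as UUIDs so a copy-paste error fails early with ValueError.
    for value in (workspace_id, agent_id):
        uuid.UUID(value)
    return (
        "https://api.fabric.microsoft.com/v1"
        f"/workspaces/{workspace_id}/aiskills/{agent_id}/aiassistant/openai"
    )


print(build_base_url(
    "3ec8fad3-44ca-4524-9168-00acec9ab0e1",
    "80caf30a-9a48-4a48-81fd-f3762bba1365",
))
```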


### 2. Define Classes and Subclasses

#### 2.1 FabricOpenAI Class

The **FabricOpenAI** subclass adapts the OpenAI client to the Fabric endpoint.

It inherits from the OpenAI client (the official OpenAI library for Python) so that every call points to the Fabric Data Agent's base_url and includes api-version in the query string. No api_key is used; instead, authentication is done with an AAD token injected into the request headers (see _prepare_options).

The class also overrides the **_prepare_options** method, which injects the authentication and correlation headers. Every time the client calls the Fabric Data Agent, the following headers are set automatically:

* Authorization: An access token (Bearer <driver_aad_token>) authenticates the request with Fabric. The token is read from the runtime environment through get_mlflow_env_config(), so you don't need to manage it manually.

* Accept: Defaults to application/json when the caller has not set it, indicating that responses are expected in JSON format.

* ActivityId: A UUID is generated for each request that does not already carry one, which makes it easier to trace requests and to search logs or issues within Fabric.

Note: _prepare_options is an internal hook of the OpenAI SDK. Overriding it lets the client inject the token and traceability headers into every request without any OpenAI key. Because the method is private (underscore-prefixed), it may change between SDK versions.


In [None]:
# Create OpenAI Client
class FabricOpenAI(OpenAI):
    def __init__(
        self,
        api_version: str = "2024-05-01-preview",
        **kwargs: t.Any,
    ) -> None:
        self.api_version = api_version
        default_query = kwargs.pop("default_query", {})
        default_query["api-version"] = self.api_version
        super().__init__(
            api_key="",
            base_url=base_url,
            default_query=default_query,
            **kwargs,
        )
    
    def _prepare_options(self, options: FinalRequestOptions) -> None:
        headers: dict[str, str | Omit] = (
            {**options.headers} if is_given(options.headers) else {}
        )
        options.headers = headers
        headers["Authorization"] = f"Bearer {configs.driver_aad_token}"
        if "Accept" not in headers:
            headers["Accept"] = "application/json"
        if "ActivityId" not in headers:
            correlation_id = str(uuid.uuid4())
            headers["ActivityId"] = correlation_id

        return super()._prepare_options(options)
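
The header rules applied in _prepare_options can also be tried without a Fabric runtime. The following is a minimal sketch rather than the class's actual code path; `merge_fabric_headers` is a hypothetical helper and the token is passed in as a plain string:

```python
import uuid
from typing import Optional


def merge_fabric_headers(existing: Optional[dict], token: str) -> dict:
    """Return a copy of `existing` with the three headers described above."""
    headers = dict(existing or {})
    # Authorization is always overwritten, mirroring the unconditional
    # assignment in _prepare_options.
    headers["Authorization"] = f"Bearer {token}"
    # Accept and ActivityId are only defaults: values already present win.
    headers.setdefault("Accept", "application/json")
    headers.setdefault("ActivityId", str(uuid.uuid4()))
    return headers


# Placeholder token for illustration only.
print(merge_fabric_headers({"ActivityId": "my-correlation-id"}, "example-token"))
```

Because ActivityId is only a default, a caller can supply its own value to correlate several requests under one identifier.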


#### 2.2 Helper pretty_print

This function displays the conversation with the Fabric Data Agent in a readable form, so you can review the conversation history quickly without dealing with the full internal structure of the objects returned by the API.

Under the hood, the function loops through all the messages in a thread and prints:

* Role: Indicates whether the message was sent by the user or the assistant.

* Text: The content of the message.

Each message can hold several fragments in its content attribute. The function reads only the first one, through content[0].text.value. The structure of messages may vary with the SDK version, so review messages.data[...] or the actual schema if you need to adapt it.


In [None]:
# Pretty printing helper
def pretty_print(messages):
    print("---Conversation---")
    for m in messages:
        print(f"{m.role}: {m.content[0].text.value}")
    print()
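
Since a message may carry more than one fragment, a more defensive variant can join all text fragments instead of reading only the first. This is a sketch under the assumption that text fragments expose `.text.value` as in pretty_print; `message_text` is a hypothetical helper, and the demo objects are stand-ins rather than real API responses:

```python
from types import SimpleNamespace


def message_text(message) -> str:
    """Join every text fragment of a message, skipping non-text fragments."""
    parts = []
    for fragment in message.content or []:
        # Fragments without a .text attribute (for example images) are skipped.
        text = getattr(fragment, "text", None)
        value = getattr(text, "value", None)
        if value:
            parts.append(value)
    return "\n".join(parts)


# Stand-in message that mimics the attribute layout used by pretty_print.
demo = SimpleNamespace(
    role="assistant",
    content=[
        SimpleNamespace(text=SimpleNamespace(value="First fragment")),
        SimpleNamespace(image_file=SimpleNamespace(file_id="hypothetical-id")),
        SimpleNamespace(text=SimpleNamespace(value="Second fragment")),
    ],
)
print(f"{demo.role}: {message_text(demo)}")
```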


### 3. Main Agent flow
Steps to create and run the agent:
1. Create the client
2. Create the assistant
3. Create the thread
4. Create the message
5. Create the run that executes the agent

In [None]:
# 1) Create the client
# Custom client instance (points to base_url, injects token).
fabric_client = FabricOpenAI()
# 2) Create assistant
# Creates an assistant definition (contains instructions, tools, etc.)
assistant = fabric_client.beta.assistants.create(model="not used")
# 3) Create thread
# Creates a thread, which functions as a persistent memory/conversation for that assistant
thread = fabric_client.beta.threads.create()
# 4) Create message on thread
# Adds the user's message to the thread (context for the assistant).
message = fabric_client.beta.threads.messages.create(thread_id=thread.id, role="user", content=question)
# 5) Create the run
# Launches a run that triggers the execution of the assistant on the thread's contents
# The assistant executes its logic (calls models, tools, NL→SQL, etc.) and generates output messages
run = fabric_client.beta.threads.runs.create(thread_id=thread.id, assistant_id=assistant.id)


Once the run has been created, the next step is to poll until it ends.

The loop re-reads **run.status** periodically until the run leaves the queued or in_progress state. Typical values are queued, in_progress, completed, cancelled, and failed. In a production environment, add a maximum timeout and a backoff between polls.


In [None]:
# Wait for run to complete
while run.status == "queued" or run.status == "in_progress":
    run = fabric_client.beta.threads.runs.retrieve(
        thread_id=thread.id,
        run_id=run.id,
    )
    print(run.status)
    time.sleep(2)


queued


queued


queued


completed
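
The timeout and backoff suggested for production use could look like the following. It is a minimal sketch, not a definitive implementation: `wait_for_run`, its parameters, and the default values are assumptions chosen for illustration, and the status lookup is passed in as a callable so the loop does not depend on a live client:

```python
import time
from typing import Callable


def wait_for_run(
    fetch_status: Callable[[], str],
    timeout_s: float = 120.0,
    initial_delay_s: float = 1.0,
    max_delay_s: float = 15.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll until the status leaves queued/in_progress, or raise TimeoutError."""
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        status = fetch_status()
        if status not in {"queued", "in_progress"}:
            return status
        # Give up if the next wait would overrun the deadline.
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"run still {status!r} after {timeout_s}s")
        sleep(delay)
        # Exponential backoff, capped at max_delay_s.
        delay = min(delay * 2, max_delay_s)


# In the notebook, the callable would wrap the retrieve call used above:
# status = wait_for_run(
#     lambda: fabric_client.beta.threads.runs.retrieve(
#         thread_id=thread.id, run_id=run.id
#     ).status
# )
```

The function returns the final status instead of assuming success, so the caller can check for completed before reading the messages.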


### 4. Read and print the messages
List the messages exchanged between the user and the agent, in chronological order, and print them.

In [None]:
# List the thread's messages, oldest first
response = fabric_client.beta.threads.messages.list(thread_id=thread.id, order="asc")
# Print the messages
pretty_print(response)


---Conversation---
user: What datasources do you have access to?
assistant: I have access to two main data sources for Maderas del Noroeste:

1. Dashboard PL (Power BI Semantic Model):  
This is the official source for all financial and sales performance indicators, including EBITDA, EBIT, net income, sales, revenue, expenses, EBITDA margin, year-over-year comparisons, and all key line items of the profit and loss (P&L) statement. It is the primary source for answering questions about financial metrics.

2. MaderasDelNoroesteLH (Fabric Lakehouse):  
Contains transactional and detail-level data, such as revenue and expense movements, customer information, products, regional detail, and ledger accounts. It is used mainly for detailed analysis at the transaction, product, or customer level when more granularity is needed than the financial model provides.

Is there any specific information you need to look up?



### 5. Clean up the thread

Delete the thread from the service once you are done, so that unused threads do not accumulate.

In [None]:
# Delete thread
fabric_client.beta.threads.delete(thread_id=thread.id)


ThreadDeleted(id='thread_TJZxvW0gGDq8iA2Sxdn1IKf7', deleted=None, object=None, messages=None, metadata=None)
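
To make sure the thread is deleted even when an earlier step raises, the create and delete calls can be paired in a context manager. This is a sketch under the assumption that the client exposes the same `beta.threads.create()` and `beta.threads.delete(thread_id=...)` calls used in this notebook; `temporary_thread` is a hypothetical helper:

```python
from contextlib import contextmanager


@contextmanager
def temporary_thread(client):
    """Create a thread and delete it on exit, even if the body raises."""
    thread = client.beta.threads.create()
    try:
        yield thread
    finally:
        # Runs on normal exit and on exceptions alike.
        client.beta.threads.delete(thread_id=thread.id)


# In the notebook, the message, run, and read steps would go inside the block:
# with temporary_thread(fabric_client) as thread:
#     fabric_client.beta.threads.messages.create(
#         thread_id=thread.id, role="user", content=question
#     )
```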