# OneLake Indexer (Fabric Lakehouse)

Use **OneLake Indexer** to index documents from Microsoft Fabric Lakehouse to Azure AI Search, enabling intelligent retrieval for enterprise data lakes.

## 📋 Table of Contents

| Step | Description | Jump |
|------|-------------|------|
| 0️⃣ Environment Config | Configure Azure AI Search, Azure OpenAI | [View](#0️⃣-environment-configuration) |
| 1️⃣ Configure Fabric | Set Workspace and Lakehouse GUID | [View](#1️⃣-configure-fabric-workspace-and-lakehouse) |
| 2️⃣ Authorization Config | Configure Managed Identity permissions | [View](#2️⃣-authorization-configuration) |
| 3️⃣ Create Data Source | OneLake connection config | [View](#3️⃣-create-onelake-data-source) |
| 4️⃣ Create Index | Define index structure | [View](#4️⃣-create-search-index) |
| 5️⃣ Create Indexer | Periodically fetch documents | [View](#5️⃣-create-indexer) |
| 6️⃣ Check Status | View Indexer run status | [View](#6️⃣-check-indexer-status) |
| 7️⃣ Manual Run | Manually trigger Indexer | [View](#7️⃣-manually-run-indexer-if-needed) |
| 8️⃣ Query Index | Search indexed content | [View](#8️⃣-query-index-content) |
| 9️⃣ Create Knowledge Source | Integrate into Agentic Retrieval | [View](#9️⃣-create-index-knowledge-source) |
| 🔟 Create Knowledge Base | Configure knowledge base | [View](#🔟-create-knowledge-base) |
| ⓫ Query Knowledge Base | Test Q&A | [View](#⓫-query-onelake-knowledge-base) |

---

## ⚠️ Prerequisites

| Requirement | Description |
|-------------|-------------|
| **Azure AI Search** | Must have a **system-assigned managed identity** enabled |
| **Microsoft Fabric** | A Workspace and a Lakehouse |
| **Permission** | The Azure AI Search managed identity must have the **Contributor** role in the Fabric Workspace |

### 🔐 Authorization Method

This solution authorizes with a **Managed Identity**, so **no App Registration is needed**.

```mermaid
flowchart TB
    subgraph Azure["Azure"]
        AIS["🔍 Azure AI Search<br/>(Managed Identity)"]
    end
    
    subgraph Fabric["Microsoft Fabric"]
        WS["📁 Fabric Workspace"]
        LH["🏠 Lakehouse"]
        FILES["📄 Files/"]
        DOCS["📑 Documents"]
        
        WS --> LH
        LH --> FILES
        FILES --> DOCS
    end
    
    AIS -->|"Contributor permission"| WS
```

---

## Solution Architecture

```mermaid
flowchart TB
    FABRIC["🏢 Microsoft Fabric Lakehouse"]
    
    DS["📦 Data Source<br/><i>OneLake connection config</i>"]
    INDEXER["⚙️ Indexer<br/><i>Periodically fetch documents</i>"]
    INDEX["🗂️ Search Index<br/><i>Store document content</i>"]
    KS["📚 Knowledge Source<br/><i>Native Index type</i>"]
    KB["🧠 Knowledge Base<br/><i>Agentic Retrieval</i>"]
    
    FABRIC -->|"Managed Identity"| DS
    DS --> INDEXER
    INDEXER --> INDEX
    INDEX --> KS
    KS --> KB
    
    style FABRIC fill:#6366f1,color:#fff
    style DS fill:#8b5cf6,color:#fff
    style INDEXER fill:#a855f7,color:#fff
    style INDEX fill:#d946ef,color:#fff
    style KS fill:#ec4899,color:#fff
    style KB fill:#f43f5e,color:#fff
```

## Environment Configuration Notes

The following configuration is read from the `.env` file (see `.env.example` for a template):

| Environment Variable | Description |
|---------------------|-------------|
| `AZURE_SEARCH_ENDPOINT` | Azure AI Search service endpoint |
| `AZURE_SEARCH_API_KEY` | Azure AI Search API Key |
| `AZURE_OPENAI_ENDPOINT` | Azure OpenAI endpoint |
| `AZURE_OPENAI_DEPLOYMENT` | GPT model deployment name |

---

## 0️⃣ Environment Configuration

```python
%load_ext dotenv
%dotenv

import os
import requests
import json

# Azure AI Search Configuration
search_endpoint = os.environ.get("AZURE_SEARCH_ENDPOINT")
search_api_key = os.environ.get("AZURE_SEARCH_API_KEY")

# Azure OpenAI Configuration
azure_openai_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
gpt_deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "gpt-4o-mini")

print(f"✅ Azure AI Search: {search_endpoint}")
print(f"✅ Azure OpenAI: {azure_openai_endpoint}")
```

## 1️⃣ Configure Fabric Workspace and Lakehouse

Get the GUIDs from the Power BI / Fabric Lakehouse URL:

```
https://msit.powerbi.com/groups/{WorkspaceGUID}/lakehouses/{LakehouseGUID}
```

For example:
- Workspace GUID: `00000000-0000-0000-0000-000000000000`
- Lakehouse GUID: `11111111-1111-1111-1111-111111111111`
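
To avoid copy-and-paste mistakes, the two GUIDs can be pulled out of the URL programmatically. This is a minimal sketch that assumes the URL follows the `/groups/{WorkspaceGUID}/lakehouses/{LakehouseGUID}` shape shown above; `parse_lakehouse_url` is a hypothetical helper, not part of any Fabric SDK.

```python
import re

# A GUID is 8-4-4-4-12 hexadecimal digits.
GUID = r"[0-9a-fA-F]{8}(?:-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}"
URL_PATTERN = re.compile(rf"/groups/({GUID})/lakehouses/({GUID})")


def parse_lakehouse_url(url):
    """Return (workspace_guid, lakehouse_guid) extracted from a Lakehouse URL."""
    match = URL_PATTERN.search(url)
    if match is None:
        raise ValueError(f"No workspace/lakehouse GUIDs found in: {url}")
    return match.group(1), match.group(2)


example_url = (
    "https://msit.powerbi.com/groups/00000000-0000-0000-0000-000000000000"
    "/lakehouses/11111111-1111-1111-1111-111111111111"
)
workspace_guid, lakehouse_guid = parse_lakehouse_url(example_url)
print(f"Workspace GUID: {workspace_guid}")
print(f"Lakehouse GUID: {lakehouse_guid}")
```

Because the pattern uses `search` rather than a full match, anything after the Lakehouse GUID (such as a query string) is ignored.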

```python
# ⚠️ Replace with your Fabric Workspace and Lakehouse GUIDs!
# Get these values from the Power BI URL

FABRIC_WORKSPACE_GUID = "YOUR_WORKSPACE_GUID"  # e.g.: "a0a0a0a0-bbbb-cccc-dddd-e1e1e1e1e1e1"
LAKEHOUSE_GUID = "YOUR_LAKEHOUSE_GUID"  # e.g.: "11111111-1111-1111-1111-111111111111"

# Optional: specify a subfolder in the Lakehouse
LAKEHOUSE_FOLDER = None  # e.g.: "documents" or None for the entire Files folder

print(f"📁 Fabric Workspace: {FABRIC_WORKSPACE_GUID}")
print(f"📁 Lakehouse: {LAKEHOUSE_GUID}")
print(f"📁 Folder: {LAKEHOUSE_FOLDER or '(root directory)'}")
```

## 2️⃣ Authorization Configuration

### Step 1: Enable Managed Identity in Azure AI Search

1. Open Azure Portal → Your Azure AI Search service
2. Left menu → **Identity**
3. **System assigned** → Enable (Status: On)
4. Copy the **Object ID**

### Step 2: Add Permission in Fabric Workspace

1. Open Microsoft Fabric → Your Workspace
2. Click **Manage access**
3. Add member → Search for your **Azure AI Search service name**
4. Select role: **Contributor**
5. Click **Add**

> **Note**: Workspace admin rights are enough for this step; Global Admin is not required.

```python
from urllib.parse import urlparse

# The managed identity is listed under the search service name.
# You can also look it up in the Azure Portal.

# Extract the service name from the endpoint. Taking the first label of the
# host name works whether or not the endpoint ends with a trailing slash.
search_service_name = urlparse(search_endpoint).hostname.split(".")[0]
print(f"🔍 Azure AI Search service name: {search_service_name}")
print(f"\n📋 Make sure to add '{search_service_name}' as Contributor in Fabric Workspace")
```

## 3️⃣ Create OneLake Data Source

The OneLake Data Source uses the search service's Managed Identity to connect to the Fabric Lakehouse.

```python
# Data Source name
datasource_name = "onelake-lakehouse-ds"

# Data Source definition
datasource_payload = {
    "name": datasource_name,
    "type": "onelake",  # OneLake type
    "credentials": {
        # Use Fabric Workspace GUID as ResourceId
        "connectionString": f"ResourceId={FABRIC_WORKSPACE_GUID}"
    },
    "container": {
        "name": LAKEHOUSE_GUID,  # Lakehouse GUID
        "query": LAKEHOUSE_FOLDER  # Optional subfolder
    }
}

# Request headers, reused by the REST calls in later cells
headers = {
    "Content-Type": "application/json",
    "api-key": search_api_key
}

# Create Data Source
# Assumption to verify: the "onelake" type was introduced in a preview REST API.
# If the service rejects it under this api-version, use one that supports OneLake.
response = requests.put(
    f"{search_endpoint}/datasources/{datasource_name}?api-version=2024-07-01",
    headers=headers,
    json=datasource_payload
)

if response.status_code in [200, 201]:
    print(f"✅ Data Source '{datasource_name}' created successfully!")
else:
    print(f"❌ Creation failed: {response.status_code}")
    print(response.text)
```
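
The data source definition sends `"query": None` when no subfolder is set. Whether the service accepts an explicit null there is not verified in this notebook, so the sketch below sidesteps the question by leaving the key out entirely. `build_onelake_datasource` is a hypothetical helper; the field names mirror the payload used in this step.

```python
def build_onelake_datasource(name, workspace_guid, lakehouse_guid, folder=None):
    """Build a OneLake data source payload, omitting `query` when no folder is given."""
    container = {"name": lakehouse_guid}
    if folder:
        # Only scope the indexer to a subfolder when one was requested.
        container["query"] = folder
    return {
        "name": name,
        "type": "onelake",
        "credentials": {"connectionString": f"ResourceId={workspace_guid}"},
        "container": container,
    }


# Placeholder GUIDs for illustration only.
payload = build_onelake_datasource(
    "onelake-lakehouse-ds",
    "00000000-0000-0000-0000-000000000000",
    "11111111-1111-1111-1111-111111111111",
)
print(payload["container"])
```

The returned dictionary can be passed as the `json=` argument of the same `requests.put` call.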

## 4️⃣ Create Search Index

Define the index structure for storing OneLake document content.

```python
# Index name
index_name = "onelake-lakehouse-index"

# Index definition
index_payload = {
    "name": index_name,
    "fields": [
        {"name": "id", "type": "Edm.String", "key": True, "searchable": False},
        {"name": "content", "type": "Edm.String", "searchable": True, "analyzer": "standard.lucene"},
        {"name": "metadata_storage_name", "type": "Edm.String", "searchable": True, "filterable": True, "sortable": True},
        {"name": "metadata_storage_path", "type": "Edm.String", "searchable": False, "filterable": True},
        {"name": "metadata_storage_size", "type": "Edm.Int64", "searchable": False, "filterable": True, "sortable": True},
        {"name": "metadata_storage_content_type", "type": "Edm.String", "searchable": False, "filterable": True},
        {"name": "metadata_storage_last_modified", "type": "Edm.DateTimeOffset", "searchable": False, "filterable": True, "sortable": True}
    ]
}

# Create Index
response = requests.put(
    f"{search_endpoint}/indexes/{index_name}?api-version=2024-07-01",
    headers=headers,
    json=index_payload
)

if response.status_code in [200, 201]:
    print(f"✅ Index '{index_name}' created successfully!")
else:
    print(f"❌ Creation failed: {response.status_code}")
    print(response.text)
```

## 5️⃣ Create Indexer

The Indexer will periodically fetch documents from OneLake and index them.

```python
# Indexer name
indexer_name = "onelake-lakehouse-indexer"

# Indexer definition
indexer_payload = {
    "name": indexer_name,
    "dataSourceName": datasource_name,
    "targetIndexName": index_name,
    "parameters": {
        "configuration": {
            "indexedFileNameExtensions": ".pdf,.docx,.pptx,.xlsx,.txt,.md,.json",
            "excludedFileNameExtensions": ".png,.jpg,.jpeg,.gif,.bmp,.parquet",
            "dataToExtract": "contentAndMetadata",
            "parsingMode": "default"
        }
    },
    # 🔄 Auto-update Schedule - runs every hour
    "schedule": {
        "interval": "PT1H"  # Every hour
    }
}

# Create Indexer
response = requests.put(
    f"{search_endpoint}/indexers/{indexer_name}?api-version=2024-07-01",
    headers=headers,
    json=indexer_payload
)

if response.status_code in [200, 201]:
    print(f"✅ Indexer '{indexer_name}' created successfully!")
    print("\n📅 Schedule: runs automatically every hour")
    print("🔄 Indexer will start running automatically...")
else:
    print(f"❌ Creation failed: {response.status_code}")
    print(response.text)
```
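
The `schedule.interval` value (`PT1H` in the indexer definition) is an ISO 8601 duration. The sketch below converts a number of minutes into that format. The `schedule_interval` helper is hypothetical, and the bounds it enforces (5 minutes to one day) reflect the documented limits for Azure AI Search indexer schedules as I understand them, so treat them as an assumption to verify.

```python
def schedule_interval(minutes):
    """Convert minutes to an ISO 8601 duration such as 'PT1H' or 'PT1H30M'."""
    # Assumed service limits: at least 5 minutes, at most one day.
    if not 5 <= minutes <= 24 * 60:
        raise ValueError("interval must be between 5 minutes and 1 day")
    if minutes == 24 * 60:
        return "P1D"
    hours, mins = divmod(minutes, 60)
    duration = "PT"
    if hours:
        duration += f"{hours}H"
    if mins:
        duration += f"{mins}M"
    return duration


print(schedule_interval(60))   # hourly, as in the indexer definition
print(schedule_interval(90))   # every hour and a half
```

The result can be placed directly into `indexer_payload["schedule"]["interval"]` before the indexer is created.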

## 6️⃣ Check Indexer Status

```python
import time

# Wait a few seconds for Indexer to start running
print("⏳ Waiting for Indexer to run...")
time.sleep(5)

# Get Indexer status
response = requests.get(
    f"{search_endpoint}/indexers/{indexer_name}/status?api-version=2024-07-01",
    headers=headers
)

if response.status_code == 200:
    status = response.json()
    # lastResult can be null before the first run finishes, so fall back to {}
    last_result = status.get("lastResult") or {}
    print("📊 Indexer status:")
    print(f"   Status: {status.get('status', 'N/A')}")
    print(f"   Last run status: {last_result.get('status', 'N/A')}")
    print(f"   Documents indexed: {last_result.get('itemsProcessed', 0)}")
    print(f"   Documents failed: {last_result.get('itemsFailed', 0)}")

    if last_result.get('errors'):
        print("\n⚠️ Error messages:")
        # Show at most the first three errors
        for err in last_result['errors'][:3]:
            print(f"   - {err.get('message', 'Unknown error')}")
else:
    print(f"❌ Failed to get status: {response.status_code}")
    print(response.text)
```
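
A fixed five-second wait is often too short for a first run. One alternative is to poll the status endpoint until the last run reaches a final state. The sketch below assumes the `lastResult.status` values `success`, `transientFailure`, and `persistentFailure` mark a finished run; `wait_for_indexer` is a hypothetical helper, and the status fetcher is passed in so the loop can be tried without a live service.

```python
import time

# Assumed final states of an indexer run.
FINISHED = {"success", "transientFailure", "persistentFailure"}


def wait_for_indexer(fetch_status, timeout_s=300, poll_s=10, sleep=time.sleep):
    """Poll fetch_status() until lastResult is finished; return that lastResult."""
    waited = 0
    while True:
        status = fetch_status()
        last_result = status.get("lastResult") or {}
        if last_result.get("status") in FINISHED:
            return last_result
        if waited >= timeout_s:
            raise TimeoutError(f"Indexer still running after {timeout_s} seconds")
        sleep(poll_s)
        waited += poll_s


# Usage in this notebook (requires the variables defined in earlier cells):
# fetch = lambda: requests.get(
#     f"{search_endpoint}/indexers/{indexer_name}/status?api-version=2024-07-01",
#     headers=headers,
# ).json()
# print(wait_for_indexer(fetch))
```

Note that `lastResult` describes the most recent completed or in-progress run, so on an indexer that has run before, the first poll may return the previous run's result.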

## 7️⃣ Manually Run Indexer (If Needed)

```python
# Manually trigger Indexer run
response = requests.post(
    f"{search_endpoint}/indexers/{indexer_name}/run?api-version=2024-07-01",
    headers=headers
)

if response.status_code == 202:
    print(f"✅ Indexer '{indexer_name}' triggered!")
    print("\n⏳ Please wait a few minutes and then check status...")
else:
    print(f"Status: {response.status_code}")
    print(response.text)
```

## 8️⃣ Query Index Content

```python
# Search documents in the index
search_query = {
    "search": "*",
    "top": 10,
    "count": True,  # also return the total number of matching documents
    "select": "id,metadata_storage_name,metadata_storage_path,metadata_storage_content_type,metadata_storage_size"
}

response = requests.post(
    f"{search_endpoint}/indexes/{index_name}/docs/search?api-version=2024-07-01",
    headers=headers,
    json=search_query
)

if response.status_code == 200:
    results = response.json()
    docs = results.get("value", [])
    # "value" holds only the current page (up to "top"); the total is in @odata.count
    total = results.get("@odata.count", len(docs))
    print(f"📄 Documents in index: {total} (showing {len(docs)})")
    print("-" * 50)
    for doc in docs:
        # Guard against a missing or null size
        size_kb = (doc.get('metadata_storage_size') or 0) / 1024
        print(f"  📁 {doc.get('metadata_storage_name', 'N/A')}")
        print(f"     Type: {doc.get('metadata_storage_content_type', 'N/A')}")
        print(f"     Size: {size_kb:.1f} KB")
        print()
else:
    print(f"❌ Search failed: {response.status_code}")
    print(response.text)
```
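
Because `metadata_storage_content_type` is defined as filterable in the index, the same search call can be narrowed to one file type with an OData `$filter` expression. The sketch below only builds the request body; `build_search_payload` is a hypothetical helper and the MIME type in the example is illustrative.

```python
def build_search_payload(text="*", content_type=None, top=10):
    """Build a search request body, optionally filtered by content type."""
    payload = {
        "search": text,
        "top": top,
        "count": True,
        "select": "metadata_storage_name,metadata_storage_content_type,metadata_storage_size",
    }
    if content_type:
        # OData string literals escape a single quote by doubling it.
        escaped = content_type.replace("'", "''")
        payload["filter"] = f"metadata_storage_content_type eq '{escaped}'"
    return payload


pdf_query = build_search_payload(content_type="application/pdf")
print(pdf_query["filter"])
```

The resulting dictionary can be sent as the `json=` body of the same `docs/search` request.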

## 9️⃣ Create Index Knowledge Source

Register the OneLake index as a Knowledge Source for Agentic Retrieval.

```python
from azure.search.documents.indexes import SearchIndexClient
from azure.core.credentials import AzureKeyCredential
# Note: these model names come from a preview release of azure-search-documents
# and may differ between preview versions; check them against your installed SDK.
from azure.search.documents.indexes.models import (
    IndexKnowledgeSource,
    IndexKnowledgeSourceParameters
)

# Create Index Client
index_client = SearchIndexClient(
    endpoint=search_endpoint,
    credential=AzureKeyCredential(search_api_key)
)

# Knowledge Source name
ks_name = "onelake-lakehouse-ks"

# Create Index Knowledge Source
onelake_ks = IndexKnowledgeSource(
    name=ks_name,
    description="OneLake Lakehouse document index - pre-indexed via Indexer",
    index_parameters=IndexKnowledgeSourceParameters(
        index_name=index_name,
        content_fields=["content"],
        title_field="metadata_storage_name"
    )
)

# Create Knowledge Source
index_client.create_or_update_knowledge_source(knowledge_source=onelake_ks)
print(f"✅ Knowledge Source '{ks_name}' created successfully!")
```

## 🔟 Create Knowledge Base

```python
from azure.search.documents.indexes.models import (
    KnowledgeBase,
    KnowledgeSourceReference,
    KnowledgeBaseAzureOpenAIModel,
    AzureOpenAIVectorizerParameters,
    KnowledgeRetrievalOutputMode,
    KnowledgeRetrievalLowReasoningEffort
)

# Knowledge Base name
kb_name = "onelake-lakehouse-kb"

# Azure OpenAI parameters
# Note: model_name reuses the deployment name, which only works when the
# deployment is named after the model (as with the "gpt-4o-mini" default).
aoai_params = AzureOpenAIVectorizerParameters(
    resource_url=azure_openai_endpoint,
    deployment_name=gpt_deployment,
    model_name=gpt_deployment
)

# Define Knowledge Base
onelake_kb = KnowledgeBase(
    name=kb_name,
    description="OneLake Lakehouse Knowledge Base - based on pre-indexed documents",

    # Reference Index Knowledge Source
    knowledge_sources=[
        KnowledgeSourceReference(name=ks_name)
    ],

    # Retrieval instructions
    retrieval_instructions="""Use this knowledge source to answer questions about documents in the Lakehouse.
    Includes PDF, Word, PowerPoint, Excel and other document content.
    """,

    # Answer instructions
    answer_instructions="Provide accurate answers based on indexed Lakehouse document content. Cite relevant document sources.",

    # Output mode
    output_mode=KnowledgeRetrievalOutputMode.ANSWER_SYNTHESIS,

    # LLM model configuration
    models=[
        KnowledgeBaseAzureOpenAIModel(azure_open_ai_parameters=aoai_params)
    ],

    # Reasoning Effort
    retrieval_reasoning_effort=KnowledgeRetrievalLowReasoningEffort()
)

# Create Knowledge Base
index_client.create_or_update_knowledge_base(knowledge_base=onelake_kb)
print(f"✅ Knowledge Base '{kb_name}' created successfully!")
```

## ⓫ Query OneLake Knowledge Base

```python
from azure.identity import DefaultAzureCredential
from azure.search.documents.knowledgebases import KnowledgeBaseRetrievalClient
from azure.search.documents.knowledgebases.models import (
    KnowledgeBaseRetrievalRequest,
    KnowledgeBaseMessage,
    KnowledgeBaseMessageTextContent
)

# Create Knowledge Base retrieval client
# Unlike the earlier cells, this one authenticates with your signed-in identity
# (for example, from `az login`) instead of the API key, so that identity needs
# permission to query the search service.
credential = DefaultAzureCredential()
kb_client = KnowledgeBaseRetrievalClient(
    endpoint=search_endpoint,
    knowledge_base_name=kb_name,
    credential=credential
)

# Define query question
question = "What are the main topics covered in these documents?"  # You can modify the question

# Create retrieval request
request = KnowledgeBaseRetrievalRequest(
    include_activity=True,
    messages=[
        KnowledgeBaseMessage(
            role="user",
            content=[KnowledgeBaseMessageTextContent(text=question)]
        )
    ]
)

print(f"🔍 Querying: {question}")
print("=" * 60)

# Execute query
result = kb_client.retrieve(retrieval_request=request)

# Display answer
print("\n📝 Answer:")
print("-" * 40)
for resp in result.response:
    for content in resp.content:
        print(content.text)
print("-" * 40)

# Display references
if result.references:
    print("\n📚 Reference sources:")
    for i, ref in enumerate(result.references, 1):
        ref_dict = ref.as_dict()
        print(f"  [{i}] {ref_dict.get('title', ref_dict.get('metadata_storage_name', 'N/A'))}")
```

---

## 📌 OneLake Shortcuts Support

OneLake supports creating **Shortcuts** from external data sources, and the Indexer can directly index these Shortcuts:

| Shortcut Type | Support Status | Deletion Detection |
|--------------|----------------|-------------------|
| ADLS Gen2 | ✅ | ✅ |
| Another OneLake | ✅ | ✅ |
| Amazon S3 | ✅ | ❌ |
| Google Cloud Storage | ✅ | ❌ |

Just create Shortcuts in the Lakehouse, and the Indexer will automatically index the content!

## 🧹 Cleanup Resources (Optional)

```python
# To delete resources, uncomment the lines below

# Delete Knowledge Base
# index_client.delete_knowledge_base(kb_name)
# print(f"✅ Knowledge Base '{kb_name}' deleted")

# Delete Knowledge Source
# index_client.delete_knowledge_source(ks_name)
# print(f"✅ Knowledge Source '{ks_name}' deleted")

# Delete Indexer
# requests.delete(f"{search_endpoint}/indexers/{indexer_name}?api-version=2024-07-01", headers=headers)
# print(f"✅ Indexer '{indexer_name}' deleted")

# Delete Index
# requests.delete(f"{search_endpoint}/indexes/{index_name}?api-version=2024-07-01", headers=headers)
# print(f"✅ Index '{index_name}' deleted")

# Delete Data Source
# requests.delete(f"{search_endpoint}/datasources/{datasource_name}?api-version=2024-07-01", headers=headers)
# print(f"✅ Data Source '{datasource_name}' deleted")

print("💡 Tip: To delete resources, uncomment the code above and run")
```
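
The cleanup cell removes resources in reverse order of creation: the indexer first, then the index, then the data source. The sketch below captures that ordering for the three REST resources as a list of URLs. `cleanup_urls` is a hypothetical helper, and the endpoint in the example is a placeholder.

```python
def cleanup_urls(endpoint, indexer, index, datasource, api_version="2024-07-01"):
    """Return DELETE URLs in the order the cleanup cell removes resources."""
    base = endpoint.rstrip("/")
    resources = [
        ("indexers", indexer),        # stop scheduled runs first
        ("indexes", index),           # then remove the indexed content
        ("datasources", datasource),  # finally remove the connection
    ]
    return [
        f"{base}/{kind}/{name}?api-version={api_version}"
        for kind, name in resources
    ]


urls = cleanup_urls(
    "https://example.search.windows.net/",
    "onelake-lakehouse-indexer",
    "onelake-lakehouse-index",
    "onelake-lakehouse-ds",
)
for url in urls:
    print("DELETE", url)

# In the notebook, each URL could then be passed to requests.delete(url, headers=headers).
```

The Knowledge Base and Knowledge Source are not covered here because the notebook deletes them through `index_client` before the REST resources.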