# 02a — Preview layout + embedding skills (Document Intelligence + Azure OpenAI)

This notebook uses preview skills in Azure AI Search (2024-11-01-preview) to:
- Extract markdown structure using DocumentIntelligenceLayoutSkill
- Split into overlapping chunks with SplitSkill
- Generate embeddings with AzureOpenAIEmbeddingSkill
- Write each chunk to your existing index fields via skillset index projections (parent documents are not indexed)

Prereqs:
- Ensure your Search service/region supports API version 2024-11-01-preview
- Run 01-create-index.ipynb first so the index exists and vectorizer is configured in schema.json
- Ensure your .env has: SEARCH_*, OPENAI_*, BLOB_*, COGNITIVE_SERVICES_KEY (the multi-service key attached to the skillset), DOC_INTELLIGENCE_APIM_KEY


In [None]:
# Imports
import os
import json
import time
import requests
from dotenv import load_dotenv


In [None]:
# Load configuration from .env
load_dotenv()

# Single source of truth for the Search REST API version. It is pinned here
# (not read from the environment) because the skills used below need the
# preview API; an older version from .env would lead to 400 responses.
SEARCH_API_VERSION = '2024-11-01-preview'

config = {
    'search_service_name': os.getenv('SEARCH_SERVICE_NAME'),
    'search_admin_key': os.getenv('SEARCH_ADMIN_KEY'),
    'search_index_name': os.getenv('SEARCH_INDEX_NAME'),
    'search_api_version': SEARCH_API_VERSION,
    'blob_service_name': os.getenv('BLOB_SERVICE_NAME'),
    'blob_container': os.getenv('BLOB_CONTAINER'),
    'blob_key': os.getenv('BLOB_KEY'),
    'openai_api_base': os.getenv('OPENAI_API_BASE'),
    'openai_api_key': os.getenv('OPENAI_API_KEY'),
    'openai_api_version': os.getenv('OPENAI_API_VERSION', '2024-02-01'),
    'openai_embedding_model': os.getenv('OPENAI_EMBEDDING_MODEL'),  # deployment name
    'openai_embedding_model_name': os.getenv('OPENAI_EMBEDDING_MODEL_NAME'),  # optional model name
    'doc_intelligence_apim_key': os.getenv('DOC_INTELLIGENCE_APIM_KEY'),
    'cognitive_services_key': os.getenv('COGNITIVE_SERVICES_KEY'),
    'doc_intelligence_endpoint': os.getenv('DOC_INTELLIGENCE_ENDPOINT'),
}

# Show the effective configuration, but never echo secret values: cell output
# is stored inside the .ipynb and is easily shared or committed by accident.
for k, v in config.items():
    if k.endswith('_key'):
        v = '<set>' if v else '<missing>'
    print(f'{k}: {v}')

# Fail fast with a readable message instead of a TypeError while building the
# connection string, or requests sent to "https://None.search.windows.net".
# Every config key maps to its environment variable by upper-casing the name.
required_settings = (
    'search_service_name',
    'search_admin_key',
    'search_index_name',
    'blob_service_name',
    'blob_container',
    'blob_key',
    'openai_api_base',
    'openai_api_key',
    'openai_embedding_model',
)
missing_settings = [name.upper() for name in required_settings if not config[name]]
if missing_settings:
    raise ValueError('Missing required .env settings: ' + ', '.join(missing_settings))
if not config['cognitive_services_key']:
    print('WARNING: COGNITIVE_SERVICES_KEY is not set; the skillset definition sends it as the AI services key')

search_service_name = config['search_service_name']
search_service_url = f'https://{search_service_name}.search.windows.net'
search_admin_key = config['search_admin_key']
index_name = config['search_index_name']
api_version = SEARCH_API_VERSION
print('Using Search API version:', api_version)

blob_service_name = config['blob_service_name']
blob_container = config['blob_container']
blob_key = config['blob_key']
blob_connection_string = (
    'DefaultEndpointsProtocol=https;'
    f'AccountName={blob_service_name};'
    f'AccountKey={blob_key};'
    'EndpointSuffix=core.windows.net'
)

azure_openai_base = config['openai_api_base']
azure_openai_key = config['openai_api_key']
azure_openai_api_version = config['openai_api_version']
azure_openai_embeddings_deployment = config['openai_embedding_model']
# The model name falls back to the deployment name when it is not set separately.
azure_openai_embeddings_model_name = config['openai_embedding_model_name'] or config['openai_embedding_model']

# Cognitive Services multi-service key that is attached to the skillset
cogsvc_key = config['cognitive_services_key']

# Headers shared by every Search REST call made through the helper functions
headers = {
    'Content-Type': 'application/json',
    'api-key': search_admin_key
}


In [None]:
# Helpers for Search REST calls
def _url(path):
    """Build the absolute Search REST URL for `path`.

    The `api-version` query parameter is always appended; it is joined with
    '&' when `path` already contains a query string and with '?' otherwise.
    """
    separator = '&' if '?' in path else '?'
    return f'{search_service_url}{path}{separator}api-version={api_version}'

# Upper bound (seconds) passed to `requests` for every Search REST call.
# Without a timeout, `requests` waits indefinitely when the service stops
# responding, which would hang the notebook kernel.
SEARCH_REQUEST_TIMEOUT = 60

def search_get(path, timeout=SEARCH_REQUEST_TIMEOUT):
    """Send a GET request to the Search service.

    Args:
        path: Service-relative path, e.g. '/indexers/<name>/status'.
        timeout: Seconds `requests` may wait before raising a timeout error.

    Returns:
        The `requests.Response`; the HTTP status is not checked here.
    """
    return requests.get(_url(path), headers=headers, timeout=timeout)

def search_put(path, payload, timeout=SEARCH_REQUEST_TIMEOUT):
    """Send a PUT request whose body is `payload` serialized as JSON.

    Args:
        path: Service-relative path of the resource to create or update.
        payload: JSON-serializable resource definition.
        timeout: Seconds `requests` may wait before raising a timeout error.

    Returns:
        The `requests.Response`; the HTTP status is not checked here.
    """
    return requests.put(_url(path), headers=headers, data=json.dumps(payload), timeout=timeout)

def search_post(path, payload, timeout=SEARCH_REQUEST_TIMEOUT):
    """Send a POST request whose body is `payload` serialized as JSON.

    Args:
        path: Service-relative path of the action or collection.
        payload: JSON-serializable request body.
        timeout: Seconds `requests` may wait before raising a timeout error.

    Returns:
        The `requests.Response`; the HTTP status is not checked here.
    """
    return requests.post(_url(path), headers=headers, data=json.dumps(payload), timeout=timeout)

def search_delete(path, timeout=SEARCH_REQUEST_TIMEOUT):
    """Send a DELETE request to the Search service.

    Args:
        path: Service-relative path of the resource to delete.
        timeout: Seconds `requests` may wait before raising a timeout error.

    Returns:
        The `requests.Response`; the HTTP status is not checked here.
    """
    return requests.delete(_url(path), headers=headers, timeout=timeout)


In [None]:
# Register (create or update) the Blob data source that holds the raw documents.
# The definition points the Search service at one container, authenticated with
# the storage connection string.
data_source_name = 'blob-raw-docs'
data_source = dict(
  name=data_source_name,
  type='azureblob',
  credentials=dict(connectionString=blob_connection_string),
  container=dict(name=blob_container),
)
resp = search_put(f'/datasources/{data_source_name}', data_source)
# Only the first 300 characters of the body are shown to keep the output short.
print('datasource:', resp.status_code, resp.text[:300])


In [None]:
# Create or update the preview skillset (DI Layout + Split + AOAI Embedding)
skillset_name = 'content-processing-skillset-preview'

# Enrichment-tree path of a single chunk produced by the split skill. The
# embedding skill and the index projection both operate at this level.
chunk_path = '/document/markdownDocument/*/pages/*'

skills = [
  # 1) Convert each file into markdown sections (one output item per section).
  {
    '@odata.type': '#Microsoft.Skills.Util.DocumentIntelligenceLayoutSkill',
    'name': 'di-layout',
    'context': '/document',
    'outputMode': 'oneToMany',
    'markdownHeaderDepth': 'h3',
    'inputs': [ { 'name': 'file_data', 'source': '/document/file_data' } ],
    'outputs': [ { 'name': 'markdown_document', 'targetName': 'markdownDocument' } ]
  },
  # 2) Split every markdown section into overlapping chunks ("pages").
  {
    '@odata.type': '#Microsoft.Skills.Text.SplitSkill',
    'name': 'markdown-split',
    'description': 'Split markdown content into overlapping chunks',
    'context': '/document/markdownDocument/*',
    'defaultLanguageCode': 'en',
    'textSplitMode': 'pages',
    'maximumPageLength': 700,
    'pageOverlapLength': 100,
    'inputs': [ { 'name': 'text', 'source': '/document/markdownDocument/*/content' } ],
    'outputs': [ { 'name': 'textItems', 'targetName': 'pages' } ]
  },
  # 3) Embed every chunk; the vector is stored next to the chunk as text_vector.
  {
    '@odata.type': '#Microsoft.Skills.Text.AzureOpenAIEmbeddingSkill',
    'name': 'embed-pages',
    'context': chunk_path,
    'inputs': [ { 'name': 'text', 'source': chunk_path } ],
    'outputs': [ { 'name': 'embedding', 'targetName': 'text_vector' } ],
    'resourceUri': azure_openai_base,
    'deploymentId': azure_openai_embeddings_deployment,
    'apiKey': azure_openai_key,
    'modelName': azure_openai_embeddings_model_name,
  },
  # 4) Expose the blob name under /document/title.
  # NOTE(review): a Shaper output is a complex object, so /document/title is
  # presumably {'name': ...} rather than a plain string — confirm this matches
  # the type of the index `title` field.
  {
    '@odata.type': '#Microsoft.Skills.Util.ShaperSkill',
    'name': 'shape-title',
    'context': '/document',
    'inputs': [ { 'name': 'name', 'source': '/document/metadata_storage_name' } ],
    'outputs': [ { 'name': 'output', 'targetName': 'title' } ]
  }
]

skillset = {
  'name': skillset_name,
  'description': 'Preview skillset: DI Layout + Split + Azure OpenAI Embedding',
  'skills': skills,
  'cognitiveServices': { '@odata.type': '#Microsoft.Azure.Search.CognitiveServicesByKey', 'key': cogsvc_key },
  # Index projections write one search document per chunk directly to the index.
  # They must be part of the definition BEFORE it is sent; attaching them after
  # the PUT would only change the local dict and never reach the service.
  'indexProjections': {
    'selectors': [
      {
        # A selector takes only these four properties; the chunk key is
        # generated by the service into the index key field.
        'targetIndexName': index_name,
        'parentKeyFieldName': 'parent_id',
        'sourceContext': chunk_path,
        'mappings': [
          { 'name': 'chunk', 'source': chunk_path },
          { 'name': 'vector', 'source': chunk_path + '/text_vector' },
          { 'name': 'title', 'source': '/document/title' },
          { 'name': 'url', 'source': '/document/metadata_storage_path' },
          { 'name': 'file_name', 'source': '/document/metadata_storage_name' },
          { 'name': 'last_updated', 'source': '/document/metadata_storage_last_modified' }
        ]
      }
    ],
    # Only chunk documents are indexed; the parent blob itself is skipped.
    'parameters': { 'projectionMode': 'skipIndexingParentDocuments' }
  }
}

resp = search_put(f'/skillsets/{skillset_name}', skillset)
print('skillset:', resp.status_code, resp.text[:300])


In [None]:
# Create or update the indexer that applies the skillset and writes to the index
indexer_name = 'blob-to-index-with-skillset-preview'
indexer = {
  'name': indexer_name,
  'dataSourceName': data_source_name,
  'targetIndexName': index_name,
  'skillsetName': skillset_name,
  'parameters': {
    'configuration': {
      'parsingMode': 'default',
      'dataToExtract': 'contentAndMetadata',
      # The layout skill reads /document/file_data, which the indexer only
      # provides when the skillset is allowed to read the original file.
      'allowSkillsetToReadFileData': True,
      # Skip (rather than abort on) blobs that cannot be processed.
      'failOnUnsupportedContentType': False,
      'failOnUnprocessableDocument': False
    }
  },
}

# Best-effort delete before the PUT; the result is deliberately ignored (the
# indexer may not exist yet).
# NOTE(review): presumably done so a re-run starts from a fresh indexer instead
# of updating the existing one — confirm.
_ = search_delete(f'/indexers/{indexer_name}')
resp = search_put(f'/indexers/{indexer_name}', indexer)
print('indexer:', resp.status_code, resp.text[:300])


In [None]:
# Run the indexer once and poll for completion
POLL_INTERVAL_SECONDS = 5
MAX_POLL_ATTEMPTS = 60  # 60 polls, 5 seconds apart
TERMINAL_STATUSES = ('success', 'transientFailure', 'persistentFailure')

def get_indexer_status():
    """Fetch the indexer status document.

    Returns:
        The parsed JSON body on HTTP 200, otherwise None.
    """
    r = search_get(f'/indexers/{indexer_name}/status')
    if r.status_code != 200:
        return None
    return r.json()

# Remember the execution that had already finished before this run is requested.
# The run request is asynchronous, so the first polls may still report that older
# result; without this guard it would be printed as the outcome of the new run.
status_before_run = get_indexer_status() or {}
result_before_run = status_before_run.get('lastResult') or {}
finished_start_time = None
if result_before_run.get('status') in TERMINAL_STATUSES:
    finished_start_time = result_before_run.get('startTime')

run_resp = search_post(f'/indexers/{indexer_name}/run', {})
print('run:', run_resp.status_code)
# Only expect a new execution when the service accepted the request (202).
# Otherwise (e.g. a run is already in progress) report whatever finishes.
run_accepted = run_resp.status_code == 202

for attempt in range(MAX_POLL_ATTEMPTS):
    status = get_indexer_status()
    if not status:
        print('Unable to get status')
        break
    last_result = status.get('lastResult')
    if last_result and last_result.get('status') in TERMINAL_STATUSES:
        is_stale = (
            run_accepted
            and finished_start_time is not None
            and last_result.get('startTime') == finished_start_time
        )
        if not is_stale:
            # Output is capped at 1000 characters to keep the cell readable.
            print(json.dumps(last_result, indent=2)[:1000])
            break
    time.sleep(POLL_INTERVAL_SECONDS)
else:
    # The loop ran out of attempts without seeing a finished execution.
    print('Timed out waiting for indexer to complete')
