In [None]:
from common import utils

# Notebook execution context; the workspace API endpoint and token are read
# from it, each falling back to None via getOrElse(None).
DBX_CTX = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
API_URL, API_TOKEN = (
    DBX_CTX.apiUrl().getOrElse(None),
    DBX_CTX.apiToken().getOrElse(None),
)

# Deployment settings resolved through the project's config helper.
CATALOG_NAME = utils.config_value("catalog_name")
SCHEMA_NAME = utils.config_value("schema_name")
# The assistant name is derived from catalog + schema and normalised by
# utils.snake_case.
KNOWLEDGE_ASSISTANT_NAME = utils.snake_case(
    "{}_{}_knowledge_assistant".format(CATALOG_NAME, SCHEMA_NAME)
)
# Age (in milliseconds) the last successful update must exceed before the
# script further down triggers a new sync.
KNOWLEDGE_ASSISTANT_SYNC_INTERVAL_MS = int(
    utils.config_value("knowledge_assistant_sync_interval_ms")
)

In [None]:
import json
import requests
from datetime import datetime, timedelta, timezone
from common import vector_search_config

# HTTP headers shared by every REST call below: JSON bodies, authenticated
# with the bearer token taken from the notebook context.
headers = {"Content-Type": "application/json"}
headers["Authorization"] = "Bearer {}".format(API_TOKEN)


def get_tile():
    """Look up the tile that belongs to this deployment's knowledge assistant.

    Queries the tiles endpoint with a ``name_contains`` filter built from
    ``KNOWLEDGE_ASSISTANT_NAME``.

    Returns:
        The single matching tile (a dict decoded from the JSON response), or
        ``None`` when no tile matches.

    Raises:
        requests.HTTPError: If the API answers with an error status.
        ValueError: If more than one tile matches the filter.
    """
    url = f"{API_URL}/api/2.0/tiles"
    # Filter on the configured assistant name (not a hard-coded one) so the
    # lookup agrees with the name used by create_knowledge_assistant() and
    # with the log / error messages below.
    # NOTE(review): "name_contains" looks like a substring match, so another
    # assistant whose name merely contains this one would trigger the
    # "Multiple" error - confirm whether an exact-name check is needed.
    params = {"filter": f"name_contains={KNOWLEDGE_ASSISTANT_NAME}"}
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    # Treat a missing or null "tiles" field as "no tiles".
    tiles = response.json().get("tiles") or []
    if len(tiles) > 1:
        raise ValueError(f"Multiple knowledge-assistants found - {KNOWLEDGE_ASSISTANT_NAME}")
    if not tiles:
        return None
    matched_tile = tiles[0]
    # Pass the name as a logging argument instead of pre-formatting it, so a
    # stray '%' in the name cannot corrupt the format string.
    utils.logger().info(
        "Knowledge-assistant '%s' found:\n%s",
        KNOWLEDGE_ASSISTANT_NAME,
        json.dumps(matched_tile),
    )
    return matched_tile


def get_knowledge_assistant(id: str):
    """Fetch a knowledge assistant by id and return the decoded JSON body.

    Raises ``requests.HTTPError`` when the API answers with an error status.
    """
    endpoint = "{}/api/2.0/knowledge-assistants/{}".format(API_URL, id)
    reply = requests.get(endpoint, headers=headers)
    reply.raise_for_status()
    return reply.json()


def sync_knowledge_assistant(id: str):
    """POST to the assistant's sync-knowledge-sources endpoint.

    Returns the decoded JSON response body; raises ``requests.HTTPError``
    when the API answers with an error status.
    """
    endpoint = "{}/api/2.0/knowledge-assistants/{}/sync-knowledge-sources".format(API_URL, id)
    reply = requests.post(endpoint, headers=headers)
    reply.raise_for_status()
    return reply.json()


def create_knowledge_assistant():
    """Create the knowledge assistant named ``KNOWLEDGE_ASSISTANT_NAME``.

    The request declares two knowledge sources: a files source pointing at
    ``vector_search_config.VOLUME_PATH`` and an index source pointing at
    ``vector_search_config.INDEX_NAME``. Both the request body and the raw
    response body are logged; the response is logged before the status check
    so that error details are visible even when the call fails.

    Returns nothing. Raises ``requests.HTTPError`` on an error status.
    """
    # Config values are read in the same order as the payload fields appear.
    assistant_description = utils.config_value("knowledge_assistant_description")
    assistant_instructions = utils.config_value("knowledge_assistant_instructions", "")

    files_source = {
        "name": "files",
        "type": "files",
        "description": utils.config_value("file_source_description"),
        "files": {"path": vector_search_config.VOLUME_PATH},
    }
    index_source = {
        "name": "vector_search_index",
        "type": "index",
        "description": utils.config_value("vector_search_index_description"),
        "index": {
            "name": vector_search_config.INDEX_NAME,
            "doc_uri_col": "path",
            "text_col": "text",
        },
    }
    request_body = {
        "name": KNOWLEDGE_ASSISTANT_NAME,
        "description": assistant_description,
        "instructions": assistant_instructions,
        "knowledge_sources": [
            {"files_source": files_source},
            {"index_source": index_source},
        ],
    }

    endpoint = "{}/api/2.0/knowledge-assistants".format(API_URL)
    utils.logger().info("knowledge-assistant create request body:\n%s", json.dumps(request_body))
    reply = requests.post(endpoint, headers=headers, json=request_body)
    utils.logger().info("knowledge-assistant create response body:\n%s", reply.text)
    reply.raise_for_status()


def _latest_files_source_update_ms(knowledge_sources):
    """Return the newest successful-update timestamp among files sources.

    Entries without a ``files_source`` are ignored. The result is ``None``
    when there is no files source at all, or as soon as any files source is
    not in ``KNOWLEDGE_SOURCE_STATE_UPDATED`` state or carries no timestamp.
    """
    newest = None
    for source in knowledge_sources:
        if not source.get("files_source"):
            continue
        if source.get("state", "") != "KNOWLEDGE_SOURCE_STATE_UPDATED":
            return None
        # .get() so that a source omitting the field is handled exactly like
        # one with an empty value, instead of raising KeyError.
        timestamp = source.get("last_successful_update_timestamp_ms")
        if not timestamp:
            return None
        if newest is None or timestamp > newest:
            newest = timestamp
    return newest


# Entry point: sync the existing knowledge assistant when its files sources
# are stale, or create the assistant when no matching tile exists yet.
tile = get_tile()
if tile:
    tile_id = tile.get("tile_id")
    knowledge_assistant = get_knowledge_assistant(tile_id)
    # Guard both levels against a missing or null value.
    knowledge_sources = (knowledge_assistant.get("knowledge_assistant") or {}).get("knowledge_sources", []) or []
    latest_last_successful_update_timestamp_ms = _latest_files_source_update_ms(knowledge_sources)
    if latest_last_successful_update_timestamp_ms is not None:
        # The timestamp is in epoch milliseconds; compare in UTC.
        last_update = datetime.fromtimestamp(latest_last_successful_update_timestamp_ms / 1000, tz=timezone.utc)
        cutoff = datetime.now(timezone.utc) - timedelta(milliseconds=KNOWLEDGE_ASSISTANT_SYNC_INTERVAL_MS)
        if last_update >= cutoff:
            # Updated within the configured interval: nothing to do.
            utils.logger().info("Skipping knowledge assistant sync - last_update:%s", last_update.isoformat())
        else:
            utils.logger().info("Syncing knowledge assistant - last_update:%s", last_update.isoformat())
            sync_knowledge_assistant(tile_id)
    else:
        # NOTE(review): no sync is triggered on this path (e.g. a files source
        # that is not in UPDATED state) - confirm this is intended.
        utils.logger().info("Latest sync not found")
else:
    create_knowledge_assistant()
