# Celonis Direct Connection - Extract, Embed & Upload

Step-by-step guide for connecting to Celonis EMS, extracting data via PQL queries,
generating embeddings, and uploading to Azure AI Search.

**Prerequisites:**
- Celonis EMS account with API access
- `pycelonis` installed: `pip install --extra-index-url=https://pypi.celonis.cloud/ pycelonis`
- Azure credentials configured in `.env`
- Celonis credentials configured in `.env`

## 1. Setup & Imports

In [None]:
# Notebook setup: stdlib / third-party imports, warning suppression, and the
# project helpers imported from the `utils` package in the parent directory.
import sys
from pathlib import Path
import time
import warnings
from datetime import datetime, timezone
import pandas as pd

# Suppress SSL warnings (needed for IP-based endpoints)
# NOTE(review): disable_warnings() installs a global warnings filter, so this
# silences InsecureRequestWarning for every urllib3 request in this kernel,
# not only the Celonis/Azure calls. Confirm unverified TLS is really required.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Add parent directory to path
# Path() is the current working directory, so this assumes the kernel was
# started from the notebook's own folder (utils/ lives one level up).
sys.path.insert(0, str(Path().absolute().parent))

from utils import (
    config,
    connect_celonis,
    get_data_pool,
    get_data_model,
    list_data_pools,
    list_data_models,
    build_pql_query,
    extract_dataframe,
    format_datetime_column,
    init_embedding_tracking,
    save_checkpoint,
    load_checkpoint,
    get_embedding,
    upload_documents,
    print_embedding_summary,
    get_index_stats
)

# Echo the target index so a misconfigured .env is noticed before any upload.
print("\u2705 Setup complete")
print(f"   Index: {config.azure_search_index_name}")

## 2. Connect to Celonis

Uses credentials from `.env` (`CELONIS_BASE_URL`, `CELONIS_API_TOKEN`, `CELONIS_KEY_TYPE`).
You can also pass them directly to `connect_celonis()` to override the `.env` values.

In [None]:
# Check the Celonis settings loaded into `config` before connecting.
# NOTE(review): whether validate_celonis() raises or only warns is defined in
# utils - confirm.
config.validate_celonis()
# `celonis` is reused by the data-pool exploration cells below.
celonis = connect_celonis()

## 3. Explore Data Pools & Models

Use these cells to discover what's available in your Celonis environment.

In [None]:
# List all data pools
# Discovery only: `pools` is not referenced by any later cell. Any listing
# output presumably comes from the helper itself - verify in utils.
pools = list_data_pools(celonis)

In [None]:
# Get your data pool (uses CELONIS_DATA_POOL_NAME or _ID from .env)
# `data_pool` feeds list_data_models below and get_data_model in the next cell.
data_pool = get_data_pool(celonis)

# List models in this pool
# Discovery only: `models` is not referenced by any later cell.
models = list_data_models(data_pool)

In [None]:
# Get your data model (uses CELONIS_DATA_MODEL_NAME or _ID from .env)
# `data_model` is the model the PQL query is executed against in step 5.
data_model = get_data_model(data_pool)

## 4. Build PQL Query

Define the columns to extract. Each column needs:
- `name`: The output column name (should match your FIELD_MAPPING keys)
- `query`: The PQL expression to evaluate

Customize the table and column names for your data model.

In [None]:
# Columns to extract. Each entry is a dict with:
#   "name"  - output DataFrame column (must match a FIELD_MAPPING key)
#   "query" - PQL expression evaluated for that column
# The pairs below are (output name, attribute on the ContractItem table);
# customize them for your data model. For computed expressions (aggregations,
# functions), append {"name": ..., "query": ...} dicts to PQL_COLUMNS directly.
PQL_COLUMNS = [
    {"name": out_name, "query": f'"o_celonis_ContractItem"."{attribute}"'}
    for out_name, attribute in (
        ("Id", "ID"),
        ("SystemContractNumber", "SystemContractNumber"),
        ("SystemContractItemNumber", "SystemContractItemNumber"),
        ("ShortText", "ShortText"),
        ("Name", "Name"),
        ("NetUnitPrice", "NetUnitPrice"),
        ("Currency", "Currency"),
        ("ValidityPeriodStartDate", "ValidityPeriodStartDate"),
        ("ValidityPeriodEndDate", "ValidityPeriodEndDate"),
    )
]

# Optional arguments (uncomment to use): filters, order_by, distinct, limit.
query = build_pql_query(
    columns=PQL_COLUMNS,
    # filters=['FILTER "o_celonis_ContractItem"."Currency" = \'USD\''],
    # order_by=[{"query": '"o_celonis_ContractItem"."ID"', "direction": "ASC"}],
    # distinct=True,
    # limit=1000,
)

print(f"Query built with {len(PQL_COLUMNS)} columns")

## 5. Extract Data to DataFrame

Execute the PQL query and preview the results.

In [None]:
# Run the PQL query against the data model and preview the result.
df = extract_dataframe(data_model, query)

# Emoji are written with one \U escape: a "\ud83d\udcca"-style surrogate pair
# is two lone code points in a Python str and cannot be encoded as UTF-8.
print(f"\n\U0001F4CA Dataset shape: {df.shape}")
print(f"\nColumns: {list(df.columns)}")
df.head()

## 6. Configure Field Mapping

Map extracted column names to Azure Search index fields.
The mapping keys are the PQL column `name` values defined in step 4, so each key must match a `name` exactly. The check below warns about any key missing from the extracted data.

In [None]:
# Extracted (PQL) column name -> Azure Search index field name.
FIELD_MAPPING = {
    "Id": "contract_item_id",
    "SystemContractNumber": "contract_number",
    "SystemContractItemNumber": "contract_item_number",
    "ShortText": "item_text",
    "Name": "vendor_name",
    "NetUnitPrice": "unit_price",
    "Currency": "currency",
    "ValidityPeriodStartDate": "contract_start",
    "ValidityPeriodEndDate": "contract_end"
}

# Column whose text is sent to the embedding model in steps 8-9.
TEXT_COLUMN_TO_EMBED = "ShortText"
# Columns passed to format_datetime_column in step 7.
DATETIME_COLUMNS = ["ValidityPeriodStartDate", "ValidityPeriodEndDate"]

# Verify the mapped columns AND the embedding source column exist in the
# DataFrame. A missing TEXT_COLUMN_TO_EMBED would otherwise only surface later
# as a KeyError when rows are embedded. dict.fromkeys de-duplicates while
# keeping order.
missing = [
    k
    for k in dict.fromkeys([*FIELD_MAPPING, TEXT_COLUMN_TO_EMBED])
    if k not in df.columns
]
if missing:
    print(f"\u26a0\ufe0f  Warning: columns missing from data: {missing}")
else:
    print("\u2705 All field mapping columns present in data")

## 7. Prepare Data

Format datetime columns and initialize embedding tracking.

In [None]:
# Normalize the configured datetime columns, then initialize the tracking
# columns that process_rows reads/writes (embedded_status / embedded_error /
# embedded_at - presumably created by init_embedding_tracking; verify in utils).
# Emoji use a single \U escape; a surrogate-pair escape is unencodable in Python.
print("\U0001F4C5 Formatting datetime columns...")
for col in DATETIME_COLUMNS:
    if col not in df.columns:
        # Report instead of silently skipping a misconfigured column name.
        print(f"   \u26a0\ufe0f  {col} not found in data, skipped")
        continue
    # Return value is ignored, so this is assumed to modify df in place.
    format_datetime_column(df, col)
    print(f"   \u2713 {col}")

df = init_embedding_tracking(df)
print("\n\u2705 Data prepared")

## 8. Test Embedding on One Row

In [None]:
# Smoke-test the embedding endpoint on the first row before the batch run.
test_row = df.iloc[0]
test_text = test_row[TEXT_COLUMN_TO_EMBED]

# Emoji use a single \U escape; a surrogate-pair escape is unencodable in Python.
print(f"\U0001F9EA Testing embedding on: '{test_text}'\n")

embedding = get_embedding(test_text)

if embedding:
    print("\u2705 Embedding generated successfully")
    print(f"   Dimensions: {len(embedding)}")
    print(f"   First 5 values: {embedding[:5]}")
else:
    print("\u274c Failed to generate embedding")

## 9. Process & Upload

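First test with a small batch, then process all rows. Rows already marked `success` are skipped, so re-running a cell resumes where the previous run stopped.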

In [None]:
def _mark_row(df, idx, status, error):
    """Record one row's outcome in the tracking columns of ``df`` (in place)."""
    df.at[idx, "embedded_status"] = status
    df.at[idx, "embedded_error"] = error
    df.at[idx, "embedded_at"] = datetime.now(timezone.utc)


def _build_document(row, embedding):
    """Map one DataFrame row to an index document using FIELD_MAPPING.

    Columns missing from the row are skipped; NaN/NaT/None values become None.
    The embedding vector is stored under the ``"embedding"`` field.
    """
    doc = {}
    for csv_col, index_field in FIELD_MAPPING.items():
        if csv_col in row.index:
            value = row[csv_col]
            doc[index_field] = None if pd.isna(value) else value
    doc["embedding"] = embedding
    return doc


def _process_row(df, idx, row):
    """Embed and upload one row, recording the outcome on ``df``.

    Returns True on success, False on any failure. Embedding and upload
    errors are caught and recorded so one bad row cannot abort the batch.
    """
    text = row[TEXT_COLUMN_TO_EMBED]
    if pd.isna(text) or not str(text).strip():
        # Nothing to embed; don't spend an API call on it.
        print("\u274c No text to embed")
        _mark_row(df, idx, "failed", "Empty text")
        return False

    try:
        embedding = get_embedding(text)
    except Exception as e:
        print(f"\u274c {str(e)[:50]}")
        _mark_row(df, idx, "failed", str(e)[:2000])
        return False

    if embedding is None:
        print("\u274c Embedding failed")
        _mark_row(df, idx, "failed", "Embedding generation failed")
        return False

    try:
        success = upload_documents([_build_document(row, embedding)])
    except Exception as e:
        print(f"\u274c {str(e)[:50]}")
        _mark_row(df, idx, "failed", str(e)[:2000])
        return False

    if not success:
        print("\u274c Upload failed")
        _mark_row(df, idx, "failed", "Upload failed")
        return False

    print("\u2705")
    _mark_row(df, idx, "success", "")
    return True


def process_rows(df, max_rows=None, checkpoint_every=250):
    """Embed and upload every row of ``df`` whose status is not "success".

    Args:
        df: DataFrame with the tracking columns from init_embedding_tracking;
            those columns are updated in place for each processed row.
        max_rows: If truthy, only the first ``max_rows`` pending rows are
            processed (test mode).
        checkpoint_every: Save a checkpoint after every this many processed
            rows. A falsy or non-positive value means "only at the end".

    A checkpoint is always written when the loop exits, including on
    KeyboardInterrupt or an unexpected error, so work done since the last
    periodic checkpoint is not lost.
    """
    pending = df[df["embedded_status"] != "success"]

    if len(pending) == 0:
        print("\u2705 All rows already processed!")
        return

    if max_rows:
        pending = pending.head(max_rows)
        # Report the real count: fewer than max_rows rows may be pending.
        print(f"\u2699\ufe0f  Processing {len(pending)} rows (test mode)\n")

    total = len(pending)
    success_count = 0
    fail_count = 0

    try:
        for i, (idx, row) in enumerate(pending.iterrows(), 1):
            doc_id = row.get("Id", f"row_{idx}")
            print(f"\u2192 [{i}/{total}] {doc_id}...", end=" ")

            if _process_row(df, idx, row):
                success_count += 1
            else:
                fail_count += 1

            # Every row, success or failure, counts toward the checkpoint
            # cadence and is followed by the throttle pause, so a run of
            # failures (e.g. from throttling) does not hammer the service.
            if checkpoint_every and checkpoint_every > 0 and i % checkpoint_every == 0:
                save_checkpoint(df, config.checkpoint_file_path)
            if i < total:
                time.sleep(config.sleep_between_requests)
    finally:
        save_checkpoint(df, config.checkpoint_file_path)

    print(f"\n\u2705 Batch complete: {success_count} success, {fail_count} failed")


# Test with 5 rows first
process_rows(df, max_rows=5)

In [None]:
# Full run: process_rows skips rows already marked "success", so this
# handles everything the 5-row test above did not complete.
process_rows(df)

# Summary, framed by 60-character rules.
print(f"\n{'=' * 60}")
print_embedding_summary(df)
print('=' * 60)

## 10. Verify & Test Search

In [None]:
# Show index statistics to confirm the uploads landed.
# Emoji use a single \U escape; a surrogate-pair escape is unencodable in Python.
print("\U0001F4CA Checking index statistics...\n")
get_index_stats()

In [None]:
# Search helpers from utils; only hybrid_search is exercised in this cell.
from utils import text_search, vector_search, hybrid_search

# Free-text query for the search smoke test - change it to probe your data.
QUERY = "CHAIR"

def print_results(results, title):
    """Pretty-print search hits under a banner.

    Args:
        results: Search response mapping; hits are read from
            ``results["value"]``. From each hit, ``item_text``,
            ``@search.score`` and ``vendor_name`` are shown ("N/A" if absent).
        title: Heading printed between two 80-character ``=`` rules.

    Prints "No results found" when the response is empty, has no ``"value"``
    key, or its ``"value"`` list contains no hits.
    """
    rule = "=" * 80
    print(f"\n{rule}")
    print(title)
    print(rule)
    hits = results["value"] if results and "value" in results else []
    if not hits:
        print("No results found")
        return
    for rank, doc in enumerate(hits, 1):
        print(f"\n{rank}. {doc.get('item_text', 'N/A')}")
        print(f"   Score: {doc.get('@search.score', 'N/A')}")
        print(f"   Vendor: {doc.get('vendor_name', 'N/A')}")

# Emoji use a single \U escape; a surrogate-pair escape is unencodable in Python.
print(f"\U0001F50D Searching for: '{QUERY}'")

# Top 3 hybrid (keyword + vector) hits, keyword part limited to these fields.
results = hybrid_search(QUERY, top_k=3, search_fields=["item_text", "vendor_name"])
print_results(results, "Hybrid Search (Text + Vector with RRF)")

## Appendix: PQL Reference

### Filters
```python
filters = [
    'FILTER "Table"."Status" = \'Active\'',
    'FILTER "Table"."Amount" > 1000',
    'FILTER "Table"."Date" >= \'2024-01-01\'',
]
```

### Ordering
```python
order_by = [
    {"query": '"Table"."Date"', "direction": "DESC"},
    {"query": '"Table"."Name"', "direction": "ASC"},
]
```

### Pagination / Limits
```python
query = build_pql_query(columns=PQL_COLUMNS, limit=5000)
```

### Distinct
```python
query = build_pql_query(columns=PQL_COLUMNS, distinct=True)
```

### Aggregations
```python
columns = [
    {"name": "vendor", "query": '"Table"."VendorName"'},
    {"name": "total", "query": 'SUM("Table"."Amount")'},
    {"name": "count", "query": 'COUNT("Table"."ID")'},
]
```