<a href="https://colab.research.google.com/github/someshwaranM/n8n_expense_tracking_tool/blob/main/som_expense_tool_testing_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### *End-to-End Workflow: n8n + Cloudflare Tunnel + AWS Bedrock + Elasticsearch + Telegram Bot*

This notebook sets up a complete, production-grade personal expense intelligence system using fully serverless and cloud-native tools, all running interactively inside Google Colab.

You will:

* **Deploy n8n** inside Colab and expose it securely via a **Cloudflare Tunnel**
* **Configure API credentials** for:

  * Telegram Bot API
  * AWS Bedrock (for LLM classification + embeddings)
  * Elasticsearch / Elastic Cloud
  * MCP Client (for structured ES queries)
  * Sarvam STT (for voice transcription)
* **Install a fully automated Telegram expense bot workflow**, including:

  * Voice → text transcription via Sarvam
  * Intent classification via AWS Bedrock
  * Expense extraction via LLM
  * Query agent for Elasticsearch analytics via MCP
  * Write and read expenses from Elasticsearch in real time
* **Create an Elasticsearch inference endpoint** (`bedrock-embeddings`) for semantic vectors
* **Define the semantic index mapping** for `daily_personal_expenses`
* **Bulk ingest synthetic expense data**, with:

  * automatic timestamp normalization
  * numeric cleanup
  * batching
  * throttling/backoff handling for Bedrock inference
* **Automatically install the entire n8n workflow** via the REST API using placeholders and templating
* **Validate everything end-to-end**, enabling you to send Telegram messages and instantly receive:

  * Expense ingestion acknowledgements
  * Real-time analytics
  * Natural-language answers backed by LLMs + Elasticsearch

**Outcome:**
By the end, you will have a fully functional, AI-powered expense intelligence assistant running through Telegram, backed by n8n, AWS Bedrock, Elasticsearch, and semantic search, all orchestrated automatically from Colab.


## 1. Install Node.js, n8n, and helpers

This cell installs n8n inside the Colab VM with npm:

- Node.js and npm are expected to be available in the runtime already; the cell does not install them
- n8n (workflow automation server) is installed globally

Run this once per fresh runtime. It may take a couple of minutes.

In [None]:
!npm install -g n8n

Version Check

In [None]:
!n8n -v

1.121.3


In [None]:
import requests
import pandas as pd
import json
import time
import subprocess
import threading
import re
from datetime import datetime

## 2. Start the n8n server (in the background)

This cell starts the n8n server process on port `5678`, passing basic-auth settings (`N8N_BASIC_AUTH_*`) as environment variables.
In Colab, it runs in the background so we can still execute further cells.

You should see logs showing that n8n has started successfully.


In [None]:
!N8N_BASIC_AUTH_ACTIVE=true \
  N8N_BASIC_AUTH_USER="admin" \
  N8N_BASIC_AUTH_PASSWORD="admin123" \
  N8N_HOST="0.0.0.0" \
  N8N_PORT=5678 \
  n8n start --port 5678 > n8n.log 2>&1 &


In [None]:
!tail -n 20 n8n.log


No encryption key found - Auto-generating and saving to: /root/.n8n/config
Permissions 0644 for n8n settings file /root/.n8n/config are too wide. This is ignored for now, but in the future n8n will attempt to change the permissions automatically. To automatically enforce correct permissions now set N8N_ENFORCE_SETTINGS_FILE_PERMISSIONS=true (recommended), or turn this check off set N8N_ENFORCE_SETTINGS_FILE_PERMISSIONS=false.
Initializing n8n process
n8n ready on ::, port 5678
Migrations in progress, please do NOT stop the process.
Starting migration InitialMigration1588102412422
Finished migration InitialMigration1588102412422
Starting migration WebhookModel1592445003908
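
The log above shows that migrations are still running right after startup, so later cells can fail if they run too early. A minimal sketch of a readiness check follows. It assumes n8n answers on a `/healthz` path at `http://localhost:5678`; `http_ok` and `wait_until_ready` are illustrative helper names, not part of n8n.

```python
import time
import urllib.error
import urllib.request


def http_ok(url, timeout=3):
    """Return True if a GET to `url` answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or a non-2xx status.
        return False


def wait_until_ready(url, attempts=30, delay=2.0, probe=http_ok, sleep=time.sleep):
    """Poll `url` until `probe` succeeds; return False if attempts run out."""
    for attempt in range(attempts):
        if probe(url):
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False
```

Calling `wait_until_ready("http://localhost:5678/healthz")` before starting the tunnel would then block for at most about a minute with these defaults.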


## 3. Expose n8n via Cloudflare Tunnel

This cell:

1. Starts a `cloudflared` tunnel from the Colab VM to the public internet.
2. Parses the tunnel logs to extract the public `https://...trycloudflare.com` URL.
3. Prints the URL so we can:
   - Open the n8n UI in the browser.
   - Use it as the `N8N_URL` for API calls.

Copy the printed URL and update the config cell (`N8N_URL`) accordingly.


In [None]:
!wget -q https://github.com/cloudflare/cloudflared/releases/latest/download/cloudflared-linux-amd64.deb
!sudo dpkg -i cloudflared-linux-amd64.deb


Selecting previously unselected package cloudflared.
(Reading database ... 121713 files and directories currently installed.)
Preparing to unpack cloudflared-linux-amd64.deb ...
Unpacking cloudflared (2025.11.1) ...
Setting up cloudflared (2025.11.1) ...
Processing triggers for man-db (2.10.2-1) ...


In [None]:
# Start tunnel
tunnel = subprocess.Popen(
    ["cloudflared", "tunnel", "--url", "http://localhost:5678"],
    stdout=subprocess.PIPE,
    stderr=subprocess.STDOUT,
    text=True
)

# Parse logs to extract the public URL
public_url = None
for line in tunnel.stdout:
    print(line.strip())
    match = re.search(r"https://[-a-zA-Z0-9]+\.trycloudflare\.com", line)
    if match:
        public_url = match.group(0)
        print("\n Your public n8n URL:", public_url)
        break

# keep tunnel running


2025-11-27T08:13:26Z INF Thank you for trying Cloudflare Tunnel. Doing so, without a Cloudflare account, is a quick way to experiment and try it out. However, be aware that these account-less Tunnels have no uptime guarantee, are subject to the Cloudflare Online Services Terms of Use (https://www.cloudflare.com/website-terms/), and Cloudflare reserves the right to investigate your use of Tunnels for violations of such terms. If you intend to use Tunnels in production you should use a pre-created named tunnel by following: https://developers.cloudflare.com/cloudflare-one/connections/connect-apps
2025-11-27T08:13:26Z INF Requesting new quick Tunnel on trycloudflare.com...
2025-11-27T08:13:30Z INF +--------------------------------------------------------------------------------------------+
2025-11-27T08:13:30Z INF |  Your quick Tunnel has been created! Visit it at (it may take some time to be reachable):  |
2025-11-27T08:13:30Z INF |  https://seems-sys-singing-devon.trycloudflare.com    
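
The loop in the cell above stops reading `tunnel.stdout` once the URL is found. If nothing keeps reading that pipe, a long-running `cloudflared` could eventually block on a full buffer. The sketch below separates the URL extraction from a background drain thread; `extract_tunnel_url` and `drain_in_background` are hypothetical helper names, and the regular expression is the one used above.

```python
import re
import threading

TUNNEL_URL = re.compile(r"https://[-a-zA-Z0-9]+\.trycloudflare\.com")


def extract_tunnel_url(lines):
    """Return the first trycloudflare.com URL in an iterable of log lines, or None."""
    for line in lines:
        match = TUNNEL_URL.search(line)
        if match:
            return match.group(0)
    return None


def drain_in_background(stream):
    """Keep consuming `stream` in a daemon thread so the pipe never fills up."""
    def _drain():
        for _ in stream:
            pass

    thread = threading.Thread(target=_drain, daemon=True)
    thread.start()
    return thread
```

With the `tunnel` process from the cell above, this would be used as `public_url = extract_tunnel_url(tunnel.stdout)` followed by `drain_in_background(tunnel.stdout)`.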

Open your public n8n URL in the browser, for example https://something_url.trycloudflare.com.

Then create the API key in the UI:
- Go to Settings > n8n API Key, or open https://something_url.trycloudflare.com/settings/api directly.
- Create the API key and copy it for the next step.

## 4. Configure the n8n Python API client

Here we configure:

- `N8N_URL` – the Cloudflare public URL from the previous step.
- `API_KEY` – the n8n API key created in the UI.
- Helper functions: `n8n_get`, `n8n_post`, `n8n_patch`, and `n8n_delete`.

These helpers let us create credentials and workflows programmatically from Python.

In [None]:

N8N_URL = "https://<update_your_n8n_public_url>.trycloudflare.com"  # your Cloudflare n8n URL, copy-paste from the above.
API_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJmMzIzOWZmZS1kOTg3LTQ3MzUtYjBiNC0zOTI4MmIzMWQ2YWQiLCJpc3MiOiJuOG4iLCJhdWQiOiJwdWJsaWMtYXBpIiwiaWF0IjoxNzY0MjMxMzI3LCJleHAiOjE3NjY4MTE2MDB9.VPiOt1Edd3W_uuarZ2nCuwW1g8sqO3DlGSWQPAiSuNA"
# NOTE: We'll move this into environment variables later.
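
As the note says, the key is better kept out of the notebook source. One possible sketch reads each secret from an environment variable and falls back to an interactive prompt. The variable name `N8N_API_KEY` and the helper `read_secret` are assumptions for illustration only.

```python
import os
from getpass import getpass


def read_secret(name, prompt=None, env=os.environ, ask=getpass):
    """Return the secret `name` from `env`, or ask for it without echoing input."""
    value = env.get(name)
    if not value:
        value = ask(prompt or f"Enter {name}: ")
    return value.strip()


# Hypothetical usage; N8N_API_KEY is an assumed variable name:
# API_KEY = read_secret("N8N_API_KEY")
```

The same helper would also cover the Sarvam, Elasticsearch, AWS, and Telegram secrets defined later.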

In [None]:


HEADERS = {
    "accept": "application/json",
    "content-type": "application/json",
    "X-N8N-API-KEY": API_KEY,
}


# --------------------------
# Helper: GET
# --------------------------
def n8n_get(endpoint, params=None):
    url = f"{N8N_URL}{endpoint}"
    r = requests.get(url, headers=HEADERS, params=params)
    print("\n=== GET", url, "===")
    print("Status:", r.status_code)
    try:
        print(r.json())
    except ValueError:  # response body is not valid JSON
        print(r.text)
    return r


# --------------------------
# Helper: POST
# --------------------------
def n8n_post(endpoint, data):
    url = f"{N8N_URL}{endpoint}"
    r = requests.post(url, headers=HEADERS, json=data)
    print("\n=== POST", url, "===")
    print("Status:", r.status_code)
    try:
        print("Body:", r.json())
    except ValueError:  # response body is not valid JSON
        print("Body (raw):", r.text)
    return r


# --------------------------
# Helper: PATCH
# --------------------------
def n8n_patch(endpoint, data):
    url = f"{N8N_URL}{endpoint}"
    r = requests.patch(url, headers=HEADERS, json=data)
    print("\n=== PATCH", url, "===")
    print("Status:", r.status_code)
    try:
        print(r.json())
    except ValueError:  # response body is not valid JSON
        print(r.text)
    return r


# --------------------------
# Helper: DELETE
# --------------------------
def n8n_delete(endpoint):
    url = f"{N8N_URL}{endpoint}"
    r = requests.delete(url, headers=HEADERS)
    print("\n=== DELETE", url, "===")
    print("Status:", r.status_code)
    try:
        print(r.json())
    except ValueError:  # response body is not valid JSON
        print(r.text)
    return r


print("n8n API client ready.")


n8n API client ready.


In [None]:
n8n_get("/api/v1/users?limit=100&includeRole=true")



=== GET https://seems-sys-singing-devon.trycloudflare.com/api/v1/users?limit=100&includeRole=true ===
Status: 200
{'data': [{'id': 'f3239ffe-d987-4735-b0b4-39282b31d6ad', 'email': 'somesh.rokz@gmail.com', 'firstName': 'somesh', 'lastName': 'M', 'createdAt': '2025-11-27T08:13:13.322Z', 'updatedAt': '2025-11-27T08:14:40.000Z', 'isPending': False, 'role': 'global:owner'}], 'nextCursor': None}


<Response [200]>

In [None]:
n8n_get("/api/v1/workflows")



=== GET https://seems-sys-singing-devon.trycloudflare.com/api/v1/workflows ===
Status: 200
{'data': [], 'nextCursor': None}


<Response [200]>
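
Both list responses above carry a `nextCursor` field, which is `None` here because everything fits on one page. A sketch of how larger result sets could be walked follows. It assumes the next page is requested by sending that value back as a `cursor` query parameter. `iter_all` is a hypothetical helper, and `fetch_page` is any callable that returns the decoded JSON body.

```python
def iter_all(fetch_page, endpoint, limit=100):
    """Yield every item from a cursor-paginated list endpoint."""
    cursor = None
    while True:
        params = {"limit": limit}
        if cursor:
            params["cursor"] = cursor
        page = fetch_page(endpoint, params)
        yield from page.get("data", [])
        cursor = page.get("nextCursor")
        if not cursor:
            break


# Hypothetical usage with the helper defined above:
# workflows = list(iter_all(lambda ep, p: n8n_get(ep, params=p).json(),
#                           "/api/v1/workflows"))
```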

In [None]:
sample_workflow = {
    "name": "API-created workflow",
    "nodes": [],
    "connections": {},
    "settings": {},
}

n8n_post("/api/v1/workflows", sample_workflow)



=== POST https://seems-sys-singing-devon.trycloudflare.com/api/v1/workflows ===
Status: 200
Body: {'name': 'API-created workflow', 'nodes': [], 'connections': {}, 'settings': {'callerPolicy': 'workflowsFromSameOwner', 'availableInMCP': False}, 'active': False, 'versionId': 'c5e26d5f-43a0-4c02-87b9-53a1901cf780', 'id': 'Cnqu9W2rDwiRvNMk', 'description': None, 'staticData': None, 'meta': None, 'pinData': None, 'updatedAt': '2025-11-27T08:15:51.581Z', 'createdAt': '2025-11-27T08:15:51.581Z', 'isArchived': False, 'versionCounter': 1, 'triggerCount': 0}


<Response [200]>

## 5. Fill in runtime configuration (URLs, keys, index names)

This cell defines **all configuration values** needed by the workflow:

- n8n public URL (for reference)
- Sarvam STT endpoint + API key
- Elasticsearch HTTP endpoint, index name, and API key
- AWS Bedrock region and credentials (wired into the AWS credential)
- Telegram bot token (wired into the Telegram credential)
- n8n credential IDs (returned when the credentials are created in section 6)

These values are used to replace placeholders in the workflow JSON template.
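
Because the values in this section start out as masked examples, it can help to check that none of them was left unedited before installing the workflow. The sketch below is a heuristic only: it treats a run of four or more `x` characters or an angle-bracket token as a leftover placeholder. `unset_settings` is a hypothetical helper name.

```python
import re

# Heuristic: "xxxx..." runs or "<SOMETHING>" tokens look like unedited examples.
PLACEHOLDER_PATTERN = re.compile(r"x{4,}|<[^>]+>", re.IGNORECASE)


def unset_settings(settings):
    """Return the sorted names whose values are empty or still look like placeholders."""
    return sorted(
        name
        for name, value in settings.items()
        if not value or PLACEHOLDER_PATTERN.search(str(value))
    )


# Hypothetical usage:
# print(unset_settings({"ES_INDEX": ES_INDEX, "ES_API_KEY": ES_API_KEY}))
```

A real secret that happens to contain `xxxx` would be flagged as well, so the result is a prompt to double-check rather than a hard failure.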


In [None]:
SARVAM_ENDPOINT = "https://api.sarvam.ai/speech-to-text"
SARVAM_API_KEY = "sk_xxxxxxxxxxxxxxxxxxxxxxxxxx"
ES_MCP_URL = "https://<ES_HTTP_BASE>/api/agent_builder/mcp"
ES_HTTP_BASE = "https://xxxxxxxxxxxxxxxxxxxxx.es.us-central1.gcp.elastic.cloud"
ES_API_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxZw=="
ES_INDEX = "daily_personal_expenses_27_nov"
AWS_ACCESS_KEY = "AKXXXXXXXXXXXXXXXX7O24"
AWS_SECRET_KEY = "62xxxxxxxxxxxxxxxxxxxxxygP2ZBj4ICghq"
TELEGRAM_ACCESS_TOKEN = "815xxxxxxxxxxxxxxxxxYLcpaRt2Y0"

## 6. Create n8n credentials via API

This section creates three credential objects in n8n using the REST API:

1. **AWS (IAM) account** – used by the AWS Bedrock LangChain nodes.
2. **HTTP Header Auth** – used by the MCP client node to call Elasticsearch MCP.
3. **Telegram Bot** – used by the Telegram trigger and send message nodes.

Each cell posts a JSON payload to `/api/v1/credentials` and prints the resulting credential ID.
These IDs are later injected into the workflow template.
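
Each credential cell in this section repeats the same ID lookup on the response body. A small sketch of that lookup as one function, which also fails loudly when no ID comes back (for example on an authentication error), is given here. `credential_id` is a hypothetical helper name.

```python
def credential_id(body):
    """Return the credential ID from a create-credential response body."""
    data = body.get("data")
    nested_id = data.get("id") if isinstance(data, dict) else None
    cred_id = body.get("id") or nested_id
    if not cred_id:
        raise ValueError(f"No credential ID in response: {body!r}")
    return cred_id


# Hypothetical usage:
# telegram_cred_id = credential_id(n8n_post("/api/v1/credentials", telegram_credential).json())
```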

In [None]:
telegram_credential = {
    "name": "Telegram account",
    "type": "telegramApi",
    "data": {
        "accessToken": TELEGRAM_ACCESS_TOKEN
    }
}

resp = n8n_post("/api/v1/credentials", telegram_credential)
resp_json = resp.json()
telegram_cred_id = resp_json.get("id") or resp_json.get("data", {}).get("id")
print("Telegram credential ID:", telegram_cred_id)



=== POST https://seems-sys-singing-devon.trycloudflare.com/api/v1/credentials ===
Status: 200
Body: {'name': 'Telegram account', 'type': 'telegramApi', 'id': 'xY8ZSZHyuA6DDB9T', 'updatedAt': '2025-11-27T08:16:03.853Z', 'createdAt': '2025-11-27T08:16:03.853Z', 'isManaged': False}
Telegram credential ID: xY8ZSZHyuA6DDB9T


Check the AWS credential schema, then create the AWS and HTTP Header Auth (MCP client) credentials

In [None]:
schema_resp = requests.get(
    f"{N8N_URL}/api/v1/credentials/schema/aws",
    headers=HEADERS,
)
print("Schema status:", schema_resp.status_code)
print(schema_resp.json())


Schema status: 200
{'additionalProperties': False, 'type': 'object', 'properties': {'region': {'type': 'string', 'enum': ['af-south-1', 'ap-east-1', 'ap-south-1', 'ap-south-2', 'ap-southeast-1', 'ap-southeast-2', 'ap-southeast-3', 'ap-southeast-4', 'ap-southeast-5', 'ap-southeast-7', 'ap-northeast-1', 'ap-northeast-2', 'ap-northeast-3', 'ca-central-1', 'ca-west-1', 'cn-north-1', 'cn-northwest-1', 'eu-central-1', 'eu-central-2', 'eu-north-1', 'eu-south-1', 'eu-south-2', 'eu-west-1', 'eu-west-2', 'eu-west-3', 'il-central-1', 'me-central-1', 'me-south-1', 'mx-central-1', 'sa-east-1', 'us-east-1', 'us-east-2', 'us-gov-east-1', 'us-west-1', 'us-west-2', 'us-gov-west-1']}, 'accessKeyId': {'type': 'string'}, 'secretAccessKey': {'type': 'string'}, 'temporaryCredentials': {'type': 'boolean'}, 'sessionToken': {'type': 'string'}, 'customEndpoints': {'type': 'boolean'}, 'rekognitionEndpoint': {'type': 'string'}, 'lambdaEndpoint': {'type': 'string'}, 'snsEndpoint': {'type': 'string'}, 'sesEndpoint'

In [None]:
aws_credential = {
    "name": "AWS (IAM) account",
    "type": "aws",
    "data": {
        "accessKeyId": AWS_ACCESS_KEY,
        "secretAccessKey": AWS_SECRET_KEY,
        "region": "us-west-2",  # or your real region

        # required by n8n aws credential schema:
        "sessionToken": "",
        "rekognitionEndpoint": "",
        "lambdaEndpoint": "",
        "snsEndpoint": "",
        "sesEndpoint": "",
        "sqsEndpoint": "",
        "s3Endpoint": "",
        "ssmEndpoint": ""
    }
}

resp = n8n_post("/api/v1/credentials", aws_credential)
resp_json = resp.json()
aws_cred_id = resp_json.get("id") or resp_json.get("data", {}).get("id")
print("AWS credential ID:", aws_cred_id)



=== POST https://seems-sys-singing-devon.trycloudflare.com/api/v1/credentials ===
Status: 200
Body: {'name': 'AWS (IAM) account', 'type': 'aws', 'id': 'j1q5AoHGL1JEHhNH', 'updatedAt': '2025-11-27T08:16:09.338Z', 'createdAt': '2025-11-27T08:16:09.338Z', 'isManaged': False}
AWS credential ID: j1q5AoHGL1JEHhNH


In [None]:
mcp_credential = {
    "name": "Header Auth account",
    "type": "httpHeaderAuth",
    "data": {
        "name": "Authorization",
        "value": f"ApiKey {ES_API_KEY}"
    }
}

resp = n8n_post("/api/v1/credentials", mcp_credential)
resp_json = resp.json()
mcp_cred_id = resp_json.get("id") or resp_json.get("data", {}).get("id")
print("MCP Header Auth credential ID:", mcp_cred_id)



=== POST https://seems-sys-singing-devon.trycloudflare.com/api/v1/credentials ===
Status: 200
Body: {'name': 'Header Auth account', 'type': 'httpHeaderAuth', 'id': 'NrLgpwILkhV2Zjl5', 'updatedAt': '2025-11-27T08:16:11.937Z', 'createdAt': '2025-11-27T08:16:11.937Z', 'isManaged': False}
MCP Header Auth credential ID: NrLgpwILkhV2Zjl5


In [None]:
print("MCP Header Auth credential ID:", mcp_cred_id)
print("AWS credential ID:", aws_cred_id)
print("Telegram credential ID:", telegram_cred_id)

MCP Header Auth credential ID: NrLgpwILkhV2Zjl5
AWS credential ID: j1q5AoHGL1JEHhNH
Telegram credential ID: xY8ZSZHyuA6DDB9T


## 7. Workflow JSON template (Telegram → LLM → Elasticsearch)

This cell defines the full n8n workflow as a **JSON template** with placeholders:

- `{{TELEGRAM_CRED_ID}}` – Telegram credential ID
- `{{AWS_CRED_ID}}` – AWS credential ID
- `{{MCP_CRED_ID}}` – MCP header-auth credential ID
- `{{SARVAM_ENDPOINT}}`, `{{SARVAM_API_KEY}}` – Sarvam STT endpoint and key
- `{{ES_MCP_URL}}`, `{{ES_HTTP_BASE}}`, `{{ES_INDEX}}`, `{{ES_API_KEY}}` – Elasticsearch settings

The workflow implements:

- Telegram trigger (voice or text).
- Optional Sarvam STT to transcribe voice.
- Intent classifier via AWS Bedrock.
- Split into **ingestion** vs **query** paths.
- Ingestion path → LLM extraction → index into Elasticsearch.
- Query path → MCP client to query ES → formatted response back to Telegram.
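
The template mixes two kinds of double-brace tokens: the upper-case placeholders listed above and n8n's own expressions such as `={{ $json.intent }}`. A substitution step therefore has to touch only the former. The sketch below is one way to do that, not necessarily how this notebook's install cell does it. `fill_placeholders` is a hypothetical helper, and it JSON-escapes each value so that quotes or backslashes in a key cannot break the document.

```python
import json
import re

# Matches {{NAME}} with no inner spaces, so n8n expressions like
# "{{ $json.intent }}" are left untouched.
PLACEHOLDER = re.compile(r"\{\{([A-Z0-9_]+)\}\}")


def fill_placeholders(template, values):
    """Replace every {{NAME}} token in `template` with the JSON-escaped value."""
    def substitute(match):
        name = match.group(1)
        if name not in values:
            raise KeyError(f"No value supplied for placeholder {name}")
        # json.dumps adds surrounding quotes; strip them because the
        # placeholder already sits inside a JSON string.
        return json.dumps(str(values[name]))[1:-1]

    return PLACEHOLDER.sub(substitute, template)


# Hypothetical usage:
# workflow = json.loads(fill_placeholders(raw_workflow_json, {
#     "TELEGRAM_CRED_ID": telegram_cred_id, "AWS_CRED_ID": aws_cred_id,
# }))
```

Raising on an unknown placeholder is a deliberate choice in this sketch: a typo in a placeholder name surfaces before the workflow is posted to n8n.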


In [None]:
raw_workflow_json = r"""
{
  "nodes": [
    {
      "parameters": {
        "updates": [
          "message"
        ],
        "additionalFields": {}
      },
      "type": "n8n-nodes-base.telegramTrigger",
      "typeVersion": 1.2,
      "position": [
        -96,
        400
      ],
      "id": "ee05fea8-b0d4-474a-a6b2-b10db895c871",
      "name": "Telegram Trigger",
      "webhookId": "df86a66c-7c82-4cbc-9df0-5a350f84b590",
      "credentials": {
        "telegramApi": {
          "id": "{{TELEGRAM_CRED_ID}}",
          "name": "Telegram account"
        }
      }
    },
    {
      "parameters": {
        "conditions": {
          "options": {
            "caseSensitive": true,
            "leftValue": "",
            "typeValidation": "strict",
            "version": 2
          },
          "conditions": [
            {
              "id": "allow-user",
              "leftValue": "={{ $json.message.from.id }}",
              "rightValue": 957087985,
              "operator": {
                "type": "number",
                "operation": "equals"
              }
            }
          ],
          "combinator": "and"
        },
        "options": {}
      },
      "type": "n8n-nodes-base.if",
      "typeVersion": 2.2,
      "position": [
        208,
        400
      ],
      "id": "7f9299cc-bb14-4376-96c8-e723eede06d0",
      "name": "Allowed User?"
    },
    {
      "parameters": {
        "rules": {
          "values": [
            {
              "conditions": {
                "options": {
                  "caseSensitive": true,
                  "leftValue": "",
                  "typeValidation": "strict",
                  "version": 2
                },
                "conditions": [
                  {
                    "leftValue": "={{ $json.message.voice }}",
                    "rightValue": "",
                    "operator": {
                      "type": "object",
                      "operation": "exists",
                      "singleValue": true
                    },
                    "id": "5db5977a-590d-452a-94a2-7340adc55300"
                  }
                ],
                "combinator": "and"
              },
              "renameOutput": true,
              "outputKey": "Audio"
            },
            {
              "conditions": {
                "options": {
                  "caseSensitive": true,
                  "leftValue": "",
                  "typeValidation": "strict",
                  "version": 2
                },
                "conditions": [
                  {
                    "id": "24e2a1cd-2c86-4f59-8cc5-ccca86d581d3",
                    "leftValue": "={{ $json.message.voice }}",
                    "rightValue": "",
                    "operator": {
                      "type": "object",
                      "operation": "notExists",
                      "singleValue": true
                    }
                  }
                ],
                "combinator": "and"
              },
              "renameOutput": true,
              "outputKey": "Text"
            }
          ]
        },
        "options": {}
      },
      "type": "n8n-nodes-base.switch",
      "typeVersion": 3.2,
      "position": [
        528,
        384
      ],
      "id": "3dfb901a-e422-4a67-a540-288272b33f3c",
      "name": "Voice or Text?"
    },
    {
      "parameters": {
        "resource": "file",
        "fileId": "={{ $json.message.voice.file_id }}",
        "additionalFields": {}
      },
      "type": "n8n-nodes-base.telegram",
      "typeVersion": 1.2,
      "position": [
        816,
        144
      ],
      "id": "a1716da9-355f-4e2d-9b2e-87ef7f43e459",
      "name": "Get File (Voice)",
      "webhookId": "a1581ecb-329e-4250-8912-70d988673ad3",
      "credentials": {
        "telegramApi": {
          "id": "{{TELEGRAM_CRED_ID}}",
          "name": "Telegram account"
        }
      }
    },
    {
      "parameters": {
        "method": "POST",
        "url": "{{SARVAM_ENDPOINT}}",
        "sendHeaders": true,
        "headerParameters": {
          "parameters": [
            {
              "name": "api-subscription-key",
              "value": "{{SARVAM_API_KEY}}"
            }
          ]
        },
        "sendBody": true,
        "contentType": "multipart-form-data",
        "bodyParameters": {
          "parameters": [
            {
              "name": "model",
              "value": "saarika:v2.5"
            },
            {
              "name": "language_code",
              "value": "en-IN"
            },
            {
              "parameterType": "formBinaryData",
              "name": "file",
              "inputDataFieldName": "=data"
            }
          ]
        },
        "options": {}
      },
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 4.2,
      "position": [
        1040,
        144
      ],
      "id": "8d6576eb-216e-469a-9813-785a13cbe25a",
      "name": "Sarvam STT"
    },
    {
      "parameters": {
        "jsCode": "const msg = $input.item.json;\nif (msg.text && !msg.transcript) msg.transcript = msg.text;\nreturn [{ json: msg }];"
      },
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1264,
        144
      ],
      "id": "912af435-aa73-4ae9-a8da-88bd9523316c",
      "name": "Set Transcript"
    },
    {
      "parameters": {
        "jsCode": "const msg = $input.item.json;\nconst unified_text = (msg.transcript || msg.message?.text || '').trim();\nreturn [{ json: {\n  ...msg,\n  chat_id: msg.message?.chat?.id ?? msg.chat_id ?? 'unknown',\n  user_id: msg.message?.from?.id ?? msg.user_id ?? 'unknown',\n  first_name: msg.message?.from?.first_name ?? null,\n  unified_text,\n} }];"
      },
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1504,
        400
      ],
      "id": "c977d2d5-c3e9-49c2-b2fc-d98ee07d6182",
      "name": "Build Unified Text"
    },
    {
      "parameters": {
        "model": "anthropic.claude-3-5-sonnet-20241022-v2:0",
        "options": {}
      },
      "type": "@n8n/n8n-nodes-langchain.lmChatAwsBedrock",
      "typeVersion": 1.1,
      "position": [
        -144,
        1072
      ],
      "id": "b2cf5558-6a55-46cd-b980-1f50f79f653c",
      "name": "AWS Bedrock (Classifier)",
      "credentials": {
        "aws": {
          "id": "{{AWS_CRED_ID}}",
          "name": "AWS (IAM) account"
        }
      }
    },
    {
      "parameters": {
        "promptType": "define",
        "text": "={{ $json.unified_text }}",
        "options": {
          "systemMessage": "You are a strict intent classifier for a Telegram expense bot. Classify exactly one message into one of: INGEST, QUERY, or UNKNOWN.\\n\\nDEFINITIONS:\\n- INGEST: User is recording a personal expense they made (mentions amount/currency/merchant/time/payment etc.).\\n- QUERY: User asks about past expenses, totals, trends, merchants, date ranges, top N, comparisons.\\n- UNKNOWN: Anything else.\\n\\nOUTPUT FORMAT (JSON only):\\n{\"intent\":\"INGEST|QUERY|UNKNOWN\",\"confidence\":0.0-1.0,\"reason\":\"short phrase\",\"entities\":{\"amount\":[],\"currency\":[],\"payment\":[],\"merchants\":[],\"dates\":[]}}\\n\\nGUIDANCE:\\n- Prefer INGEST if the message clearly records a new spend.\\n- Prefer QUERY if it asks to search/analyze previous data.\\n- Use UNKNOWN if ambiguous.\\n- Be conservative with confidence for vague texts."
        }
      },
      "type": "@n8n/n8n-nodes-langchain.agent",
      "typeVersion": 2.2,
      "position": [
        -96,
        864
      ],
      "id": "0930a81c-af94-4438-a596-9d46e1d8d205",
      "name": "Classifier Agent (LLM)",
      "onError": "continueRegularOutput"
    },
    {
      "parameters": {
        "jsCode": "const raw = $input.item.json.output;\nfunction extractFirstJsonObject(str){\n  const s = str.indexOf('{'); if(s===-1) throw new Error('No JSON found');\n  let d=0; for(let i=s;i<str.length;i++){ const c=str[i]; if(c==='{' ) d++; else if(c==='}'){ d--; if(d===0) return str.slice(s,i+1);} }\n  throw new Error('Unbalanced JSON');\n}\nlet intent='UNKNOWN', confidence=0, reason='';\ntry{\n  const text = extractFirstJsonObject(raw);\n  const obj = JSON.parse(text);\n  intent = (obj.intent||'').toUpperCase();\n  confidence = Number(obj.confidence)||0;\n  reason = obj.reason||'';\n  if(!['INGEST','QUERY','UNKNOWN'].includes(intent)) intent='UNKNOWN';\n} catch(e){ intent='UNKNOWN'; confidence=0; reason='parse_error'; }\nreturn [{ json: { ...$input.item.json, intent, confidence, classify_reason: reason } }];"
      },
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        240,
        864
      ],
      "id": "ef3ad348-0a02-4bf8-803c-33f956880e3d",
      "name": "Parse Classifier JSON"
    },
    {
      "parameters": {
        "chatId": "={{ $('Telegram Trigger').item.json.message.chat.id }}",
        "text": "I couldn't tell if you want to add an expense or ask a question. Reply with either:\n• \"add expense: <your text>\"\n• \"ask: <your question>\"",
        "additionalFields": {}
      },
      "type": "n8n-nodes-base.telegram",
      "typeVersion": 1.2,
      "position": [
        640,
        688
      ],
      "id": "ef499365-f2e9-46da-8ed4-9963d980cd71",
      "name": "Ask Clarification",
      "webhookId": "3a1112a4-5a0c-45b8-bdcf-141b4ee8204b",
      "credentials": {
        "telegramApi": {
          "id": "{{TELEGRAM_CRED_ID}}",
          "name": "Telegram account"
        }
      }
    },
    {
      "parameters": {
        "conditions": {
          "options": {
            "caseSensitive": true,
            "leftValue": "",
            "typeValidation": "strict",
            "version": 2
          },
          "conditions": [
            {
              "leftValue": "={{ $json.intent }}",
              "rightValue": "INGEST",
              "operator": {
                "type": "string",
                "operation": "equals"
              },
              "id": "route-intent"
            }
          ],
          "combinator": "and"
        },
        "options": {}
      },
      "type": "n8n-nodes-base.if",
      "typeVersion": 2.2,
      "position": [
        800,
        880
      ],
      "id": "cab8fa74-8990-4b01-90ef-ba5068669b6e",
      "name": "Route by Intent"
    },
    {
      "parameters": {
        "promptType": "define",
        "text": "={{ $('Build Unified Text').item.json.unified_text }}",
        "options": {
          "systemMessage": "Role:\nYou are a Telegram assistant connected to Elasticsearch via the MCP server. Your job is to turn user questions into precise tool calls and reply with clear, concise answers suitable for chat.\n\nInputs:\nPlain-text user messages from Telegram.\n\nData Sources (choose exactly one per query):\n\nindex - notebook_index_semantic_v1 → Uber trips (fare, promotions, travel date/time, duration).\n\nindex - {{ES_INDEX}} → Personal expenses (merchants, items like biryani/burger/fried rice, car rentals; payment methods; categories).\n\nIf the expense data is not available in \"{{ES_INDEX}}\" index, then try \"expenses\" index.\n\nBehavior:\n\nParse intent (examples: totals, time ranges, top-N, categories, merchants, payment methods, receipts/trips).\n\nPick the correct index based on intent:\n\nUber trip info → notebook_index_semantic_v1\n\nGeneral/personal spend → expenses\n\nPrefer structured queries (ES|QL or equivalent MCP tools) over vague responses.\n\nReturn curated answers:\n\nA one-paragraph summary in natural language.\n\nThen a compact numbers/table block if helpful (totals, counts, top-N, date, merchant, amount).\n\nHide internals: Do not expose URLs, tokens, raw JSON, or query payloads unless the user explicitly asks for them.\n\nDate & Time Handling (VERY IMPORTANT):\n\nTimezone: Asia/Kolkata.\n\nRelative dates:\n\n“today”, “yesterday”, “day before yesterday” → resolve to their calendar dates in Asia/Kolkata.\n\nYear rule: Unless the user explicitly specifies another year, interpret all relative-date queries as referring to the year 2025.\n\nExample: On any day, “yesterday” → yesterday’s date in 2025, not 2024/2023.\n\nIf the user says a different year (e.g., “yesterday in 2024”), honor that year.\n\nIf the user provides explicit dates, use them as-is (dd-mm-yyyy or yyyy-mm-dd; be tolerant).\n\nTone:\nProfessional, clear, and direct. Short enough for a chat interface.\n\nConstraints / Fallback:\n\nIf a query can’t be answered from Elasticsearch, reply:\n“I wasn’t able to find that in Elasticsearch. Can you rephrase or provide more details?”\n\nIf intent or dates are ambiguous, make the best safe assumption per rules above and proceed.\n\nOutput Format:\n\n1–3 concise sentences summarizing the result.\n\nOptional small table for top-N or breakdowns (date, merchant/category, count/amount).\n\nCurrency: preserve stored currency fields; if mixed, state totals per currency.\n\nExamples (Intent → Index → Action → Answer style):\n\n“Total Uber spend last month?”\n→ notebook_index_semantic_v1 → ES|QL sum(total_fare) over last month (in 2025 unless year specified).\n→ “Your Uber spend for September 2025 was ₹7,840 across 12 trips.”\n\n“Show my food expenses yesterday.”\n→ expenses (category = food) → filter date = yesterday (Asia/Kolkata) in 2025.\n→ “You had 3 food expenses yesterday (2025-10-09) totaling ₹1,120.” + 3-row table.\n\n“Top 5 merchants this week by amount.”\n→ expenses → group by merchant, sum(amount), order desc, limit 5.\n→ “Top merchants this week (2025): …” + small table.\n\n“Longest Uber trip in August?”\n→ notebook_index_semantic_v1 → filter Aug 2025, order by duration desc, limit 1.\n→ “Your longest August 2025 trip was 48m on 2025-08-17 (₹420 after ₹30 promo).”\n\nDisallowed to reveal unless asked:\n\nRaw ES|QL/MCP payloads, index schemas, hostnames, tokens.\n\nSuccess Criterion:\nEach reply picks the right index, uses structured queries, resolves dates per the 2025 rule, and returns a crisp, human summary with numbers/tables when useful."
        }
      },
      "type": "@n8n/n8n-nodes-langchain.agent",
      "typeVersion": 2.2,
      "position": [
        1088,
        1088
      ],
      "id": "c3a045a7-5839-4159-91d7-966086a2c6f8",
      "name": "AI Agent (Query)",
      "onError": "continueRegularOutput"
    },
    {
      "parameters": {
        "model": "anthropic.claude-3-5-sonnet-20241022-v2:0",
        "options": {}
      },
      "type": "@n8n/n8n-nodes-langchain.lmChatAwsBedrock",
      "typeVersion": 1.1,
      "position": [
        1040,
        1248
      ],
      "id": "3a8555c0-5302-4d89-8450-a505efe7dac8",
      "name": "AWS Bedrock Chat Model",
      "credentials": {
        "aws": {
          "id": "{{AWS_CRED_ID}}",
          "name": "AWS (IAM) account"
        }
      }
    },
    {
      "parameters": {
        "endpointUrl": "{{ES_MCP_URL}}",
        "serverTransport": "httpStreamable",
        "authentication": "headerAuth",
        "options": {
          "timeout": 60000
        }
      },
      "type": "@n8n/n8n-nodes-langchain.mcpClientTool",
      "typeVersion": 1.1,
      "position": [
        1296,
        1264
      ],
      "id": "51540921-ff04-4b22-8d7c-9783523d4a49",
      "name": "MCP Client",
      "notesInFlow": true,
      "credentials": {
        "httpHeaderAuth": {
          "id": "{{MCP_CRED_ID}}",
          "name": "Header Auth account"
        }
      }
    },
    {
      "parameters": {
        "chatId": "={{ $('Telegram Trigger').item.json.message.chat.id }}",
        "text": "={{ $json.output }}",
        "additionalFields": {
          "appendAttribution": false,
          "parse_mode": "HTML"
        }
      },
      "type": "n8n-nodes-base.telegram",
      "typeVersion": 1.2,
      "position": [
        1456,
        1088
      ],
      "id": "262c734c-3591-43a4-9ae4-3d14f4fada48",
      "name": "Send a text message (Query)",
      "webhookId": "3a1112a4-5a0c-45b8-bdcf-141b4ee8204b",
      "credentials": {
        "telegramApi": {
          "id": "{{TELEGRAM_CRED_ID}}",
          "name": "Telegram account"
        }
      }
    },
    {
      "parameters": {
        "promptType": "define",
        "text": "={{ $json.chat_id }}{{ $json.user_id }}{{ $json.first_name }}{{ $json.unified_text }}{{ $('Build Unified Text').item.json.unified_text }}",
        "options": {
          "systemMessage": "You extract daily expense details from a Telegram message and return exactly ONE valid JSON object (no code fences). Fields: chat_id, user_id, ts (ISO 8601), amount (number), currency (ISO 4217), normalized_inr (number if INR else omit), merchant, category (food,grocery,transport,fuel,shopping,utilities,rent,entertainment,health,travel,education,fees,subscriptions,other), payment_method (upi,credit_card,debit_card,card,cash,net_banking,wallet,cod,unknown), note, raw_transcript, segments[]. Rules: Use Asia/Kolkata now if time not provided; interpret today/yesterday as 2025. Detect ₹→INR. Normalize GPay→upi. Do not invent values; omit unknowns. Output only one clean JSON object."
        }
      },
      "type": "@n8n/n8n-nodes-langchain.agent",
      "typeVersion": 2.2,
      "position": [
        1088,
        688
      ],
      "id": "5068c593-fa13-4ce9-86bc-279db56f58cc",
      "name": "Ingestion Agent",
      "onError": "continueRegularOutput"
    },
    {
      "parameters": {
        "jsCode": "const raw = $input.all()[0]?.json?.output;\nfunction extractFirstJsonObject(str){\n  const start = str.indexOf('{');\n  if(start===-1) throw new Error(\"No '{' found\");\n  let depth=0;\n  for(let i=start;i<str.length;i++){\n    const ch=str[i];\n    if(ch==='{' ) depth++;\n    else if(ch==='}'){\n      depth--;\n      if(depth===0) return str.slice(start,i+1);\n    }\n  }\n  throw new Error('Unbalanced braces');\n}\nconst text = extractFirstJsonObject(raw).trim();\nlet doc;\ntry { doc = JSON.parse(text); } catch { throw new Error('Invalid JSON from model'); }\nreturn [{ json: doc }];"
      },
      "type": "n8n-nodes-base.code",
      "typeVersion": 2,
      "position": [
        1376,
        688
      ],
      "id": "d0bc90fa-6623-4349-a9d2-477d74cde288",
      "name": "Extract JSON"
    },
    {
      "parameters": {
        "method": "PUT",
        "url": "= {{ES_HTTP_BASE}}/{{ES_INDEX}}/_create/{{ $('Telegram Trigger').item.json.message.message_id }}",
        "sendHeaders": true,
        "headerParameters": {
          "parameters": [
            {
              "name": "Authorization",
              "value": "ApiKey {{ES_API_KEY}}"
            }
          ]
        },
        "sendBody": true,
        "specifyBody": "json",
        "jsonBody": "={\n  \"ts\": \"{{ $json.ts }}\",\n  \"amount\": \"{{ $json.amount }}\",\n  \"currency\": \"{{ $json.currency }}\",\n  \"normalized_inr\": \"{{ $json.normalized_inr || 0 }}\",\n  \"merchant\": \"{{ $json.merchant }}\",\n  \"category\": \"{{ $json.category }}\",\n  \"payment_method\": \"{{ $json.payment_method }}\",\n  \"note\": \"{{ $json.note }}\",\n  \"raw_transcript\": \"{{ $json.raw_transcript }}\",\n  \"@timestamp\" : \"{{ new Date($json.ts).toISOString() }}\"\n}",
        "options": {}
      },
      "type": "n8n-nodes-base.httpRequest",
      "typeVersion": 4.2,
      "position": [
        1568,
        688
      ],
      "id": "b76c0efb-9ff8-42d8-b5e6-c622a8eb9bda",
      "name": "Index in Elasticsearch"
    },
    {
      "parameters": {
        "chatId": "={{ $('Telegram Trigger').item.json.message.chat.id }}",
        "text": "=Added Expense:\nPaid {{ $('Extract JSON').item.json.currency }} {{ $('Extract JSON').item.json.amount }} via {{ $('Extract JSON').item.json.payment_method }} on spend {{ $('Extract JSON').item.json.note }}\n",
        "additionalFields": {
          "appendAttribution": false,
          "parse_mode": "HTML"
        }
      },
      "type": "n8n-nodes-base.telegram",
      "typeVersion": 1.2,
      "position": [
        1776,
        688
      ],
      "id": "dfa97604-72a3-4799-a1eb-4c8caddc685d",
      "name": "Ack (Ingestion)",
      "webhookId": "3a1112a4-5a0c-45b8-bdcf-141b4ee8204b",
      "credentials": {
        "telegramApi": {
          "id": "{{TELEGRAM_CRED_ID}}",
          "name": "Telegram account"
        }
      }
    },
    {
      "parameters": {
        "model": "anthropic.claude-3-5-sonnet-20241022-v2:0",
        "options": {}
      },
      "type": "@n8n/n8n-nodes-langchain.lmChatAwsBedrock",
      "typeVersion": 1.1,
      "position": [
        1056,
        864
      ],
      "id": "064044d5-7d46-4b68-a81f-57dde771716f",
      "name": "AWS Bedrock Chat Model1",
      "credentials": {
        "aws": {
          "id": "{{AWS_CRED_ID}}",
          "name": "AWS (IAM) account"
        }
      }
    },
    {
      "parameters": {
        "options": {}
      },
      "type": "@n8n/n8n-nodes-langchain.chatTrigger",
      "typeVersion": 1.3,
      "position": [
        0,
        0
      ],
      "id": "9f8015b3-f83e-4418-abba-e6def3561813",
      "name": "When chat message received",
      "webhookId": "519472a5-3cef-4223-b83c-9a77e3b86b68"
    },
    {
      "parameters": {
        "content": "Intent Classifier",
        "height": 384,
        "width": 400,
        "color": 6
      },
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        -208,
        848
      ],
      "typeVersion": 1,
      "id": "34151664-254e-4d14-b8ba-5e0a7e2d2d59",
      "name": "Sticky Note"
    },
    {
      "parameters": {
        "content": "Query Flow",
        "height": 320,
        "width": 672
      },
      "type": "n8n-nodes-base.stickyNote",
      "position": [
        992,
        1040
      ],
      "typeVersion": 1,
      "id": "e87e8644-235b-49b1-b53a-e8d5dbea68b8",
      "name": "Sticky Note2"
    },
    {
      "parameters": {
        "conditions": {
          "options": {
            "caseSensitive": true,
            "leftValue": "",
            "typeValidation": "strict",
            "version": 2
          },
          "conditions": [
            {
              "id": "07ecae80-fca2-4d48-b46c-7a9d07a61bb6",
              "leftValue": "={{ $('Parse Classifier JSON').item.json.confidence }}",
              "rightValue": 0.5,
              "operator": {
                "type": "number",
                "operation": "lt"
              }
            }
          ],
          "combinator": "and"
        },
        "options": {}
      },
      "type": "n8n-nodes-base.if",
      "typeVersion": 2.2,
      "position": [
        448,
        864
      ],
      "id": "e2caded4-5aba-43e9-b781-fd148695b97d",
      "name": "Low Confidence Gates?"
    },
    {
      "parameters": {
        "errorMessage": "Sorry did not find out"
      },
      "type": "n8n-nodes-base.stopAndError",
      "typeVersion": 1,
      "position": [
        784,
        688
      ],
      "id": "b5ef68f5-08d3-4f81-8bdb-b07d553a2f52",
      "name": "Stop and Error"
    }
  ],
  "connections": {
    "Telegram Trigger": {
      "main": [
        [
          {
            "node": "Allowed User?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Allowed User?": {
      "main": [
        [
          {
            "node": "Voice or Text?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Voice or Text?": {
      "main": [
        [
          {
            "node": "Get File (Voice)",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Build Unified Text",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Get File (Voice)": {
      "main": [
        [
          {
            "node": "Sarvam STT",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Sarvam STT": {
      "main": [
        [
          {
            "node": "Set Transcript",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Set Transcript": {
      "main": [
        [
          {
            "node": "Build Unified Text",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Build Unified Text": {
      "main": [
        [
          {
            "node": "Classifier Agent (LLM)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "AWS Bedrock (Classifier)": {
      "ai_languageModel": [
        [
          {
            "node": "Classifier Agent (LLM)",
            "type": "ai_languageModel",
            "index": 0
          }
        ]
      ]
    },
    "Classifier Agent (LLM)": {
      "main": [
        [
          {
            "node": "Parse Classifier JSON",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Parse Classifier JSON": {
      "main": [
        [
          {
            "node": "Low Confidence Gates?",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Ask Clarification": {
      "main": [
        [
          {
            "node": "Stop and Error",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Route by Intent": {
      "main": [
        [
          {
            "node": "Ingestion Agent",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "AI Agent (Query)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "AI Agent (Query)": {
      "main": [
        [
          {
            "node": "Send a text message (Query)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "AWS Bedrock Chat Model": {
      "ai_languageModel": [
        [
          {
            "node": "AI Agent (Query)",
            "type": "ai_languageModel",
            "index": 0
          }
        ]
      ]
    },
    "MCP Client": {
      "ai_tool": [
        [
          {
            "node": "AI Agent (Query)",
            "type": "ai_tool",
            "index": 0
          }
        ]
      ]
    },
    "Ingestion Agent": {
      "main": [
        [
          {
            "node": "Extract JSON",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Extract JSON": {
      "main": [
        [
          {
            "node": "Index in Elasticsearch",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Index in Elasticsearch": {
      "main": [
        [
          {
            "node": "Ack (Ingestion)",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "AWS Bedrock Chat Model1": {
      "ai_languageModel": [
        [
          {
            "node": "Ingestion Agent",
            "type": "ai_languageModel",
            "index": 0
          }
        ]
      ]
    },
    "Low Confidence Gates?": {
      "main": [
        [
          {
            "node": "Ask Clarification",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Route by Intent",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  },
  "pinData": {},
  "meta": {
    "templateCredsSetupCompleted": true,
    "instanceId": "2d5a76cf96ffbc2b91c35804a071c60f24e30c98aa4233ffa131f1129b7b0c6b"
  }
}
"""

workflow_data = json.loads(raw_workflow_json)
print("Keys in imported JSON:", workflow_data.keys())


Keys in imported JSON: dict_keys(['nodes', 'connections', 'pinData', 'meta'])


## 8. Render placeholders and create the workflow in n8n

This cell:

1. Takes the workflow JSON template from above.
2. Replaces all placeholders with the real IDs and secrets from our config.
3. Sanity-checks that no placeholders remain.
4. Posts the final workflow JSON to `POST /api/v1/workflows` in n8n.

After running this, you should see the new workflow appear in the n8n UI.
You can then enable it and copy the Telegram webhook URL for the bot.


In [None]:
patched_json = (
    raw_workflow_json
      .replace("{{TELEGRAM_CRED_ID}}", telegram_cred_id)
      .replace("{{AWS_CRED_ID}}", aws_cred_id)
      .replace("{{MCP_CRED_ID}}", mcp_cred_id)
      .replace("{{SARVAM_ENDPOINT}}", SARVAM_ENDPOINT)
      .replace("{{SARVAM_API_KEY}}", SARVAM_API_KEY)
      .replace("{{ES_MCP_URL}}", ES_MCP_URL)
      .replace("{{ES_HTTP_BASE}}", ES_HTTP_BASE)
      .replace("{{ES_API_KEY}}", ES_API_KEY)
      .replace("{{ES_INDEX}}", ES_INDEX)
)

workflow_data = json.loads(patched_json)
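
Step 3 of the list above (checking that no placeholders remain) is not shown in the cell. A minimal sketch of such a check follows, assuming template placeholders are always written as `{{UPPER_SNAKE_CASE}}` with no spaces, which is what distinguishes them from n8n expressions such as `{{ $json.output }}`. The names `PLACEHOLDER_RE`, `find_unresolved_placeholders`, and `assert_fully_rendered` are illustrative and not part of the original notebook.

```python
import re

# Template placeholders look like {{UPPER_SNAKE_CASE}}. n8n expressions such as
# {{ $json.output }} contain spaces or "$" and are deliberately not matched.
PLACEHOLDER_RE = re.compile(r"\{\{[A-Z][A-Z0-9_]*\}\}")


def find_unresolved_placeholders(rendered: str) -> list:
    """Return the sorted, de-duplicated placeholders still present in `rendered`."""
    return sorted(set(PLACEHOLDER_RE.findall(rendered)))


def assert_fully_rendered(rendered: str) -> None:
    """Raise ValueError if any template placeholder survived substitution."""
    leftover = find_unresolved_placeholders(rendered)
    if leftover:
        raise ValueError(f"Unresolved placeholders: {leftover}")
```

Calling `assert_fully_rendered(patched_json)` before posting the workflow would stop the run early if, for example, a new placeholder were added to the template without a matching `.replace(...)` call.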


In [None]:
telegram_workflow_payload = {
    "name": "Telegram Expense Bot (API v1)",
    "nodes": workflow_data["nodes"],
    "connections": workflow_data["connections"],
    "settings": {},      # you can tweak later in UI if needed
}

resp = n8n_post("/api/v1/workflows", telegram_workflow_payload)


=== POST https://seems-sys-singing-devon.trycloudflare.com/api/v1/workflows ===
Status: 200
Body: {'name': 'Telegram Expense Bot (API v1)', 'nodes': [{'parameters': {'updates': ['message'], 'additionalFields': {}}, 'type': 'n8n-nodes-base.telegramTrigger', 'typeVersion': 1.2, 'position': [-96, 400], 'id': 'ee05fea8-b0d4-474a-a6b2-b10db895c871', 'name': 'Telegram Trigger', 'webhookId': 'df86a66c-7c82-4cbc-9df0-5a350f84b590', 'credentials': {'telegramApi': {'id': 'xY8ZSZHyuA6DDB9T', 'name': 'Telegram account'}}}, {'parameters': {'conditions': {'options': {'caseSensitive': True, 'leftValue': '', 'typeValidation': 'strict', 'version': 2}, 'conditions': [{'id': 'allow-user', 'leftValue': '={{ $json.message.from.id }}', 'rightValue': 957087985, 'operator': {'type': 'number', 'operation': 'equals'}}], 'combinator': 'and'}, 'options': {}}, 'type': 'n8n-nodes-base.if', 'typeVersion': 2.2, 'position': [208, 400], 'id': '7f9299cc-bb14-4376-96c8-e723eede06d0', 'name': 'Allowed User?'}, {'paramete

The n8n setup is now complete. Next, let's set up Elasticsearch.

## 9. Create Elasticsearch mapping and Bedrock inference endpoint

This section does two things:

1. Creates an **inference endpoint** in Elasticsearch called `bedrock-embeddings`, which calls
   `amazon.titan-embed-text-v2:0` on AWS Bedrock.
2. Creates the expenses index (`ES_INDEX`) with a semantic field `semantic_all` that uses
   the `bedrock-embeddings` inference ID.

This enables semantic search and analytics over the ingested expense data.






In [None]:
headers = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {ES_API_KEY}",
}

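# Endpoint for inference creation; the last path segment is the inference ID.
# NOTE: this ID must match the `inference_id` of the `semantic_all` field in the
# index mapping created later, which references `bedrock-embeddings`.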
url = f"{ES_HTTP_BASE}/_inference/text_embedding/bedrock-embeddings_3"

payload = {
    "service": "amazonbedrock",
    "service_settings": {
        "access_key": AWS_ACCESS_KEY,       # <-- your AWS key
        "secret_key": AWS_SECRET_KEY,
        "region": "us-west-2",
        "provider": "amazontitan",
        "model": "amazon.titan-embed-text-v2:0"
    }
}

print("Creating AWS Bedrock embedding inference in Elasticsearch...")
response = requests.put(url, headers=headers, data=json.dumps(payload))

print("Status:", response.status_code)
try:
    print(response.json())
except ValueError:  # response body was not valid JSON
    print(response.text)

Creating AWS Bedrock embedding inference in Elasticsearch...
Status: 200
{'inference_id': 'bedrock-embeddings_3', 'task_type': 'text_embedding', 'service': 'amazonbedrock', 'service_settings': {'region': 'us-west-2', 'model': 'amazon.titan-embed-text-v2:0', 'provider': 'AMAZONTITAN', 'rate_limit': {'requests_per_minute': 240}, 'dimensions': 1024, 'similarity': 'cosine'}, 'chunking_settings': {'strategy': 'sentence', 'max_chunk_size': 250, 'sentence_overlap': 1}}
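
Before wiring the endpoint into an index, it can help to confirm that it actually returns vectors. The sketch below only builds the request and inspects a response; it assumes the inference API accepts a body of the form `{"input": "<text>"}` and answers a `text_embedding` task with `{"text_embedding": [{"embedding": [...]}]}`. The helper names `build_embedding_probe` and `embedding_length` are hypothetical.

```python
import json


def build_embedding_probe(es_base: str, api_key: str, inference_id: str, text: str):
    """Build (url, headers, body) for a one-off call to a text_embedding endpoint."""
    url = f"{es_base}/_inference/text_embedding/{inference_id}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"ApiKey {api_key}",
    }
    body = json.dumps({"input": text})
    return url, headers, body


def embedding_length(response_body: dict) -> int:
    """Length of the first returned vector (response shape is an assumption, see above)."""
    return len(response_body["text_embedding"][0]["embedding"])
```

In the notebook this could be used as `url, headers, body = build_embedding_probe(ES_HTTP_BASE, ES_API_KEY, "bedrock-embeddings_3", "cab ride")` followed by `requests.post(url, headers=headers, data=body)`. Given the `'dimensions': 1024` reported in the creation output, `embedding_length(resp.json())` would be expected to equal 1024.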


Upload the sample expense dataset to Colab. You can download it from [here](https://gist.github.com/Som23Git/16ecfe39c2e7fbe9bef9fa822924a555).

In [None]:
csv_path = "/content/expense_dataset_elasticsearch.csv"
df = pd.read_csv(csv_path)

df.head()


Unnamed: 0,amount,category,currency,merchant,normalized_inr,note,payment_method,raw_transcript,ts
0,800,transport,INR,Rapido,800,Cab ride,netbanking,800 rupees on Rapido cab ride on 24th of Janua...,2025-01-24T00:45:00.000Z
1,450,food,INR,Domino's,450,Lunch meal,gpay,I spent 450 rupees at Domino's for lunch meal ...,2025-02-16T12:15:09.000Z
2,2000,shopping,INR,Amazon,2000,T-shirt purchase,credit_card,Can you record an expense of 2000 rupees for t...,2025-02-18T00:50:15.000Z
3,500,shopping,INR,Flipkart,500,Jeans purchase,upi,Can you record an expense of 500 rupees for je...,2025-02-21T18:50:45.000Z
4,220,food,INR,Food Panda,220,Paneer tikka,debit_card,Can you add an expense of 220 rupees for panee...,2025-02-23T12:50:15.000Z


Next, create the index with its mappings.


In [None]:
headers = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {ES_API_KEY}",
}

index_url = f"{ES_HTTP_BASE}/{ES_INDEX}"

# Elasticsearch index mapping
index_mappings = {
  "mappings": {
    "properties": {
      "@timestamp": { "type": "date" },
      "amount": { "type": "double" },
      "category": { "type": "keyword" },
      "currency": { "type": "keyword" },
      "merchant": { "type": "keyword" },
      "normalized_inr": { "type": "double" },
      "note": {
        "type": "text",
        "copy_to": ["semantic_all"]
      },
      "payment_method": {
        "type": "keyword",
        "copy_to": ["semantic_all"]
      },
      "raw_transcript": {
        "type": "text",
        "copy_to": ["semantic_all"]
      },
      "semantic_all": {
        "type": "semantic_text",
        "inference_id": "bedrock-embeddings",
        "model_settings": {
          "service": "amazonbedrock",
          "task_type": "text_embedding",
          "dimensions": 1024,
          "similarity": "cosine",
          "element_type": "float"
        }
      },
      "ts": {
        "type": "date",
        "copy_to": ["@timestamp"]
      }
    }
  }
}




print("Creating index:", index_url)

response = requests.put(
    index_url,
    headers=headers,
    data=json.dumps(index_mappings)
)

print("Status:", response.status_code)
try:
    print(response.json())
except ValueError:  # response body was not valid JSON
    print(response.text)



Creating index: https://somserverlessv1-bfa3d6.es.us-central1.gcp.elastic.cloud/daily_personal_expenses_27_nov
Status: 200
{'acknowledged': True, 'shards_acknowledged': True, 'index': 'daily_personal_expenses_27_nov'}
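
Once documents have been ingested (section 10), the `semantic_all` field can be searched with Elasticsearch's `semantic` query. The sketch below builds such a request body, optionally narrowed by the `category` keyword field from the mapping above. The function name `build_semantic_search` and its defaults are illustrative, not part of the original notebook.

```python
import json


def build_semantic_search(question: str, category=None, size: int = 5) -> dict:
    """Build a _search body that runs a semantic query on `semantic_all`."""
    semantic = {"semantic": {"field": "semantic_all", "query": question}}
    if category is None:
        query = semantic
    else:
        # Combine semantic relevance with an exact keyword filter.
        query = {
            "bool": {
                "must": [semantic],
                "filter": [{"term": {"category": category}}],
            }
        }
    return {
        "size": size,
        "query": query,
        "_source": ["ts", "amount", "currency", "merchant", "category", "note"],
    }


if __name__ == "__main__":
    print(json.dumps(build_semantic_search("late night cab rides", "transport"), indent=2))
```

The resulting dictionary can be sent with `requests.post(f"{ES_HTTP_BASE}/{ES_INDEX}/_search", headers=headers, data=json.dumps(body))`, reusing the `headers` defined in the cell above.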


## 10. Bulk ingest CSV expenses into Elasticsearch (batched)

This cell loads the synthetic expenses CSV and ingests it into Elasticsearch:

- Cleans numeric fields (`amount`, `normalized_inr`) by removing thousand separators.
- Normalizes `ts` timestamps to ISO-8601.
- Uses `_bulk` API with a configurable `BATCH_SIZE` and delay between batches.
- Handles Bedrock throttling (HTTP 429) with simple retries and backoff.
- Prints the estimated number of successful vs failed documents.

Tune `BATCH_SIZE` and `SLEEP_BETWEEN_BATCHES` if you hit Bedrock throttling often.


In [None]:
# ES_HTTP_BASE, ES_API_KEY, ES_INDEX and csv_path are defined in earlier cells.

CSV_FILE = csv_path

BATCH_SIZE = 10                   # 10 docs per batch
SLEEP_BETWEEN_BATCHES = 25       # seconds
MAX_RETRIES = 4                   # number of retries for throttling

# ---------------------------------------------------
# Load CSV
# ---------------------------------------------------
df = pd.read_csv(CSV_FILE)

def ensure_iso(ts):
    try:
        return datetime.fromisoformat(str(ts)).isoformat()
    except Exception:
        return ts

df["ts"] = df["ts"].apply(ensure_iso)

# Drop @timestamp (ES will derive from ts via copy_to)
if "@timestamp" in df.columns:
    df = df.drop(columns=["@timestamp"])

# Fix "1,500" style numbers → 1500.0
if "amount" in df.columns:
    df["amount"] = (
        df["amount"]
        .astype(str)
        .str.replace(",", "", regex=False)
        .astype(float)
    )

if "normalized_inr" in df.columns:
    df["normalized_inr"] = (
        df["normalized_inr"]
        .astype(str)
        .str.replace(",", "", regex=False)
        .astype(float)
    )

print("Rows loaded:", len(df))
print("Columns:", df.columns.tolist())

# ---------------------------------------------------
# BULK INGEST
# ---------------------------------------------------
headers = {
    "Content-Type": "application/x-ndjson",
    "Authorization": f"ApiKey {ES_API_KEY}",
}

bulk_url = f"{ES_HTTP_BASE}/{ES_INDEX}/_bulk"

def generate_bulk_payload(df_chunk):
    lines = []
    for _, row in df_chunk.iterrows():
        doc = row.to_dict()

        if "ts" not in doc or pd.isna(doc["ts"]):
            continue

        # Action line
        lines.append(json.dumps({"index": {"_index": ES_INDEX}}))

        # Document line
        lines.append(json.dumps(doc))

    if not lines:
        return ""
    return "\n".join(lines) + "\n"

total_docs = len(df)
success_docs = 0
failed_docs = 0

for start in range(0, total_docs, BATCH_SIZE):
    end = start + BATCH_SIZE
    chunk = df.iloc[start:end]

    payload = generate_bulk_payload(chunk)
    if not payload:
        continue

    print(f"\n📦 Ingesting docs {start}–{min(end, total_docs) - 1}...")

    for attempt in range(1, MAX_RETRIES + 1):
        resp = requests.post(bulk_url, headers=headers, data=payload)
        print(f"  Attempt {attempt} → Status: {resp.status_code}")

        try:
            resp_json = resp.json()
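        except ValueError:
            # Non-JSON body (for example a proxy error page): count the batch as failed.
            print("  Raw response:", resp.text[:300])
            failed_docs += len(chunk)
            break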

        if not resp_json.get("errors"):
            success_docs += len(chunk)
            print("  ✅ Success.")
            break

        # Handle failures
        items = resp_json.get("items", [])
        batch_failures = 0
        throttled = False

        for item in items:
            idx = item.get("index", {})
            status = idx.get("status", 0)
            if status >= 400:
                batch_failures += 1
                err = idx.get("error", {})
                reason = err.get("reason", "")

                if "Too many requests" in reason or "throttling" in reason:
                    throttled = True

        if throttled and attempt < MAX_RETRIES:
            backoff = 2 * attempt
            print(f"  ⚠ Bedrock throttled → waiting {backoff}s...")
            time.sleep(backoff)
            continue

        failed_docs += batch_failures
        success_docs += (len(chunk) - batch_failures)
        print(f"  ❌ {batch_failures} docs failed.")
        break

    time.sleep(SLEEP_BETWEEN_BATCHES)

print("\n🎉 Ingestion complete.")
print("✓ Successful docs:", success_docs)
print("✗ Failed docs:", failed_docs)


Rows loaded: 100
Columns: ['amount', 'category', 'currency', 'merchant', 'normalized_inr', 'note', 'payment_method', 'raw_transcript', 'ts']

📦 Ingesting docs 0–9...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 10–19...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 20–29...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 30–39...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 40–49...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 50–59...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 60–69...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 70–79...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 80–89...
  Attempt 1 → Status: 200
  ✅ Success.

📦 Ingesting docs 90–99...
  Attempt 1 → Status: 200
  ✅ Success.

🎉 Ingestion complete.
✓ Successful docs: 100
✗ Failed docs: 0
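
One caveat of the retry loop in the ingest cell: when a batch is throttled, the whole batch is sent again, and because the action lines carry no `_id`, documents that were indexed on the first attempt can end up indexed twice. A possible refinement, sketched below, is to resend only the throttled documents. It relies on `_bulk` returning one response item per action in submission order; the helper names `is_throttled` and `split_bulk_result` are hypothetical and not part of the original notebook.

```python
def is_throttled(result: dict) -> bool:
    """True when a bulk item failed because the inference service was rate limited."""
    err = result.get("error") or {}
    reason = str(err.get("reason") or "") if isinstance(err, dict) else str(err)
    return (
        result.get("status") == 429
        or "Too many requests" in reason
        or "throttling" in reason
    )


def split_bulk_result(docs: list, items: list) -> tuple:
    """Sort submitted docs into (ok, retryable, failed) using the _bulk response items.

    `docs` must list exactly the documents that were sent, in the order sent.
    """
    ok, retryable, failed = [], [], []
    for doc, item in zip(docs, items):
        result = item.get("index", {})
        if result.get("status", 0) < 400:
            ok.append(doc)
        elif is_throttled(result):
            retryable.append(doc)
        else:
            failed.append(doc)
    return ok, retryable, failed
```

With this split, each retry would rebuild the payload from `retryable` only, while `ok` and `failed` feed the success and failure counters directly. Rows skipped for a missing `ts` have to be excluded from `docs` so that the two lists stay aligned.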


Setup is now complete: the n8n workflow is installed with the proper credentials, and the sample dataset is indexed in Elasticsearch. It's time to ingest and search expenses.