Ingest
Normalize external data into clean CRM rows through a single endpoint, POST /ingest, with pluggable format adapters.
%%{init: {"look": "handDrawn", "theme": "dark"}}%%
flowchart LR
In[POST /ingest] --> Dispatch{format}
Dispatch -->|csv| A1[csv]
Dispatch -->|json| A2[json]
Dispatch -->|vcard| A3[vcard]
Dispatch -->|text| A4[text]
A1 & A2 & A3 & A4 --> Norm[normalize:<br/>lowercase email<br/>E.164 phone<br/>canonicalize URL]
Norm --> Dedupe[dedupe:<br/>external_id → email/domain]
Dedupe --> Rows[(CRM rows)]
Dedupe -.->|ingest.completed| Timeline[[timeline + audit]]
Agents do the hard reasoning (extracting people from a PDF, reconciling two feeds); Nakatomi standardizes the shape. Set the dry_run flag to preview the result without writing anything.
Request body:
{
"source": "apollo",
"format": "json",
"payload": [...],
"mapping": {...},
"dry_run": false
}
- source: free-form label (shows up in ingest_runs for traceability)
- format: csv | json | vcard | text
- payload: the raw data; its shape depends on format
- mapping: optional hints (see each adapter)
- dry_run: if true, report what would happen without writing
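As a rough client-side illustration of the request shape, the sketch below builds (but does not send) a POST /ingest request with Python's standard urllib. The helper name build_ingest_request and its argument order are hypothetical; the field names, headers, and the localhost:8000 base URL follow the examples on this page.

```python
import json
import urllib.request

# Formats accepted by POST /ingest, per the field list above.
FORMATS = {"csv", "json", "vcard", "text"}


def build_ingest_request(base_url, token, source, fmt, payload,
                         mapping=None, dry_run=False):
    """Build (but do not send) a POST /ingest request.

    Hypothetical client helper: the name and argument order are illustrative.
    """
    if fmt not in FORMATS:
        raise ValueError(f"unsupported format: {fmt!r}")
    body = {"source": source, "format": fmt, "payload": payload,
            "dry_run": dry_run}
    if mapping:  # mapping is optional, so leave it out when empty
        body["mapping"] = mapping
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/ingest",
        data=json.dumps(body).encode("utf-8"),
        headers={"Authorization": f"Bearer {token}",
                 "Content-Type": "application/json"},
        method="POST",
    )
```

Send it with urllib.request.urlopen(req) once the token and base URL are real. Leaving mapping out when it is empty keeps the body identical to the minimal examples on this page.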
Response:
{
"run_id": "...",
"record_count": 50,
"created": 42,
"updated": 8,
"errors": 0,
"created_ids": ["..."],
"updated_ids": ["..."],
"diagnostics": [{"level": "info", "message": "...", "row": 3}]
}
csv: the payload is a raw CSV string with a header row. Each column in the header row becomes a field name. The default target entity is contact.
curl -X POST http://localhost:8000/ingest \
-H "Authorization: Bearer nk_..." \
-H "Content-Type: application/json" \
-d '{
"source":"paste",
"format":"csv",
"payload":"external_id,first_name,last_name,email\nlead-1,Ada,Lovelace,ada@ex.com\n"
}'
Target a different entity via mapping._entity:
{"mapping": {"_entity": "company"}}
Remap column names:
{"mapping": {"Full Name": "name", "Web": "website", "_entity": "company"}}
json: the payload is an array of dicts. Keys map to entity fields directly or through mapping.
{
"source": "hubspot",
"format": "json",
"payload": [
{"external_id":"hs-1", "first_name":"A", "email":"a@ex.com"},
{"external_id":"hs-2", "first_name":"B", "email":"b@ex.com"}
]
}
A single dict is accepted too (it is wrapped in a list server-side).
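Both the csv and json adapters end up with a list of field dicts before normalization. A minimal sketch of that step, assuming mapping keys are source column or key names, mapping values are target field names, and the reserved _entity key selects the entity for json the same way it does for csv (defaulting to contact). rows_from_payload is a hypothetical name, not the adapters' actual code.

```python
import csv
import io


def rows_from_payload(fmt, payload, mapping=None):
    """Return (entity, rows) for a csv or json payload."""
    mapping = dict(mapping or {})
    # _entity is a reserved hint, not a column; contact is the default target.
    entity = mapping.pop("_entity", "contact")
    if fmt == "csv":
        # Each header column becomes a field name.
        records = list(csv.DictReader(io.StringIO(payload)))
    elif fmt == "json":
        # A single dict is accepted and wrapped in a list.
        records = [payload] if isinstance(payload, dict) else list(payload)
    else:
        raise ValueError(f"not a tabular format: {fmt!r}")
    # Rename mapped keys; unmapped keys pass through unchanged.
    rows = [{mapping.get(k, k): v for k, v in rec.items()} for rec in records]
    return entity, rows
```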
vcard: the payload is a vCard-format string. Each BEGIN:VCARD ... END:VCARD block becomes a contact. FN, N, EMAIL, TEL, and TITLE map to first_name / last_name / email / phone / title. The full parsed vCard goes into data.vcard for agents that care about the original fields.
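A minimal sketch of splitting a vCard payload into contacts, assuming the first_name / last_name split comes from N (Family;Given;...) when present and otherwise from splitting FN on its first space. parse_vcards and _to_contact are hypothetical names; only the property-to-field pairs and the data.vcard placement come from the description above. Group prefixes (item1.EMAIL) and escaped characters are not handled.

```python
def parse_vcards(payload):
    """Minimal vCard reader: one contact dict per BEGIN:VCARD...END:VCARD block."""
    # Unfold continuation lines: a line starting with a space or tab
    # continues the previous line.
    lines = []
    for raw in payload.splitlines():
        if raw[:1] in (" ", "\t") and lines:
            lines[-1] += raw[1:]
        else:
            lines.append(raw)

    contacts, card = [], None
    for line in lines:
        upper = line.strip().upper()
        if upper == "BEGIN:VCARD":
            card = {}
        elif upper == "END:VCARD" and card is not None:
            contacts.append(_to_contact(card))
            card = None
        elif card is not None and ":" in line:
            name_part, value = line.split(":", 1)
            key = name_part.split(";", 1)[0].upper()  # drop params like TYPE=work
            card.setdefault(key, []).append(value.strip())
    return contacts


def _to_contact(card):
    contact = {"data": {"vcard": card}}  # keep every parsed property for agents
    if "N" in card:
        # N is Family;Given;Additional;Prefix;Suffix
        parts = card["N"][0].split(";")
        contact["last_name"] = parts[0] or None
        contact["first_name"] = parts[1] if len(parts) > 1 and parts[1] else None
    elif "FN" in card:
        first, _, last = card["FN"][0].partition(" ")
        contact["first_name"], contact["last_name"] = first or None, last or None
    for vkey, field_name in (("EMAIL", "email"), ("TEL", "phone"), ("TITLE", "title")):
        if vkey in card:
            contact[field_name] = card[vkey][0]  # first value wins
    return contact
```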
text: the payload is a plain-text blob that becomes a markdown note on a specific entity. mapping.entity_type and mapping.entity_id are required.
{
"source":"meeting-transcript",
"format":"text",
"payload":"<full transcript>",
"mapping":{"entity_type":"deal","entity_id":"<uuid>"}
}
Good for landing a call transcript on a deal without extracting entities yourself. Extraction and summarization are the agent's job; Nakatomi only stores the text.
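The text adapter does no extraction, so its core is validation plus a single write. A sketch, assuming missing mapping keys surface as an error diagnostic (in the same shape as the diagnostics on this page) rather than an exception; text_to_note and the note dict's keys are hypothetical.

```python
def text_to_note(payload, mapping):
    """Return (note, diagnostics) for a text-format ingest; note is None on error."""
    mapping = mapping or {}
    missing = [k for k in ("entity_type", "entity_id") if not mapping.get(k)]
    if missing:
        msg = "text format needs " + " and ".join(f"mapping.{k}" for k in missing)
        return None, [{"level": "error", "message": msg}]
    note = {
        "entity_type": mapping["entity_type"],
        "entity_id": mapping["entity_id"],
        # Stored as markdown; only surrounding whitespace is trimmed.
        "body": payload.strip(),
    }
    return note, []
```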
All adapters run every value through the same normalization:
- email → lowercased; invalid emails dropped
- phone → digits only, with a leading + preserved (E.164-ish)
- domain → strip https://, strip the path, lowercase
- website → add https:// if missing
- tags → split on , ; |, then trim, dedupe, and preserve order
- strings → trim whitespace; empty → None
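A sketch of these rules in Python. norm_email, norm_phone, and norm_domain reuse the helper names that ingest.base exports (per the adapter checklist at the end of this page), but the bodies here are illustrative reimplementations, not that module's code; norm_str, norm_website, and norm_tags are hypothetical names. The email regex is deliberately loose, and norm_domain also strips http://, which is an assumption beyond the rule as written.

```python
import re

# Deliberately loose email check; the real validator may be stricter.
_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def norm_str(value):
    """Trim whitespace; empty strings become None."""
    if value is None:
        return None
    value = str(value).strip()
    return value or None


def norm_email(value):
    value = norm_str(value)
    if value is None:
        return None
    value = value.lower()
    return value if _EMAIL_RE.match(value) else None  # invalid emails dropped


def norm_phone(value):
    value = norm_str(value)
    if value is None:
        return None
    digits = re.sub(r"\D", "", value)  # keep digits only
    if not digits:
        return None
    return ("+" + digits) if value.startswith("+") else digits


def norm_domain(value):
    value = norm_str(value)
    if value is None:
        return None
    value = re.sub(r"^https?://", "", value.lower())  # strip the scheme
    return value.split("/", 1)[0] or None              # strip any path


def norm_website(value):
    value = norm_str(value)
    if value is None:
        return None
    return value if re.match(r"^https?://", value, re.I) else "https://" + value


def norm_tags(value):
    """Split on , ; or |, trim, drop empties and duplicates, keep first-seen order."""
    seen, out = set(), []
    for tag in re.split(r"[,;|]", value or ""):
        tag = tag.strip()
        if tag and tag not in seen:
            seen.add(tag)
            out.append(tag)
    return out
```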
Dedupe matches each incoming row against existing records, in priority order:
- external_id (exact match within the workspace): always wins
- For contacts: lowercased email match
- For companies: lowercased domain match
- Otherwise: create a new row (with a diagnostic)
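A sketch of the matching order over an in-memory list of existing records, assuming the records are already scoped to one workspace and that "always wins" means an external_id hit takes precedence over any email or domain hit (a row whose external_id matches nothing falls through to the next rule). find_match is a hypothetical name; the real service presumably queries the database instead.

```python
def find_match(entity, row, existing):
    """Return (matched_record_or_None, reason) using the priority above.

    `existing` is assumed to hold only records from the same workspace.
    """
    ext = row.get("external_id")
    if ext:
        for rec in existing:
            if rec.get("external_id") == ext:  # exact match, no case folding
                return rec, "external_id"
    key = {"contact": "email", "company": "domain"}.get(entity)
    value = (row.get(key) or "").lower() if key else ""
    if value:
        for rec in existing:
            if (rec.get(key) or "").lower() == value:
                return rec, key
    return None, "new"  # caller creates a row and emits a diagnostic
```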
Every row emits zero or more diagnostics, at level info, warn, or error. Each error diagnostic increments the errors counter in the response but doesn't abort the run; the other rows still process.
Examples:
- {"level":"error","message":"row needs email, external_id, or a name","row":3}
- {"level":"info","message":"would create contact","row":0} (dry_run)
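A sketch of how per-row error diagnostics can be counted without aborting the run. RunReport and process are hypothetical; the check mirrors the example error message above, and for brevity every valid row is counted as created (no dedupe or writes).

```python
from dataclasses import dataclass, field


@dataclass
class RunReport:
    """Hypothetical accumulator for per-row diagnostics and counters."""
    created: int = 0
    updated: int = 0
    errors: int = 0
    diagnostics: list = field(default_factory=list)

    def diag(self, level, message, row):
        self.diagnostics.append({"level": level, "message": message, "row": row})
        if level == "error":
            self.errors += 1  # counted, but the run keeps going


IDENTIFYING = ("email", "external_id", "first_name", "last_name", "name")


def process(rows):
    report = RunReport()
    for i, row in enumerate(rows):
        if not any(row.get(k) for k in IDENTIFYING):
            report.diag("error", "row needs email, external_id, or a name", i)
            continue  # skip only this row; later rows still process
        report.created += 1
    return report
```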
{"dry_run": true}
Runs the full pipeline inside a SAVEPOINT and then rolls it back. The counts and ids in the response reflect what would have changed, which makes this useful for previewing a large ingest before committing.
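The SAVEPOINT pattern can be sketched with Python's built-in sqlite3 module as a stand-in for the production database. The contact table and the ingest_rows function are hypothetical; the sketch assumes the connection runs in autocommit mode (isolation_level=None) so the SAVEPOINT itself opens and closes the transaction.

```python
import sqlite3


def ingest_rows(conn, emails, dry_run=False):
    """Insert contacts inside a SAVEPOINT; roll the writes back when dry_run is set."""
    conn.execute("SAVEPOINT ingest")
    created_ids = []
    try:
        for email in emails:
            cur = conn.execute("INSERT INTO contact (email) VALUES (?)", (email,))
            created_ids.append(cur.lastrowid)
    except Exception:
        conn.execute("ROLLBACK TO SAVEPOINT ingest")
        conn.execute("RELEASE SAVEPOINT ingest")
        raise
    if dry_run:
        # Undo every write but keep the counts and ids gathered above.
        conn.execute("ROLLBACK TO SAVEPOINT ingest")
    conn.execute("RELEASE SAVEPOINT ingest")  # commits when not rolled back
    return {"created": len(created_ids), "created_ids": created_ids}


conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE contact (id INTEGER PRIMARY KEY, email TEXT)")
```

Note that the ids a dry run reports belong to rows that no longer exist once the savepoint is rolled back, so they are only useful as a preview.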
Every invocation creates an ingest_runs row recording source, format,
counts, and diagnostics. Query it later to audit what an agent fed in:
SELECT * FROM ingest_runs
WHERE workspace_id = '<uuid>'
ORDER BY created_at DESC LIMIT 20;
Each run also emits an ingest.completed timeline event.
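A sketch of recording and reading back audit rows from Python, with the workspace id bound as a query parameter instead of pasted into the SQL string. The table here is a simplified, hypothetical ingest_runs schema on sqlite3 (counts and diagnostics stored as JSON text); the real table's columns may differ.

```python
import json
import sqlite3
import uuid

# Hypothetical, simplified schema for illustration only.
SCHEMA = """
CREATE TABLE ingest_runs (
    id TEXT PRIMARY KEY,
    workspace_id TEXT NOT NULL,
    source TEXT,
    format TEXT,
    counts TEXT,        -- JSON: created/updated/errors
    diagnostics TEXT,   -- JSON list
    created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
"""


def record_run(conn, workspace_id, source, fmt, counts, diagnostics):
    """Write one audit row per ingest call and return its id."""
    run_id = str(uuid.uuid4())
    conn.execute(
        "INSERT INTO ingest_runs (id, workspace_id, source, format, counts, diagnostics)"
        " VALUES (?, ?, ?, ?, ?, ?)",
        (run_id, workspace_id, source, fmt, json.dumps(counts), json.dumps(diagnostics)),
    )
    return run_id


def recent_runs(conn, workspace_id, limit=20):
    """The audit query, with the workspace id and limit bound as parameters."""
    return conn.execute(
        "SELECT id, source, format, counts FROM ingest_runs"
        " WHERE workspace_id = ? ORDER BY created_at DESC LIMIT ?",
        (workspace_id, limit),
    ).fetchall()
```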
Agents can call the same pipeline through the ingest MCP tool:
> ingest(source="apollo", format="json",
payload=[{"external_id":"apollo-1","email":"a@ex.com"}],
dry_run=true)
See MCP-Tools.
To add a format adapter:
- Add a function to app/services/ingest/adapters.py
- Decorate it with @register_adapter("<format>")
- Signature: def my_adapter(db, p, payload, mapping, dry_run) -> IngestResult
- Reuse norm_email, norm_phone, norm_domain, etc. from ingest.base
- Add a test under tests/test_ingest.py
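A minimal sketch of the registry the checklist describes, assuming register_adapter files each function in a dict keyed by format and the endpoint dispatches on the request's format. ADAPTERS, IngestResult's fields, dispatch, and the tsv format are hypothetical; p is passed through untouched because its meaning isn't documented on this page.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict

# Hypothetical stand-ins for the real registry and result type.
ADAPTERS: Dict[str, Callable] = {}


@dataclass
class IngestResult:
    created: int = 0
    updated: int = 0
    errors: int = 0
    diagnostics: list = field(default_factory=list)


def register_adapter(fmt):
    """Decorator that files an adapter under its format name."""
    def wrap(fn):
        if fmt in ADAPTERS:
            raise ValueError(f"adapter already registered for {fmt!r}")
        ADAPTERS[fmt] = fn
        return fn
    return wrap


@register_adapter("tsv")
def tsv_adapter(db, p, payload, mapping, dry_run) -> IngestResult:
    """Example adapter for a hypothetical tab-separated format."""
    result = IngestResult()
    lines = [ln for ln in payload.splitlines() if ln.strip()]
    if not lines:
        return result
    header = lines[0].split("\t")
    for i, line in enumerate(lines[1:]):
        row = dict(zip(header, line.split("\t")))
        if not row.get("email"):
            result.errors += 1
            result.diagnostics.append({"level": "error", "message": "missing email", "row": i})
            continue
        result.created += 1  # a real adapter would normalize, dedupe, and write via db
    return result


def dispatch(fmt, db, p, payload, mapping=None, dry_run=False):
    adapter = ADAPTERS.get(fmt)
    if adapter is None:
        raise ValueError(f"unsupported format: {fmt!r}")
    return adapter(db, p, payload, mapping or {}, dry_run)
```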
Repository · Issues · MIT licensed · maintained by Matt Dula