# AtomicAttributeGraph (AAG)

**A graph-native customer data model for field-level truth.**

AAG solves record-level timestamp pollution in Customer Data Platforms by modelling every distinct attribute value as an independent graph node, linked to entities through timestamped relationships that carry the field's own event time — not the record's write time.

```
CustomerNode(C001) ──HAS_EMAIL @ 09:05 (salesforce)──► ValueNode("alex@example.com")
                   ──HAS_PHONE @ 09:05 (salesforce)──► ValueNode("+49 111 000 0000")  ✗ loses
                   ──HAS_PHONE @ 09:00 (shopify)   ──► ValueNode("+49 987 654 3210")  ✓ wins
```

Two Delta tables. Any entity type. Any relationship type. No vendor lock-in.

- Framework blog post
- Interactive demo


## The Problem

Traditional CDPs timestamp entire records. When Salesforce syncs a full contact at 09:05, every field in that record — including phone numbers unchanged since 2024 — inherits a LastModifiedDate of 09:05. The CDP sees this as the freshest version and overwrites the correct value that Shopify wrote at 09:00 from an actual customer update.

AAG fixes this structurally: every relationship edge carries its own updated_at derived from the source event that changed that specific field, not the record that carried it.
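To make the fix concrete, here is a minimal sketch in plain Python (the dict shapes and data are illustrative, not the library's API): because only *changed* fields produce events, an untouched phone number never inherits the record's write time, so the genuine Shopify update wins.

```python
from datetime import datetime

# Illustrative sketch. A Tier 2 source syncs the whole record at 09:05,
# but only the email actually changed: emitting one event per *changed*
# field keeps the stale phone from inheriting the 09:05 write time.
previous = {"email": "old@example.com", "phone": "+49 111 000 0000"}
salesforce_sync = {
    "record": {"email": "alex@example.com", "phone": "+49 111 000 0000"},
    "write_time": datetime(2025, 1, 1, 9, 5),
}

# One row per changed field (the normalisation step the DLT pipeline performs).
field_events = [
    {"field": f, "value": v, "event_time": salesforce_sync["write_time"], "source": "salesforce"}
    for f, v in salesforce_sync["record"].items()
    if previous.get(f) != v
]
# Shopify's genuine field-level phone update from 09:00.
field_events.append({"field": "phone", "value": "+49 987 654 3210",
                     "event_time": datetime(2025, 1, 1, 9, 0), "source": "shopify"})

# Resolution: the latest event time per field wins.
winners: dict[str, dict] = {}
for ev in field_events:
    best = winners.get(ev["field"])
    if best is None or ev["event_time"] > best["event_time"]:
        winners[ev["field"]] = ev

assert winners["phone"]["source"] == "shopify"    # the real 09:00 update survives
assert winners["email"]["value"] == "alex@example.com"
```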


## Data Model

Two Delta tables store the entire graph:

```sql
-- graph.nodes: any entity — customer, company, asset, or a discrete attribute value
graph.nodes (
  node_id     STRING,    -- UUID for entity nodes; SHA-256(node_type:value) for value nodes
  node_type   STRING,    -- "customer" | "company" | "asset" | "email" | "phone" | ...
  value       STRING,    -- NULL for entity nodes; raw value for attribute-value nodes
  checksum    STRING,    -- SHA-256(value) for dedup; NULL for entity nodes
  created_at  TIMESTAMP,
  properties  MAP<STRING, STRING>
)

-- graph.relationships: directed, timestamped edge between any two nodes
graph.relationships (
  source_node_id  STRING,    -- origin — any node type
  target_node_id  STRING,    -- destination — any node type
  rel_type        STRING,    -- HAS_EMAIL | HAS_PHONE | WORKS_AT | OWNS | ...
  updated_at      TIMESTAMP, -- field-level event time, not record write time
  confidence      FLOAT,
  is_current      BOOLEAN,
  source_system   STRING,
  source_event_id STRING,
  properties      MAP<STRING, STRING>
)
```

Neither table has a customer_id column. Any node — customer, company, asset — can be source or target. Multi-hop traversal (customer → company → asset) uses the same two tables with no schema change.
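The deterministic ID scheme for value nodes is easy to sketch. This is a minimal stand-in for the helpers in `src/aag/utils/checksum.py`; the exact string formatting used there may differ.

```python
import hashlib

def value_node_id(node_type: str, value: str) -> str:
    # Namespace by type: the same string as an email vs. a username
    # yields different node IDs.
    return hashlib.sha256(f"{node_type}:{value}".encode("utf-8")).hexdigest()

def value_checksum(value: str) -> str:
    # Dedup key stored in graph.nodes.checksum.
    return hashlib.sha256(value.encode("utf-8")).hexdigest()

# Same (type, value) pair always maps to the same node, so dedup comes for free.
assert value_node_id("email", "alex@example.com") == value_node_id("email", "alex@example.com")
assert value_node_id("email", "alex@example.com") != value_node_id("username", "alex@example.com")
```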


## Architecture

```
BRONZE   Raw source events (Fivetran CDC, Segment webhooks, API ingest)
           ↓  DLT Streaming — APPLY CHANGES INTO (SCD Type 2)
SILVER   graph.nodes + graph.relationships
           ↓  dbt — versioned SQL models, schema contracts, data tests
GOLD     customer_profile_v1 / v2  ·  company_assets  ·  consent_log
           ↓  Serverless SQL endpoint
SERVING  Hightouch reverse ETL  ·  BI tools  ·  REST APIs  ·  AI agents
```

## Quick Start

### Prerequisites

- Databricks workspace with Unity Catalog enabled
- Databricks CLI configured (`databricks configure`)
- Python 3.11+
- dbt-databricks 1.8+

### 1. Set up Unity Catalog schemas

```bash
databricks workspace import databricks/setup/00_unity_catalog_setup.sql
# Run interactively or via Databricks SQL
```

2. Seed configuration tables

cd dbt
dbt seed --select attribute_name_registry source_priority

### 3. Deploy the DLT pipeline

```bash
databricks pipelines create --json databricks/dlt/pipeline_config.json
```

### 4. Run dbt models

```bash
dbt run --select silver gold
dbt test --select silver gold
```

### 5. Point Hightouch at the Gold layer

Use `c360.gold.customer_profile_v2` as your Hightouch source. See `docs/hightouch-setup.md`.


## Source Systems

Out-of-the-box extractors:

| Source | Tier | Connector | Notes |
|---|---|---|---|
| Salesforce | 2 | Fivetran | CDF replay on `Contact`, `Account` |
| SAP ECC / S4HANA | 2 | Fivetran | `KNA1` + `ADR6` join; `LAEDA` as event time |
| Shopify | 2+1 | Fivetran + webhook | `updated_at` record-level; `accepts_marketing_updated_at` field-level |
| Segment | 1 | Databricks destination | `identify()` trait explosion — native field-level timestamps |
| Zendesk | 2 | Fivetran | `updated_at` record-level |

Adding a new source requires three things: one row per field in `attribute_name_registry`, one row in `source_priority`, and one extraction notebook following the base-class pattern.
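As a rough sketch of that pattern, here is a toy, dict-based version of the base class plus a hypothetical HubSpot extractor. The real base class (`src/aag/ingestion/base.py`) operates on PySpark DataFrames, and `HubspotExtractor` and its field map are invented for illustration.

```python
from abc import ABC, abstractmethod

class BaseExtractor(ABC):
    """Toy stand-in for src/aag/ingestion/base.py (which works on PySpark DataFrames)."""

    @abstractmethod
    def extract_raw_changes(self) -> list[dict]:
        """Read the raw Bronze records for this source."""

    @abstractmethod
    def normalise(self, record: dict) -> list[dict]:
        """Unpivot one record into one event per mapped field."""

    def run(self) -> list[dict]:
        # Flatten: every raw record becomes zero or more field events.
        return [ev for rec in self.extract_raw_changes() for ev in self.normalise(rec)]

class HubspotExtractor(BaseExtractor):
    # Hypothetical source and field map; in the real pipeline this mapping
    # would live as rows in attribute_name_registry.
    FIELD_MAP = {"email_address": "email"}

    def extract_raw_changes(self) -> list[dict]:
        return [{"id": "C001", "email_address": "alex@example.com",
                 "updated_at": "2025-01-01T09:05:00"}]

    def normalise(self, record: dict) -> list[dict]:
        return [
            {"entity_id": record["id"], "attribute": canonical, "value": record[raw],
             "event_time": record["updated_at"], "source_system": "hubspot"}
            for raw, canonical in self.FIELD_MAP.items()
            if record.get(raw) is not None
        ]

events = HubspotExtractor().run()
```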


## dbt Model Versioning

Gold models use dbt 1.5+ versioned models for backward-compatible schema evolution:

```yaml
# dbt/models/gold/schema.yml
models:
  - name: customer_profile
    latest_version: 2
    versions:
      - v: 1
        deprecation_date: "2027-01-01"
      - v: 2
```

Downstream consumers pin to a version with `ref('customer_profile', version=1)`. Breaking changes increment the version; non-breaking additions go into the latest version.


## File Guide

### Databricks — infrastructure and pipeline

| File | What it does |
|---|---|
| `databricks/setup/00_unity_catalog_setup.sql` | Creates the Unity Catalog, the schemas (`bronze`, `graph`, `config`), and the two Silver tables (`graph.nodes`, `graph.relationships`). Run this once before anything else. |
| `databricks/setup/01_config_tables.sql` | Creates the two config tables (`attribute_name_registry`, `source_priority`) that control field mapping and conflict resolution. |
| `databricks/dlt/aag_pipeline.py` | The Delta Live Tables pipeline. Reads streaming Bronze tables from all five sources, normalises each record into one row per changed field, joins against the attribute registry, and writes deduplicated value nodes and SCD Type 2 edges into the Silver graph tables. |
| `databricks/dlt/pipeline_config.json` | DLT pipeline metadata (cluster config, catalog name, schema parameters). Pass this to `databricks pipelines create`. |
| `databricks/jobs/aag_orchestration.yml` | Databricks Jobs YAML that chains DLT pipeline → `dbt run` → `dbt test` in order. This is what you schedule or trigger in production. |
| `databricks/monitoring/freshness_and_conflicts.sql` | Ad-hoc SQL queries for ops health checks: how stale each source is, how many active attribute conflicts exist, and profile completeness by source. |

### dbt — transformation and Gold layer

| File | What it does |
|---|---|
| `dbt/seeds/attribute_name_registry.csv` | Maps each source system's raw field name (e.g. Salesforce `Email`, SAP `SMTP_ADDR`) to a canonical name (`email`) and relationship type (`HAS_EMAIL`). Every source field that should flow into the graph needs a row here. |
| `dbt/seeds/source_priority.csv` | Ranks sources by trustworthiness (Salesforce=1, Segment=5). When two sources disagree on a field value and timestamps tie, the lower-numbered source wins. |
| `dbt/models/silver/resolved_attributes.sql` | A view over `graph.relationships` + `graph.nodes` that applies the resolution algorithm: for each customer and each attribute type, picks the single winning value based on timestamp → confidence → source priority. |
| `dbt/models/gold/customer_profile_v1.sql` | Pivots `resolved_attributes` into one wide row per customer with columns like `email`, `phone`, `city`. The original flat-table format. |
| `dbt/models/gold/customer_profile_v2.sql` | Same as v1 but adds `_source` and `_updated_at` columns per attribute (e.g. `email_source`, `email_updated_at`), giving full field-level lineage. Use this for Hightouch and downstream consumers that need provenance. |
| `dbt/models/gold/company_assets.sql` | Multi-hop traversal: customer → `WORKS_AT` → company → `OWNS` → asset. Demonstrates that the same two Silver tables can model any entity relationship without schema changes. |
| `dbt/macros/resolve_winner.sql` | Reusable dbt macro that encapsulates the `ROW_NUMBER() OVER (... ORDER BY updated_at DESC, confidence DESC, priority ASC)` window logic. Used by the Silver view and Gold models. |
| `dbt/tests/assert_no_duplicate_current_edges.sql` | Custom dbt test that fails if any (`source_node_id`, `rel_type`, `source_system`) combination has more than one `is_current = TRUE` edge, which would indicate a broken SCD Type 2 merge. |
| `dbt/tests/assert_gold_recency.sql` | Custom dbt test that fails if any Gold profile row has not been updated within a configurable window. Catches pipelines that have silently stopped refreshing. |
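The tiebreak order encoded in `resolve_winner.sql` can be mimicked in plain Python (the rows below are illustrative, and only the Salesforce and Segment priority ranks come from the seed description): latest `updated_at` wins, then higher `confidence`, then the lower source-priority number.

```python
# Mirrors dbt/seeds/source_priority.csv: lower number = more trusted.
SOURCE_PRIORITY = {"salesforce": 1, "segment": 5}

# Two candidate edges for the same customer + attribute, tied on time and confidence.
candidates = [
    {"value": "a@x.com", "updated_at": "2025-01-01T09:00", "confidence": 0.9, "source": "segment"},
    {"value": "b@x.com", "updated_at": "2025-01-01T09:00", "confidence": 0.9, "source": "salesforce"},
]

# Same ordering as ROW_NUMBER() OVER (ORDER BY updated_at DESC, confidence DESC,
# priority ASC): maximise event time and confidence, minimise the priority number.
winner = max(
    candidates,
    key=lambda r: (r["updated_at"], r["confidence"], -SOURCE_PRIORITY[r["source"]]),
)
```

ISO 8601 timestamp strings sort lexicographically in chronological order, which is why plain tuple comparison works here.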

### Python library — `src/aag/`

| File | What it does |
|---|---|
| `src/aag/utils/checksum.py` | Two functions: `value_node_id(node_type, value)` generates a deterministic SHA-256 ID for a value node (namespaced by type, so "alex@example.com" as an email and as a username get different IDs); `value_checksum(value)` generates the dedup key stored in `graph.nodes`. |
| `src/aag/graph/nodes.py` | PySpark helpers for `graph.nodes`: the DDL string to create the table, `build_value_nodes_df()` to deduplicate a batch of field events into new value nodes, `upsert_value_nodes()` to merge them in (insert-only; value nodes are immutable), and `create_entity_node()` for one-time entity creation. |
| `src/aag/graph/relationships.py` | PySpark helpers for `graph.relationships`: the DDL string, the SCD Type 2 `MERGE` SQL (closes the previous `is_current` edge and inserts the new one), `upsert_relationships()` to execute it, and `invalidate_entity_edges()` for GDPR erasure requests. |
| `src/aag/graph/resolution.py` | DataFrame implementation of the resolution algorithm (mirrors the SQL view). Also includes `pivot_customer_profile()` to produce a wide customer row from resolved attributes; used in notebooks and unit tests without needing a full Databricks cluster. |
| `src/aag/ingestion/base.py` | Abstract base class for all source extractors. Subclasses implement `extract_raw_changes()` (read Bronze) and `normalise()` (unpivot to one row per field). The base class handles joining the attribute registry, computing `target_node_id`, and producing the final `FieldEvent` DataFrame. Call `extractor.run()` to get a batch ready for graph upsert. |
| `src/aag/ingestion/salesforce.py` | Salesforce extractor. Reads `salesforce_contact` via Delta CDF and unpivots contact fields. Uses `LastModifiedDate` as event time (Tier 2: record-level, not field-level). |
| `src/aag/ingestion/sap.py` | SAP extractor. Reads `sap_kna1` and uses `LAEDA` (last changed date) as event time. Handles the SAP-specific field names (`SMTP_ADDR` → email, `STRAS` → street). |
| `src/aag/ingestion/shopify.py` | Shopify extractor. Most fields use record-level `updated_at` (Tier 2), but `accepts_marketing` uses `accepts_marketing_updated_at` (Tier 1: an actual field-level consent timestamp). |
| `src/aag/ingestion/segment.py` | Segment extractor. Reads `segment_identifies` and explodes the traits map so each trait becomes a separate row with the `identify()` call's own timestamp. This is the purest Tier 1 source: every trait already has its own event time. |
| `src/aag/ingestion/zendesk.py` | Zendesk extractor. Reads `zendesk_users` and uses record-level `updated_at`. Maps `name` to `fullName` (Zendesk stores the full name as a single field, unlike other sources). |
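The close-then-insert SCD Type 2 step from `src/aag/graph/relationships.py` can be sketched with plain dicts standing in for the Delta `MERGE`. This is a toy model, not the library's code; the field names follow the `graph.relationships` schema above.

```python
from datetime import datetime

# One existing current edge for (customer, relationship, source).
edges = [
    {"source_node_id": "C001", "rel_type": "HAS_PHONE", "source_system": "shopify",
     "target_node_id": "V_old", "updated_at": datetime(2024, 6, 1), "is_current": True},
]

def upsert_edge(edges: list[dict], new_edge: dict) -> None:
    """Close the superseded current edge (keeping it as history), then insert the new one."""
    key = (new_edge["source_node_id"], new_edge["rel_type"], new_edge["source_system"])
    for e in edges:
        if (e["source_node_id"], e["rel_type"], e["source_system"]) == key and e["is_current"]:
            e["is_current"] = False  # SCD Type 2: history rows are never deleted
    edges.append({**new_edge, "is_current": True})

upsert_edge(edges, {"source_node_id": "C001", "rel_type": "HAS_PHONE",
                    "source_system": "shopify", "target_node_id": "V_new",
                    "updated_at": datetime(2025, 1, 1)})
```

After the upsert there are two edges, exactly one of them current, which is the invariant `assert_no_duplicate_current_edges.sql` checks.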

### Tests

| File | What it does |
|---|---|
| `tests/test_checksum.py` | Unit tests for `value_node_id` and `value_checksum`: verifies determinism, type-namespacing, and that different inputs produce different hashes. |
| `tests/test_resolution.py` | Unit tests for the resolution algorithm using small in-memory DataFrames. Covers timestamp wins, confidence tiebreaks, source-priority tiebreaks, and GDPR edge invalidation. |

### CI and project config

| File | What it does |
|---|---|
| `.github/workflows/ci.yml` | On every PR: runs `ruff` + `mypy` on the Python library, `pytest`, `dbt compile`, and `sqlfluff` lint on all SQL files. |
| `pyproject.toml` | Python package config: dependencies (PySpark, dbt-databricks), `ruff` and `mypy` settings, `pytest` config. |
| `Makefile` | Shortcuts: `make test`, `make lint`, `make dbt-compile`. |

## Contributing

See `CONTRIBUTING.md`. All SQL is linted with SQLFluff (dialect: `databricks`). Python follows `ruff` + `mypy`.


## License

Apache 2.0 — see LICENSE.

Built by Santosh Pradhan · MarTech Solutions Architect, Munich.
