# Entity Resolution Data → Neo4j KG

This notebook follows the ERKG flow (datasets → graph): load entity-resolution outputs, build nodes, and add relationships in Neo4j.

Update the **Configuration** section with your paths and Neo4j credentials before running.

## Configuration
Set the file paths and Neo4j connection info. Defaults mirror the ERKG notebooks (local Neo4j Desktop).

In [None]:
import os
from pathlib import Path

# Paths
BASE_DIR = Path('.')
EXPORT_JSON = Path(os.getenv('ER_EXPORT_JSON', 'data/export.json'))  # Senzing export
RAW_DATA_DIR = Path(os.getenv('ER_RAW_DATA_DIR', 'data/raw'))        # optional raw inputs

# Neo4j connection (match ERKG notebooks defaults)
NEO4J_URI = os.getenv('NEO4J_URI', 'neo4j://localhost:7687')
NEO4J_USER = os.getenv('NEO4J_USER', 'neo4j')
NEO4J_PASSWORD = os.getenv('NEO4J_PASSWORD', 'neo4j')  # change me

EXPORT_JSON, RAW_DATA_DIR, NEO4J_URI

## Neo4j Helpers
Create a small helper to run Cypher from Python (similar pattern to ERKG `graph.ipynb`).

In [None]:
from neo4j import GraphDatabase

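def run_cypher(query, params=None):
    """Run one Cypher statement and return its records as a list.

    Opens and closes a driver on every call: simple, but slow when called once per batch.
    """
    params = params or {}
    with GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as driver:
        with driver.session() as session:
            return list(session.run(query, params))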

# Quick connectivity check
run_cypher('RETURN 1 AS ok')

## Load Senzing Export
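The loader below accepts a Senzing export saved either as a single JSON array or as JSON Lines (one JSON object per line), and picks the parser by checking whether the file starts with `[`. Adjust it if your export format differs.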

In [None]:
import json

def load_export(path: Path):
    text = path.read_text(encoding='utf-8')
    text_stripped = text.lstrip()
    if text_stripped.startswith('['):
        return json.loads(text)
    # assume JSON Lines
    rows = []
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        rows.append(json.loads(line))
    return rows

entities = load_export(EXPORT_JSON)
len(entities)
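
For exports too large to read into memory at once, the JSON Lines branch can be streamed instead. The following is a minimal sketch, not part of the ERKG notebooks: `iter_jsonl` is a hypothetical helper name, and it assumes one JSON object per line.

```python
import json
from pathlib import Path

def iter_jsonl(path):
    """Yield one parsed object per non-empty line without loading the whole file."""
    with Path(path).open(encoding='utf-8') as fh:
        for line_no, line in enumerate(fh, start=1):
            line = line.strip()
            if not line:
                continue  # tolerate blank lines between records
            try:
                yield json.loads(line)
            except json.JSONDecodeError as exc:
                # Report the offending line so a bad export is easy to locate.
                raise ValueError(f'{path}: invalid JSON on line {line_no}: {exc}') from exc
```

Because it is a generator, the rows can be fed to the normalization step one at a time, or wrapped in `list(...)` when the full list is needed.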

## Normalize Records
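Convert Senzing entities into flat row lists (entities, records, names, addresses, phones) that the Cypher statements later consume via `UNWIND`.
The mapping is deliberately lenient: it reads `ENTITY_ID` and `RECORDS` from each entity, expects each record to carry a `FEATURES` list of objects with `FEAT_TYPE` and `FEAT_VALUES`, and also accepts the lowercase form of each key.
If your export schema differs (for example, if entities are nested under a wrapper key), adjust the mapping here.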

In [None]:
def norm_entities(rows):
    entity_rows = []
    record_rows = []
    name_rows = []
    addr_rows = []
    phone_rows = []

    for e in rows:
        entity_id = e.get('ENTITY_ID') or e.get('ENTITY_ID_STR') or e.get('entity_id')
        if entity_id is None:
            continue

        entity_rows.append({
            'entity_id': str(entity_id),
            'resolved': True,
            'score': e.get('MATCH_LEVEL') or e.get('match_level')
        })

        for r in e.get('RECORDS') or e.get('records') or []:
            record_id = r.get('RECORD_ID') or r.get('record_id') or r.get('RECORD_ID_STR')
            if record_id in (None, ''):
                continue  # str(None) would otherwise create a bogus 'None' record
            data_source = r.get('DATA_SOURCE') or r.get('data_source')

            record_rows.append({
                'record_id': str(record_id),
                'data_source': data_source,
                'entity_id': str(entity_id),
            })

            # Common Senzing record features
            for f in r.get('FEATURES') or r.get('features') or []:
                ftype = f.get('FEAT_TYPE') or f.get('feat_type')
                fvals = f.get('FEAT_VALUES') or f.get('feat_values') or []
                if ftype == 'NAME':
                    for v in fvals:
                        name_rows.append({
                            'record_id': str(record_id),
                            'name': v.get('NAME_FULL') or v.get('name_full') or v.get('name')
                        })
                elif ftype == 'ADDRESS':
                    for v in fvals:
                        addr_rows.append({
                            'record_id': str(record_id),
                            'address': v.get('ADDR_FULL') or v.get('addr_full') or v.get('address')
                        })
                elif ftype == 'PHONE':
                    for v in fvals:
                        phone_rows.append({
                            'record_id': str(record_id),
                            'phone': v.get('PHONE_NUMBER') or v.get('phone_number') or v.get('phone')
                        })

    return entity_rows, record_rows, name_rows, addr_rows, phone_rows

entity_rows, record_rows, name_rows, addr_rows, phone_rows = norm_entities(entities)
len(entity_rows), len(record_rows)
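
The `a or b or c` lookups above treat any falsy value as missing, so an `ENTITY_ID` of `0` or an empty string falls through to the next key. If that matters for your data, a small helper that tests only for `None` is one option. This is a sketch; `first_present` is a hypothetical name, not a Senzing or ERKG function, and the sample entity is made up for illustration.

```python
def first_present(mapping, *keys, default=None):
    """Return the value of the first key in `keys` that is present and not None."""
    for key in keys:
        value = mapping.get(key)
        if value is not None:
            return value
    return default

# Hypothetical sample entity with a falsy but valid ID.
sample = {'ENTITY_ID': 0, 'entity_id': 7, 'RECORDS': []}

entity_id_or = sample.get('ENTITY_ID') or sample.get('entity_id')  # the 0 is skipped
entity_id_fp = first_present(sample, 'ENTITY_ID', 'entity_id')     # the 0 is kept
```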

## Neo4j Schema
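Create uniqueness constraints on `Entity.entity_id` and `Record.record_id`, plus indexes on the `value` property of `Name`, `Address`, and `Phone`, so each `MERGE` can find existing nodes through an index instead of scanning the label. The `REQUIRE` syntax used here needs Neo4j 4.4 or later.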

In [None]:
schema_cypher = [
    'CREATE CONSTRAINT entity_id_unique IF NOT EXISTS FOR (e:Entity) REQUIRE e.entity_id IS UNIQUE',
    'CREATE CONSTRAINT record_id_unique IF NOT EXISTS FOR (r:Record) REQUIRE r.record_id IS UNIQUE',
    'CREATE INDEX name_value IF NOT EXISTS FOR (n:Name) ON (n.value)',
    'CREATE INDEX address_value IF NOT EXISTS FOR (a:Address) ON (a.value)',
    'CREATE INDEX phone_value IF NOT EXISTS FOR (p:Phone) ON (p.value)',
]
for stmt in schema_cypher:
    run_cypher(stmt)

'Schema ready'
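
The `record_id_unique` constraint assumes record IDs are unique across every data source. Senzing identifies a record by its data source together with its record ID, so two sources may reuse the same ID, and the `MERGE` on `record_id` would then collapse them into one node. If that applies to your data, one option is to merge on a composite key. The sketch below is an assumption, not part of the ERKG notebooks: `record_key`, the `::` separator, and the source names are hypothetical.

```python
def record_key(data_source, record_id, sep='::'):
    """Build a graph-wide key from a (data_source, record_id) pair."""
    if record_id is None:
        raise ValueError('record_id is required')
    source = '' if data_source is None else str(data_source)
    return f'{source}{sep}{record_id}'

# The same record ID in two hypothetical sources yields two distinct keys.
keys = {record_key('CUSTOMERS', 1001), record_key('WATCHLIST', 1001)}
```

Storing this value in `record_id` when building the record rows would keep the rest of the Cypher unchanged.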

## Load Entities and Records
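Insert nodes and relationships in batches via `UNWIND`. Entities are loaded first because `record_cypher` looks up each entity with `MATCH`: a record whose entity does not exist yet is still created, but gets no `RESOLVED_TO` edge. Adjust the batch size for your environment.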

In [None]:
def chunks(rows, size=1000):
    for i in range(0, len(rows), size):
        yield rows[i:i+size]

entity_cypher = '''
UNWIND $rows AS row
MERGE (e:Entity {entity_id: row.entity_id})
SET e.resolved = row.resolved, e.score = row.score
'''

record_cypher = '''
UNWIND $rows AS row
MERGE (r:Record {record_id: row.record_id})
SET r.data_source = row.data_source
WITH r, row
MATCH (e:Entity {entity_id: row.entity_id})
MERGE (r)-[:RESOLVED_TO]->(e)
'''

for batch in chunks(entity_rows, 1000):
    run_cypher(entity_cypher, {'rows': batch})

for batch in chunks(record_rows, 1000):
    run_cypher(record_cypher, {'rows': batch})

'Entities and records loaded'
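
`run_cypher` opens a new driver for every batch, which adds connection overhead on large loads. A minimal sketch of an alternative, assuming the caller opens one session and passes it in: `load_batches` is a hypothetical helper, and it relies only on `session.run(query, parameters)` and `Result.consume()` from the Neo4j Python driver.

```python
def load_batches(session, query, rows, size=1000):
    """Send `rows` to `query` in slices of `size` over one open session.

    Returns the number of rows sent.
    """
    if size < 1:
        raise ValueError('size must be at least 1')
    sent = 0
    for start in range(0, len(rows), size):
        batch = rows[start:start + size]
        # consume() drains the result so the statement finishes before the next batch.
        session.run(query, {'rows': batch}).consume()
        sent += len(batch)
    return sent
```

In use, the driver and session would each be opened once in a `with` block, followed by `load_batches(session, entity_cypher, entity_rows)` and then `load_batches(session, record_cypher, record_rows)`.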

## Add Attribute Nodes and Relationships
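Create `Name`, `Address`, and `Phone` nodes, then connect them to `Record` nodes. Attribute nodes are merged on `value`, so records that share a name, address, or phone link to the same node.
If your ERKG process uses different feature types, extend this section.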

In [None]:
name_cypher = '''
UNWIND $rows AS row
WITH row WHERE row.name IS NOT NULL AND row.name <> ''
MERGE (n:Name {value: row.name})
WITH n, row
MATCH (r:Record {record_id: row.record_id})
MERGE (r)-[:HAS_NAME]->(n)
'''

addr_cypher = '''
UNWIND $rows AS row
WITH row WHERE row.address IS NOT NULL AND row.address <> ''
MERGE (a:Address {value: row.address})
WITH a, row
MATCH (r:Record {record_id: row.record_id})
MERGE (r)-[:HAS_ADDRESS]->(a)
'''

phone_cypher = '''
UNWIND $rows AS row
WITH row WHERE row.phone IS NOT NULL AND row.phone <> ''
MERGE (p:Phone {value: row.phone})
WITH p, row
MATCH (r:Record {record_id: row.record_id})
MERGE (r)-[:HAS_PHONE]->(p)
'''

for batch in chunks(name_rows, 2000):
    run_cypher(name_cypher, {'rows': batch})

for batch in chunks(addr_rows, 2000):
    run_cypher(addr_cypher, {'rows': batch})

for batch in chunks(phone_rows, 2000):
    run_cypher(phone_cypher, {'rows': batch})

'Attributes linked'
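
The three statements above differ only in label, relationship type, and row field. Labels and relationship types are written literally in these statements rather than passed as `$` parameters, so a shared template is built here with string formatting. The following sketch shows one way; `attribute_cypher` is a hypothetical helper, and its arguments should be trusted constants, never user input.

```python
def attribute_cypher(label, rel_type, field):
    """Build the UNWIND/MERGE statement for one attribute type."""
    for name in (label, rel_type, field):
        # Reject anything that is not a plain identifier before formatting it into Cypher.
        if not name.isidentifier():
            raise ValueError(f'unsafe identifier: {name!r}')
    return f"""
UNWIND $rows AS row
WITH row WHERE row.{field} IS NOT NULL AND row.{field} <> ''
MERGE (a:{label} {{value: row.{field}}})
WITH a, row
MATCH (r:Record {{record_id: row.record_id}})
MERGE (r)-[:{rel_type}]->(a)
"""

name_query = attribute_cypher('Name', 'HAS_NAME', 'name')
```

Adding a new feature type, such as a hypothetical `Email` node, then takes one call instead of another copied statement.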

## Sanity Checks
Three quick counts to verify the KG is populated.

In [None]:
# A notebook cell displays only its last expression, so return all three counts in one list.
[
    run_cypher('MATCH (e:Entity) RETURN count(e) AS entities'),
    run_cypher('MATCH (r:Record) RETURN count(r) AS records'),
    run_cypher('MATCH (:Record)-[rel:RESOLVED_TO]->(:Entity) RETURN count(rel) AS resolved_edges'),
]
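
The counts are easier to judge against what the load should have produced. Because every statement uses `MERGE`, duplicates collapse, so the graph should hold one node per distinct ID. A sketch of that cross-check, assuming the database was empty before the load; `expected_counts` is a hypothetical helper that works on row lists shaped like the output of `norm_entities()`, and the sample rows are made up.

```python
def expected_counts(entity_rows, record_rows):
    """Count the distinct nodes and edges the MERGE statements should produce."""
    return {
        'entities': len({row['entity_id'] for row in entity_rows}),
        'records': len({row['record_id'] for row in record_rows}),
        'resolved_edges': len({(row['record_id'], row['entity_id']) for row in record_rows}),
    }

# Hypothetical rows: record 'r1' appears twice and should count once.
sample_entities = [{'entity_id': '1'}, {'entity_id': '2'}]
sample_records = [
    {'record_id': 'r1', 'entity_id': '1'},
    {'record_id': 'r1', 'entity_id': '1'},
    {'record_id': 'r2', 'entity_id': '2'},
]
sample_expected = expected_counts(sample_entities, sample_records)
```

A graph count below the expected value for `resolved_edges` would suggest records whose entity was not found by the `MATCH` in `record_cypher`.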

## Notes
- If your Senzing export uses different feature names, adjust `norm_entities()`.
- If you want to reproduce the ERKG visualization, follow the analysis steps in `impact.ipynb`.