# From Relational to NoSQL to Graph: Building a Data Catalog

In [77]:
import sqlite3
from datetime import datetime, timedelta
import random
from pymongo import MongoClient
import json, glob, os
from dotenv import load_dotenv
from neo4j import GraphDatabase

random.seed(10)

# Relational Database (SQLite)

We start with a traditional metadata catalog — a relational schema describing datasets and their columns.

In [78]:
# --- CREATE IN-MEMORY DATABASE ---
# Using ":memory:" creates a temporary DB that exists only while the script runs
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# --- CREATE TABLE: agent ---
# Represents people or systems responsible for data entities
cur.execute('''
CREATE TABLE agent (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT
)
''')

# Insert a few example agents
agents = ["alice", "bob", "carol"]
cur.executemany("INSERT INTO agent(name) VALUES (?)", [(agent,) for agent in agents])

# --- CREATE TABLE: entity ---
# Represents datasets or data files, each with a name, type, and creation timestamp
# The wasDerivedFrom field models lineage (self-reference to another entity)
cur.execute('''
CREATE TABLE entity (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT,
    file_type TEXT,
    created_at TEXT,
    wasDerivedFrom REFERENCES entity(id)
)
''')

# Define possible file types and arbitrary dataset names (topics)
file_types = ["csv", "json", "parquet", "tiff", "xml", "avro", "xlsx"]
topics = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"]

# Generate synthetic entities with random properties
entities = []
base_date = datetime(2025, 1, 1)

for name in topics:
    file_type = random.choice(file_types)
    created_at = (base_date + timedelta(days=random.randint(0, 700))).isoformat()
    # Randomly link some entities to a previous one to simulate data derivation
    was_derived_from = random.randint(1, len(entities)) if entities and random.random() < 0.5 else None
    entities.append((name, file_type, created_at, was_derived_from))

# Insert generated entities into the database
cur.executemany(
    "INSERT INTO entity(name, file_type, created_at, wasDerivedFrom) VALUES (?, ?, ?, ?)",
    entities
)
conn.commit()

# --- CREATE TABLE: wasAttributedTo ---
# Connects entities to the agents responsible for them
cur.execute('''
CREATE TABLE wasAttributedTo (
    entity_id REFERENCES entity(id),
    agent_id REFERENCES agent(id)
)
''')

# Randomly assign each entity to one of the agents
relations = []
for e_id in range(len(entities)):
    relations.append([e_id + 1, random.choice(range(len(agents))) + 1])

cur.executemany("INSERT INTO wasAttributedTo(entity_id, agent_id) VALUES (?, ?)", relations)
conn.commit()

# --- CREATE TABLE: activity ---
# Represents data processing steps (ETL-style)
cur.execute('''
CREATE TABLE activity (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT
)
''')

# List of possible ETL activities
activities = [
    "extract_sales_data",
    "transform_customer_data",
    "load_inventory_snapshot",
    "aggregate_marketing_metrics",
    "clean_web_logs",
    "validate_financial_forecast",
    "normalize_sensor_data",
    "convert_image_formats"
]

# Randomly insert 5 activities into the table
cur.executemany("INSERT INTO activity(name) VALUES (?)", [(a,) for a in random.sample(activities, k=5)])
conn.commit()

# --- CREATE TABLE: wasGeneratedBy ---
# Links entities to the activities that produced them
cur.execute('''
CREATE TABLE wasGeneratedBy (
    entity_id REFERENCES entity(id),
    activity_id REFERENCES activity(id)
)
''')

# Randomly associate each entity with one activity
relations = []
for e_id in range(len(entities)):
    relations.append([e_id + 1, random.choice(range(len(activities))) + 1])

cur.executemany("INSERT INTO wasGeneratedBy(entity_id, activity_id) VALUES (?, ?)", relations)
conn.commit()

# --- DISPLAY TABLE CONTENTS ---
# Print all data for quick inspection and verification
for table in ["agent", "entity", "wasAttributedTo", "activity", "wasGeneratedBy"]:
    print(f"\n--- {table} ---")
    cur.execute(f"SELECT * FROM {table}")
    print([row for row in cur.fetchall()])



--- agent ---
[(1, 'alice'), (2, 'bob'), (3, 'carol')]

--- entity ---
[(1, 'a', 'xml', '2025-02-03T00:00:00', None), (2, 'b', 'tiff', '2026-05-10T00:00:00', None), (3, 'c', 'json', '2026-04-19T00:00:00', None), (4, 'd', 'xlsx', '2025-10-12T00:00:00', None), (5, 'e', 'json', '2025-02-05T00:00:00', None), (6, 'f', 'parquet', '2025-03-19T00:00:00', 3), (7, 'g', 'csv', '2026-03-07T00:00:00', None), (8, 'h', 'xml', '2025-12-30T00:00:00', 3), (9, 'i', 'xlsx', '2026-11-23T00:00:00', 3), (10, 'j', 'avro', '2025-11-07T00:00:00', None)]

--- wasAttributedTo ---
[(1, 1), (2, 2), (3, 1), (4, 2), (5, 3), (6, 2), (7, 1), (8, 3), (9, 1), (10, 1)]

--- activity ---
[(1, 'load_inventory_snapshot'), (2, 'transform_customer_data'), (3, 'convert_image_formats'), (4, 'clean_web_logs'), (5, 'validate_financial_forecast')]

--- wasGeneratedBy ---
[(1, 4), (2, 6), (3, 8), (4, 7), (5, 8), (6, 2), (7, 6), (8, 3), (9, 4), (10, 7)]


# Document Database (MongoDB)

Now we move to MongoDB to handle flexible, nested, and heterogeneous metadata structures.

In [79]:
# Load .env from parent folder
load_dotenv(dotenv_path=os.path.join("..", ".env"))

mongo_uri = f"mongodb://{os.getenv('IP')}:27017/db"
if not mongo_uri:
    raise ValueError("MONGO_URI not set in .env")

# --- CONNECT TO MONGODB ---
client = MongoClient(mongo_uri)
db = client["metadata_catalog"]

# Clean existing collections
for col in ["agents", "activities", "entities"]:
    db[col].delete_many({})

# --- MIGRATE AGENTS ---
cur.execute("SELECT id, name FROM agent")
agents = [{"_id": a_id, "name": name} for a_id, name in cur.fetchall()]
db.agents.insert_many(agents)

# --- MIGRATE ACTIVITIES ---
cur.execute("SELECT id, name FROM activity")
activities = [{"_id": a_id, "name": name} for a_id, name in cur.fetchall()]
db.activities.insert_many(activities)

# --- GET RELATIONS ---
cur.execute("SELECT entity_id, agent_id FROM wasAttributedTo")
entity_to_agents = {}
for e_id, a_id in cur.fetchall():
    entity_to_agents.setdefault(e_id, []).append(a_id)

cur.execute("SELECT entity_id, activity_id FROM wasGeneratedBy")
entity_to_activities = {}
for e_id, act_id in cur.fetchall():
    entity_to_activities.setdefault(e_id, []).append(act_id)

# --- FUNCTION TO CREATE CUSTOM METADATA ---
def generate_metadata(file_type):
    if file_type == "csv":
        return {"columns": random.randint(3, 12), "delimiter": ",", "encoding": "utf-8"}
    elif file_type == "json":
        return {"schema_version": f"v{random.randint(1, 5)}", "nested": random.choice([True, False])}
    elif file_type == "tiff":
        return {"resolution": random.choice(["1080p", "4k"]), "color_depth": random.choice([8, 16, 24])}
    elif file_type == "xlsx":
        return {"sheet_count": random.randint(1, 10), "has_formulas": random.choice([True, False])}
    elif file_type == "avro":
        return {"compression": random.choice(["snappy", "deflate"]), "record_count": random.randint(1000, 100000)}
    else:
        return {"notes": "generic metadata"}

# --- MIGRATE ENTITIES ---
cur.execute("SELECT id, name, file_type, created_at, wasDerivedFrom FROM entity")
mongo_entities = []

for e_id, name, file_type, created_at, wasDerivedFrom in cur.fetchall():
    # get related agents and activities by id
    related_agent_ids = entity_to_agents.get(e_id, [])
    related_activity_ids = entity_to_activities.get(e_id, [])

    # resolve names
    cur.execute("SELECT name FROM agent WHERE id IN (%s)" % ",".join("?" * len(related_agent_ids)), related_agent_ids or [])
    agents = [row[0] for row in cur.fetchall()] if related_agent_ids else []

    cur.execute("SELECT name FROM activity WHERE id IN (%s)" % ",".join("?" * len(related_activity_ids)), related_activity_ids or [])
    activities = [row[0] for row in cur.fetchall()] if related_activity_ids else []

    # flatten metadata
    metadata = generate_metadata(file_type)

    # base document
    doc = {
        "_id": e_id,
        "name": name,
        "file_type": file_type,
        "created_at": created_at,
        "agents": agents,
        "activities": activities,
        "wasDerivedFrom": wasDerivedFrom
    }

    # merge metadata as top-level fields
    doc.update(metadata)

    mongo_entities.append(doc)

db.entities.insert_many(mongo_entities)

for e in db.entities.find({}, {"_id": 0}):
    print(e)


{'name': 'a', 'file_type': 'xml', 'created_at': '2025-02-03T00:00:00', 'agents': ['alice'], 'activities': ['clean_web_logs'], 'wasDerivedFrom': None, 'notes': 'generic metadata'}
{'name': 'b', 'file_type': 'tiff', 'created_at': '2026-05-10T00:00:00', 'agents': ['bob'], 'activities': [], 'wasDerivedFrom': None, 'resolution': '4k', 'color_depth': 24}
{'name': 'c', 'file_type': 'json', 'created_at': '2026-04-19T00:00:00', 'agents': ['alice'], 'activities': [], 'wasDerivedFrom': None, 'schema_version': 'v1', 'nested': True}
{'name': 'd', 'file_type': 'xlsx', 'created_at': '2025-10-12T00:00:00', 'agents': ['bob'], 'activities': [], 'wasDerivedFrom': None, 'sheet_count': 3, 'has_formulas': False}
{'name': 'e', 'file_type': 'json', 'created_at': '2025-02-05T00:00:00', 'agents': ['carol'], 'activities': [], 'wasDerivedFrom': None, 'schema_version': 'v5', 'nested': False}
{'name': 'f', 'file_type': 'parquet', 'created_at': '2025-03-19T00:00:00', 'agents': ['bob'], 'activities': ['transform_cust

# Graph Database (Neo4j)

Now we model lineage and dependencies between datasets, jobs, and owners.

In [80]:
load_dotenv(dotenv_path=os.path.join("..", ".env"))

uri = f"bolt://{os.getenv('IP')}:7687"
user = "neo4j"
password = "password"

driver = GraphDatabase.driver(uri, auth=(user, password))

with driver.session() as session:
    # Clear Neo4j
    session.run("MATCH (n) DETACH DELETE n")

    # --- Create Agent nodes ---
    for agent in db.agents.find():
        session.run(
            "CREATE (:Agent {id: $id, name: $name})",
            id=agent["_id"],
            name=agent["name"]
        )

    # --- Create Activity nodes ---
    for act in db.activities.find():
        session.run(
            "CREATE (:Activity {id: $id, name: $name})",
            id=act["_id"],
            name=act["name"]
        )

    # --- Create Entity nodes ---
    for e in db.entities.find():
        # Flatten entity properties (skip agents, activities, wasDerivedFrom)
        props = {}
        for k, v in e.items():
            if k in ["_id", "agents", "activities", "wasDerivedFrom"]:
                continue
            elif isinstance(v, (str, int, float, bool)):
                props[k] = v
            elif isinstance(v, list):
                props[k] = [str(x) for x in v]
            elif isinstance(v, dict):
                # flatten dict metadata into key-value pairs
                for mk, mv in v.items():
                    props[mk] = str(mv)
            elif v is None:
                continue
            else:
                props[k] = str(v)

        props["id"] = e["_id"]

        # Build dynamic CREATE statement
        prop_keys = ", ".join([f"{k}: ${k}" for k in props])
        cypher = f"CREATE (ent:Entity {{ {prop_keys} }})"
        session.run(cypher, **props)

    # --- Create Relationships ---
    for e in db.entities.find():
        # Derived relationships
        if e.get("wasDerivedFrom") is not None:
            session.run(
                """
                MATCH (src:Entity {id: $derived_id}), (dst:Entity {id: $entity_id})
                CREATE (dst)-[:WAS_DERIVED_FROM]->(src)
                """,
                derived_id=e["wasDerivedFrom"],
                entity_id=e["_id"]
            )

        # Agent relationships
        for agent_name in e.get("agents", []):
            session.run(
                """
                MATCH (a:Agent {name: $agent_name}), (ent:Entity {id: $entity_id})
                CREATE (a)-[:WAS_ATTRIBUTED_TO]->(ent)
                """,
                agent_name=agent_name,
                entity_id=e["_id"]
            )

        # Activity relationships
        for act_name in e.get("activities", []):
            session.run(
                """
                MATCH (act:Activity {name: $act_name}), (ent:Entity {id: $entity_id})
                CREATE (act)-[:WAS_GENERATED_BY]->(ent)
                """,
                act_name=act_name,
                entity_id=e["_id"]
            )

    # Fetch all nodes and their labels/properties
    result = session.run("MATCH (n) RETURN labels(n) AS labels, n LIMIT 100")
    for record in result:
        labels = record["labels"]
        node = record["n"]
        print(f"{labels}: {dict(node)}")

driver.close()

['Agent']: {'name': 'alice', 'id': 1}
['Agent']: {'name': 'bob', 'id': 2}
['Agent']: {'name': 'carol', 'id': 3}
['Activity']: {'name': 'load_inventory_snapshot', 'id': 1}
['Activity']: {'name': 'transform_customer_data', 'id': 2}
['Activity']: {'name': 'convert_image_formats', 'id': 3}
['Activity']: {'name': 'clean_web_logs', 'id': 4}
['Activity']: {'name': 'validate_financial_forecast', 'id': 5}
['Entity']: {'notes': 'generic metadata', 'file_type': 'xml', 'name': 'a', 'created_at': '2025-02-03T00:00:00', 'id': 1}
['Entity']: {'file_type': 'tiff', 'name': 'b', 'created_at': '2026-05-10T00:00:00', 'color_depth': 24, 'id': 2, 'resolution': '4k'}
['Entity']: {'schema_version': 'v1', 'file_type': 'json', 'name': 'c', 'created_at': '2026-04-19T00:00:00', 'id': 3, 'nested': True}
['Entity']: {'sheet_count': 3, 'has_formulas': False, 'file_type': 'xlsx', 'name': 'd', 'created_at': '2025-10-12T00:00:00', 'id': 4}
['Entity']: {'schema_version': 'v5', 'file_type': 'json', 'name': 'e', 'created_