# Cora Co-author Graph Import & Perturbation

This notebook:
1. Loads author co-authorship data from the Cora dataset
2. Imports clean data into a **constraint graph** (canonical reference)
3. Creates an **instance graph** (copy with intentional perturbations)
4. Swaps random author identities to simulate data quality issues

**Workflow:**
- Environment setup → Load data → Import to Neo4j → Create perturbations → Validate

## Step 1: Environment Configuration

Load Neo4j credentials from a `.env` file in the current working directory.
Never commit secrets: keep local credentials in `.env` and make sure that file is listed in `.gitignore`.

In [None]:
# --- Env loader & sanitiser ---
import os
import pathlib
from dotenv import load_dotenv
# .env is read from the current working directory
env_path = pathlib.Path.cwd() / ".env"
load_dotenv(dotenv_path=env_path, override=True)

def _strip_quotes(v):
    if v is None:
        return None
    return v.strip().strip('"').strip("'")

# sanitized env values
URI = _strip_quotes(os.getenv("NEO4J_URI"))
USERNAME = _strip_quotes(os.getenv("NEO4J_USERNAME"))
PASSWORD = _strip_quotes(os.getenv("NEO4J_PASSWORD"))

missing = [k for k, v in (("NEO4J_URI", URI), ("NEO4J_USERNAME", USERNAME), ("NEO4J_PASSWORD", PASSWORD)) if not v]
if missing:
    raise RuntimeError(f"Missing env vars: {missing}. Edit .env or export env vars and re-run this cell.")

# printing safe info only
print(f"Loaded Neo4j URI: {URI!r}, username: {USERNAME!r} (password hidden)")
AUTH = (USERNAME, PASSWORD)

Loaded Neo4j URI: 'neo4j://127.0.0.1:7687', username: 'neo4j' (password hidden)


## Step 2: Configure Databases & Parameters

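Set the constraint and instance database names and the number of identity swaps (`hypp.fraud_number`).
Each value can be overridden in `.env` via `NEO4J_CONSTRAINT_DB`, `NEO4J_INSTANCE_DB`, and `FRAUD_NUMBER`; otherwise the defaults in the cell are used.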

In [3]:
# --- DB names, hyperparameters, and defaults ---
CONSTRAINT_DB = _strip_quotes(os.getenv("NEO4J_CONSTRAINT_DB")) or "test"
INSTANCE_DB   = _strip_quotes(os.getenv("NEO4J_INSTANCE_DB")) or "test-instance-graph"

from easydict import EasyDict as edict
hypp = edict()
hypp.fraud_number = int(os.getenv("FRAUD_NUMBER") or 10)

print("Constraint DB:", CONSTRAINT_DB)
print("Instance DB:", INSTANCE_DB)
print("Fraud / swap count:", hypp.fraud_number)

Constraint DB: coauthor-constraint
Instance DB: coauthor-instance
Fraud / swap count: 10
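`int(os.getenv("FRAUD_NUMBER") or 10)` already falls back to the default when the variable is unset or empty, but `FRAUD_NUMBER` is not passed through `_strip_quotes`, so a quoted value such as `"25"` raises a bare `ValueError`, and a negative count is accepted silently. A minimal sketch of a stricter reader; `env_int` and its `environ` parameter are hypothetical names, and the `environ` mapping exists only so the demo does not touch the real environment:

```python
import os

def env_int(name, default, minimum=0, environ=os.environ):
    """Hypothetical helper: read an integer setting, fall back to `default`
    when it is unset or empty, and reject values below `minimum`."""
    raw = (environ.get(name) or "").strip().strip('"').strip("'")
    if not raw:
        return default
    try:
        value = int(raw)
    except ValueError:
        raise ValueError(f"{name} must be an integer, got {raw!r}") from None
    if value < minimum:
        raise ValueError(f"{name} must be >= {minimum}, got {value}")
    return value

# Demo with plain dicts instead of the process environment
print(env_int("FRAUD_NUMBER", 10, environ={}))                        # unset -> 10
print(env_int("FRAUD_NUMBER", 10, environ={"FRAUD_NUMBER": '"25"'}))  # quoted -> 25
```

In the configuration cell this would read `hypp.fraud_number = env_int("FRAUD_NUMBER", 10)`.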


## Step 3: Locate Data Files

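Auto-detects the most recent author and connection files in `datasets/temp/`, ranking them by the `YYYYmmdd-HHMMSS` timestamp in the file name (falling back to file modification time if no file name contains a valid timestamp).
These files are generated by `cora_data_cleaning.ipynb`.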

In [4]:
# --- Pick latest generated authors/connection files automatically ---
from pathlib import Path
import re
from datetime import datetime

def latest_by_timestamp(folder: Path, pattern: str, name_re: str, dt_fmt="%Y%m%d-%H%M%S"):
    folder = Path(folder)
    candidates = list(folder.glob(pattern))
    if not candidates:
        return None
    ts_files = []
    rx = re.compile(name_re)
    for p in candidates:
        m = rx.search(p.name)
        if m:
            try:
                ts = datetime.strptime(m.group(1), dt_fmt)
                ts_files.append((ts, p))
            except ValueError:
                pass  # name matched the pattern but is not a valid date; skip it
    if ts_files:
        return str(max(ts_files, key=lambda t_p: t_p[0])[1])
    return str(max(candidates, key=lambda p: p.stat().st_mtime))

base = Path("datasets") / "temp"
AUTHORS_PATH = latest_by_timestamp(base, "authors_*.txt", r"authors_(\d{8}-\d{6})\.txt")
CONNECTIONS_PATH = latest_by_timestamp(base, "author_connections_*.txt", r"author_connections_(\d{8}-\d{6})\.txt")

if not AUTHORS_PATH or not CONNECTIONS_PATH:
    raise FileNotFoundError(f"No authors/connection files found in {base}. Run `cora_data_cleaning.ipynb` first.")

print("Using AUTHORS_PATH =", AUTHORS_PATH)
print("Using CONNECTIONS_PATH =", CONNECTIONS_PATH)

Using AUTHORS_PATH = datasets\temp\authors_20260107-111333.txt
Using CONNECTIONS_PATH = datasets\temp\author_connections_20260107-111333.txt
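Because the two files are selected independently, a newer authors file without a matching connections file (for example, from an interrupted cleaning run) would be paired with an older connections file. One way to guard against that is to pick the newest timestamp for which *both* files exist. A sketch under that assumption; `latest_matching_pair` is a hypothetical helper, and the demo files are empty placeholders in a temporary directory:

```python
import re
import tempfile
from pathlib import Path

def latest_matching_pair(folder, prefixes=("authors_", "author_connections_")):
    """Hypothetical helper: newest (authors, connections) paths sharing the
    same YYYYmmdd-HHMMSS stamp, or None if no complete pair exists."""
    stamp_rx = re.compile(r"(\d{8}-\d{6})\.txt$")
    stamps_per_prefix = []
    for prefix in prefixes:
        stamps = set()
        for p in Path(folder).glob(f"{prefix}*.txt"):
            m = stamp_rx.search(p.name)
            # accept only names of exactly the form <prefix><stamp>.txt
            if m and p.name == f"{prefix}{m.group(1)}.txt":
                stamps.add(m.group(1))
        stamps_per_prefix.append(stamps)
    common = set.intersection(*stamps_per_prefix)
    if not common:
        return None
    newest = max(common)  # zero-padded stamps sort chronologically as strings
    return tuple(str(Path(folder) / f"{prefix}{newest}.txt") for prefix in prefixes)

with tempfile.TemporaryDirectory() as tmp:
    for name in ["authors_20260107-111333.txt",
                 "author_connections_20260107-111333.txt",
                 "authors_20260108-090000.txt"]:  # newer run, connections missing
        (Path(tmp) / name).write_text("", encoding="utf-8")
    print([Path(p).name for p in latest_matching_pair(tmp)])
```

Here `latest_by_timestamp` would return the `20260108` authors file together with the `20260107` connections file, while the sketch keeps both files from the same run.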


## Step 4: Load Data into Memory

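Reads the author list (one name per line) and the co-author pairs from the generated files.
Each connection line has the form `(author1,author2)`; lines with fewer than two comma-separated fields are skipped, and a name that itself contains a comma would be split incorrectly.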

In [5]:
# --- Read authors and connection pairs ---
# Read authors file
with open(AUTHORS_PATH, encoding="utf-8") as f:
    authors = [line.strip() for line in f if line.strip()]

# Read connections file (expects lines like "(authorA,authorB)")
connections = []
with open(CONNECTIONS_PATH, "r", encoding="utf-8") as file:
    for line in file:
        clean_line = line.strip().lstrip("(").rstrip(")")
        parts = [p.strip() for p in clean_line.split(",")]
        if len(parts) >= 2:
            connections.append((parts[0], parts[1]))

print(f"Loaded {len(authors)} authors and {len(connections)} connections.")

Loaded 47 authors and 120 connections.
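`import_data()` creates edges with `MATCH ... MATCH ... MERGE`, so a pair whose endpoint is missing from the author list is skipped without any error, while the printed summary still reports `len(connections_list)`. A small pure-Python check before importing catches such pairs. `parse_connection` mirrors the parsing loop above (adding a non-empty-name check), `dangling_pairs` is a hypothetical helper, and the names are made up:

```python
def parse_connection(line):
    """Mirror the notebook's parsing: '(A,B)' -> ('A', 'B'); None if malformed."""
    parts = [p.strip() for p in line.strip().lstrip("(").rstrip(")").split(",")]
    if len(parts) >= 2 and parts[0] and parts[1]:
        return parts[0], parts[1]
    return None

def dangling_pairs(authors, connections):
    """Pairs with an endpoint missing from `authors`; the MATCH-based
    import would silently create no relationship for these."""
    known = set(authors)
    return [(a, b) for a, b in connections if a not in known or b not in known]

# Made-up sample data
sample_lines = ["(Alice,Bob)\n", "(Bob,Carol)\n", "(Dave)\n", "(Alice,Eve)\n"]
sample_authors = ["Alice", "Bob", "Carol"]
sample_connections = [c for c in map(parse_connection, sample_lines) if c]
print(sample_connections)
print(dangling_pairs(sample_authors, sample_connections))
```

On the real data, `dangling_pairs(authors, connections)` should return an empty list if the 120 pairs will all be imported.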


## Step 5: Define Neo4j Operations

Helper functions:
- `clear_database()` — remove all nodes/relationships
- `setup_database()` — create uniqueness constraints
- `import_data()` — bulk insert authors and co-author edges
- `swap_random_authors()` — swap names and label fraudulent nodes
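`import_data()` writes a *directed* `MERGE (a)-[:CO_AUTHOR]->(b)`, so if the connections file can list the same collaboration in both orders, `(A,B)` and `(B,A)` produce two relationships. Assuming co-authorship should be treated as undirected, the pairs can be canonicalised before import (Cypher's undirected `MERGE (a)-[:CO_AUTHOR]-(b)` is another option). `canonical_pairs` is a hypothetical helper:

```python
def canonical_pairs(connections):
    """Hypothetical pre-import step: order each pair alphabetically so
    (A, B) and (B, A) collapse to one entry; first occurrence order is kept."""
    seen = set()
    result = []
    for a, b in connections:
        key = (a, b) if a <= b else (b, a)
        if key not in seen:
            seen.add(key)
            result.append(key)
    return result

print(canonical_pairs([("Bob", "Alice"), ("Alice", "Bob"), ("Bob", "Carol")]))
```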

In [6]:
# --- Neo4j helper functions and import/swap workflow ---
from neo4j import GraphDatabase

def clear_database(driver, database):
    driver.execute_query("MATCH (n) DETACH DELETE n", database_=database)
    print(f"Database '{database}' cleared.")

def setup_database(driver, database):
    driver.execute_query("""
        CREATE CONSTRAINT author_name_unique IF NOT EXISTS
        FOR (a:Author) REQUIRE a.name IS UNIQUE
    """, database_=database)
    print(f"Constraint ensured on '{database}'.")

def import_data(driver, authors_list, connections_list, database):
    # Bulk create authors
    driver.execute_query("""
        UNWIND $names AS name
        MERGE (:Author {name: name})
    """, names=authors_list, database_=database)
    # Bulk create connections
    driver.execute_query("""
        UNWIND $pairs AS pair
        MATCH (a:Author {name: pair[0]})
        MATCH (b:Author {name: pair[1]})
        MERGE (a)-[:CO_AUTHOR]->(b)
    """, pairs=connections_list, database_=database)
    print(f"Imported data into '{database}' (authors: {len(authors_list)}, pairs: {len(connections_list)})")

def swap_random_authors(driver, database, number_of_swaps):
    # Sample 2 * number_of_swaps *distinct* authors and pair them up, so no
    # author takes part in more than one swap. (Sampling pairs from the
    # Cartesian product can reuse a node in several pairs and chain renames.)
    query = """
    MATCH (a:Author)
    WITH a ORDER BY rand() LIMIT $n_nodes
    WITH collect(a) AS picked
    UNWIND range(0, size(picked) / 2 - 1) AS i
    WITH picked[2 * i] AS a1, picked[2 * i + 1] AS a2
    // capture both names before writing, then swap them
    WITH a1, a2, a1.name AS name1, a2.name AS name2
    SET a1.name = name2, a2.name = name1
    SET a1:Fraudulent, a2:Fraudulent
    RETURN a1.name AS swapped_name1, a2.name AS swapped_name2
    """
    with driver.session(database=database) as session:
        result = session.run(query, n_nodes=2 * int(number_of_swaps))
        # consume the result while the session is still open
        swapped = [(record["swapped_name1"], record["swapped_name2"]) for record in result]
    print(f"Performed {len(swapped)} swaps in '{database}'")
    return swapped
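The swap step can be modelled offline to see what disjoint sampling guarantees: with `k` swaps over distinct nodes, exactly `2k` authors change name, the multiset of names is unchanged, and no more than `n // 2` swaps are possible. A pure-Python sketch; `disjoint_swaps` is a hypothetical stand-in for the Cypher query, and the 47 author names are placeholders:

```python
import random

def disjoint_swaps(names, number_of_swaps, seed=None):
    """Model of the swap step: sample 2*k distinct node indices, pair them
    up, and exchange names within each pair. Returns (new_names, pairs)."""
    rng = random.Random(seed)
    k = min(number_of_swaps, len(names) // 2)  # at most n // 2 disjoint pairs
    picked = rng.sample(range(len(names)), 2 * k)  # distinct indices
    new_names = list(names)
    pairs = []
    for i, j in zip(picked[0::2], picked[1::2]):
        new_names[i], new_names[j] = names[j], names[i]
        pairs.append((names[i], names[j]))
    return new_names, pairs

authors_demo = [f"author_{n}" for n in range(47)]
swapped_names, swap_pairs = disjoint_swaps(authors_demo, 10, seed=0)
changed = [old for old, new in zip(authors_demo, swapped_names) if old != new]
print(len(swap_pairs), "pairs,", len(changed), "renamed nodes")
```

With 47 authors and 10 swaps this always renames 20 nodes. If pairs were instead drawn from all 1,081 author pairs, two of the ten picks would very likely share a node, so fewer than 20 nodes would end up labeled and some names would move more than once.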

## Step 6: Execute Import & Perturbation Workflow

Run the full pipeline:
1. Verify Neo4j connectivity
2. Import clean data to constraint DB
3. Import the same clean data into the instance DB (a fresh import, not a database copy)
4. Swap random author pairs (simulate fraud)

**Note:** Instance DB has NO uniqueness constraint (allows temporary duplicates during swap).

In [7]:
# --- Verify connectivity and run import/swap workflow ---

# Test connection
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    driver.verify_connectivity()
    print("Connected to Neo4j database successfully.")

# Run import: constraint DB (canonical)
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    clear_database(driver, CONSTRAINT_DB)
    setup_database(driver, CONSTRAINT_DB)
    import_data(driver, authors, connections, CONSTRAINT_DB)

# Run import: instance DB (sandbox) and perform swaps
with GraphDatabase.driver(URI, auth=AUTH) as driver:
    clear_database(driver, INSTANCE_DB)
    # do NOT create the unique constraint on instance DB (swaps could violate it)
    import_data(driver, authors, connections, INSTANCE_DB)
    swap_random_authors(driver, INSTANCE_DB, int(hypp.fraud_number))

print("\nImport and swap workflow complete.")

Connected to Neo4j database successfully.
Database 'coauthor-constraint' cleared.
Constraint ensured on 'coauthor-constraint'.
Imported data into 'coauthor-constraint' (authors: 47, pairs: 120)
Database 'coauthor-instance' cleared.
Imported data into 'coauthor-instance' (authors: 47, pairs: 120)
Performed 10 swaps in 'coauthor-instance'

Import and swap workflow complete.


## Validation & Next Steps

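- Constraint graph: clean, deduplicated author data with `CO_AUTHOR` relationships and a uniqueness constraint on `Author.name`
- Instance graph: same structure, with `hypp.fraud_number` pairs of authors whose names were swapped; both nodes of each pair are labeled `:Fraudulent`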
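One possible validation, sketched under the assumption that the edge lists of both databases have been exported (for example with `MATCH (a:Author)-[:CO_AUTHOR]->(b:Author) RETURN a.name, b.name`): a swapped author inherits the other node's relationships, so its degree changes whenever the two swapped authors had different degrees. Names flagged this way should be a subset of the `:Fraudulent` names; swaps between equal-degree authors stay invisible to this check. The helpers and the four-author graph below are hypothetical:

```python
from collections import Counter

def degree_by_name(edges):
    """Undirected degree per author name from (name, name) edge pairs."""
    deg = Counter()
    for a, b in edges:
        deg[a] += 1
        deg[b] += 1
    return deg

def degree_mismatches(constraint_edges, instance_edges):
    """Names whose degree differs between the constraint and instance graphs."""
    before, after = degree_by_name(constraint_edges), degree_by_name(instance_edges)
    return sorted(n for n in set(before) | set(after) if before[n] != after[n])

# Made-up example: swap Alice (degree 3) with Dave (degree 1)
constraint_edges = [("Alice", "Bob"), ("Alice", "Carol"), ("Alice", "Dave"), ("Bob", "Carol")]
rename = {"Alice": "Dave", "Dave": "Alice"}
instance_edges = [(rename.get(a, a), rename.get(b, b)) for a, b in constraint_edges]
print(degree_mismatches(constraint_edges, instance_edges))
```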

# Alternative version of swap_random_authors (no CALL subquery) that prints each
# swapped pair. Running this cell redefines the function above and re-runs the
# import and swap on the instance DB.
def swap_random_authors(driver, database, number_of_swaps):
    query = """
    // 1. Pick 2 * number_of_swaps distinct authors at random
    MATCH (a:Author)
    WITH a ORDER BY rand() LIMIT $n_nodes

    // 2. Pair them up so every author appears in at most one swap
    WITH collect(a) AS picked
    UNWIND range(0, size(picked) / 2 - 1) AS i
    WITH picked[2 * i] AS a1, picked[2 * i + 1] AS a2

    // 3. Store their names before writing
    WITH a1, a2, a1.name AS oldName1, a2.name AS oldName2

    // 4. Perform the swap
    SET a1.name = oldName2
    SET a2.name = oldName1

    // 5. Label them as fraudulent/swapped so we can find them
    SET a1:Fraudulent, a2:Fraudulent

    RETURN a1.name, a2.name
    """

    with driver.session(database=database) as session:
        result = session.run(query, n_nodes=2 * int(number_of_swaps))
        for record in result:
            print(f"Swapped identities: {record[0]} <-> {record[1]}")

with GraphDatabase.driver(URI, auth=AUTH) as driver:
    clear_database(driver, INSTANCE_DB)
    # Deliberately no setup_database() here: the instance DB carries no
    # uniqueness constraint, since perturbed data may contain duplicate names.
    import_data(driver, authors, connections, INSTANCE_DB)
    swap_random_authors(driver, INSTANCE_DB, hypp.fraud_number)