In [5]:
import pandas as pd
from neo4j import GraphDatabase
import os
import numpy as np

class Neo4jIngester:
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
    
    def close(self):
        self.driver.close()
    
    def run_query(self, query, parameters=None):
        with self.driver.session() as session:
            result = session.run(query, parameters)
            return result

def clean_dataframe(df, id_cols):
    """Clean dataframe by removing rows with null/NaN in ID columns"""
    initial_count = len(df)
    df = df.dropna(subset=id_cols)
    removed = initial_count - len(df)
    if removed > 0:
        print(f"  ‚ö†Ô∏è  Removed {removed} rows with null IDs")
    
    # Convert ID columns to string and strip whitespace
    for col in id_cols:
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()
    
    return df

def ingest_data():
    # Neo4j connection parameters - UPDATE THESE
    NEO4J_URI = "bolt://localhost:7687"
    NEO4J_USER = "neo4j" 
    NEO4J_PASSWORD = "password"  # CHANGE THIS
    
    print("=" * 80)
    print("üöÄ STARTING NEO4J DATA INGESTION")
    print("=" * 80)
    
    ingester = Neo4jIngester(NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)
    
    # Clear existing data
    print("\nüóëÔ∏è  Clearing existing data...")
    ingester.run_query("MATCH (n) DETACH DELETE n")
    print("  ‚úÖ Database cleared")
    
    folder_path = 'output_csv/'
    
    # ==================================================================
    # 1. INGEST NODE ENTITIES
    # ==================================================================
    
    print("\n" + "=" * 80)
    print("PHASE 1: INGESTING NODES")
    print("=" * 80)
    
    # Companies
    print("\nüè¢ Ingesting Companies...")
    companies_df = pd.read_csv(os.path.join(folder_path, 'companies.csv'))
    companies_df = clean_dataframe(companies_df, ['company_id'])
    query = """
    UNWIND $rows as row
    MERGE (c:Company {company_id: row.company_id})
    SET c.name = row.name,
        c.country = row.country,
        c.sector = row.sector,
        c.risk_score = toFloat(row.risk_score),
        c.opportunity_score = toFloat(row.opportunity_score),
        c.is_supplier = CASE WHEN row.is_supplier = 'True' THEN true ELSE false END
    """
    ingester.run_query(query, {'rows': companies_df.to_dict('records')})
    print(f"  ‚úÖ {len(companies_df)} companies ingested")
    
    # Auditors
    print("\nüëÆ Ingesting Auditors...")
    auditors_df = pd.read_csv(os.path.join(folder_path, 'auditors.csv'))
    auditors_df = clean_dataframe(auditors_df, ['auditor_id'])
    query = """
    UNWIND $rows as row
    MERGE (a:Auditor {auditor_id: row.auditor_id})
    SET a.name = row.name,
        a.risk_level = row.risk_level
    """
    ingester.run_query(query, {'rows': auditors_df.to_dict('records')})
    print(f"  ‚úÖ {len(auditors_df)} auditors ingested")
    
    # Shareholders
    print("\nüíº Ingesting Shareholders...")
    shareholders_df = pd.read_csv(os.path.join(folder_path, 'shareholders.csv'))
    shareholders_df = clean_dataframe(shareholders_df, ['shareholder_id'])
    query = """
    UNWIND $rows as row
    MERGE (s:Shareholder {shareholder_id: row.shareholder_id})
    SET s.name = row.name,
        s.type = row.type,
        s.risk_score = toFloat(row.risk_score),
        s.opportunity_score = toFloat(row.opportunity_score)
    """
    ingester.run_query(query, {'rows': shareholders_df.to_dict('records')})
    print(f"  ‚úÖ {len(shareholders_df)} shareholders ingested")
    
    # Invoices
    print("\nüìÑ Ingesting Invoices...")
    invoices_df = pd.read_csv(os.path.join(folder_path, 'invoices.csv'))
    invoices_df = clean_dataframe(invoices_df, ['invoice_id'])
    query = """
    UNWIND $rows as row
    MERGE (i:Invoice {invoice_id: row.invoice_id})
    SET i.amount = toFloat(row.amount),
        i.status = row.status,
        i.is_simulated = CASE WHEN row.is_simulated = 'True' THEN true ELSE false END
    """
    ingester.run_query(query, {'rows': invoices_df.to_dict('records')})
    print(f"  ‚úÖ {len(invoices_df)} invoices ingested")
    
    # ==================================================================
    # 2. INGEST RELATIONSHIPS - USING CORRECT COLUMN NAMES
    # ==================================================================
    
    print("\n" + "=" * 80)
    print("PHASE 2: CREATING RELATIONSHIPS")
    print("=" * 80)
    
    # OWNS_SHARE (shareholder_id, company_id, percentage)
    print("\nüìà Creating OWNS_SHARE relationships...")
    owns_share_df = pd.read_csv(os.path.join(folder_path, 'owns_share.csv'))
    owns_share_df = clean_dataframe(owns_share_df, ['shareholder_id', 'company_id'])
    query = """
    UNWIND $rows as row
    MATCH (s:Shareholder {shareholder_id: row.shareholder_id})
    MATCH (c:Company {company_id: row.company_id})
    MERGE (s)-[r:OWNS_SHARE]->(c)
    SET r.percentage = toFloat(row.percentage)
    """
    ingester.run_query(query, {'rows': owns_share_df.to_dict('records')})
    print(f"  ‚úÖ {len(owns_share_df)} OWNS_SHARE relationships created")
    
    # AUDITED_BY (company_id, auditor_id)
    print("\nüîç Creating AUDITED_BY relationships...")
    audited_by_df = pd.read_csv(os.path.join(folder_path, 'audited_by.csv'))
    audited_by_df = clean_dataframe(audited_by_df, ['company_id', 'auditor_id'])
    query = """
    UNWIND $rows as row
    MATCH (c:Company {company_id: row.company_id})
    MATCH (a:Auditor {auditor_id: row.auditor_id})
    MERGE (c)-[r:AUDITED_BY]->(a)
    """
    ingester.run_query(query, {'rows': audited_by_df.to_dict('records')})
    print(f"  ‚úÖ {len(audited_by_df)} AUDITED_BY relationships created")
    
    # ISSUES_TO (company_id, invoice_id)
    print("\nüì§ Creating ISSUES_TO relationships...")
    issues_to_df = pd.read_csv(os.path.join(folder_path, 'issues_to.csv'))
    issues_to_df = clean_dataframe(issues_to_df, ['company_id', 'invoice_id'])
    query = """
    UNWIND $rows as row
    MATCH (c:Company {company_id: row.company_id})
    MATCH (i:Invoice {invoice_id: row.invoice_id})
    MERGE (c)-[r:ISSUES_TO]->(i)
    """
    ingester.run_query(query, {'rows': issues_to_df.to_dict('records')})
    print(f"  ‚úÖ {len(issues_to_df)} ISSUES_TO relationships created")
    
    # PAYS (company_id, invoice_id)
    print("\nüí≥ Creating PAYS relationships...")
    pays_df = pd.read_csv(os.path.join(folder_path, 'pays.csv'))
    pays_df = clean_dataframe(pays_df, ['company_id', 'invoice_id'])
    query = """
    UNWIND $rows as row
    MATCH (c:Company {company_id: row.company_id})
    MATCH (i:Invoice {invoice_id: row.invoice_id})
    MERGE (c)-[r:PAYS]->(i)
    """
    ingester.run_query(query, {'rows': pays_df.to_dict('records')})
    print(f"  ‚úÖ {len(pays_df)} PAYS relationships created")
    
    # SUBSIDIARY_OF (child_company_id, parent_company_id, since_year)
    print("\nüè¢ Creating SUBSIDIARY_OF relationships...")
    subsidiary_df = pd.read_csv(os.path.join(folder_path, 'subsidiary_of.csv'))
    print(f"  üìã Columns in subsidiary_of.csv: {list(subsidiary_df.columns)}")
    subsidiary_df = clean_dataframe(subsidiary_df, ['child_company_id', 'parent_company_id'])
    query = """
    UNWIND $rows as row
    MATCH (child:Company {company_id: row.child_company_id})
    MATCH (parent:Company {company_id: row.parent_company_id})
    MERGE (child)-[r:SUBSIDIARY_OF]->(parent)
    SET r.since_year = toInteger(row.since_year)
    """
    ingester.run_query(query, {'rows': subsidiary_df.to_dict('records')})
    print(f"  ‚úÖ {len(subsidiary_df)} SUBSIDIARY_OF relationships created")
    
    # SUPPLIES (supplier_company_id, buyer_company_id, annual_volume)
    print("\nüöö Creating SUPPLIES relationships...")
    supplies_df = pd.read_csv(os.path.join(folder_path, 'supplies.csv'))
    print(f"  üìã Columns in supplies.csv: {list(supplies_df.columns)}")
    supplies_df = clean_dataframe(supplies_df, ['supplier_company_id', 'buyer_company_id'])
    query = """
    UNWIND $rows as row
    MATCH (supplier:Company {company_id: row.supplier_company_id})
    MATCH (buyer:Company {company_id: row.buyer_company_id})
    MERGE (supplier)-[r:SUPPLIES]->(buyer)
    SET r.annual_volume = toFloat(row.annual_volume)
    """
    ingester.run_query(query, {'rows': supplies_df.to_dict('records')})
    print(f"  ‚úÖ {len(supplies_df)} SUPPLIES relationships created")
    
    # ==================================================================
    # 3. CREATE INDEXES FOR PERFORMANCE
    # ==================================================================
    
    print("\n" + "=" * 80)
    print("PHASE 3: CREATING INDEXES")
    print("=" * 80)
    
    indexes = [
        "CREATE INDEX company_id_idx IF NOT EXISTS FOR (c:Company) ON (c.company_id)",
        "CREATE INDEX shareholder_id_idx IF NOT EXISTS FOR (s:Shareholder) ON (s.shareholder_id)",
        "CREATE INDEX auditor_id_idx IF NOT EXISTS FOR (a:Auditor) ON (a.auditor_id)",
        "CREATE INDEX invoice_id_idx IF NOT EXISTS FOR (i:Invoice) ON (i.invoice_id)",
        "CREATE INDEX company_risk_idx IF NOT EXISTS FOR (c:Company) ON (c.risk_score)",
        "CREATE INDEX auditor_risk_idx IF NOT EXISTS FOR (a:Auditor) ON (a.risk_level)"
    ]
    
    for idx_query in indexes:
        try:
            ingester.run_query(idx_query)
            print(f"  ‚úÖ {idx_query.split('FOR')[0].split('IF')[0].strip()}")
        except Exception as e:
            print(f"  ‚ö†Ô∏è  Index creation skipped (may already exist)")
    
    # ==================================================================
    # 4. GRAPH SUMMARY
    # ==================================================================
    
    print("\n" + "=" * 80)
    print("üìä GRAPH SUMMARY")
    print("=" * 80)
    
    # Count nodes by label
    with ingester.driver.session() as session:
        # Node counts
        node_query = """
        MATCH (n)
        RETURN labels(n)[0] as label, count(n) as count
        ORDER BY count DESC
        """
        result = session.run(node_query)
        print("\nNodes:")
        for record in result:
            print(f"  ‚Ä¢ {record['label']}: {record['count']:,}")
        
        # Relationship counts
        rel_query = """
        MATCH ()-[r]->()
        RETURN type(r) as type, count(r) as count
        ORDER BY count DESC
        """
        result = session.run(rel_query)
        print("\nRelationships:")
        for record in result:
            print(f"  ‚Ä¢ {record['type']}: {record['count']:,}")
        
        # Total counts
        total_query = """
        MATCH (n)
        WITH count(n) as nodeCount
        MATCH ()-[r]->()
        RETURN nodeCount, count(r) as relCount
        """
        result = session.run(total_query)
        record = result.single()
        print(f"\n{'=' * 80}")
        print(f"TOTALS: {record['nodeCount']:,} nodes, {record['relCount']:,} relationships")
        print(f"{'=' * 80}")
    
    # ==================================================================
    # 5. VALIDATION QUERIES
    # ==================================================================
    
    print("\n" + "=" * 80)
    print("üîç VALIDATION CHECKS")
    print("=" * 80)
    
    with ingester.driver.session() as session:
        # Check for high-risk auditors
        result = session.run("""
            MATCH (a:Auditor)
            WHERE a.risk_level = 'HIGH'
            RETURN count(a) as count
        """)
        high_risk = result.single()['count']
        print(f"\n‚úì High-risk auditors: {high_risk}")
        
        # Check for companies with subsidiaries
        result = session.run("""
            MATCH (c:Company)-[:SUBSIDIARY_OF]->()
            RETURN count(DISTINCT c) as count
        """)
        subs = result.single()['count']
        print(f"‚úì Companies with parent companies: {subs}")
        
        # Check for supply relationships
        result = session.run("""
            MATCH ()-[r:SUPPLIES]->()
            RETURN count(r) as count
        """)
        supplies = result.single()['count']
        print(f"‚úì Supply relationships: {supplies}")
        
        # Check for circular patterns (sample)
        result = session.run("""
            MATCH path = (c1:Company)-[:SUPPLIES]->(c2:Company)-[:SUPPLIES]->(c3:Company)-[:SUPPLIES]->(c1)
            RETURN count(path) as count
            LIMIT 100
        """)
        cycles = result.single()['count']
        print(f"‚úì Sample circular supply patterns found: {cycles}")
        
        # Check for ownership concentration
        result = session.run("""
            MATCH (s:Shareholder)-[o:OWNS_SHARE]->(c:Company)
            WHERE o.percentage > 25
            RETURN count(o) as count
        """)
        major_stakes = result.single()['count']
        print(f"‚úì Ownership stakes >25%: {major_stakes}")
    
    print("\n" + "=" * 80)
    print("üéâ INGESTION COMPLETE!")
    print("=" * 80)
    print("\nNext steps:")
    print("  1. Open Neo4j Browser at http://localhost:7474")
    print("  2. Run: CALL db.schema.visualization()")
    print("  3. Start detecting fraud patterns!")
    print("=" * 80)
    
    ingester.close()

if __name__ == "__main__":
    try:
        ingest_data()
    except Exception as e:
        print(f"\n‚ùå ERROR: {str(e)}")
        print("\nMake sure:")
        print("  1. Neo4j is running (check localhost:7474)")
        print("  2. Credentials are correct")
        print("  3. CSV files exist in output_csv/ folder")
        import traceback
        traceback.print_exc()

üöÄ STARTING NEO4J DATA INGESTION

üóëÔ∏è  Clearing existing data...
  ‚úÖ Database cleared

PHASE 1: INGESTING NODES

üè¢ Ingesting Companies...
  ‚úÖ 500 companies ingested

üëÆ Ingesting Auditors...
  ‚úÖ 10 auditors ingested

üíº Ingesting Shareholders...
  ‚úÖ 250 shareholders ingested

üìÑ Ingesting Invoices...
  ‚úÖ 1500 invoices ingested

PHASE 2: CREATING RELATIONSHIPS

üìà Creating OWNS_SHARE relationships...
  ‚úÖ 1777 OWNS_SHARE relationships created

üîç Creating AUDITED_BY relationships...
  ‚úÖ 500 AUDITED_BY relationships created

üì§ Creating ISSUES_TO relationships...
  ‚úÖ 1500 ISSUES_TO relationships created

üí≥ Creating PAYS relationships...
  ‚úÖ 1490 PAYS relationships created

üè¢ Creating SUBSIDIARY_OF relationships...
  üìã Columns in subsidiary_of.csv: ['child_company_id', 'parent_company_id', 'since_year']
  ‚úÖ 224 SUBSIDIARY_OF relationships created

üöö Creating SUPPLIES relationships...
  üìã Columns in supplies.csv: ['supplier_company_id'