In [2]:
from neo4j import GraphDatabase
import pandas as pd
import uuid  # For generating unique IDs
import math 


# Load CSV files
account_booking_train = pd.read_csv('data/account_booking_train.csv')
external_parties_train = pd.read_csv('data/external_parties_train.csv')



In [3]:
external_parties_train.head()

Unnamed: 0,transaction_reference_id,party_role,party_info_unstructured,parsed_name,parsed_address_street_name,parsed_address_street_number,parsed_address_unit,parsed_address_postal_code,parsed_address_city,parsed_address_state,parsed_address_country,party_iban,party_phone,external_id
0,04ff0d1c680189e3a80c92d86407f0f5,BENE,mary mith 107 107 angela brooks n. thomasfurt ...,mary mith,angela brooks,107 107,,,n. thomasfurt,,bulgaria,GB49MYOB82127728573340,+1.815660-6791x8486,50039037
1,439ab0ad7380e6135ab2ff3fddd4a727,ORG,yesneia kim north michael 93971 koribati,yesneia kim,north michael,,,93971,koribati,,,,0 (269)620-8734x2349,60044692
2,00cac12d41191a84f9e31aa731a83512,ORG,w. roberson jr. 41010 rachel crossingapt. 923 ...,w. roberson jr.,rachel crossingapt.,41010 923,,p2235417,thompsonshire amyport,,,GB08OTHR53515837682953,,30008244
3,e4fba5f878dd3453e35973605a783a16,BENE,azquez-nelson co. suarez ports suite & 024 bri...,azquez-nelson co.,ports suite &,,,,brittanyberg,,bulgaria bulgaria,GB17VVGW66321494633280,,40017944
4,d03d7e4c31878b0255d39e8c3f0ab625,ORG,m.j. bytd iii 856 john lake s. glenn cocos (ke...,m.j. bytd iii,john lake s. glenn,856,,125838276,cocos (keeling),islands,,,(260)3371534,40012658


In [4]:
account_booking_train.head()

Unnamed: 0,transaction_reference_id,debit_credit_indicator,account_id,transaction_amount,transaction_currency,transaction_date
0,0ace8fca6ada96883ef2e823b5dea26b,CREDIT,25110,5249.26,GBP,2023-05-15
1,d52c4f1a546f5d784ee46a8f347ad607,DEBIT,27293,4481.5,GBP,2023-02-27
2,dac45362e7471a7fa2726c81adae8534,DEBIT,23088,1347.27,GBP,2023-03-18
3,2ee574398cd6c4a7f3e111447141550e,DEBIT,21641,9276.56,GBP,2023-07-15
4,c5154ea99a0ff84ba8e72217d34d3397,CREDIT,24233,8002.28,GBP,2024-01-12


In [5]:
# Clean the Data
# Clean the Data
# Step 1: Identify transaction counts by `transaction_reference_id`
transaction_counts = account_booking_train['transaction_reference_id'].value_counts()

# Step 2: Filter transactions with only one leg
single_leg_transactions = transaction_counts[transaction_counts == 1].index

# Step 3: Filter the dataset for these transactions
account_booking_train = account_booking_train[
    account_booking_train['transaction_reference_id'].isin(single_leg_transactions)
]


def sanitize_value(value):
    """Replace NaN with None to avoid Neo4j errors."""
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return None
    return value



In [6]:
print('Number of rows in account_booking_train', len(account_booking_train))

print('Number of rows in external_parties_train', len(external_parties_train))

Number of rows in account_booking_train 11064
Number of rows in external_parties_train 11064


In [7]:
if external_parties_train['transaction_reference_id'].isna().any():
    print("The 'transaction_reference_id' column contains NaN values.")
else:
    print("The 'transaction_reference_id' column does not contain any NaN values.")

The 'transaction_reference_id' column does not contain any NaN values.


In [12]:
# Neo4j connection details
NEO4J_URI = "bolt://localhost:7687"
NEO4J_USERNAME = "neo4j"
NEO4J_PASSWORD = "password123"

class Neo4jLoader:
    def __init__(self, uri, username, password):
        """Initialize the Neo4j driver."""
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        """Close the Neo4j driver connection."""
        if self.driver:
            self.driver.close()

    def create_constraints(self):
        """Create unique constraints for External_Party nodes."""
        with self.driver.session() as session:
            try:
                session.run("""
                CREATE CONSTRAINT unique_party_id IF NOT EXISTS
                FOR (p:External_Party) REQUIRE p.id IS UNIQUE
                """)
                print("Constraint 'unique_party_id' created successfully.")
            except Exception as e:
                print(f"Error creating 'unique_party_id' constraint: {e}")

    def load_external_parties(self, session, external_parties_train):
        """Load external parties as nodes."""
        for _, row in external_parties_train.iterrows():
            unique_party_id = str(uuid.uuid4())
            try:
                # Prepare non-null properties


                properties = {
                    "id": unique_party_id,
                    "transaction_reference_id": sanitize_value(row.get('transaction_reference_id')),
                    "party_role": sanitize_value(row.get('party_role')),
                    "party_info_unstructured": sanitize_value(row.get('party_info_unstructured')),
                    "parsed_name": sanitize_value(row.get('parsed_name')),
                    "parsed_address_street_name": sanitize_value(row.get('parsed_address_street_name')),
                    "parsed_address_street_number": sanitize_value(row.get('parsed_address_street_number')),
                    "parsed_address_unit": sanitize_value(row.get('parsed_address_unit')),
                    "parsed_address_postal_code": sanitize_value(row.get('parsed_address_postal_code')),
                    "parsed_address_city": sanitize_value(row.get('parsed_address_city')),
                    "parsed_address_state": sanitize_value(row.get('parsed_address_state')),
                    "parsed_address_country": sanitize_value(row.get('parsed_address_country')),
                    "party_iban": sanitize_value(row.get('party_iban')),
                    "party_phone": sanitize_value(row.get('party_phone')),
                    "external_id": sanitize_value(row.get('external_id'))
                }
                # Remove None values from properties
                non_null_properties = {k: v for k, v in properties.items() if v is not None}

                # Dynamically construct Cypher query
                query = f"MERGE (p:External_Party {{{', '.join([f'{k}: ${k}' for k in non_null_properties.keys()])}}})"
                session.run(query, non_null_properties)
            except Exception as e:
                print(f"Error loading External_Party node: {e}")

    def load_transactions_as_relationships(self, session, account_booking_train):
        """Load transactions as relationships between UBS_Client and External_Party nodes."""
        for _, row in account_booking_train.iterrows():
            try:
                # Prepare transaction properties
                properties = {
                    "transaction_reference_id": sanitize_value(row.get('transaction_reference_id')),
                    "debit_credit_indicator": sanitize_value(row.get('debit_credit_indicator')),
                    "account_id": sanitize_value(row.get('account_id')),
                    "transaction_amount": sanitize_value(row.get('transaction_amount')),
                    "transaction_currency": sanitize_value(row.get('transaction_currency')),
                    "transaction_date": sanitize_value(row.get('transaction_date'))
                }
                # Remove None values from properties
                non_null_properties = {k: v for k, v in properties.items() if v is not None}

                # Ensure account_id is present
                if "account_id" not in non_null_properties or "transaction_reference_id" not in non_null_properties:
                    print(f"Skipping row due to missing required fields: {row}")
                    continue

                # Dynamically construct Cypher queries
                create_ubs_client_query = """
                MERGE (c:UBS_Client {account_id: $account_id})
                """
                create_transaction_relationship_query = """
                MATCH (c:UBS_Client {account_id: $account_id}),
                    (p:External_Party {transaction_reference_id: $transaction_reference_id})
                MERGE (c)-[:TRANSACTION {
                    debit_credit_indicator: $debit_credit_indicator,
                    transaction_amount: $transaction_amount,
                    transaction_currency: $transaction_currency,
                    transaction_date: $transaction_date
                }]->(p)
                """

                # Execute queries
                session.run(create_ubs_client_query, {"account_id": non_null_properties["account_id"]})
                session.run(create_transaction_relationship_query, non_null_properties)
            except Exception as e:
                print(f"Error creating transaction relationship: {e}")

    def aggregate_party_nodes(self, session):
        """Aggregate External_Party nodes based on their name and address into a new node."""
        try:
            # Step 1: Identify groups of External_Party nodes with the same name and address
            grouping_query = """
            MATCH (p:External_Party)
            WITH p.parsed_name AS name, 
                p.parsed_address_street_name AS street_name, 
                p.parsed_address_street_number AS street_number, 
                p.parsed_address_city AS city, 
                p.parsed_address_postal_code AS postal_code, 
                p.parsed_address_country AS country, 
                COLLECT(p) AS parties
            WHERE name IS NOT NULL AND street_name IS NOT NULL AND street_number IS NOT NULL 
                AND city IS NOT NULL AND postal_code IS NOT NULL AND country IS NOT NULL
            RETURN name, street_name, street_number, city, postal_code, country, parties
            """

            # Step 2: Aggregate nodes and update relationships
            aggregate_query = """
            UNWIND $groups AS group
            WITH group.name AS name, 
                group.street_name AS street_name, 
                group.street_number AS street_number, 
                group.city AS city, 
                group.postal_code AS postal_code, 
                group.country AS country, 
                group.parties AS parties
            MERGE (new_party:AggregatedParty {
                parsed_name: name, 
                parsed_address_street_name: street_name, 
                parsed_address_street_number: street_number, 
                parsed_address_city: city, 
                parsed_address_postal_code: postal_code, 
                parsed_address_country: country
            })
            WITH new_party, parties
            UNWIND parties AS old_party
            MATCH (old_party)-[r]->(target)
            MERGE (new_party)-[new_r:RELATIONSHIP_TYPE]->(target)
            SET new_r = r
            WITH old_party, new_party
            MATCH (source)-[r]->(old_party)
            MERGE (source)-[new_r:RELATIONSHIP_TYPE]->(new_party)
            SET new_r = r
            DELETE r
            DETACH DELETE old_party
            """

            # Step 1: Get grouped nodes
            result = session.run(grouping_query)
            groups = []
            for record in result:
                groups.append({
                    "name": record["name"],
                    "street_name": record["street_name"],
                    "street_number": record["street_number"],
                    "city": record["city"],
                    "postal_code": record["postal_code"],
                    "country": record["country"],
                    "parties": [party.id for party in record["parties"]]
                })

            # Step 2: Aggregate nodes and update relationships
            session.run(aggregate_query, {"groups": groups})
            print("Aggregation of External_Party nodes completed successfully.")
        except Exception as e:
            print(f"Error aggregating External_Party nodes: {e}")



    def load_data(self, external_parties_train, account_booking_train):
        """Load all data into Neo4j."""
        with self.driver.session() as session:
            # Load nodes and relationships
            self.load_external_parties(session, external_parties_train)
            self.load_transactions_as_relationships(session, account_booking_train)

# Initialize the Neo4j loader
loader = Neo4jLoader(NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD)

try:
    print("Creating constraints in Neo4j...")
    loader.create_constraints()

    print("Loading data into Neo4j...")
    loader.load_data(external_parties_train, account_booking_train)
    print("Data loaded successfully!")
finally:
    loader.close()


Creating constraints in Neo4j...
Constraint 'unique_party_id' created successfully.
Loading data into Neo4j...


  "parties": [party.id for party in record["parties"]]


Error aggregating External_Party nodes: {code: Neo.ClientError.Statement.TypeError} {message: Expected to find a node at ref slot 8 but found Long(3) instead}
Data loaded successfully!


In [14]:
def aggregate_party_nodes(session):
    """Aggregate External_Party nodes based on their name and address into a new node."""
    try:
        # Step 1: Identify groups of External_Party nodes with the same name and address
        grouping_query = """
        MATCH (p:External_Party)
        WITH p.parsed_name AS name, 
             p.parsed_address_street_name AS street_name, 
             p.parsed_address_street_number AS street_number, 
             p.parsed_address_city AS city, 
             p.parsed_address_postal_code AS postal_code, 
             p.parsed_address_country AS country, 
             COLLECT(p) AS parties
        WHERE name IS NOT NULL AND street_name IS NOT NULL AND street_number IS NOT NULL 
              AND city IS NOT NULL AND postal_code IS NOT NULL AND country IS NOT NULL
        RETURN name, street_name, street_number, city, postal_code, country, parties
        """

        # Step 2: Aggregate nodes and update relationships
        aggregate_query = """
        UNWIND $groups AS group
        WITH group.name AS name, 
             group.street_name AS street_name, 
             group.street_number AS street_number, 
             group.city AS city, 
             group.postal_code AS postal_code, 
             group.country AS country, 
             group.parties AS parties
        MERGE (new_party:AggregatedParty {
            parsed_name: name, 
            parsed_address_street_name: street_name, 
            parsed_address_street_number: street_number, 
            parsed_address_city: city, 
            parsed_address_postal_code: postal_code, 
            parsed_address_country: country
        })
        WITH new_party, parties
        UNWIND parties AS old_party
        MATCH (old_party)-[r]->(target)
        MERGE (new_party)-[new_r:RELATIONSHIP_TYPE]->(target)
        SET new_r = r
        WITH old_party, new_party
        MATCH (source)-[r]->(old_party)
        MERGE (source)-[new_r:RELATIONSHIP_TYPE]->(new_party)
        SET new_r = r
        DELETE r
        DETACH DELETE old_party
        """

        # Step 1: Get grouped nodes
        result = session.run(grouping_query)
        groups = []
        for record in result:
            groups.append({
                "name": record["name"],
                "street_name": record["street_name"],
                "street_number": record["street_number"],
                "city": record["city"],
                "postal_code": record["postal_code"],
                "country": record["country"],
                # Use element_id instead of id
                "parties": [party["element_id"] for party in record["parties"]]
            })

        # Step 2: Aggregate nodes and update relationships
        session.run(aggregate_query, {"groups": groups})
        print("Aggregation of External_Party nodes completed successfully.")
    except Exception as e:
        print(f"Error aggregating External_Party nodes: {e}")

print("Aggregating External_Party nodes...")
with loader.driver.session() as session:
    aggregate_party_nodes(session)
print("Aggregation completed successfully!")

Aggregating External_Party nodes...


  with loader.driver.session() as session:
  "parties": [party.id for party in record["parties"]]


BufferError: Existing exports of data: object cannot be re-sized

In [11]:

class Neo4jReset:

    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))
    
    def close(self):
        self.driver.close()
    
    def reset_database(self):
        with self.driver.session() as session:
            # Delete all nodes and relationships
            session.run("MATCH (n) DETACH DELETE n")
            
            # Drop all constraints
            constraints = session.run("SHOW CONSTRAINTS")
            for record in constraints:
                constraint_name = record["name"]
                session.run(f"DROP CONSTRAINT {constraint_name}")

            # Drop all indexes
            indexes = session.run("SHOW INDEXES")
            for record in indexes:
                index_name = record["name"]
                session.run(f"DROP INDEX {index_name}")

# Initialize the Neo4jReset class
resetter = Neo4jReset(NEO4J_URI, NEO4J_USERNAME, NEO4J_PASSWORD)

try:
    print("Resetting Neo4j database...")
    resetter.reset_database()
    print("Database reset successfully!")
finally:
    resetter.close()


Resetting Neo4j database...
Database reset successfully!
