In [4]:
# Install the Neo4j Python driver and python-dotenv into the kernel's environment.
# NOTE(review): versions are unpinned — consider pinning them for reproducible runs.
%pip install neo4j python-dotenv

Collecting python-dotenv
  Using cached python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Using cached python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1
Note: you may need to restart the kernel to use updated packages.


In [8]:
from dotenv import load_dotenv
from neo4j import GraphDatabase
import os
import json
import time
from threading import Thread

# Read settings from a .env file, if one is found, so that the os.getenv()
# lookups in the next cell can see them.
load_dotenv()

True

In [9]:
# Connection settings come from the environment; any variable that is not
# set is left as None.
user = os.environ.get('NEO4J_USER')          # account used to authenticate
password = os.environ.get('NEO4J_PASSWORD')  # password for that account
uri = os.environ.get('NEO4J_URI')            # address handed to GraphDatabase.driver
database = os.environ.get('NEO4J_DB')        # database the CDC service reads from

In [10]:
class CDCService:
    """Poll Neo4j Change Data Capture (CDC) and pass each change to ``apply_change``.

    The service keeps a cursor (a CDC change ID). Every poll calls
    ``db.cdc.query`` with that cursor, applies the returned records in order
    and moves the cursor to the ID of each record that was applied without
    error.

    ``run()`` polls in a loop (typically on a worker thread); ``stop()`` can
    be called from another thread to make that loop finish.
    """

    # Class-level default so the flag exists even on instances that were
    # created without running ``__init__``.
    _stop_requested = False

    def __init__(self, driver, database, start_cursor=None, selectors=None):
        """Create the service.

        Args:
            driver: Neo4j driver used for every query.
            database: Name of the database the queries are run against.
            start_cursor: Change ID to start from. When ``None``, the ID
                returned by ``db.cdc.current`` is used instead.
            selectors: Value sent as the ``$selectors`` parameter of
                ``db.cdc.query``.
        """
        self.driver = driver
        self.database = database
        self.cursor = start_cursor
        if self.cursor is None:
            self.cursor = self.current_change_id()
        self.selectors = selectors

    def apply_change(self, record):
        """Handle one change record; this implementation pretty-prints it.

        Only the ``id``, ``txId``, ``seq``, ``event`` and ``metadata`` fields
        are printed. Values that ``json`` cannot serialise are rendered with
        ``repr``.

        Args:
            record: A record returned by ``db.cdc.query``.
        """
        record_dict = {
            k: record.get(k) for k in ('id', 'txId', 'seq', 'event', 'metadata')
        }
        print(json.dumps(record_dict, indent=2, default=repr))

    def query_changes_query(self, tx):
        """Transaction function: fetch and apply the changes after the cursor.

        If the query returns nothing, the cursor is moved to the current
        change ID. Otherwise records are applied one by one and the cursor
        follows the last record applied successfully. When ``apply_change``
        raises, the error is printed and processing stops, so the failing
        record is requested again by the next poll.

        Args:
            tx: The transaction supplied by ``session.execute_read``.
        """
        # The current ID is read before the query is issued; presumably so
        # that an empty result never moves the cursor past a change committed
        # while the query was running.
        current = self.current_change_id()
        result = tx.run('CALL db.cdc.query($cursor, $selectors)',
                        cursor=self.cursor, selectors=self.selectors)
        # Identity check: "no record" is exactly ``None``; ``==`` would defer
        # to whatever equality the record type implements.
        if result.peek() is None:
            self.cursor = current
            return

        for record in result:
            try:
                self.apply_change(record)
            except Exception as e:
                print('Error whilst applying change', e)
                break
            self.cursor = record['id']

    def query_changes(self):
        """Run one poll inside a managed read transaction."""
        with self.driver.session(database=self.database) as session:
            session.execute_read(self.query_changes_query)

    def _call_for_id(self, query):
        """Run ``query`` and return the ``id`` field of its first record."""
        records, _, _ = self.driver.execute_query(
            query,
            database_=self.database
        )
        return records[0]['id']

    def earliest_change_id(self):
        """Return the change ID reported by ``db.cdc.earliest``."""
        return self._call_for_id('CALL db.cdc.earliest')

    def current_change_id(self):
        """Return the change ID reported by ``db.cdc.current``."""
        return self._call_for_id('CALL db.cdc.current')

    def stop(self):
        """Ask ``run()`` to return once its current iteration has finished.

        The request is not reset by ``run()``, so a stopped service stays
        stopped.
        """
        self._stop_requested = True

    def run(self, poll_interval=1):
        """Poll for changes until ``stop()`` is called.

        Args:
            poll_interval: Seconds to sleep between polls (default 1).
        """
        while not self._stop_requested:
            self.query_changes()
            time.sleep(poll_interval)

In [11]:
# No explicit start cursor (CDCService then asks the database for its current
# change ID) and an empty selector list.
cursor = None
selectors = []

with GraphDatabase.driver(uri, auth=(user, password)) as driver:
    # Check the connection before starting the poller.
    driver.verify_connectivity()
    cdc = CDCService(driver, database, start_cursor=cursor, selectors=selectors)

    # Poll on a daemon thread and block this cell on it; the loop never ends
    # by itself, so the cell runs until the kernel is interrupted.
    cdc_thread = Thread(daemon=True, target=cdc.run)
    cdc_thread.start()
    cdc_thread.join()

{
  "id": "CfHUvr5AFUBpjWZ-qvFojpwAAAAAAAAACwAAAAAAAAAAAAABkPmqv-o=",
  "txId": 11,
  "seq": 0,
  "event": {
    "elementId": "4:f1d4bebe-4015-4069-8d66-7eaaf1688e9c:2",
    "operation": "c",
    "keys": {},
    "labels": [
      "Hero"
    ],
    "state": {
      "after": {
        "labels": [
          "Hero"
        ],
        "properties": {
          "name": "Superman"
        }
      },
      "before": null
    },
    "eventType": "n"
  },
  "metadata": {
    "txStartTime": "neo4j.time.DateTime(2024, 7, 28, 14, 7, 15, 391000000, tzinfo=<UTC>)",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "connectionServer": "10.1.0.2:7687",
    "connectionType": "bolt",
    "serverId": "be426bb5",
    "captureMode": "DIFF",
    "connectionClient": "202.80.215.113:60666",
    "txCommitTime": "neo4j.time.DateTime(2024, 7, 28, 14, 7, 15, 434000000, tzinfo=<UTC>)",
    "txMetadata": {
      "app": "neo4j-browser_v5.21.0",
      "type": "user-direct

KeyboardInterrupt: 

  with self.driver.session(database=self.database) as session:
  records, _,_ = self.driver.execute_query(
