In [1]:
!pip install neo4j

import json
import logging
import os
import sys

from neo4j import GraphDatabase
from neo4j.exceptions import DriverError, SessionError


class CoraNeo4jConnector:
    """Thin wrapper around a Neo4j driver for loading the Cora dataset.

    The connector opens a driver on construction, verifies that the server
    is reachable, and offers helpers to wipe the database and to bulk-load
    the Cora paper nodes and citation edges from remote CSV files.

    Every public operation except ``close`` re-verifies connectivity first
    and raises ``AssertionError`` when the server cannot be reached.
    """

    def __init__(self, uri: str, user: str, password: str, logfile: str) -> None:
        """Configure logging, open the driver and verify the connection.

        Args:
            uri: Connection URI handed to ``GraphDatabase.driver``.
            user: User name for authentication.
            password: Password for authentication.
            logfile: Path of an *existing* file that receives the log output.

        Raises:
            FileNotFoundError: If ``logfile`` does not exist.
            AssertionError: If the connection cannot be verified.
        """
        # Define the attribute up front so the connectivity helper can rely
        # on it existing even if driver creation is never reached.
        self.driver = None
        if not os.path.exists(logfile):
            raise FileNotFoundError("Logger file does not exist!")

        logging.basicConfig(filename=logfile, encoding='utf-8', level=logging.INFO)
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self.__ensure_connected()
        logging.info("Driver has been initialized and connected successfully!")

        # Fetch the server info once instead of once per logged field.
        server_info = self.driver.get_server_info()
        logging.info(
            "\nServer Info:\nAddress: %s\nProtocol Version: %s\nAgent: %s\n",
            server_info.address,
            server_info.protocol_version,
            server_info.agent,
        )

    def __verify_driver_and_connection(self) -> bool:
        """Return True when a driver exists and its connectivity check passes.

        On a ``DriverError`` the error is logged, the driver is closed and
        False is returned.
        """
        if self.driver is None:
            return False
        try:
            self.driver.verify_connectivity()
            return True
        except DriverError as err:
            logging.error("Connection cannot be verified!, %s", err)
            self.driver.close()
            return False

    def __ensure_connected(self) -> None:
        """Raise ``AssertionError`` unless the connection can be verified.

        An explicit raise is used instead of an ``assert`` statement because
        asserts are stripped under ``python -O``, which would silently skip
        the connectivity check. The exception type is kept as
        ``AssertionError`` so existing callers that catch it keep working.
        """
        if not self.__verify_driver_and_connection():
            raise AssertionError("Neo4j connection cannot be verified!")

    def close(self) -> None:
        """Close the driver if one was created.

        Closing does not require a reachable server: releasing local
        resources must remain possible after the connection is lost.
        """
        if self.driver is not None:
            self.driver.close()

    def clear_entire_db(self) -> None:
        """Delete every node and relationship in the database.

        A ``SessionError`` is logged and swallowed (best effort).

        Raises:
            AssertionError: If the connection cannot be verified.
        """
        self.__ensure_connected()
        try:
            with self.driver.session() as session:
                session.execute_write(CoraNeo4jConnector.__clear_entire_db, "MATCH (n) DETACH DELETE n")
            logging.info("Database is cleared successfully!")
        except SessionError as err:
            logging.error("cannot clear entire db %s", err)

    @staticmethod
    def __clear_entire_db(tx, cypher):
        """Run ``cypher`` in transaction ``tx`` and discard its result."""
        # all nodes and relationships
        result = tx.run(cypher)
        result.consume()

    def load_cora_nodes(self) -> None:
        """Create the uniqueness constraint on ``Paper.id`` and load the nodes.

        A ``SessionError`` is logged and swallowed. If the returned record
        cannot be indexed, the error is logged and the process exits with
        status -1.

        Raises:
            AssertionError: If the connection cannot be verified.
        """
        self.__ensure_connected()
        try:
            with self.driver.session() as session:
                constraint_result = session.run(
                    "CREATE CONSTRAINT paper_constraint IF NOT EXISTS FOR (p:Paper) REQUIRE p.id IS UNIQUE")
                # A schema command yields no rows; consume() finishes the
                # auto-commit statement before the write transaction starts.
                constraint_result.consume()
                logging.info("Constraint is created on Paper for unique id!")
                record = session.execute_write(CoraNeo4jConnector.__load_cora_nodes_and_return_node_count)
                # Indexing validates that the query returned its count column.
                _ = record[0]
                logging.info("Nodes are loaded to database successfully!")
        except IndexError:
            logging.error("Record does not have given index, load_cora_nodes!")
            sys.exit(-1)
        except SessionError as err:
            logging.error("cannot load nodes to db, %s", err)

    @staticmethod
    def __load_cora_nodes_and_return_node_count(tx):
        """Load the paper nodes from the remote CSV and return the count record.

        Returns:
            The single record produced by ``RETURN COUNT(*)``.
        """
        # NOTE(review): `REMOVE p.features` looks like a no-op because this
        # query never sets `features` -- confirm before removing it.
        cypher = """
LOAD CSV WITH HEADERS FROM 'https://cora-dataset.s3.eu-central-1.amazonaws.com/dataset/nodes.csv' AS row
WITH row
MERGE (p:Paper {id: row.paper_id, subject: row.subject, words: row.words})
REMOVE p.features
RETURN COUNT(*)
"""
        # idempotent operation
        result = tx.run(cypher)
        return result.single()

    def load_cora_edges(self) -> None:
        """Load the ``CITED`` relationships between existing ``Paper`` nodes.

        A ``SessionError`` is logged and swallowed. If the returned record
        cannot be indexed, the error is logged and the process exits with
        status -1.

        Raises:
            AssertionError: If the connection cannot be verified.
        """
        # Same up-front connectivity check as the other public operations.
        self.__ensure_connected()
        try:
            with self.driver.session() as session:
                record = session.execute_write(CoraNeo4jConnector.__load_cora_edges_and_return_node_count)
                # Indexing validates that the query returned its count column.
                _ = record[0]
                logging.info("Edges are loaded successfully")
        except IndexError:
            logging.error("Record does not have given index, load_cora_edges!")
            sys.exit(-1)
        except SessionError as err:
            logging.error("cannot load given edges to db, %s", err)

    @staticmethod
    def __load_cora_edges_and_return_node_count(tx):
        """Load the citation edges from the remote CSV and return the count record.

        Returns:
            The single record produced by ``RETURN COUNT(*)``.
        """
        cypher = """
LOAD CSV WITH HEADERS FROM 'https://cora-dataset.s3.eu-central-1.amazonaws.com/dataset/edges.csv' AS row
WITH row
MATCH (to:Paper {id: row.cited_paper_id})
MATCH (from:Paper {id: row.citing_paper_id})
MERGE (from)-[:CITED]->(to)
RETURN COUNT(*)
"""
        # idempotent operation
        result = tx.run(cypher)
        return result.single()


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip available: [0m[31;49m22.2.2[0m[39;49m -> [0m[32;49m22.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [None]:
# Read the connection settings; the file is closed before any database work.
with open("auth.json", "r", encoding="utf-8") as auth_file:
    auth = json.load(auth_file)

db = CoraNeo4jConnector(auth["URI"],
                        auth["USER"],
                        auth["PASSWORD"],
                        auth["LOGFILE"])
try:
    # Rebuild the graph from scratch: wipe, then load nodes before edges,
    # because the edge load matches on already-existing Paper nodes.
    db.clear_entire_db()
    db.load_cora_nodes()
    db.load_cora_edges()
finally:
    # Release the driver even when one of the load steps raises.
    db.close()