In [1]:
import warnings
from os import getenv

from dotenv import load_dotenv
from neo4j import GraphDatabase
from pydrive2.auth import GoogleAuth
from pydrive2.drive import GoogleDrive, GoogleDriveFile

In [2]:
# Populate os.environ from a .env file (variables already set in the
# environment are left untouched) so the getenv() lookups below can see them.
load_dotenv(override=False)

True

In [12]:
# Neo4j connection settings, read from the environment (optionally populated
# from a .env file by load_dotenv).
NEO4J_URI = getenv("NEO4J_URI")
_neo4j_user = getenv("NEO4J_USER", "neo4j")
_neo4j_password = getenv("NEO4J_PASSWORD")
NEO4J_AUTH = (_neo4j_user, _neo4j_password)
NEO4J_DATABASE = getenv("NEO4J_DATABASE", "neo4j")

# Title of the Google Drive folder that receives the uploaded CSV files.
DATA_LAKE_FOLDER = getenv("DATA_LAKE_FOLDER", "HR Analytics Data Lake")

# MIME types used when querying for and creating Drive files.
MIME_TYPES = dict(
    csv="text/csv",
    folder="application/vnd.google-apps.folder",
)

In [7]:
# OAuth client secrets for the Drive API. Defaults to the repo-relative path
# used so far; set GOOGLE_CLIENT_SECRETS_FILE to point somewhere else instead
# of editing the notebook.
CLIENT_SECRETS_FILE = getenv("GOOGLE_CLIENT_SECRETS_FILE", "../secrets/client_secrets.json")

gauth_settings = {"client_config_file": CLIENT_SECRETS_FILE}

gauth = GoogleAuth(settings=gauth_settings)

# Interactive OAuth consent: opens a browser and receives the redirect on a
# local web server (see the localhost redirect_uri in the cell output).
gauth.LocalWebserverAuth()

drive = GoogleDrive(gauth)

Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?client_id=604337038875-rg09a40fl0dmtedfrsi9dac70l4epmt6.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8080%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive&access_type=online&response_type=code

Authentication successful.


In [8]:
# Locate the data lake folder by title. Backslashes and single quotes are
# escaped so the title cannot break out of the quoted literal in the query.
_folder_title_literal = DATA_LAKE_FOLDER.replace("\\", "\\\\").replace("'", "\\'")
_folder_matches = drive.ListFile(
    {
        "q": f"title='{_folder_title_literal}' and mimeType='{MIME_TYPES['folder']}' and trashed=false",
        "spaces": "drive",
    }
).GetList()

# Fail with an actionable message instead of a bare IndexError.
if not _folder_matches:
    raise LookupError(
        f"Google Drive folder {DATA_LAKE_FOLDER!r} was not found (or is trashed); "
        "create it or set DATA_LAKE_FOLDER."
    )
# Titles are not unique in Drive; make an ambiguous pick visible.
if len(_folder_matches) > 1:
    warnings.warn(
        f"{len(_folder_matches)} Drive folders are titled {DATA_LAKE_FOLDER!r}; "
        f"using the first one ({_folder_matches[0]['id']})."
    )

folder = _folder_matches[0]

folder_id = folder["id"]

folder_id

'1F7ve6K5mk6T_FQv3BqQk6pEiV3ebu7tE'

In [9]:
def overwrite_csv(name) -> GoogleDriveFile:
    file_list = drive.ListFile({"q": f"title='{name}.csv' and trashed=false"}).GetList()

    for file in file_list:
        file.Delete()

    file = drive.CreateFile(
        {
            "title": f"{name}.csv",
            "parents": [{"kind": "drive#fileLink", "id": folder_id}],
            "mime_type": MIME_TYPES["csv"],
        }
    )
    file.SetContentFile(f"{name}.csv")
    file.Upload()

    return file

In [11]:
# Upload each CSV the graph import reads and keep its Drive download link;
# the links become the $file_N parameters of the LOAD CSV queries.
_csv_upload_paths = {
    "departments": "../seed/departments",
    "staffs": "../seed/staffs",
    "sessions": "../dist/sessions",
    "skills": "../dist/skills",
    "fields": "../dist/fields",
    "fact_training": "../dist/fact_training",
}
_uploaded_links = {
    key: overwrite_csv(path)["webContentLink"]
    for key, path in _csv_upload_paths.items()
}

departments_link = _uploaded_links["departments"]
staffs_link = _uploaded_links["staffs"]
sessions_link = _uploaded_links["sessions"]
skills_link = _uploaded_links["skills"]
fields_link = _uploaded_links["fields"]
fact_training_link = _uploaded_links["fact_training"]

In [None]:
# Wipe the graph before a fresh import. Both statements are batched with
# CALL { ... } IN TRANSACTIONS, like the load queries in this notebook, so a
# large graph is not deleted in a single huge transaction.
# The directed pattern ()-[r]->() visits each relationship exactly once; the
# undirected ()-[r]-() would match every relationship twice.
delete_relationship_query = """
MATCH ()-[r]->()
CALL {
  WITH r
  DELETE r
} IN TRANSACTIONS OF 10000 ROWS;
"""

# DETACH DELETE also removes any relationship still attached, so this statement
# succeeds even if it is run without the relationship wipe first.
delete_node_query = """
MATCH (n)
CALL {
  WITH n
  DETACH DELETE n
} IN TRANSACTIONS OF 10000 ROWS;
"""

# CONSTRAINT creation
-------------------

Create node uniqueness constraints so that no two nodes with the same label share an `id` value, both in the data loaded now and in any data written later.

NOTE: The constraint syntax below was generated for the version of the connected database (5.15-aura).

In [None]:
# Uniqueness constraint on Staff.id, the key the Staff node load MERGEs on.
# IF NOT EXISTS makes re-running this statement a no-op once the constraint exists.
create_staff_constraint_query = """
CREATE CONSTRAINT `imp_uniq_Staff_id` IF NOT EXISTS
FOR (n: `Staff`)
REQUIRE (n.`id`) IS UNIQUE;
"""

In [None]:
# Uniqueness constraint on Session.id, the key the Session node load MERGEs on.
# IF NOT EXISTS makes re-running this statement a no-op once the constraint exists.
create_session_constraint_query = """
CREATE CONSTRAINT `imp_uniq_Session_id` IF NOT EXISTS
FOR (n: `Session`)
REQUIRE (n.`id`) IS UNIQUE;
"""

In [None]:
# Uniqueness constraint on Skill.id, the key the Skill node load MERGEs on.
# IF NOT EXISTS makes re-running this statement a no-op once the constraint exists.
create_skill_constraint_query = """
CREATE CONSTRAINT `imp_uniq_Skill_id` IF NOT EXISTS
FOR (n: `Skill`)
REQUIRE (n.`id`) IS UNIQUE;
"""

In [None]:
# Uniqueness constraint on Field.id, the key the Field node load MERGEs on.
# IF NOT EXISTS makes re-running this statement a no-op once the constraint exists.
create_field_constraint_query = """
CREATE CONSTRAINT `imp_uniq_Field_id` IF NOT EXISTS
FOR (n: `Field`)
REQUIRE (n.`id`) IS UNIQUE;
"""

In [None]:
# Uniqueness constraint on Department.id, the key the Department node load MERGEs on.
# IF NOT EXISTS makes re-running this statement a no-op once the constraint exists.
create_department_constraint_query = """
CREATE CONSTRAINT `imp_uniq_Department_id` IF NOT EXISTS
FOR (n: `Department`)
REQUIRE (n.`id`) IS UNIQUE;
"""

# NODE load
---------

Load nodes in batches, one node label at a time. Nodes will be created using a MERGE statement to ensure a node with the same label and ID property remains unique. Pre-existing nodes found by a MERGE statement will have their other properties set to the latest values encountered in a load file.

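NOTE: Rows whose `id` appears in the `idsToSkip` list parameter are not loaded. The comparison is made against the raw CSV value, which is a string, so list the IDs as strings (e.g. `["42"]`).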

In [None]:
# Upsert Staff nodes from the staffs CSV ($file_1), keyed on the integer `id`.
# Rows whose raw `id` is in $idsToSkip, or whose trimmed `id` does not parse as
# an integer, are filtered out. Writes are committed in batches of 10000 rows.
# Properties: name (string), years_of_service (integer).
# NOTE(review): $idsToSkip is compared against the raw CSV string (untrimmed),
# so entries must be strings such as "42", not integers — confirm intent.
# The SET of `id` right after MERGE on the same value is redundant but harmless.
load_staff_nodes_query = """
LOAD CSV WITH HEADERS FROM ($file_1) AS row
WITH row
WHERE NOT row.`id` IN $idsToSkip AND NOT toInteger(trim(row.`id`)) IS NULL
CALL {
  WITH row
  MERGE (n: `Staff` { `id`: toInteger(trim(row.`id`)) })
  SET n.`id` = toInteger(trim(row.`id`))
  SET n.`name` = row.`name`
  SET n.`years_of_service` = toInteger(trim(row.`years_of_service`))
} IN TRANSACTIONS OF 10000 ROWS;
"""

In [None]:
# Upsert Skill nodes from the skills CSV ($file_3), keyed on the integer `id`.
# Rows whose raw `id` is in $idsToSkip, or whose trimmed `id` does not parse as
# an integer, are filtered out. Writes are committed in batches of 10000 rows.
# NOTE(review): $idsToSkip entries must be strings to match the raw CSV `id`.
load_skill_nodes_query = """
LOAD CSV WITH HEADERS FROM ($file_3) AS row
WITH row
WHERE NOT row.`id` IN $idsToSkip AND NOT toInteger(trim(row.`id`)) IS NULL
CALL {
  WITH row
  MERGE (n: `Skill` { `id`: toInteger(trim(row.`id`)) })
  SET n.`id` = toInteger(trim(row.`id`))
  SET n.`name` = row.`name`
} IN TRANSACTIONS OF 10000 ROWS;
"""

In [None]:
# Upsert Field nodes from the fields CSV ($file_4), keyed on the integer `id`.
# Rows whose raw `id` is in $idsToSkip, or whose trimmed `id` does not parse as
# an integer, are filtered out. Writes are committed in batches of 10000 rows.
# NOTE(review): $idsToSkip entries must be strings to match the raw CSV `id`.
load_field_nodes_query = """
LOAD CSV WITH HEADERS FROM ($file_4) AS row
WITH row
WHERE NOT row.`id` IN $idsToSkip AND NOT toInteger(trim(row.`id`)) IS NULL
CALL {
  WITH row
  MERGE (n: `Field` { `id`: toInteger(trim(row.`id`)) })
  SET n.`id` = toInteger(trim(row.`id`))
  SET n.`name` = row.`name`
} IN TRANSACTIONS OF 10000 ROWS;
"""

In [None]:
# Upsert Department nodes from the departments CSV ($file_0), keyed on the
# integer `id`. Rows whose raw `id` is in $idsToSkip, or whose trimmed `id` does
# not parse as an integer, are filtered out; batches of 10000 rows.
# Column renames: number_of_courses -> courses, number_of_students -> students
# (both converted to integers).
# NOTE(review): $idsToSkip entries must be strings to match the raw CSV `id`.
load_department_nodes_query = """
LOAD CSV WITH HEADERS FROM ($file_0) AS row
WITH row
WHERE NOT row.`id` IN $idsToSkip AND NOT toInteger(trim(row.`id`)) IS NULL
CALL {
  WITH row
  MERGE (n: `Department` { `id`: toInteger(trim(row.`id`)) })
  SET n.`id` = toInteger(trim(row.`id`))
  SET n.`name` = row.`name`
  SET n.`courses` = toInteger(trim(row.`number_of_courses`))
  SET n.`students` = toInteger(trim(row.`number_of_students`))
} IN TRANSACTIONS OF 10000 ROWS;
"""


The Session load below uses the `datetime()` function. The import tool that generated these queries converts dates to ISO 8601 before passing them to Cypher, but a plain Cypher `LOAD CSV` script cannot do that conversion.
Make sure the date columns in the CSV file are already in ISO 8601 format so this load produces the same result.

In [None]:
# Upsert Session nodes from the sessions CSV ($file_2), keyed on the integer `id`.
# Rows whose raw `id` is in $idsToSkip, or whose trimmed `id` does not parse as
# an integer, are filtered out. Writes are committed in batches of 10000 rows.
# `year` is parsed with datetime(), so the CSV column must already be in a form
# Cypher's datetime() accepts (ISO 8601).
# NOTE(review): `duration_days` is stored as the raw CSV string, unlike
# `duration_hours` which goes through toInteger — confirm whether it should be
# numeric (toInteger/toFloat).
# NOTE(review): if the sessions CSV holds several rows per session id (e.g. one
# per trained skill), each MERGE re-SETs the properties and the last row wins.
load_session_nodes_query = """
LOAD CSV WITH HEADERS FROM ($file_2) AS row
WITH row
WHERE NOT row.`id` IN $idsToSkip AND NOT toInteger(trim(row.`id`)) IS NULL
CALL {
  WITH row
  MERGE (n: `Session` { `id`: toInteger(trim(row.`id`)) })
  SET n.`id` = toInteger(trim(row.`id`))
  SET n.`type` = row.`type`
  SET n.`name` = row.`name`
  SET n.`year` = datetime(row.`year`)
  SET n.`medium` = row.`medium`
  SET n.`duration_days` = row.`duration_days`
  SET n.`duration_hours` = toInteger(trim(row.`duration_hours`))
} IN TRANSACTIONS OF 10000 ROWS;
"""

# RELATIONSHIP load
-----------------

Load relationships in batches, one relationship type at a time. Relationships are created using a MERGE statement, meaning only one relationship of a given type will ever be created between a pair of nodes.

In [None]:
# Link each Staff to its Department, reading the staffs CSV ($file_1): `id` is
# the staff id and `department_id` the department id. A row whose endpoints are
# not both found by MATCH yields no relationship. The relationship property
# `years_in_department` comes from the `years_in_current_department` column.
# Unlike the node loads, this query does not apply the $idsToSkip filter.
load_belongs_to_relationships_query = """
LOAD CSV WITH HEADERS FROM ($file_1) AS row
WITH row 
CALL {
  WITH row
  MATCH (source: `Staff` { `id`: toInteger(trim(row.`id`)) })
  MATCH (target: `Department` { `id`: toInteger(trim(row.`department_id`)) })
  MERGE (source)-[r: `BELONGS_TO`]->(target)
  SET r.`years_in_department` = toInteger(trim(row.`years_in_current_department`))
} IN TRANSACTIONS OF 10000 ROWS;
"""

In [None]:
# Link each Session to the Skill it trains, reading the sessions CSV ($file_2):
# `id` is the session id and `skill_id` the skill id. A row whose endpoints are
# not both found by MATCH yields no relationship; MERGE keeps at most one
# TRAINS relationship per (session, skill) pair.
load_trains_relationships_query = """
LOAD CSV WITH HEADERS FROM ($file_2) AS row
WITH row
CALL {
  WITH row
  MATCH (source: `Session` { `id`: toInteger(trim(row.`id`)) })
  MATCH (target: `Skill` { `id`: toInteger(trim(row.`skill_id`)) })
  MERGE (source)-[r: `TRAINS`]->(target)
} IN TRANSACTIONS OF 10000 ROWS;
"""

In [None]:
# Link each Skill to its Field, reading the skills CSV ($file_3): `id` is the
# skill id and `field_id` the field id. A row whose endpoints are not both
# found by MATCH yields no relationship.
load_skill_of_relationships_query = """
LOAD CSV WITH HEADERS FROM ($file_3) AS row
WITH row
CALL {
  WITH row
  MATCH (source: `Skill` { `id`: toInteger(trim(row.`id`)) })
  MATCH (target: `Field` { `id`: toInteger(trim(row.`field_id`)) })
  MERGE (source)-[r: `SKILL_OF`]->(target)
} IN TRANSACTIONS OF 10000 ROWS;
"""

In [None]:
# Link Staff to the Sessions they attended, reading the fact_training CSV
# ($file_5) columns `staff_id` and `session_id`; `attended_hours` is stored on
# the ATTENDS relationship. A row whose endpoints are not both found by MATCH
# yields no relationship.
# NOTE(review): MERGE keeps one ATTENDS per (staff, session) pair, so if the
# fact table has several rows for the same pair, `attended_hours` ends up as
# the last row's value rather than a total — confirm this is intended.
load_attended_relationships_query = """
LOAD CSV WITH HEADERS FROM ($file_5) AS row
WITH row 
CALL {
  WITH row
  MATCH (source: `Staff` { `id`: toInteger(trim(row.`staff_id`)) })
  MATCH (target: `Session` { `id`: toInteger(trim(row.`session_id`)) })
  MERGE (source)-[r: `ATTENDS`]->(target)
  SET r.`attended_hours` = toInteger(trim(row.`attended_hours`))
} IN TRANSACTIONS OF 10000 ROWS;
"""

In [None]:
# Link each Department to its head, reading the departments CSV ($file_0):
# `id` is the department id and `head_staff_id` the Staff id of the head.
# A row whose endpoints are not both found by MATCH (presumably including a
# department with no head_staff_id) yields no relationship.
load_has_head_of_relationship_query = """
LOAD CSV WITH HEADERS FROM ($file_0) AS row
WITH row 
CALL {
  WITH row
  MATCH (source: `Department` { `id`: toInteger(trim(row.`id`)) })
  MATCH (target: `Staff` { `id`: toInteger(trim(row.`head_staff_id`)) })
  MERGE (source)-[r: `HAS_HEAD_OF`]->(target)
} IN TRANSACTIONS OF 10000 ROWS;
"""

# Relationship creation
---------------------
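Create direct relationships derived from the loaded ones. Staff are already connected to skills and fields only indirectly (Staff → Session → Skill → Field). To simplify visualization and querying, add direct `HAS_SKILL` (Staff → Skill) and `IN_FIELD` (Staff → Field) relationships, each carrying a `count` property.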

In [None]:
# Derive a direct Staff-[:HAS_SKILL]->Skill relationship from the
# Staff-[:ATTENDS]->Session-[:TRAINS]->Skill paths. The `count` property is the
# number of such paths (attended sessions training that skill) for the pair.
# MERGE + SET (instead of CREATE) makes the statement idempotent: re-running it
# updates the existing relationship rather than adding a duplicate, which would
# also inflate the IN_FIELD sums derived from HAS_SKILL.count.
link_staffs_skills_relationship_query = """
MATCH (s:Staff)-[:ATTENDS]->(:Session)-[:TRAINS]->(sk:Skill)
WITH s, sk, count(*) AS sessionCount
MERGE (s)-[r:HAS_SKILL]->(sk)
SET r.count = sessionCount
"""

In [None]:
# Derive a direct Staff-[:IN_FIELD]->Field relationship by summing the
# HAS_SKILL.count values over the staff member's skills in that field.
# MERGE + SET (instead of CREATE) makes the statement idempotent: re-running it
# updates the existing relationship rather than adding a duplicate.
link_staffs_fields_relationship_query = """
MATCH (s:Staff)-[hs:HAS_SKILL]->(:Skill)-[:SKILL_OF]->(f:Field)
WITH s, f, sum(hs.count) AS fieldCount
MERGE (s)-[r:IN_FIELD]->(f)
SET r.count = fieldCount
"""


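Map the `$file_N` query parameters to the Google Drive download links of the uploaded CSV files, and set `idsToSkip` (node IDs to exclude from the node loads).
For loading from files on the database server instead, see: https://neo4j.com/docs/operations-manual/current/configuration/file-locations/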

In [None]:
# Parameters shared by every statement in the import run: $file_0..$file_5 are
# the Drive download links read by the LOAD CSV queries, and idsToSkip lists
# node ids the node loads should leave out (empty: load everything).
import_parameters = dict(
    file_0=departments_link,
    file_1=staffs_link,
    file_2=sessions_link,
    file_3=skills_link,
    file_4=fields_link,
    file_5=fact_training_link,
    idsToSkip=[],
)

In [None]:
# Ordered import pipeline. Order matters: relationship loads MATCH nodes made by
# the node loads, and the derived links read the loaded relationships.
IMPORT_STEPS = [
    ("delete relationships", delete_relationship_query),
    ("delete nodes", delete_node_query),
    ("constraint Department", create_department_constraint_query),
    ("constraint Staff", create_staff_constraint_query),
    ("constraint Session", create_session_constraint_query),
    ("constraint Skill", create_skill_constraint_query),
    ("constraint Field", create_field_constraint_query),
    ("nodes Department", load_department_nodes_query),
    ("nodes Staff", load_staff_nodes_query),
    ("nodes Field", load_field_nodes_query),
    ("nodes Skill", load_skill_nodes_query),
    ("nodes Session", load_session_nodes_query),
    ("rels BELONGS_TO", load_belongs_to_relationships_query),
    ("rels HAS_HEAD_OF", load_has_head_of_relationship_query),
    ("rels TRAINS", load_trains_relationships_query),
    ("rels SKILL_OF", load_skill_of_relationships_query),
    ("rels ATTENDS", load_attended_relationships_query),
    ("link HAS_SKILL", link_staffs_skills_relationship_query),
    ("link IN_FIELD", link_staffs_fields_relationship_query),
]

# Fail early with a clear message instead of an opaque driver error.
if not NEO4J_URI or NEO4J_AUTH[1] is None:
    raise RuntimeError("NEO4J_URI and NEO4J_PASSWORD must be set (e.g. in .env) before importing.")

# Per-step summary counters, kept for inspection after the run.
import_summaries = {}
with GraphDatabase.driver(NEO4J_URI, auth=NEO4J_AUTH) as driver:
    # Check URI/credentials before the destructive wipe starts.
    driver.verify_connectivity()
    with driver.session(database=NEO4J_DATABASE) as session:
        for step_name, query in IMPORT_STEPS:
            # consume() waits for the statement to finish, so any error is raised
            # at the step that caused it; it returns the result summary.
            summary = session.run(query, parameters=import_parameters).consume()
            import_summaries[step_name] = summary.counters