
## Task Ingest

This notebook reads source files from an Azure Data Lake Storage (ADLS) account and updates the TASK graph in Neo4j. It mimics work completed while serving in a previous position and replaces all PII and confidential data with dummy data. The notebook will not run unless it is connected to a real Databricks account, storage account, and Neo4j instance, and the corresponding code is updated to reflect them.

### Configuration

In [0]:
# Access credentials and endpoints used to talk to Azure services and Neo4j from Databricks.
# Every identifier in this cell is a dummy placeholder and must be replaced before running.

# Identity / authentication settings.
AUTHORITY = "https://login.microsoftonline.us/dummyaccount.onmicrosoft.com"
NEO4J_APPLICATION_ID = "0000000-0000-0000-0000-0000000"
SP_CLIENT_ID = "0000000-0000-0000-0000-0000000"
# The service-principal secret is read from a Databricks secret scope rather than hardcoded.
SP_CLIENT_SECRET = dbutils.secrets.get(
    scope="NeoSecretScope",
    key="ABC-DEVTEST-ABC-DEF",
)

# Storage location of the source tables.
STORAGE_ACCOUNT_URL = "https://mystorageaccount.dfs.core.windows.net/my-container"
CONTAINER = "published"

In [None]:
# Import necessary libraries 

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import datetime
import numpy as np  
import os
import random
import string
from delta import configure_spark_with_delta_pip
from azure.storage.blob import BlobServiceClient

### Create Intermediate Dataframes
In order to ingest into the graph, it is necessary to create intermediate dataframes from the source tables.

In this notebook, an intermediate table is created that renames ID to DEF ID and splits the Applicable Organization (new column renamed to DEF_Org_Name) and Task_Title_Choice (new column renamed to ID) columns in the completion table.

An intermediate table is created that connects abvr to org (on OPTION_Abvr_ID) and then to the new completion table (on DEF_Org_Name).

An intermediate table that connects the task to crosswalk_task to dftask_task table is created

An intermediate table is created that counts the number of times each DEF_Org_Name completed each ID.

In [0]:
# Set the Delta column-mapping mode to "name" for this Spark session.
# Per the original author's note, this helps with Delta features like generated columns or
# writer versioning in case columns are renamed or reordered.
# NOTE(review): the source tables read later use column names containing spaces
# (e.g. "Task Title Choice"), which is presumably why name-based mapping is wanted — confirm.
spark.conf.set("spark.databricks.delta.columnMapping.mode", "name")

## List Tables

In [0]:
# Build the root ABFS path that every table in this notebook is loaded from.
#
# An ABFS URI has the form abfss://<container>@<account-host>/<path>. STORAGE_ACCOUNT_URL is
# configured as an https:// URL, and interpolating it verbatim would yield
# "abfss://<container>@https://...", which is not a valid URI. Strip any URL scheme (and a
# trailing slash, so that later "/" + "<table>/" concatenations do not produce "//").
# A value that already has no scheme passes through unchanged.
storage_location = STORAGE_ACCOUNT_URL.split("://", 1)[-1].rstrip("/")
BASE_PATH = f"abfss://{CONTAINER}@{storage_location}"

# List the entries directly under BASE_PATH and show them as a dataframe.
tables_df = spark.createDataFrame(dbutils.fs.ls(BASE_PATH))

display(tables_df)

## Intermediate dataframes

In [0]:
# Completion intermediate table: split the multi-valued "Organizations" and "Task Title Choice"
# columns into one row per value, clean the values, and rename columns for the later joins.

ORGANIZATIONS_COL = "Organizations"
TASK_CHOICE_COL = "Task Title Choice"

completion_df = spark.read.format("delta").load(f"{BASE_PATH}/completion/")

# Turn each ";"-delimited string into an array.
completion_split_df = (
    completion_df
    .withColumn(ORGANIZATIONS_COL, split(col(ORGANIZATIONS_COL), ";"))
    .withColumn(TASK_CHOICE_COL, split(col(TASK_CHOICE_COL), ";"))
)

# One row per (organization, task choice) combination.
completion_explode_df = (
    completion_split_df
    .withColumn(ORGANIZATIONS_COL, explode(col(ORGANIZATIONS_COL)))
    .withColumn(TASK_CHOICE_COL, explode(col(TASK_CHOICE_COL)))
)

# Drop rows whose organization value contains "#" followed by three digits.
completion_clean_df = completion_explode_df.filter(
    ~col(ORGANIZATIONS_COL).rlike(r"#\d{3}")
)

# Remove a leading "#" from the remaining values of both columns.
completion_filter_df = (
    completion_clean_df
    .withColumn(ORGANIZATIONS_COL, regexp_replace(col(ORGANIZATIONS_COL), r"^#", ""))
    .withColumn(TASK_CHOICE_COL, regexp_replace(col(TASK_CHOICE_COL), r"^#", ""))
)

# "Organizations" becomes "DEF Org Name" to match the org table; the original "ID" becomes
# "Completion Num" so the name "ID" is free to hold the task id.
completion_new_df = (
    completion_filter_df
    .withColumnRenamed(ORGANIZATIONS_COL, "DEF Org Name")
    .withColumnRenamed("ID", "Completion Num")
)

# "Task Title Choice" becomes "ID" (the task id).
completion_newest_df = completion_new_df.withColumnRenamed(TASK_CHOICE_COL, "ID")


In [0]:
# Intermediate table linking abvr -> org (on "OPTION Abvr ID") -> completion (on "DEF Org Name").

# Load the existing tables; org rows without an ID_ORG are discarded.
abvr_df = spark.read.format("delta").load(f"{BASE_PATH}/abvr/")
org_df = (
    spark.read.format("delta")
    .load(f"{BASE_PATH}/org/")
    .filter(col('ID_ORG').isNotNull())
)

# Full outer joins keep unmatched rows from both sides at each step.
abv_org_df = org_df.join(abvr_df, on="OPTION Abvr ID", how="outer")
abvrorg_completion_df = completion_new_df.join(abv_org_df, on="DEF Org Name", how="outer")

# Replace spaces in every column name with underscores, one rename per original column name.
underscored_df = abvrorg_completion_df
for col_name in abvrorg_completion_df.columns:
    underscored_df = underscored_df.withColumnRenamed(col_name, col_name.replace(" ", "_"))
abvrorg_completion_df = underscored_df

abvrorg_update_df = abvrorg_completion_df

display(abvrorg_update_df)


In [0]:
# Intermediate table that joins the task crosswalk to the task-path table on the hierarchy id.

# Hierarchy ids must start with a dotted number such as "1", "1.2" or "1.2.3".
# Raw string: "\d" in a normal string literal is an invalid escape sequence.
HIER_ID_PATTERN = r'^(\d+(?:\.\d+)*)'

# Load the crosswalk and keep only rows with a well-formed hierarchy id.
crosswalk_task_df = spark.read.format("delta").load(BASE_PATH+"/"+"crosswalk_task_path")
crosswalk_task_df = crosswalk_task_df.filter(col('task_p_hier_id').rlike(HIER_ID_PATTERN))

crosswalk_df = crosswalk_task_df.drop("task_hier_nm", "task_hier_txt")

# Load the task table; normalise "Task ID" (trim, remove "-") and apply the same id filter.
task_df = spark.read.format("delta").load(BASE_PATH+"/"+"task_path/")
task_df = task_df.withColumn('Task ID', trim(col('Task ID')))
task_df = task_df.withColumn('Task ID', regexp_replace("Task ID", "-", ""))
task_df = task_df.filter(col('Task ID').rlike(HIER_ID_PATTERN))

# Rename "Task ID" to task_p_hier_id so both sides share the join key name.
task_renamed_df = task_df.withColumnRenamed("Task ID", "task_p_hier_id")

# Trim again: removing "-" above can leave new leading/trailing whitespace.
task_renamed_df = task_renamed_df.withColumn("task_p_hier_id", trim(col("task_p_hier_id")))

# The original code read task_crosswalk_df here before it was ever assigned, which raises
# NameError on a fresh kernel. It is now derived from crosswalk_df (the crosswalk with the
# task_hier_* columns dropped), which was otherwise unused.
# NOTE(review): confirm crosswalk_df (not crosswalk_task_df) is the intended join input.
task_crosswalk_df = crosswalk_df.withColumn("task_p_hier_id", trim(col("task_p_hier_id")))

functions_df = task_renamed_df.join(task_crosswalk_df, on="task_p_hier_id", how="outer")
display(functions_df)

# Resolves bug SDP-37858
functions_drop_df = functions_df.where(~functions_df["ID"].contains("Both voice"))

# Keep rows that have an ID and cast it to an integer. The type is given by name ("int") so
# the cell does not depend on IntegerType, which this notebook never imports.
functions_update_df = functions_drop_df.where(col("ID").isNotNull()).withColumn("ID", col("ID").cast("int"))
display(functions_update_df)


In [0]:
# Intermediate table counting how many rows exist for each (DEF_Org_Name, ID) pair.
performed_df = abvrorg_update_df.withColumnRenamed("Task_Title_Choice", "ID")
display(performed_df)

# Updated to resolve bug SDP-37887
count_keys = ("DEF_Org_Name", "ID")
Task_count = performed_df.groupBy(*count_keys).count()
display(Task_count.orderBy(*count_keys))


## NEO4J Ingestion

### Nodes

In [0]:
# Cypher statements used when writing node dataframes to Neo4j. Each statement refers to
# the current dataframe row as `event`.

# Organization nodes are keyed on ABVR, so rows without one are excluded.
organization_df = abvrorg_update_df.filter(col('ABVR').isNotNull())

# Columns used: ['ABVR', 'Abvr_NAME', 'OPTION_Abvr_ID']
organization_query = (
    "MERGE (n:Organization {id: event.ABVR}) "
    "SET n.OPTIONAbvrID = event.OPTION_Abvr_ID, n.AbvrName = event.Abvr_NAME"
)

# Columns used: ['DEF_Org_Name', 'Abvr', 'ABVR', 'Filter_ABC', 'ID_ORG']
# NOTE(review): abvrorg_update_df comes from outer joins, so DEF_Org_Name may be null on some
# rows; confirm the merge key is always populated.
DEForg_query = (
    "MERGE (n:DEFOrg {id: event.DEF_Org_Name}) "
    "SET n.Abvr = event.ABVR, n.ABCFilter = event.Filter_ABC, n.OrgID = event.ID_ORG"
)

# Columns used: ['task_p_hier_nm', 'task_p_hier_id', 'ID']
function_query = (
    "MERGE (n:Function {id: event.ID}) "
    "SET n.DEFTaskName = event.task_p_hier_nm, n.DEFTaskID = event.task_p_hier_id"
)

# Label -> (source dataframe, Cypher statement) for every node type to load.
nodes: dict = {
    "Organization": (organization_df, organization_query),
    "DEForg": (abvrorg_update_df, DEForg_query),
    "Function": (functions_update_df, function_query),
}

In [0]:
## Load Nodes
# Write each node dataframe to Neo4j using the Cypher statement paired with it.

for label, (df, query) in nodes.items():
    node_writer = df.write.format("org.neo4j.spark.DataSource")
    node_writer = node_writer.option("query", query)
    node_writer.mode("Overwrite").save()

### Relationships

In [0]:
# Cast ID to an integer and drop rows where the cast yields null (no ID or a non-numeric ID).
# The type is given by name ("int"), which is equivalent to IntegerType() but does not
# require importing it from pyspark.sql.types (this notebook never does).
Task_count_2 = Task_count.withColumn("ID", col("ID").cast("int")).filter(col("ID").isNotNull())

In [0]:
# Show the per-(DEF_Org_Name, ID) counts that will be written as PERFORMED relationships.
display(Task_count_2)

In [0]:
# Cypher statements used when writing relationship dataframes to Neo4j. Each statement
# refers to the current dataframe row as `event`.

# DEFOrg -[PERFORMED]-> Function, carrying the number of completions.
# The relationship is merged on its endpoints only and the count is applied with SET.
# Putting Task_count inside the MERGE pattern would make the count part of the match, so a
# re-run with a different count would add a second PERFORMED relationship instead of
# updating the existing one.
performed_query = """
MATCH (n:DEFOrg {id: event.DEF_Org_Name})
MATCH (m:Function {id: event.ID})
MERGE (n)-[r:PERFORMED]->(m)
SET r.Task_count = event.count
"""

# DEFOrg -[BELONGS_TO]-> Organization, matched through the shared ABVR value.
belongs_query = """
MATCH (n:Organization {id: event.ABVR})
MATCH (m:DEFOrg {Abvr: event.ABVR})
MERGE (m)-[b:BELONGS_TO]->(n)
"""

# Relationship type -> (source dataframe, Cypher statement).
relationships: dict = {
    "PERFORMED": (Task_count_2, performed_query),
    "BELONGS_TO": (abvrorg_update_df, belongs_query),
}

In [0]:
# Load Relationships
# Write each relationship dataframe to Neo4j using the Cypher statement paired with it.
for label, (df, query) in relationships.items():
    relationship_writer = df.write.format("org.neo4j.spark.DataSource")
    relationship_writer = relationship_writer.option("query", query)
    relationship_writer.mode("Overwrite").save()