## This cell will contain an introduction to the process that this notebook will outline and why I am writing it

Loading tables of data into a knowledge graph is a vital step to set up the graph for analysis. This process includes deciding on how you want to model the data, determining the types and properties that will be created, and loading data from all of your tables in to complete the graph.

During this process, it's important to be able to merge data in order to ensure there are not duplicates of the same entities or relationships created.

## This cell will contain information about the dataset that we will be using : https://github.com/biocypher/pole/blob/main/data/pole.csv

## This cell will contain information about what the same process would look like in Load Table in Pro and highlight the specific processes we are doing in this notebook that aren't currently easily accessible in the python API

### To begin, we will need to import the libraries that are used in this notebook
We will be importing the os and json libraries for dealing with our input tables and manipulating data. The pandas library will be used for cleaning and preparing the csvs that we will be using before passing them into the loading function. Finally, GIS is used to connect to our portal and create the service we will be using and GraphClient along with the following classes will allow us to create the correct structures easily for creating the data model and data for our knowledge graph.

In [1]:
# import libraries
import json, os, uuid
from pprint import pprint
import pandas as pd
from arcgis.gis import GIS
from arcgis.geometry import Point
from arcgis.graph import GraphClient, EntityType, RelationshipType, GraphProperty, GraphObject, Entity, Relationship, SearchIndexProperties, data_model_types

## Set up wrappers to help us with creating correct loading configurations

In [9]:
# input table for the type being loaded
class InputTable:
    def __init__(self, csv_path):
        self.dataframe = pd.read_csv(csv_path)
        uniqueids = []
        for i in range(len(self.dataframe)):
            uniqueids.append(uuid.uuid4())
        self.dataframe["globalid"] = uniqueids
        for col in self.dataframe.columns:
            if self.dataframe[col].dtype in ['string','object']:
                self.dataframe[col].fillna("None",inplace=True)
            if self.dataframe[col].dtype in ['int','float','int64','float64']:
                self.dataframe[col].fillna(-1,inplace=True)
        
    def merge(self, merge_columns:list):
        self.dataframe.drop_duplicates(subset=merge_columns, inplace=True)

# property for a load table type, includes everything necessary to load
class LoadTableProperty:
    def __init__(self, name: str, data_type: data_model_types.esriFieldType, column: str|list[str], merge: bool = False, geometry_type: data_model_types.esriGeometryType = None):
        self.name = name
        self.data_type = data_type
        self.column = column
        self.merge = merge
        self.geometry_type = geometry_type

# relationship endpoints, these will represent the origin and destination information for relationship types
class RelationshipEndpoint:
    def __init__(self, entity_type: str, property: str, column: str, column_in_type=None):
        self.entity_type = entity_type
        self.property = property
        self.column = column
        self.column_in_type = column_in_type

# entity type to load
class LoadTableEntityType:
    def __init__(self, type_name: str, table: InputTable, properties: list[LoadTableProperty] = []):
        self.type_name = type_name
        self.named_object_type = "entity"
        self.table = table
        self.properties = properties
        
    def add_properties(self, property_adds=[]):
        self.properties += property_adds
        
    def get_merge_columns(self):
        merge_columns = []
        for prop in self.properties:
            if prop.merge==True:
                merge_columns.append(prop.column)
        return merge_columns
    
    def get_merge_properties(self):
        merge_props = []
        for prop in self.properties:
            if prop.merge==True:
                merge_props.append(prop.name)
        return merge_props
        
# relationship type to load
class LoadTableRelationshipType:
    def __init__(self, type_name: str, table: InputTable, origin: RelationshipEndpoint, destination: RelationshipEndpoint, properties: list[LoadTableProperty] = []):
        self.type_name = type_name
        self.named_object_type = "relationship"
        self.table = table
        self.origin = origin
        self.destination = destination
        self.properties = properties
        
    def add_properties(self, property_adds=[]):
        self.properties += property_adds
        
    def get_merge_columns(self):
        merge_columns = []
        for prop in self.properties:
            if prop.merge==True:
                merge_columns.append(prop.column)
        return merge_columns
    
    def get_merge_properties(self):
        merge_props = []
        for prop in self.properties:
            if prop.merge==True:
                merge_props.append(prop.name)
        return merge_props

In [22]:
def process_type_to_data_model(knowledge_graph: GraphClient, input_type: LoadTableEntityType|LoadTableRelationshipType):
    # determine data model types and properties to be created
    # add extra properties to type if existing
    existing_datamodel = knowledge_graph.query_data_model()
    if (input_type.type_name in kg_datamodel.entity_types.keys() or input_type.type_name in kg_datamodel.relationship_types.keys()):
        property_adds = []
        existing_type = existing_datamodel.entity_types[input_type.type_name] if input_type.named_object_type == "entity" else existing_datamodel.relationship_types[input_type.type_name]
        for prop in input_type.properties:
            try:
                existing_type.properties[prop.name]
            except:
                if prop.data_type == "esriFieldTypeGeometry":
                    property_adds[prop] = GraphProperty(name=prop.name, field_type=prop.data_type, geometry_type=prop.geometry_type)
                else:
                    property_adds[prop] = GraphProperty(name=prop.name, field_type=prop.data_type)
        # add extra properties if they are not on existing types, this is done now since it uses individual types
        knowledge_graph.graph_property_adds(type_name=input_type.type_name, graph_properties=property_adds)
    else:
        type_to_add = EntityType(name=input_type.type_name) if input_type.named_object_type == "entity" else RelationshipType(name=input_type.type_name)
        for prop in input_type.properties:
            if prop.data_type == "esriFieldTypeGeometry":
                type_to_add.properties[prop] = GraphProperty(name=prop.name, field_type=prop.data_type, geometry_type=prop.geometry_type)
            else:
                type_to_add.properties[prop] = GraphProperty(name=prop.name, field_type=prop.data_type)
        return type_to_add
        
def process_type_to_instances(knowledge_graph: GraphClient, input_type: LoadTableEntityType|LoadTableRelationshipType, entities_for_relationships: list[LoadTableEntityType] = None):
    editing_adds = []
    editing_updates = []
    merge_props = []
    kg_datamodel = knowledge_graph.query_data_model()
    for prop in input_type.properties:
        if prop.merge==True:
            merge_props.append(prop.name)
    if ((merge_props != []) & (input_type.type_name in kg_datamodel.entity_types.keys() or input_type.type_name in kg_datamodel.relationship_types.keys())):
        # get values for the merge columns, put them into lists and create a bind parameter for each column name
        bind_params = {}
        query_string = ""
        merge_columns = input_type.get_merge_columns()
        merge_properties = input_type.get_merge_properties()
        for column,prop in zip(merge_columns,merge_properties):
            bind_params[prop] = input_type.table.dataframe[column].tolist()
        # create query streaming to graph to get any matches on the column/value combos
        if input_type.named_object_type == "entity":
            query_string = f"MATCH (n:{input_type.type_name}) WHERE "
            for prop in merge_properties:
                query_string += f'n.{prop} in ${prop} AND'
            query_string = query_string[:-4] + f" RETURN "
            for prop in merge_properties:
                query_string += "n."+prop+','
            query_string += " n.globalid"
        if input_type.named_object_type == "relationship":
            query_string = f"MATCH ()-[n:{input_type.type_name}]-() WHERE "
            for prop in merge_properties:
                query_string += f'n.{prop} in ${prop} AND'
            query_string = query_string[:-4] + f" RETURN "
            for prop in merge_properties:
                query_string += "n."+prop+','
            query_string += " n.globalid"
        # get matching merge candidates
        results = knowledge_graph.query(query_string, bind_param=bind_params)
        try:
            for match in list(results):
                all_props = {}
                column_params = None
                for i,column in enumerate(merge_columns):
                    if column_params == None:
                        column_params = (input_type.table.dataframe[column] == match[i])
                    else:
                        column_params = column_params & (input_type.table.dataframe[column] == match[i])
                globalid_to_drop = input_type.table.dataframe.loc[column_params]['globalid'].values[0]
                row = input_type.table.dataframe.loc[input_type.table.dataframe['globalid']==globalid_to_drop]
                for prop in input_type.properties:
                    all_props[prop.name] = row[prop.column]
                input_type.table.dataframe[input_type.table.dataframe['globalid'] != globalid_to_drop]
                editing_updates.append(Entity(type_name=input_type.type_name,id=globalid_to_drop,properties=all_props))
        except Exception as error:
            print(error)
    for index, row in input_type.table.dataframe.iterrows():
        all_props = {}
        for prop in input_type.properties:
            if type(prop.column) == list:
                all_props[prop.name] = Point({"x": row[prop.column[1]], "y": row[prop.column[0]], "spatialReference": {"wkid": 4326}})
            else:
                all_props[prop.name] = row[prop.column]
        if input_type.named_object_type == "entity":
            editing_adds.append(Entity(type_name=input_type.type_name, id=row['globalid'], properties=all_props))
        else:
            origin_id = None
            destination_id = None
            if input_type.origin.column_in_type != None:
                for type_to_use in entities_for_relationships:
                    if type_to_use.type_name == input_type.origin.entity_type:
                        try:
                            origin_id = type_to_use.table.dataframe.loc[type_to_use.table.dataframe[input_type.origin.column_in_type] == row[input_type.origin.column],'globalid'].values[0]
                        except:
                            continue
            else:
                result = knowledge_graph.query(f"MATCH (n:{input_type.origin.type_name}) WHERE {input_type.origin.property} = {row[input_type.origin.column]} RETURN n.globalid")
                origin_id = list(result)[0][0]
            if input_type.destination.column_in_type != None:
                for type_to_use in entities_for_relationships:
                    if type_to_use.type_name ==  input_type.destination.entity_type:
                        try:
                            destination_id = type_to_use.table.dataframe.loc[type_to_use.table.dataframe[input_type.destination.column_in_type] == row[input_type.destination.column],'globalid'].values[0]
                        except:
                            continue
            else:
                result = knowledge_graph.query(f"MATCH (n:{input_type.destination.type_name}) WHERE {input_type.destination.property} = {row[input_type.destination.column]} RETURN n.globalid")
                destination_id = list(result)[0][0]
            # TO DO: if either are still null, I should be creating new entities and using the ids
            if origin_id == None:
                origin_id = uuid.uuid4()
                editing_adds.append(Entity(type_name=input_type.origin.entity_type, id=origin_id, properties={input_type.origin.property: row[input_type.origin.column]}))
            if destination_id == None:
                destination_id = uuid.uuid4()
                editing_adds.append(Entity(type_name=input_type.destination.entity_type, id=destination_id, properties={input_type.destination.property: row[input_type.destination.column]}))
            editing_adds.append(Relationship(type_name=input_type.type_name, id=row['globalid'], origin_entity_id=origin_id, destination_entity_id=destination_id, properties=all_props))
    return [editing_adds, editing_updates]
            
def load_tables(knowledge_graph:GraphClient, add_entities: list[LoadTableEntityType], add_relationships: list[LoadTableRelationshipType]):
    # set up variables
    entity_type_adds = []
    relationship_type_adds = []
    all_editing_adds = []
    all_editing_updates = []
    
    for current_type in add_entities:
        # determine if merge needs to be performed on InputTable for type
        merge_props = []
        merge_columns = current_type.get_merge_columns()
        if merge_props != []:
            current_type.table.merge(merge_columns)
            
        # create all necessary new types in data and update existing types with extra properties
        type_add = process_type_to_data_model(knowledge_graph, current_type)
        entity_type_adds.append(type_add)
        
        # set up instances of all types to be created, assign and track globalids as we go to make loading easier and faster, do merging as needed
        [editing_adds, editing_updates] = process_type_to_instances(knowledge_graph, current_type)
        all_editing_adds += editing_adds
        all_editing_updates += editing_updates
    
    for current_type in add_relationships:
        # determine if merge needs to be performed on InputTable for type
        merge_columns = []
        for prop in current_type.properties:
            if prop.merge==True:
                merge_columns.append(prop.column)
        if merge_props != []:
            current_type.table.merge(merge_columns)
            
        # create all necessary new types in data and update existing types with extra properties
        type_add = process_type_to_data_model(knowledge_graph, current_type)
        relationship_type_adds.append(type_add)
        
        [editing_adds, editing_updates] = process_type_to_instances(knowledge_graph, current_type, add_entities)
        all_editing_adds += editing_adds
        all_editing_updates += editing_updates
    # add all necessary types to the data model
    knowledge_graph.named_object_type_adds(entity_types=entity_type_adds, relationship_types=relationship_type_adds)
    if all_editing_adds != []:
        response = knowledge_graph.apply_edits(adds=all_editing_adds)
        if response.error:
            print("there was an error in the edit: "+str(response.error))
    if all_editing_updates != []:
        knowledge_graph.apply_edits(updates=all_editing_updates)
        if response.error:
            print("there was an error in the edit: "+str(response.error))

In [23]:
entity_adds = [
    LoadTableEntityType("Person", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Person.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("last_name", "esriFieldTypeString", "surname", merge=True),
        LoadTableProperty("nhs_no", "esriFieldTypeString", "nhs_no"),
        LoadTableProperty("first_name", "esriFieldTypeString", "name", merge=True)
    ]),
    LoadTableEntityType("Officer", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Officer.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge= True),
        LoadTableProperty("badge_number", "esriFieldTypeString", "badge_no"),
        LoadTableProperty("rank", "esriFieldTypeString", "rank"),
        LoadTableProperty("first_name", "esriFieldTypeString", "name"),
        LoadTableProperty("last_name", "esriFieldTypeString", "surname")
    ]),
    LoadTableEntityType("Phone", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Phone.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("phone_number", "esriFieldTypeString", "phoneNo")
    ]),
    LoadTableEntityType("Area", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Area.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("area_code", "esriFieldTypeString", "areaCode")
    ]),
    LoadTableEntityType("Crime", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Crime.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("date", "esriFieldTypeString", "date"),
        LoadTableProperty("type", "esriFieldTypeString", "type"),
        LoadTableProperty("last_outcome", "esriFieldTypeString", "last_outcome"),
        LoadTableProperty("note", "esriFieldTypeString", "note"),
        LoadTableProperty("charge", "esriFieldTypeString", "charge")
    ]),
    LoadTableEntityType("Email", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Email.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("email_address", "esriFieldTypeString", "email_address")
    ]),
    LoadTableEntityType("Location", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Location.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("address", "esriFieldTypeString", "address"),
        LoadTableProperty("post_code", "esriFieldTypeString", "postcode"),
        LoadTableProperty("shape", "esriFieldTypeGeometry", ["latitude:double","longitude:double"], geometry_type="esriGeometryPoint")
    ]),
    LoadTableEntityType("Object", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Object.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("description", "esriFieldTypeString", "description"),
        LoadTableProperty("type", "esriFieldTypeString", "type")
    ]),
    LoadTableEntityType("PhoneCall", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\PhoneCall.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("call_duration", "esriFieldTypeInteger", "call_duration"),
        LoadTableProperty("call_time", "esriFieldTypeString", "call_time"),
        LoadTableProperty("call_date", "esriFieldTypeString", "call_date"),
        LoadTableProperty("call_type", "esriFieldTypeString", "call_type")
    ]),
    LoadTableEntityType("PostCode", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\PostCode.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("code", "esriFieldTypeString", "code")
    ]),
    LoadTableEntityType("Vehicle", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\Vehicle.csv"), properties=[
        LoadTableProperty("id", "esriFieldTypeInteger", ":ID", merge=True),
        LoadTableProperty("model", "esriFieldTypeString", "model"),
        LoadTableProperty("registration", "esriFieldTypeString", "reg"),
        LoadTableProperty("make", "esriFieldTypeString", "make"),
        LoadTableProperty("year", "esriFieldTypeInteger", "year")
    ])
]
relationship_adds = [
    LoadTableRelationshipType("KNOWS", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\KNOWS.csv"), RelationshipEndpoint("Person", "id", ":START_ID", ":ID"), RelationshipEndpoint("Person", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("CALLED", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\CALLED.csv"), RelationshipEndpoint("PhoneCall", "id", ":START_ID", ":ID"), RelationshipEndpoint("Phone", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("CALLER", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\CALLER.csv"), RelationshipEndpoint("PhoneCall", "id", ":START_ID", ":ID"), RelationshipEndpoint("Phone", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("CURRENT_ADDRESS", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\CURRENT_ADDRESS.csv"), RelationshipEndpoint("Person", "id", ":START_ID", ":ID"), RelationshipEndpoint("Location", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("FAMILY_REL", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\FAMILY_REL.csv"), RelationshipEndpoint("Person", "id", ":START_ID", ":ID"), RelationshipEndpoint("Person", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("HAS_EMAIL", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\HAS_EMAIL.csv"), RelationshipEndpoint("Person", "id", ":START_ID", ":ID"), RelationshipEndpoint("Email", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("HAS_PHONE", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\HAS_PHONE.csv"), RelationshipEndpoint("Person", "id", ":START_ID", ":ID"), RelationshipEndpoint("Phone", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("HAS_POSTCODE", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\HAS_POSTCODE.csv"), RelationshipEndpoint("Location", "id", ":START_ID", ":ID"), RelationshipEndpoint("PostCode", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("INVESTIGATED_BY", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\INVESTIGATED_BY.csv"), RelationshipEndpoint("Crime", "id", ":START_ID", ":ID"), RelationshipEndpoint("Officer", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("INVOLVED_IN", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\INVOLVED_IN.csv"), RelationshipEndpoint("Vehicle", "id", ":START_ID", ":ID"), RelationshipEndpoint("Crime", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("INVOLVED_IN", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\INVOLVED_IN.csv"), RelationshipEndpoint("Object", "id", ":START_ID", ":ID"), RelationshipEndpoint("Crime", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("KNOWS_LW", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\KNOWS_LW.csv"), RelationshipEndpoint("Person", "id", ":START_ID", ":ID"), RelationshipEndpoint("Person", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("KNOWS_PHONE", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\KNOWS_PHONE.csv"), RelationshipEndpoint("Person", "id", ":START_ID", ":ID"), RelationshipEndpoint("Person", "id", ":END_ID", ":ID")),
    LoadTableRelationshipType("LOCATION_IN_AREA", InputTable(r"C:\Users\meg10359\Downloads\pole-data-importer\LOCATION_IN_AREA.csv"), RelationshipEndpoint("Location", "id", ":START_ID", ":ID"), RelationshipEndpoint("Area", "id", ":END_ID", ":ID"))
]

In [25]:
gis = GIS("https://dev0025946.esri.com/portal","publisher2","esri.agp123",verify_cert=False)
created_kg = gis.content.create_service(name="TestingLoadNotebook",service_type="KnowledgeGraph")
knowledge_graph = GraphClient(created_kg.url, gis=gis)
load_tables(knowledge_graph, entity_adds, relationship_adds)

Setting `verify_cert` to False is a security risk, use at your own risk.
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  return self.__pydantic_serializer__.to_python(
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  Expected `str` but got `LoadTableProperty` - serialized value may not be as expected
  return self.__pydantic_serializer__.to_python(
  Expected `str` but got `LoadTableProperty` -

In [24]:
gis.content.search("TestingLoadNotebook")[0].delete()

True