# Imports and Setup

In [1]:
import pm4py
import numpy as np
import pandas as pd
import pdb
import os
import yatter
from ruamel.yaml import YAML
import kglab
import re
from string import Template
import tempfile

# Step-by-Step Pipeline on the Sample Log

In [2]:
## define paths (relative to the notebook's working directory)
data_dir, output_dir, config_dir = '../data', '../output', '../config'

# create the working directories up front so later cells can write into them
os.makedirs(data_dir, exist_ok=True)
os.makedirs(output_dir, exist_ok=True)
os.makedirs(config_dir, exist_ok=True)

# input YARRRML mapping, generated RML mapping, and generated kglab config
mapping_file = f'{data_dir}/sample_mapping.yaml'
rml_output_path = f'{output_dir}/sample_mapping_rml.ttl'
kg_config_path = f'{config_dir}/sample_kg_config.ini'

In [3]:
# define KG namespaces passed to kglab.KnowledgeGraph
# NOTE(review): these keys carry a trailing ':' ('ex:', 'on:'), unlike the LogProcessor
# example further down ('ex', 'on'); the SPARQL cells below query via the 'ns1:' prefix —
# confirm which prefix binding kglab actually ends up with.
namespaces = {
    'ex:': "http://example.com/",
    'on:': "https://stl.mie.utoronto.ca/ontologies/spm/",
}

## define KGLab config: a single data-source section pointing at the generated RML mapping
config = f"""
[test]
mappings={rml_output_path}
"""
with open(kg_config_path, 'w') as config_fh:
    config_fh.write(config)

In [4]:
def load_df_from_log(log_path):
    """
    Return a dataframe loaded from an event log file.

    Args:
        log_path: path to an XES (``.xes`` / ``.xes.gz``, read via pm4py) or CSV
            (read via pandas) log. The extension check is case-insensitive.

    Returns:
        pandas.DataFrame with one row per log record.

    Raises:
        ValueError: if the extension is not one of the supported formats.
    """
    lowered = log_path.lower()
    if lowered.endswith(('.xes', '.xes.gz')):
        log = pm4py.read_xes(log_path)
        return pm4py.convert_to_dataframe(log)
    if lowered.endswith('.csv'):
        return pd.read_csv(log_path)
    # Fail loudly on unknown formats instead of reaching a return with no frame defined.
    raise ValueError(
        f"Unsupported log format for '{log_path}': expected .xes, .xes.gz or .csv"
    )

In [5]:
# view the sample event log
# Peek at the raw sample event log (first five rows)
log_df = load_df_from_log(f'{data_dir}/sample_log.csv')
log_df.head()

Unnamed: 0,caseID,activityID,eventID,timestamp,resourceID
0,case_0,activity_A,event_0,2016-01-01 09:00:00.000000+00:00,user_1
1,case_0,activity_A,event_1,2016-01-01 09:15:00.000000+00:00,user_1
2,case_0,activity_C,event_2,2016-01-01 09:35:00.000000+00:00,user_1
3,case_1,activity_A,event_3,2016-01-02 09:00:00.000000+00:00,user_1
4,case_1,activity_B,event_4,2016-01-02 09:00:00.000000+00:00,user_0


In [9]:
## convert YARRRML to RML
# Parse the YARRRML mapping with a safe, pure-Python YAML loader, translate it to RML
# with yatter, and write the result where the kglab config expects it.
# `with` blocks guarantee both file handles are closed (the input handle was previously leaked).
yaml = YAML(typ='safe', pure=True)
with open(mapping_file) as mapping_fh:
    yarrrml_content = yaml.load(mapping_fh)
rml_content = yatter.translate(yarrrml_content)
with open(rml_output_path, 'w') as rml_file:
    rml_file.write(rml_content)

2024-07-30 22:13:32,536 | INFO: Translating YARRRML mapping to [R2]RML
2024-07-30 22:13:32,537 | INFO: RML content is created!
2024-07-30 22:13:32,545 | INFO: Mapping has been syntactically validated.
2024-07-30 22:13:32,546 | INFO: Translation has finished successfully.


In [10]:
# Materialise RDF instances from the RML mapping referenced in the kglab config,
# then persist them as Turtle next to the other outputs.
kg = kglab.KnowledgeGraph(name="test", namespaces=namespaces)
kg.materialize(kg_config_path)
kg.save_rdf(f'{output_dir}/sample_log_instances.ttl')

2024-07-30 22:13:33,702 | DEBUG: CONFIGURATION: {'output_file': 'knowledge-graph', 'na_values': ',nan', 'safe_percent_encoding': '', 'read_parsed_mappings_path': '', 'write_parsed_mappings_path': '', 'mapping_partitioning': 'PARTIAL-AGGREGATIONS', 'logging_file': '', 'oracle_client_lib_dir': '', 'oracle_client_config_dir': '', 'udfs': '', 'output_dir': '', 'output_format': 'N-TRIPLES', 'only_printable_chars': 'no', 'infer_sql_datatypes': 'no', 'logging_level': 'INFO', 'number_of_processes': '24'}
2024-07-30 22:13:33,703 | DEBUG: DATA SOURCE `test`: {'mappings': '../output/sample_mapping_rml.ttl'}
2024-07-30 22:13:34,260 | INFO: 8 mapping rules retrieved.
2024-07-30 22:13:34,269 | DEBUG: All predicate maps are constant-valued, invariant subset is not enforced.
2024-07-30 22:13:34,274 | DEBUG: All graph maps are constant-valued, invariant subset is not enforced.
2024-07-30 22:13:34,278 | INFO: Mapping partition with 8 groups generated.
2024-07-30 22:13:34,280 | INFO: Maximum number of ru

In [135]:
## convert from RDF A-Box to PSL A-Box
## Query 1: simple unary predicates, one per rdf:type assertion, e.g. Event(event_0)
df = kg.query_as_df(sparql="SELECT ?s ?o WHERE {?s a ?o}")
unary_preds = df.apply(
    lambda row: '{}({})'.format(
        re.sub(r'.*:', '', row['o']),       # class name without its prefix
        re.sub(r'.*/|>$', '', row['s']),    # local name of the individual's IRI
    ),
    axis=1,
).values

# (Re)start the A-Box here so the query cells below can be re-run in order
ABox = np.concatenate((np.array([]), unary_preds), axis=0)

In [136]:
## Query 2: Timepoints
# Replace each recorded timestamp literal by a symbolic timepoint ts_0, ts_1, ... and
# emit: timepoint/1 facts, hasRecordedTime/2 facts per event, and a before/2 chain.
df = kg.query_as_df(sparql="SELECT ?s ?t WHERE {?s ns1:hasRecordedTime ?t}")
unique_timestamps = df['t'].unique()

# NOTE(review): chronological order relies on the lexical order of the timestamp strings
# returned by the query (true for ISO-8601 values sharing one UTC offset) — confirm.
sorted_timestamps = sorted(unique_timestamps)
timestamp_mapping = {timestamp: f'ts_{i}' for i, timestamp in enumerate(sorted_timestamps)}

# apply mapping
df['new_t'] = df['t'].map(timestamp_mapping)

# Order timepoints by index, NOT by name: sorting the 'ts_*' strings puts ts_10 before
# ts_2 and corrupts the before/2 chain as soon as there are more than 10 timepoints.
unique_mapped_timestamps = [timestamp_mapping[t] for t in sorted_timestamps]
timestamp_pairs = list(zip(unique_mapped_timestamps, unique_mapped_timestamps[1:]))

before_relations = [f'before({t1},{t2})' for t1, t2 in timestamp_pairs]

timestamp_preds = [f'timepoint({t})' for t in unique_mapped_timestamps]

event_timings = df.apply(lambda x: 'hasRecordedTime({}, {})'.format(re.sub(r".*/|>$", '', x["s"]), x["new_t"]), axis=1).values

ABox = np.concatenate((ABox, timestamp_preds, event_timings, before_relations), axis=0)

In [137]:
## Query 3: every remaining binary relation (all predicates except rdf:type and hasRecordedTime),
## rendered as pred(subject, object) with prefixes/IRIs stripped
df = kg.query_as_df(sparql="SELECT ?s ?p ?o WHERE {?s ?p ?o . FILTER (?p != rdf:type && ?p != ns1:hasRecordedTime)}")
binary_relations = df.apply(
    lambda row: '{}({}, {})'.format(
        re.sub(r".*:", "", row["p"]),
        re.sub(r".*/|>$", "", row["s"]),
        re.sub(r".*/|>$", "", row["o"]),
    ),
    axis=1,
).values

ABox = np.concatenate([ABox, binary_relations])

In [138]:
## Lastly, add process instance relations: every Case individual is linked to process P1
df = kg.query_as_df(sparql="SELECT ?s WHERE {?s a ns1:Case}")
process_instance = df.apply(
    lambda row: 'hasProcess({}, P1)'.format(re.sub(r".*/|>$", "", row["s"])),
    axis=1,
).values

ABox = np.concatenate([ABox, process_instance])

In [139]:
# Display the accumulated PSL A-Box literals built by the query cells above
ABox

array(['Resource(user_0)', 'Resource(user_1)', 'Resource(user_2)',
       'Event(event_3)', 'Event(event_6)', 'Event(event_1)',
       'Event(event_2)', 'Event(event_9)', 'Event(event_7)',
       'Event(event_4)', 'Event(event_5)', 'Event(event_8)',
       'Event(event_0)', 'Case(case_0)', 'Case(case_2)', 'Case(case_1)',
       'Activity(activity_C)', 'Activity(activity_B)',
       'Activity(activity_A)', 'Activity(activity_D)', 'timepoint(ts_0)',
       'timepoint(ts_1)', 'timepoint(ts_2)', 'timepoint(ts_3)',
       'timepoint(ts_4)', 'timepoint(ts_5)', 'timepoint(ts_6)',
       'hasRecordedTime(event_3, ts_4)', 'hasRecordedTime(event_4, ts_4)',
       'hasRecordedTime(event_8, ts_2)', 'hasRecordedTime(event_1, ts_2)',
       'hasRecordedTime(event_2, ts_3)', 'hasRecordedTime(event_7, ts_1)',
       'hasRecordedTime(event_6, ts_0)', 'hasRecordedTime(event_0, ts_0)',
       'hasRecordedTime(event_5, ts_5)', 'hasRecordedTime(event_9, ts_6)',
       'before(ts_0,ts_1)', 'before(ts_1,ts_2

In [141]:
# Save the A-Box in Prover9 syntax: one ground literal per line, each terminated by '.'
with open(f'{output_dir}/sample_log_ABox.p9', 'w') as f:
    f.writelines(f'{literal}.\n' for literal in ABox)

# Object-Oriented Form: the `LogProcessor` class

In [9]:
class LogProcessor:
    """
    Turn an event log into an RDF knowledge graph and a first-order-logic (FOL) A-Box.

    Pipeline (each stage is also callable on its own):
      1. ``load_df_from_log`` reads the XES/CSV log, prefixes identifiers with a
         one-letter type code and writes the result to a temporary CSV.
      2. ``build_mapping`` fills a YARRRML template with the log's column names and
         translates it to RML (written to a temporary ``.ttl`` file).
      3. ``generate_knowledge_graph`` materialises the RML mapping with kglab.
      4. ``generate_FOL`` queries the graph and builds ground FOL literals.

    NOTE(review): the temporary CSV/RML/INI files are created with ``delete=False``
    and are never removed by this class.
    """

    def __init__(self, log_path,
                 column_dict={'case_id': 'case:concept:name', 'activity': 'concept:name', 'timestamp': 'time:timestamp', 'resource': 'org:resource', 'event_id' : None},
                 prefixes={'ex':'http://www.example.com/', 'on': 'https://stl.mie.utoronto.ca/ontologies/spm/'},
                 process_name='P1'):
        """
        Args:
            log_path: path to a ``.xes``, ``.xes.gz`` or ``.csv`` event log.
            column_dict: maps the keys 'case_id', 'activity', 'timestamp', 'resource'
                and 'event_id' to column names in the log. If 'event_id' is falsy,
                an index-based 'event_id' column is created.
            prefixes: namespace IRIs for the 'ex' (instances) and 'on' (ontology) prefixes.
            process_name: identifier of the process every case is linked to.

        Raises:
            ValueError: if ``log_path`` has an unsupported extension.
        """
        self.process_name = process_name
        # Copy the dicts: load_df_from_log() writes into self.column_dict, and without a
        # copy that write would mutate the shared default argument (or the caller's dict).
        self.column_dict = dict(column_dict)
        self.prefixes = dict(prefixes)
        self.log_path = log_path
        self.log_df = self.load_df_from_log()
        self.mapping = self.build_mapping()
        self.fol_abox = np.array([])

    def load_df_from_log(self):
        """
        Load the log into a dataframe and write a normalised copy to a temporary CSV.

        Adds a 'processID' column, creates an 'event_id' column when none is configured,
        and prefixes event/case/activity/resource values with 'E'/'C'/'A'/'R' so that
        identifiers of different kinds never collide as ``ex:`` IRIs. Afterwards
        ``self.log_path`` points at the temporary CSV (the mapping reads that file).

        Returns:
            The normalised pandas.DataFrame.

        Raises:
            ValueError: if the log extension is not .xes, .xes.gz or .csv.
        """
        log_path = self.log_path
        col_dict = self.column_dict
        lowered = log_path.lower()
        if lowered.endswith(('.xes', '.xes.gz')):  # XES log
            log = pm4py.read_xes(log_path)
            df = pm4py.convert_to_dataframe(log)
        elif lowered.endswith('.csv'):  # CSV log
            df = pd.read_csv(log_path)
        else:
            # Returning None here used to leave a half-initialised object whose mapping
            # pointed at an unreadable source; fail early instead.
            raise ValueError(
                f"Unsupported log format for '{log_path}': expected .xes, .xes.gz or .csv"
            )

        # every case belongs to this single process
        df['processID'] = self.process_name

        if not col_dict.get('event_id'):  # no unique event identifier: derive one from the row index
            df['event_id'] = df.index
            col_dict['event_id'] = 'event_id'

        # ensure non-overlapping URIs by prefixing columns with a letter type encoding
        for key, code in (('event_id', 'E'), ('case_id', 'C'), ('activity', 'A'), ('resource', 'R')):
            column = col_dict[key]
            df[column] = df[column].apply(lambda x, code=code: f'{code}{str(x)}')

        # create temporary log csv consumed by the RML mapping
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv') as f:
            f.write(df.to_csv(index=False))
            self.log_path = f.name

        return df

    def build_mapping(self):
        """
        Instantiate the YARRRML mapping template for this log and translate it to RML.

        ``$$`` in the template is a ``string.Template`` escape that yields a literal
        ``$``, producing YARRRML references such as ``$(caseID)``. The RML text is also
        written to a temporary ``.ttl`` file whose path is stored in ``self.rml_path``.

        Returns:
            The RML mapping as a string.
        """

        mapping_template = """
        prefixes:
            ex: $ex_prefix
            on: $on_prefix

        mappings:
            events:
                sources:
                - ['$log_path~$log_format']
                s: ex:$$($eventID)
                po:
                - [a, on:Event]
                - [on:hasCase, ex:$$($caseID)]
                - [on:hasActivity, ex:$$($activityID)]
                - [on:hasResource, ex:$$($resourceID)]
                - [on:hasRecordedTime, $$($timestamp), xsd:dateTimeStamp]

            resources:
                sources:
                - ['$log_path~$log_format']
                s: ex:$$($resourceID)
                po:
                - [a, on:Resource]

            cases:
                sources:
                - ['$log_path~$log_format']
                s: ex:$$($caseID)
                po:
                - [a, on:Case]
                - [on:hasProcess, ex:$$(processID)]

            activities:
                sources:
                - ['$log_path~$log_format']
                s: ex:$$($activityID)
                po:
                - [a, on:Activity]
        """
        mapping_template = Template(mapping_template)
        mapping_string = mapping_template.substitute(
            log_path=self.log_path,
            log_format='csv',
            ex_prefix=self.prefixes['ex'],
            on_prefix=self.prefixes['on'],
            eventID=self.column_dict['event_id'],
            caseID=self.column_dict['case_id'],
            activityID=self.column_dict['activity'],
            resourceID=self.column_dict['resource'],
            timestamp=self.column_dict['timestamp']
        )
        yaml = YAML(typ='safe', pure=True)
        yarrrml_content = yaml.load(mapping_string)
        rml_mapping = yatter.translate(yarrrml_content)
        # write rml mapping to temporary file
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.ttl') as f:
            f.write(rml_mapping)
            self.rml_path = f.name

        return rml_mapping

    def generate_knowledge_graph(self):
        """
        Materialise the knowledge graph from the RML mapping and store it in ``self.kg``.

        A kglab/morph-kgc config file with one data-source section named after the
        process is written to a temporary ``.ini`` file.

        Returns:
            The kglab.KnowledgeGraph.
        """
        # init knowledge graph
        kg = kglab.KnowledgeGraph(name="test", namespaces=self.prefixes)
        # generate config
        config_string = f"""
        [{self.process_name}]
        mappings={self.rml_path}
        """
        # write config to temporary file
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.ini') as f:
            f.write(config_string)
            kg_config_path = f.name
        # generate the knowledge graph
        kg.materialize(kg_config_path)

        self.kg = kg

        return kg

    def save_knowledge_graph(self, output_path):
        """
        Save the knowledge graph as ``<output_path>/<process_name>_log_instances.ttl``.

        The graph is generated first if needed. ``output_path`` is a directory; a
        trailing separator is optional.
        """
        if not hasattr(self, 'kg'):
            self.generate_knowledge_graph()
        output_file = os.path.join(output_path, f'{self.process_name}_log_instances.ttl')
        self.kg.save_rdf(output_file)

        return None

    def generate_FOL(self):
        """
        Build the first-order-logic A-Box from the knowledge graph.

        The A-Box is rebuilt from scratch on each call, so repeated calls do not
        duplicate literals. Literal order:
          1. unary type facts, e.g. ``Event(E0)``;
          2. binary relations other than rdf:type / hasRecordedTime, e.g. ``hasCase(E0, C0)``;
          3. ``timepoint(ts_i)`` facts;
          4. ``hasRecordedTime(E0, ts_i)`` facts;
          5. a ``before(ts_i,ts_j)`` chain over consecutive timepoints.

        Returns:
            numpy array of literal strings (also stored in ``self.fol_abox``).
        """
        if not hasattr(self, 'kg'):
            self.generate_knowledge_graph()

        # Use the full predicate IRI rather than an auto-generated prefix such as 'ns1:',
        # whose binding depends on how the graph's namespaces happened to be registered.
        recorded_time = f"<{self.prefixes['on']}hasRecordedTime>"

        strip_ex_prefix = lambda x: re.sub(r".*/|>$", '', x)   # IRI -> local name
        strip_on_prefix = lambda x: re.sub(r".*:", '', x)      # prefixed name -> local name
        unary_pred = lambda s, o: f'{strip_on_prefix(o)}({strip_ex_prefix(s)})'
        binary_pred = lambda s, p, o: f'{strip_on_prefix(p)}({strip_ex_prefix(s)}, {strip_ex_prefix(o)})'

        def query_literals(query, to_literal):
            # Run a SPARQL query and convert each result row into one literal string.
            rows = self.kg.query_as_df(sparql=query)
            if rows.empty:  # apply(axis=1) on an empty frame does not yield a 1-D result
                return []
            return list(rows.apply(to_literal, axis=1))

        def timepoint_literals():
            # Map recorded timestamps to ts_0, ts_1, ... and emit timepoint/timing/before literals.
            rows = self.kg.query_as_df(sparql=f"SELECT ?s ?t WHERE {{?s {recorded_time} ?t}}")
            if rows.empty:
                return []
            # NOTE(review): assumes lexical order of the timestamp strings is chronological
            # (ISO-8601 with a common offset) — confirm for other logs.
            ordered = sorted(rows['t'].unique())
            name_of = {timestamp: f'ts_{i}' for i, timestamp in enumerate(ordered)}
            # Keep index order: sorting the 'ts_*' names would put ts_10 before ts_2.
            names = [name_of[timestamp] for timestamp in ordered]
            timepoints = [f'timepoint({name})' for name in names]
            timings = [f'hasRecordedTime({strip_ex_prefix(s)}, {name_of[t]})'
                       for s, t in zip(rows['s'], rows['t'])]
            befores = [f'before({t1},{t2})' for t1, t2 in zip(names, names[1:])]
            return timepoints + timings + befores

        literals = []
        # Convert simple unary rdf:type predicates
        literals += query_literals("SELECT ?s ?o WHERE {?s a ?o}",
                                   lambda row: unary_pred(row['s'], row['o']))
        # convert binary relations other than time and rdf:type
        relation_query = ("SELECT ?s ?p ?o WHERE {?s ?p ?o . "
                          f"FILTER (?p != rdf:type && ?p != {recorded_time})}}")
        literals += query_literals(relation_query,
                                   lambda row: binary_pred(row['s'], row['p'], row['o']))
        # convert timepoints
        literals += timepoint_literals()

        self.fol_abox = np.array(literals, dtype=str)
        return self.fol_abox

    def save_FOL(self, output_dir, format='prover9'):
        """
        Write the FOL A-Box to ``<output_dir>/<process_name>_log_literals.<ext>``.

        Args:
            output_dir: target directory (a trailing separator is optional).
            format: 'prover9' (``literal.`` per line, ``.p9``) or 'clif'
                (``(literal)`` per line, ``.clif``).

        Raises:
            ValueError: if ``format`` is not supported.
        """
        file_ext_map = {'prover9': '.p9', 'clif': '.clif'}
        literal_map = {'prover9': lambda x: f'{str(x)}.\n', 'clif': lambda x: f'({str(x)})\n'}
        if format not in file_ext_map:
            raise ValueError(f"Unsupported format {format!r}; expected one of {sorted(file_ext_map)}")

        if self.fol_abox.size == 0:
            self.generate_FOL()
        output_file = os.path.join(output_dir, f'{self.process_name}_log_literals{file_ext_map[format]}')

        with open(output_file, 'w') as f:
            for item in self.fol_abox:
                f.write(literal_map[format](item))

        return None

In [11]:
# Example usage: run the object-oriented pipeline end to end on the sample log
col_dict = {'case_id': 'caseID', 'activity': 'activityID', 'timestamp': 'timestamp', 'resource': 'resourceID', 'event_id' : 'eventID'}
output_dir = '../output/testing/'
namespaces = {'ex' : "http://example.com/", 'on' : "https://stl.mie.utoronto.ca/ontologies/spm/"}
# The setup cell only creates '../output'. Create this sub-directory too; otherwise
# save_FOL's open() (and presumably kglab's save_rdf) fails on a fresh checkout.
os.makedirs(output_dir, exist_ok=True)
log_processor = LogProcessor('../data/sample_log.csv', process_name='P1', column_dict=col_dict, prefixes=namespaces)
log_processor.save_knowledge_graph(output_dir)
log_processor.save_FOL(output_dir)

2024-07-30 22:19:37,233 | INFO: Translating YARRRML mapping to [R2]RML
2024-07-30 22:19:37,234 | INFO: RML content is created!
2024-07-30 22:19:37,242 | INFO: Mapping has been syntactically validated.
2024-07-30 22:19:37,243 | INFO: Translation has finished successfully.
2024-07-30 22:19:37,246 | DEBUG: CONFIGURATION: {'output_file': 'knowledge-graph', 'na_values': ',nan', 'safe_percent_encoding': '', 'read_parsed_mappings_path': '', 'write_parsed_mappings_path': '', 'mapping_partitioning': 'PARTIAL-AGGREGATIONS', 'logging_file': '', 'oracle_client_lib_dir': '', 'oracle_client_config_dir': '', 'udfs': '', 'output_dir': '', 'output_format': 'N-TRIPLES', 'only_printable_chars': 'no', 'infer_sql_datatypes': 'no', 'logging_level': 'INFO', 'number_of_processes': '24'}
2024-07-30 22:19:37,247 | DEBUG: DATA SOURCE `P1`: {'mappings': '/tmp/tmpl8mzwsue.ttl'}


2024-07-30 22:19:37,795 | INFO: 9 mapping rules retrieved.
2024-07-30 22:19:37,803 | DEBUG: All predicate maps are constant-valued, invariant subset is not enforced.
2024-07-30 22:19:37,808 | DEBUG: All graph maps are constant-valued, invariant subset is not enforced.
2024-07-30 22:19:37,811 | INFO: Mapping partition with 9 groups generated.
2024-07-30 22:19:37,813 | INFO: Maximum number of rules within mapping group: 1.
2024-07-30 22:19:37,814 | INFO: Mappings processed in 0.562 seconds.
2024-07-30 22:19:37,818 | DEBUG: Parallelizing with 24 cores.
2024-07-30 22:19:38,074 | INFO: Number of triples generated in total: 63.


test


In [8]:
# Inspect the FOL A-Box literals produced by the LogProcessor pipeline
log_processor.fol_abox

array(['Resource(Ruser_1)', 'Resource(Ruser_2)', 'Resource(Ruser_0)',
       'Event(Eevent_3)', 'Event(Eevent_8)', 'Event(Eevent_2)',
       'Event(Eevent_4)', 'Event(Eevent_7)', 'Event(Eevent_1)',
       'Event(Eevent_9)', 'Event(Eevent_5)', 'Event(Eevent_0)',
       'Event(Eevent_6)', 'Case(Ccase_2)', 'Case(Ccase_1)',
       'Case(Ccase_0)', 'Activity(Aactivity_B)', 'Activity(Aactivity_A)',
       'Activity(Aactivity_D)', 'Activity(Aactivity_C)',
       'hasActivity(Eevent_6, Aactivity_A)',
       'hasActivity(Eevent_9, Aactivity_D)', 'hasCase(Eevent_7, Ccase_2)',
       'hasActivity(Eevent_0, Aactivity_A)',
       'hasResource(Eevent_6, Ruser_1)', 'hasCase(Eevent_1, Ccase_0)',
       'hasResource(Eevent_0, Ruser_1)', 'hasResource(Eevent_2, Ruser_1)',
       'hasCase(Eevent_3, Ccase_1)', 'hasActivity(Eevent_5, Aactivity_C)',
       'hasCase(Eevent_5, Ccase_1)', 'hasProcess(Ccase_2, P1)',
       'hasActivity(Eevent_7, Aactivity_B)', 'hasProcess(Ccase_1, P1)',
       'hasResource(Eeven

In [5]:
# Sanity check of string.Template escaping: "$$" renders as a literal "$", so "$$($x)"
# becomes "$(<value>)" — the YARRRML reference form used in LogProcessor.build_mapping.
test_template = Template('$$($x)')
test_template.substitute(x='test')

'$(test)'