In [1]:
import ml_metadata
from google.protobuf import json_format
from ml_metadata import metadata_store
from ml_metadata.proto import metadata_store_pb2

from paleonotology import KFPaleontologist

In [2]:
# Describe the MySQL database that backs the ML Metadata (MLMD) store.
# NOTE(review): connects as 'root' with no password set — confirm this is only
# meant for a local demo database.
connection_config = metadata_store_pb2.ConnectionConfig(
    mysql=metadata_store_pb2.MySQLDatabaseConfig(
        host='127.0.0.1',
        port=3306,
        database='metadb',
        user='root',
    )
)

In [3]:
# Open an MLMD client against the database described by connection_config.
store = metadata_store.MetadataStore(config=connection_config)

In [4]:
# Step 1: Start from a known RUN ID (DAG ID).
# This is the run containing the iris dataset explored in the previous slides.
run_id = "34c07c09-8485-4192-8e31-697208277a7a"

In [5]:
# Step 2: Query MLMD for the execution associated with this run.
# The execution is looked up by name, using the pattern "run/<run_id>".
run_executions = store.get_executions(
    list_options=ml_metadata.ListOptions(
        filter_query=f'name="run/{run_id}"'
    )
)
# Fail with an explicit message (same exception type as plain indexing would
# raise) instead of a bare "list index out of range" when the run is unknown.
if not run_executions:
    raise IndexError(
        f'no execution named "run/{run_id}" found in the metadata store'
    )
run_execution_id = run_executions[0].id
run_execution_id

257

In [6]:
# Step 3: Collect every execution whose `parent_dag_id` custom property equals
# the id of the run execution (the pod executions belonging to that run).
child_filter = f'custom_properties.parent_dag_id.int_value={run_execution_id}'
container_executions = store.get_executions(
    list_options=ml_metadata.ListOptions(filter_query=child_filter)
)

In [7]:
# Reduce the execution protos to their numeric ids.
container_execution_ids = [execution.id for execution in container_executions]
container_execution_ids

[258, 259, 260]

In [8]:
# Fetch the events recorded for the container executions, then load the
# artifacts those events reference.
events = store.get_events_by_execution_ids(execution_ids=container_execution_ids)
artifact_ids = [evt.artifact_id for evt in events]

artifacts = store.get_artifacts_by_id(artifact_ids=artifact_ids)
artifacts

[id: 2
 type_id: 16
 uri: "minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/create-dataset/iris_dataset"
 custom_properties {
   key: "display_name"
   value {
     string_value: "iris_dataset"
   }
 }
 state: LIVE
 type: "system.Dataset"
 create_time_since_epoch: 1724782514643
 last_update_time_since_epoch: 1724782514643,
 id: 3
 type_id: 16
 uri: "minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/normalize-dataset/normalized_iris_dataset"
 custom_properties {
   key: "display_name"
   value {
     string_value: "normalized_iris_dataset"
   }
 }
 state: LIVE
 type: "system.Dataset"
 create_time_since_epoch: 1724782554534
 last_update_time_since_epoch: 1724782554534]

In [9]:
# Convert each artifact proto into a plain Python dict.
json_artifacts = list(map(json_format.MessageToDict, artifacts))
json_artifacts

[{'id': '2',
  'typeId': '16',
  'uri': 'minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/create-dataset/iris_dataset',
  'customProperties': {'display_name': {'stringValue': 'iris_dataset'}},
  'state': 'LIVE',
  'type': 'system.Dataset',
  'createTimeSinceEpoch': '1724782514643',
  'lastUpdateTimeSinceEpoch': '1724782514643'},
 {'id': '3',
  'typeId': '16',
  'uri': 'minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/normalize-dataset/normalized_iris_dataset',
  'customProperties': {'display_name': {'stringValue': 'normalized_iris_dataset'}},
  'state': 'LIVE',
  'type': 'system.Dataset',
  'createTimeSinceEpoch': '1724782554534',
  'lastUpdateTimeSinceEpoch': '1724782554534'}]

In [10]:
# KFPaleontologist is imported in the notebook's import cell, so every
# dependency is visible up front. The helper is built from the same
# connection config used for the raw MetadataStore client.
paleontologist = KFPaleontologist(connection_config)

In [11]:
# Retrieve the run's artifacts through the helper, keyed directly by run id.
# NOTE(review): this rebinds `artifacts` from the earlier manual lookup; the
# recorded output lists the same two artifacts (ids 2 and 3).
artifacts = paleontologist.get_artifacts_from_run(run_id)
artifacts

[id: 2
 type_id: 16
 uri: "minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/create-dataset/iris_dataset"
 custom_properties {
   key: "display_name"
   value {
     string_value: "iris_dataset"
   }
 }
 state: LIVE
 type: "system.Dataset"
 create_time_since_epoch: 1724782514643
 last_update_time_since_epoch: 1724782514643,
 id: 3
 type_id: 16
 uri: "minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/normalize-dataset/normalized_iris_dataset"
 custom_properties {
   key: "display_name"
   value {
     string_value: "normalized_iris_dataset"
   }
 }
 state: LIVE
 type: "system.Dataset"
 create_time_since_epoch: 1724782554534
 last_update_time_since_epoch: 1724782554534]

In [12]:
# Execution history for the first artifact returned for the run.
artifact_history = paleontologist.get_artifact_execution_history(artifacts[0].id)

print(f"number of execs related to artifact: {len(artifact_history)}")
# The loop variable is `execution` rather than `exec`, so this cell does not
# shadow the built-in exec() in the notebook's global namespace.
for execution in artifact_history:
    print(execution)


number of execs related to artifact: 2
id: 258
type_id: 14
last_known_state: COMPLETE
custom_properties {
  key: "display_name"
  value {
    string_value: "create-dataset"
  }
}
custom_properties {
  key: "image"
  value {
    string_value: ""
  }
}
custom_properties {
  key: "inputs"
  value {
    struct_value {
    }
  }
}
custom_properties {
  key: "namespace"
  value {
    string_value: "kubeflow"
  }
}
custom_properties {
  key: "outputs"
  value {
    struct_value {
    }
  }
}
custom_properties {
  key: "parent_dag_id"
  value {
    int_value: 257
  }
}
custom_properties {
  key: "pod_name"
  value {
    string_value: "iris-training-pipeline-84xw9-system-container-impl-2685640547"
  }
}
custom_properties {
  key: "pod_uid"
  value {
    string_value: "ceb0bcfd-6844-464f-ade1-023c8dcec331"
  }
}
custom_properties {
  key: "task_name"
  value {
    string_value: "create-dataset"
  }
}
type: "system.ContainerExecution"
create_time_since_epoch: 1724782486499
last_update_time_since_

In [13]:
# Ask the helper for the lineage of the first artifact from the run.
root_artifact = artifacts[0]
lineage = paleontologist.get_artifact_lineage(root_artifact.id)
lineage

I1009 14:51:04.427124 50490 rdbms_metadata_access_object.cc:3129] max_num_hops is not set. Using maximum value: 100 to limit the steps of the traversal.


artifact_types {
  id: 16
  name: "system.Dataset"
}
artifact_types {
  id: 17
  name: "system.Model"
}
execution_types {
  id: 13
  name: "system.DAGExecution"
}
execution_types {
  id: 14
  name: "system.ContainerExecution"
}
context_types {
  id: 11
  name: "system.Pipeline"
}
context_types {
  id: 12
  name: "system.PipelineRun"
}
artifacts {
  id: 2
  type_id: 16
  uri: "minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/create-dataset/iris_dataset"
  custom_properties {
    key: "display_name"
    value {
      string_value: "iris_dataset"
    }
  }
  state: LIVE
  type: "system.Dataset"
  create_time_since_epoch: 1724782514643
  last_update_time_since_epoch: 1724782514643
}
artifacts {
  id: 3
  type_id: 16
  uri: "minio://mlpipeline/v2/artifacts/iris-training-pipeline/34c07c09-8485-4192-8e31-697208277a7a/normalize-dataset/normalized_iris_dataset"
  custom_properties {
    key: "display_name"
    value {
      string_value: "normalized_ir

In [14]:
# Visualize the lineage retrieved for the first artifact.
# NOTE(review): the recorded output of this cell is a mimeopen error (no viewer
# rule for "application/pdf"), so presumably the helper produces a PDF and
# tries to open it with a desktop viewer — confirm, and consider an inline
# rendering path for headless environments.
paleontologist.visualize_lineage(lineage)

Error: no "view" mailcap rules found for type "application/pdf"
Try running the update-desktop-database command. If you
don't have this command you should install the
desktop-file-utils package. This package is available from
http://freedesktop.org/wiki/Software/desktop-file-utils/
Can't call method "get_value" on an undefined value at /usr/bin/mimeopen line 159.


In [None]:
# Same visualization call, with both optional display flags switched on.
paleontologist.visualize_lineage(
    lineage,
    display_association=True,
    display_attribution=True,
)