This notebook explains how to create custom entity types and entities in Azure Purview to represent ML workflows.

In [3]:

import os

import dotenv
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient

# Pull variables from a local .env file (if one exists) into the process
# environment, so that no credentials are hard-coded in this notebook.
dotenv.load_dotenv()

# authenticate and instantiate client
# Required environment variables: TENANT_ID, SP_ID, SP_SECRET.
# Indexing os.environ (rather than .get) raises KeyError right here when one
# of them is missing, instead of failing later inside the auth call.
tenant_id = os.environ["TENANT_ID"]
client_id = os.environ["SP_ID"]
client_secret = os.environ["SP_SECRET"]

auth = ServicePrincipalAuthentication(
    tenant_id=tenant_id,
    client_id=client_id,
    client_secret=client_secret
)

# `client` is reused by every later cell that talks to the Purview account.
client = PurviewClient(account_name="ml-purview", authentication=auth)


In [30]:

# Define ML Pipeline Process Type

from pyapacheatlas.core import EntityTypeDef, AtlasAttributeDef
from pyapacheatlas.core.typedef import Cardinality


# Custom attributes carried by every entity of the new type.
pipeline_name = AtlasAttributeDef(
    name="pipeline_name",
    displayName="Pipeline Name",
    description="Name of the Azure ML pipeline"
)

pipeline_owner = AtlasAttributeDef(
    name="pipeline_owner",
    displayName="Pipeline Owner",
    description="Name of the main developer of the ML pipeline"
)

# The new type derives from the built-in "Process" type and adds the two
# attributes defined above.
process = EntityTypeDef(
    name="azureml_pipeline",
    superTypes=["Process"],
    attributeDefs = [pipeline_name, pipeline_owner]
)

# force_update=True lets this cell be re-run after the type already exists
# (the existing definition is updated instead of the upload being rejected).
# The call is the last expression so its response is rendered as cell output.
client.upload_typedefs(process, force_update=True)



{'enumDefs': [],
 'structDefs': [],
 'classificationDefs': [],
 'entityDefs': [{'category': 'ENTITY',
   'guid': 'f2e8cdf5-ca17-4972-befd-5d8d7fa5b160',
   'createdBy': '33d76053-d12a-473a-86a9-5234ba482970',
   'updatedBy': '33d76053-d12a-473a-86a9-5234ba482970',
   'createTime': 1621593463485,
   'updateTime': 1621593463485,
   'version': 1,
   'name': 'azureml_pipeline',
   'description': 'azureml_pipeline',
   'typeVersion': '1.0',
   'lastModifiedTS': '1',
   'attributeDefs': [{'name': 'pipeline_name',
     'typeName': 'string',
     'isOptional': True,
     'cardinality': 'SINGLE',
     'valuesMinCount': 0,
     'valuesMaxCount': 1,
     'isUnique': False,
     'isIndexable': False,
     'includeInNotification': False,
     'description': 'Name of the Azure ML pipeline'},
    {'name': 'pipeline_owner',
     'typeName': 'string',
     'isOptional': True,
     'cardinality': 'SINGLE',
     'valuesMinCount': 0,
     'valuesMaxCount': 1,
     'isUnique': False,
     'isIndexable': Fa

In [50]:
# Exploratory lookup of a single entity by GUID; the raw response is kept in
# `foo` and nothing is displayed.
foo = client.get_entity(
    guid="5dc733af-52bc-487a-8351-a807210c28d9"
)

In [61]:
from pyapacheatlas.core import AtlasEntity, AtlasProcess

# Fetch the two already-registered entities by GUID. Each response is indexed
# with ["entities"][0] to take the first (here: only requested) entity.
score_response = client.get_entity(guid="3f92f951-a198-4cdf-a303-39a7164484b7")
iris_score = score_response["entities"][0]

predicted_response = client.get_entity(guid="5dc733af-52bc-487a-8351-a807210c28d9")
iris_predicted = predicted_response["entities"][0]


In [67]:
# Display the name attribute of the fetched output entity.
predicted_attributes = iris_predicted["attributes"]
predicted_attributes["name"]

'iris.csv'

In [74]:

def _entity_reference(entity_json):
    """Build an AtlasEntity from a fetched entity dict.

    Reads "typeName" and "guid" from the top level of ``entity_json`` and
    "name" and "qualifiedName" from its "attributes" mapping.
    """
    attributes = entity_json["attributes"]
    return AtlasEntity(
        name=attributes["name"],
        typeName=entity_json["typeName"],
        qualified_name=attributes["qualifiedName"],
        guid=entity_json["guid"],
    )


# Qualified name of the process entity: the URL of the Azure ML pipeline run.
pipeline_run_qualified_name = "https://westeurope.api.azureml.ms/pipelines/v1.0/subscriptions/d50ade7c-2587-4da8-9c63-fc828541722c/resourceGroups/rgp-show-weu-aml-databricks/providers/Microsoft.MachineLearningServices/workspaces/aml-mlops-demo/PipelineRuns/PipelineSubmit/a00b0cec-769f-4623-a795-e7b7968bb405"

pipeline_outputs = [_entity_reference(iris_predicted)]
pipeline_inputs = [_entity_reference(iris_score)]

# Process entity of the custom "azureml_pipeline" type linking the input
# entity to the output entity. guid=-1 is a placeholder for a new entity.
my_pipeline = AtlasProcess(
    name="iris_score_pipeline",
    typeName="azureml_pipeline",
    qualified_name=pipeline_run_qualified_name,
    guid=-1,
    outputs=pipeline_outputs,
    inputs=pipeline_inputs,
)

In [75]:
# Inspect the JSON form of the process entity before uploading it.
pipeline_payload = my_pipeline.to_json()
pipeline_payload

{'typeName': 'azureml_pipeline',
 'guid': -1,
 'attributes': {'name': 'iris_score_pipeline',
  'qualifiedName': 'https://westeurope.api.azureml.ms/pipelines/v1.0/subscriptions/d50ade7c-2587-4da8-9c63-fc828541722c/resourceGroups/rgp-show-weu-aml-databricks/providers/Microsoft.MachineLearningServices/workspaces/aml-mlops-demo/PipelineRuns/PipelineSubmit/a00b0cec-769f-4623-a795-e7b7968bb405',
  'inputs': [{'typeName': 'azure_blob_path',
    'guid': '3f92f951-a198-4cdf-a303-39a7164484b7',
    'qualifiedName': 'https://topsecretdata.blob.core.windows.net/data/iris-score.csv'}],
  'outputs': [{'typeName': 'azure_blob_path',
    'guid': '5dc733af-52bc-487a-8351-a807210c28d9',
    'qualifiedName': 'https://topsecretdata.blob.core.windows.net/staging/predictions/iris.csv'}]},
 'relationshipAttributes': {}}

In [78]:
# Upload the process entity and display the service response.
upload_response = client.upload_entities(my_pipeline)
upload_response

{'mutatedEntities': {'CREATE': [{'typeName': 'azureml_pipeline',
    'attributes': {'qualifiedName': 'https://westeurope.api.azureml.ms/pipelines/v1.0/subscriptions/d50ade7c-2587-4da8-9c63-fc828541722c/resourceGroups/rgp-show-weu-aml-databricks/providers/Microsoft.MachineLearningServices/workspaces/aml-mlops-demo/PipelineRuns/PipelineSubmit/a00b0cec-769f-4623-a795-e7b7968bb405'},
    'lastModifiedTS': '1',
    'guid': '9bf0379b-c9e8-44e3-9e90-bf740945a907'}]},
 'guidAssignments': {'-1': '9bf0379b-c9e8-44e3-9e90-bf740945a907'}}