# Provena Client Library Workflow Guide and Example. 

This notebook contains guidance and examples on how to use the Provena Client Library for common Provena operations (creating and fetching items, lodging model runs, etc.).

The client library is a user-friendly interface for interacting with the various APIs of Provena (Registry, Prov, Datastore, etc.) through code, and is currently compatible with the Python programming language.

To find further information or explore other Provena operations of the client library: https://provena.github.io/provena-python-client/

### Client Configuration and Initialisation

In [1]:
# Import initial modules needed.
from provenaclient import ProvenaClient, Config
from provenaclient.auth import DeviceFlow
from provenaclient.auth.implementations import OfflineFlow
from pprint import pprint


Instantiate the client library by providing the domain your Provena instance is hosted on and the name of your Keycloak realm. 

In [2]:
# Provena config - replace with your Provena instance endpoints
client_config = Config(
    domain="dev.rrap-is.com",
    realm_name="rrap"
)

# Toggle between the interactive device-flow login (False) and the
# token-based offline login (True).
offline_mode = False

if offline_mode:
    # Imported here because only the offline path needs these names; without
    # them this branch raised NameError as soon as offline_mode was enabled.
    import os

    from dotenv import load_dotenv  # provided by the python-dotenv package

    # Read the offline token from the environment (populated from a .env file).
    load_dotenv()
    offline_token = os.getenv('PROVENA_API_TOKEN')
    assert offline_token, "Offline token must be present in .env file e.g. PROVENA_API_TOKEN=1234."
    print("Offline mode activated and token found in .env file.")

    auth = OfflineFlow(config=client_config, client_id="automated-access", offline_token=offline_token)
else:
    auth = DeviceFlow(config=client_config, client_id="client-tools")


# Instantiate the client.
client = ProvenaClient(config=client_config, auth=auth)

In [3]:
import example_workflow_config

# Path of the JSON workflow configuration to load.
config_path = "configs/example_workflow3.json"

# Load the configuration, then echo it so the identifiers in use are visible
# in the cell output.
config = example_workflow_config.load_config(
    path=config_path,
)
config.pprint()

{
  "inputs": {
    "input_dataset": "10378.1/1904964",
    "input_dataset_template": "10378.1/1905250"
  },
  "outputs": {
    "output_dataset": "10378.1/1904961",
    "output_dataset_template": "10378.1/1926245"
  },
  "associations": {
    "person": "10378.1/1893843",
    "organisation": "10378.1/1893860"
  },
  "workflow_configuration": {
    "workflow_template": "10378.1/1905251"
  }
}


## Querying Datastore API.

We will take a look at querying and interacting with the Datastore API, exploring the common operations of fetching a dataset, minting a dataset and fetching all datasets in various formats (paginated, all).

In [4]:

dataset = await client.datastore.fetch_dataset(id = "10378.1/1908974")

print(dataset) # Fetched dataset pythonic object.
print()
print("Dataset Query Details:", dataset.status.details) # Accessing fetched dataset query details
print()
print("Dataset Display Name:", dataset.item.display_name) # Accessing fetched dataset name

status=Status(success=True, details="Successfully fetched data for handle '10378.1/1908974'") item=ItemDataset(display_name='TEst', user_metadata=None, collection_format=CollectionFormat(associations=CollectionFormatAssociations(organisation_id='10378.1/1877551', data_custodian_id=None, point_of_contact=None), approvals=CollectionFormatApprovals(ethics_registration=DatasetEthicsRegistrationCheck(relevant=False, obtained=False), ethics_access=DatasetEthicsAccessCheck(relevant=False, obtained=False), indigenous_knowledge=IndigenousKnowledgeCheck(relevant=False, obtained=False), export_controls=ExportControls(relevant=False, obtained=False)), dataset_info=CollectionFormatDatasetInfo(name='TEst', description='TEst', access_info=AccessInfo(reposited=False, uri='http://google.com', description='test'), publisher_id='10378.1/1877551', created_date=CreatedDate(relevant=True, value=datetime.date(2024, 6, 5)), published_date=PublishedDate(relevant=True, value=datetime.date(2024, 6, 21)), license

In [5]:
from ProvenaInterfaces.RegistryModels import *
from datetime import date

dataset_to_create = CollectionFormat(
        associations=CollectionFormatAssociations(
        organisation_id="10378.1/1893860",
        data_custodian_id="10378.1/1893843",
        point_of_contact= None
        ),
        approvals=CollectionFormatApprovals(
            ethics_registration = DatasetEthicsRegistrationCheck(relevant=False, obtained=False),
            ethics_access=DatasetEthicsAccessCheck(relevant= False, obtained= False),
            indigenous_knowledge=IndigenousKnowledgeCheck(relevant=False, obtained= False),
            export_controls=ExportControls(relevant=False, obtained=False)
        ),
        dataset_info=CollectionFormatDatasetInfo(
            name="Parth testing",
            description="testing dataset",
            access_info=AccessInfo(reposited=True, uri=None, description=None),
            publisher_id="10378.1/1893860",
            published_date=PublishedDate(relevant=True,value=date.today()),
            license = "https://www.google.com", #type:ignore
            created_date=CreatedDate(relevant=True,value=date.today()),
            purpose= None,
            rights_holder=None,
            usage_limitations=None,
            preferred_citation=None,
            formats = None,
            keywords= None,
            user_metadata= None,
            version = None
        )
    )

created_dataset = await client.datastore.mint_dataset(dataset_mint_info=dataset_to_create)

print("Created Dataset handle is:", created_dataset.handle)
print("Created Dataset reqeuest details:", created_dataset.status.details)

Created Dataset handle is: 10378.1/1948403
Created Dataset reqeuest details: Successfully seeded location - see location details.


In [6]:
from ProvenaInterfaces.RegistryAPI import *

# Sort criteria to receive datasets.
sort_criteria = NoFilterSubtypeListRequest(
            sort_by=SortOptions(sort_type=SortType.DISPLAY_NAME, ascending=False, begins_with=None), 
            pagination_key=None, 
            page_size=10
        )


list_datasets = await client.datastore.list_datasets(list_dataset_request=sort_criteria)

for i in list_datasets:
    print(i)


('status', Status(success=True, details='Successfully listed items.'))
('items', [ItemDataset(display_name='test dataset 1', user_metadata={'another custom': 'annotation', 'my custom': 'annotation'}, collection_format=CollectionFormat(associations=CollectionFormatAssociations(organisation_id='10378.1/1943278', data_custodian_id='10378.1/1943279', point_of_contact='Not Peter Baker.'), approvals=CollectionFormatApprovals(ethics_registration=DatasetEthicsRegistrationCheck(relevant=False, obtained=False), ethics_access=DatasetEthicsAccessCheck(relevant=False, obtained=False), indigenous_knowledge=IndigenousKnowledgeCheck(relevant=False, obtained=False), export_controls=ExportControls(relevant=False, obtained=False)), dataset_info=CollectionFormatDatasetInfo(name='test dataset 1', description='test dataset 1', access_info=AccessInfo(reposited=True, uri=None, description=None), publisher_id='10378.1/1943278', created_date=CreatedDate(relevant=True, value=datetime.date(2024, 8, 8)), published

In [7]:
# Getting all datasets in datastore with specified sort criteria.
all_datasets = await client.datastore.list_all_datasets(sort_criteria=sort_criteria)
print(f"Total datasets fetched: {len(all_datasets)}")

Total datasets fetched: 251


## Querying Provenance API.

#### We will now take a look at exploring some of the common operations of the PROV-API with existing and valid entities. 

Exploring Lineage

In [8]:
# Upstream

print("Exploring upstream query")

upstream_result = await client.prov_api.explore_upstream(starting_id="10378.1/1904964")
pprint(upstream_result)
print()
pprint(upstream_result.graph.get('nodes'))

print()

print("Exploring downstream query")

downstream_result = await client.prov_api.explore_downstream(starting_id="10378.1/1904961")
pprint(downstream_result)
print()
pprint(downstream_result.graph.get('nodes'))


Exploring upstream query
LineageResponse(status=Status(success=True, details='Made lineage query (with depth 3) to neo4j backend.'), record_count=5, graph={'directed': True, 'multigraph': False, 'graph': {}, 'nodes': [{'item_category': 'ENTITY', 'item_subtype': 'DATASET', 'id': '10378.1/1904964'}, {'item_category': 'ACTIVITY', 'item_subtype': 'CREATE', 'id': '10378.1/1904975'}, {'item_category': 'AGENT', 'item_subtype': 'PERSON', 'id': '10378.1/1893843'}, {'item_category': 'ENTITY', 'item_subtype': 'DATASET_TEMPLATE', 'id': '10378.1/1905250'}, {'item_category': 'ACTIVITY', 'item_subtype': 'CREATE', 'id': '10378.1/1905252'}], 'links': [{'type': 'wasGeneratedBy', 'source': '10378.1/1904964', 'target': '10378.1/1904975'}, {'type': 'wasAttributedTo', 'source': '10378.1/1904964', 'target': '10378.1/1893843'}, {'type': 'wasInfluencedBy', 'source': '10378.1/1904964', 'target': '10378.1/1905250'}, {'type': 'wasAssociatedWith', 'source': '10378.1/1904975', 'target': '10378.1/1893843'}, {'type':

In [9]:
# Contributing and Effected Datasets

contributing_datasets = await client.prov_api.get_contributing_datasets(starting_id = "10378.1/1904964")
print("Contributing datasets " + "\n", contributing_datasets)

effected_datasets = await client.prov_api.get_effected_datasets(starting_id = "10378.1/1904964")
print("Effected datasets " + "\n", effected_datasets)

Contributing datasets 
 status=Status(success=True, details='Made upstream contribution query (with depth 3) to neo4j backend.') record_count=0 graph={'directed': True, 'multigraph': False, 'graph': {}, 'nodes': [], 'links': []}
Effected datasets 
 status=Status(success=True, details='Made downstream effect query (with depth 3) to neo4j backend.') record_count=29 graph={'directed': True, 'multigraph': False, 'graph': {}, 'nodes': [{'item_category': 'ENTITY', 'item_subtype': 'DATASET', 'id': '10378.1/1904961'}, {'item_category': 'ACTIVITY', 'item_subtype': 'MODEL_RUN', 'id': '10378.1/1926259'}, {'item_category': 'ENTITY', 'item_subtype': 'DATASET', 'id': '10378.1/1904964'}, {'item_category': 'ACTIVITY', 'item_subtype': 'MODEL_RUN', 'id': '10378.1/1926270'}, {'item_category': 'ACTIVITY', 'item_subtype': 'MODEL_RUN', 'id': '10378.1/1926271'}, {'item_category': 'ACTIVITY', 'item_subtype': 'MODEL_RUN', 'id': '10378.1/1935470'}, {'item_category': 'ACTIVITY', 'item_subtype': 'MODEL_RUN', 'id'

Lodging Model Runs && Querying with Job-API

In [10]:
from ProvenaInterfaces.ProvenanceAPI import ModelRunRecord, TemplatedDataset, DatasetType, AssociationInfo
from ProvenaInterfaces.AsyncJobAPI import JobStatus

# Input and output datasets, each paired with its dataset template. All
# identifiers are read from the workflow config loaded earlier.
input_dataset = TemplatedDataset(
    dataset_template_id=config.inputs.input_dataset_template,
    dataset_id=config.inputs.input_dataset,
    dataset_type=DatasetType.DATA_STORE,
)
output_dataset = TemplatedDataset(
    dataset_template_id=config.outputs.output_dataset_template,
    dataset_id=config.outputs.output_dataset,
    dataset_type=DatasetType.DATA_STORE,
)

# The modeller and the organisation requesting the run.
run_associations = AssociationInfo(
    modeller_id=config.associations.person,
    requesting_organisation_id=config.associations.organisation,
)

# Building the Model Run Payload.
model_run_payload = ModelRunRecord(
    workflow_template_id=config.workflow_configuration.workflow_template,
    model_version=None,
    inputs=[input_dataset],
    outputs=[output_dataset],
    annotations=None,
    display_name="Notebook Model Run Testing",
    description="Standard Provena Model Run Example",
    study_id=None,
    associations=run_associations,
    # NOTE(review): placeholder start/end values, presumably epoch timestamps - confirm.
    start_time=0,
    end_time=1,
)


In [11]:
# Registering Model Run
model_run_register_result = await client.prov_api.register_model_run(model_run_payload=model_run_payload)


In [12]:
# Check the response of the model run registration
print("Status of registration", model_run_register_result.status)
print("Job Session ID", model_run_register_result.session_id)


# Check the job to see if it's complete. We will do this by polling the job_api
job_result = await client.job_api.await_successful_job_completion(session_id=model_run_register_result.session_id)

while job_result.status != JobStatus.SUCCEEDED: # Keep polling on this cell till this turns to "SUCCEEDED"
    
    job_result = await client.job_api.await_successful_job_completion(session_id=model_run_register_result.session_id)
    pprint(job_result.result)
    pprint(job_result.job_type)


print()
print("Current job status:", job_result.status) 

Status of registration success=True details='Job dispatched, monitor session ID using the job API to see progress.'
Job Session ID b389dfbe-a2b8-40aa-ab3c-e490fb4d5580
Starting wait_for_entry_in_queue polling stage.
Polling Job API. Wait time: 0sec out of 20sec.
Running wait_for_entry_in_queue callback. Session ID: b389dfbe-a2b8-40aa-ab3c-e490fb4d5580.
Callback registered incomplete. Waiting for polling interval.
Polling Job API. Wait time: 2sec out of 20sec.
Running wait_for_entry_in_queue callback. Session ID: b389dfbe-a2b8-40aa-ab3c-e490fb4d5580.
200OK response for user fetch of b389dfbe-a2b8-40aa-ab3c-e490fb4d5580.
Finished wait_for_entry_in_queue polling stage.
Starting wait_for_in_progress polling stage.
Polling Job API. Wait time: 0sec out of 120sec.
Running wait for in progress callback. Session ID: b389dfbe-a2b8-40aa-ab3c-e490fb4d5580.
200OK response for user fetch of b389dfbe-a2b8-40aa-ab3c-e490fb4d5580 in state PENDING.
Callback registered incomplete. Waiting for polling int

Generating Report Functionality - allows you to generate a report (Word document, .docx) for study close-out reports from a Model Run or Study.

There are two potential approaches you can take to generate the report:

 - 1- Default Path: If you don't provide a custom ```file_path``` parameter, it will store the generated word file in your relative directory (This is the directory where you are running the code from). 

 - 2- Custom Path: If you provide a custom ```file_path``` parameter, even if the file path/directory does not exist, it will be automatically made and your file will be saved inside that directory. 

   If you provide a file path and that path already exists, your file will be saved inside that existing directory as well.

In [None]:
from ProvenaInterfaces.ProvenanceAPI import GenerateReportRequest
from ProvenaInterfaces.RegistryModels import ItemSubType


# Generate's report document in your relative directory.
await client.prov_api.generate_report(report_request = GenerateReportRequest(
        id = "10378.1/1968661", 
        item_subtype=ItemSubType.STUDY,
        depth=1
    ))

# Generate's report document in a specified directory
await client.prov_api.generate_report(report_request = GenerateReportRequest(
        id = "10378.1/1968661", 
        item_subtype=ItemSubType.STUDY,
        depth=1
    ), file_path="./idontexistpath/butinhere/")

## Querying Registry API.

We will take a look at creating various entities with different subtypes (org, model) and then fetching those newly created entities through the client library. 

In [13]:
# Organisation 
org_domain_info = OrganisationDomainInfo(
    display_name="Test org",
    name="Test org",
    ror="http://example.org/test-org", #type:ignore
    user_metadata={
        "my custom": "annotation",
        "another custom": "annotation"
    }
)
created_organisation = await client.registry.organisation.create_item(create_item_request=org_domain_info)
print("Created Organisation", created_organisation)

# Model 
model_domain_info = ModelDomainInfo(
    display_name="Example model",
    name="Example model",
    description="This is a fake model",
    documentation_url="https://example_model.org", #type:ignore
    source_url="https://example_model.org", #type:ignore
    user_metadata={
        "my custom": "annotation",
        "another custom": "annotation"
    }
)
created_model = await client.registry.model.create_item(create_item_request=model_domain_info)
print("Created Model", created_model)


# Fetching items...
fetched_org = await client.registry.organisation.fetch(id = created_organisation.created_item.id)
print("Fetched Organisation", fetched_org)

fetched_model = await client.registry.model.fetch(id = created_model.created_item.id)
print("Fetched model", fetched_model)

Created Organisation status=Status(success=True, details='Successfully uploaded the complete item. Return item includes handle id.') created_item=ItemOrganisation(display_name='Test org', user_metadata={'my custom': 'annotation', 'another custom': 'annotation'}, name='Test org', ror=AnyHttpUrl('http://example.org/test-org', ), history=[HistoryEntry[OrganisationDomainInfo](id=0, timestamp=1723697688, reason='Initial record creation', username='ross', item=OrganisationDomainInfo(display_name='Test org', name='Test org', ror=AnyHttpUrl('http://example.org/test-org', ), user_metadata={'my custom': 'annotation', 'another custom': 'annotation'}))], id='10378.1/1948406', owner_username='ross', created_timestamp=1723697688, updated_timestamp=1723697688, item_category=<ItemCategory.AGENT: 'AGENT'>, item_subtype=<ItemSubType.ORGANISATION: 'ORGANISATION'>, record_type=<RecordType.COMPLETE_ITEM: 'COMPLETE_ITEM'>, workflow_links=None, versioning_info=None) register_create_activity_session_id=None
C

We will take a look at listing all items present in registry based on their subtypes (Organisation, Model) for this example. 

In [14]:
from ProvenaInterfaces.RegistryAPI import GeneralListRequest

general_list_request = GeneralListRequest(
    filter_by=None,
    sort_by=None,
    pagination_key=None
)

list_org = await client.registry.organisation.list_items(list_items_payload=general_list_request)
print(f"Found {list_org.total_item_count} organisations")


Found 20 organisations


In [15]:
list_models = await client.registry.model.list_items(list_items_payload=general_list_request)

print(f"Found {list_models.total_item_count} models")


Found 19 models


General Registry Actions (Fetching without subtype, Listing All Registry Items and Count of all items in registry (client library special))

In [16]:
# Fetching without subtype. 
fetch_result = await client.registry.general_fetch_item(id = "10378.1/1876000")
print(f"Fetched item named: '{fetch_result.item['display_name']}' and id: '{fetch_result.item['id']}'")


Fetched item named: 'CoralReefSim Input Dataset' and id: '10378.1/1876000'


In [17]:
# Listing all registry items. 
all_general_registry_items = await client.registry.list_general_registry_items(general_list_request=general_list_request)
print(f"Total items fetched: {all_general_registry_items.total_item_count}")


Total items fetched: 20


In [18]:
# Count of all items based on subtypes. 
count_of_all_items = await client.registry.list_registry_items_with_count()
print(count_of_all_items)

{'MODEL': 43, 'ORGANISATION': 65, 'MODEL_RUN': 149, 'CREATE': 367, 'DATASET': 251, 'MODEL_RUN_WORKFLOW_TEMPLATE': 11, 'STUDY': 15, 'DATASET_TEMPLATE': 25, 'VERSION': 23, 'PERSON': 47}
