DerivaML is a class library built on the Deriva scientific asset management system. It is designed to simplify the basic operations involved in building and testing ML libraries based on common toolkits such as TensorFlow. This notebook reviews the basic features of the DerivaML library.

In [None]:
from bdbag.bdbag_api import materialize
%load_ext autoreload
%autoreload 2

In [1]:
import builtins
import itertools
from deriva.core.utils.globus_auth_utils import GlobusNativeLogin
from deriva_ml import DatasetBag, ExecutionConfiguration, Workflow, MLVocab, DerivaSystemColumns
from deriva_ml.demo_catalog import create_demo_catalog, DemoML
from IPython.display import display, Markdown, JSON
import pandas as pd

Set the details for the catalog we want and authenticate to the server if needed.

In [None]:
hostname = 'dev.eye-ai.org'
domain_schema = 'demo-schema'

gnl = GlobusNativeLogin(host=hostname)
if gnl.is_logged_in([hostname]):
    print("You are already logged in.")
else:
    gnl.login([hostname], no_local_server=True, no_browser=True, refresh_tokens=True, update_bdbag_keychain=True)
    print("Login Successful")


Create a test catalog and get an instance of the DerivaML class. Pass `create_features=True` and `create_datasets=True` so that the catalog is populated with some initial datasets and features. Then use the exploration API to find out what features and datasets we have.

In [None]:
test_catalog = create_demo_catalog(hostname, domain_schema, create_features=True, create_datasets=True)
ml_instance = DemoML(hostname, test_catalog.catalog_id)
print(f'Created catalog {ml_instance.catalog_id}')

In [None]:
display(
    Markdown('## Datasets'),
    pd.DataFrame(ml_instance.find_datasets()).drop(columns=DerivaSystemColumns),

    Markdown('## Features'),
    [f'{f.target_table.name}:{f.feature_name}' for f in ml_instance.find_features("Subject")],
    [f'{f.target_table.name}:{f.feature_name}' for f in ml_instance.find_features("Image")]
)

In [None]:
datasets = pd.DataFrame(ml_instance.find_datasets()).drop(columns=DerivaSystemColumns)
training_dataset_rid = [ds['RID'] for ds in ml_instance.find_datasets() if 'Training' in ds['Dataset_Type']][0]
testing_dataset_rid = [ds['RID'] for ds in ml_instance.find_datasets() if 'Testing' in ds['Dataset_Type']][0]


display(
    Markdown(f'Training Dataset: {training_dataset_rid}'),
    Markdown('## Datasets'),
    datasets)
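
The list-comprehension-plus-`[0]` lookup used above raises an `IndexError` when no dataset of the requested type exists. A minimal sketch of a more forgiving lookup follows. The records and the helper `first_rid_of_type` are hypothetical stand-ins, not part of DerivaML, and `Dataset_Type` is assumed to be a list of type names.

```python
# Stand-in records shaped like the dicts used above: each has a 'RID' and a
# 'Dataset_Type'. The RIDs are made up for illustration.
demo_records = [
    {"RID": "1-AAAA", "Dataset_Type": ["Training"]},
    {"RID": "1-AAAB", "Dataset_Type": ["Testing"]},
    {"RID": "1-AAAC", "Dataset_Type": ["Training", "Partitioned"]},
]


def first_rid_of_type(datasets, dataset_type):
    """Return the RID of the first dataset tagged with dataset_type, or None."""
    return next(
        (ds["RID"] for ds in datasets if dataset_type in ds["Dataset_Type"]),
        None,
    )


training_rid = first_rid_of_type(demo_records, "Training")
validation_rid = first_rid_of_type(demo_records, "Validation")
```

Returning `None` lets the caller decide how to handle a missing dataset type instead of failing inside the lookup.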

A feature is a set of values attached to a table in the DerivaML catalog. Instances of a feature are distinguished from one another by the ID of the execution that produced the feature value. The execution could be the result of a program, or it could be a manual process by which a person defines a set of values.

To create a new feature, we need to know the name of the feature, the table to which it is attached, and the set of values that make up the feature. The values could be terms from a controlled vocabulary, a set of one or more file-based assets, or other values such as integers or strings. However, the use of strings outside of controlled vocabularies is discouraged.
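
To make the description above concrete, here is a minimal sketch of how feature values can be told apart by the execution that produced them. The `FeatureValue` class, the RIDs, and the `Quality` feature are hypothetical illustrations; they are not the classes or data that DerivaML itself uses.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FeatureValue:
    """Hypothetical stand-in for one feature value; not a deriva_ml class."""
    target_rid: str      # RID of the row the feature is attached to
    execution_rid: str   # RID of the execution that produced the value
    feature_name: str
    value: str           # e.g. a controlled-vocabulary term


values = [
    FeatureValue("IMG-1", "EXE-1", "Quality", "Good"),
    FeatureValue("IMG-1", "EXE-2", "Quality", "Bad"),
    FeatureValue("IMG-2", "EXE-1", "Quality", "Good"),
]

# Group by (target, feature): the execution RID tells the instances apart.
by_target = {}
for v in values:
    by_target.setdefault((v.target_rid, v.feature_name), {})[v.execution_rid] = v.value
```

In this sketch the same image carries two `Quality` values, one per execution, which is the situation the execution ID is meant to disambiguate.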

In [None]:
ml_instance.add_term(MLVocab.workflow_type, "Manual Workflow", description="Initial setup of Model File")
ml_instance.add_term(MLVocab.execution_asset_type, "API_Model", description="Model for our API workflow")

api_workflow = Workflow(
    name="Manual Workflow",
    url='https://github.com/informatics-isi-edu/deriva-ml/blob/main/Notebooks/DerivaML%20Execution.ipynb',
    workflow_type="Manual Workflow",
    description="A manual operation"
)

manual_execution = ml_instance.create_execution(ExecutionConfiguration(description="Sample Execution", workflow=api_workflow))

# Now let's create a model configuration file for our program.
model_file = manual_execution.execution_asset_path('API_Model') / 'modelfile.txt'
with builtins.open(model_file, "w") as fp:
    fp.write("My model")

# Now upload the file and retrieve the RID of the new asset from the returned results.
uploaded_assets = manual_execution.upload_execution_outputs()
training_model_rid = uploaded_assets['API_Model/modelfile.txt'].result['RID']
display(
    Markdown(f'## Training Model: {training_model_rid}'),
    JSON(ml_instance.retrieve_rid(training_model_rid))
)
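
The cell above writes a file into a directory named after its asset type and later looks the upload result up under `'API_Model/modelfile.txt'`. The sketch below reproduces only the path handling, using a temporary directory as a stand-in for the execution's working directory. The one-subdirectory-per-asset-type layout is an assumption drawn from that lookup key, not a documented guarantee of DerivaML.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as staging_root:
    # Assumed layout: one subdirectory per asset type.
    asset_dir = Path(staging_root) / "API_Model"
    asset_dir.mkdir(parents=True, exist_ok=True)

    model_path = asset_dir / "modelfile.txt"
    model_path.write_text("My model")

    # Path relative to the staging root, in POSIX form.
    relative_key = model_path.relative_to(staging_root).as_posix()
    contents = model_path.read_text()
```

Here `relative_key` has the same `<asset type>/<file name>` shape as the key used to index `uploaded_assets` above.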

### Setup for an ML run


In [None]:
ml_instance.add_term(MLVocab.workflow_type, "ML Demo", description="An ML workflow that uses the DerivaML API")

api_workflow = Workflow(
    name="ML Demo",
    url="https://github.com/informatics-isi-edu/deriva-ml/blob/main/pyproject.toml",
    workflow_type="ML Demo",
    description="A workflow that uses Deriva ML"
)

config = ExecutionConfiguration(
    datasets=[training_dataset_rid, {'rid': testing_dataset_rid, 'materialize': False}],
    assets=[training_model_rid],
    description="Sample Execution",
    workflow=api_workflow
)

ml_execution = ml_instance.create_execution(config)

In [None]:
with ml_execution.execute() as deriva_exec:
    # Get the input datasets:
    training_dataset = DatasetBag(ml_execution.dataset_paths[0])  # Input dataset
    image_rids = training_dataset.get_table_as_dataframe('Image')['RID']

    # Get input files
    with open(ml_execution.asset_paths[0], 'rt') as model_file:
        training_model = model_file.read()
        print(f'Got model file: {training_model}')

    # Put your ML code here....
    pass

    # Write a new model
    model_file = ml_execution.execution_asset_path('API_Model') / 'modelfile.txt'
    with open(model_file, 'w') as f:
        f.write("Hello there a new model;\n")

    # Create some new feature values.
    bb_csv_path, bb_asset_paths = ml_execution.feature_paths('Image', 'BoundingBox')
    # Build the list of file paths once, then write each file.
    bounding_box_files = [bb_asset_paths['BoundingBox'] / f"box{i}.txt" for i in range(10)]
    for i, fn in enumerate(bounding_box_files):
        with builtins.open(fn, "w") as fp:
            fp.write(f"Hi there {i}")

    ImageBoundingboxFeature = ml_instance.feature_record_class("Image", "BoundingBox")
    # Pair every image with a bounding-box file, reusing files if there are more images than files.
    image_bounding_box_feature_list = [ImageBoundingboxFeature(Image=image_rid,
                                                               Execution=ml_execution.execution_rid,
                                                               BoundingBox=box_file)
                                       for image_rid, box_file in zip(image_rids, itertools.cycle(bounding_box_files))]

    ml_execution.write_feature_file(image_bounding_box_feature_list)

upload_status = ml_execution.upload_execution_outputs()
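
The feature list in the cell above pairs images with bounding-box files using `zip` and `itertools.cycle`. The following standalone sketch shows how that pairing behaves when there are more images than files. The identifiers and file names are made up for illustration.

```python
import itertools

image_ids = [f"image-{i}" for i in range(5)]   # made-up identifiers
box_files = ["box0.txt", "box1.txt"]           # fewer files than images

# zip stops at the shorter input; cycle makes the file list endless,
# so the number of pairs is set by image_ids.
pairs = list(zip(image_ids, itertools.cycle(box_files)))
```

Every image receives a file, and the files are reused in round-robin order once the list is exhausted.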

Now let's check the assets produced by this execution to make sure that they are what we expect.

In [None]:
# Get datapath to the ML schema.
schema_path = ml_instance.pathBuilder.schemas[ml_instance.ml_schema]

# Now get path to the execution table, and get our execution record.  We filter on the RID for the
# execution we are looking for.
executions = schema_path.Execution.filter(schema_path.Execution.RID == ml_execution.execution_rid)
execution_info = list(executions.entities().fetch())[0]

# To get the assets for the execution, we need to go through the linking table to the assets.
asset_path = executions.link(schema_path.Execution_Asset_Execution).link(schema_path.Execution_Asset)
execution_assets = pd.DataFrame(asset_path.entities().fetch()).drop(columns=DerivaSystemColumns + ['MD5'])

# Now let's display our results.
display(
    Markdown(f'### Execution: {ml_execution.execution_rid}'),
    JSON(execution_info),
    Markdown('### Execution Assets'),
    execution_assets,
)

In [None]:
test_catalog.delete_ermrest_catalog(really=True)