# ML Metadata
## Managing records in the production which can be later used to re-work , re-train , re-collect the dataset , and maintain the model according to the changes in the world. Storing MetaData in the MetaData Store in the machine learning pipeline . It will be created along with the creation of pipelines.

In [1]:
!pip install tfx



In [2]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import tensorflow as tf
import requests
import urllib
import zipfile
import tensorflow_data_validation as tfdv
from ml_metadata.metadata_store import metadata_store
from ml_metadata.proto import metadata_store_pb2

In [3]:
# Download the zip file from GCP and unzip it
url = 'https://storage.googleapis.com/artifacts.tfx-oss-public.appspot.com/datasets/chicago_data.zip'
zip, headers = urllib.request.urlretrieve(url)
zipfile.ZipFile(zip).extractall()
zipfile.ZipFile(zip).close()

print("Here's what we downloaded:")
!ls -R data

Here's what we downloaded:
data:
eval  serving  train

data/eval:
data.csv

data/serving:
data.csv

data/train:
data.csv


In [4]:
print(f"TF version : {tf.__version__}")
print(f"tensorflow validation version : {tfdv.version.__version__}")

TF version : 2.13.1
tensorflow validation version : 1.14.0


# Starting from scratch will be defining each component of the data model.The outline will be :
1. Defining the ML Metadata's store database.
2. Setting up the necessary artifacts types.
3. Setting up the execution types.
4. Generating an input artifact unit.
5. Generating an execution unit.
6. Registering an input event.
7. Running the TFDV component
8. Generating an output aritfact unit
9. Updating the execution unit.
10. updating the execution unit
11. setting up and generating a context unit.
12. Generating attributions and associations.

## Define ML Metastore's storage Database
- instantiate the storage in the backend.
- there are serveral types supported such as fake database,SQLite,MySQL and even cloud-based storage.
- Here we are using a fake database.

In [5]:
# Instantiate a connection config
connection_config = metadata_store_pb2.ConnectionConfig()

# Set an empty fake database proto
connection_config.fake_database.SetInParent()

# setup the metadata store
store = metadata_store.MetadataStore(connection_config)

## Register ArtifactTypes
- will have to create the artifact type needed and register them to the store.
- Generating a schema usin TFDV will only create two artifact types : - input dataset - output schema.

In [6]:
# Create Artifact for the input dataset

# Creates a new instance for the artifactype
data_artifact_type = metadata_store_pb2.ArtifactType()

# Sets the name for the data_artifact_type
data_artifact_type.name = "DataSet"

# The properties are the additional attributes associated with the artifactype.
# This below line refers that the dataset can have the property named "name" and the value of the property is expected to be "string"
data_artifact_type.properties['name'] = metadata_store_pb2.STRING
# This below line refers that the dataset can have the property named "split" and the value of the property is expected to be "String"
data_artifact_type.properties['split'] = metadata_store_pb2.STRING
# This below line refer that the dataset can have the property named "version" and the value of the property is expected to be "INT"
data_artifact_type.properties['version'] = metadata_store_pb2.INT

# Storing the artifact type,name and its property to the metastore.
# We are registering the artifact to the metadata store
data_artifact_type_id = store.put_artifact_type(data_artifact_type)

# Create ArtifactType for schema : used to define the artifact within the metadata store.
schema_artifact_type = metadata_store_pb2.ArtifactType()
schema_artifact_type.name = 'Schema'
schema_artifact_type.properties['name'] = metadata_store_pb2.STRING
schema_artifact_type.properties['version'] = metadata_store_pb2.INT

# Register the artifact type to the metadata store
schema_artifact_type_id = store.put_artifact_type(schema_artifact_type)

print(f"Data Artifact type : {data_artifact_type}")
print(f"schema_Artifact type : {schema_artifact_type}")
print(f"Data artifact type id : {data_artifact_type_id}")
print(f"Schema artifact type id : {schema_artifact_type_id}")

Data Artifact type : name: "DataSet"
properties {
  key: "name"
  value: STRING
}
properties {
  key: "split"
  value: STRING
}
properties {
  key: "version"
  value: INT
}

schema_Artifact type : name: "Schema"
properties {
  key: "name"
  value: STRING
}
properties {
  key: "version"
  value: INT
}

Data artifact type id : 10
Schema artifact type id : 11


## Register Execution Type
- create the execution types needed for the machine learning pipelines.
- declare one component with data validation property so that you can record if the process is running or already completed.

In [7]:
# Create ExecutionType for data validation component
dv_execution_type = metadata_store_pb2.ExecutionType()
dv_execution_type.name = 'Data Validation'
dv_execution_type.properties['state'] = metadata_store_pb2.STRING

# Register execution type to the metadata store
dv_execution_type_id = store.put_execution_type(dv_execution_type)

print(f"data validation : {dv_execution_type}")
print(f"data validation id : {dv_execution_type_id}")

data validation : name: "Data Validation"
properties {
  key: "state"
  value: STRING
}

data validation id : 12


## Generate input artifact unit
- with the artifact created,now create instance of those types.
- this artifacts is recorded in the metadata store.
- again it generated the id that can be used for reference.

In [8]:
# Declare the input artifact of type dataset
data_artifact = metadata_store_pb2.Artifact()
data_artifact.uri = './data/train/data.csv'
data_artifact.type_id = data_artifact_type_id
data_artifact.properties['name'].string_value = 'Chicago Taxi Dataset'
data_artifact.properties['split'].string_value = 'Train'
data_artifact.properties['version'].int_value = 1

# Submit the input artifact to the metadata store
data_artifact_id = store.put_artifacts([data_artifact])[0]

print(f"data artifact : {data_artifact}")
print(f"data artifact id : {data_artifact_id}")

data artifact : type_id: 10
uri: "./data/train/data.csv"
properties {
  key: "name"
  value {
    string_value: "Chicago Taxi Dataset"
  }
}
properties {
  key: "split"
  value {
    string_value: "Train"
  }
}
properties {
  key: "version"
  value {
    int_value: 1
  }
}

data artifact id : 1


# Generate execution unit
- will create the instance for the data validation execution type which have been registered earlier.
- set the state to RUNNING to signify that you are about to run the TFDV function.

In [27]:
# Register the execution of a data validation run
dv_execution = metadata_store_pb2.Execution()
dv_execution.type_id = dv_execution_type_id
dv_execution.properties['state'].string_value = "RUNNING"

# Submit the execution unit to the metadata store

dv_execution_id = store.put_executions([dv_execution])[0]

print(f"data validation Execution : {dv_execution}")
print(f"data validation Execution id : {dv_execution_id}")

data validation Execution : type_id: 12
properties {
  key: "state"
  value {
    string_value: "RUNNING"
  }
}

data validation Execution id : 3


# Register Input Event
- event define the relationships between the artifact and the executions.
- Generate the input event relationship for dataset artifact and data validation execution units.

In [10]:
# Declare the input event
input_event = metadata_store_pb2.Event()
input_event.artifact_id = data_artifact_id
input_event.execution_id = dv_execution_id
input_event.type = metadata_store_pb2.Event.DECLARED_INPUT

# Submit the event to the metadata store
store.put_events([input_event])

print(f"Input Event : {input_event}")

Input Event : artifact_id: 1
execution_id: 1
type: DECLARED_INPUT



## Run the TFDV Component
- Run the tfdv component to generate the schema of dataset.

In [11]:
train_data = './data/train/data.csv'
train_stats = tfdv.generate_statistics_from_csv(train_data)
schema = tfdv.infer_schema(statistics = train_stats)

schema_file = './schema.pbtxt'
tfdv.write_schema_text(schema,schema_file)

print(f"Dataset's Schema has been generated at : {schema_file}")



Instructions for updating:
Use eager execution and: 
`tf.data.TFRecordDataset(path)`


Dataset's Schema has been generated at : ./schema.pbtxt


## Generate output artifact unit
- That the TFDV component has finished running and schema has been generated, you can create the artifact for the generated schema

In [12]:
# Declare the output artifact of type schema_artifact

schema_artifact = metadata_store_pb2.Artifact()
schema_artifact.uri = schema_file
schema_artifact.type_id = schema_artifact_type_id
schema_artifact.properties['version'].int_value = 1
schema_artifact.properties['name'].string_value = "Chicago taxi schema"

# Submit to the metadata store
schema_artifact_id = store.put_artifacts([schema_artifact])[0]

# Print the artifact schema id and display it
print(f"schema artifact : {schema_artifact}")
print(f"schema artifact id : {schema_artifact_id}")

schema artifact : type_id: 11
uri: "./schema.pbtxt"
properties {
  key: "name"
  value {
    string_value: "Chicago taxi schema"
  }
}
properties {
  key: "version"
  value {
    int_value: 1
  }
}

schema artifact id : 2


## Register output event
- want to define an output event to record the output artifact of a particular execution event in the machine learning pipeline

In [13]:
# Declare the output event
output_event = metadata_store_pb2.Event()
output_event.artifact_id = schema_artifact_id
output_event.execution_id = dv_execution_id
output_event.type = metadata_store_pb2.Event.DECLARED_OUTPUT

# Submit to the metadata store
store.put_events([output_event])

print(f"Output event : {output_event}")

Output event : artifact_id: 2
execution_id: 1
type: DECLARED_OUTPUT



## Update the execution unit
- As the TFDV component has finished running successfully,you need to update the state of the executioni unit and record it again to the store.

In [14]:
# make the 'state' as 'COMPELETED'
dv_execution_id = dv_execution_id
dv_execution.properties['state'].string_value = 'COMPLETED'

# Update the execution unit in the metadata store
store.put_executions([dv_execution])

print(f"Data Validation execution : {dv_execution}")

Data Validation execution : type_id: 12
properties {
  key: "state"
  value {
    string_value: "COMPLETED"
  }
}



## Setting up context types and Generating a context unit
- can group the artifacts and execution units into a context form .
- first need to define the contextType.
- Then we have to register.

In [15]:
# create a context type
expt_context_type = metadata_store_pb2.ContextType()
expt_context_type.name = "Experiment"
expt_context_type.properties['note'] = metadata_store_pb2.STRING

# Register context type to the metadata store
expt_context_type_id = store.put_context_type(expt_context_type)

In [16]:
# Generate the context
expt_context = metadata_store_pb2.Context()
expt_context.type_id = expt_context_type_id

# Give the experiment a name
expt_context.name = "Demo"
expt_context.properties['note'].string_value = "Walkthrough of metadata"

# Submit the context to the metadata store
expt_context_id = store.put_contexts([expt_context])[0]

# Print the context and experimented context
print(f"experiment context type : {expt_context_type}")
print(f"experiment context type id : {expt_context_type_id} ")

print(f'experiment context : {expt_context}')
print(f"experiment context id : {expt_context_id}")

experiment context type : name: "Experiment"
properties {
  key: "note"
  value: STRING
}

experiment context type id : 13 
experiment context : type_id: 13
name: "Demo"
properties {
  key: "note"
  value {
    string_value: "Walkthrough of metadata"
  }
}

experiment context id : 1


## Generate attribution and association relationships
- To create the relationship between schema artifact unit and experiment context unit to form an attribution.
- similarly , create the relationship between data validation execution unit and experiment unit to form an Association.
- These are to be registered.

In [17]:
from pandas.core.internals.blocks import extend_blocks
# Generate the attribution
expt_attribution = metadata_store_pb2.Attribution()
expt_attribution.artifact_id = schema_artifact_id
expt_attribution.context_id = expt_context_id

# Generate the association
expt_association = metadata_store_pb2.Association()
expt_association.execution_id = dv_execution_id
expt_association.context_id = expt_context_id


# Submit attribution and association to the metadata store
store.put_attributions_and_associations([expt_attribution],[expt_association])

# Print the association and attribution
print(f"Experiment Attribution : {expt_attribution}")
print(f"Experiment Association : {expt_association}")

Experiment Attribution : artifact_id: 2
context_id: 1

Experiment Association : execution_id: 1
context_id: 1



## Retrive the information from the metadata store
- now we have recorded the needed information to the metadata store.
- can track which artifacts and events are related to each other even without seeing the code used to generate it.

In [18]:
# Get the artifact types
store.get_artifact_types()

[id: 10
 name: "DataSet"
 properties {
   key: "name"
   value: STRING
 }
 properties {
   key: "split"
   value: STRING
 }
 properties {
   key: "version"
   value: INT
 },
 id: 11
 name: "Schema"
 properties {
   key: "name"
   value: STRING
 }
 properties {
   key: "version"
   value: INT
 }]

In [19]:
# Get the 1st element in the list of schmea artifacts
# Will investigate which dataset was used to generate it

schema_to_inv = store.get_artifacts_by_type('Schema')[0]
# print output
print(schema_to_inv)

id: 2
type_id: 11
uri: "./schema.pbtxt"
properties {
  key: "name"
  value {
    string_value: "Chicago taxi schema"
  }
}
properties {
  key: "version"
  value {
    int_value: 1
  }
}
type: "Schema"
create_time_since_epoch: 1696623512974
last_update_time_since_epoch: 1696623512974



In [20]:
# Get events related to the schema id
schema_events = store.get_events_by_artifact_ids([schema_to_inv.id])

print(schema_events)

[artifact_id: 2
execution_id: 1
type: DECLARED_OUTPUT
milliseconds_since_epoch: 1696623512993
]


In [22]:
print(dir(schema_events[0]))

['ByteSize', 'Clear', 'ClearExtension', 'ClearField', 'CopyFrom', 'DECLARED_INPUT', 'DECLARED_OUTPUT', 'DESCRIPTOR', 'DiscardUnknownFields', 'Extensions', 'FindInitializationErrors', 'FromString', 'HasExtension', 'HasField', 'INPUT', 'INTERNAL_INPUT', 'INTERNAL_OUTPUT', 'IsInitialized', 'ListFields', 'MergeFrom', 'MergeFromString', 'OUTPUT', 'PENDING_OUTPUT', 'ParseFromString', 'Path', 'RegisterExtension', 'SerializePartialToString', 'SerializeToString', 'SetInParent', 'Type', 'UNKNOWN', 'UnknownFields', 'WhichOneof', '_CheckCalledFromGeneratedFile', '_SetListener', '__class__', '__deepcopy__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setstate__', '__sizeof__', '__slots__', '__str__', '__subclasshook__', '__unicode__', '_extensions_by_name', '_extension

In [29]:
# Get events related to output above
execution_events = store.get_events_by_execution_ids([schema_events[0].execution_id])
print(execution_events)

[artifact_id: 1
execution_id: 1
type: DECLARED_INPUT
milliseconds_since_epoch: 1696623508360
, artifact_id: 2
execution_id: 1
type: DECLARED_OUTPUT
milliseconds_since_epoch: 1696623512993
]


In [30]:
# Look up the artifact that is a declared input
artifact_input = execution_events[0]

store.get_artifacts_by_id([artifact_input.artifact_id])

[id: 1
 type_id: 10
 uri: "./data/train/data.csv"
 properties {
   key: "name"
   value {
     string_value: "Chicago Taxi Dataset"
   }
 }
 properties {
   key: "split"
   value {
     string_value: "Train"
   }
 }
 properties {
   key: "version"
   value {
     int_value: 1
   }
 }
 type: "DataSet"
 create_time_since_epoch: 1696623508299
 last_update_time_since_epoch: 1696623508299]