# Metadata Manager

## Config

In [4]:
# Locations and load mode used by the setup cell below.
# metadata_folder is not referenced by any cell visible in this notebook.
metadata_folder = '../metadata'
# Address handed to the `store` MetadataLocation.
metadata_repo_location = '../metadata/DataPlatform_Metadata/metadata'
# Address handed to the `stash` MetadataLocation.
metadata_stash_folder = '../metadata/stash'
# Passed to MetadataManager as `load_workspace`.
load_workspace_from = 'store' # 'stash' or 'store'

## Load and validate

In [6]:
# Setup and check
from rich import print, inspect

from metadata_lib.schemas import Namespace, Schema, System, Data_entity, Pipeline
from metadata_lib.manage import (
    MetadataLocation, MetadataStructure, MetadataManager
)
from metadata_lib.build import flatten_instance
import pendulum
from metadata_lib.validate import get_next_free_id
from metadata_lib.storage_adapters import LocalFilesystem

from deps.mm_setup import setup_manager

import pandas as pd
# Pandas display settings: a value of None lifts the limit, so column
# contents, the number of columns and the number of rows are never truncated.
for display_option in ('display.max_colwidth', 'display.max_columns', 'display.max_rows'):
    pd.set_option(display_option, None)

# Run the data structure check; the manager is only built when it passes.
valid = setup_manager()


def _local_location(address):
    """Return a MetadataLocation for a local-filesystem folder that is not treated as a git repo."""
    return MetadataLocation(
        storage_type='localstorage',
        Storage_Adapter=LocalFilesystem,
        address=address,
        entities_to_load=None,
        is_git_repo=False
    )


if not valid:
    # NOTE(review): `mm` is not assigned on this path, so later cells either
    # fail or use an `mm` left over from an earlier run of this cell.
    print()
    print()
    print('[bold bright_red]** Invalid datastructures. Metadata Manager can not be loaded **[/bold bright_red]')
else:
    print()
    print()
    print('[bold bright_green]** Datastructures are valid. Loading Metadata Manager **[/bold bright_green]')
    # Both locations share the same settings and differ only in their address.
    store = _local_location(metadata_repo_location)
    stash = _local_location(metadata_stash_folder)
    mm = MetadataManager(
        store=store,
        stash=stash,
        load_current=True,
        load_workspace=load_workspace_from
    )

***

## **Namespaces**

### View

In [3]:
# Show the workspace's namespaces view; as the cell's last expression it is rendered as the output table.
mm.workspace.view['namespaces']

Unnamed: 0,id,unid,name,description,created,modified
0,1,dedimo,dedimo,"Dedimo, the parent company of our clinics. Also used as an overarching generic namespace.",2022-09-10 06:56:07+00:00,2022-09-10 06:56:07+00:00
1,2,adhdcentraal,adhdcentraal,ADHDcentraal is a clinic for diagnosis and treatment of adult ADHD.,2022-09-10 06:56:07+00:00,2022-09-10 06:56:07+00:00
2,3,sobercare,sobercare,SoberCare is a clinic for treatment of addiction.,2022-09-10 06:56:07+00:00,2022-09-10 06:56:07+00:00
3,4,dataplatform,dataplatform,dataplatform is a namespace for platform internal services and resources,2022-09-24 07:20:44+00:00,2022-09-24 07:20:44+00:00


***

### Add New

#### - Create new namespace

In [22]:
# Draft a new namespace with the next free id in the workspace.
# NOTE(review): name and description are placeholder values ('dedimo' already
# appears in the namespaces view above) -- replace them before adding.
# unid is left as None here; presumably it is derived later -- confirm.

# Take the timestamp once so a brand-new entity has created == modified,
# instead of two separate now() calls that differ by a few microseconds.
created_at = pendulum.now(tz='Europe/Amsterdam')

new_namespace = Namespace(
    id=get_next_free_id(mm.workspace.by_id, 'namespaces'),
    unid=None,
    name='dedimo',
    description='WPEX',
    created=created_at,
    modified=created_at
)
print(new_namespace)

#### - Validate new namespace and add it to workspace

In [None]:
# Hand the namespace drafted in the previous cell to the manager (per the heading: validate, then add to the workspace).
# Saving to disk is a separate step in the next cell.
mm.add_new_entity(new_namespace)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Edit existing

#### - Get the namespace you want to edit

In [None]:
# Replace the placeholder with the unid of the namespace to edit
# (see the `unid` column in the View output).
namespace_UNID = 'UNID'

# Fetch the namespace through the manager and show it before changing anything.
edited_namespace = mm.get_entity_by_unid(entity_type='namespaces', unid=namespace_UNID)
print(edited_namespace)

#### - Edit the namespace

In [None]:
# Overwrite the placeholder values below with the new name and description
# for the namespace fetched in the previous cell.
# NOTE(review): `modified` is not set here -- confirm mm.update_entity refreshes it.
edited_namespace.name = 'NEW NAME'
edited_namespace.description = 'NEW DESCRIPTION'

# Show the edited object for a visual check before it is validated.
print(edited_namespace)

#### - Validate the namespace and add it to workspace structure

In [None]:
# Hand the edited namespace back to the manager (per the heading: validate, then update the workspace).
mm.update_entity(edited_namespace)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Delete

#### - Specify UNID and inspect namespace

In [None]:
# Replace the placeholder with the unid of the namespace to delete.
namespace_UNID_to_delete = 'UNID'

# Fetch and print the namespace first, so the right entity is confirmed before removal.
namespace_to_delete = mm.get_entity_by_unid(entity_type='namespaces', unid=namespace_UNID_to_delete)
print(namespace_to_delete)

#### - Validate removal of the namespace and remove it from workspace

In [None]:
# Ask the manager to delete the namespace chosen in the previous cell
# and print the report it returns.
report = mm.delete_entity(entity_type='namespaces', unid=namespace_UNID_to_delete)
print(report)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

## **Schemas**

### View

In [4]:
# Show the workspace's schemas view; as the cell's last expression it is rendered as the output table.
mm.workspace.view['schemas']

Unnamed: 0,id,unid,namespace,name,description,type,version,entity_schema,created,modified
0,1,medicore_risks_conditions_datasource.1,adhdcentraal,medicore_risks_conditions_datasource,The risks and conditions table as it is received from Medicore,bigquery,1,"[{'name': 'risico_id', 'mode': 'REQUIRED', 'type': 'INTEGER'}, {'name': 'client_code', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'client_naam', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'omschrijving', 'mode': 'REQUIRED', 'type': 'STRING'}, {'name': 'classificatie', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'startdatum', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'einddatum', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'vastgesteld_door', 'mode': 'NULLABLE', 'type': 'STRING'}]",2022-09-10 06:56:07+00:00,2022-10-21 11:16:49+00:00
1,2,medicore_risks_conditions_core.1,adhdcentraal,medicore_risks_conditions_core,The risks and conditions table in the dataplatform,bigquery,1,"[{'name': 'risico_id', 'mode': 'REQUIRED', 'type': 'INTEGER'}, {'name': 'client_code', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'client_naam', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'omschrijving', 'mode': 'REQUIRED', 'type': 'STRING'}, {'name': 'classificatie', 'mode': 'NULLABLE', 'type': 'STRING'}, {'name': 'startdatum', 'mode': 'NULLABLE', 'type': 'DATE'}, {'name': 'einddatum', 'mode': 'NULLABLE', 'type': 'DATE'}, {'name': 'vastgesteld_door', 'mode': 'NULLABLE', 'type': 'STRING'}]",2022-09-10 06:56:07+00:00,2022-10-21 11:16:49+00:00


***

### Add New

#### - Create new schema

In [None]:
# Draft a new schema with the next free id in the workspace.
# The field values are example/placeholder data -- replace them before adding.
# unid is left as None here; presumably it is derived later -- confirm.

# Take the timestamp once so a brand-new entity has created == modified,
# instead of two separate now() calls that differ by a few microseconds.
created_at = pendulum.now(tz='Europe/Amsterdam')

new_schema = Schema(
    id=get_next_free_id(mm.workspace.by_id, 'schemas'),
    unid=None,
    namespace='dedimo',
    name='test',
    description=None,
    type='bigquery',
    version=1,
    entity_schema=[{'mode': 'REQUIRED', 'name': 'nummer', 'type': 'STRING'}],
    created=created_at,
    modified=created_at
)
print(new_schema)

#### - Validate new schema and add it to workspace

In [None]:
# Hand the schema drafted in the previous cell to the manager (per the heading: validate, then add to the workspace).
# Saving to disk is a separate step in the next cell.
mm.add_new_entity(new_schema)

#### - Save metadata from workspace to disk

In [10]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Edit existing

#### - Get the schema to edit

In [None]:
# Replace the placeholder with the unid of the schema to edit
# (see the `unid` column in the View output).
schema_UNID = 'UNID'

# Fetch the schema through the manager and show it before changing anything.
edited_schema = mm.get_entity_by_unid(entity_type='schemas', unid=schema_UNID)
print(edited_schema)

#### - Edit the schema

In [None]:
# Overwrite the placeholder values below for the schema fetched in the previous cell.
# Trailing comments list alternative values for a field.
# NOTE(review): `modified` is not set here -- confirm mm.update_entity refreshes it.
edited_schema.namespace = 'NEW NAMESPACE UNID'
edited_schema.name = 'NEW NAME'
edited_schema.description = 'NEW DESCRIPTION' # None
edited_schema.type = 'bigquery' # 'avro'
# Bumps the version by one; re-running this cell on the same object bumps it again.
edited_schema.version = edited_schema.version + 1
edited_schema.entity_schema = [
    {'mode': 'REQUIRED', 'name': 'nummer', 'type': 'STRING'}
]

# Show the edited object for a visual check before it is validated.
print(edited_schema)

#### - Validate the schema and add it to workspace structure

In [None]:
# Hand the edited schema back to the manager (per the heading: validate, then update the workspace).
mm.update_entity(edited_schema)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Delete

#### - Specify UNID and inspect schema

In [None]:
# Replace the placeholder with the unid of the schema to delete.
schema_UNID_to_delete = 'UNID'

# Fetch and print the schema first, so the right entity is confirmed before removal.
schema_to_delete = mm.get_entity_by_unid(entity_type='schemas', unid=schema_UNID_to_delete)
print(schema_to_delete)

#### - Validate removal of the schema and remove it from workspace

In [None]:
# Ask the manager to delete the schema chosen in the previous cell
# and print the report it returns.
report = mm.delete_entity(entity_type='schemas', unid=schema_UNID_to_delete)
print(report)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

## **Systems**

### View

In [5]:
# Show the workspace's systems view; as the cell's last expression it is rendered as the output table.
mm.workspace.view['systems']

Unnamed: 0,id,unid,namespace,name,description,type,config,created,modified
0,1,adhdcentraal.medicore,adhdcentraal,medicore,The medical records system for ADHDcentraal,external,,2022-09-10 06:56:07+00:00,2022-10-09 21:36:01+00:00
1,2,dataplatform.google_cloud_storage,dataplatform,google_cloud_storage,Blob storage on Google Cloud Platform,platform,,2022-09-10 06:56:07+00:00,2022-10-25 16:41:05+00:00
2,3,dataplatform.google_bigquery,dataplatform,google_bigquery,Datawarehouse on Google Cloud Platform,platform,,2022-09-10 06:56:07+00:00,2022-10-25 16:41:05+00:00


***

### Add New

#### - Create new system

In [None]:
# Draft a new system with the next free id in the workspace.
# The field values are example data -- replace them before adding.
# unid is left as None here; presumably it is derived later -- confirm.

# Take the timestamp once so a brand-new entity has created == modified,
# instead of two separate now() calls that differ by a few microseconds.
created_at = pendulum.now(tz='Europe/Amsterdam')

new_system = System(
    id=get_next_free_id(mm.workspace.by_id, 'systems'),
    unid=None,
    namespace='dedimo',
    name='freshservice',
    description='The ticket system for IT',
    type='external',
    config={'url': 'www.freshservice.com'},
    created=created_at,
    modified=created_at
)
print(new_system)

#### - Validate new system and add it to workspace

In [None]:
# Hand the system drafted in the previous cell to the manager (per the heading: validate, then add to the workspace).
# Saving to disk is a separate step in the next cell.
mm.add_new_entity(new_system)

#### - Save metadata from workspace to disk

In [6]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Edit existing

#### - Get the system to edit

In [None]:
# Replace the placeholder with the unid of the system to edit
# (see the `unid` column in the View output).
system_UNID = 'UNID'

# Fetch the system through the manager and show it before changing anything.
edited_system = mm.get_entity_by_unid(entity_type='systems', unid=system_UNID)
print(edited_system)

#### - Edit the system

In [None]:
# Overwrite the placeholder values below for the system fetched in the previous cell.
# Trailing comments list alternative values for a field.
# NOTE(review): `modified` is not set here -- confirm mm.update_entity refreshes it.
edited_system.namespace = 'NEW NAMESPACE UNID'
edited_system.name = 'NEW NAME'
edited_system.description = 'NEW DESCRIPTION' # None
edited_system.type = 'external' # 'internal', 'platform'
edited_system.config = {'key': 'value'}

# Show the edited object for a visual check before it is validated.
print(edited_system)

#### - Validate the system and add it to workspace structure

In [None]:
# Hand the edited system back to the manager (per the heading: validate, then update the workspace).
mm.update_entity(edited_system)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Delete

#### - Specify UNID and inspect system

In [None]:
# Replace the placeholder with the unid of the system to delete.
system_UNID_to_delete = 'UNID'

# Fetch and print the system first, so the right entity is confirmed before removal.
system_to_delete = mm.get_entity_by_unid(entity_type='systems', unid=system_UNID_to_delete)
print(system_to_delete)

#### - Validate removal of the system and remove it from workspace

In [None]:
# Ask the manager to delete the system chosen in the previous cell
# and print the report it returns.
report = mm.delete_entity(entity_type='systems', unid=system_UNID_to_delete)
print(report)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

## **Data Entities**

### View

In [4]:
# Show the workspace's data entities view; as the cell's last expression it is rendered as the output table.
mm.workspace.view['data_entities']

Unnamed: 0,id,unid,namespace,system,name,description,type,interface,entity_schema,checks,config,created,modified
0,1,adhdcentraal.adhdcentraal.medicore.risks_conditions.datasource,adhdcentraal,adhdcentraal.medicore,risks_conditions,Risks and conditions table in medicore,datasource,google_cloud_storage,medicore_risks_conditions_datasource.1,[],,2022-09-10 06:56:07+00:00,2022-10-14 14:42:00+00:00
1,2,adhdcentraal.dataplatform.google_bigquery.risks_conditions.dataset,adhdcentraal,dataplatform.google_bigquery,risks_conditions,Production dwh table for medicore riscs and conditions,dataset,sql,medicore_risks_conditions_core.1,[],"{'table': 'risks_conditions', 'dataset': 'adhdcentraal_medicore', 'projectid': 'dedimo-dataplatform'}",2022-09-10 06:56:07+00:00,2022-09-10 06:56:07+00:00


***

### Add New

#### - Create new data entity

In [None]:
# Draft a new data entity with the next free id in the workspace.
# The field values are example data -- replace them before adding.
# NOTE(review): `system` and `entity_schema` point at the example system and
# schema from the Add New cells in the sections above; presumably those must
# already be in the workspace for validation to pass -- confirm.
# unid is left as None here; presumably it is derived later -- confirm.

# Take the timestamp once so a brand-new entity has created == modified,
# instead of two separate now() calls that differ by a few microseconds.
created_at = pendulum.now(tz='Europe/Amsterdam')

new_data_entity = Data_entity(
    id=get_next_free_id(mm.workspace.by_id, 'data_entities'),
    unid=None,
    namespace='dedimo',
    system='dedimo.freshservice',
    name='tickets',
    description='Those little things that cause work.',
    type='datasource',
    interface='api_rest',
    entity_schema='test.1',
    checks=[],
    config={'table': 'tickets'},
    created=created_at,
    modified=created_at
)
print(new_data_entity)

#### - Validate new data entity and add it to workspace

In [None]:
# Hand the data entity drafted in the previous cell to the manager (per the heading: validate, then add to the workspace).
# Saving to disk is a separate step in the next cell.
mm.add_new_entity(new_data_entity)

#### - Save metadata from workspace to disk

In [6]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Edit existing

#### - Get the data_entity to edit

In [None]:
# Replace the placeholder with the unid of the data entity to edit
# (see the `unid` column in the View output).
data_entity_UNID = 'UNID'

# Fetch the data entity through the manager and show it before changing anything.
edited_data_entity = mm.get_entity_by_unid(entity_type='data_entities', unid=data_entity_UNID)
print(edited_data_entity)

#### - Edit the data_entity

In [None]:
# Overwrite the placeholder values below for the data entity fetched in the previous cell.
# Trailing comments list alternative values for a field.
# NOTE(review): `modified` is not set here -- confirm mm.update_entity refreshes it.
edited_data_entity.namespace = 'NEW NAMESPACE UNID'
edited_data_entity.system = 'NEW SYSTEM UNID'
edited_data_entity.name = 'NEW NAME'
edited_data_entity.description = 'NEW DESCRIPTION' # None
edited_data_entity.type = 'datasource'
edited_data_entity.interface = 'api_rest'
edited_data_entity.entity_schema = 'NEW SCHEMA UNID'
edited_data_entity.checks=[]
edited_data_entity.config={'key': 'value'} # None

# Show the edited object for a visual check before it is validated.
print(edited_data_entity)

#### - Validate the data_entity and add it to workspace structure

In [None]:
# Hand the edited data entity back to the manager (per the heading: validate, then update the workspace).
mm.update_entity(edited_data_entity)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Delete

#### - Specify UNID and inspect data_entity

In [None]:
# Replace the placeholder with the unid of the data entity to delete.
data_entity_UNID_to_delete = 'UNID'

# Fetch and print the data entity first, so the right entity is confirmed before removal.
data_entity_to_delete = mm.get_entity_by_unid(entity_type='data_entities', unid=data_entity_UNID_to_delete)
print(data_entity_to_delete)

#### - Validate removal of the data_entity and remove it from workspace

In [None]:
# Ask the manager to delete the data entity chosen in the previous cell
# and print the report it returns.
report = mm.delete_entity(entity_type='data_entities', unid=data_entity_UNID_to_delete)
print(report)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

## **Pipelines**

### View

In [10]:
# Show the workspace's pipelines view; as the cell's last expression it is rendered as the output table.
mm.workspace.view['pipelines']

Unnamed: 0,id,unid,namespace,name,description,enabled,version,scope,type,velocity,input_output,config,created,modified
0,1,adhdcentraal.medicore_reports_pad.ingest.1,adhdcentraal,medicore_reports_pad,Ingest pipeline for medicore via its reports interface and PAD.,True,1,compound,ingest,batch,"[{'input': ['adhdcentraal.adhdcentraal.medicore.risks_conditions.datasource'], 'output': ['adhdcentraal.dataplatform.google_bigquery.risks_conditions.dataset']}]",,2022-09-10 06:56:07+00:00,2022-10-07 16:07:38+00:00


***

### Add New

#### - Create new pipeline

In [None]:
# Draft a new pipeline with the next free id in the workspace.
# The field values are example data -- replace them before adding.
# NOTE(review): the input/output unids below are the ones already used by the
# existing pipeline shown in the View output; pick the data entities this
# pipeline should actually connect.
# unid is left as None here; presumably it is derived later -- confirm.

# Take the timestamp once so a brand-new entity has created == modified,
# instead of two separate now() calls that differ by a few microseconds.
created_at = pendulum.now(tz='Europe/Amsterdam')

new_pipeline = Pipeline(
    id=get_next_free_id(mm.workspace.by_id, 'pipelines'),
    unid=None,
    namespace='dedimo',
    name='testest',
    description='Ingest pipeline for freshservice',
    enabled=True,
    version=1,
    scope='single',
    type='ingest',
    velocity='batch',
    input_output=[
        {
            'input': ['adhdcentraal.adhdcentraal.medicore.risks_conditions.datasource'],
            'output': ['adhdcentraal.dataplatform.google_bigquery.risks_conditions.dataset']
        }
    ],
    config={'apiversion': 'beta'},
    created=created_at,
    modified=created_at
)
print(new_pipeline)

#### - Validate new pipeline and add it to workspace

In [None]:
# Hand the pipeline drafted in the previous cell to the manager (per the heading: validate, then add to the workspace).
# Saving to disk is a separate step in the next cell.
mm.add_new_entity(new_pipeline)

#### - Save metadata from workspace to disk

In [12]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
# The recorded output of this cell shows the call returning True.
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

True

***

### Edit existing

#### - Get the pipeline to edit

In [None]:
# Replace the placeholder with the unid of the pipeline to edit
# (see the `unid` column in the View output).
pipeline_UNID = 'UNID'

# Fetch the pipeline through the manager and show it before changing anything.
edited_pipeline = mm.get_entity_by_unid(entity_type='pipelines', unid=pipeline_UNID)
print(edited_pipeline)

#### - Edit the pipeline

In [None]:
# Overwrite the placeholder values below for the pipeline fetched in the previous cell.
# Trailing comments list alternative values for a field.
# NOTE(review): `modified` is not set here -- confirm mm.update_entity refreshes it.
edited_pipeline.namespace = 'NEW NAMESPACE UNID'
edited_pipeline.name = 'NEW NAME'
edited_pipeline.description = 'NEW DESCRIPTION' # None
edited_pipeline.enabled = True # False
# Bumps the version by one; re-running this cell on the same object bumps it again.
edited_pipeline.version = edited_pipeline.version + 1
edited_pipeline.scope = 'compound' # 'single'
edited_pipeline.type = 'ingest' # 'transform', 'delivery'
edited_pipeline.velocity = 'batch' # 'streaming'
# NOTE(review): input/output are plain strings here, while the stored pipeline
# and the create cell use lists of unids -- confirm both forms are accepted.
edited_pipeline.input_output = [
    {'input':'ENTITY_UNID_INPUT', 'output':'ENTITY_UNID_OUTPUT'}
] # {'input':['ENTITY_UNID_INPUT_1', 'ENTITY_UNID_INPUT_2'], 'output':['ENTITY_UNID_OUTPUT_1', 'ENTITY_UNID_OUTPUT_2']}
edited_pipeline.config={'key': 'value'} # None

# Show the edited object for a visual check before it is validated.
print(edited_pipeline)

#### - Validate the pipeline and add it to workspace structure

In [None]:
# Hand the edited pipeline back to the manager (per the heading: validate, then update the workspace).
mm.update_entity(edited_pipeline)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

### Delete

#### - Specify UNID and inspect the pipeline

In [None]:
# Replace the placeholder with the unid of the pipeline to delete.
pipeline_UNID_to_delete = 'UNID'

# Fetch and print the pipeline first, so the right entity is confirmed before removal.
pipeline_to_delete = mm.get_entity_by_unid(entity_type='pipelines', unid=pipeline_UNID_to_delete)
print(pipeline_to_delete)

#### - Validate removal of the pipeline and remove it from workspace

In [None]:
# Ask the manager to delete the pipeline chosen in the previous cell
# and print the report it returns.
report = mm.delete_entity(entity_type='pipelines', unid=pipeline_UNID_to_delete)
print(report)

#### - Save metadata from workspace to disk

In [None]:
# Save the workspace through the stash (presumably the stash location configured in the setup cell -- confirm).
mm.stash_workspace()
# Alternative, disabled by default: save to the store instead.
# mm.store_workspace()

***

## **Config for DAGs and scripts**

#### - View pipelines

In [11]:
# Show the pipelines view again so the unid needed in the next cell is at hand.
mm.workspace.view['pipelines']

Unnamed: 0,id,unid,namespace,name,description,enabled,version,scope,type,velocity,input_output,config,created,modified
0,1,adhdcentraal.medicore_reports_pad.ingest.1,adhdcentraal,medicore_reports_pad,Ingest pipeline for medicore via its reports interface and PAD.,True,1,compound,ingest,batch,"[{'input': ['adhdcentraal.adhdcentraal.medicore.risks_conditions.datasource'], 'output': ['adhdcentraal.dataplatform.google_bigquery.risks_conditions.dataset']}]",,2022-09-10 06:56:07+00:00,2022-10-07 16:07:38+00:00


#### - Inspect an instance's complete, flat config

In [None]:
# Pipeline to inspect, by unid, and the instance number to flatten.
# NOTE(review): instance_nr presumably selects one of the pipeline's
# input_output entries -- confirm against flatten_instance.
pl_unid = 'adhdcentraal.medicore_reports_pad.ingest.1'
instance_nr = 0

# Index the integrated pipelines by unid so the requested one can be looked up directly.
deep_by_unid = dict((pl.unid, pl) for pl in mm.workspace.integrated)

# Build the flat config for that instance and print it.
c = flatten_instance(pipeline=deep_by_unid[pl_unid], instance_nr=instance_nr, objectify_output=False)
print(c)