# Metastore Demo

## metastore sdk 安装

In [1]:
# metastore安装
# pip install --upgrade "dp-metadata-sdk" -i https://repo.mlops.dp.tech/repository/pypi-group/simple
import dp.metadata

## 生成假数据

In [5]:

! dd if=/dev/zero of=mocked.dataset bs=1M count=20
! ls

20+0 records in
20+0 records out
20971520 bytes (21 MB, 20 MiB) copied, 0.00784302 s, 2.7 GB/s
metastore.ipynb  mocked.dataset  tracking.ipynb


## 环境变量配置

metastore本质上是一个元数据管理平台，并没有数据存储的功能，因此需要与提供存储功能的应用一起使用，可以简单将其理解成metastore只负责存储数据的指针，而指针具体指向了什么位置，怎么调用需要另外的模块来一起协作实现。目前推荐的做法是利用dflow的对象存储功能，将数据存储为artifacts，再利用其uri（对应artifact.key）对数据进行使用。

### Bohrium 环境配置

In [7]:
# 数据上传下载不需要配置 bohrium project id
bohrium_username = 'yaosk@dp.tech'
bohrium_passwd = ''

### Dflow环境配置

In [3]:
from dflow.plugins.bohrium import TiefblueClient
from dflow.plugins import bohrium
from dflow import config, s3_config
import json

# 详情可以参考dflow尝鲜群群公告
# dflow相关配置
config["host"] = "https://workflows.deepmodeling.com"
config["k8s_api_server"] = "https://workflows.deepmodeling.com"
# dflow token请到dflow尝鲜群-群公告里查找，注意token要与host匹配
config["token"] = "..."

# bohrium账号密码
bohrium.config["username"] = bohrium_username
bohrium.config["password"] = bohrium_passwd

# 存储功能配置（s3统一存储）
s3_config["repo_key"] = "oss-bohrium"
s3_config["storage_client"] = TiefblueClient()

## Metastore 环境变量配置

主要的两个变量参数endpoint、token。官方提供的endpoint为(https://datahub-gms.dp.tech/), token可以在datahub的前端界面（https://datahub.dp.tech/ ）通过右上角的setting拿到。

In [25]:
from dp.metadata.context import MetadataContext

endpoint = 'https://datahub-gms.dp.tech'
token = ''

## 数据使用

uri: 指向真实的数据存储地址，形如s3://xxxx，只要解析s3协议就能在全网找到该数据集，数据集和uri一一对应  
urn：metastore 统一名称表示符，用于传递数据集，具有一定的可读性。一个urn指向唯一一份数据集，但多个urn可指向同一份数据集

### 数据上传下载

结合上述说明，数据的上传分为两步，第一步是向对象存储中传输真实的数据，第二步是将数据的uri、描述说明等元信息注册到datahub中。在这里我们有两种办法，第一个是直接调用原生的dflow上传接口，然后将artifact.key作为dataset的uri注册到datahub中，第二种方法是在context中声明一个storage client，只要在参数中指定bohrium邮箱和密码即可。

方法1：使用 dflow 内置的 upload_artifact 上传并注册到 metastore，下载时使用 dflow 的 download_artifact

In [17]:
# 第一种方式
from dflow import upload_artifact, download_artifact
from dp.metadata import Dataset
import os
import dflow

# -------------------------------上传数据-------------------------------
# 使用 dflow 上传数据获得 artifact
artifact = upload_artifact('mocked.dataset')

# 将 uri 注册到datahub中
with MetadataContext(endpoint=endpoint, token=token) as context:
    dataset_urn = Dataset.gen_urn(context, 'MLOps demo', 'mocked_dataset1', auto_suffix=False)
    dataset = Dataset(urn=dataset_urn, description='this is a dataset', uri=artifact.key)
    client = context.client
    urn = client.create_dataset(dataset)
    print("dataset urn:", urn)

# -------------------------------下载数据-------------------------------
_dataset = client.get_dataset(urn)
print('dataset1')
print(_dataset.description)
print(_dataset.uri)

data_artifact = dflow.S3Artifact.from_dict({'key': _dataset.uri})
dflow.download_artifact(
    data_artifact,
    path=str('./download_dflow'),
    skip_exists=True,
)


dataset urn: urn:li:dataset:(urn:li:dataPlatform:MLOps demo,default.mocked_dataset1,CORP)
dataset1
this is a dataset
11874/0/store/11874/upload/37aaa4b7-fc84-446b-93a3-d4bbf02f4571/tmpixens0qr.tgz


['./download_dflow/mocked.dataset']

方法2：只使用 metadata 上传下载数据

In [27]:
# 第二种方式
from dp.metadata.utils.storage import TiefblueStorageClient
import uuid

# -------------------------------上传数据-------------------------------
# metasotre负责上传并注册数据
with MetadataContext(endpoint=endpoint, token=token) as context:
    context.storage_client = TiefblueStorageClient(username=bohrium_username, password=bohrium_passwd)
    client = context.client
    _uuid = uuid.uuid4().hex
    uri = client.upload_artifact(_uuid, '', 'mocked.dataset')
    dataset_urn = Dataset.gen_urn(context, 'MLOps demo', 'mocked_dataset2', auto_suffix=False)
    dataset = Dataset(urn=dataset_urn, description='this is a dataset', uri=uri)
    client = context.client
    urn = client.create_dataset(dataset)

# -------------------------------下载数据-------------------------------
# MetadataContext传入的参数有两种写法，这一种和上面一种都可以
with MetadataContext(endpoint=endpoint, token=token, storage_client=TiefblueStorageClient(username=bohrium_username, password=bohrium_passwd)) as context:
    client = context.client
    client.download_dataset(urn, './download_metastore')

2023-02-13 16:15:51.447 INFO    dp.metadata.client.client: Uploading /mnt/vepfs/users/yaosk/code/mlops-hello-world/examples/mocked.dataset


### 注意
方法1上传的数据可以用方法2下载，但是不建议这么用。两者上传的方式有细微区别，某些情况下可能报错。

## Dflow 中使用 metastore

### 将数据作为 op 的 parameters 使用

In [33]:
from typing import List
from pathlib import Path

import dflow

# 在没有 metastore 的时候，数据从本地上传后生成dflow.S3Artifact，再传入op中使用。
artifact1 = upload_artifact('mocked.dataset')

# 当使用 metastore 时，将 urn 转化为 Artifact 传入dflow中
# 已经上传的数据的urn
dataset_urn = 'urn:li:dataset:(urn:li:dataPlatform:MLOps demo,default.mocked_dataset2,CORP)'
with MetadataContext(endpoint=endpoint, token=token, storage_client=TiefblueStorageClient(username=bohrium_username, password=bohrium_passwd)) as context:
    client = context.client
    # dataset 是数据集的根目录
    dataset = client.get_dataset(dataset_urn)
    artifact2 = dflow.S3Artifact(key=dataset.uri)
    # 列出根目录下所有文件
    subdataset_list = client.list_artifact(dataset.uri)
    # 该数据集中只有一个文件，选取subdataset_list[0]作为key生成S3Artifact
    artifact3 = dflow.S3Artifact(key=subdataset_list[0])

artifact1 和 artifact3 可作为 Artifact(Path) 直接输入dflow的OP中, artifact2 可作为 Artifact(List[Path]) 传入OP中

### 在 op 中上传下载数据

方法同【数据上传下载】一节中的数据上传下载方法。  
注意：**禁止使用 parameters 传递账号密码和token，存在泄密风险**

## Dflow中传入账号密码和token



```
# 需要加密的环境变量
envs = {
        "DATAHUB_GMS_TOKEN": "...",
        "DATAHUB_GMS_URL": "https://datahub-gms.dp.tech",
        "bohrium_username": "xxxx",
        "bohrium_passwd": "xxxx",
}

# 使用dflow加密
from dflow import Secret
envs = {k: Secret(str(v)) for k, v in envs.items()}

# 作为envs参数传入OP中
step = Step(
    'hello',
    template=PythonOPTemplate(
        xxxOP,
        envs=envs,
    ),
    parameters={
        ...
    },
    artifacts={
        ...
    },
    executor = ...
)


@OP.function
def Hello(filename: str) -> {'foo': Artifact(Path)}:
    open(filename, "w").write("foo")
    # OP中使用os.environ读取加密的环境变量
    # 注意：metastore 默认从环境变量"DATAHUB_GMS_URL"和"DATAHUB_GMS_TOKEN"读取endpoint和token，因此这里无需再传入
    with MetadataContext(
        storage_client=TiefblueStorageClient(username=os.environ["bohrium_username"], password=os.environ["bohrium_passwd"])) as context:
        ....
    return {'foo': Path(filename)}

```