In [None]:
def get_absolute_path(*relative_parts):
    """Resolve path segments against the directory of the running notebook.

    When a ``dbutils`` global exists (i.e. the code runs in Databricks), the
    segments are joined onto the directory of the current notebook path,
    normalized, and prefixed with ``/Workspace`` unless the result already
    starts with it. Otherwise the segments are simply joined and returned
    as-is (not normalized, and relative if the segments are relative).

    Args:
        *relative_parts: Path segments, e.g. ``"..", "datacontract.yaml"``.

    Returns:
        The resulting path as a string.
    """
    import os

    # Without dbutils there is no notebook context to anchor the path to.
    if 'dbutils' not in globals():
        return os.path.join(*relative_parts)

    # dbutils is available to the entry point when run in Databricks
    notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()  # type: ignore
    notebook_dir = os.path.dirname(notebook_context.notebookPath().get())
    resolved = os.path.normpath(os.path.join(notebook_dir, *relative_parts))
    if resolved.startswith("/Workspace"):
        return resolved
    return "/Workspace" + resolved


In [None]:
from pyspark.dbutils import DBUtils
import os
import requests
import json
import yaml

# data mesh manager api base url
dmm_api_base_url = "https://api.datamesh-manager.com/api"

# data mesh manager api key from databricks secrets
# initially you need to create a secret in Databricks for your Data Mesh Manager API key:
# $ databricks secrets create-scope datamesh_manager
# $ databricks secrets put-secret datamesh_manager api_key
dmm_api_key = DBUtils().secrets.get("datamesh_manager", "api_key")
# YAML specifications to upload; both are looked up one directory level above
# the base directory that get_absolute_path resolves against
data_contract_path = get_absolute_path("..", "datacontract.yaml")
data_product_path = get_absolute_path("..", "dataproduct.yaml")


def main():
    """Upload the data contract and the data product to Data Mesh Manager.

    Parses the YAML files at ``data_contract_path`` and ``data_product_path``
    and sends each one through a ``DataMeshManager`` client built from the
    module-level base URL and API key. The data contract is sent first, then
    the data product.

    Raises:
        OSError: If one of the YAML files cannot be opened.
        Exception: Propagated from the client when the API rejects a request.
    """
    data_mesh_manager = DataMeshManager(dmm_api_base_url, dmm_api_key)

    # send data contract to data mesh manager
    # The file is only needed while parsing: the `with` block closes it (no
    # explicit close() required) before the HTTP request is made. The encoding
    # is pinned so parsing does not depend on the platform default.
    with open(data_contract_path, 'r', encoding='utf-8') as file:
        data_contract = DataContract(file)
    data_mesh_manager.put_data_contract(data_contract)

    # send data product to data mesh manager
    with open(data_product_path, 'r', encoding='utf-8') as file:
        data_product = DataProduct(file)
    data_mesh_manager.put_data_product(data_product)


class DataProduct:
    """A data product specification loaded from a YAML stream."""

    def __init__(self, yaml_file):
        """Parse ``yaml_file`` with ``yaml.safe_load`` and keep the result.

        Args:
            yaml_file: YAML source accepted by ``yaml.safe_load`` (the caller
                in this file passes an open text file).
        """
        parsed = yaml.safe_load(yaml_file)
        self._data = parsed

    def id(self) -> str:
        """Return the value stored under the top-level ``"id"`` key."""
        document = self._data
        return document["id"]

    def json(self) -> str:
        """Return the parsed document serialized as a JSON string."""
        document = self._data
        return json.dumps(document)


class DataContract:
    """A data contract specification loaded from a YAML stream."""

    def __init__(self, yaml_file):
        """Parse ``yaml_file`` with ``yaml.safe_load`` and keep the result.

        Args:
            yaml_file: YAML source accepted by ``yaml.safe_load`` (the caller
                in this file passes an open text file).
        """
        parsed = yaml.safe_load(yaml_file)
        self._data = parsed

    def id(self) -> str:
        """Return the value stored under the top-level ``"id"`` key."""
        document = self._data
        return document["id"]

    def json(self) -> str:
        """Return the parsed document serialized as a JSON string."""
        document = self._data
        return json.dumps(document)


class DataMeshManager:
    """Small client that uploads entities to the Data Mesh Manager REST API.

    Every request is an HTTP PUT to ``<base_url>/<entity_segment>/<id>`` with
    a JSON body, authenticated through the ``x-api-key`` header.
    """

    def __init__(self, base_url, api_key, timeout=30):
        """Store the connection settings.

        Args:
            base_url: API root, without a trailing slash.
            api_key: Value sent in the ``x-api-key`` header.
            timeout: Seconds to wait for the server before giving up; passed
                straight to ``requests.put``. ``None`` disables the timeout.
        """
        self._base_url = base_url
        self._api_key = api_key
        self._timeout = timeout

    def put_data_product(self, data_product: DataProduct):
        """Create or replace ``data_product`` under the ``dataproducts`` path."""
        self._put("dataproducts", data_product.id(), data_product.json())

    def put_data_contract(self, data_contract: DataContract):
        """Create or replace ``data_contract`` under the ``datacontracts`` path."""
        self._put('datacontracts', data_contract.id(), data_contract.json())

    def _put(self, entity_segment, entity_id, data):
        """PUT ``data`` to ``<base_url>/<entity_segment>/<entity_id>``.

        Args:
            entity_segment: Path segment naming the entity collection.
            entity_id: Identifier appended to the URL.
            data: Request body, already serialized as JSON.

        Raises:
            Exception: When the response has an HTTP error status. The args
                are the ``HTTPError`` and the error detail from the response;
                the ``HTTPError`` is also attached as ``__cause__``.
        """
        response = requests.put(
            url=f"{self._base_url}/{entity_segment}/{entity_id}",
            data=data,
            headers={
                'x-api-key': self._api_key,
                'accept': 'application/json',
                'Content-Type': 'application/json'
            },
            # Without a timeout a stalled connection would block forever.
            timeout=self._timeout,
        )
        try:
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            raise Exception(e, self._error_detail(response)) from e

    @staticmethod
    def _error_detail(response):
        """Return the ``detail`` field of an error response, else its raw text.

        Falling back to the raw body keeps the original HTTP error visible
        when the body is not JSON or carries no ``detail`` key.
        """
        try:
            body = response.json()
        except ValueError:
            # Body is not valid JSON (e.g. an HTML error page from a proxy).
            return response.text
        if isinstance(body, dict) and "detail" in body:
            return body["detail"]
        return response.text


# Run the upload only when this file is executed as the main module,
# not when it is imported.
if __name__ == '__main__':
    main()