In [1]:
#| default_exp feature.codeengine

In [2]:
#| exporti

from dataclasses import dataclass, field

from typing import List
import datetime as dt
from numbers import Number

import mbison.client.core as dmda
import mbison.client.utils as dmut

In [3]:
#| hide
import nbdev
import os
from pprint import pprint

In [4]:
auth = dmda.DomoAuth(
    domo_instance=os.environ["DOMO_INSTANCE"],
    access_token=os.environ["DOMO_ACCESS_TOKEN"],
)
auth

DomoAuth(domo_instance='domo-community', username=None)

## Routes

In [None]:
card_id = '577316875'

In [5]:
#| exports
class AppDb_API_Exception(dmda.API_Exception):
    def __init__(self, res, message=None):

        super().__init__(res=res, message=message)

In [None]:
def get_cards(card_id, auth :dmda.DomoAuth):
    url = '/api/content/v1/cards'AppDb_API_Exceptionpparts=properties,metadata,metadataOverrides,subscriptions,domoapp,problems,owners,cardFormulas&urns=577316875'

In [6]:
#| exports

def get_collections(
    auth: dmda.DomoAuth,
    debug_api: bool = False,
    app_id : str = None, # filters for a specific datastoreId
):
    """retrieve collections"""

    endpoint = f"/api/datastores/v1/collections/"

    res =  dmda.domo_api_request(
        auth=auth,
        request_type="GET",
        endpoint= endpoint,
        debug_api=debug_api,
        params = {'datastoreId': app_id}
    )

    if not res.is_success and res.status == 400:
        raise AppDb_API_Exception(res, message = f"invalid datastoreId / app_id - {app_id}?")

    if not res.is_success:
        raise AppDb_API_Exception(res)
    
    return res

In [26]:
res = get_collections(auth=auth, debug_api= False,
                      app_id ='c3314265-965a-46b2-9e21-5ad27fd1101b'
                      )
collections = res.response
collections

[{'id': '475cc3b8-4318-406a-8070-c023bf0b9152',
  'datastoreId': 'c3314265-965a-46b2-9e21-5ad27fd1101b',
  'defaultPermissions': 'read,read_content',
  'requiredAuthorities': {'CREATE': ['domoapps.edit'],
   'CREATE_CONTENT': ['domoapps.edit'],
   'DELETE': ['domoapps.edit'],
   'DELETE_CONTENT': ['domoapps.edit'],
   'UPDATE': ['domoapps.edit'],
   'UPDATE_CONTENT': ['domoapps.edit']},
  'owner': 1893952720,
  'name': 'ddx_app_client_code',
  'datasourceId': None,
  'schema': None,
  'filters': None,
  'syncEnabled': False,
  'syncRequired': False,
  'fullReplaceRequired': False,
  'lastSync': None,
  'createdOn': '2023-05-05T21:54:26.634Z',
  'updatedOn': '2023-05-05T21:54:26.634Z',
  'updatedBy': 1893952720}]

In [8]:
#| exports
def get_collection_by_id(
    auth: dmda.DomoAuth,
    collection_id : str,
    debug_api: bool = False,
):

    endpoint = f"/api/datastores/v1/collections/{collection_id}"

    res =  dmda.domo_api_request(
        auth=auth,
        request_type="GET",
        endpoint= endpoint,
        debug_api=debug_api,
    )

    if not res.is_success:
        raise AppDb_API_Exception(res)
    
    return res

In [22]:
res = get_collection_by_id(auth=auth, debug_api= False, collection_id = collections[0]['id'] )

collection = res.response
collection

{'id': '3c36902b-cce0-4792-8e34-bb72944c58f5',
 'datastoreId': '314d780d-a5a5-46c3-9bdd-2ea3c5ebc746',
 'defaultPermissions': 'read,read_content',
 'requiredAuthorities': {'CREATE': ['domoapps.edit'],
  'CREATE_CONTENT': ['domoapps.edit'],
  'DELETE': ['domoapps.edit'],
  'DELETE_CONTENT': ['domoapps.edit'],
  'UPDATE': ['domoapps.edit'],
  'UPDATE_CONTENT': ['domoapps.edit']},
 'owner': 1728973208,
 'name': 'ddx_app_client_code',
 'datasourceId': None,
 'schema': None,
 'filters': None,
 'syncEnabled': False,
 'syncRequired': False,
 'fullReplaceRequired': False,
 'lastSync': None,
 'createdOn': '2024-07-15T17:44:52.335Z',
 'updatedOn': '2024-07-15T17:44:52.335Z',
 'updatedBy': 1728973208}

In [10]:
#| exports

def query_collection_documents(
    auth: dmda.DomoAuth,
    collection_id: str,
    query : dict = None,
    debug_api : bool = False
):
    endpoint = f'/api/datastores/v2/collections/{collection_id}/documents/query'

    query = query or {}

    res =  dmda.domo_api_request(
        auth=auth,
        request_type="POST",
        endpoint= endpoint,
        debug_api=debug_api,
        body = query
    )

    if not res.is_success:
        raise AppDb_API_Exception(res)
    
    return res


In [25]:
res = query_collection_documents(auth=auth,
                                 debug_api= False,
                                 collection_id = collection['id']
                                 )

documents = res.response
documents

AppDb_API_Exception:  || 403 - Forbidden || domo-community

### Classes

In [12]:
# | exports


def to_json(value):
    """
    converts complex dictionaries with nested classes to dictionary.
    assumes nested classes have a `to_json` method
    """

    if hasattr(value, "to_json"):
        return value.to_json()

    if isinstance(value, dict):
        return {key: to_json(v) for key, v in value.items()}

    if isinstance(value, list):
        return [to_json(v) for v in value]

    if isinstance(value, Number):
        return value

    return str(value)


@dataclass
class AppDbDocument:

    auth: dmda.DomoAuth = field(repr=False)

    # document metadata
    _collection_id: str
    _identity_columns: List[str]
    _id: str = None
    _created_on_dt: dt.datetime = None
    _updated_on_dt: dt.datetime = None

    content: dict = None

    def to_json(self):
        # TO DO - this is not a complete recursive implementation,  may need revision for complex dictionaries
        self.update_config()

        s = {"id": self._id, "collectionId": self._collection_id}

        for key, value in self.__dict__.items():
            if key.startswith("_") or key in ["auth"]:
                continue

            s.update({key: to_json(value)})

        return s

    def __eq__(self, other):
        if self.__class__.__name__ != other.__class__.__name__:
            return False

        if self._identity_columns:
            return all(
                getattr(self, col) == getattr(other.col)
                for col in self._identity_columns
            )

        return self._id == other._id

    @classmethod
    def _from_json(
        cls,
        auth: dmda.DomoAuth,
        content,
        new_cls,
        identity_columns,
        collection_id=None,
        document_id=None,
        metadata=None,
        created_on_dt=None,
        updated_on_dt=None,
    ):

        if metadata:
            collection_id = metadata.pop("collectionId")

            created_on_dt = dlcv.convert_string_to_datetime(metadata.pop("createdOn"))

            updated_on_dt = dlcv.convert_string_to_datetime(metadata.pop("updatedOn"))
            document_id = metadata["id"]

        return new_cls(
            auth=auth,
            _id=document_id,
            _identity_columns=identity_columns,
            _collection_id=collection_id,
            _created_on_dt=created_on_dt,
            _updated_on_dt=updated_on_dt,
            content=content,
            **(content if cls.__name__ != "AppDbDocument" else {})
        )

    @classmethod
    def _from_api(
        cls,
        auth: dmda.DomoAuth,
        obj,
        identity_columns: List[str] = None,
    ):
        content = obj.pop("content")

        return cls._from_json(
            auth=auth,
            content=content,
            new_cls=cls,
            identity_columns=identity_columns,
            metadata=obj,
        )

    @classmethod
    def from_json(
        cls,
        auth: dmda.DomoAuth,
        collection_id: str,
        content: dict,
        identity_columns: List[str] = None,
    ):
        return cls._from_json(
            auth=auth,
            content=content,
            new_cls=cls,
            identity_columns=identity_columns,
            collection_id=collection_id,
        )

    def update_config(self):
        self.content = {
            key: value
            for key, value in self.__dict__.items()
            if key not in ["auth", "content"] and not key.startswith("_")
        }
        return self.content

    @classmethod
    async def get_by_id(
        cls,
        collection_id: str,
        document_id: str,
        auth: dmda.DomoAuth,
        identity_columns=None,
        debug_api: bool = False,
        session: httpx.AsyncClient = None,
        debug_num_stacks_to_drop=1,
        return_raw: bool = False,
    ):
        res = await appdb_routes.get_document_by_id(
            auth=auth,
            collection_id=collection_id,
            document_id=document_id,
            parent_class=cls.__name__,
            debug_api=debug_api,
            session=session,
            debug_num_stacks_to_drop=debug_num_stacks_to_drop,
        )

        if return_raw:
            return res

        return cls._from_api(
            auth=auth,
            obj=res.response,
            identity_columns=identity_columns or [],
        )


@dataclass
class AppDbCollection:
    auth: dmda.DomoAuth = field(repr=False)
    id: str
    name: str

    created_on_dt: dt.datetime
    updated_on_dt: dt.datetime

    schema: dict

    domo_documents: List[AppDbDocument] = None

    @classmethod
    def _from_json(cls, auth, obj):

        return cls(
            auth=auth,
            id=obj["id"],
            name=obj["name"],
            created_on_dt=dlcv.convert_string_to_datetime(obj["createdOn"]),
            updated_on_dt=dlcv.convert_string_to_datetime(obj["updatedOn"]),
            schema=obj["schema"],
        )

    @classmethod
    async def get_by_id(
        cls,
        auth: dmda.DomoAuth,
        collection_id,
        debug_api: bool = False,
        session: httpx.AsyncClient = None,
        debug_num_stacks_to_drop=2,
        return_raw: bool = False,
    ):

        res = await appdb_routes.get_collection_by_id(
            auth=auth,
            collection_id=collection_id,
            parent_class=cls.__name__,
            debug_api=debug_api,
            session=session,
            debug_num_stacks_to_drop=debug_num_stacks_to_drop,
        )

        if return_raw:
            return res

        return cls._from_json(auth=auth, obj=res.response)

    async def search_documents(
        self,
        debug_api: bool = False,
        session: httpx.AsyncClient = None,
        filters=None,
        debug_num_stacks_to_drop=2,
        return_raw: bool = False,
    ):

        res = await appdb_routes.get_documents_from_collection(
            auth=self.auth,
            collection_id=self.id,
            debug_api=debug_api,
            session=session,
            body=filters,
            debug_num_stacks_to_drop=debug_num_stacks_to_drop,
        )

        if return_raw:
            return res

        self.domo_documents = await ce.gather_with_concurrency(
            *[
                AppDbDocument.get_by_id(
                    collection_id=self.id, document_id=doc["id"], auth=auth
                )
                for doc in res.response
            ],
            n=5
        )

        return self.domo_documents

    def __eq__(self, other):
        if not isinstance(other, AppDbCollection):
            return False

        return self.id == other.id

NameError: name 'httpx' is not defined

In [None]:
#| hide
nbdev.nbdev_export('./appdb.ipynb')