In [1]:
# | default_exp classes.DomoDatacenter

In [2]:
# | exporti
import asyncio
import importlib
from dataclasses import dataclass, field
from enum import Enum
from typing import Union

import httpx

from fastcore.basics import patch_to

import domolibrary.client.DomoAuth as dmda
import domolibrary.classes.DomoDataset as dmds

import domolibrary.routes.account as account_routes
import domolibrary.routes.datacenter as datacenter_routes

In [3]:
# | export

@dataclass
class DomoDatacenter:
    """Class for querying entities in the Domo datacenter.

    Query methods are attached to this class in later cells via `patch_to`.
    """
    # DomoAuth object, defaults to None; the class methods visible in this
    # notebook take `auth` as an argument rather than reading this attribute
    auth: dmda.DomoAuth = None


In [4]:
# | hide

@patch_to(DomoDatacenter, cls_method=True)
async def search_datacenter(
    cls,
    auth: dmda.DomoAuth,
    maximum: int = None,  # maximum number of results to return
    body: dict = None,  # either pass a body or generate a body in the function using search_text, entity_type, and additional_filters parameters
    search_text=None,
    entity_type: Union[str, list] = "dataset",  # can accept one value or a list of values
    additional_filters_ls=None,
    return_raw: bool = False,
    session: httpx.AsyncClient = None,
    debug_api: bool = False,
) -> list:
    """Search the datacenter by delegating to `datacenter_routes.search_datacenter`.

    All search parameters are forwarded unchanged to the route function.

    Returns:
        The route's result object as-is when `return_raw` is True,
        otherwise only its `response` attribute.
    """
    # everything except `return_raw` is passed straight through to the route
    route_kwargs = dict(
        auth=auth,
        maximum=maximum,
        body=body,
        session=session,
        search_text=search_text,
        entity_type=entity_type,
        additional_filters_ls=additional_filters_ls,
        debug_api=debug_api,
    )

    search_res = await datacenter_routes.search_datacenter(**route_kwargs)

    return search_res if return_raw else search_res.response


#### sample implementation of search_datacenter

In [5]:
import domolibrary.client.DomoAuth as dmda
import os
import pandas as pd

token_auth = dmda.DomoTokenAuth(
    domo_instance='domo-community', domo_access_token=os.environ['DOMO_DOJO_ACCESS_TOKEN'])

additional_filters_ls = [
    datacenter_routes.generate_search_datacenter_filter("dataprovidername_facet", "Jupyter Data")]

res = await DomoDatacenter.search_datacenter(auth=token_auth,
                                       additional_filters_ls = additional_filters_ls,
                                    #    search_text = "*kb*",
                                       entity_type = datacenter_routes.Datacenter_Enum.DATASET.value)

pd.DataFrame(res[0:5])


Unnamed: 0,entityType,databaseId,searchId,createDate,lastModified,lastIndexed,highlightedFields,language,requestAccess,score,...,pdpEnabled,used,owners,ownersLocalized,cloudName,cloudId,systemTagFields,winnerText,ownedByType,customer
0,dataset,04c1574e-c8be-4721-9846-c6ffa491144b,"{'indexName': None, 'databaseId': '04c1574e-c8...",1668379680000,1680616622000,1680617077900,{},English,False,153.36414,...,False,False,"[{'id': '1893952720', 'type': 'USER', 'display...","{'localizedMessage': 'Jae Wilson1', 'count': 1}",Domo,domo,[system_tags],domo_kbs,USER,mmmm-0012-0200
1,dataset,4b2775a4-dfb4-4abb-ab87-b0a48de0d58d,"{'indexName': None, 'databaseId': '4b2775a4-df...",1617645238000,1674497883000,1674498594871,{},English,False,9.0,...,False,False,"[{'id': '2129297797', 'type': 'USER', 'display...","{'localizedMessage': 'John Stevens', 'count': 1}",Domo,domo,[system_tags],processedReviews,USER,mmmm-0012-0200
2,dataset,df61d41c-add8-43a1-b2ea-6b41af552e7d,"{'indexName': None, 'databaseId': 'df61d41c-ad...",1618812052000,1664892737000,1665618420974,{},English,False,9.0,...,False,False,"[{'id': '2129297797', 'type': 'USER', 'display...","{'localizedMessage': 'John Stevens', 'count': 1}",Domo,domo,[system_tags],Topics,USER,mmmm-0012-0200
3,dataset,bfb31724-232d-40fc-80c4-ae489b213d33,"{'indexName': None, 'databaseId': 'bfb31724-23...",1664730961000,1666715593000,1666716466630,{},English,False,9.0,...,False,False,"[{'id': '1893952720', 'type': 'USER', 'display...","{'localizedMessage': 'Jae Wilson', 'count': 1}",Domo,domo,[system_tags],jw_address_out,USER,mmmm-0012-0200
4,dataset,598d6954-9526-48d3-8658-9f6fc0aa05c0,"{'indexName': None, 'databaseId': '598d6954-95...",1667778377000,1667778678000,1667781273171,{},English,False,9.0,...,False,False,"[{'id': '1893952720', 'type': 'USER', 'display...","{'localizedMessage': 'Jae Wilson', 'count': 1}",Domo,domo,[system_tags],android_reviews_sent,USER,mmmm-0012-0200


In [6]:
@patch_to(DomoDatacenter, cls_method=True)
async def search_datasets(cls,
                          auth: dmda.DomoAuth,  # annotated (was `auth=dmda.DomoAuth`, which made the class itself the default value)

                          maximum: int = None,  # maximum number of results to return

                          search_text=None,
                          additional_filters_ls=None,  # extra filters forwarded to search_datacenter

                          return_raw: bool = False,
                          debug_api: bool = False,
                          debug_prn: bool = False,  # currently unused; kept so existing callers keep working
                          session: httpx.AsyncClient = None) -> list[dmds.DomoDataset]:
    """Search the datacenter for datasets and return them as DomoDataset objects.

    Calls `cls.search_datacenter` with the entity type fixed to
    `Datacenter_Enum.DATASET`, then fetches each hit by its `databaseId`
    through `DomoDataset.get_from_id`, concurrently.

    Returns:
        - the unprocessed result of `search_datacenter` when `return_raw` is True;
        - the empty (or None) search result unchanged when there are no hits;
        - otherwise the list of objects returned by `DomoDataset.get_from_id`.
    """

    json_list = await cls.search_datacenter(auth=auth,
                                            maximum=maximum,
                                            search_text=search_text,
                                            entity_type=datacenter_routes.Datacenter_Enum.DATASET.value,
                                            additional_filters_ls=additional_filters_ls,
                                            return_raw=return_raw,
                                            session=session,
                                            debug_api=debug_api)

    # truthiness instead of len(): a None response is handed back rather than
    # raising TypeError, and an empty list is still returned as-is
    if return_raw or not json_list:
        return json_list

    # one lookup per search hit, awaited together
    return await asyncio.gather(
        *[dmds.DomoDataset.get_from_id(dataset_id=json_obj.get('databaseId'),
                                       auth=auth,
                                       debug_api=debug_api,
                                       session=session
                                       ) for json_obj in json_list
          ])


#### sample implementation of search_datasets

In [7]:
import domolibrary.client.DomoAuth as dmda
import os
import pandas as pd

token_auth = dmda.DomoTokenAuth(
    domo_instance='domo-community', domo_access_token=os.environ['DOMO_DOJO_ACCESS_TOKEN'])

additional_filters_ls = [
    datacenter_routes.generate_search_datacenter_filter("dataprovidername_facet", "Jupyter Data")]

res = await DomoDatacenter.search_datasets(auth=token_auth,
                                           search_text="kb",
                                           additional_filters_ls=additional_filters_ls,
                                           )

res


SearchDatacenter_NoResultsFound: {'entities': ['DATASET'], 'filters': [{'field': 'name_sort', 'filterType': 'wildcard', 'query': 'kb'}, {'filterType': 'term', 'field': 'dataprovidername_facet', 'value': 'Jupyter Data', 'not': False}], 'combineResults': False, 'query': '*', 'count': 100, 'offset': 0} at domo-community

In [None]:
@patch_to(DomoDatacenter, cls_method=True)
async def get_accounts(cls: DomoDatacenter, auth: dmda.DomoAuth,
                       search_str: str = None,
                       is_exact_match: bool = True,
                       maximum: int = None,
                       session: httpx.AsyncClient = None, debug_api: bool = False):
    """Retrieve accounts as DomoAccount objects, optionally filtered by name.

    With `search_str`, account ids come from a datacenter search whose body is
    built by `generate_search_datacenter_account_body`; without it, they come
    from `account_routes.get_accounts`. Each id is then resolved through
    `DomoAccount.get_from_id`, concurrently.

    Returns:
        - None when the datacenter search yields nothing, or when the accounts
          route answers with a status other than 200;
        - with `search_str` and `is_exact_match`: the first account whose
          `name` equals `search_str`, or None when there is none;
        - otherwise the list of resolved accounts.
    """

    # function-scope import, following the `domolibrary.classes.*` layout used
    # by this file's other imports (was the legacy `Library.DomoClasses` path)
    # NOTE(review): confirm DomoAccount is published under this module path
    import domolibrary.classes.DomoAccount as dma

    if search_str:
        body = datacenter_routes.generate_search_datacenter_account_body(
            search_str=search_str, is_exact_match=is_exact_match)

        search_ls = await cls.search_datacenter(auth=auth,
                                                maximum=maximum,
                                                body=body,
                                                session=session,
                                                debug_api=debug_api)

        # covers both None and an empty result list
        if not search_ls:
            return None

        # search hits carry the account id under 'databaseId'
        account_id_ls = [obj.get('databaseId') for obj in search_ls]

    else:
        res = await account_routes.get_accounts(auth=auth, session=session, debug_api=debug_api)

        if debug_api:
            print(res)

        if res.status != 200:
            return None

        account_id_ls = [obj.get('id') for obj in res.response]

    domo_account_ls = await asyncio.gather(
        *[dma.DomoAccount.get_from_id(account_id=account_id, auth=auth)
          for account_id in account_id_ls])

    if is_exact_match and search_str:
        # the search may return near matches; keep only the exact name, if any
        return next((domo_account for domo_account in domo_account_ls
                     if domo_account.name == search_str), None)

    return domo_account_ls


In [None]:



#     @classmethod
#     async def get_lineage_upstream(cls,
#                                    auth: dmda.DomoAuth,
#                                    entity_id,
#                                    entity_type,
#                                    session: httpx.AsyncClient = None,
#                                    debug_api: bool = False,
#                                    debug_prn: bool = False):

#         import Library.DomoClasses.DomoDataflow as dmdf
#         import Library.DomoClasses.DomoDataset as dmds

#         try:
#             if not session:
#                 session = httpx.AsyncClient()
#                 is_close_session = True

#             res = await datacenter_routes.get_lineage_upstream(auth=auth,
#                                                                entity_type=entity_type,
#                                                                entity_id=entity_id,
#                                                                session=session, debug_api=debug_api)

#             if res.status == 200:
#                 obj = res.response

#                 domo_obj = []
#                 for key, item in obj.items():
#                     if item.get('type') == 'DATA_SOURCE':
#                         domo_obj.append(await dmds.DomoDataset.get_from_id(auth=auth, id=item.get('id')))

#                     if item.get('type') == 'DATAFLOW':
#                         # print(item.get('id'))
#                         domo_obj.append(await dmdf.DomoDataflow.get_from_id(auth=auth, id=item.get('id')))
#                         pass

#                 return domo_obj
#             else:
#                 return None

#         finally:
#             if is_close_session:
#                 await session.aclose()



In [None]:
#| hide
# run nbdev's export step for this project
import nbdev
nbdev.nbdev_export()