# DomoJupyter (GetInstanceCredentials)

> functions and classes for interacting with DomoJupyter Accounts and Credentials

In [None]:
# | default_exp integrations.DomoJupyter

In [None]:
#| exporti
from dataclasses import dataclass, field

import re
import pandas as pd
import importlib
import datetime as dt
import time

import domolibrary.client.DomoAuth as dmda
import domolibrary.classes.DomoDataset as dmds

import domolibrary.client.Logger as lc

from fastcore.basics import patch_to


## get_jupyter_account
Uses the `domojupyter` library to retrieve shared account objects from the Domo Jupyter workspace.

Because the retrieval process can fail intermittently, each lookup is retried in a loop until it succeeds.

In [None]:
#| export
class ErrorRetrievingAccount(Exception):
    """Signals that a DomoJupyter account could not be retrieved."""

    def __init__(self, account_name):
        message = f'failure to retrieve DomoDojo Account {account_name}'
        # keep the text available as an attribute as well as via str(exc)
        self.message = message
        super().__init__(message)


def _retry_until_value(fetch,  # zero-argument callable to invoke
                       label: str,  # text printed when an attempt fails
                       maximum_retry: int,  # maximum number of attempts (at least one is always made)
                       retry_delay: float = 2,  # seconds to wait between attempts
                       ) -> tuple:  # (value, last_error); value is None when every attempt failed
    """call `fetch()` until it returns a truthy value, giving up after `maximum_retry` attempts"""
    last_error = None
    attempts = max(1, maximum_retry)

    for attempt in range(1, attempts + 1):
        value = None
        try:
            value = fetch()
        except Exception as e:
            # the failure modes of domojupyter are not known here, so any error
            # counts as a failed attempt; it is kept so the caller can chain it
            last_error = e

        if value:
            return value, None

        # do not sleep after the final attempt - nothing is left to wait for
        if attempt < attempts:
            print(f"trying again - {label}")
            time.sleep(retry_delay)

    return None, last_error


def get_jupyter_account(account_name: str,  # name of the account as it appears in the DomoJupyter workspace
                        maximum_retry: int = 10,  # maximum number of attempts for each domojupyter call
                        ) -> (list, dict):  # returns account properties list and a dictionary of the properties.
    """import a domojupyter account, retrying each lookup up to `maximum_retry` times

    Raises `ErrorRetrievingAccount` when the property keys, or the value of any
    property, cannot be retrieved within `maximum_retry` attempts.
    """
    # imported here because the library only exists inside DomoJupyter workspaces
    import domojupyter as dj

    account_properties, error = _retry_until_value(
        lambda: dj.get_account_property_keys(account_name),
        label=account_name,
        maximum_retry=maximum_retry)

    if not account_properties:
        raise ErrorRetrievingAccount(account_name=account_name) from error

    obj = {}

    for prop in account_properties:
        # each property gets its own retry budget
        value, error = _retry_until_value(
            lambda prop=prop: dj.get_account_property_value(account_name, prop),
            label=prop,
            maximum_retry=maximum_retry)

        if not value:
            raise ErrorRetrievingAccount(account_name=account_name) from error

        obj[prop] = value

    return account_properties, obj

In [None]:
#| export
class InvalidAccountTypeError(Exception):
    """raised when an account is not of the abstract credentials type"""

    def __init__(self, account_name, account_type):
        message = f"account: {account_name} of type {account_type} is invalid"
        # expose the text as an attribute in addition to str(exc)
        self.message = message
        super().__init__(message)

In [None]:
#| export
class InvalidAccountNameError(Exception):
    """raised when account name does not follow format string"""

    def __init__(self, account_name=None, regex_pattern=None):
        # build the message from the parts that were actually supplied, so a
        # missing argument is omitted instead of being rendered as "None"
        parts = ["string"]
        if account_name:
            parts.append(f'"{account_name}"')
        parts.append("does not match regex pattern")
        if regex_pattern:
            parts.append(f'"{regex_pattern}"')

        self.message = " ".join(parts)

        super().__init__(self.message)

In [None]:
#| export
class NoInstanceError(Exception):
    """raised when the required domo_instance argument is missing"""

    def __init__(self):
        message = "must pass a domo_instance argument"
        # keep the text on the instance as well as in the exception args
        self.message = message
        super().__init__(message)

## Dojo Account Class

In [None]:
#| export
@dataclass
class DojoAccount_InstanceAuth:
    """Username / password credentials loaded from a DomoJupyter account.

    Build an instance with `get_domo_account`, then use `generate_full_auth`
    (one instance) or `generate_full_auth_ls` (several instances) to create
    `DomoFullAuth` objects from the stored credentials.
    """

    account_name: str

    domo_username: str = None
    display_name: str = field(repr=False, default=None)

    domo_instance: str = field(repr=False, default=None)
    domo_instance_ls: list = field(repr=False, default=None)

    raw_cred: dict = field(repr=False, default=None)
    domo_password: str = field(repr=False, default=None)

    full_auth_ls: list = field(repr=False, default=None)

    # regex that account names must match; intentionally NOT annotated so it
    # stays a plain class attribute instead of becoming a dataclass field
    account_name_mask = r'^dj_.*_acc'

    def __post_init__(self):
        """default display_name to the part of domo_username before the '@'"""
        # domo_username defaults to None, in which case there is nothing to derive
        if not self.display_name and self.domo_username:
            self.display_name = re.sub('@.*$', '', self.domo_username)

    @staticmethod
    def _test_regex_mask(test_string: str,  # the string to test
                         regex_mask: str,  # the regex expression to test
                         ) -> bool:  # boolean of the re match
        """tests if a string matches the regex pattern"""

        return bool(re.match(regex_mask, test_string))

    @staticmethod
    def _clean_account_str(account_name):
        """strip the leading 'dj_' and trailing '_acc' from an account name"""
        clean_str = re.sub('^dj_', '', account_name)
        clean_str = re.sub('_acc$', '', clean_str)

        return clean_str

    @classmethod
    def get_domo_account(cls, account_name, domo_instance=None):
        """load `account_name` from the DomoJupyter workspace and wrap its credentials

        Raises `InvalidAccountNameError` when the name does not match
        `account_name_mask`, and `InvalidAccountTypeError` when the account
        exposes anything other than a single 'credentials' property.
        """
        import json

        if not cls._test_regex_mask(account_name, cls.account_name_mask):
            raise InvalidAccountNameError(account_name, cls.account_name_mask)

        account_properties, dj_account = get_jupyter_account(account_name)

        if account_properties != ['credentials']:
            # the account type itself is not available here, so the property
            # keys that were found are reported in its place
            raise InvalidAccountTypeError(account_name, account_properties)

        creds = json.loads(dj_account.get('credentials'))

        return cls(
            account_name=account_name,
            raw_cred=creds,
            domo_username=creds.get('DOMO_USERNAME'),
            domo_password=creds.get('DOMO_PASSWORD'),
            # an explicit argument wins over the instance stored in the account
            domo_instance=domo_instance or creds.get('DOMO_INSTANCE')
        )

    def generate_full_auth_ls(self,
                              domo_instance_ls: list = None):
        """create one DomoFullAuth per instance; returns None when there are no instances

        Falls back to `self.domo_instance_ls` when no list is passed, and
        resets `self.domo_instance` because the object now targets many instances.
        """
        self.full_auth_ls = None
        self.domo_instance = None

        # dict.fromkeys de-duplicates while preserving the caller's order;
        # `or []` covers the case where neither list has been provided
        self.domo_instance_ls = list(dict.fromkeys(
            domo_instance_ls or self.domo_instance_ls or []))

        if not self.domo_instance_ls:
            return None

        self.full_auth_ls = [
            dmda.DomoFullAuth(domo_instance=domo_instance,
                              domo_username=self.domo_username,
                              domo_password=self.domo_password)
            for domo_instance in self.domo_instance_ls]

        return self.full_auth_ls

    def generate_full_auth(self,
                           domo_instance: str = None):
        """create a single DomoFullAuth for `domo_instance` (or `self.domo_instance`)

        Raises `NoInstanceError` when neither is set.
        """
        self.full_auth_ls = None
        self.domo_instance_ls = None

        domo_instance = domo_instance or self.domo_instance

        if not domo_instance:
            raise NoInstanceError()

        # uses the module-level `dmda` (domolibrary.client.DomoAuth)
        full_auth = dmda.DomoFullAuth(domo_instance=domo_instance,
                                      domo_username=self.domo_username,
                                      domo_password=self.domo_password)

        self.full_auth_ls = [full_auth]

        return full_auth

    def generate_user(self, display_name: str = None) -> dict:
        """return the credentials as a dict, optionally overriding display_name"""
        return {
            'domo_username': self.domo_username,
            'domo_password': self.domo_password,
            'display_name': display_name or self.display_name}

In [None]:
#| export
class GetInstanceConfig:
    """Holder for the instance-configuration DataFrame; methods are attached with `patch_to`."""

    # result of the most recent config query; None until one has been run
    config: pd.DataFrame = None

In [None]:
# | export

class NoConfigCompanyError(Exception):
    """raised when the company configuration SQL query returns no rows"""

    def __init__(self, sql, domo_instance=None):
        message = f'SQL "{sql}" returned no results'

        # domo_instance is optional so callers that only know the query can still raise
        if domo_instance:
            message = f'{message} in {domo_instance}'

        self.message = message
        super().__init__(self.message)

@patch_to(GetInstanceConfig)
async def _retrieve_company_ds(self : GetInstanceConfig, 
                               config_auth: dmda.DomoAuth,
                               dataset_id: str,
                               sql: str,
                               debug_prn: bool = False,
                               debug_api: bool = False,
                               debug_log: bool = False,
                               logger: lc.Logger = None) -> pd.DataFrame:
    """Run `sql` against the config dataset, store the result on `self.config` and return it.

    Raises `NoConfigCompanyError` when the query returns no rows.
    """
    logger = logger or lc.Logger(app_name='_retrieve_company_ds')

    def _report(text):
        # progress goes to stdout only on request, but is always handed to the logger
        if debug_prn:
            print(text)
        logger.log_info(text, debug_log=debug_log)

    dataset = await dmds.DomoDataset.get_from_id(auth=config_auth,
                                                 dataset_id=dataset_id,
                                                 debug_api=debug_api)

    _report(f"⚙️ START - Retrieving company list \n{dataset.display_url()} using \n{sql}")

    result_df = await dataset.query_dataset_private(auth=config_auth,
                                                    dataset_id=dataset_id,
                                                    sql=sql,
                                                    debug_api=debug_api)

    row_count = len(result_df.index)
    if not row_count:
        raise NoConfigCompanyError(
            sql, domo_instance=config_auth.domo_instance)

    # keep the result on the instance for later inspection
    self.config = result_df

    _report(f"\n⚙️ SUCCESS 🎉 Retrieved company list \nThere are {row_count} companies to update")

    return result_df


In [None]:
# Example / manual check: run _retrieve_company_ds against a live dataset.
import os

# token auth for the 'domo-dojo' instance; the access token is read from the environment
config_auth = dmda.DomoTokenAuth(domo_instance = 'domo-dojo', domo_access_token=os.environ['DOMO_DOJO_ACCESS_TOKEN'])
logger = lc.Logger( app_name='test_retrieve_company')

gic = GetInstanceConfig()

# debug_prn=True prints the START / SUCCESS progress messages
await gic._retrieve_company_ds(config_auth = config_auth,
                               dataset_id = '8d2a8055-7918-4039-b67d-361647e96ea8',
                               sql = 'SELECT domain from Table',
                               debug_prn = True,
                               debug_log = False,
                               debug_api = False,
                               logger= logger
                            )

# the query result is also stored on the instance
pd.DataFrame(gic.config)

# pd.DataFrame(logger.logs)

⚙️ START - Retrieving company list 
https://domo-dojo.domo.com/datasources/8d2a8055-7918-4039-b67d-361647e96ea8/details/overview using 
SELECT domain from Table

⚙️ SUCCESS 🎉 Retrieved company list 
There are 1 companies to update


Unnamed: 0,domain
0,domo-dojo


In [None]:
@patch_to(GetInstanceConfig, cls_method=True)
async def get_domains_with_global_config_auth(cls: GetInstanceConfig,
                                              config_auth: dmda.DomoAuth,
                                              dataset_id: str,
                                              global_auth: dmda.DomoAuth,
                                              exception_auth: dmda.DomoAuth,
                                              sql: str = "select domain from table",
                                              debug_api: bool = False,
                                              debug_log: bool = False,
                                              debug_prn: bool = False,
                                              logger: lc.Logger = None) -> pd.DataFrame:
    """Retrieve the instance list and attach a validated auth object to every row.

    `sql` must return the columns `domo_instance` and `config_exception_pw`.
    Rows whose `config_exception_pw` equals 1 use `exception_auth`; all other
    rows use `global_auth`. Two columns are added to the returned DataFrame:
    `is_valid` (1 when a token could be obtained, 0 on invalid credentials)
    and `instance_auth` (the auth object used for that row).

    Raises `NoConfigCompanyError` when the query returns no rows, and
    `KeyError` when a required column is missing from the result.
    """
    import copy

    if not logger:
        logger = lc.Logger(app_name='get_domains_with_global_config_auth')

    gic = cls()

    # _retrieve_company_ds is a coroutine function and must be awaited
    df = await gic._retrieve_company_ds(config_auth=config_auth,
                                        dataset_id=dataset_id,
                                        sql=sql,
                                        debug_prn=debug_prn,
                                        debug_log=debug_log,
                                        debug_api=debug_api,
                                        logger=logger)

    # fail early with a readable message instead of a bare KeyError mid-loop
    missing_columns = [column for column in ('config_exception_pw', 'domo_instance')
                       if column not in df.columns]
    if missing_columns:
        raise KeyError(
            f"query result is missing required column(s): {', '.join(missing_columns)}")

    for index, instance in df.iterrows():
        template_auth = exception_auth if instance['config_exception_pw'] == 1 else global_auth

        # work on a copy: mutating the shared auth object would leave every row
        # referencing one object that points at the last instance processed.
        # NOTE(review): assumes a shallow copy of the auth object is sufficient - confirm
        creds = copy.copy(template_auth)
        creds.domo_instance = instance['domo_instance']

        try:
            await creds.get_auth_token()
            is_valid = 1

        except dmda.InvalidCredentialsError as e:
            if debug_prn:
                print(e)

            logger.log_error(str(e))
            is_valid = 0

        df.at[index, 'is_valid'] = is_valid
        df.at[index, 'instance_auth'] = creds

    return df


In [None]:
import os

config_auth = dmda.DomoTokenAuth(
    domo_instance='domo-dojo', domo_access_token=os.environ['DOMO_DOJO_ACCESS_TOKEN'])

global_auth = dmda.DomoTokenAuth(
    domo_instance='domo-global', domo_access_token=os.environ['DOMO_DOJO_ACCESS_TOKEN'])

exception_auth = dmda.DomoTokenAuth(
    domo_instance='domo-global', domo_access_token=os.environ['DOMO_DOJO_ACCESS_TOKEN'])


logger = lc.Logger(app_name='test_retrieve_company')

await GetInstanceConfig.get_domains_with_global_config_auth(config_auth=config_auth,
                               dataset_id='8d2a8055-7918-4039-b67d-361647e96ea8',
                               sql='SELECT domain from Table',
                               debug_prn=True,
                               debug_log=False,
                               debug_api=False,
                               logger=logger
                               )

TypeError: GetInstanceConfig.get_domains_with_global_config_auth() missing 2 required positional arguments: 'global_auth' and 'exception_auth'

In [None]:
#| export
async def get_domains_with_instance_auth(config_auth: dmda.DomoFullAuth,
                                         dataset_id: str,
                                         pa_auth,
                                         pa_test_auth,
                                         other_auth,
                                         other_test_auth,
                                         sql: str = "select domain from table",
                                         debug: bool = False,
                                         logger: lc.Logger = None
                                         ) -> pd.DataFrame:
    """Retrieve the instance list and attach a per-instance DomoFullAuth to every row.

    `sql` must return the columns `domo_instance`, `project` and
    `config_useprod`. Credentials are picked per row:

    - project 'pa' with config_useprod 1 uses `pa_auth`
    - project 'pa' with config_useprod 0 uses `pa_test_auth`
    - project 'other' with config_useprod 0 uses `other_test_auth`
    - anything else uses `other_auth`

    Each credentials object must expose `domo_username` and `domo_password`.
    The returned DataFrame gains `instance_auth` (the DomoFullAuth built for
    the row) and `is_valid` (1 when the auth holds a token, otherwise 0).
    A row that raises an unexpected error is reported and left without those
    values; processing continues with the next row.

    Raises `NoConfigCompanyError` when the query returns no rows.
    """

    ds = await dmds.DomoDataset.get_from_id(auth=config_auth,
                                            dataset_id=dataset_id, debug_api=debug)

    print(f"⚙️ START - Retrieving company list \n{ds.display_url()}")
    print(f"⚙️ SQL = {sql}")
    if logger:
        logger.log_info(f"⚙️ START - Retrieving company list \n{ds.display_url()}")

    df = await ds.query_dataset_private(auth=config_auth,
                                        dataset_id=dataset_id,
                                        sql=sql,
                                        debug_api=debug)
    if len(df.index) == 0:
        # the exception reports which instance was queried as well as the SQL
        raise NoConfigCompanyError(
            sql, domo_instance=config_auth.domo_instance)

    print(
        f"\n⚙️ SUCCESS 🎉 Retrieved company list \nThere are {len(df.index)} companies to update")
    if logger:
        logger.log_info(f"\n⚙️ SUCCESS 🎉 Retrieved company list \nThere are {len(df.index)} companies to update")

    for index, instance in df.iterrows():
        try:
            # other_auth is the fallback for every combination not listed below
            creds = other_auth

            if instance['project'] == 'pa' and instance['config_useprod'] == 1:
                creds = pa_auth
            elif instance['project'] == 'pa' and instance['config_useprod'] == 0:
                creds = pa_test_auth
            elif instance['project'] == 'other' and instance['config_useprod'] == 0:
                creds = other_test_auth

            full_auth = dmda.DomoFullAuth(domo_instance=instance['domo_instance'],
                                          domo_username=creds.domo_username,
                                          domo_password=creds.domo_password,
                                          token_name='instance'
                                          )
            try:
                await full_auth.get_auth_token()

            except dmda.InvalidCredentialsError as e:
                # invalid credentials are expected for some rows: report them and
                # fall through so the row is still recorded with is_valid = 0
                print(e)
                if logger:
                    logger.log_error(f"Error with Invalid Credentials {instance} instance. Exception : {e}")

            df.at[index, 'instance_auth'] = full_auth
            is_valid = 1 if (full_auth.token) else 0
            df.at[index, 'is_valid'] = is_valid
            if logger:
                logger.log_info(f"{instance} has {is_valid} valid token")

        except Exception as e:
            # best effort: one bad row must not abort the remaining instances
            print(e)
            if logger:
                logger.log_error(f"Error with {instance} instance. Exception : {e}")

    return df