In [1]:
# Id of the dataflow used by the single-dataflow example cells in this notebook.
DATAFLOW_ID = 12

In [None]:
import domojupyter as dj
import domolibrary_execution.utils.domojupyter as dxdj

async def generate_auth():
    """Await `dxdj.generate_domo_auth` for the instance reported by `dxdj.which_environment()`.

    Returns:
        Whatever `dxdj.generate_domo_auth` resolves to (used as `auth` by the rest of the notebook).
    """
    current_instance = dxdj.which_environment()

    domo_auth = await dxdj.generate_domo_auth(domo_instance = current_instance,
                                              domojupyter_fn = dj)
    return domo_auth

# Top-level `await`: relies on the notebook kernel allowing await outside a function.
auth = await generate_auth()

sdk_domo-community - no password stored in account


In [None]:
import domolibrary.client.DomoError as dmde
import domolibrary.client.DomoAuth as dmda

import routes.dataflow as dataflow_routes
import models.dataflow_model as dataflow_model
import pandas as pd

def get_dataflow_descriptive_data(dataflow_id : int ,
                                  auth : dmda.DomoAuth,
                                  debug_api: bool = False,
                                 return_raw: bool = False):
    """Fetch a dataflow definition and reduce it to the data used to describe the dataflow.

    Args:
        dataflow_id: id of the dataflow to fetch.
        auth: auth object passed to `dataflow_routes.get_dataflow_by_id_sync`.
        debug_api: passed through to the route call.
        return_raw: when True, return the route result as-is, before the
            "autogen" check and before any processing.

    Returns:
        The raw route result when `return_raw` is True, otherwise the output of
        `dataflow_model.llm_dataflow_process_definition` for that result.

    Raises:
        dmde.DomoError: when the existing description already contains "autogen".
    """
    route_result = dataflow_routes.get_dataflow_by_id_sync(
        dataflow_id = dataflow_id,
        auth = auth,
        debug_api = debug_api,
    )

    if return_raw:
        return route_result

    # A description that already carries the "autogen" marker is not processed again.
    existing_description = route_result.response.get('description')
    already_generated = bool(existing_description) and "autogen" in existing_description

    if already_generated:
        raise dmde.DomoError(message = f"dataflow_id {dataflow_id} description was already autogenerated")

    return dataflow_model.llm_dataflow_process_definition(route_result)

# Build the descriptive data for DATAFLOW_ID; a DomoError is printed rather than raised.
try:
    descriptive_data = get_dataflow_descriptive_data(dataflow_id = DATAFLOW_ID,
                                                     auth = auth,
                                                     return_raw = False)
except dmde.DomoError as err:
    print(err)
else:
    print(descriptive_data)


In [None]:
import models.model as model
import datetime as dt

def generate_new_description(description : str) -> str:
    """Append an "autogen via llm" marker line, stamped with the current local time, to `description`.

    Args:
        description: text to put on the first line(s) of the result.

    Returns:
        `description`, a newline, then " autogen via llm - YYYY-MM-DD HH:MM".
    """
    stamp = dt.datetime.now().strftime("%Y-%m-%d %H:%M")
    return f'{description}\n autogen via llm - {stamp}'

def llm_describe_dataflow(descriptive_data, return_raw: bool = False, debug_api : bool = False):
    """Send `descriptive_data` to the LLM endpoint and turn the reply into a description.

    Args:
        descriptive_data: passed as `data` to the endpoint's `invoke_message`.
        return_raw: when True, return the object produced by `invoke_message` unchanged.
        debug_api: passed through to `invoke_message`.

    Returns:
        The `invoke_message` result when `return_raw` is True; otherwise the content
        of its last message, run through `generate_new_description`.
    """
    # The endpoint is always created for the 'domo-community' instance.
    llm_endpoint = model.EndpointHandler._from_creds_account(domo_instance = 'domo-community')

    prompt_messages = dataflow_model.generate_llm_messages()

    conversation = llm_endpoint.invoke_message(data = descriptive_data,
                                               debug_api = debug_api,
                                               messages = prompt_messages)

    if not return_raw:
        last_message = conversation.messages[-1]
        return generate_new_description(last_message.content)

    return conversation

# Generate the description from the descriptive data built in the earlier cell.
dataflow_description = llm_describe_dataflow(descriptive_data)

# Bare expression so the notebook displays the generated text as the cell output.
dataflow_description

In [None]:
def update_dataflow(dataflow_id, 
                    auth : dmda.DomoAuth,
                    new_description : str , 
                    debug_api : bool = False):
    """Submit `new_description` for a dataflow through the dataflow routes.

    The current definition is fetched first, an update body is generated from its
    response together with `new_description`, and that body is sent back.

    Args:
        dataflow_id: id of the dataflow to update.
        auth: auth object passed to both route calls.
        new_description: description handed to `generate_update_dataflow_body`.
        debug_api: passed through to both route calls.

    Returns:
        The result of `dataflow_routes.update_dataflow_by_id_sync`.
    """
    current_definition = dataflow_routes.get_dataflow_by_id_sync(
        dataflow_id = dataflow_id,
        auth = auth,
        debug_api = debug_api,
    )

    request_body = dataflow_routes.generate_update_dataflow_body(
        obj = current_definition.response,
        description = new_description,
    )

    update_result = dataflow_routes.update_dataflow_by_id_sync(
        dataflow_id = dataflow_id,
        auth = auth,
        dataflow_body = request_body,
        debug_api = debug_api,
    )
    return update_result

# Send the generated description for DATAFLOW_ID; debug_api = True is passed to both route calls.
update_dataflow(dataflow_id = DATAFLOW_ID, auth = auth, new_description = dataflow_description, debug_api = True)


In [None]:
def process_dataflow(dataflow_id,
                     auth : dmda.DomoAuth,
                     debug_prn: bool = False,
                     debug_api: bool = False, **kwargs):
    """Describe one dataflow with the LLM and submit the new description.

    Runs `get_dataflow_descriptive_data`, `llm_describe_dataflow` and
    `update_dataflow` in sequence for `dataflow_id`.

    Args:
        dataflow_id: id of the dataflow to process.
        auth: auth object passed to the fetch and update steps.
        debug_prn: when True, print a "starting ..." line before processing.
        debug_api: passed through to all three steps.
        **kwargs: optional `index` and `outOf`; when both are supplied they are
            appended to the "starting ..." line as "<index> out of <outOf>".

    Returns:
        The result of `update_dataflow`, or None when a `dmde.DomoError` was
        raised (the error is printed instead of propagated).
    """
    if debug_prn:
        index = kwargs.get('index')
        outOf = kwargs.get('outOf')

        # Compare against None, not truthiness, so that an index of 0 is still reported.
        progress = ""
        if index is not None and outOf is not None:
            progress = f" {index} out of {outOf}"

        print(f'starting {dataflow_id}{progress}')

    try:
        dataflow_descriptive_data = get_dataflow_descriptive_data(dataflow_id , auth = auth, debug_api=debug_api)

        # Describe THIS dataflow's data; a notebook-level `descriptive_data` variable
        # must not leak in here, or every dataflow would receive the same description.
        dataflow_description = llm_describe_dataflow(descriptive_data = dataflow_descriptive_data, debug_api = debug_api)

        return update_dataflow(dataflow_id, 
                        auth = auth, 
                        new_description = dataflow_description,
                        debug_api = debug_api)

    except dmde.DomoError as e:
        # Best effort: report the failure and continue, so one bad dataflow does not stop a batch.
        print(f"{e} - on dataflow - {dataflow_id}")


# Use the notebook-level constant instead of repeating the literal id.
process_dataflow(DATAFLOW_ID, auth = auth)

In [None]:
import routes.datacenter as datacenter_routes
import importlib
# Reload the module imported above as `datacenter_routes`; the name `dmdc` is not defined in this notebook.
importlib.reload(datacenter_routes)

def main(auth, debug_api: bool = False, debug_prn: bool = False):
    """Search for dataflows and run `process_dataflow` on every search hit.

    The search is restricted to DATAFLOW entities matching the hard-coded
    `owned_by_id` filter below. Only the single page requested by
    `count` / `offset` is processed; no further pages are fetched here.

    Args:
        auth: auth object passed to the search route and to `process_dataflow`.
        debug_api: passed to the search route and to `process_dataflow`.
        debug_prn: passed to `process_dataflow` to enable its progress line.

    Returns:
        None.
    """
    query_body = {"entities":["DATAFLOW"],"filters":[
        {"filterType":"term","field":"owned_by_id","value":"1893952720:USER","name":"Jae Wilson1","not":False}],
                  "combineResults":True,"query":"*","count":30,"offset":0,
    }

    res = datacenter_routes.search_datacenter_sync(
        auth = auth,
        query_body= query_body,
        debug_api = debug_api
    )

    # Look the hit list up once instead of on every iteration.
    search_objects = res.response['searchObjects']
    total = len(search_objects)

    # Count from 1 so the progress line reads "1 out of N" ... "N out of N".
    for index, obj in enumerate(search_objects, start = 1):
        process_dataflow(dataflow_id = obj['databaseId'],
                         auth = auth,
                         index = index,
                         outOf = total,
                         debug_prn = debug_prn,
                         debug_api = debug_api
                        )
        

# Run the batch over the search results, with progress printing on and API debugging off.
main(auth = auth, debug_prn = True, debug_api = False)