In [None]:
# %pip install --upgrade domolibrary  # uncomment to install/upgrade into the active kernel's environment

In [None]:
import domolibrary

# show the installed domolibrary version (the outputs in this notebook were produced with 4.0.1)
domolibrary.__version__

'4.0.1'

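# Retrieve authentication credentials
Never hard-code your password in a notebook (or anywhere it could be shared or published) — read it from environment variables instead, as the next cell does.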

In [None]:
import os

# credentials are read from environment variables so they are never stored in the notebook
# NOTE(review): the password comes from DOJO_PASSWORD while the username uses a DOMO_
# prefix — confirm this asymmetry is intentional for your environment.
domo_username = os.getenv("DOMO_USERNAME")
domo_password = os.getenv("DOJO_PASSWORD")
domo_instance = "domo-community"

# dataflow used by the single-dataflow examples below
TEST_DATAFLOW_ID = 108

In [None]:
import domolibrary.client.DomoAuth as dmda

auth = dmda.DomoFullAuth(
    domo_username=domo_username,
    domo_password=domo_password,
    domo_instance=domo_instance,
)

await auth.print_is_token()

assert auth.token

🎉 full_auth token retrieved from domo-community ⚙️


# Retrieve Data
- How do you handle looping?
- How do you handle 'fast' code execution? (asynchronous code execution)

* Will one request be enough?
    * get all dataflows
    * for each dataflow, retrieve the correct information

In [None]:
import domolibrary.classes.DomoDatacenter as dmdc
from typing import List


async def get_dataflow_ids(auth: dmda.DomoAuth) -> List[int]:
    """searches domo datacenter and returns a list of dataflow_ids"""

    domo_datacenter = dmdc.DomoDatacenter(auth=auth)

    dataflows_ls = await domo_datacenter.search_datacenter(
        auth=auth,
        entity_type="DATAFLOW",
        additional_filters_ls=[
            {
                "filterType": "term",
                "field": "data_flow_type",
                "value": "MAGIC",
                "name": "Magic ETL v2",
                "not": False,
            }
        ],
    )

    return [dataflow_obj["databaseId"] for dataflow_obj in dataflows_ls]


dataflow_ids = await get_dataflow_ids(auth)
dataflow_ids[0:5]

['340', '131', '227', '37', '185']

In [None]:
import domolibrary.classes.DomoDataflow as dmdf
import domolibrary.utils.chunk_execution as ce
from typing import List


async def get_dataflows(dataflow_ids_ls: List[int]) -> List[dmdf.DomoDataflow]:
    """retrieves dataflow metadata from a list of dataflows"""

    return await ce.gather_with_concurrency(
        *[
            dmdf.DomoDataflow.get_from_id(dataflow_id=dataflow_id, auth=auth)
            for dataflow_id in dataflow_ids_ls
        ],
        n=20
    )


domo_dataflow_ls = await get_dataflows([TEST_DATAFLOW_ID])
domo_dataflow = domo_dataflow_ls[0]
domo_dataflow

DomoDataflow(id=108, name='Datasets_lineage', owner=None, description=None, tags=None, actions=[DomoDataflow_Action(id='dce3487e-6a2d-49a2-afd4-1f7867cb3b95', type='LoadFromVault', name='Governance_datasets', datasource_id='42917df1-fa58-483f-a290-5fe95ccda4ed', sql=None, depends_on=None, parent_actions=None), DomoDataflow_Action(id='df709f9b-0e6f-4f69-8b66-aa7411028db8', type='LoadFromVault', name='Governance_dataflow_details', datasource_id='260a54f7-9636-4eac-9a24-39e56d9b2f5d', sql=None, depends_on=None, parent_actions=None), DomoDataflow_Action(id='3b3396dd-a673-4211-868f-abbbcc4ce0a3', type='SelectValues', name='Datasets', datasource_id=None, sql=None, depends_on=['dce3487e-6a2d-49a2-afd4-1f7867cb3b95'], parent_actions=[DomoDataflow_Action(id='dce3487e-6a2d-49a2-afd4-1f7867cb3b95', type='LoadFromVault', name='Governance_datasets', datasource_id='42917df1-fa58-483f-a290-5fe95ccda4ed', sql=None, depends_on=None, parent_actions=None)]), DomoDataflow_Action(id='7e839c17-3e1a-4938-86d

In [None]:
import pandas as pd


async def generate_version_action_pdf(
    domo_dataflow: "dmdf.DomoDataflow",
) -> pd.DataFrame:
    """Flatten a dataflow's definition version history into one row per (version, tile).

    Calls ``domo_dataflow.get_versions()`` and, for every returned version, emits
    one row per action (tile) in ``version.actions``.

    Args:
        domo_dataflow: dataflow whose definition history is retrieved.

    Returns:
        DataFrame with ``dataflow_id``, ``dataflow_version``, ``dataflow_name``,
        ``tile_id``, ``tile_type`` and the remaining action attributes (e.g.
        ``name``, ``depends_on``); ``parent_actions``, ``datasource_id`` and
        ``sql`` are dropped. An empty DataFrame is returned when the dataflow
        has no versions or none of its versions has actions.
    """
    dataflow_versions = await domo_dataflow.get_versions()

    # check the value that is iterated below, not a separate attribute
    if not dataflow_versions:
        return pd.DataFrame()

    records = [
        {
            "dataflow_id": dataflow_version.id,
            "dataflow_version": dataflow_version.version_id,
            "dataflow_name": dataflow_version.name,
            **domo_action.__dict__,
        }
        for dataflow_version in dataflow_versions
        # a version without actions contributes no rows instead of raising TypeError
        for domo_action in (dataflow_version.actions or [])
    ]

    if not records:
        # pd.DataFrame([]) has no columns, so the drop below would raise KeyError
        return pd.DataFrame()

    return (
        pd.DataFrame(records)
        .drop(columns=["parent_actions", "datasource_id", "sql"], errors="ignore")
        .rename(columns={"id": "tile_id", "type": "tile_type"})
    )


await generate_version_action_pdf(domo_dataflow)

Unnamed: 0,dataflow_id,dataflow_version,dataflow_name,tile_id,tile_type,name,depends_on
0,108,448,Datasets_lineage,dce3487e-6a2d-49a2-afd4-1f7867cb3b95,LoadFromVault,Governance_datasets,
1,108,448,Datasets_lineage,df709f9b-0e6f-4f69-8b66-aa7411028db8,LoadFromVault,Governance_dataflow_details,
2,108,448,Datasets_lineage,3b3396dd-a673-4211-868f-abbbcc4ce0a3,SelectValues,Datasets,[dce3487e-6a2d-49a2-afd4-1f7867cb3b95]
3,108,448,Datasets_lineage,7e839c17-3e1a-4938-86d4-1b19d61c3662,SelectValues,Dataflow Details,[df709f9b-0e6f-4f69-8b66-aa7411028db8]
4,108,448,Datasets_lineage,b285a26d-cc81-4ac9-bfc5-68effba45241,Filter,Filter Rows,[df709f9b-0e6f-4f69-8b66-aa7411028db8]
5,108,448,Datasets_lineage,741d4ee4-32e9-47ca-8a2d-98f075d17518,SelectValues,Select Columns,[b285a26d-cc81-4ac9-bfc5-68effba45241]
6,108,448,Datasets_lineage,b1ec014b-66d8-4518-84fc-7c6579be9749,ExpressionEvaluator,Add Formula,[741d4ee4-32e9-47ca-8a2d-98f075d17518]
7,108,448,Datasets_lineage,2333341b-3a52-4b43-b72d-4ace763a9d1f,UnionAll,Dataflow Details With datafusion,"[7e839c17-3e1a-4938-86d4-1b19d61c3662, 0d4e85e..."
8,108,448,Datasets_lineage,33a5f316-cb1e-4c2c-bf10-911396a5ae97,PythonEngineAction,Python Script,"[3b3396dd-a673-4211-868f-abbbcc4ce0a3, 2333341..."
9,108,448,Datasets_lineage,dffae0ba-4e21-457f-b190-9af841e5f771,PublishToVault,Datasets_lineage,[33a5f316-cb1e-4c2c-bf10-911396a5ae97]


In [None]:
async def generate_action_stats_df(
    domo_dataflow: "dmdf.DomoDataflow",
    maximum_history: int = 10,  # number of past executions to retrieve
) -> pd.DataFrame:
    """Return per-tile execution statistics for a dataflow's recent executions.

    Calls ``domo_dataflow.history.get_execution_history(maximum=maximum_history)``
    and emits one row per (execution, tile result) carrying the execution's id,
    version, dataflow id and begin time plus the tile-result attributes.

    Args:
        domo_dataflow: dataflow whose execution history is retrieved.
        maximum_history: passed as ``maximum`` to ``get_execution_history``.

    Returns:
        DataFrame with ``dataflow_execution_id``, ``dataflow_version``,
        ``dataflow_id``, ``dataflow_begin_time``, ``tile_id``, ``tile_type`` and
        the remaining tile-result attributes (``name`` is dropped). An empty
        DataFrame is returned, after printing a warning, when there is no
        execution history; an empty DataFrame is also returned when no
        execution has tile results.
    """
    # only call into the history object if there is one
    if domo_dataflow.history is not None:
        await domo_dataflow.history.get_execution_history(maximum=maximum_history)

    history = domo_dataflow.history
    if not history or not history.execution_history:
        print(
            f"⚠️ dataflow {domo_dataflow.id} - {domo_dataflow.name} has never been executed"
        )
        return pd.DataFrame()

    records = [
        {
            "dataflow_execution_id": domo_history.dataflow_execution_id,
            "dataflow_version": domo_history.dataflow_version,
            "dataflow_id": domo_history.dataflow_id,
            "dataflow_begin_time": domo_history.begin_time,
            **domo_action.__dict__,
        }
        for domo_history in history.execution_history
        # executions without tile results contribute no rows
        if domo_history.action_results
        for domo_action in domo_history.action_results
    ]

    if not records:
        # pd.DataFrame([]) has no columns, so dropping "name" would raise KeyError
        return pd.DataFrame()

    return (
        pd.DataFrame(records)
        .drop(columns=["name"], errors="ignore")
        .rename(columns={"id": "tile_id", "type": "tile_type"})
    )


stats_df = await generate_action_stats_df(domo_dataflow)
stats_df[0:5]

Unnamed: 0,dataflow_execution_id,dataflow_version,dataflow_id,dataflow_begin_time,tile_id,tile_type,is_success,rows_processed,begin_time,end_time,duration_in_sec
0,133cb2f6-caf7-4ef0-8535-4f5effdf121e,448,108,2022-11-04 10:06:26,dce3487e-6a2d-49a2-afd4-1f7867cb3b95,DataHubManifestLoaderAction,True,576,2022-11-04 10:06:31.425,2022-11-04 10:06:32.316,0.891
1,133cb2f6-caf7-4ef0-8535-4f5effdf121e,448,108,2022-11-04 10:06:26,df709f9b-0e6f-4f69-8b66-aa7411028db8,DataHubManifestLoaderAction,True,367,2022-11-04 10:06:31.425,2022-11-04 10:06:32.314,0.889
2,133cb2f6-caf7-4ef0-8535-4f5effdf121e,448,108,2022-11-04 10:06:26,3b3396dd-a673-4211-868f-abbbcc4ce0a3,SelectValues,True,576,2022-11-04 10:06:32.278,2022-11-04 10:06:32.316,0.038
3,133cb2f6-caf7-4ef0-8535-4f5effdf121e,448,108,2022-11-04 10:06:26,7e839c17-3e1a-4938-86d4-1b19d61c3662,SelectValues,True,367,2022-11-04 10:06:32.276,2022-11-04 10:06:32.277,0.001
4,133cb2f6-caf7-4ef0-8535-4f5effdf121e,448,108,2022-11-04 10:06:26,b285a26d-cc81-4ac9-bfc5-68effba45241,Filter,True,367,2022-11-04 10:06:32.278,2022-11-04 10:06:32.280,0.002


In [None]:
async def process_instance(auth):
    dataflow_ids = await get_dataflow_ids(auth)

    domo_dataflows = await get_dataflows(dataflow_ids)

    actions_df_ls = await ce.gather_with_concurrency(
        *[
            generate_version_action_pdf(domo_dataflow)
            for domo_dataflow in domo_dataflows
        ],
        n=20
    )

    stats_df_ls = await ce.gather_with_concurrency(
        *[generate_action_stats_df(domo_dataflow) for domo_dataflow in domo_dataflows],
        n=20
    )

    actions_df = pd.concat(actions_df_ls)

    stats_df = pd.concat(stats_df_ls)

    # import domojupyter as dj
    # dj.write_dataframe(actions_df, 'MONIT_Dataflow_Tiles')
    # dj.write_dataframe(stats_df, 'MONIT_Dataflow_Stats')

    return actions_df, stats_df


base_actions_df, base_stats_df = await process_instance(auth)

⚠️ dataflow 69 - Copy of gov_datasets_INT has never been executed
⚠️ dataflow 27 - JW_Simple Sample Set_Rolling Averages has never been executed
⚠️ dataflow 85 - Working Hours Test has never been executed
⚠️ dataflow 322 - Forecast Sales - HV has never been executed
⚠️ dataflow 83 - Working Hours Example has never been executed
⚠️ dataflow 323 - New ETL Transform has never been executed


In [None]:
def generate_datasets(actions_df, stats_df, dataflow_id=None, execution_id=None):
    """Build one row per (executed tile, parent tile) with lineage-aware timings.

    Joins per-tile execution stats with the dataflow definitions so that each
    executed tile is repeated once per upstream tile listed in ``depends_on``,
    and attaches the parent's stats from the same execution.

    Args:
        actions_df: tile definitions per dataflow version; needs ``dataflow_id``,
            ``dataflow_version``, ``dataflow_name``, ``tile_id``, ``name`` and
            ``depends_on`` (a list of parent tile ids, or None for root tiles).
        stats_df: per-tile execution stats; needs the columns selected into
            ``facts_df`` below (except ``dataflow_name`` / ``tile_name``, which
            come from ``actions_df``).
        dataflow_id: if not None, keep only this dataflow.
        execution_id: if not None, keep only this execution.

    Returns:
        DataFrame sorted by ``dataflow_id``, ``begin_time``, ``parent_tile_id``
        with the parent columns (``parent_tile_id``, ``parent_tile_name``,
        ``parent_tile_type``, ``parent_rows_processed``, ``parent_end_time``),
        ``actual_duration_in_sec`` (tile ``end_time`` minus the parent's
        ``end_time``, or minus the tile's own ``begin_time`` when no parent row
        matched) and ``tile_delay_rank`` (rank of that duration among the tile's
        parents within one execution; 1 = longest).
    """
    tile_keys = ["dataflow_id", "dataflow_version", "tile_id"]

    # --- filter and configure the tile definitions ---
    # `is not None` rather than truthiness, so an id such as 0 still filters
    actions = actions_df.rename(columns={"name": "tile_name"})
    if dataflow_id is not None:
        actions = actions[actions["dataflow_id"] == dataflow_id]

    # --- filter the execution stats and attach dataflow / tile names ---
    stats = stats_df
    if execution_id is not None:
        stats = stats[stats["dataflow_execution_id"] == execution_id]
    if dataflow_id is not None:
        stats = stats[stats["dataflow_id"] == dataflow_id]

    # explicit join keys instead of whatever column names happen to overlap
    stats = pd.merge(
        stats,
        actions[
            ["dataflow_id", "dataflow_name", "dataflow_version", "tile_id", "tile_name"]
        ],
        how="inner",
        on=tile_keys,
    )

    # --- one row per (tile, parent tile); root tiles keep a single parentless row ---
    explode_df = actions[tile_keys + ["depends_on"]].explode("depends_on")

    facts_df = stats[
        [
            "dataflow_name",
            "dataflow_execution_id",
            "dataflow_version",
            "dataflow_id",
            "tile_id",
            "tile_name",
            "tile_type",
            "rows_processed",
            "dataflow_begin_time",
            "begin_time",
            "end_time",
            "duration_in_sec",
        ]
    ]

    # the same executed tiles viewed as parents: tile_id becomes the depends_on join key
    parent_df = facts_df[
        [
            "dataflow_execution_id",
            "dataflow_version",
            "dataflow_id",
            "tile_id",
            "tile_name",
            "tile_type",
            "rows_processed",
            "end_time",
        ]
    ].rename(
        columns={
            "tile_id": "depends_on",
            "tile_name": "parent_tile_name",
            "tile_type": "parent_tile_type",
            "rows_processed": "parent_rows_processed",
            "end_time": "parent_end_time",
        }
    )

    facts_by_parent_df = pd.merge(
        facts_df,
        explode_df,
        how="inner",
        on=["dataflow_id", "tile_id", "dataflow_version"],
    )
    facts_by_parent_df = pd.merge(
        facts_by_parent_df,
        parent_df,
        how="left",
        on=["dataflow_id", "dataflow_version", "dataflow_execution_id", "depends_on"],
    ).rename(columns={"depends_on": "parent_tile_id"})

    facts_by_parent_df = facts_by_parent_df.assign(
        # rows with no matching parent are measured from the tile's own begin_time
        parent_end_time=lambda frame: frame["parent_end_time"].fillna(
            frame["begin_time"]
        ),
        actual_duration_in_sec=lambda frame: (
            frame["end_time"] - frame["parent_end_time"]
        ).dt.total_seconds(),
    )
    facts_by_parent_df["tile_delay_rank"] = facts_by_parent_df.groupby(
        ["dataflow_execution_id", "dataflow_version", "dataflow_id", "tile_id"]
    )["actual_duration_in_sec"].rank(ascending=False)

    return facts_by_parent_df.sort_values(
        by=["dataflow_id", "begin_time", "parent_tile_id"], ascending=True
    )


# lineage timings for one specific execution of the sample dataflow
generate_datasets(
    base_actions_df,
    base_stats_df,
    dataflow_id=TEST_DATAFLOW_ID,
    execution_id="1e331f2b-1db8-460d-9860-334aedc88e93",
)

Unnamed: 0,dataflow_name,dataflow_execution_id,dataflow_version,dataflow_id,tile_id,tile_name,tile_type,rows_processed,dataflow_begin_time,begin_time,end_time,duration_in_sec,parent_tile_id,parent_tile_name,parent_tile_type,parent_rows_processed,parent_end_time,actual_duration_in_sec,tile_delay_rank
0,Datasets_lineage,1e331f2b-1db8-460d-9860-334aedc88e93,441,108,dce3487e-6a2d-49a2-afd4-1f7867cb3b95,Governance_datasets,DataHubManifestLoaderAction,570.0,2022-11-03 09:33:00,2022-11-03 09:33:05.717,2022-11-03 09:33:06.614,0.897,,,,,2022-11-03 09:33:05.717,0.897,1.0
1,Datasets_lineage,1e331f2b-1db8-460d-9860-334aedc88e93,441,108,df709f9b-0e6f-4f69-8b66-aa7411028db8,Governance_dataflow_details,DataHubManifestLoaderAction,361.0,2022-11-03 09:33:00,2022-11-03 09:33:05.717,2022-11-03 09:33:06.656,0.939,,,,,2022-11-03 09:33:05.717,0.939,1.0
2,Datasets_lineage,1e331f2b-1db8-460d-9860-334aedc88e93,441,108,3b3396dd-a673-4211-868f-abbbcc4ce0a3,Datasets,SelectValues,570.0,2022-11-03 09:33:00,2022-11-03 09:33:06.609,2022-11-03 09:33:06.614,0.005,dce3487e-6a2d-49a2-afd4-1f7867cb3b95,Governance_datasets,DataHubManifestLoaderAction,570.0,2022-11-03 09:33:06.614,0.0,1.0
4,Datasets_lineage,1e331f2b-1db8-460d-9860-334aedc88e93,441,108,33a5f316-cb1e-4c2c-bf10-911396a5ae97,Python Script,PythonEngineAction,931.0,2022-11-03 09:33:00,2022-11-03 09:33:06.610,2022-11-03 09:33:15.408,8.798,3b3396dd-a673-4211-868f-abbbcc4ce0a3,Datasets,SelectValues,570.0,2022-11-03 09:33:06.614,8.794,1.0
5,Datasets_lineage,1e331f2b-1db8-460d-9860-334aedc88e93,441,108,33a5f316-cb1e-4c2c-bf10-911396a5ae97,Python Script,PythonEngineAction,931.0,2022-11-03 09:33:00,2022-11-03 09:33:06.610,2022-11-03 09:33:15.408,8.798,7e839c17-3e1a-4938-86d4-1b19d61c3662,Dataflow Details,SelectValues,361.0,2022-11-03 09:33:06.656,8.752,2.0
3,Datasets_lineage,1e331f2b-1db8-460d-9860-334aedc88e93,441,108,7e839c17-3e1a-4938-86d4-1b19d61c3662,Dataflow Details,SelectValues,361.0,2022-11-03 09:33:00,2022-11-03 09:33:06.656,2022-11-03 09:33:06.656,0.0,df709f9b-0e6f-4f69-8b66-aa7411028db8,Governance_dataflow_details,DataHubManifestLoaderAction,361.0,2022-11-03 09:33:06.656,0.0,1.0
6,Datasets_lineage,1e331f2b-1db8-460d-9860-334aedc88e93,441,108,dffae0ba-4e21-457f-b190-9af841e5f771,Datasets_lineage,PublishToVault,1391.0,2022-11-03 09:33:00,2022-11-03 09:33:15.348,2022-11-03 09:33:15.490,0.142,33a5f316-cb1e-4c2c-bf10-911396a5ae97,Python Script,PythonEngineAction,931.0,2022-11-03 09:33:15.408,0.082,1.0


In [None]:
# lineage timings for every dataflow and execution collected above
exploded_df = generate_datasets(
    base_actions_df, base_stats_df, dataflow_id=None, execution_id=None
)

# Optional: uncomment to write the result to a Domo dataset (requires `import domojupyter as dj`)
# dj.write_dataframe(exploded_df, 'MONIT_Dataflow_Tiles_Exploded')

exploded_df

Unnamed: 0,dataflow_name,dataflow_execution_id,dataflow_version,dataflow_id,tile_id,tile_name,tile_type,rows_processed,dataflow_begin_time,begin_time,end_time,duration_in_sec,parent_tile_id,parent_tile_name,parent_tile_type,parent_rows_processed,parent_end_time,actual_duration_in_sec,tile_delay_rank
11025,MetaData_Stage 1,8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424,19,8,d312578c-e9cb-4e3a-9dc6-662c00c93ba0,Governance_Users and Groups,DataHubManifestLoaderAction,35.0,2020-07-09 16:00:07,2020-07-09 16:00:17.000,2020-07-09 16:00:21.000,4.000,,,,,2020-07-09 16:00:17.000,4.000,1.0
11026,MetaData_Stage 1,8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424,19,8,d316ddf3-9917-4542-bf68-e1cd8397d894,Governance_Groups,DataHubManifestLoaderAction,3.0,2020-07-09 16:00:07,2020-07-09 16:00:17.000,2020-07-09 16:00:21.000,4.000,,,,,2020-07-09 16:00:17.000,4.000,1.0
11028,MetaData_Stage 1,8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424,19,8,eebdf394-fc6d-40c0-895e-b325044db506,MetaData_Groups,PublishToVault,3.0,2020-07-09 16:00:07,2020-07-09 16:00:21.000,2020-07-09 16:00:22.000,1.000,0fe45cc8-7c88-48cb-a437-4656954767a6,PDP?,Constant,3.0,2020-07-09 16:00:21.000,1.000,1.0
11027,MetaData_Stage 1,8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424,19,8,e048f809-08ae-49c2-84be-5b7b76a99944,MetaData_Users and Groups,PublishToVault,35.0,2020-07-09 16:00:07,2020-07-09 16:00:21.000,2020-07-09 16:00:22.000,1.000,8e52e4dc-fd3a-455b-9ec1-c2a404d342d0,PDP? 1,Constant,35.0,2020-07-09 16:00:21.000,1.000,1.0
11024,MetaData_Stage 1,8f77dfc7-b2fb-4f3c-9d24-1d94e0e32424,19,8,8e52e4dc-fd3a-455b-9ec1-c2a404d342d0,PDP? 1,Constant,35.0,2020-07-09 16:00:07,2020-07-09 16:00:21.000,2020-07-09 16:00:21.000,0.000,d312578c-e9cb-4e3a-9dc6-662c00c93ba0,Governance_Users and Groups,DataHubManifestLoaderAction,35.0,2020-07-09 16:00:21.000,0.000,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
11750,DEV | Swish Central,69207fcb-bc5b-4792-b82d-81a8eb05bc70,1062,357,f2a5bf78-3757-4be3-b555-a354df56ce53,TEST | Swish Central,PublishToVault,40756.0,2024-02-06 16:35:04,2024-02-06 16:35:20.967,2024-02-06 16:35:24.767,3.800,47257e8e-8a34-41cf-a438-3c034eeb4f87,Select Columns 1,SelectValues,40756.0,2024-02-06 16:35:24.066,0.701,1.0
11747,DEV | Swish Central,69207fcb-bc5b-4792-b82d-81a8eb05bc70,1062,357,d6fb5455-f89b-4c83-8714-89fd084db9bb,Remove Duplicates,Unique,38111.0,2024-02-06 16:35:04,2024-02-06 16:35:21.167,2024-02-06 16:35:24.065,2.898,2a867c0e-8a3d-49db-81fa-d1ab6668c82c,Filter Rows,Filter,40756.0,2024-02-06 16:35:24.065,0.000,1.0
11746,DEV | Swish Central,69207fcb-bc5b-4792-b82d-81a8eb05bc70,1062,357,2a867c0e-8a3d-49db-81fa-d1ab6668c82c,Filter Rows,Filter,40756.0,2024-02-06 16:35:04,2024-02-06 16:35:21.167,2024-02-06 16:35:24.065,2.898,47257e8e-8a34-41cf-a438-3c034eeb4f87,Select Columns 1,SelectValues,40756.0,2024-02-06 16:35:24.066,-0.001,1.0
11749,DEV | Swish Central,69207fcb-bc5b-4792-b82d-81a8eb05bc70,1062,357,1da7fa87-0d1b-434d-b018-cb80a5f3ce5a,DEV | Swish Central,PublishToVault,37924.0,2024-02-06 16:35:04,2024-02-06 16:35:21.168,2024-02-06 16:35:24.734,3.566,24bc7117-ac36-4e5c-aee5-582d8f954cec,Add Formula 4,ExpressionEvaluator,37924.0,2024-02-06 16:35:24.066,0.668,1.0
