In [1]:
# misc functions scripts

def print_log(message: str, function: str, where: str, type_message: str):
    """
    Print a message with specific format

    Args:
        message (str): string that need be printed
        function (str): string that indicates the origin function from the message
        where (str): string that indicates the origin place/script from the message
        type_message (str): _specify the type of message
    """
    match type_message:
        case "log":
            print(f"-- LOG -- {message} -- {function} -- {where}" )
        case "error":
            print(f"-- ERROR -- {message} -- {function} -- {where}" )

In [2]:
# Datasbase component

# import modules
import utils.dfutils as df_tools
import utils.dbutils as db_tools
import pandas as pd


# defining a place for log message
where = "load_dabase_components.py"


def load_save_insert_with_keys(df: pd.DataFrame, config: dict, **kargs) -> pd.DataFrame:
    """
    Funtion designed for load data into dataframe. 
    In this case the data that has been previously saved in the data base table,
    but that matches the data contained in the 'df' dataframe (taking into account
    the columns that define the record as the only 'delete_keys') will be deleted
    to replace it with the updated data coming from df, the rest of the data from
    'df' will be added to the table in the database

    Args:
        df (pd.DataFrame): Dataframe with data that will be stored
        config (dict): Dictionary with information that it's required for execute the function

    Raises:
        Exception: Raised when occurring a error while execute this function

    Returns:
        pd.DataFrame: dataframe with data that was stored
        cross_component (dict): Dictionary with information that it's required for communicate components
    """
    # defining a function for log message
    function = "load_save_insert_with_keys"

    # defining **kargs required as expliticty variables
    between_dates = kargs["between_dates"]
    cross_parameters = kargs["cross_parameters"]

    try:
        df = df_tools.fill_dataframe_nulls(df, "")
        print_log("Opening connection to database", function, where, "log")
        conn = db_tools.open_connection_with_scripting_account()
        cursor = conn.cursor() 
        cursor.fast_executemany = True
        print_log("Loading data into database", function, where, "log")
        db_tools.perform_safe_delete_insert_with_keys(
                conn = conn, 
                delete_keys = config["delete_keys"], 
                source_df = df, 
                schema = config["schema"], 
                target_table_name = config["table"]
        )
    except Exception as e:
        raise Exception(f"ERROR: {e}") 
    
    return df, between_dates, cross_parameters 


def load_save_insert_with_keys_by_sheets(df: list, config: dict, **kargs) -> pd.DataFrame:
    """
    Funtion designed for load data into dataframe. 
    In this case the data that has been previously saved in the data base table,
    but that matches the data contained in the 'df' dataframe (taking into account
    the columns that define the record as the only 'delete_keys') will be deleted
    to replace it with the updated data coming from df, the rest of the data from
    'df' will be added to the table in the database

    Args:
        df (list): List of Dataframe with data that will be stored
        config (dict): Dictionary with information that it's required for execute the function

    Raises:
        Exception: Raised when occurring a error while execute this function

    Returns:
        pd.DataFrame: dataframe with data that was stored
        cross_component (dict): Dictionary with information that it's required for communicate components
    """
    # defining a function for log message
    function = "load_save_insert_with_keys_by_sheets"

    # defining **kargs required as expliticty variables
    between_dates = kargs["between_dates"]
    cross_parameters = kargs["cross_parameters"]
    
    for sheet in config.keys():
        try:
            df[sheet] = df_tools.fill_dataframe_nulls(df[sheet], "")
            print_log("Opening connection to database", function, where, "log")
            conn = db_tools.open_connection_with_scripting_account()
            cursor = conn.cursor() 
            cursor.fast_executemany = True
            print_log(f"Loading {sheet} data into database", function, where, "log")
            db_tools.perform_safe_delete_insert_with_keys(
                    conn = conn, 
                    delete_keys = config[sheet]["delete_keys"], 
                    source_df = df[sheet], 
                    schema = config[sheet]["schema"], 
                    target_table_name = config[sheet]["table"]
            )
        except Exception as e:
            raise Exception(f"ERROR: {e}")
    
    return df, between_dates, cross_parameters



def download_data_from_database(df: pd.DataFrame, config: dict, **kargs) -> pd.DataFrame:
    """
    Downloads data from database and loads it into a Pandas dataframe.

    Args:
        df (pd.DataFrame): Dataframe with data that will be stored
        config (dict): Dictionary with information that it's required for execute the function

    Raises:
        Exception: Raised when occurring a error while execute this function

    Returns:
        pd.DataFrame: dataframe with data that was stored
        cross_component (dict): Dictionary with information that it's required for communicate components
    """
    # defining a function for log message
    function = "download_data_from_database"

    # defining **kargs required as expliticty variables
    between_dates = kargs["between_dates"]
    cross_parameters = kargs["cross_parameters"]

    try:
        print_log("Opening connection to database", function, where, "log")
        conn = db_tools.open_connection_with_scripting_account()
        print_log("Downloading data from database", function, where, "log")
        df = db_tools.download_from_database(
            conn = conn, 
            params =  config["params"],
            query = config["query"]
        )
        df = df.dropna()
    except Exception as e:
        raise Exception(f"ERROR: {e}") 

    return df, between_dates, cross_parameters

In [3]:
# pipeline function scripts

# import packages
import pandas as pd

# defining a place for log message
where = "pipeline.py"

def general_pipeline(config: dict, between_dates: dict):
    # defining a function for log message
    function = "general_pipeline"

    dfs_df = pd.DataFrame
    cross_parameters = dict()
    for step in config.keys():
        try:
            print_log(f"Executing {step}", function, where, "log")
            dfs_df, between_dates, cross_parameters = config[step]["function"](# function defined in config
                # pass dataframe or dataframes list that will be manage
                dfs_df,                                  
                # pass dictionary defined in config with all information for execute the function 
                config[step]["information_function"], 
                # pass additional parameter, in this case each defined function should have **kargs 
                between_dates = between_dates,
                cross_parameters = cross_parameters
            )
        except Exception as e:
            message = f"An error occurring while execute the step {step}. {e} "
            print_log(message, function,  where, "error")
            raise Exception(message)
            
    return dfs_df

In [7]:
# functions scripts

import requests
import json
import pandas as pd
import os, time
import utils.dfutils as df_tools
from datetime import date, timedelta, datetime

os.environ['TZ'] = 'GTM'
time.tzset()

# defining a place for log message
where = "observe_ia_functions.py"

def get_date_epoch_start():
  reference = date(1970, 1, 1)#.date() 
#   dt = date.today() - timedelta(days = 1) # date.today() it's use because works for get the lastday

# Revisar el dia 20
  dt = date(2022,9,8) #- timedelta(days = 1)
  return int((dt - reference).total_seconds() * 1000) #- 19*3600*1000 # This miliseconds ((19*3600 - 1)*1000) are subtracting for get a start time in day

def get_date_epoch_end():
  return get_date_epoch_start() + 24*3600*1000

def api_component(dfs_df, information_function, **kargs):
    # defining a function for log message
    function = "api_component"
    
    # defining **kargs required as expliticty variables
    between_dates = kargs["between_dates"]
    cross_parameters = kargs["cross_parameters"]
    
    dfs = []
    for account in information_function["accounts"]:
        try:
            print_log(f"Extracting data for account: {account}", function, where, "log")
            size = information_function["size"] 
            page_i = information_function["page"]
            headers = information_function["headers"]
            payload = information_function["payload"]

            url = information_function["url_template"].format(account = account, size = size, page = page_i)
            response = json.loads(requests.request("POST", url, headers = headers, data = payload).content)
            print_log(f"Extracting {response['totalPages']} pages for account: {account}", function, where, "log")
            df_accounts = []
            for page in range(1, response["totalPages"]+1):
                try:
                    url = information_function["url_template"].format(account = account, size = size, page = page)
                    response = json.loads(requests.request("POST", url, headers = headers, data = payload).content)
                    print_log(f"Extracting page: {response['page']}", function, where, "log")
                    
                    df_1 = pd.json_normalize(response, record_path = ["meetings"], meta = ["account", "startDate", "endDate"])
                    df_1 = df_1[["observeCallId", "agentEmail", "agentName", "agentActive", "callTime", "scorecard"]]
                    df_2 = pd.json_normalize(response, record_path = ["meetings", ["scorecard"]], meta = ["account", "startDate"])
                    df_2 = df_2[["account", "startDate", "present", "name"]]
                    df_2["comprobation_name"] = None
                    df_2["observeCallId"] = None
                    df_2["agentEmail"] = None
                    df_2["agentName"] = None
                    df_2["agentActive"] = None
                    df_2["callTime"] = None
                    
                    df2_index = 0
                    for df1_index in df_1.index:
                        index_comprobation = 0
                        for z in range(df2_index, df2_index + len(df_1.loc[df1_index,"scorecard"])):
                            df_2.loc[z, "comprobation_name"] = df_1.loc[df1_index,"scorecard"][index_comprobation]["name"]
                            df_2.loc[z, "observeCallId"] = df_1.loc[df1_index, "observeCallId"]
                            df_2.loc[z, "agentEmail"] = df_1.loc[df1_index, "agentEmail"]
                            df_2.loc[z, "agentName"] = df_1.loc[df1_index, "agentName"]
                            df_2.loc[z, "agentActive"] = df_1.loc[df1_index, "agentActive"]
                            df_2.loc[z, "callTime"] = df_1.loc[df1_index, "callTime"]
                            index_comprobation = index_comprobation +1
                            df2_index = df2_index + 1

                    if (df_2["name"] ==  df_2["comprobation_name"]).unique() == True:
                        df_2 = df_2[["account", "startDate", "observeCallId", "agentEmail", "agentName", "agentActive", "callTime", "present", "name"]]
                    else:
                        print_log(f"Error when trying extract page {page} for {account}. {e}", function, where, "error")
                        raise Exception(f"Error when trying extract page {page} for {account}. {e}")
                    
                    df_accounts.append(df_2)
                except Exception as e:
                    print_log(f"Error when trying extract page {page} for {account}. {e}", function, where, "error")
                
            df = pd.concat(df_accounts)
            n = df["observeCallId"].nunique() 
            df["startDate"] = df["startDate"].apply(lambda date: datetime.fromtimestamp(date/1000))
            df = df.groupby(by=["account", "agentName","name","startDate"]).sum(list(df['present'])).reset_index()
            df["present"] = ((df["present"]/n))
            dfs.append(df)

        except Exception as e:
            print_log(f"Error when trying extract data for {account}. {e}", function, where, "error")
        
    dfs_df = pd.concat(dfs, ignore_index = True)
    return dfs_df, between_dates, cross_parameters

def observeia_handling_function(df, information_function, **kargs):
    # defining a function for log message
    function = "observeia_handling_function"
    
    # defining **kargs required as expliticty variables
    between_dates = kargs["between_dates"]
    cross_parameters = kargs["cross_parameters"]

    df = df_tools.df_handling(df, information_function["df_handling"])
    df.columns

    return df, between_dates, cross_parameters
    

In [8]:
configs = {
    "extract_step": {
        "function": api_component,
        "information_function": {
            "accounts": [
                "itelbpo_car8",
                "itelbpo-breville-mono-prod", 
                "itelbpo-healthandwellness", 
                "itelbpo-speedyloan", 
                "itelbpo-thebradfordexchange", 
                "itelbpo_jps", 
                "itelbpo_jps_mono", 
                "itelbpo_ontellus"
            ],             
            "size": 1000000,
            "page": 1,
            "url_template": "https://api.observe.ai/v1/reports/accounts/{account}/data?size={size}&page={page}",
            "payload": json.dumps({
              "email": "reporting@itelinternational.com",
              "start_date": get_date_epoch_start(),
              "end_date": get_date_epoch_end(),
            }),
            "headers": {
              "Content-Type": "application/json",
              "Authorization": "Basic aXRlbGJwb19qcHM6UzphRj9aO1JVelh5SFs4cDtQQk8sVmFOTm8="
            }
        }
    },
    "transform_step": {
        "function": observeia_handling_function,
        "information_function": {
          "df_handling": {
            "rename_columns": { 
              "account": "account", 
              "startDate": "date", 
              "agentName": "agent_name", 
              "present": "present_rate", 
              "name": "scorecard"
            }
          }
        }
    },
    "load_step": {
      "function": load_save_insert_with_keys,
      "information_function": {
          "delete_keys": ["date", "account", "scorecard"], # required for load_database_components.load_save_insert_with_keys
          "schema": "learning_development",          # required for load_database_components.load_save_insert_with_keys
          "table": "observe_ia_scorecards"             # required for load_database_components.load_save_insert_with_keys
      }
    }
}

In [9]:
# entry point 
dfs_df = general_pipeline(configs, dict())
# dfs_df.to_csv("observe_ia_all_accounts_05-25-2022.csv", index = False)

-- LOG -- Executing extract_step -- general_pipeline -- observe_ia_functions.py
-- LOG -- Extracting data for account: itelbpo_car8 -- api_component -- observe_ia_functions.py
-- LOG -- Extracting 0 pages for account: itelbpo_car8 -- api_component -- observe_ia_functions.py
-- ERROR -- Error when trying extract data for itelbpo_car8. No objects to concatenate -- api_component -- observe_ia_functions.py
-- LOG -- Extracting data for account: itelbpo-breville-mono-prod -- api_component -- observe_ia_functions.py
-- LOG -- Extracting 1 pages for account: itelbpo-breville-mono-prod -- api_component -- observe_ia_functions.py
-- LOG -- Extracting page: 1 -- api_component -- observe_ia_functions.py
-- LOG -- Extracting data for account: itelbpo-healthandwellness -- api_component -- observe_ia_functions.py
-- LOG -- Extracting 0 pages for account: itelbpo-healthandwellness -- api_component -- observe_ia_functions.py
-- ERROR -- Error when trying extract data for itelbpo-healthandwellness. No 