In [2]:
# Load kedro environment (not needed in .py)
from pathlib import Path
from kedro.framework.context import load_context

# Load a kedro context to be able to work in the notebook.
# The notebooks folder defaults to the original hard-coded location but can be overridden with the
# AA_ENGINE_NOTEBOOKS_DIR environment variable (e.g. when running from another checkout); the
# project root passed to kedro is the parent directory of that folder.
import os

current_dir = Path(os.environ.get("AA_ENGINE_NOTEBOOKS_DIR",
                                  "/u01/share/cesar/aa_engine_uy/notebooks/"))
proj_path = current_dir.parent
context = load_context(proj_path)
catalog = context.catalog
credentials = context.config_loader.get("credentials*", "credentials*/**")
parameters = context.config_loader.get("parameters*", "parameters*/**")

from aa_engine_pkg.assets.utils import *
from aa_engine_pkg.assets.core.data.kedro.catalog_expansion.partitioned_sql import SQLPartitionedDataSet

In [4]:
def create_cliente_activo(cliente_activo: SQLPartitionedDataSet,
                          date: str) -> pd.DataFrame:
    """Return the customers active at end of period (EoP) in the month preceding ``date``.

    No features are computed here: the function only loads the partition of ``cliente_activo``
    selected by ``get_previous_month(date)`` (e.g. 20190128 -> 201812 in the logged queries).

    Parameters
    ----------
    cliente_activo:
        dataset defined in ``catalog.yml`` - list of active customers at EoP, loaded per period
        through ``filter_by``
    date:
        period to process

    Returns
    -------
    pd.DataFrame
        Active customers of the previous month (judging by the logged SQL, a single
        ``CUSTOMER_ID`` column - TODO confirm against the catalog definition)
    """
    logger = initialize_logger()
    logger.info("Creating cliente_activo...")

    # The customer base used for ``date`` is the one observed in the month before it.
    previous_period = get_previous_month(date)
    return cliente_activo.filter_by(date=previous_period)

In [5]:
cliente_activo=catalog.load("cliente_activo")

2021-01-04 17:41:08,506 - kedro.io.data_catalog - INFO - Loading data from `cliente_activo` (SQLPartitionedDataSet)...


In [6]:
id_cols = ["CUSTOMER_ID", "DATE_EXP"]

In [7]:
def create_master_total(cliente_activo: pd.DataFrame,
                        master_arpu_quality: pd.DataFrame,
                        master_campanas: pd.DataFrame,
                        master_clientes: pd.DataFrame,
                        master_eop: pd.DataFrame,
                        master_plan_evento: pd.DataFrame,
                        master_servicioalcliente: pd.DataFrame,
                        master_mudanza: pd.DataFrame,
                        master_echi: pd.DataFrame,
                        master_mantenimiento: pd.DataFrame,
                        master_mora: pd.DataFrame,
                        master_eventos_fact: pd.DataFrame,
                        parameters: Dict,
                        date: str) -> Union[pd.DataFrame, None]:
    """Unify the per-source master tables into one master table for the period ``date``.

    If a file whose name contains ``date`` already exists in ``<master_path>master_total/`` it is
    read and returned as-is (the other inputs are then unused). Otherwise every source master is
    stripped of its date columns, indexed by ``CUSTOMER_ID`` and concatenated column-wise; the
    result is left-merged onto ``cliente_activo``, stamped with ``DATE_CALC``, memory-optimized
    and, if configured, written to parquet (optionally together with a subsample built by
    ``create_subsample(pct=0.2)``).

    The input dataframes are not modified.

    Parameters
    ----------
    cliente_activo:
        pandas dataframe with the active customers of the period; its ``CUSTOMER_ID`` values must
        be unique (the merge is validated as one-to-one)
    master_arpu_quality:
        pandas dataframe with information regarding ARPU quality for active customers in period
    master_campanas:
        pandas dataframe with information regarding campañas for active customers in period
    master_clientes:
        pandas dataframe with active customers in period
    master_eop:
        pandas dataframe with information related to EoP state of active customers in period
    master_plan_evento:
        pandas dataframe with customer's events features for active customers in period
    master_servicioalcliente:
        pandas dataframe with call center calls features for active customers in period
    master_mudanza, master_echi, master_mantenimiento, master_mora, master_eventos_fact:
        pandas dataframes with the features of the corresponding source for the period
    parameters:
        project parameters defined in ``parameters.yml``; reads ``write_to_parquet``,
        ``masters.total.create_subsample`` and ``paths.master_path`` (expected to end in "/")
    date:
        period to be generated, a string in "yyyymmdd" format

    Every ``master_*`` dataframe needs a ``CUSTOMER_ID`` column (one row per customer is
    expected); any column whose name contains "DATE" is ignored.

    Returns
    -------
    pd.DataFrame
        Dataframe with unified master table information, one row per row of ``cliente_activo``
    """

    # Initialize logger
    log = initialize_logger()

    table_name = "total"
    write_to_parquet = parameters["write_to_parquet"]
    write_subsample = parameters["masters"][table_name]["create_subsample"]

    path = f"{parameters['paths']['master_path']}master_{table_name}/"
    os.makedirs(path, exist_ok=True)
    # Any file in the output folder whose name contains the date counts as already processed.
    # Sorted so the choice is deterministic if several files match (os.listdir order is arbitrary).
    match = sorted(file for file in os.listdir(path) if str(date) in file)

    if match:
        # If table is found, read parquet:
        log.info(f"Reading {match[0]} table")
        return pd.read_parquet(path + match[0], engine="pyarrow")

    log.info(f"Creating master table total for period {date}")

    # TODO: build this list from parameters["masters"][<name>]["include_in_total"]
    #       (excluding the "total" and "global" masters) instead of hard-coding it.
    tables_to_include = [master_arpu_quality,
                         master_campanas,
                         master_clientes,
                         master_eop,
                         master_plan_evento,
                         master_servicioalcliente,
                         master_mudanza,
                         master_echi,
                         master_mantenimiento,
                         master_mora,
                         master_eventos_fact]

    # Drop every column whose name contains "DATE" (e.g. DATE_EXP) so the period columns of the
    # individual masters do not collide. NOTE(review): this also drops any feature whose name just
    # contains the substring. drop() without inplace leaves the caller's dataframes untouched.
    df_list = [
        df.drop(columns=[c for c in df.columns if "DATE" in c]).set_index("CUSTOMER_ID")
        for df in tables_to_include
    ]
    log.info(f"Merging all {len(df_list)} tables")

    # Merge into one df (column-wise, aligned on CUSTOMER_ID)
    df_final = pd.concat(df_list, axis=1).reset_index()
    del df_list
    gc.collect()

    # Merge all with active customers; validate="1:1" raises if either side has duplicated ids
    log.info("Merging all tables with EOP")
    df_final = pd.merge(cliente_activo, df_final, on="CUSTOMER_ID", how="left", validate="1:1")
    df_final["DATE_CALC"] = date

    # Optimize memory usage of table before saving
    log.info("Optimizing memory...")
    start_mem = df_final.memory_usage().sum() / 1024 ** 2
    log.info('Memory usage of dataframe is {:.2f} MB'.format(start_mem))

    # Split positionally into at most 1000 near-equal chunks (the same sizes np.array_split gives)
    # without calling np.array_split on the DataFrame itself, which is deprecated in recent pandas,
    # and without producing empty chunks when the table has fewer than 1000 rows.
    n_chunks = max(1, min(1000, len(df_final)))
    chunks = [df_final.iloc[idx] for idx in np.array_split(np.arange(len(df_final)), n_chunks)]
    optimized_chunks = Parallel(n_jobs=12, verbose=5, prefer="processes")(
        delayed(reduce_mem_usage)(chunk) for chunk in chunks
    )
    del chunks
    df_final = pd.concat(optimized_chunks)
    del optimized_chunks

    end_mem = df_final.memory_usage().sum() / 1024 ** 2
    log.info('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    if start_mem > 0:
        log.info('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))
    log.info(f"Exporting {df_final.shape[0]} rows and {df_final.shape[1]} columns")

    # Save sample and total mastertable in parquet files
    if write_to_parquet:
        if write_subsample:
            df_subsample = create_subsample(df=df_final,
                                            subsample_col="CUSTOMER_ID",
                                            pct=0.2)
            path_subsample = f"{parameters['paths']['master_path']}master_subsample/"
            os.makedirs(path_subsample, exist_ok=True)
            df_subsample.to_parquet(f"{path_subsample}master_subsample_{date}.parquet", engine="pyarrow")

        file = f"{path}master_{table_name}_{date}.parquet"
        df_final.to_parquet(file, engine="pyarrow")

    return df_final

In [8]:
# Periods (yyyymmdd strings, judging by the printed output) still pending for the "total" master.
dates = calculate_dates_to_process_for_master(
    parameters,
    table_name="total",
)
print(dates)

['20181203', '20181231', '20190128', '20190225', '20190325', '20190422']


In [14]:
# Shared prefix of every per-source master folder, e.g. <master_path>master_arpu_quality/
path = f"{parameters['paths']['master_path']}master_"
path

'/data/uy_po/master/master_'

In [21]:
# Folder name (under ``path``) of every source master merged into the total master, keyed by the
# matching keyword argument of ``create_master_total``.
master_tables = {
    "master_arpu_quality": "arpu_quality",
    "master_campanas": "campanas",
    "master_clientes": "clientes",
    "master_eop": "eop",
    "master_plan_evento": "plan_evento",
    "master_servicioalcliente": "servicioalcliente",
    "master_mudanza": "mudanza",
    "master_echi": "echi",
    "master_mantenimiento": "mantenimiento",
    "master_mora": "mora",
    "master_eventos_fact": "eventos_facturados",
}

for date in dates:
    print(f"Processing date {date}")
    cliente_activo_df = create_cliente_activo(cliente_activo, date)

    # Files follow the layout <path><table>/master_<table>_<date>.parquet
    masters = {
        arg_name: pd.read_parquet(path + f'{table}/master_{table}_{date}.parquet')
        for arg_name, table in master_tables.items()
    }

    # Keyword arguments keep the call correct regardless of create_master_total's parameter order.
    # The result is written to disk by the function itself, so the return value is not kept.
    create_master_total(cliente_activo_df,
                        parameters=parameters,
                        date=date,
                        **masters)

    # Release this period's inputs before the next period is read to keep peak memory down.
    del masters, cliente_activo_df
  

Processing date 20181203
2021-01-04 17:59:39,313 - aa_engine_pkg.assets.utils.utilities - INFO - Creating cliente_activo...
select distinct CUSTOMER_ID from stg_uy_customer_status where UPPER(STATUS) LIKE '%ACTIVO%' and DATE_EXP = 201811


  % ((self.server_version_info,))


2021-01-04 17:59:44,389 - aa_engine_pkg.assets.utils.utilities - INFO - Creating master table total for period 20181203
2021-01-04 17:59:45,011 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all 11 tables
2021-01-04 17:59:47,070 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all tables with EOP
2021-01-04 17:59:48,455 - aa_engine_pkg.assets.utils.utilities - INFO - Optimizing memory...
2021-01-04 17:59:48,513 - aa_engine_pkg.assets.utils.utilities - INFO - Memory usage of dataframe is 823.11 MB


[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done  48 tasks      | elapsed:    7.1s
[Parallel(n_jobs=12)]: Done 138 tasks      | elapsed:   15.3s
[Parallel(n_jobs=12)]: Done 264 tasks      | elapsed:   26.5s
[Parallel(n_jobs=12)]: Done 426 tasks      | elapsed:   41.0s
[Parallel(n_jobs=12)]: Done 624 tasks      | elapsed:   58.9s
[Parallel(n_jobs=12)]: Done 858 tasks      | elapsed:  1.3min
[Parallel(n_jobs=12)]: Done 1000 out of 1000 | elapsed:  1.5min finished


2021-01-04 18:01:55,183 - aa_engine_pkg.assets.utils.utilities - INFO - Memory usage after optimization is: 413.78 MB
2021-01-04 18:01:55,184 - aa_engine_pkg.assets.utils.utilities - INFO - Decreased by 49.7%
2021-01-04 18:01:55,185 - aa_engine_pkg.assets.utils.utilities - INFO - Exporting 116634 rows and 924 columns
Processing date 20181231
2021-01-04 18:02:00,673 - aa_engine_pkg.assets.utils.utilities - INFO - Creating cliente_activo...
select distinct CUSTOMER_ID from stg_uy_customer_status where UPPER(STATUS) LIKE '%ACTIVO%' and DATE_EXP = 201811
2021-01-04 18:02:05,787 - aa_engine_pkg.assets.utils.utilities - INFO - Creating master table total for period 20181231
2021-01-04 18:02:06,328 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all 11 tables
2021-01-04 18:02:08,315 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all tables with EOP
2021-01-04 18:02:09,685 - aa_engine_pkg.assets.utils.utilities - INFO - Optimizing memory...
2021-01-04 18:02:09,743 - aa_engine_

[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done  48 tasks      | elapsed:    5.2s
[Parallel(n_jobs=12)]: Done 138 tasks      | elapsed:   13.4s
[Parallel(n_jobs=12)]: Done 264 tasks      | elapsed:   24.6s
[Parallel(n_jobs=12)]: Done 426 tasks      | elapsed:   39.1s
[Parallel(n_jobs=12)]: Done 624 tasks      | elapsed:   56.5s
[Parallel(n_jobs=12)]: Done 858 tasks      | elapsed:  1.3min
[Parallel(n_jobs=12)]: Done 1000 out of 1000 | elapsed:  1.5min finished


2021-01-04 18:04:14,898 - aa_engine_pkg.assets.utils.utilities - INFO - Memory usage after optimization is: 417.78 MB
2021-01-04 18:04:14,899 - aa_engine_pkg.assets.utils.utilities - INFO - Decreased by 49.6%
2021-01-04 18:04:14,900 - aa_engine_pkg.assets.utils.utilities - INFO - Exporting 116634 rows and 931 columns
Processing date 20190128
2021-01-04 18:04:20,428 - aa_engine_pkg.assets.utils.utilities - INFO - Creating cliente_activo...
select distinct CUSTOMER_ID from stg_uy_customer_status where UPPER(STATUS) LIKE '%ACTIVO%' and DATE_EXP = 201812
2021-01-04 18:04:25,260 - aa_engine_pkg.assets.utils.utilities - INFO - Creating master table total for period 20190128
2021-01-04 18:04:25,769 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all 11 tables
2021-01-04 18:04:27,782 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all tables with EOP
2021-01-04 18:04:29,167 - aa_engine_pkg.assets.utils.utilities - INFO - Optimizing memory...
2021-01-04 18:04:29,227 - aa_engine_

[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done  48 tasks      | elapsed:    5.2s
[Parallel(n_jobs=12)]: Done 138 tasks      | elapsed:   13.5s
[Parallel(n_jobs=12)]: Done 264 tasks      | elapsed:   24.9s
[Parallel(n_jobs=12)]: Done 426 tasks      | elapsed:   39.7s
[Parallel(n_jobs=12)]: Done 624 tasks      | elapsed:   57.5s
[Parallel(n_jobs=12)]: Done 858 tasks      | elapsed:  1.3min
[Parallel(n_jobs=12)]: Done 1000 out of 1000 | elapsed:  1.5min finished


2021-01-04 18:06:35,561 - aa_engine_pkg.assets.utils.utilities - INFO - Memory usage after optimization is: 421.71 MB
2021-01-04 18:06:35,562 - aa_engine_pkg.assets.utils.utilities - INFO - Decreased by 49.7%
2021-01-04 18:06:35,563 - aa_engine_pkg.assets.utils.utilities - INFO - Exporting 116984 rows and 938 columns
Processing date 20190225
2021-01-04 18:06:41,180 - aa_engine_pkg.assets.utils.utilities - INFO - Creating cliente_activo...
select distinct CUSTOMER_ID from stg_uy_customer_status where UPPER(STATUS) LIKE '%ACTIVO%' and DATE_EXP = 201901
2021-01-04 18:06:44,271 - aa_engine_pkg.assets.utils.utilities - INFO - Creating master table total for period 20190225
2021-01-04 18:06:44,795 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all 11 tables
2021-01-04 18:06:46,818 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all tables with EOP
2021-01-04 18:06:48,237 - aa_engine_pkg.assets.utils.utilities - INFO - Optimizing memory...
2021-01-04 18:06:48,296 - aa_engine_

[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done  48 tasks      | elapsed:    5.2s
[Parallel(n_jobs=12)]: Done 138 tasks      | elapsed:   13.6s
[Parallel(n_jobs=12)]: Done 264 tasks      | elapsed:   24.8s
[Parallel(n_jobs=12)]: Done 426 tasks      | elapsed:   39.5s
[Parallel(n_jobs=12)]: Done 624 tasks      | elapsed:   57.3s
[Parallel(n_jobs=12)]: Done 858 tasks      | elapsed:  1.3min
[Parallel(n_jobs=12)]: Done 1000 out of 1000 | elapsed:  1.5min finished


2021-01-04 18:08:53,459 - aa_engine_pkg.assets.utils.utilities - INFO - Memory usage after optimization is: 419.84 MB
2021-01-04 18:08:53,460 - aa_engine_pkg.assets.utils.utilities - INFO - Decreased by 49.8%
2021-01-04 18:08:53,461 - aa_engine_pkg.assets.utils.utilities - INFO - Exporting 116342 rows and 942 columns
Processing date 20190325
2021-01-04 18:08:59,135 - aa_engine_pkg.assets.utils.utilities - INFO - Creating cliente_activo...
select distinct CUSTOMER_ID from stg_uy_customer_status where UPPER(STATUS) LIKE '%ACTIVO%' and DATE_EXP = 201902
2021-01-04 18:09:02,172 - aa_engine_pkg.assets.utils.utilities - INFO - Creating master table total for period 20190325
2021-01-04 18:09:02,681 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all 11 tables
2021-01-04 18:09:04,697 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all tables with EOP
2021-01-04 18:09:06,128 - aa_engine_pkg.assets.utils.utilities - INFO - Optimizing memory...
2021-01-04 18:09:06,187 - aa_engine_

[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done  48 tasks      | elapsed:    5.2s
[Parallel(n_jobs=12)]: Done 138 tasks      | elapsed:   13.2s
[Parallel(n_jobs=12)]: Done 264 tasks      | elapsed:   24.8s
[Parallel(n_jobs=12)]: Done 426 tasks      | elapsed:   39.4s
[Parallel(n_jobs=12)]: Done 624 tasks      | elapsed:   57.0s
[Parallel(n_jobs=12)]: Done 858 tasks      | elapsed:  1.3min
[Parallel(n_jobs=12)]: Done 1000 out of 1000 | elapsed:  1.5min finished


2021-01-04 18:11:11,406 - aa_engine_pkg.assets.utils.utilities - INFO - Memory usage after optimization is: 417.91 MB
2021-01-04 18:11:11,407 - aa_engine_pkg.assets.utils.utilities - INFO - Decreased by 49.8%
2021-01-04 18:11:11,408 - aa_engine_pkg.assets.utils.utilities - INFO - Exporting 115807 rows and 942 columns
Processing date 20190422
2021-01-04 18:11:17,092 - aa_engine_pkg.assets.utils.utilities - INFO - Creating cliente_activo...
select distinct CUSTOMER_ID from stg_uy_customer_status where UPPER(STATUS) LIKE '%ACTIVO%' and DATE_EXP = 201903
2021-01-04 18:11:19,535 - aa_engine_pkg.assets.utils.utilities - INFO - Creating master table total for period 20190422
2021-01-04 18:11:20,078 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all 11 tables
2021-01-04 18:11:22,088 - aa_engine_pkg.assets.utils.utilities - INFO - Merging all tables with EOP
2021-01-04 18:11:23,513 - aa_engine_pkg.assets.utils.utilities - INFO - Optimizing memory...
2021-01-04 18:11:23,572 - aa_engine_

[Parallel(n_jobs=12)]: Using backend LokyBackend with 12 concurrent workers.
[Parallel(n_jobs=12)]: Done  48 tasks      | elapsed:    5.2s
[Parallel(n_jobs=12)]: Done 138 tasks      | elapsed:   13.5s
[Parallel(n_jobs=12)]: Done 264 tasks      | elapsed:   25.1s
[Parallel(n_jobs=12)]: Done 426 tasks      | elapsed:   40.0s
[Parallel(n_jobs=12)]: Done 624 tasks      | elapsed:   58.2s
[Parallel(n_jobs=12)]: Done 858 tasks      | elapsed:  1.3min
[Parallel(n_jobs=12)]: Done 1000 out of 1000 | elapsed:  1.5min finished


2021-01-04 18:13:29,621 - aa_engine_pkg.assets.utils.utilities - INFO - Memory usage after optimization is: 418.65 MB
2021-01-04 18:13:29,622 - aa_engine_pkg.assets.utils.utilities - INFO - Decreased by 49.8%
2021-01-04 18:13:29,623 - aa_engine_pkg.assets.utils.utilities - INFO - Exporting 115524 rows and 946 columns
