Generalize the upload functions using the table-relations dictionary

In [1]:
import os 
import sys
import pandas as pd
import numpy as np
from pathlib import Path
from datetime import date, timedelta
from sqlalchemy import text

In [2]:
# Make the project's own modules importable from this notebook.
# Assumes the notebook lives one directory below the project root (TODO confirm if moved).
project_root = Path.cwd().parent
if str(project_root) not in sys.path:
    # Guard so that re-running this cell does not append the same entry repeatedly.
    sys.path.append(str(project_root))

In [3]:
import dependencies as dp
from db.engines import engine_silver, conn_silver, engine_gold, conn_gold
from processes.extract.functions import map_db_tables, db_tables_to_df
from processes.transform.functions import check_nulls, check_data_types
from processes.load.functions import get_new_data, upload_new_data, get_modified_data, update_modified_data


2024-12-23 13:30:10,096 - INFO - Successful connection to schema bronze
2024-12-23 13:30:10,126 - INFO - Successful connection to schema silver
2024-12-23 13:30:10,160 - INFO - Successful connection to schema gold


In [4]:
from model.gold_schema_mappings import (
    pkey_mapping,
    fkey_mapping,
    column_name_mapping,
    # fk_column_silver_mapping
)

In [5]:
from model.table_relations import related_silv_gold_v2

In [6]:
# Gold-schema connection used below to read referenced tables and the current MAX primary key.
conn = conn_gold

In [7]:
# Extract every table of the silver schema into `silver_df`
# (presumably a dict of DataFrames keyed by table name, as indexed later — TODO confirm).
try:
    db_tables = map_db_tables(engine=engine_silver, schema='silver')
    silver_df = db_tables_to_df(engine=engine_silver, tables=db_tables)
    dp.logger.info(f"Extract Historic Data Process executed successfully in silver schema.")

except Exception as e:
    # The message previously named the bronze schema although silver is being extracted.
    dp.logger.error(f"Extract Historic Data Proccess failed in silver schema.: {e}")
    # Every later cell needs `silver_df`; stop here instead of continuing with it undefined.
    raise

2024-12-23 13:30:10,270 - INFO - Mapping database tables in schema 'silver' started.
2024-12-23 13:30:10,363 - INFO - Tables in schema 'silver' were successfully mapped.
2024-12-23 13:30:10,363 - INFO - Loading data from silver.Almacenes table...
2024-12-23 13:30:10,363 - DEBUG - Data from Almacenes table has been load in a DataFrame.
2024-12-23 13:30:10,363 - INFO - Loading data from silver.Articulos table...
2024-12-23 13:30:11,059 - DEBUG - Data from Articulos table has been load in a DataFrame.
2024-12-23 13:30:11,063 - INFO - Loading data from silver.BonosPresencia table...
2024-12-23 13:30:12,690 - DEBUG - Data from BonosPresencia table has been load in a DataFrame.
2024-12-23 13:30:12,690 - INFO - Loading data from silver.BonosTrabajadas table...
2024-12-23 13:30:16,160 - DEBUG - Data from BonosTrabajadas table has been load in a DataFrame.
2024-12-23 13:30:16,160 - INFO - Loading data from silver.Clientes table...
2024-12-23 13:30:16,503 - DEBUG - Data from Clientes table has b

In [8]:
# Gold tables to process, in load order. Foreign keys are resolved by reading the
# referenced tables back from Gold, so presumably a referenced table must be listed
# (and loaded) before the tables that point to it — TODO confirm the ordering rule.
# Uncomment entries to include them in the run; currently only "Compras" is enabled.
ordered_tables = [
    # "TiposOrdenesReparacion",
    # "TiposVentasAlmacen",
    # "Articulos",
    # "TiposHoras",
    # "Clientes",
    # "Empresas",
    # "Vehiculos",
    # "Talleres",
    # "Operarios",
    # "Almacenes",
    # #"Stock",
    # "BonosPresencia",
    # "OrdenesReparacion",
    # "BonosTrabajadas",
    "Compras",
    # "Invertidas",
    # # "OrdenesVentaMostrador",
    # # "OrdenesVentaTaller"
]

In [9]:
from utils.functions import build_upload_query

In [10]:
from model.model_info import gold_properties
from model.gold_schema_mappings import fkey_mapping

Foreign-key assignment and upload, refactored as functions

In [11]:
from utils.functions import build_upload_query
from model.model_info import gold_properties
from model.gold_schema_mappings import fkey_mapping

def assing_foreig_keys (tbl_gold, df_silver, fk_relations, conn):
    '''
    Assign foreign keys to a Silver DataFrame by merging it with the referenced tables.

    For every foreign key in ``fk_relations`` the referenced table is read through
    ``conn``, left-merged onto the data, and its key column(s) are renamed according
    to ``pk_to_fk``. Columns that collide during a merge are resolved in favour of
    the Silver side. Finally only the columns declared for ``tbl_gold`` in
    ``gold_properties`` are kept, in that order.

    input:
        - tbl_gold (str): Name of the Gold table whose foreign keys are assigned.
        - df_silver (DataFrame): Silver data to enrich. It is not modified; a copy is used.
        - fk_relations (dict): Maps each FK name to a dict with the keys
          'tbl', 'select_columns', 'left_on', 'right_on' and 'pk_to_fk'.
        - conn: Connection/engine passed to ``pd.read_sql`` to read the referenced tables.

    output:
        - df_merge_fkeys (DataFrame): The data with all its foreign keys assigned,
          restricted to the Gold columns of ``tbl_gold``.
    '''
    merge_suffixes = ("_silver", "_foreign")
    silver_suffix, foreign_suffix = merge_suffixes

    # Work on a copy: the dtype workaround below assigns columns, which would otherwise
    # mutate the caller's DataFrame when it runs before the first merge.
    df_merge_fkeys = df_silver.copy()

    for fkey_name, fkey_details in fk_relations.items():
        dp.logger.info(f"Assigning column '{fkey_name}' to table '{tbl_gold}' stracted from Silver Schema.")

        tbl = fkey_details['tbl']
        select_columns = fkey_details['select_columns']
        left_on = fkey_details['left_on']
        right_on = fkey_details['right_on']
        pk_to_fk = fkey_details['pk_to_fk']

        # Read only the needed columns of the referenced table.
        query = build_upload_query(
            table_name=tbl,
            key_columns=select_columns,
            date_column=None
        )
        foreign_tbl = pd.read_sql(query, con=conn)

        # TODO: the merge keys on both sides must share dtypes; this broke when some
        # right_on values were changed to lists. The block below is a targeted workaround.
        if tbl_gold in {'BonosTrabajadas', 'Invertidas'} and fkey_name == 'FkOrdenReparacion':
            # Int64 -> str renders missing values as the literal '<NA>'; map them back to None.
            df_merge_fkeys[left_on] = df_merge_fkeys[left_on].astype('Int64').astype(str)
            df_merge_fkeys[left_on] = df_merge_fkeys[left_on].replace('<NA>', None)

        # Cumulative left merge: each FK adds its columns to the previous result.
        df_merge_fkeys = pd.merge(
            df_merge_fkeys,
            foreign_tbl,
            left_on=left_on,
            right_on=right_on,
            how='left',
            suffixes=merge_suffixes
        )

        df_merge_fkeys = df_merge_fkeys.rename(columns=pk_to_fk)

        # Drop the foreign copies of columns that collided during the merge.
        duplicated_columns = [col for col in df_merge_fkeys.columns if col.endswith(foreign_suffix)]
        for col in duplicated_columns:
            dp.logger.info(f"Removing unnecessary foreign column '{col}' from the table '{tbl_gold}'.")
        if duplicated_columns:
            df_merge_fkeys = df_merge_fkeys.drop(columns=duplicated_columns)

        # Restore original Silver names by stripping only the trailing suffix
        # (str.replace would also remove '_silver' appearing elsewhere in a name).
        renamed_columns = {
            col: col[:-len(silver_suffix)]
            for col in df_merge_fkeys.columns
            if col.endswith(silver_suffix)
        }
        if renamed_columns:
            dp.logger.info(f"Renaming columns to their original names: {renamed_columns}")
            df_merge_fkeys = df_merge_fkeys.rename(columns=renamed_columns)

    # Keep only the Gold columns, in the order declared in the model.
    df_merge_fkeys = df_merge_fkeys[list(gold_properties[tbl_gold].keys())]

    dp.logger.debug(f"Foreign key assignment for table '{tbl_gold}' completed.")

    return df_merge_fkeys
    

In [None]:
# For every table in `ordered_tables`: assign foreign keys, apply the transform checks,
# then upload the rows not yet present in Gold with consecutive new primary keys.
df_silver_fkeys = {}

for tbl_gold in ordered_tables:
    dp.logger.info(f"Processing table: Silver '{tbl_gold}' -> Gold '{tbl_gold}'")

    # 1. Foreign keys (only for tables that declare them).
    if tbl_gold in fkey_mapping:
        dp.logger.debug(f'Table {tbl_gold} needs to assign foreign keys.')

        fk_relations = fkey_mapping[tbl_gold]
        dp.logger.debug(f"FK Processing table: {tbl_gold}")

        df_silver_fkeys[tbl_gold] = assing_foreig_keys(
            tbl_gold,
            silver_df[tbl_gold],
            fk_relations,
            conn
        )
    else:
        df_silver_fkeys[tbl_gold] = silver_df[tbl_gold]

    # 2. Transform checks, applied to every table (with or without foreign keys).
    dp.logger.debug(f"Applying transforme functions to '{tbl_gold}'")

    df_silver_fkeys[tbl_gold], df_null_rows = check_nulls(df_silver_fkeys[tbl_gold], gold_properties[tbl_gold])

    df_silver_fkeys[tbl_gold], df_invalid = check_data_types(df_silver_fkeys[tbl_gold], gold_properties[tbl_gold], df_null_rows)

    # 3. Load, one pass per relation. Previously the relation loop only overwrote
    # variables: just the last relation was used, and a table with no relations
    # silently reused the previous table's `df` / `table_name`.
    table_relations = related_silv_gold_v2[tbl_gold]
    if not table_relations:
        dp.logger.warning(f"No relations defined for table '{tbl_gold}'; skipping upload.")
        continue

    df = df_silver_fkeys[tbl_gold]
    table_name = tbl_gold
    pkey_column = pkey_mapping.get(tbl_gold)

    for relation in table_relations:
        key_columns = relation.get("key_columns")
        date_column = relation.get("date_column")

        df_new_data, df_existing_data = get_new_data(
            df=df,
            table_name=table_name,
            key_columns=key_columns,
            date_column=date_column,
            engine=engine_gold)

        if df_new_data.empty:
            dp.logger.info(f'There is not update data to insert into table "{table_name}"')
            continue

        dp.logger.info(f'There is new data to upload in table "{table_name}"')

        if pkey_column is None:
            # Without a mapped primary key, IDs cannot be assigned (the query would use "None").
            dp.logger.error(f"No primary key mapped for table '{tbl_gold}'; skipping upload.")
            continue

        dp.logger.info(f"Assigning new IDs for table '{tbl_gold}'")

        # Re-read the current maximum on each pass so IDs stay unique if an
        # earlier relation already inserted rows.
        result = conn.execute(
            text(f'SELECT MAX("{pkey_column}") FROM "gold"."{tbl_gold}"')
        )
        max_existing_id = result.scalar() or 0

        df_new_data[pkey_column] = range(
            max_existing_id + 1, max_existing_id + 1 + len(df_new_data)
        )

        try:
            upload_new_data(
                df_new_data=df_new_data,
                table_name=table_name,
                date_column=date_column,
                engine=engine_gold)

            dp.logger.info(f'New data has been uploaded succesfully into table "{tbl_gold}"')

        except Exception as e:
            # Best-effort per table: log and move on so one failing table does not stop the run.
            dp.logger.error(f"An error has occurred trying to insert new data into table '{tbl_gold}': {e}")
    
    
    

In [None]:
# Run only the foreign-key assignment step (no transforms, no upload) for every
# table in `ordered_tables`, collecting the results in `df_silver_fkeys`.
df_silver_fkeys = {}

for tbl_gold in ordered_tables:
    dp.logger.info(f"Processing table: Silver '{tbl_gold}' -> Gold '{tbl_gold}'")

    if tbl_gold not in fkey_mapping.keys():
        # Nothing to resolve: keep the Silver frame as is.
        df_silver_fkeys[tbl_gold] = silver_df[tbl_gold]
        continue

    dp.logger.debug(f'Table {tbl_gold} needs to assign foreign keys.')

    # Foreign-key relations declared for this table.
    fk_relations = fkey_mapping[tbl_gold]
    dp.logger.debug(f"FK Processing table: {tbl_gold}")

    df_silver_fkeys[tbl_gold] = assing_foreig_keys(
        tbl_gold, silver_df[tbl_gold], fk_relations, conn
    )

In [None]:
# Debug: display the FK relations of the last FK table processed by an earlier cell
# (depends on kernel state).
fk_relations

In [None]:
# Debug: re-run FK assignment for whatever `tbl_gold` was left over from an earlier loop
# (depends on kernel state; raises KeyError if that table has no FK mapping).
assing_foreig_keys (
            tbl_gold,
            silver_df[tbl_gold],
            fkey_mapping[tbl_gold],
            conn
)

In [None]:
# Debug: inspect the FkAlmacen column of the current table as currently held in `silver_df`.
silver_df[tbl_gold]['FkAlmacen']

In [None]:
# Legacy/scratch version of the FK merge step, superseded by `assing_foreig_keys`.
# NOTE(review): it merges into `silver_df[tbl_gold]` in place, so re-running this cell
# merges again and accumulates suffixed duplicate columns. It does not apply `pk_to_fk`.
for tbl_gold in ordered_tables:

    if tbl_gold in fkey_mapping.keys():
        dp.logger.debug(f'Table {tbl_gold} needs to assign foreign keys.')
        
        # Get the table's foreign-key relations
        fk_relations = fkey_mapping[tbl_gold]
        dp.logger.debug(f"FK Processing table: {tbl_gold}")
                
        for fkey_name, fkey_details in fk_relations.items():
            dp.logger.info(f"Assigning column '{fkey_name}' from Silver for FK processing in table '{tbl_gold}'.")
            
            # Relation details
            tbl = fkey_details['tbl']
            select_columns = fkey_details['select_columns']
            left_on = fkey_details['left_on']
            right_on = fkey_details['right_on']
            pk_to_fk = fkey_details['pk_to_fk']
            
            # Build the query for the referenced (foreign) table
            query = build_upload_query(
                table_name=tbl,
                key_columns=select_columns,
                date_column=None
            )
            foreing_tbl = pd.read_sql(query, con=engine_gold)

            merge_suffixes = ("_silver", "_foreign")


            ####### WORKAROUND (dtype mismatch on the merge key) #######
            if tbl_gold == 'BonosTrabajadas' and fkey_name == 'FkOrdenReparacion':
                silver_df[tbl_gold][left_on] = silver_df[tbl_gold][left_on].astype('Int64').astype(str)
                silver_df[tbl_gold][left_on] = silver_df[tbl_gold][left_on].replace('<NA>', None)

            # Cumulative merge (overwrites silver_df[tbl_gold])
            silver_df[tbl_gold] = pd.merge(
                silver_df[tbl_gold],
                foreing_tbl,
                left_on=left_on,
                right_on=right_on,
                how='left',
                suffixes=merge_suffixes
            )

In [None]:
# Debug: show the last FK name/details left over from the loop (depends on kernel state).
fkey_name, fkey_details

In [None]:
# Debug: list the current columns of the last processed table in `silver_df`.
silver_df[tbl_gold].columns

In [None]:
# Legacy version of the full FK -> transform -> upload loop, kept for reference.
# NOTE(review): it mutates `silver_df[tbl_gold]` in place, so it is not safe to re-run.
# Transform checks run only for tables with foreign keys.
# The relation loop keeps only the last relation's values; a table with no relations
# would reuse the previous iteration's `df` / `table_name`.
for tbl_gold in ordered_tables:

    if tbl_gold in fkey_mapping.keys():
        dp.logger.debug(f'Table {tbl_gold} needs to assign foreign keys.')
        
        # Get the table's foreign-key relations
        fk_relations = fkey_mapping[tbl_gold]
        dp.logger.debug(f"FK Processing table: {tbl_gold}")
                
        for fkey_name, fkey_details in fk_relations.items():
            dp.logger.info(f"Assigning column '{fkey_name}' from Silver for FK processing in table '{tbl_gold}'.")
            
            # Relation details
            tbl = fkey_details['tbl']
            select_columns = fkey_details['select_columns']
            left_on = fkey_details['left_on']
            right_on = fkey_details['right_on']
            pk_to_fk = fkey_details['pk_to_fk']
            
            # Build the query for the referenced (foreign) table
            query = build_upload_query(
                table_name=tbl,
                key_columns=select_columns,
                date_column=None
            )
            foreing_tbl = pd.read_sql(query, con=engine_gold)
            
            # IMPORTANT: check that the dtypes of left_on and right_on are identical; this code broke when right_on values were changed to lists.

            ## WORKAROUND FOR THE IMPORTANT NOTE ABOVE
            
            merge_suffixes = ("_silver", "_foreign")


            ####### WORKAROUND #######
            if tbl_gold == 'BonosTrabajadas' and fkey_name == 'FkOrdenReparacion':
                silver_df[tbl_gold][left_on] = silver_df[tbl_gold][left_on].astype('Int64').astype(str)
                silver_df[tbl_gold][left_on] = silver_df[tbl_gold][left_on].replace('<NA>', None)

            # Cumulative merge
            silver_df[tbl_gold] = pd.merge(
                silver_df[tbl_gold],
                foreing_tbl,
                left_on=left_on,
                right_on=right_on,
                how='left',
                suffixes=merge_suffixes
            )
            
            # Rename columns according to `pk_to_fk`
            silver_df[tbl_gold] = silver_df[tbl_gold].rename(columns=pk_to_fk)

            # Handle removal of duplicated columns
            right_on_foreign = f"{right_on}_foreign"  # Expected name after the merge (unused)
            left_on_silver = f"{left_on}_silver"      # Expected name after the merge (unused)

            # Find foreign columns duplicated by the merge
            duplicated_columns = [col for col in silver_df[tbl_gold].columns if col.endswith("_foreign")]
            for col in duplicated_columns:
                dp.logger.info(f"Removing unnecessary foreign column '{col}' from the table '{tbl_gold}'.")
                silver_df[tbl_gold] = silver_df[tbl_gold].drop(columns=[col])

            # Restore the original Silver column names (if applicable)
            # NOTE(review): str.replace removes every '_silver' occurrence, not only the suffix.
            renamed_columns = {col: col.replace("_silver", "") for col in silver_df[tbl_gold].columns if col.endswith("_silver")}
            if renamed_columns:
                dp.logger.info(f"Renaming columns to their original names: {renamed_columns}")
                silver_df[tbl_gold] = silver_df[tbl_gold].rename(columns=renamed_columns)

        ##### WORKAROUND ####
        if tbl_gold == 'Vehiculos':
            silver_df[tbl_gold] = silver_df[tbl_gold].rename(columns={'FechaUltimo':'FechaUltimaVisita'})    

        # Keep only the columns declared for the Gold table, in model order.
        silver_df[tbl_gold] = silver_df[tbl_gold][gold_properties[tbl_gold].keys()]

        dp.logger.debug(f"Foreign key assignment for table '{tbl_gold}' completed.")

        dp.logger.debug(f"Applying transforme functions to '{tbl_gold}'")

        # Transform functions. IMPORTANT: apply them to all tables, not only those with FKs.

        silver_df[tbl_gold], df_null_rows = check_nulls(silver_df[tbl_gold], gold_properties[tbl_gold])

        silver_df[tbl_gold], df_invalid = check_data_types(silver_df[tbl_gold], gold_properties[tbl_gold], df_null_rows)

    
    # NOTE(review): only the last relation's values survive this loop.
    table_relations = related_silv_gold_v2[tbl_gold]
    for relation in table_relations:
        tbl_silv = relation.get("tbl_gold") ## NOTE: probably meant relation.get("tbl_silv")
        df = silver_df[tbl_gold] ### NOTE: alternative source was silver_df[tbl_gold]
        table_name = tbl_gold
        key_columns = relation.get("key_columns")
        date_column = relation.get("date_column")
        


    dp.logger.info(f"Processing table: Silver '{tbl_silv}' -> Gold '{tbl_gold}'")

    # Split into rows not yet in Gold and rows already present.
    df_new_data, df_existing_data = get_new_data (
            df=df, 
            table_name=table_name,
            key_columns=key_columns,
            date_column=date_column,
            engine=engine_gold)
            
    if not df_new_data.empty:
        dp.logger.info(f'There is new data to upload in table "{table_name}"')

        dp.logger.info(f"Assigning new IDs for table '{tbl_gold}'")
        pkey_column = pkey_mapping.get(tbl_gold)

        # Current maximum primary key in Gold; new rows continue from it.
        result = conn.execute(
            text(f'SELECT MAX("{pkey_column}") FROM "gold"."{tbl_gold}"')
        )

        max_existing_id = result.scalar() or 0

        df_new_data[pkey_column] = range( # consecutive IDs after the current maximum
            max_existing_id + 1, max_existing_id + 1 + len(df_new_data)
        )

        try:
            upload_new_data(
                df_new_data=df_new_data,
                table_name=table_name, 
                date_column=date_column, 
                engine=engine_gold)
            
            dp.logger.info(f'New data has been uploaded succesfully into table "{tbl_gold}"')

        except Exception as e:
            dp.logger.error(f"An error has occurred trying to insert new data into table '{tbl_gold}': {e}")
            continue
            
    else:
        dp.logger.info(f'There is not update data to insert into table "{table_name}"')
    

    #else:

