# Librerias

In [1]:
# Librerias
import pandas as pd
import warnings
import yaml
import os
import datetime

# Modulos
from modules.oracle_connection import set_engine
from connections import source, target
from queries.queries import Q_UBICACIONES, Q_UBICACIONES_PALLETS, Q_ANDENES, Q_ANDENES_PALLETS_ENTRADA, Q_ANDENES_PALLETS_SALIDA, Q_PRODUCTOS
from validators.validators import validators

# warnings
warnings.filterwarnings("ignore")

# Funciones

## Extractor de datos

In [2]:
from sqlalchemy import text
def extract_data(conexion_str, query):
    
    engine = set_engine(conexion_str)
    query = text(query)
    
    with engine.begin() as conn:
        df = pd.read_sql_query(query, conn)
    
    return df

## Loader

In [3]:
def load_data(df, table_name, schemaDB, conexion_str = None):
    
    if conexion_str==None:
        output = './data'
        # name = [name for name, obj in globals().items() if id(obj) == id(df)][0]
        filename = os.path.join(output, table_name + '.csv')
        df.to_csv(filename, index=False, sep=';', decimal=',')
    
    else:
        engine = set_engine(conexion_str)
    
        with engine.begin() as conn:
            df.to_sql(name= table_name, con=conn, 
                      if_exists = 'append', index=False, 
                      chunksize=25000, schema= schemaDB)


## Utils

### Funciones de validacion

In [4]:
def date_validator(input, dateformat='%Y-%m-%d', default = '1900-01-01'):
    
    """
    Transforma un valor tipo str a fecha con un formato pre-establecido. Si no puede hacerlo,
    devuelve un valor de fecha por defecto.
    """
    
    default_date = datetime.datetime.strptime(default, dateformat)
    current_year = datetime.datetime.now().year
    max_year = current_year + 5

    try:
        date_year = datetime.datetime.strptime(input, dateformat).date().year
        if date_year>max_year:
            return default_date
        else:
            return datetime.datetime.strptime(input, dateformat).date()
    except:
        return default_date
        

def dataframe_date_validator(df, cols):
    
    """
    Transforma los campos cols de un dataframe a fecha en un formato preestablecido. Si no puede hacerlo,
    devuelve un valor de fecha por defecto.
    """
    
    for key, value in cols.items():
        
        df[key] = df[key].astype(str)
        df[key] = df[key].apply(lambda x: date_validator(x, dateformat=value['dateformat'], default = value['default']))

    return df   

# EXTRACT

###  Levanto tablas desde el origen

<div style="display: inline-block">

    
| Tabla   origen                | dataframe               |
|:------------------------------|:------------------------|
| Mastro de ubicaciones         | ubicaciones             |
| Pallets por ubicación         | ubicaciones_pallets     |
| Maestro de andenes            | andenes                 |
| Pallets de andenes de entrada | andenes_pallets_entrada |
| Pallets de andenes de salida  | andenes_pallets_salida  |
| Maestro de productos          | dim_productos           |

</div>



In [5]:
#%%time
# Levanto los dataframes a utilizar y guardo la data cruda

# UBICACIONES
# Maestro de ubicaciones
df_ubicaciones = extract_data(source, Q_UBICACIONES)
#table_name = 'Ubicaciones'
#schema = 'dssadmin'
#load_data(df_ubicaciones, table_name, schema, conexion_str = None)

# Pallets en ubicaciones
df_ubicaciones_pallets = extract_data(source, Q_UBICACIONES_PALLETS)
#table_name = 'Pallets_ubicaciones'
#schema = 'dssadmin'
#load_data(df_ubicaciones, table_name, schema, conexion_str = None)

# ANDENES
# Maestro de andenes
df_andenes = extract_data(source, Q_ANDENES)
#table_name = 'Andenes'
#schema = 'dssadmin'
#load_data(df_andenes, table_name, schema, conexion_str = None)

# Pallets en andenes entrada
df_andenes_pallets_entrada = extract_data(source, Q_ANDENES_PALLETS_ENTRADA)
#table_name = 'Pallets_andenes_entrada'
#schema = 'dssadmin'
#load_data(df_andenes_pallets_entrada, table_name, schema, conexion_str = None)

# Pallets en andenes salida
df_andenes_pallets_salida = extract_data(source, Q_ANDENES_PALLETS_SALIDA)
#table_name = 'Pallets_andenes_salida'
#schema = 'dssadmin'
#load_data(df_andenes_pallets_salida, table_name, schema, conexion_str = None)

# Maestro de productos
df_productos = extract_data(target, Q_PRODUCTOS)
#table_name = 'dim_productos'
#schema = 'dssadmin'
#load_data(df_productos, table_name, schema, conexion_str = None)

# TRANSFORM

### Generar tabla de ubicaciones

<div style="display: inline-block">
    
| # | Operación                                | Dataframe      |
|---|------------------------------------------|----------------|
| 1 | Join Maestro Ubicaciones con pallets     | bt_ubicaciones |
| 2 | Calculo campo pallets                    | bt_ubicaciones |
| 3 | Calculo campo cantidad_pallets_ubicacion | bt_ubicaciones |
| 4 | Correccion de estados                    | bt_ubicaciones |
    
</div>

In [6]:
primary_id = ['id_empresa','id_almacen','id_zona','id_pasillo','id_ubicacion']

# ADECUACION DE LAS UBICACIONES -------------------------------------------

# 1- Join de maestro de ubicaciones y pallets por ubicacion
df_ubicaciones_bt = pd.merge(df_ubicaciones, df_ubicaciones_pallets, how= 'left', on= primary_id)


# 2- Calculo de campo pallets
df_ubicaciones_bt.id_pallet.fillna(0, inplace=True)
df_ubicaciones_bt['pallets'] = df_ubicaciones_bt['id_pallet'].apply(lambda x: 1 if x>0 else 0)


# 3- Calculo la cantidad de pallets por ubicacion
df_ubicaciones_bt['pallets_ubicacion'] = df_ubicaciones_bt.groupby(primary_id)['pallets'].transform('sum')


# 4- Correccion de estados
def estados(id_estado_actual, capacidad_ubicacion, pallets_ubicacion):
    
    if id_estado_actual == 'I':
        return 'I'
    
    elif pallets_ubicacion == 0:
        return 'L'
    
    elif pallets_ubicacion==capacidad_ubicacion:
        return 'O'
    
    else:
        return 'P'

df_ubicaciones_bt['id_estado_ubicacion'] = df_ubicaciones_bt.apply(lambda x: estados(x.id_estado_ubicacion, x.capacidad_ubicacion, x.pallets_ubicacion), axis=1)
#df_ubicaciones_bt.head()


### Generar tabla de andenes

<div style="display: inline-block">
    
| # | Operación                                                         | Dataframe       |
|---|-------------------------------------------------------------------|-----------------|
| 1 | Union Pallets de entrada con Pallets de salida en Pallets Andenes | Pallets_Andenes |
| 2 | Join Maestro de Andenes con Pallets Andenes                       | bt_andenes      |
    
</div>

In [7]:
# ADECUACION DE LOS ANDENES -----------------------------------------------

# 1- Union Pallets de entrada con Pallets de salida en Pallets Andenes
df_andenes_pallets_entrada['tipo_anden'] = 'Entrada'
df_andenes_pallets_salida['tipo_anden'] = 'Salida'
df_andenes_pallets = pd.concat([df_andenes_pallets_entrada, df_andenes_pallets_salida], axis=0)

# merge de los pallets de andenes con la dimension de producto
# df_andenes_pallets = pd.merge(df_andenes_pallets, df_productos, how= 'left', on= ['id_producto'])

# 2- Join de maestro de andenes y pallets por anden
df_andenes_bt = pd.merge(df_andenes, df_andenes_pallets, how= 'left', on= primary_id)

# Agrego la metrica de pallets
df_andenes_bt.id_pallet.fillna(0, inplace=True)

# Calculo la variable pallets
df_andenes_bt['pallets'] = df_andenes_bt['id_pallet'].apply(lambda x: 1 if x>0 else 0)

### Generar base total BT

<div style="display: inline-block">
    
| # | Operación                       | Dataframe     |
|---|---------------------------------|---------------|
| 1 | Union de ubicaciones con andenes | bt_inventario |
| 2 | Join de inventario con dim producto  | bt_inventario |
| 3 | Adecuacion de tipos de datos    | bt_inventario |
| 4 | Llenado de valores nulos        | bt_inventario |
    
</div>

In [8]:
# 1- Unificacion de ubicaciones con andenes
df_BT_inventario = pd.concat([df_ubicaciones_bt, df_andenes_bt], axis=0)

##################################################################################################################

# 2- Join de inventario_bt con maestro de productos
df_BT_inventario['id_producto'] = df_BT_inventario['id_producto'].fillna(0).astype(int)
df_BT_inventario = pd.merge(df_BT_inventario, df_productos, how= 'left', on= ['id_producto'])

##################################################################################################################


# 3- Adecuacion de tipos de datos + 4- Llenado de valores nulos
# Correccion de fechas
date_cols = {
    'fecha_vencimiento':{'dateformat':'%Y-%m-%d %H:%M:%S', 'default': '1900-01-01 00:00:00'},
    'id_dia_ingreso_pallet':{'dateformat':'%Y-%m-%d %H:%M:%S', 'default': '1900-01-01 00:00:00'}
}  
                                                                                                      
df_BT_inventario = dataframe_date_validator(df_BT_inventario, date_cols)

# Correccion de valores enteros
cols_int = [
    'id_empresa', 'id_almacen', 'id_tipo_zona', 'id_division_asignada', 'id_consignatario', 'capacidad_ubicacion', 
    'id_producto', 'id_departamento', 'id_clase', 'id_gran_categoria', 'id_gerente_comercial', 'id_pallet', 'bultos'
]

df_BT_inventario[cols_int] = df_BT_inventario[cols_int].fillna(0).astype(int)

# Correccion de valores float
cols_float = ['kilos', 'unidades']
df_BT_inventario[cols_float] = df_BT_inventario[cols_float].astype(float).fillna(0.0)


# Correccion de valores str
cols_str = [
    'id_zona', 'id_pasillo','id_ubicacion', 'id_tipo_ubicacion', 'id_tipo_rack',
    'id_estado_ubicacion', 'variable_logistica'
]
df_BT_inventario[cols_str] = df_BT_inventario[cols_str].fillna('s/d').astype('string')


### Armado tabla AG_Inventario

<div style="display: inline-block">
    
| # | Operación                             | Dataframe     |
|---|---------------------------------------|---------------|
| 1 | Groupby - Eliminar pallet \| Producto | ag_inventario |
    
</div>

In [9]:
# 1 - Armado de la tabla Ag_inventario 
index_cols_ag = [
    'id_dia', 'id_empresa', 'id_almacen', 'id_zona', 'id_pasillo', 'id_ubicacion', 
    'id_tipo_ubicacion', 'id_tipo_zona', 'id_tipo_rack', 'id_estado_ubicacion', 'id_division_asignada',
    'id_departamento', 'id_clase', 'id_gran_categoria', 'id_gerente_comercial', 'id_consignatario'
]

agg_cols = {
    'capacidad_ubicacion': max,
    'pallets':sum,
    'bultos': sum,
    'unidades': sum,
    'kilos': sum
}

df_AG_inventario = df_BT_inventario.groupby(index_cols_ag).agg(agg_cols).reset_index()
#df_AG_inventario.head()

### Armado tabla AG_Inventario_Evolutivo

<div style="display: inline-block">
    
| # | Operación                             | Dataframe     |
|---|---------------------------------------|---------------|
| 1 | Calculo metricas de ubicaciones | ag_inventario |
| 2 | Groupby - Eliminar ubicación | zona | ag_inventario |
    
</div>

In [10]:
# 1- Calculo de las metricas de ubicaciones

def calcular_metricas_ubicaciones(df:pd.DataFrame)->pd.DataFrame:
    
    ubicaciones_totales = (df['id_tipo_zona'] == 1)* 1
    ubicaciones_inutilizadas = (df['id_estado_ubicacion'] == 'I')* ubicaciones_totales
    ubicaciones_disponibles = (1 - ubicaciones_inutilizadas)*ubicaciones_totales
    ubicaciones_libres = (df['id_estado_ubicacion'] == 'L')* ubicaciones_totales
    ubicaciones_ocupadas = (df['id_estado_ubicacion'] == 'O')* ubicaciones_totales
    ubicaciones_semi_ocupadas = (df['id_estado_ubicacion'] == 'P')* ubicaciones_totales
    capacidad_disponible = ubicaciones_disponibles* df['capacidad_ubicacion']
        
    return ubicaciones_totales, ubicaciones_inutilizadas, ubicaciones_disponibles, ubicaciones_libres, ubicaciones_ocupadas, ubicaciones_semi_ocupadas, capacidad_disponible

# Computo los tipos de ubicaciones en métricas
cols_ubicaciones = ['ubicaciones','ubicaciones_inutilizadas','ubicaciones_disponibles','ubicaciones_libres','ubicaciones_ocupadas','ubicaciones_semi_ocupadas', 'capacidad_disponible'                   ]

df_AG_inventario[cols_ubicaciones] = df_AG_inventario.apply(calcular_metricas_ubicaciones, 
                                                            axis=1, result_type="expand")

# 2- Groupby de la tabla Ag y generacoin de la Ag_Inventario_evolutivo
index_cols_ag_ev = [
    'id_dia', 'id_empresa', 'id_almacen', 'id_zona',
    'id_tipo_ubicacion', 'id_tipo_zona', 'id_tipo_rack', 'id_division_asignada',
    'id_departamento', 'id_clase', 'id_gran_categoria', 'id_gerente_comercial', 'id_consignatario'
]

agg_cols_ev = {
    'capacidad_ubicacion': sum,
    'pallets':sum,
    'bultos': sum,
    'unidades': sum,
    'kilos': sum,
    'ubicaciones':sum,
    'ubicaciones_inutilizadas':sum,
    'ubicaciones_disponibles':sum,
    'ubicaciones_libres':sum,
    'ubicaciones_ocupadas':sum,
    'ubicaciones_semi_ocupadas':sum,
    'capacidad_disponible': sum
}

df_AG_inventario_evolutivo = df_AG_inventario.groupby(index_cols_ag_ev).agg(agg_cols_ev).reset_index()
#df_AG_inventario_evolutivo.head()


### Renombrar y eliminar columnas

In [11]:
# se renombran las columnas para que coincidan con la estructura de la tabla en base de datos
df_BT_inventario.rename(columns={
                                'id_departamento': 'id_dpto_gdm',
                                'id_pallet': 'nro_pallet',
                                'kilos': 'peso',
                                'fecha_vencimiento': 'fecha_caducidad',
                                'id_dia_ingreso_pallet': 'fecha_ingreso_pallet'
                                }, inplace=True)

df_AG_inventario.rename(columns={
                                'id_departamento': 'id_dpto_gdm',
                                'id_consignatario': 'consignatario',
                                'kilos': 'peso_pallet'
                                }, inplace=True)

# se eliminan las columnas que no están en la tabla de base de datos
df_BT_inventario = df_BT_inventario.drop(columns=['tipo_anden',
                                                 'desc_tipo_zona',
                                                 'desc_estado_ubicacion',
                                                 'pallets',
                                                 'pallets_ubicacion'])

df_AG_inventario = df_AG_inventario.drop(columns=['capacidad_disponible'])

# LOAD

## Carga de tablas en Datawarehouse

<div style="display: inline-block">
    
| # | Operación                                                         | Destino       |
|---|-------------------------------------------------------------------|-----------------|
| 1 | Carga de inventario_bt a datawarehouse                            | BT_INVENTARIO_WF |
| 2 | Carga de inventario_ag a datawarehouse                            | AG_INVENTARIO_WF      |
| 3 | Carga de inventario_ag_evolutivo a datawarehouse                            | AG_INVENTARIO_EVOLUTIVO_WF      |
    
</div>

In [12]:
#%%time

# 1- Carga de inventario_bt a datawarehouse
table_name = 'BT_INVENTARIO_WF'        
schema = 'DSSADM'
load_data(df_BT_inventario, table_name, schema, conexion_str = target)

# 2- Carga de la agregada de inventario a datawarehouse
table_name = 'AG_INVENTARIO_WF'
schema = 'DSSADM'
load_data(df_AG_inventario, table_name, schema, conexion_str = target)

# 3- Carga de la inventario evolutiva a datawarehouse
table_name = 'AG_INVENTARIO_EVOLUTIVO_WF'
schema = 'DSSADM'
load_data(df_AG_inventario_evolutivo, table_name, schema, conexion_str = target)


# Control de Ejecucion

In [13]:
from queries.queries import Q_CONTROL
extract_data(target, Q_CONTROL)

Unnamed: 0,mail
0,0
