In [2]:
#import pandas as pd
import polars as pl
import numpy as np
from datetime import datetime
import json
import os
import math
import pandas as pd
from sklearn.preprocessing import PowerTransformer, StandardScaler
import time

import warnings

warnings.filterwarnings('ignore')

fase = '02_normaliza'

In [3]:
# Load the shared pipeline settings. JSON is UTF-8 by specification, so the
# encoding is pinned instead of depending on the platform's default codec.
with open('gen_config.json', 'r', encoding='utf-8') as file:
    gen_config = json.load(file)

In [4]:
# Directory prefix used for every file path built in this notebook.
folder = gen_config['folder']

# Inputs: parquet files read at the start of this stage.
path_group = gen_config['path_group']
path_prod_stats = gen_config['path_prod_stats']

# Outputs: parquet files written by this stage.
path_norm = gen_config['path_norm']
path_transform_stats = gen_config['path_transform_stats']

# Variables: name of the scaled column that is later copied into `tn_norm`.
var_escalado = gen_config['var_escalado']

# Banner marking the start of this stage in the run log.
print(f"{'COMIENZA':-^100}")
print(f"{fase:-^100}")

----------------------------------------------COMIENZA----------------------------------------------
--------------------------------------------02_normaliza--------------------------------------------


In [5]:
# Read the two input tables; both paths are resolved relative to `folder`.
df = pl.read_parquet(f"{folder}/{path_group}")
prod_stats = pl.read_parquet(f'{folder}/{path_prod_stats}')

In [6]:
# Attach the statistics in `prod_stats` to every row via a left join on
# (product_id, customer_id), so all rows of `df` are kept.
df_norm = df.join(prod_stats, on=['product_id', 'customer_id'], how='left', coalesce=True)
print(f"Columnas en DF:{df_norm.columns}")
print(f"df_norm shape: ({df_norm.shape[0]:>9_d}, {df_norm.shape[1]:_d})")

Columnas en DF:['product_id', 'customer_id', 'periodo', 'tn', 'primer_periodo', 'ultimo_periodo', 'values', 'total_tn', 'min_tn', 'average_tn', 'median_tn', 'std_dev_tn', 'iqr_tn', 'max_tn']
df_norm shape: (5_303_555, 14)


In [7]:
# juguete = df_norm.filter((pl.col('product_id') == 20011) & (pl.col('customer_id') == 10001))[['product_id','customer_id', 'periodo','tn']]
# juguete, lambda_, scale_, mean_, var_ = power_transform(juguete)
# juguete = power_inverse_transform(juguete, lambda_, mean_, var_)
# juguete

# THE NORMALIZATION HAPPENS BELOW

In [8]:
# Turn the numeric YYYYMM `periodo` into a real date anchored on day 01.
fecha_periodo = (pl.col("periodo").cast(pl.Utf8) + "01").str.to_date("%Y%m%d")
df_norm = df_norm.with_columns(fecha_periodo.alias("periodo_dt"))

# Earliest period present in the data; it becomes month 0 of the index below.
primer_periodo = df_norm['periodo_dt'].min()

# Number of whole months elapsed since `primer_periodo`.
anios_desde_inicio = pl.col('periodo_dt').dt.year() - primer_periodo.year
meses_dentro_anio = pl.col('periodo_dt').dt.month() - primer_periodo.month
df_norm = df_norm.with_columns(
    (anios_desde_inicio * 12 + meses_dentro_anio).alias('mes_indice')
)

# Wide layout: one row per (product_id, customer_id), one `tn` column per
# month index, handed over to pandas for the row-wise transform.
df_pivot = df_norm.pivot(
    values='tn',
    index=['product_id', 'customer_id'],
    columns='mes_indice',
).to_pandas()


In [9]:
def power_transform(row_df):
    """Fit a PowerTransformer on a single series and return its transformed values.

    Parameters
    ----------
    row_df : sliceable 1-D object exposing ``to_numpy()``
        The first two entries are skipped (the caller passes a pivot row whose
        leading entries are product_id and customer_id); everything after them
        is treated as the series to transform.

    Returns
    -------
    tuple
        ``(transformed_data, lambda_, scale_, mean_, var_)``: the flattened
        transformed values, the fitted lambda, and the scale, mean and variance
        of the transformer's internal scaler.
    """
    # scikit-learn expects a 2-D (n_samples, 1) column.
    serie = row_df[2:].to_numpy().reshape(-1, 1)

    transformer = PowerTransformer()
    transformed_data = transformer.fit_transform(serie).flatten()

    # `_scaler` is a private attribute of PowerTransformer; it holds the
    # standardization statistics needed later to invert the transform.
    scaler = transformer._scaler

    return (
        transformed_data,
        transformer.lambdas_[0],
        scaler.scale_[0],
        scaler.mean_[0],
        scaler.var_[0],
    )

In [10]:
def power_inverse_transform(x_trans, lambda_, mean_, var_):
    """Invert a standardized Yeo-Johnson power transform.

    Parameters
    ----------
    x_trans : scalar, sequence, NumPy array, or Series-like with ``to_numpy()``
        Transformed (standardized) values.
    lambda_ : float
        Power parameter fitted for the series.
    mean_ : float
        Mean used by the standardization step.
    var_ : float
        Variance used by the standardization step.

    Returns
    -------
    numpy.ndarray
        1-D float array with the values on the original scale. NaN inputs
        propagate as NaN.
    """
    # Explicit dispatch instead of a bare `except:` (which also swallowed
    # KeyboardInterrupt and real conversion errors): Series-like objects go
    # through `to_numpy()`, anything else is handed to NumPy directly.
    valores = x_trans.to_numpy() if hasattr(x_trans, 'to_numpy') else x_trans
    # Always work on a flat float vector so scalars and nested inputs behave
    # the same way as 1-D arrays.
    x = np.atleast_1d(np.asarray(valores, dtype=float)).reshape(-1)

    # Undo the standardization: multiply by the std-dev and add the mean.
    x = x * var_ ** 0.5 + mean_

    x_inv = np.zeros_like(x)
    pos = x >= 0

    # when x >= 0
    if abs(lambda_) < np.spacing(1.0):
        # lambda_ == 0 (within float precision): the forward map was log1p.
        x_inv[pos] = np.exp(x[pos]) - 1
    else:  # lambda_ != 0
        x_inv[pos] = np.power(x[pos] * lambda_ + 1, 1 / lambda_) - 1

    # when x < 0 (NaN also lands here and stays NaN)
    if abs(lambda_ - 2) > np.spacing(1.0):
        x_inv[~pos] = 1 - np.power(-(2 - lambda_) * x[~pos] + 1, 1 / (2 - lambda_))
    else:  # lambda_ == 2
        x_inv[~pos] = 1 - np.exp(-x[~pos])

    return x_inv

In [11]:
# Fit one PowerTransformer per (product_id, customer_id) series.
# Results are collected in plain Python lists and the DataFrames are built
# once at the end, instead of growing them with pd.concat inside the loop.
columnas_pwr = ['product_id', 'customer_id', 'mes_indice', 'tn', 'tn_trans']
columnas_stats = ['product_id', 'customer_id', 'pwr_lambda', 'pwr_scale', 'pwr_mean', 'pwr_var']

# Month labels: every pivot column after the two id columns.
meses = pd.Series(df_pivot.columns[2:])
n_series = df_pivot.shape[0]
n_meses = len(meses)

# Report progress about every 1% of the series. max(..., 1) prevents the
# modulo-by-zero the original hit whenever there were fewer than 100 series.
paso = max(n_series // 100, 1)

# Ids are taken straight from the pivot so they keep their integer dtype
# (rows produced by iterrows() are upcast to a single common dtype).
product_ids = df_pivot.iloc[:, 0].to_numpy()
customer_ids = df_pivot.iloc[:, 1].to_numpy()

tn_trans_partes = []
stats_filas = []

start_time = time.time()

for pos, (_, row) in enumerate(df_pivot.iterrows()):
    if pos % paso == 0:
        # Same log line as before; the accumulated shape is derived from the
        # number of series already processed.
        print(f"Transformando serie {pos} de {n_series}, Acumulado: {(pos * n_meses, 5)}, {round(pos/n_series*100, 2)}% || {round(time.time() - start_time, 2)} segundos")
        start_time = time.time()

    x_trans, lambda_, scale_, mean_, var_ = power_transform(row)
    tn_trans_partes.append(x_trans)
    stats_filas.append((product_ids[pos], customer_ids[pos], lambda_, scale_, mean_, var_))

# Long layout: one row per (series, month), in the same order as the loop.
df_pwr = pd.DataFrame({
    'product_id': np.repeat(product_ids, n_meses),
    'customer_id': np.repeat(customer_ids, n_meses),
    'mes_indice': np.tile(meses.to_numpy(), n_series),
    'tn': df_pivot.iloc[:, 2:].to_numpy().reshape(-1),
    'tn_trans': np.concatenate(tn_trans_partes) if tn_trans_partes else np.empty(0),
}, columns=columnas_pwr)

# One row of fitted parameters per series.
transform_stats = pd.DataFrame(stats_filas, columns=columnas_stats)

# Final progress line; skipped for an empty pivot, where the original raised
# NameError because the loop variable was never bound.
if n_series:
    print(f"Transformando serie {n_series - 1} de {n_series}, Acumulado: {df_pwr.shape}, {round((n_series - 1)/n_series*100, 2)}% || {round(time.time() - start_time, 2)} segundos")

Transformando serie 0 de 178684, Acumulado: (0, 5), 0.0% || 0.02 segundos
Transformando serie 1786 de 178684, Acumulado: (64296, 5), 1.0% || 4.09 segundos
Transformando serie 3572 de 178684, Acumulado: (128592, 5), 2.0% || 3.85 segundos
Transformando serie 5358 de 178684, Acumulado: (192888, 5), 3.0% || 3.7 segundos
Transformando serie 7144 de 178684, Acumulado: (257184, 5), 4.0% || 3.66 segundos
Transformando serie 8930 de 178684, Acumulado: (321480, 5), 5.0% || 3.66 segundos
Transformando serie 10716 de 178684, Acumulado: (385776, 5), 6.0% || 3.71 segundos
Transformando serie 12502 de 178684, Acumulado: (450072, 5), 7.0% || 3.82 segundos
Transformando serie 14288 de 178684, Acumulado: (514368, 5), 8.0% || 3.83 segundos
Transformando serie 16074 de 178684, Acumulado: (578664, 5), 9.0% || 3.67 segundos
Transformando serie 17860 de 178684, Acumulado: (642960, 5), 10.0% || 3.7 segundos
Transformando serie 19646 de 178684, Acumulado: (707256, 5), 10.99% || 3.8 segundos
Transformando serie

In [12]:
# para controlar
# a = df_pwr[(df_pwr['product_id'] == 20001) & (df_pwr['customer_id'] == 10001)]['tn_trans']
# b = power_inverse_transform(a,
#                         lambda_=transform_stats.iloc[0]['pwr_lambda'],
#                         mean_=transform_stats.iloc[0]['pwr_mean'],
#                         var_=transform_stats.iloc[0]['pwr_var'])
# c = df_pwr[(df_pwr['product_id'] == 20001) & (df_pwr['customer_id'] == 10001)]['tn']

# pd.DataFrame({'a': a, 'b': b, 'c': c})

In [13]:
# Move both pandas results back to polars for the remaining steps.
df_pwr = pl.from_pandas(df_pwr)
transform_stats = pl.from_pandas(transform_stats)

In [14]:
# Cast the key columns to the dtypes used in `df_norm` (product_id and
# customer_id are i64, mes_indice is i32), which the later join on these
# three columns relies on.
df_pwr = df_pwr.with_columns([
    pl.col('product_id').cast(pl.Int64),
    pl.col('customer_id').cast(pl.Int64),
    pl.col('mes_indice').cast(pl.Int32),
])

# Same id dtypes for the per-series transform parameters.
transform_stats = transform_stats.with_columns([
    pl.col('product_id').cast(pl.Int64),
    pl.col('customer_id').cast(pl.Int64),
])

In [15]:
# Checkpoint the results of the transform loop to disk.
# NOTE(review): 'pwr.parquet' is hard-coded, while every other path in this
# notebook comes from gen_config — consider moving it there too.
df_pwr.write_parquet(f'{folder}/pwr.parquet')
transform_stats.write_parquet(f'{folder}/{path_transform_stats}')

In [16]:
# Reload the checkpoint just written; presumably this lets the notebook be
# resumed from here without re-running the transform loop — confirm.
df_pwr = pl.read_parquet(f'{folder}/pwr.parquet')
transform_stats = pl.read_parquet(f'{folder}/{path_transform_stats}')

In [17]:
# Bring the power-transformed values back onto the long table, matching each
# row by series and month index.
claves_serie_mes = ['product_id', 'customer_id', 'mes_indice']
df_norm = df_norm.join(
    df_pwr.select(claves_serie_mes + ['tn_trans']),
    on=claves_serie_mes,
    how='left',
    coalesce=True,
)

In [18]:
#df_temp = df_norm.join(transform_stats, on=['product_id', 'customer_id'], how='left', coalesce=True)

In [21]:
# Standard scaling: (tn - mean) / standard deviation, per the joined stats.
escala_standard = (pl.col('tn') - pl.col('average_tn')) / pl.col('std_dev_tn')
# Robust scaling: (tn - median) / inter-quartile range.
escala_robusta = (pl.col('tn') - pl.col('median_tn')) / pl.col('iqr_tn')

# A zero denominator yields NaN or +/-inf. These can safely be set to zero
# because doing so corrects the constant products (per the original author's note).
no_finitos = [float('inf'), float('-inf')]

df_norm = df_norm.with_columns(
    escala_standard.fill_nan(0).replace(no_finitos, 0).alias('tn_standard'),
    escala_robusta.fill_nan(0).replace(no_finitos, 0).alias('tn_robust'),
)

In [22]:
# Order rows by series and then chronologically within each series.
df_norm = df_norm.sort(by=['product_id', 'customer_id', 'periodo'])

In [23]:
# Copy the column named in the config (`var_escalado`) into `tn_norm`, the
# column that the rest of the process actually uses.
df_norm = df_norm.with_columns(pl.col(var_escalado).alias('tn_norm'))
print(f"Metodo de escalado: {var_escalado}")

Metodo de escalado: tn_robust


In [24]:
# Persist the normalized table.
# NOTE(review): transform_stats was already written to this same path earlier
# in the notebook and is not modified in between, so the second write looks
# redundant — confirm before removing.
df_norm.write_parquet(f'{folder}/{path_norm}')
transform_stats.write_parquet(f'{folder}/{path_transform_stats}')

In [25]:
# Size check on the final table and on one period.
# NOTE(review): the period 201912 is hard-coded; presumably it is the last
# period in the data — confirm, or derive it from `periodo`.
print(f"df_norm completo: {df_norm.shape}")
print(f"df_norm diciembre: {df_norm.filter(pl.col('periodo') == 201912).shape}")

df_norm completo: (5303555, 20)
df_norm diciembre: (178684, 20)


In [26]:
# Show any rows where tn_standard is still +inf after the clean-up.
df_norm.filter((pl.col('tn_standard') == np.inf))

product_id,customer_id,periodo,tn,primer_periodo,ultimo_periodo,values,total_tn,min_tn,average_tn,median_tn,std_dev_tn,iqr_tn,max_tn,periodo_dt,mes_indice,tn_trans,tn_standard,tn_robust,tn_norm
i64,i64,i64,f64,date,date,u32,f64,f64,f64,f64,f64,f64,f64,date,i32,f64,f64,f64,f64


In [27]:
# Sanity report: sum of `total_tn` over rows whose scaled value is still
# +inf, -inf or NaN after the clean-up (all are expected to be 0.0).
# NaN is detected with Expr.is_nan() rather than `== np.NaN`: the `np.NaN`
# alias was removed in NumPy 2.0, and is_nan() does not depend on how
# equality with NaN is defined.
print(f"""
Datos con inf en tn_standard: {df_norm.filter((pl.col('tn_standard') == np.inf))['total_tn'].sum()}
Datos con -inf en tn_standard: {df_norm.filter((pl.col('tn_standard') == -np.inf))['total_tn'].sum()}
Datos con NaN en tn_standard: {df_norm.filter(pl.col('tn_standard').is_nan())['total_tn'].sum()}
Datos con inf en tn_robust: {df_norm.filter((pl.col('tn_robust') == np.inf))['total_tn'].sum()}
Datos con -inf en tn_robust: {df_norm.filter((pl.col('tn_robust') == -np.inf))['total_tn'].sum()}
Datos con NaN en tn_robust: {df_norm.filter(pl.col('tn_robust').is_nan())['total_tn'].sum()}
""")


Datos con inf en tn_standard: 0.0
Datos con -inf en tn_standard: 0.0
Datos con NaN en tn_standard: 0.0
Datos con inf en tn_robust: 0.0
Datos con -inf en tn_robust: 0.0
Datos con NaN en tn_robust: 0.0



In [28]:
# df_temp = df_norm.join(transform_stats, on=['product_id', 'customer_id'], how='left', coalesce=True)

In [29]:
# df_temp = df_temp.to_pandas()

In [30]:
# df_temp['tn_inversa'] = df_temp.apply(lambda row: power_inverse_transform(row['tn_trans'], row['pwr_lambda'], row['pwr_mean'], row['pwr_var']), axis=1)

In [31]:
# df_temp['tn_inversa'] = [tn_inv[0] for tn_inv in df_temp['tn_inversa']]

In [32]:
# test = df_temp['tn_inversa'] - df_temp['tn']

In [33]:
# test = df_temp[np.abs(df_temp['tn_inversa'] - df_temp['tn']) >= 0.01]
# test

In [34]:
# df_temp.to_parquet(f'{folder}/temp.parquet', index=False)

In [35]:
# Banner marking the end of this stage in the run log.
print(f"{fase:-^100}")
print(f"{'FINALIZA':-^100}\n\n\n")

--------------------------------------------02_normaliza--------------------------------------------
----------------------------------------------FINALIZA----------------------------------------------



