In [1]:
import os
import sys
from datetime import datetime

In [2]:
# -------------------------------------------------------------
# Environment setup
# -------------------------------------------------------------
BASE_DIR = os.path.abspath("../")
sys.path.append(BASE_DIR)

# -------------------------------------------------------------
# Configuration
# -------------------------------------------------------------
from src.utils.config import load_config
config = load_config(base_dir=BASE_DIR)

In [3]:
import time
import dask.dataframe as dd
from dask_ml.preprocessing import StandardScaler
from dask_ml.decomposition import TruncatedSVD



# Mathematical embedding - truncateSVD

In [4]:
start_time = time.time()

# === Leer CSV ===
individual_tensors = config["paths"]["tensors_convolution"]
data = dd.read_csv(individual_tensors)

row_id = data.iloc[:, 0]                     # primera columna
df_num = data.iloc[:, 1:].astype(float)      # num√©ricas

print(" to_dask_array ")
X = df_num.to_dask_array(lengths=True)

# === Escalado (Dask-ML) ===
print(" Escalado (Dask-ML) ")
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# === SVD sin cargar en memoria ===
print(" SVD ")
n_components = 20
tsvd = TruncatedSVD(n_components=n_components)
X_reduced = tsvd.fit_transform(X_scaled)

# Convertir resultado a Dask-DF sin computar en RAM
print(" to dask ")
svd_cols = [f"svd_{i}" for i in range(n_components)]
df_svd = dd.from_dask_array(X_reduced, columns=svd_cols)

# Concatenar IDs + componentes
print(" concat ")
result = dd.concat([
    row_id.reset_index(drop=True),
    df_svd.reset_index(drop=True)
], axis=1)

# === Guardar sin computar en RAM ===
print(" Write ")
tensors_svd = config["paths"]["tensors_svd"]
os.makedirs(os.path.dirname(tensors_svd), exist_ok=True)
result.to_parquet(tensors_svd, write_index=False)

end_time = time.time()
print(f"‚è∞ Start: {time.ctime(start_time)}")
print(f"üèÅ End:   {time.ctime(end_time)}")
print(f"‚è±Ô∏è Total: {end_time - start_time:.2f} sec ({(end_time - start_time)/60:.2f} min)")


 to_dask_array 
 Escalado (Dask-ML) 
 SVD 
 to dask 
 concat 
 Write 


We're assuming that the indices of each dataframes are 
 aligned. This assumption is not generally safe.


‚è∞ Start: Wed Nov 26 18:08:54 2025
üèÅ End:   Wed Nov 26 18:16:02 2025
‚è±Ô∏è Total: 428.01 sec (7.13 min)


In [5]:
result.head(2)

We're assuming that the indices of each dataframes are 
 aligned. This assumption is not generally safe.


Unnamed: 0,row_id,svd_0,svd_1,svd_2,svd_3,svd_4,svd_5,svd_6,svd_7,svd_8,...,svd_10,svd_11,svd_12,svd_13,svd_14,svd_15,svd_16,svd_17,svd_18,svd_19
0,+++AhJk2QJM=_0,0.006205,0.517851,-0.115916,-0.967972,-0.320077,-0.068502,0.255377,0.040823,0.097394,...,0.052599,-0.075849,-0.084619,0.44465,-0.057621,-0.045088,-0.035509,-0.011,-0.078737,-0.062345
1,+++AhJk2QJM=_1,-0.56386,-0.230911,0.315015,-0.063151,-0.285392,-0.041747,0.186106,0.003272,0.099422,...,0.00617,-0.166224,-0.078923,-0.109955,-0.107116,0.016595,-0.087999,-0.000677,0.070822,0.077504


# end

| M√©todo de escalado      | Qu√© hace                                 | Preserva direcci√≥n | Ideal para Cosine | Ideal para Euclidean | Ideal para Manhattan (L1) | Ideal para Minkowski | Adecuado para K-Means | Comentarios clave |
|--------------------------|-------------------------------------------|---------------------|--------------------|------------------------|---------------------------|------------------------|------------------------|--------------------|
| **L2 Normalization**     | Normaliza cada vector a norma 1 (‚Äñx‚Äñ‚ÇÇ=1) | ‚úî S√≠               | ‚≠ê **S√≠** (equivalencia exacta) | ‚úî S√≠ (si magnitud no importa) | ‚ùå No (distorsiona L1) | ‚ùå No | ‚≠ê **S√≠ (cuando quieres imitar Cosine-KMeans)** | Convierte Euclidean en Cosine; ideal para embeddings y perfiles. |
| **StandardScaler**       | Centra y escala cada feature (z-score)   | ‚ùå No              | ‚ùå **No** | ‚≠ê **S√≠** (distancia euclidiana cl√°sica) | ‚≠ê **S√≠** | ‚≠ê **S√≠** | ‚≠ê **S√≠** | Mantiene variancia comparable entre features; est√°ndar para ML tradicional. |
| **MinMaxScaler**         | Escala cada feature a [0,1]              | ‚ùå No              | ‚ùå No | ‚≠ê S√≠ (cuando las escalas importan) | ‚≠ê S√≠ | ‚≠ê S√≠ | ‚≠ê S√≠ | Preserva relaciones relativas; √∫til en clustering basado en distancias mixtas. |
| **RobustScaler**         | Escala usando medianas y IQR             | ‚ùå No              | ‚ùå No | ‚≠ê S√≠ (robusto a outliers) | ‚≠ê S√≠ | ‚≠ê S√≠ | ‚≠ê S√≠ | Excelente cuando hay outliers fuertes; no apto para coseno. |
| **None (sin escala)**    | Deja los datos crudos                    | A veces            | ‚ùå No | ‚ùå No (si las escalas var√≠an entre features) | ‚ùå No | ‚ùå No | ‚ùå No | Solo √∫til si todas las features ya est√°n en la misma escala y sin outliers. |


In [6]:
import time
import dask.dataframe as dd
import dask.array as da

### Normalization L1

In [7]:
start_time = time.time()

# individual_tensors = config["paths"]["tensors_convolution"]
tensors_svd = config["paths"]["tensors_svd"]
inpust_path = tensors_svd
data = dd.read_parquet(inpust_path)
data = data.repartition(npartitions=data.npartitions)

id_col = "row_id"
value_cols = [c for c in data.columns if c != id_col]

X = data[value_cols].astype(float).to_dask_array(lengths=True)
row_ids = data[id_col].to_dask_array(lengths=True).reshape(-1, 1)

L1_norms = da.sum(da.abs(X), axis=1, keepdims=True)
L1_norms = da.where(L1_norms == 0, 1, L1_norms)
X_norm_L1 = X / L1_norms

full_array = da.hstack([row_ids, X_norm_L1])
df_norm_L1 = dd.from_dask_array(full_array, columns=[id_col] + value_cols)

output_path_L1 = config["paths"]["tensors_normalized_L1"]
os.makedirs(os.path.dirname(tensors_svd), exist_ok=True)
df_norm_L1.to_parquet(output_path_L1, write_index=False)

end_time = time.time()
print(f"‚è∞ Start: {time.ctime(start_time)}")
print(f"üèÅ End:   {time.ctime(end_time)}")
print(f"‚è±Ô∏è Total: {end_time - start_time:.2f} sec ({(end_time - start_time)/60:.2f} min)")



‚è∞ Start: Wed Nov 26 18:17:10 2025
üèÅ End:   Wed Nov 26 18:20:21 2025
‚è±Ô∏è Total: 191.28 sec (3.19 min)


### Normalization L2

In [8]:
start_time = time.time()

# individual_tensors = config["paths"]["tensors_convolution"]
tensors_svd = config["paths"]["tensors_svd"]
inpust_path = tensors_svd
data = dd.read_parquet(inpust_path)
data = data.repartition(npartitions=data.npartitions)

id_col = "row_id"
value_cols = [c for c in data.columns if c != id_col]

X = data[value_cols].astype(float).to_dask_array(lengths=True)
row_ids = data[id_col].to_dask_array(lengths=True).reshape(-1, 1)

L2_norms = da.linalg.norm(X, axis=1, keepdims=True)
L2_norms = da.where(L2_norms == 0, 1, L2_norms)
X_norm_L2 = X / L2_norms

full_array = da.hstack([row_ids, X_norm_L2])
df_norm_L2 = dd.from_dask_array(full_array, columns=[id_col] + value_cols)

output_path_L2 = config["paths"]["tensors_normalized_L2"]
os.makedirs(os.path.dirname(tensors_svd), exist_ok=True)
df_norm_L2.to_parquet(output_path_L2, write_index=False)

end_time = time.time()
print(f"‚è∞ Start: {time.ctime(start_time)}")
print(f"üèÅ End:   {time.ctime(end_time)}")
print(f"‚è±Ô∏è Total: {end_time - start_time:.2f} sec ({(end_time - start_time)/60:.2f} min)")



‚è∞ Start: Wed Nov 26 18:20:21 2025
üèÅ End:   Wed Nov 26 18:23:31 2025
‚è±Ô∏è Total: 189.62 sec (3.16 min)
