Esto me respondio ChatGPT:

from dtaidistance import dtw
import numpy as np
import pandas as pd
from joblib import Parallel, delayed

# Simulamos 500k series cortas (ej: 30 valores cada una)
# En producción esto vendría de tu DataFrame
n_series = 500_000
series_length = 30
series = [np.random.rand(series_length) for _ in range(n_series)]

# Supongamos que querés comparar las primeras 100 contra las siguientes 1000
query_series = series[:100]
base_series = series[100:1100]

# Calculamos DTW usando paralelización con joblib
def compute_dtw_for_query(q_idx, q):
    distances = [dtw.distance_fast(q, b) for b in base_series]
    return q_idx, distances

results = Parallel(n_jobs=-1)(delayed(compute_dtw_for_query)(i, q) for i, q in enumerate(query_series))

In [1]:
import numpy as np
import pandas as pd
import gc
from dtaidistance import dtw
import hdbscan
from sklearn.manifold import MDS
from fastdtw import fastdtw
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import pairwise_distances
from tqdm import tqdm
import functools
from multiprocessing import Pool, shared_memory, cpu_count
import pickle
import cloudpickle
from tqdm import tqdm

In [2]:

gc.collect()
df_full = pd.read_parquet('./data/l_vm_completa_train.parquet', engine='fastparquet')

In [3]:
# Dejar solo las columnas CUSTOMER_ID, PRODUCT_ID, y ORDINAL, CLASE_DELTA donde PERIODO <= 201910
df_full = df_full.loc[df_full['PERIODO'] <= 201910, ['CUSTOMER_ID', 'PRODUCT_ID', 'ORDINAL', 'CLASE_DELTA']]
df_full = df_full.sort_values(by=['CUSTOMER_ID', 'PRODUCT_ID', 'ORDINAL'])

In [4]:
# Agrupamos por combinación de cliente y producto
grouped = df_full.groupby(['CUSTOMER_ID', 'PRODUCT_ID'])['CLASE_DELTA'].apply(list)
# Convertimos a DataFrame
series_df = grouped.reset_index(name='serie_clase')
# Extraer series como listas
series_list = series_df['serie_clase'].tolist()

In [None]:

# -------------------------------
# 1. Función de limpieza
# -------------------------------
def es_serie_valida(s):
    if not isinstance(s, list): return False
    if len(s) < 3: return False
    for v in s:
        if not isinstance(v, (int, float)): return False
        if isinstance(v, float) and np.isnan(v): return False
    return True

# Asumimos que ya tenés series_list
series_list_limpia = [s for s in series_list if es_serie_valida(s)]
print(f"Series válidas: {len(series_list_limpia)} / {len(series_list)}")

# -------------------------------
# 2. Muestreo
# -------------------------------
sample_size = min(1000, len(series_list_limpia))  # Ajustable
np.random.seed(42)
sample_idx = np.random.choice(len(series_list_limpia), sample_size, replace=False)
sample_series = [series_list_limpia[i] for i in sample_idx]

# -------------------------------
# 3. Matriz de distancias entre muestras
# -------------------------------
def fastdtw_distance(s1, s2):
    try:
        dist, _ = fastdtw(s1, s2)
        return dist
    except:
        return np.inf

distance_matrix = np.zeros((sample_size, sample_size))
print("Calculando matriz de distancias FastDTW...")

for i in tqdm(range(sample_size)):
    for j in range(i + 1, sample_size):
        d = fastdtw_distance(sample_series[i], sample_series[j])
        distance_matrix[i, j] = distance_matrix[j, i] = d

# -------------------------------
# 4. MDS y clustering
# -------------------------------
print("Reduciendo dimensión con MDS...")
mds = MDS(n_components=10, dissimilarity='precomputed', random_state=42)
embedding = mds.fit_transform(distance_matrix)

clusterer = hdbscan.HDBSCAN(min_cluster_size=20)
sample_labels = clusterer.fit_predict(embedding)

# Si querés ver cuántos clusters reales se detectaron:
n_clusters = len(set(sample_labels)) - (1 if -1 in sample_labels else 0)
print(f"Clusters detectados: {n_clusters}")

# -------------------------------
# 5. SERIALIZAR sample_series y sample_labels a memoria compartida
# -------------------------------
sample_blob = pickle.dumps((sample_series, sample_labels))
sample_shm = shared_memory.SharedMemory(create=True, size=len(sample_blob))
sample_shm.buf[:len(sample_blob)] = sample_blob
shm_name = sample_shm.name
shm_size = len(sample_blob)

# -------------------------------
# 6. Función worker para asignar cluster
# -------------------------------
def init_worker(_shm_name, _shm_size):
    global sample_data
    shm = shared_memory.SharedMemory(name=_shm_name)
    blob = shm.buf[:_shm_size]
    sample_data = pickle.loads(blob)

def asignar_cluster_worker(serie):
    sample_series, sample_labels = sample_data
    min_dist = float('inf')
    min_label = -1
    for s, label in zip(sample_series, sample_labels):
        try:
            d = fastdtw_distance(serie, s)
        except:
            d = float('inf')
        if d < min_dist:
            min_dist = d
            min_label = label
    return min_label

# -------------------------------
# 7. Paralelizar asignación
# -------------------------------
print(f"Asignando clusters usando memoria compartida y {min(28, cpu_count())} procesos...")
with Pool(processes=28, initializer=init_worker, initargs=(shm_name, shm_size)) as pool:
    cluster_labels = list(tqdm(pool.imap(asignar_cluster_worker, series_list_limpia), total=len(series_list_limpia)))

# -------------------------------
# 8. Guardar resultados
# -------------------------------
df_resultado = pd.DataFrame({
    'serie_clase': series_list_limpia,
    'cluster': cluster_labels
})
df_resultado.to_parquet("series_clusterizadas.parquet", index=False)

print("✅ Clustering completo y resultado guardado.")

# Liberar memoria compartida
sample_shm.close()
sample_shm.unlink()


Series válidas: 508660 / 690181
Calculando matriz de distancias FastDTW...


100%|██████████| 1000/1000 [03:17<00:00,  5.05it/s]


Reduciendo dimensión con MDS...
Clusters detectados: 7
Asignando clusters usando memoria compartida y 28 procesos...


Exception ignored in: <function SharedMemory.__del__ at 0x757225135b80>Exception ignored in: 
Traceback (most recent call last):
Exception ignored in: <function SharedMemory.__del__ at 0x757225135b80>  File "/home/pablo/anaconda3/envs/LaboIII/lib/python3.9/multiprocessing/shared_memory.py", line 184, in __del__
<function SharedMemory.__del__ at 0x757225135b80>    

self.close()Traceback (most recent call last):
Exception ignored in: Traceback (most recent call last):

  File "/home/pablo/anaconda3/envs/LaboIII/lib/python3.9/multiprocessing/shared_memory.py", line 184, in __del__
  File "/home/pablo/anaconda3/envs/LaboIII/lib/python3.9/multiprocessing/shared_memory.py", line 184, in __del__
<function SharedMemory.__del__ at 0x757225135b80>  File "/home/pablo/anaconda3/envs/LaboIII/lib/python3.9/multiprocessing/shared_memory.py", line 227, in close
        
    self.close()self.close()Exception ignored in: Traceback (most recent call last):

self._mmap.close()
  File "/home/pablo/anacond