In [13]:
import os
import pandas as pd
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client

def inicialize_dask():
    # Inicializa un cliente de Dask y ponle puerto 8786
    client = Client(memory_limit='8GB', processes=False)
    print(client)

def cargar_datos():

    inicialize_dask()

    data_path = '../data'
    files = os.listdir(data_path)
    
    # Filtra y recopila únicamente los archivos que cumplen con la convención de nombres
    selected_files = []
    for file in files:
        if file.endswith('.csv'):
            parts = file.split('_')
            if len(parts) < 3:
                continue
            year = parts[0]
            if year not in ['2024', '2023','2022']:
                continue
            month = parts[1]
            if year == '2023' and month not in ['01','03','06','09','11']:
                continue
            if year == '2022' and month not in ['01','03','06','09','11']:
                continue
            selected_files.append(os.path.join(data_path, file))
    
    # Función para procesar cada archivo, decorada con @delayed para que se ejecute de forma perezosa
    @delayed
    def process_file(file_path):
        try:
            df = pd.read_csv(file_path, low_memory=True, dtype=str, 
             usecols=['station_id', 'last_reported', 'num_bikes_available',
             'num_docks_available', 'num_bikes_available_types.mechanical', 'num_bikes_available_types.ebike', 
             'is_installed', 'is_renting', 'is_returning', 'is_charging_station'],
            skiprows=lambda i: i > 0 and i % 3 != 0)
            df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
            df['year'] = df['last_reported'].dt.year
            df['month'] = df['last_reported'].dt.month
            df['day'] = df['last_reported'].dt.day
            df['hour'] = df['last_reported'].dt.hour

            # Elimina columnas innecesarias si existen
            cols_to_drop = [col for col in ['traffic', 'V1'] if col in df.columns]
            if cols_to_drop:
                df.drop(columns=cols_to_drop, inplace=True)

            # Convierte a float las columnas numéricas
            numeric_cols = ['num_bikes_available', 'num_docks_available', 
                            'num_bikes_available_types.mechanical', 'num_bikes_available_types.ebike']
            for col in numeric_cols:
                if col in df.columns:
                    df[col] = df[col].astype(float)
            return df
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return pd.DataFrame()  # Devuelve un DataFrame vacío en caso de error

    # Crea una lista de DataFrames diferidos (lazy)
    delayed_dfs = [process_file(file_path) for file_path in selected_files]
    
    if not delayed_dfs:
        return None

    # Convierte los objetos diferidos en un Dask DataFrame
    ddf = dd.from_delayed(delayed_dfs)

    # Carga la metadata de estaciones en un DataFrame de pandas (pequeño)
    df_meta = pd.read_csv('../Informacio_Estacions_Bicing_2025.csv',
                          usecols=['station_id', 'lat', 'lon', 'capacity'],
                          low_memory=False)
    # Asegúrate de que los tipos de 'station_id' coincidan
    ddf['station_id'] = ddf['station_id'].astype('Int64')
    df_meta['station_id'] = df_meta['station_id'].astype('Int64')

    # Procesa columnas que necesitan conversiones de tipo
    for col in ['is_installed', 'is_renting', 'is_returning']:
        if col in ddf.columns:
            ddf[col] = ddf[col].fillna(0).astype(int)
    
    if 'is_charging_station' in ddf.columns:
        ddf['is_charging_station'] = ddf['is_charging_station'].map({'TRUE': 1, 'FALSE': 0})
    
    # Realiza el merge con la metadata de estaciones (la metadata se carga en memoria)
    ddf = ddf.merge(df_meta, on='station_id', how='inner')
    
    # Calcula una columna adicional de capacidad total
    ddf['sum_capacity'] = ddf['num_bikes_available'] + ddf['num_docks_available']
    
    # Convierte a pandas para operaciones que se realizan de forma vectorizada
    df_final = ddf.compute()
    
    # Reemplaza los valores nulos en 'capacity' utilizando la mediana por estación (operación vectorizada)
    median_capacity = df_final.groupby('station_id')['sum_capacity'].median()
    df_final['capacity'] = df_final['capacity'].fillna(df_final['station_id'].map(median_capacity))
    
    # Limita 'num_docks_available' entre 0 y 'capacity' usando la función clip, que es vectorizada
    df_final['num_docks_available'] = df_final['num_docks_available'].clip(lower=0, upper=df_final['capacity'])
    
    # Calcula la variable objetivo
    df_final['target'] = df_final['num_docks_available'] / df_final['capacity']
    
    # Agrega los datos por estación y componentes temporales
    aggregated_df = df_final.groupby(['station_id', 'year', 'month', 'day', 'hour']).agg(
        num_bikes_available=('num_bikes_available', 'mean'),
        num_docks_available=('num_docks_available', 'mean'),
        num_mechanical=('num_bikes_available_types.mechanical', 'median'),
        num_ebike=('num_bikes_available_types.ebike', 'median'),
        is_renting=('is_renting', 'mean'),
        is_returning=('is_returning', 'mean'),
        target=('target', 'mean'),
        lat=('lat', 'first'),
        lon=('lon', 'first'),
        capacity=('capacity', 'first')
    ).reset_index()

    # Filtra por las estaciones disponibles en la metadata de envío
    id_df = pd.read_csv('../data/metadata_sample_submission_2025.csv')
    station_list = pd.unique(id_df['station_id'])
    aggregated_df = aggregated_df[aggregated_df['station_id'].isin(station_list)]
    
    return aggregated_df



In [None]:
import os
import pandas as pd
import dask.dataframe as dd
from dask import delayed
from dask.distributed import Client

def inicialize_dask():
    # Inicializa un cliente de Dask con límite de memoria y sin procesos (para reducir sobrecarga)
    client = Client(memory_limit='8GB', processes=False)
    print(client)

def cargar_datos():
    inicialize_dask()

    data_path = '../data'
    files = os.listdir(data_path)
    
    # Filtra y recopila únicamente los archivos que cumplen con la convención de nombres
    selected_files = []
    for file in files:
        if file.endswith('.csv'):
            parts = file.split('_')
            if len(parts) < 3:
                continue
            year = parts[0]
            if year not in ['2024', '2023']:
                continue
            selected_files.append(os.path.join(data_path, file))
    
    # Función para procesar cada archivo, ejecutada de forma perezosa
    @delayed
    def process_file(file_path):
        try:
            df = pd.read_csv(file_path, low_memory=True, dtype=str)
            df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
            df['year'] = df['last_reported'].dt.year
            df['month'] = df['last_reported'].dt.month
            df['day'] = df['last_reported'].dt.day
            df['hour'] = df['last_reported'].dt.hour

            # Elimina columnas innecesarias
            cols_to_drop = [col for col in ['traffic', 'V1'] if col in df.columns]
            if cols_to_drop:
                df.drop(columns=cols_to_drop, inplace=True)

            # Convierte a numérico las columnas relevantes
            numeric_cols = ['num_bikes_available', 'num_docks_available', 
                            'num_bikes_available_types.mechanical', 'num_bikes_available_types.ebike']
            for col in numeric_cols:
                if col in df.columns:
                    df[col] = pd.to_numeric(df[col], errors='coerce')
            return df
        except Exception as e:
            print(f"Error processing {file_path}: {e}")
            return pd.DataFrame()  # Devuelve un DataFrame vacío en caso de error

    delayed_dfs = [process_file(file_path) for file_path in selected_files]
    if not delayed_dfs:
        return None

    # Crea un Dask DataFrame a partir de los DataFrames diferidos
    ddf = dd.from_delayed(delayed_dfs)

    # Carga la metadata de estaciones
    df_meta = pd.read_csv('../Informacio_Estacions_Bicing_2025.csv',
                          usecols=['station_id', 'lat', 'lon', 'capacity'],
                          low_memory=True)
    
    ddf['station_id'] = ddf['station_id'].astype('Int64')
    df_meta['station_id'] = df_meta['station_id'].astype('Int64')

    # Procesa columnas adicionales si existen
    for col in ['is_installed', 'is_renting', 'is_returning']:
        if col in ddf.columns:
            ddf[col] = ddf[col].fillna(0).astype(int)
    if 'is_charging_station' in ddf.columns:
        ddf['is_charging_station'] = ddf['is_charging_station'].map({'TRUE': 1, 'FALSE': 0})

    # Realiza el merge con la metadata de estaciones
    ddf = ddf.merge(df_meta, on='station_id', how='inner')

    # Calcula una columna adicional: capacidad total según datos disponibles
    ddf['sum_capacity'] = ddf['num_bikes_available'] + ddf['num_docks_available']

    # Calcula la mediana de 'sum_capacity' por estación usando Pandas en un subconjunto (solo dos columnas)
    median_capacity = ddf[['station_id', 'sum_capacity']].compute().groupby('station_id')['sum_capacity'].median()

    # Función para rellenar valores nulos en 'capacity' usando la mediana por estación
    def fill_capacity(df, median_capacity):
        df['capacity'] = df['capacity'].fillna(df['station_id'].map(median_capacity))
        return df

    # Aplica la función en cada partición sin materializar todo el dataset
    ddf = ddf.map_partitions(fill_capacity, median_capacity)

    # Limita 'num_docks_available' entre 0 y 'capacity'
    ddf['num_docks_available'] = ddf['num_docks_available'].clip(lower=0, upper=ddf['capacity'])
    
    # Calcula la variable objetivo
    ddf['target'] = ddf['num_docks_available'] / ddf['capacity']

    # Agrega los datos por estación y componentes temporales utilizando operaciones distribuidas
    aggregated_ddf = ddf.groupby(['station_id', 'year', 'month', 'day', 'hour']).agg({
        'num_bikes_available': 'mean',
        'num_docks_available': 'mean',
        'num_bikes_available_types.mechanical': 'median',
        'num_bikes_available_types.ebike': 'median',
        'is_renting': 'mean',
        'is_returning': 'mean',
        'target': 'mean',
        'lat': 'first',
        'lon': 'first',
        'capacity': 'first'
    }).reset_index()

    # Filtra por las estaciones disponibles en la metadata de envío
    id_df = pd.read_csv('../data/metadata_sample_submission_2025.csv')
    station_list = pd.unique(id_df['station_id'])
    aggregated_ddf = aggregated_ddf[aggregated_ddf['station_id'].isin(station_list)]
    
    # Finalmente, computa el resultado agregado (mucho menos datos que materializar todo el DataFrame)
    aggregated_df = aggregated_ddf.compute()
    
    return aggregated_df


In [5]:
#AFEGIM ALTRES VARIABLES ( 4h anteriors + tipo de dia: festiu cap setmana laborable)

def crear_campos_optimized(df):
    df = df.sort_values(by=['station_id', 'year', 'month', 'day', 'hour']).reset_index(drop=True)
    
    # Aplicamos los lags por grupo
    def aplicar_lags(grupo):
        for lag in range(1, 5):
            grupo[f'ctx-{lag}'] = grupo['target'].shift(lag)
        return grupo

    df = df.groupby('station_id').apply(aplicar_lags).reset_index(drop=True)

    # Eliminamos las primeras 4 filas de cada grupo y muestreamos cada 5 horas
    def filtro(grupo):
        grupo = grupo.iloc[4:]  # quitamos las primeras 4
        return grupo.iloc[::5]  # cogemos 1 de cada 5

    df = df.groupby('station_id').apply(filtro).reset_index(drop=True)

    return df



#funcio per afegir el tipo de dia, festiu laborable o cap setmana 
def day_categorization_bcn(df):
    """
    Añade una columna 'day_type' al DataFrame con la clasificación numérica de cada día:
    0 = Laborable, 1 = Fin de semana, 2 = Festivo.
    """
    import pandas as pd

    df['date'] = pd.to_datetime(df[['year', 'month', 'day']])

    # Festivos en Barcelona (2020 - marzo 2025)
    holidays = [
        "2020-01-01", "2020-01-06", "2020-04-10", "2020-04-13", "2020-05-01", "2020-06-24", "2020-09-11", "2020-09-24",
        "2020-10-12", "2020-11-01", "2020-12-06", "2020-12-08", "2020-12-25", "2020-12-26",
        "2021-01-01", "2021-01-06", "2021-04-02", "2021-04-05", "2021-05-01", "2021-06-24", "2021-09-11", "2021-09-24",
        "2021-10-12", "2021-11-01", "2021-12-06", "2021-12-08", "2021-12-25", "2021-12-26",
        "2022-01-01", "2022-01-06", "2022-04-15", "2022-04-18", "2022-05-01", "2022-06-24", "2022-09-11", "2022-09-24",
        "2022-10-12", "2022-11-01", "2022-12-06", "2022-12-08", "2022-12-25", "2022-12-26",
        "2023-01-01", "2023-01-06", "2023-04-07", "2023-04-10", "2023-05-01", "2023-06-24", "2023-09-11", "2023-09-24",
        "2023-10-12", "2023-11-01", "2023-12-06", "2023-12-08", "2023-12-25", "2023-12-26",
        "2024-01-01", "2024-01-06", "2024-03-29", "2024-04-01", "2024-05-01", "2024-06-24", "2024-09-11", "2024-09-24",
        "2024-10-12", "2024-11-01", "2024-12-06", "2024-12-08", "2024-12-25", "2024-12-26",
        "2025-01-01", "2025-01-06", "2025-04-18", "2025-04-21", "2025-05-01", "2025-06-24", "2025-09-11", "2025-09-24",
        "2025-10-12", "2025-11-01", "2025-12-06", "2025-12-08", "2025-12-25", "2025-12-26"
    ]
    
    holiday_dates = pd.to_datetime(holidays)

    # Función para clasificar el día en valores numéricos
    def classify_day(date):
        if date in holiday_dates:
            return 2  # Festivo
        elif date.weekday() >= 5:  # Sábado (5) o domingo (6)
            return 1  # Fin de semana
        else:
            return 0  # Laborable

    df['day_type'] = df['date'].apply(classify_day)

    df.drop(columns=['date'], inplace=True)

    return df



In [2]:
df_merge = cargar_datos()

<Client: 'inproc://172.20.10.6/17524/1' processes=1 threads=12, memory=7.45 GiB>


  df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('is_charging_station', 'float64'))

  df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
  df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
  df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
  df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
  df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', errors='coerce')
  df['last_reported'] = pd.to_datetime(df['last_reported'], unit='s', erro

In [6]:
df_merge= crear_campos_optimized(df_merge)

  df = df.groupby('station_id').apply(aplicar_lags).reset_index(drop=True)
  df = df.groupby('station_id').apply(filtro).reset_index(drop=True)


In [7]:
df_merge_final=day_categorization_bcn(df_merge)

In [8]:
# Ahora con el df_merge_final crear un df nuevo que sea df_train que coja 1 de cada 5 filas
# df_train = df_merge_final.iloc[::5]
# df_train = df_train.reset_index(drop=True)
print(df_merge_final.head(10))  # Ver las primeras filas del nuevo DataFrame

   station_id    year  month  day  hour  num_bikes_available  \
0           1  2023.0    1.0  1.0  23.0            18.666667   
1           1  2023.0    1.0  2.0   4.0            22.000000   
2           1  2023.0    1.0  2.0  12.0            13.000000   
3           1  2023.0    1.0  2.0  20.0            10.500000   
4           1  2023.0    1.0  3.0   1.0            10.000000   
5           1  2023.0    1.0  3.0   6.0            18.500000   
6           1  2023.0    1.0  3.0  11.0             1.500000   
7           1  2023.0    1.0  3.0  16.0             5.500000   
8           1  2023.0    1.0  3.0  21.0            10.500000   
9           1  2023.0    1.0  4.0   2.0            12.000000   

   num_docks_available  num_mechanical  num_ebike  is_renting  is_returning  \
0            27.333333            18.0        0.0         1.0           1.0   
1            24.000000            22.0        0.0         1.0           1.0   
2            33.000000            12.0        1.0         

In [9]:
# Para separar los datos usamos df_train
X = df_merge_final[['station_id','month', 'day', 'hour', 'ctx-1', 'ctx-2', 'ctx-3', 'ctx-4','lat','lon', 'day_type']]
y = df_merge_final['target']

In [10]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import numpy as np

# Linear Regression

def linear_regression(X,y, size):
#Separamos train y test
    X_train, X_test, y_train, y_test = train_test_split(X ,y , test_size= size, random_state=42)

    lm = LinearRegression()
#Entrenamiento modelo
    lm.fit(X_train, y_train)
#Predict
    y_pred = lm.predict(X_test)
#Metricas
    r2 = r2_score(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)

    return  r2, mse, mae


# Random Forest

def RandomForest(X,y, size):
#Separamos train y test
    X_train, X_test, y_train, y_test = train_test_split(X ,y , test_size= size, random_state=42)

    rf = RandomForestRegressor(n_estimators = 100, random_state= 42)
#Entrenamiento modelo
    rf.fit(X_train, y_train)
#Predict
    y_pred = rf.predict(X_test)
#Metricas
    r2 = r2_score(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)
#Factoring Importance
    feature_importance = rf.feature_importances_
    feature_names = X_test.columns
    importance_df = pd.DataFrame({'Feature': feature_names, 'Importance': feature_importance})
    importance_df = importance_df.sort_values(by='Importance', ascending=False)

    return  r2, mse, mae, importance_df, rf

# Neural Network

def neural_network_model(X, y, test_size=0.2):
    # Definir columnas por tipo
    numeric_features = ['month', 'day', 'hour', 'ctx-1', 'ctx-2', 'ctx-3', 'ctx-4', 'lat', 'lon']
    categorical_features = ['station_id', 'day_type']

    # Preprocesado
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', StandardScaler(), numeric_features),
            ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)
        ]
    )

    # Modelo optimizado: 2 capas ocultas, early stopping, batch_size bajo
    model = MLPRegressor(
        hidden_layer_sizes=(128, 64),
        activation='relu',
        solver='adam',
        random_state=42,
        max_iter=500,
        early_stopping=True,
        validation_fraction=0.1,
        batch_size=256
    )

    # Pipeline de preprocesado + modelo
    pipeline = Pipeline([
        ('preprocess', preprocessor),
        ('regressor', model)
    ])

    # División de datos
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, random_state=42)

    # Entrenar
    pipeline.fit(X_train, y_train)

    # Predicción
    y_pred = pipeline.predict(X_test)

    # Métricas
    r2 = r2_score(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    mae = mean_absolute_error(y_test, y_pred)

    return r2, mse, mae, pipeline


In [11]:
# r2_linear, mse_linear, mae_linear = linear_regression(X,y, 0.3)
# r2_rf, mse_rf, mae_rf, importance_df,rf = RandomForest(X, y, 0.3)
r2_nn, mse_nn, mae_nn, nn_model = neural_network_model(X, y, 0.3)

metrics_df = pd.DataFrame({
    'Métrica': ['R²', 'MSE', 'MAE'],
    # 'Regresión Lineal': [r2_linear, mse_linear, mae_linear],
    # 'Random Forest': [r2_rf, mse_rf, mae_rf],
    'Neural Network': [r2_nn, mse_nn, mae_nn]
})

print(metrics_df)
print("Importancia de las características según Random Forest:")
print(importance_df)

  Métrica  Neural Network
0      R²        0.795054
1     MSE        0.015544
2     MAE        0.085573
Importancia de las características según Random Forest:


NameError: name 'importance_df' is not defined

In [12]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder
import gc

# --- CARGA PARCIAL DEL DATASET, INCLUYENDO 'index' ---
use_cols = ['index', 'station_id', 'month', 'day', 'hour', 'ctx-4', 'ctx-3', 'ctx-2', 'ctx-1']
df = pd.read_csv('../metadata_sample_submission_2025.csv', usecols=use_cols)

df['year'] = 2024  # Necesario para `day_categorization_bcn`

# --- IMPUTACIÓN DE LAT/LON ---
df_merge_final = df_merge_final[['station_id', 'lat', 'lon']].drop_duplicates()
df = df.merge(df_merge_final, on='station_id', how='left')

# --- CLASIFICACIÓN DEL DÍA ---
df = day_categorization_bcn(df)

# --- CONVERSIÓN DE CATEGÓRICAS (si hace falta) ---
if df['day_type'].dtype == 'object':
    df['day_type'] = LabelEncoder().fit_transform(df['day_type'])

# --- PREDICCIÓN POR LOTES ---
features = ['station_id', 'month', 'day', 'hour', 'ctx-4', 'ctx-3', 'ctx-2', 'ctx-1', 'lat', 'lon', 'day_type']
X_predict = df[features]

batch_size = 5000
predictions = []
for start in range(0, len(X_predict), batch_size):
    end = start + batch_size
    batch = X_predict.iloc[start:end]
    preds = nn_model.predict(batch)
    predictions.extend(preds)
    del batch
    gc.collect()

# --- CREACIÓN DEL DF FINAL PARA ENTREGABLE ---
df['percentage_docks_available'] = predictions
df_final = df[['index', 'percentage_docks_available']]
df_final.to_csv('predictions.csv', index=False)

print("✅ Archivo 'predictions.csv' creado correctamente.")


✅ Archivo 'predictions.csv' creado correctamente.


In [10]:
df_final = df[['index','percentage_docks_available']]
df_final.to_csv('predictions.csv', index=False)

KeyError: "['index'] not in index"