# In [None]:
from utils import *
from multiprocess import Pool

# -------------------------------
# Main Routine with Multiprocessing
# -------------------------------
def _compute_proximity_features(X_augmented, state_column, radii, features_config, max_workers):
    """Compute per-row proximity features with one worker task per state time.

    Rows are grouped by their value in ``state_column``. Each group's row
    indices are handed to ``process_state_time`` in a process pool, and the
    ``(indices, features)`` pairs it returns are scattered back into one array
    aligned with the rows of ``X_augmented``.

    Returns:
        Float array of shape ``(X_augmented.shape[0], 4 * len(radii) + 1)``.
        Rows no task wrote to (e.g. a NaN state time, which never compares
        equal to itself) stay NaN rather than holding uninitialized memory.
    """
    state_times = np.unique(state_column)
    logging.info(f"Unique state times found: {len(state_times)}")

    num_new_features = len(radii) * 4 + 1
    new_features = np.full((X_augmented.shape[0], num_new_features), np.nan)

    num_workers = min(max_workers, cpu_count())
    logging.info(f"Using {num_workers} CPU cores for multiprocessing.")

    # NOTE(review): X_augmented travels with every task tuple; for very large X
    # consider sharing it through a Pool initializer instead.
    args_list = [
        (st, np.where(state_column == st)[0], X_augmented, radii, features_config)
        for st in state_times
    ]

    with Pool(num_workers) as pool:
        results = pool.map(process_state_time, args_list)

    for indices, features_st in results:
        new_features[indices, :] = features_st
    return new_features


def _load_y_actions(data_dir, months, percentage_of_data_usage):
    """Load and concatenate the ``y_actions`` arrays for ``months`` (in the given order).

    Each month's array is truncated to its first
    ``max(1, int(percentage_of_data_usage * len(array)))`` rows.
    """
    y_actions_list = []
    for month in months:
        month_str = f"{int(month):02d}"
        y_actions_month = np.load(os.path.join(data_dir, f"y_actions/y_actions_2023{month_str}.npy"))
        sample_size = max(1, int(percentage_of_data_usage * len(y_actions_month)))
        y_actions_list.append(y_actions_month[:sample_size])
    return np.concatenate(y_actions_list, axis=0)


def _extend_x_mapping(x_mapping, start_index, pos_emb_dim, radii):
    """Register names for the appended columns in ``x_mapping`` (mutated in place).

    Consecutive indices starting at ``start_index`` are assigned in stacking
    order: ``pos_emb_dim`` position-embedding columns, then four statistics per
    radius, then the global train count.
    """
    # NOTE(review): assumes process_state_time emits, per radius,
    # [count, min distance, mean distance, mean delay] followed by the global
    # count — confirm against its implementation.
    names = [f"TRAIN_POSITION_EMBEDDING_{d}" for d in range(pos_emb_dim)]
    for r in radii:
        names += [
            f"num_trains_within_{r}",
            f"min_distance_within_{r}",
            f"mean_distance_within_{r}",
            f"mean_delay_within_{r}",
        ]
    names.append("nb_trains_in_state")

    for offset, name in enumerate(names):
        x_mapping[name] = start_index + offset


def main(radii, data_dir, percentage_of_data_usage, train_months, test_months, max_workers=8):
    """
    Compute enhanced features and save them for model training using multiprocessing.

    Steps:
      1. Load X, y_delays, metadata and the column scheme via ``load_data_aux``.
      2. Append train position embeddings to X.
      3. Append ``4 * len(radii) + 1`` proximity features, computed in parallel
         per distinct value of the first metadata column.
      4. Load the matching ``y_actions`` and save all arrays, plus the extended
         column scheme, under ``<data_dir without trailing '/'>_more_features/``.

    Args:
        radii: Radii for which proximity features are computed.
        data_dir: Input data directory.
        percentage_of_data_usage: Fraction of data to use; passed to
            ``load_data_aux`` and applied per month to ``y_actions``.
        train_months: Months to load; output files are named after the first one.
        test_months: Additional months to load.
        max_workers: Upper bound on worker processes (default 8).
    """
    output_dir = data_dir.rstrip("/") + "_more_features/"

    logging.info("Starting feature computation...")
    start_time = time.time()

    logging.info("Loading data...")
    data = load_data_aux(data_dir, percentage_of_data_usage, train_months, test_months)
    X = data["X"]
    y_delays = data["y_delays"]
    md = data["metadata"]
    columns_scheme = data["columns_scheme"]

    logging.info(f"X shape: {X.shape}, y_delays shape: {y_delays.shape}, Metadata shape: {md.shape}")

    features_config = indices_retrieval(columns_scheme)
    logging.info("Feature configuration retrieved.")
    pos_emb = compute_train_position_embedding(X, features_config["past_station_1_indices"], features_config["future_station_1_indices"])

    # np.hstack always allocates a new array, so X itself is never modified.
    X_augmented = np.hstack([X, pos_emb])
    logging.info(f"X after adding position embeddings: {X_augmented.shape}")

    new_features = _compute_proximity_features(X_augmented, md[:, 0], radii, features_config, max_workers)

    X_final = np.hstack([X_augmented, new_features])
    logging.info(f"Final X shape after adding new features: {X_final.shape}")

    # Deduplicate on the integer month and iterate in ascending order so the
    # concatenation order is deterministic (a plain set of strings is not).
    # NOTE(review): assumes load_data_aux also stacks months in ascending order — confirm.
    months = sorted({int(m) for m in list(train_months) + list(test_months)})
    y_actions_all = _load_y_actions(data_dir, months, percentage_of_data_usage)
    logging.info(f"y_actions_all shape: {y_actions_all.shape}")
    if len(y_actions_all) != len(X_final):
        logging.warning(
            f"y_actions_all has {len(y_actions_all)} rows but X_final has {len(X_final)}; "
            "targets may be misaligned with features."
        )

    for sub in ["metadata", "x", "y_delays", "y_actions"]:
        os.makedirs(os.path.join(output_dir, sub), exist_ok=True)

    month_str = f"{int(train_months[0]):02d}"
    np.save(os.path.join(output_dir, "x", f"x_2023{month_str}.npy"), X_final)
    np.save(os.path.join(output_dir, "y_delays", f"y_delays_2023{month_str}.npy"), y_delays)
    np.save(os.path.join(output_dir, "metadata", f"md_2023{month_str}.npy"), md)
    np.save(os.path.join(output_dir, "y_actions", f"y_actions_2023{month_str}.npy"), y_actions_all)

    x_mapping = columns_scheme["x"]
    # The appended columns physically start right after X's original columns,
    # so index them from X.shape[1] rather than trusting the existing scheme.
    start_index = X.shape[1]
    if x_mapping and max(x_mapping.values()) + 1 != start_index:
        logging.warning(
            f"columns_scheme['x'] ends at index {max(x_mapping.values())} but X has "
            f"{start_index} columns; new columns are indexed from {start_index}."
        )
    _extend_x_mapping(x_mapping, start_index, pos_emb.shape[1], radii)

    with open(os.path.join(output_dir, "metadata/columns_scheme.pkl"), "wb") as f:
        pkl.dump(columns_scheme, f)

    total_time = (time.time() - start_time) / 60
    logging.info(f"Enhanced features computed and saved successfully in {total_time:.2f} minutes!")

# In [None]:
if __name__ == "__main__":
    # Script entry point: log to a file, then run the feature pipeline with a
    # fixed configuration. Units of the radii are not established here.
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(
        filename='more_features.log',
        level=logging.INFO,
        format=log_format,
    )

    run_config = dict(
        radii=[0.46, 0.83, 1.11],
        data_dir="/Users/mac/Desktop/train_delay_prediction/delay_pred_5_5_v1/",
        percentage_of_data_usage=1.0,
        train_months=[3],
        test_months=[3],
    )
    main(**run_config)