In [None]:
import os
import pandas as pd
import geopandas as gpd
import multiprocessing as mp
import dask.config
from aad.common.config import Config
from aad.data.loader import DataLoader
from aad.data.preprocessing import DataPreprocessor
from aad.data.annotation import DataAnnotator
from aad.data.sequences import DataSequencer
from aad.data.groundtruth import GroundTruthCollector
from aad.common.core_logging import ProcessLogger

from dask.distributed import Client
def _build_run_config(window_minutes, offset_hours, output_root='D:/ff_data'):
    """Create a fresh Config for one (window size, local offset) pipeline run.

    The per-run directory names are exported as environment variables
    *before* ``Config()`` is constructed so that the Config class picks
    them up.

    Args:
        window_minutes: Value for ``data_pipeline.WINDOW_DURATION_MINUTES``;
            also embedded in the directory names.
        offset_hours: Local offset in whole hours. Stored as
            ``data_pipeline.LOCAL_OFFSET_MINUTES = offset_hours * 60`` and
            embedded in the directory names as ``{offset_hours}h``.
        output_root: Parent directory of the per-run ``OUTPUT_DIR``.

    Returns:
        The newly constructed and adjusted ``Config`` instance.
    """
    tag = f'{window_minutes}min_{offset_hours}h'

    # Set environment variables for the Config class to pick up.
    # NOTE: these stay set after the run; each subsequent run overwrites them.
    os.environ['OUTPUT_DIR'] = f'{output_root}/output_{tag}'
    os.environ['MODEL_DIR'] = f'model_{tag}'
    os.environ['DATASET_DIR'] = f'dataset_{tag}'

    config = Config()
    config.data_pipeline.WINDOW_DURATION_MINUTES = window_minutes
    config.data_pipeline.LOCAL_OFFSET_MINUTES = offset_hours * 60
    return config


def _run_pipeline(config, client, df_sensor, df_locations, logger):
    """Run preprocessing, ground truth, annotation and sequencing for one config.

    Args:
        config: Per-run Config (see ``_build_run_config``).
        client: Shared Dask ``Client`` handed to the distributed stages.
        df_sensor: Raw sensor data loaded once by ``main``.
        df_locations: Location data loaded once by ``main``.
        logger: Shared ``ProcessLogger``.
    """
    # Preprocessing
    preprocessor = DataPreprocessor(config, df_sensor=df_sensor, logger=logger)
    preprocessor.preprocess_data(client=client)

    # Ground truth processing
    groundtruth_collector = GroundTruthCollector(config)
    df_groundtruth = groundtruth_collector.collect_groundtruth(start_end_offset_min=180)

    # Annotation (using ground truth as labels)
    annotator = DataAnnotator(config, df_labels=df_groundtruth, df_locations=df_locations, logger=logger)
    annotator.annotate_data(client=client)

    # Sequence creation
    sequencer = DataSequencer(config, logger=logger)
    sequencer.create_dataset(fit_scaler=True)


def main(window_sizes=(16, 30), offsets_hours=(7, 0)):
    """Build datasets for every (local offset, window size) combination.

    Raw data is loaded once and one Dask cluster is shared by all runs. Runs
    iterate offsets in the outer loop and window sizes in the inner loop, so
    with the defaults the order is 16min/7h, 30min/7h, 16min/0h, 30min/0h.

    Args:
        window_sizes: Window durations in minutes to process.
        offsets_hours: Local offsets in whole hours to process.
    """
    config = Config()
    logger = ProcessLogger(config, 'logger')
    loader = DataLoader(config)

    # Load all data once; every run below reuses these frames.
    df_sensor, _, df_locations = loader.load_raw_data(label_load=False, location_load=True)

    # Cluster size: configured NUM_WORKERS, capped at the machine's CPU count.
    n_workers: int = min(mp.cpu_count(), config.data_pipeline.NUM_WORKERS)
    dask.config.set({'temporary_directory': r'D:\tmp'})

    # Start one Dask cluster for the entire pipeline.
    with Client(n_workers=n_workers, threads_per_worker=1) as client:
        for offset_hours in offsets_hours:
            for window_minutes in window_sizes:
                run_config = _build_run_config(window_minutes, offset_hours)
                print(f'Processing with window size {window_minutes} minutes and offset {offset_hours}h')
                _run_pipeline(run_config, client, df_sensor, df_locations, logger)


# Guard the entry point: Dask's local cluster starts worker processes, and with
# the spawn start method (the Windows default) an unguarded call would re-run
# the pipeline on import. In Jupyter, __name__ is '__main__', so this still runs.
if __name__ == '__main__':
    main()

  from .autonotebook import tqdm as notebook_tqdm
Creating partitions: 100%|██████████| 9/9 [00:00<00:00, 10.29it/s]
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
2025-08-28 23:19:21,472 - distributed.client - ERROR - 
Traceback (most recent call last):
  File "f:\conda\npu\Lib\site-packages\distributed\utils.py", line 1923, in wait_for
    return await fut
           ^^^^^^^^^
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "f:\conda\npu\Lib\site-packages\distributed\utils.py", line 818, in wrapper
    return await func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "f:\conda\npu\Lib\site-packages\distributed\client.py", line 1929, in _close
    await se

TimeoutError: 