In [None]:
import yaml
import traceback

import dask
import dask.distributed

import fsspec

import xopr.opr_access

from radar_line_processing import process_radar_line, get_output_locations, cache_exists

### Setup processing cluster

In [None]:
# Start a local Dask cluster with LocalCluster's default settings (no arguments
# are passed) and obtain a client connected to it. The client is left as the
# cell's last expression so it is displayed as the cell output.
cluster = dask.distributed.LocalCluster()
client = cluster.get_client()
client

### Load parameters

In [None]:
# Read the preprocessing configuration. yaml.safe_load only constructs plain
# Python objects (dicts, lists, scalars), never arbitrary classes.
with open("data_preprocessing_config.yaml", "r") as config_file:
    config = yaml.safe_load(config_file)

# Echo every top-level section and each of its settings so the parameters of
# this run are recorded in the notebook output. Each section is expected to be
# a mapping of setting name -> value.
for section_name, section in config.items():
    print(f"=== {section_name} ===")
    for setting_name in section:
        print(f"- {setting_name}: {section[setting_name]}")
    print("")

### Query STAC catalog for flights to process

In [None]:
# Connect to the OPR data catalog, using the local directory "radar_cache" as
# the connection's cache directory.
opr = xopr.opr_access.OPRConnection(cache_dir="radar_cache")

# Optional cap on how many flights to take from each collection (useful for
# quick test runs). None, 0 or a negative value means "no cap".
max_flights = config["input"].get("flights_per_collection_limit", None)

# flights: collection name -> list of flight ids to process.
flights = {}
for collection in config["input"]["collections"]:
    # Each entry returned by get_flights is indexed by 'flight_id'.
    flight_ids = [flight['flight_id'] for flight in opr.get_flights(collection)]
    if max_flights is not None and max_flights > 0:
        flight_ids = flight_ids[:max_flights]
    flights[collection] = flight_ids

    print(f"Found {len(flight_ids)} flights in collection {collection}")

In [None]:
# Remove individual *flights* (not whole collections) that already have a
# cached processed output, so they are not submitted again.
# `cache_exists` comes from radar_line_processing and is called with the flight
# id, its collection, the output cache URL and the cache revision id. It
# presumably reports whether an output for that revision already exists at the
# URL (confirm in radar_line_processing).
if config['processing_flights']['check_for_cached_files']:
    # Loop-invariant settings: read once instead of once per flight.
    cache_url = config["output"]["processed_flight_cache_url"]
    revision_id = config["processing_flights"]["cache_revision_id"]

    n_flights_to_process = 0
    for collection, flight_ids in flights.items():
        # Reassigning the value of an existing key is safe during iteration
        # because the dict's size does not change.
        flights[collection] = [
            flight_id for flight_id in flight_ids
            if not cache_exists(flight_id, collection, cache_url,
                                cache_revision_id=revision_id)
        ]
        n_flights_to_process += len(flights[collection])

    print(f"Found {n_flights_to_process} flights to process after checking for cached files")

### Process flights

In [None]:
# Submit one process_radar_line task per flight to the Dask cluster.
# return_dataset=False is passed, so `results` collects whatever
# process_radar_line returns in that mode (presumably a status/summary rather
# than the dataset; confirm in radar_line_processing).
output_url = config["output"]["processed_flight_cache_url"]

futures = []
# as_completed yields futures in completion order, not submission order, so
# keep a map from each task key back to its (collection, flight_id). Without it
# a failure cannot be attributed to a specific flight.
flight_by_key = {}
for season_name, season_flights in flights.items():
    season_futures = client.map(
        process_radar_line, season_flights,
        season_name=season_name,
        output_storage_location=output_url,
        parameters=config["processing_flights"],
        return_dataset=False,
        opr_connection=opr,
    )
    # client.map returns one future per input, in input order.
    for flight_id, future in zip(season_flights, season_futures):
        flight_by_key[future.key] = (season_name, flight_id)
    futures.extend(season_futures)

results = []   # successful results, in completion order
failures = []  # (collection, flight_id, exception) for each failed flight
for future in dask.distributed.as_completed(futures):
    collection, flight_id = flight_by_key[future.key]
    try:
        results.append(future.result())
    except Exception as e:
        # Best-effort: record and report the failure, then keep gathering the
        # remaining flights instead of aborting the whole run.
        failures.append((collection, flight_id, e))
        print(f"Error processing flight {flight_id} in collection {collection}: {e}")
        traceback.print_exc()

print(f"{len(results)} flights succeeded, {len(failures)} failed")

In [None]:
# Display the results of the flights that processed successfully, in the order
# they completed. Failed flights are not in this list; their errors were
# printed by the processing cell.
results