In [None]:
import glob
import os
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import dask.dataframe as dd
import numpy as np
import pandas as pd

# Load Challenge Data

In [None]:
# Read the challenge set and the final submission set from CSV.
challenge_data = pd.read_csv("../data/challenge_set.csv")
submission_data = pd.read_csv("../data/final_submission_set.csv")
print(f"{challenge_data.shape[0]=}, {submission_data.shape[0]=}")

# Stack both sets row-wise and renumber the rows from 0.
challenge_data = pd.concat([challenge_data, submission_data], axis=0).reset_index(drop=True)
print(f"{challenge_data.shape[0]=}")

# takeoff_time = off-block time (UTC) + taxi-out duration in minutes;
# arrival_time is parsed into a UTC timestamp in place.
challenge_data = challenge_data.assign(
    takeoff_time=lambda frame: (
        pd.to_datetime(frame['actual_offblock_time'], utc=True)
        + pd.to_timedelta(frame['taxiout_time'], unit='m')
    ),
    arrival_time=lambda frame: pd.to_datetime(frame['arrival_time'], utc=True),
)
print(challenge_data.dtypes)

# Number of distinct flight ids across both sets.
print(f"{challenge_data[['flight_id']].drop_duplicates().shape[0]=}")

# Show the time-related columns for a visual sanity check.
challenge_data[['flight_id','date','actual_offblock_time','taxiout_time','takeoff_time','arrival_time','flight_duration','flown_distance']]

In [None]:
# Only the flight id and the two in-air boundary timestamps are needed
# for the trajectory filtering that follows.
challenge_data = challenge_data.loc[:, ['flight_id', "takeoff_time", "arrival_time"]]
challenge_data

# Load _ALL_ `parquet` files

In [None]:
# Define input and output directories
input_dir = Path("../data/")
# Fixed: this was `iPath(...)`, an undefined name that raised NameError.
output_dir = Path("../data_cleaned/")
# Create the output directory (and any missing parents); no error if it exists.
output_dir.mkdir(parents=True, exist_ok=True)

# List all .parquet files in the input directory (as path strings).
# Sorted so the processing order is the same on every run.
parquet_files = sorted(glob.glob(str(input_dir / "*.parquet")))

# Function to process each file
def process_file(file):
    """Trim one trajectory parquet file to the first half of each flight.

    Reads ``file`` with the pyarrow engine, orders rows by flight and time,
    inner-joins them with the notebook-level ``challenge_data`` on
    ``flight_id`` and keeps only the samples whose timestamp lies between
    ``takeoff_time`` and the midpoint of ``takeoff_time``/``arrival_time``
    (both ends inclusive). The reduced frame is written under the
    notebook-level ``output_dir`` using the same base file name.

    Args:
        file: Path of the input parquet file.

    Returns:
        None. The result is written to disk and a message is printed.
    """
    kept_columns = ['flight_id', 'timestamp', 'temperature', 'altitude', 'groundspeed', 'vertical_rate']

    # Read, order by (flight, time) and attach takeoff/arrival times.
    trajectory = pd.read_parquet(file, engine='pyarrow').sort_values(["flight_id", "timestamp"])
    trajectory = trajectory.merge(challenge_data, on='flight_id', how='inner')

    # Window: from takeoff up to halfway between takeoff and arrival.
    halfway = trajectory.takeoff_time + (trajectory.arrival_time - trajectory.takeoff_time) / 2
    in_window = (trajectory.timestamp >= trajectory.takeoff_time) & (trajectory.timestamp <= halfway)
    trajectory = trajectory.loc[in_window, kept_columns]

    # Same file name, different directory.
    destination = output_dir / os.path.basename(file)
    trajectory.to_parquet(destination, engine='pyarrow')

    print(f"Converted {file=}")

In [None]:
# Use ThreadPoolExecutor for parallel processing.
# Executor.map only re-raises an exception from a worker when the matching
# result is pulled from the returned iterator. Draining the iterator with
# list() makes a failure in process_file stop the cell instead of being
# silently discarded.
with ThreadPoolExecutor() as executor:
    list(executor.map(process_file, parquet_files))

# Clean Flight data

### Ref: [Trajectory](https://ansperformance.eu/study/data-challenge/data.html#trajectory)
> Trajectories are not necessarily complete/overlapping with respect to what reported in the flight list in actual_offblock_time or arrival_time. This is due to the possibly limited/partial ADS-B coverage in some parts (or some lower altitudes) of the world. The interval [actual_offblock_time + taxiout_time, arrival_time] is a good approximation of the in-the-air portion of the flight.

### Ref: [Exploratory Data Analysis](https://ansperformance.eu/study/data-challenge/data.html#using-traffic-for-exploratory-data-analysis)
> Consider only the `[actual_offblock_time + taxiout_time, arrival_time]` interval for the in-the-air portion of the flight.

In [None]:
# Flatten the two-level (column, statistic) header into "column_statistic".
# Guarded so that re-running this cell is a no-op: on already-flattened
# (plain string) names, '_'.join would insert "_" between every character.
# NOTE(review): aggregated_ddf0 is not defined in any cell visible here —
# confirm that the cell creating it runs before this one.
if isinstance(aggregated_ddf0.columns, pd.MultiIndex):
    aggregated_ddf0.columns = ['_'.join(col).strip() for col in aggregated_ddf0.columns.values]
aggregated_ddf0.head()

In [None]:
# Write the flattened aggregate table to ../data/vertical_rate.csv.
# NOTE(review): aggregated_ddf0 is not created in any cell visible here —
# confirm the producing cell still exists and runs first.
aggregated_ddf0.to_csv('../data/vertical_rate.csv')

In [None]:
# Trajectory and weather signals to summarise for every flight
columns_to_aggregate = [
    'altitude',
    'groundspeed',
    'track',
    'vertical_rate',
    'u_component_of_wind',
    'v_component_of_wind',
    'temperature',
    'specific_humidity',
]

# Group by flight_id, apply agg_functions to each selected column and
# materialise the result (.compute() triggers the actual computation)
aggregated_ddf = (
    filtered_ddf
    .groupby('flight_id')[columns_to_aggregate]
    .agg(agg_functions)
    .compute()
)

# Preview the first rows as a sanity check
aggregated_ddf.head()

In [None]:
# Flatten the two-level (column, statistic) header into "column_statistic".
# Guarded so that re-running this cell is a no-op: on already-flattened
# (plain string) names, '_'.join would insert "_" between every character.
if isinstance(aggregated_ddf.columns, pd.MultiIndex):
    aggregated_ddf.columns = ['_'.join(col).strip() for col in aggregated_ddf.columns.values]
aggregated_ddf.head()

In [None]:
# Write the per-flight aggregates (result of the groupby on flight_id) to
# ../data/additional_features.csv.
aggregated_ddf.to_csv('../data/additional_features.csv')

In [None]:
# Lookup table for np.interp below: density_ratio_sqrt[i] is the value that
# belongs to altitude_ft[i]. np.interp interpolates linearly between these
# points and clamps to the first/last value outside the 0-50000 range.
density_ratio_sqrt = np.array([1, 0.985, 0.971, 0.956, 0.942, 0.928, 0.914, 0.900, 0.886, 0.873, 0.859, 0.793, 0.730, 0.670, 0.612, 0.556, 0.496, 0.440, 0.390])
altitude_ft = np.array([0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 15000, 20000, 25000, 30000, 35000, 40000, 45000, 50000])

# Density altitude
# Back-fill missing temperatures within each flight only. A plain .bfill()
# over the whole frame would fill a gap at the end of one flight with the
# first temperature of the next flight.
filtered_ddf['temperature'] = filtered_ddf.groupby('flight_id')['temperature'].bfill()
# Subtract 273.15 to obtain the "celsius" column
# (NOTE(review): assumes temperature is stored in kelvin — confirm).
filtered_ddf['temperature_celsius'] = filtered_ddf['temperature'] - 273.15
# Reference temperature: 15 at altitude 0, minus 2 per 1000 altitude units
# (NOTE(review): assumes altitude is in feet, matching altitude_ft — confirm).
filtered_ddf['standard_temp_at_alt'] = 15 - (2 * (filtered_ddf['altitude'] / 1000))
# Altitude corrected by 120 units per degree of deviation from the reference.
filtered_ddf['density_altitude'] = filtered_ddf['altitude'] + 120 * (filtered_ddf['temperature_celsius'] - filtered_ddf['standard_temp_at_alt'])

# Look up the density factor at each row's density altitude; NaN inputs stay NaN.
filtered_ddf['density_ratio_sqrt'] = np.interp(filtered_ddf.density_altitude, altitude_ft, density_ratio_sqrt)
# "ias" here is groundspeed scaled by the density factor (wind is not accounted for).
filtered_ddf['ias'] = filtered_ddf.groupby('flight_id')['ias'].shift(0) if False else filtered_ddf.groundspeed * filtered_ddf.density_ratio_sqrt

# d_ias: central difference of ias over the rows two before / two after within
# the same flight, divided by the elapsed seconds between those two rows.
# The first and last two rows of every flight therefore get NaN.
# The grouping is built once (after 'ias' exists) and reused for all four shifts.
flights = filtered_ddf.groupby('flight_id')
filtered_ddf['d_ias'] = (
    (flights['ias'].shift(-2) - flights['ias'].shift(2))
    /
    (flights['timestamp'].shift(-2) - flights['timestamp'].shift(2)).dt.total_seconds()
)
# Vertical rate divided by the squared density factor.
filtered_ddf["norm_vertical_rate"] = ((1 / filtered_ddf.density_ratio_sqrt) ** 2) * (filtered_ddf.vertical_rate)

In [None]:
# Flag rows that satisfy all three thresholds at once. Comparisons against
# NaN are False, so rows with a missing d_ias are never flagged.
filtered_ddf["condition"] = (
    filtered_ddf.vertical_rate.gt(500)
    & filtered_ddf.ias.gt(250)
    & filtered_ddf.d_ias.lt(0.5)
)

# Run-length labelling: the counter increases every time the flag changes
# from one row to the next, so each run of equal flags shares one label.
filtered_ddf['group'] = filtered_ddf['condition'].ne(filtered_ddf['condition'].shift()).cumsum()

# Keep flagged rows only, then keep only the runs that span at least
# 300 seconds (5 minutes) between their first and last timestamp.
filtered_ddf = (
    filtered_ddf.loc[filtered_ddf['condition']]
    .groupby('group')
    .filter(lambda run: (run['timestamp'].max() - run['timestamp'].min()).total_seconds() >= 300)
)

# Remove the helper columns that were only needed for the selection.
filtered_ddf = filtered_ddf.drop(
    columns=[
        'temperature_celsius',
        'standard_temp_at_alt',
        'density_ratio_sqrt',
        'd_ias',
        'condition',
        'group',
    ]
)

In [None]:
# Define the columns you want to aggregate
columns_to_aggregate = ['norm_vertical_rate', 'ias']

# Perform the groupby operation and aggregate per flight_id
aggregated_ddf = filtered_ddf.groupby('flight_id')[columns_to_aggregate].agg(agg_functions)

# Materialise the result only when it is lazy (e.g. a dask object).
# The preceding cells assign a numpy array as a column and call
# groupby(...).filter(...) on filtered_ddf, which are pandas operations; a
# pandas result has no .compute() and an unconditional call would raise
# AttributeError.
if hasattr(aggregated_ddf, 'compute'):
    aggregated_ddf = aggregated_ddf.compute()

# Display the first few rows to check the output
aggregated_ddf.head()

In [None]:
# Flatten the two-level (column, statistic) header into "column_statistic".
# Guarded so that re-running this cell is a no-op: on already-flattened
# (plain string) names, '_'.join would insert "_" between every character.
if isinstance(aggregated_ddf.columns, pd.MultiIndex):
    aggregated_ddf.columns = ['_'.join(col).strip() for col in aggregated_ddf.columns.values]
aggregated_ddf.head()

In [None]:
# Write the per-flight aggregates of norm_vertical_rate and ias to
# ../data/ias_norm_virt_rate.csv.
aggregated_ddf.to_csv('../data/ias_norm_virt_rate.csv')