In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
from path_prefix import PATH_PREFIX

In [5]:
import logging
import os
import pandas as pd
from geo.drift_compensation import get_track_drift_rate
from get_turn import get_turning_points, plot_changepoints, TurnAndRise, write_turnandrise_to_zarr
import csv
import random
from typing import List
import pickle


In [6]:
def make_dirs() -> None:
    """Ensure the output folders for dangling idents and routes exist.

    Both folders live under ``{PATH_PREFIX}/data/osstate``. Folders that are
    already present are left untouched; a "Created ..." line is printed for
    each folder regardless of whether it was newly created.
    """
    output_dirs = (
        f'{PATH_PREFIX}/data/osstate/dangling',
        f'{PATH_PREFIX}/data/osstate/routes',
    )
    # Create everything first, then report, so the printed order is stable.
    for folder in output_dirs:
        os.makedirs(folder, exist_ok=True)
    for folder in output_dirs:
        print(f'Created {folder}')

make_dirs()

Created /Users/thinhhoang/Documents/XFD/data/osstate/dangling
Created /Users/thinhhoang/Documents/XFD/data/osstate/routes


In [7]:
import random
import string

def generate_random_string(length):
    """
    Build a random alphanumeric string.

    Args:
    length (int): Number of characters to produce. Zero or a negative value
        yields an empty string.

    Returns:
    str: `length` characters, each drawn independently from the ASCII
        letters (both cases) and the digits 0-9.
    """
    alphabet = string.ascii_letters + string.digits

    # Draw one character at a time, then stitch the pieces together.
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))

    return ''.join(picked)

In [8]:
def get_filename_from_filepath(filepath: str) -> str:
    """Return the bare file name of ``filepath`` with every extension removed.

    The result is the text after the last ``/`` (or the whole string when
    there is no ``/``), cut at the first ``.``. For example
    ``'a/b/states.csv.gz'`` becomes ``'states'``.
    """
    # rpartition yields the whole string as the tail when no '/' is present.
    basename = filepath.rpartition('/')[2]
    # partition yields the whole string as the head when no '.' is present.
    return basename.partition('.')[0]


In [9]:
def _select_idents(available, n_idents, ident_filter, ident_mandatory, logger):
    """Pick the idents to process out of those present in the file.

    Args:
        available: Unique idents found in the file.
        n_idents: Requested number of idents; a value <= 0 means "all of them"
            when no mandatory idents are given.
        ident_filter: If non-empty, only idents listed here are eligible.
        ident_mandatory: If non-empty, these idents are kept first (when they
            are present) and the rest of the quota is filled randomly.
        logger: Logger used to report mandatory idents that are missing.

    Returns:
        list: The selected idents; never more than ``n_idents`` when
        ``n_idents`` is positive.

    Raises:
        ValueError: If mandatory idents are given but ``n_idents`` is not positive.
    """
    candidates = list(available)

    # Filter BEFORE sizing any random sample, so the sample size can never
    # exceed the population that is actually left.
    if len(ident_filter) > 0:
        allowed = set(ident_filter)
        candidates = [ident for ident in candidates if ident in allowed]

    if len(ident_mandatory) == 0:
        if n_idents > 0:
            return random.sample(candidates, min(n_idents, len(candidates)))
        return candidates

    # Checked against the caller's value: an empty file must not trigger this.
    if n_idents <= 0:
        raise ValueError("n_idents must be greater than 0")

    required = set(ident_mandatory)
    collected = [ident for ident in candidates if ident in required]
    if len(collected) < len(required):
        print("Could not find all mandatory idents")
        print(f"Found only {len(collected)} out of {len(required)} mandatory idents")
        logger.error(f"Could not find all mandatory idents: {ident_mandatory}")

    # Too many mandatory idents: keep only the first n_idents of them.
    collected = collected[:n_idents]

    # Top up with random non-mandatory idents, counting what was actually
    # found (not what was asked for) and never asking for more than exist.
    pool = [ident for ident in candidates if ident not in required]
    n_extra = min(n_idents - len(collected), len(pool))
    return collected + random.sample(pool, n_extra)


def _write_track(ident, turns, tp_route_writer, dp_route_writer, waypoints_writer, links_writer):
    """Write one ident's turning points, waypoints, links and altitude change points."""
    last_wp_name = ''  # previous waypoint name, needed to emit the connecting link
    for i in range(len(turns['tp_time'])):
        # Waypoint names are 'W' followed by 24 random alphanumeric characters
        wp_name: str = 'W' + generate_random_string(24)
        tp_route_writer.writerow([ident, wp_name, turns['tp_time'][i], turns['tp_lat'][i], turns['tp_lon'][i], turns['tp_alt'][i], turns['tp_vel'][i]])
        waypoints_writer.writerow([wp_name, turns['tp_lat'][i], turns['tp_lon'][i], ident])
        if last_wp_name != '':
            # Link names are 'L' followed by 24 random alphanumeric characters
            link_name: str = 'L' + generate_random_string(24)
            links_writer.writerow([link_name, last_wp_name, wp_name, turns['tp_time'][i-1], turns['tp_time'][i], turns['tp_lat'][i-1], turns['tp_lon'][i-1], turns['tp_lat'][i], turns['tp_lon'][i], ident])
        last_wp_name = wp_name
    for i in range(len(turns['dp_time'])):
        dp_route_writer.writerow([ident, turns['dp_time'][i], turns['dp_lat'][i], turns['dp_lon'][i], turns['dp_alt'][i], turns['dp_vel'][i]])


def process_csv_file(filepath: str, logtag: str, n_idents: int = 0, ident_filter: List[str] = [], ident_mandatory: List[str] = []):
    """Extract turning points and altitude change points from one gzipped state CSV.

    The file is read, rows containing any NaN are dropped, and an ``ident``
    column (``callsign`` + '_' + ``icao24``, both stripped) and an ``rtime``
    column (``time`` relative to the file's minimum) are added. For every
    selected ident, ``get_turning_points`` is called and its result is written
    to five CSV outputs named after the input file:

    * ``dangling/<name>.danglings`` - idents whose result has a falsy ``landed``
    * ``routes/<name>.turns``       - turning points
    * ``routes/<name>.climbs``      - altitude change points
    * ``routes/<name>.links``       - links between consecutive turning points
    * ``routes/<name>.waypoints``   - waypoints

    Progress and errors are logged to ``<logtag>.log``. A failure while
    processing one ident is logged with its traceback and that ident is
    skipped; the remaining idents are still processed.

    Args:
        filepath: Path to the gzip-compressed CSV file.
        logtag: Logger name; also the stem of the log file.
        n_idents: Number of idents to process. 0 means all (only allowed when
            ``ident_mandatory`` is empty).
        ident_filter: If non-empty, restrict processing to these idents.
        ident_mandatory: If non-empty, idents that must be kept when present.
            Neither list argument is modified.

    Raises:
        ValueError: If ``ident_mandatory`` is non-empty and ``n_idents`` <= 0.
    """
    # Set up logging
    logger = logging.getLogger(logtag)
    logger.setLevel(logging.DEBUG)
    fh = logging.FileHandler(f'{logtag}.log')
    fh.setLevel(logging.DEBUG)
    fh.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    logger.addHandler(fh)

    try:
        # Read the file and preprocess it
        df: pd.DataFrame = pd.read_csv(filepath, compression='gzip')
        logger.info(f"Read {len(df)} rows from {filepath}")
        df.dropna(how='any', inplace=True)
        logger.info(f"Dropped NaN rows, {len(df)} rows remaining")
        df['ident'] = (df['callsign'].str.strip()+'_'+df['icao24'].str.strip())
        df['rtime'] = df['time'] - df['time'].min()
        # Drop the columns we don't need
        df.drop(columns=['onground', 'alert', 'spi', 'squawk'], inplace=True)

        idents = _select_idents(df['ident'].unique(), n_idents, ident_filter, ident_mandatory, logger)
        logger.info(f"Processing {len(idents)} unique idents")

        filename = get_filename_from_filepath(filepath)

        # TP: turning points, DP: altitude change points.
        # The with-statement closes every output file even if an error escapes.
        with open(f'{PATH_PREFIX}/data/osstate/dangling/{filename}.danglings', 'w', newline='') as dangling_file, \
                open(f'{PATH_PREFIX}/data/osstate/routes/{filename}.turns', 'w', newline='') as route_file_tp, \
                open(f'{PATH_PREFIX}/data/osstate/routes/{filename}.climbs', 'w', newline='') as route_file_dp, \
                open(f'{PATH_PREFIX}/data/osstate/routes/{filename}.links', 'w', newline='') as links_file, \
                open(f'{PATH_PREFIX}/data/osstate/routes/{filename}.waypoints', 'w', newline='') as waypoints_file:

            dangling_writer = csv.writer(dangling_file)
            dangling_writer.writerow(['filename', 'ident'])

            tp_route_writer = csv.writer(route_file_tp)
            dp_route_writer = csv.writer(route_file_dp)
            tp_route_writer.writerow(['ident','wp','time','lat','lon','alt','vel'])
            dp_route_writer.writerow(['ident','time','lat','lon','alt','vel']) # the altitude change points don't have a waypoint name

            waypoints_writer = csv.writer(waypoints_file)
            waypoints_writer.writerow(['wp','lat','lon','ident'])

            links_writer = csv.writer(links_file)
            links_writer.writerow(['link','wp1','wp2','time1','time2','lat1','lon1','lat2','lon2','ident'])

            for ident in idents:
                try:
                    # Get the subdf for the ident
                    df_ident = df[df['ident'] == ident]
                    if len(df_ident) == 0:
                        logger.error(f"Ident {ident} not found in the dataframe")
                        continue
                    turns: TurnAndRise = get_turning_points(df_ident)
                    _write_track(ident, turns, tp_route_writer, dp_route_writer, waypoints_writer, links_writer)

                    if not turns['landed']:
                        # Aircraft not yet landed, write to the dangling CSV file
                        dangling_writer.writerow([filename, ident])
                    logger.info(f"Processed {ident}")
                except Exception as e:
                    # Best effort: one bad ident must not abort the whole file,
                    # but keep the traceback so the failure can be diagnosed.
                    logger.error(f"Error processing {ident}: {e}", exc_info=True)

        logger.info(f"Finished processing {filename}")
    finally:
        # Detach and close the handler so repeated calls with the same logtag
        # neither duplicate log lines nor leak file descriptors.
        logger.removeHandler(fh)
        fh.close()



In [10]:
def get_dangling_idents(filepath: str) -> List[str]:
    """Return the unique idents listed in the ``.danglings`` file for ``filepath``.

    The danglings file is looked up under ``{PATH_PREFIX}/data/osstate/dangling``
    using the bare file name of ``filepath``. If that file does not exist, an
    empty list is returned.
    """
    stem = get_filename_from_filepath(filepath)
    danglings_path = f'{PATH_PREFIX}/data/osstate/dangling/{stem}.danglings'
    try:
        danglings = pd.read_csv(danglings_path)
    except FileNotFoundError:
        # No danglings file for this input: nothing is dangling.
        return []
    return danglings['ident'].unique().tolist()

In [11]:
def get_data_file_list() -> List[str]:
    """List the ``.csv.gz`` files in the extracted-data folder.

    Returns:
        Full paths of every ``*.csv.gz`` entry found directly inside
        ``{PATH_PREFIX}/data/osstate/extracted``, sorted alphabetically.
        The number of files found is also printed.
    """
    extracted_dir = f'{PATH_PREFIX}/data/osstate/extracted'
    data_files = sorted(
        f'{extracted_dir}/{entry}'
        for entry in os.listdir(extracted_dir)
        if entry.endswith('.csv.gz')
    )
    print(f'Found {len(data_files)} files')
    return data_files

In [12]:
# Test before multiprocessing

# file_list = get_data_file_list()
# process_csv_file(file_list[0], 'test', n_idents=100, ident_mandatory=[])
# print('Dangling idents: ', get_dangling_idents(file_list[0]))
# process_csv_file(filepath=file_list[1], logtag='test2', n_idents=100, ident_mandatory=get_dangling_idents(file_list[0]))


In [13]:
# Multiprocessing
# ================

In [14]:
import multiprocess as mp

In [15]:
def process_file(file_list, thread_number, n_idents = 2000):
    """Process a run of files one after another, carrying dangling idents forward.

    The first file is processed without mandatory idents. Every later file is
    processed with the dangling idents of the file just before it as mandatory
    idents. Each file's own path is used as its log tag.

    Args:
        file_list: Paths of the files to process, in order.
        thread_number: Worker index, used only in the progress message.
        n_idents: Number of idents to process per file.
    """
    print(f"Processing {len(file_list)} files in thread {thread_number}")
    previous_file = None
    for current_file in file_list:
        if previous_file is None:
            process_csv_file(filepath=current_file, logtag=current_file, n_idents=n_idents)
        else:
            carried_over = get_dangling_idents(previous_file)
            process_csv_file(filepath=current_file, logtag=current_file, n_idents=n_idents, ident_mandatory=carried_over)
        previous_file = current_file

def split_into_chunks(items, num_chunks):
    """Split ``items`` into exactly ``num_chunks`` contiguous, near-equal slices.

    Every item lands in exactly one chunk and the original order is kept. The
    first ``len(items) % num_chunks`` chunks get one extra item. Returns an
    empty list when ``num_chunks`` is not positive.
    """
    if num_chunks <= 0:
        return []
    base, extra = divmod(len(items), num_chunks)
    chunks = []
    start = 0
    for k in range(num_chunks):
        size = base + (1 if k < extra else 0)
        chunks.append(items[start:start + size])
        start += size
    return chunks


do_not_allow_delete = True

if __name__ == '__main__':
    file_list = get_data_file_list()
    # Never start more workers than there are files; zero files means zero workers.
    num_processes = min(mp.cpu_count(), len(file_list))
    processes = []

    # One contiguous chunk per worker. Slicing by len // num_processes would
    # yield more chunks than workers whenever the division is inexact, and the
    # surplus chunks would never be processed.
    file_chunks = split_into_chunks(file_list, num_processes)

    for i, chunk in enumerate(file_chunks):
        p = mp.Process(target=process_file, args=(chunk, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

Found 1 files
<function process_file at 0x10c5ed440>
Processing 1 files in thread 0


KeyboardInterrupt: 

# CAUTION: DELETE DATA

In [1]:
def wipe_slate():
    """Delete previously generated outputs under ``{PATH_PREFIX}/data/osstate``.

    Destructive: each line runs ``rm -rf`` through IPython's ``!`` shell
    escape, so this only works inside IPython/Jupyter.

    NOTE(review): ``{PATH_PREFIX}`` is interpolated from the kernel namespace,
    so ``PATH_PREFIX`` presumably has to be imported before calling this -
    confirm.
    NOTE(review): the ``do_not_allow_delete`` flag set in the multiprocessing
    cell is not consulted here - confirm whether it was meant to guard this.
    """
    # Wipe the slate clean
    # Everything in the dangling-idents output folder
    !rm -rf {PATH_PREFIX}/data/osstate/dangling/*
    # Everything in the routes output folder
    !rm -rf {PATH_PREFIX}/data/osstate/routes/*
    # Only the .log files in the extracted folder (process_file uses each
    # input file's path as the log tag, so '<path>.log' lands there)
    !rm -rf {PATH_PREFIX}/data/osstate/extracted/*.log

In [2]:
wipe_slate()