In [1]:
import pandas as pd
import os, sys 
import glob

In [2]:
df_colnames = ['timestamp', 'icao24', 'lat', 'lon', 'crs', 'callsign', 'alt']

In [None]:
from tqdm import tqdm

def write_id_catalogue_for_each_date(path: str):
    # Get a list of all directories in the specified path
    date_dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
    # Sort the directories by name
    date_dirs.sort()
    # For each directory, get a list of all files in the directory
    for date_dir in date_dirs:
        # print(f'Looking for CSV files in {os.path.join(path, date_dir, '*.csv')}')
        date_files = glob.glob(os.path.join(path, date_dir, '*.csv'))
        # print(f'Found {len(date_files)} files')
        # But not files starting with ._
        date_files = [f for f in date_files if not f.startswith('._')]
        # Sort the files by name
        date_files.sort()

        catalogue_df = pd.DataFrame(columns=['id', 'file_name'])

        for file in tqdm(date_files, desc=f'Processing {date_dir}', total=len(date_files)):
            # Read the file 
            file_df = pd.read_csv(file, header=None, names=df_colnames)
            # Create an id column in the df by concatenating icao24 and callsign
            file_df['id'] = file_df['icao24'].str.upper() + file_df['callsign'].str.strip().str.upper()
            # Get unique IDs from the file
            unique_ids = file_df['id'].unique()
            
            # Create a DataFrame with unique IDs and the file name
            file_catalogue = pd.DataFrame({
                'id': unique_ids,
                'file_name': os.path.splitext(os.path.basename(file))[0],
                'folder_name': date_dir
            })
            
            # Append to the catalogue_df
            catalogue_df = pd.concat([catalogue_df, file_catalogue], ignore_index=True)

        # Save the catalogue_df to a csv file
        catalogue_df.to_csv(os.path.join(path, date_dir, f'cat_{date_dir}.cat'), index=False)

write_id_catalogue_for_each_date('.')


In [9]:
def process_catalogue_folder_level(path: str):
    # Get a list of all directories in the specified path
    date_dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
    # Sort the directories by name
    date_dirs.sort()
    # For each directory, get a list of all files in the directory
    for date_dir in date_dirs:
        # Get the cat file: 
        cat_file = os.path.join(path, date_dir, f'cat_{date_dir}.cat')
        # Read the cat file
        cat_df = pd.read_csv(cat_file)
        # Change file_name to folder_name + '/' + file_name
        cat_df['file_name'] = cat_df.apply(lambda x: os.path.join(x['folder_name'], str(x['file_name'])), axis=1)
        # Convert the file_name column to str
        cat_df['file_name'] = cat_df['file_name'].astype(str)
        # columns: id, file_name
        # Group by ID and aggregate filenames with comma separator
        merged_df = cat_df.groupby('id')['file_name'].agg(','.join).reset_index()
        # Save the merged df to a csv file
        merged_df.to_csv(os.path.join(path, date_dir, f'cat2_{date_dir}.cat2'), index=False)

process_catalogue_folder_level('.')



In [None]:
def cross_folders_category_merging(path: str):
    # Get a list of all directories in the specified path
    date_dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
    # Sort the directories by name
    date_dirs.sort()
    
    for date_dir in date_dirs:
        # Folder 1
        # Get the last CSV filename
        date_files = glob.glob(os.path.join(path, date_dir, '*.csv'))
        date_files = [f for f in date_files if not f.startswith('._')]
        date_files.sort()
        last_csv_file = os.path.basename(date_files[-1])
        last_csv_file_base = last_csv_file.split('.')[0]
        print(f'last_csv_file: {last_csv_file}')
        cat1_filename = f'cat2_{date_dir}.cat2'

        # Folder 2
        # Get the first CSV filename in the next folder
        if date_dirs.index(date_dir) + 1 == len(date_dirs):
            continue
        next_date_dir = date_dirs[date_dirs.index(date_dir) + 1]
        next_date_files = glob.glob(os.path.join(path, next_date_dir, '*.csv'))
        next_date_files = [f for f in next_date_files if not f.startswith('._')]
        next_date_files.sort()
        first_csv_file = os.path.basename(next_date_files[0])
        first_csv_file_base = first_csv_file.split('.')[0]
        print(f'first_csv_file: {first_csv_file}')
        cat2_filename = f'cat2_{next_date_dir}.cat2'

        import csv
        # Read CAT1 into a dictionary: key = id, value = file_name string
        cat1_data = {}
        with open(os.path.join(path, date_dir, cat1_filename), newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                cat1_data[row['id']] = row['file_name']

        # Read CAT2 into a dictionary: key = id, value = file_name string
        cat2_data = {}
        with open(os.path.join(path, next_date_dir, cat2_filename), newline='') as f:
            reader = csv.DictReader(f)
            for row in reader:
                cat2_data[row['id']] = row['file_name']

        # Process each ID in CAT1 that might span both files
        for id_, cat1_files in list(cat1_data.items()):
            # Check if CAT1 file_name contains last_csv_file
            if last_csv_file_base in cat1_files:
                # Check if the same id exists in CAT2 and its file_name contains first_csv_file
                if id_ in cat2_data and first_csv_file_base in cat2_data[id_]:
                    # Concatenate CAT2 file_name to CAT1 file_name (you may choose a separator, here we use a comma)
                    cat1_data[id_] = cat1_files + "," + cat2_data[id_]
                    # Remove the entry from CAT2 as it has now been merged
                    del cat2_data[id_]

        # Optionally, write the updated CAT1 data back to a CSV file
        with open(os.path.join(path, date_dir, f'cat3_{date_dir}.cat3'), 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['id', 'file_name'])
            for id_, file_names in cat1_data.items():
                writer.writerow([id_, file_names])

        # And write the updated CAT2 data (with spanning IDs removed) back to a CSV file
        with open(str(os.path.join(path, next_date_dir, f'cat3_{next_date_dir}.cat3')), 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['id', 'file_name'])
            for id_, file_names in cat2_data.items():
                writer.writerow([id_, file_names])

cross_folders_category_merging('.')



In [None]:
def filter_for_consecutive_timestamps(path: str):
    # Get a list of all directories in the specified path
    date_dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
    # Sort the directories by name
    date_dirs.sort()

    for date_dir in date_dirs:
        # Get the cat file: 
        cat_file = os.path.join(path, date_dir, f'cat3_{date_dir}.cat3')
        # Read the cat file
        cat_df = pd.read_csv(cat_file)
        # columns: id, file_name
        ids_marked_to_delete = []

        # Iterate through each row in the dataframe
        for idx, row in tqdm(cat_df.iterrows(), total=len(cat_df), desc=f'Processing {date_dir}'):
            id_ = row['id']
            file_name = row['file_name']
            
            # Split the timestamps and extract numbers after backslash
            timestamps = []
            for ts in file_name.split(','):
                try:
                    timestamps.append(int(ts.split('\\')[1]))
                except:
                    # Skip if timestamp can't be parsed
                    continue
                    
            # Sort timestamps to check if they're consecutive
            timestamps.sort()
            
            # Check if timestamps are 3600 seconds apart
            for i in range(len(timestamps)-1):
                if timestamps[i+1] - timestamps[i] != 3600:
                    ids_marked_to_delete.append(id_)
                    break


        # Filter out the ids marked to delete
        print(f'{len(ids_marked_to_delete)} ids marked to delete')
        cat_df = cat_df[~cat_df['id'].isin(ids_marked_to_delete)]

        # Save the filtered cat_df to a csv file
        cat_df.to_csv(os.path.join(path, date_dir, f'cat4_{date_dir}.cat4'), index=False)

filter_for_consecutive_timestamps('.')


For multiprocessing version, see below

In [7]:
from tqdm import tqdm

def build_full_trajectory_for_callsigns(path: str):
    # Get a list of all directories in the specified path
    date_dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
    # Sort the directories by name
    date_dirs.sort()
    

    # Create a new directory called cs
    os.makedirs(os.path.join(path, 'cs'), exist_ok=True)

    df_plain = pd.DataFrame(columns=df_colnames)
    
    for date_dir in date_dirs:
        # Create a new directory inside cs
        os.makedirs(os.path.join(path, 'cs', date_dir), exist_ok=True)
        # Get the cat file: 
        cat_file = os.path.join(path, date_dir, f'cat4_{date_dir}.cat4')
        # Read the cat file
        instruction_df = pd.read_csv(cat_file)
        # columns: id, file_name
        
        # All CSV files to read from instruction_df 
        # Split the file_name strings and get unique CSV files
        all_csv_files_to_read = []
        for files in instruction_df['file_name']:
            # Split on comma to get individual files
            file_list = files.split(',')
            all_csv_files_to_read.extend(file_list)
            
        # Remove duplicates while preserving order
        all_csv_files_to_read = list(dict.fromkeys(all_csv_files_to_read))

        print(f'{len(all_csv_files_to_read)} csv files to read')

        for csv_file in tqdm(all_csv_files_to_read, desc=f'Processing {date_dir}', total=len(all_csv_files_to_read)):
            ids_to_read = []
            # Find ids whose file_name contains the current csv_file
            # Only keep what is after the last backslash
            csv_file_base = csv_file.split('\\')[-1] # file name 123456.csv
            csv_file_datedir = csv_file.split('\\')[0] # folder 2024-02-20
            ids_to_read = instruction_df[instruction_df['file_name'].str.contains(csv_file_base)]['id'].tolist()
            df = pd.read_csv(os.path.join(path, csv_file_datedir, f'{csv_file_base}.csv'), header=None, names=df_colnames)
            # Add the id column to the df by concatenating icao24 and callsign
            df['id'] = df['icao24'].str.upper() + df['callsign'].str.strip().str.upper()
            # Filter the df to only include rows with ids in ids_to_read
            df = df[df['id'].isin(ids_to_read)]
            # Add the df to df_plain
            df_plain = pd.concat([df_plain, df], ignore_index=True)

        # Sort df_plain by timestamp
        df_plain = df_plain.sort_values(by=['id', 'timestamp'])
        # Save the df_plain to a csv file
        df_plain.to_csv(os.path.join(path, 'cs', date_dir, f'cs_{date_dir}.csv'), index=False)


        
        


build_full_trajectory_for_callsigns('.')
            
            
        


48 csv files to read


  df_plain = pd.concat([df_plain, df], ignore_index=True)
Processing 2023-04-01: 100%|██████████| 48/48 [00:08<00:00,  5.70it/s]


48 csv files to read


Processing 2023-04-02: 100%|██████████| 48/48 [00:20<00:00,  2.40it/s]


24 csv files to read


Processing 2023-04-03: 100%|██████████| 24/24 [00:12<00:00,  1.85it/s]


FileNotFoundError: [Errno 2] No such file or directory: './cs/cat4_cs.cat4'

# For Linux Server (multiprocessing)

In [None]:
from mpire import WorkerPool
import pandas as pd
import os

def process_single_date_dir(date_dir, path, df_colnames):
    # Create a new DataFrame for this date_dir
    df_plain = pd.DataFrame(columns=df_colnames)
    
    # Create directory for output
    os.makedirs(os.path.join(path, 'cs', date_dir), exist_ok=True)
    
    # Get the cat file
    cat_file = os.path.join(path, date_dir, f'cat4_{date_dir}.cat4')
    instruction_df = pd.read_csv(cat_file)
    
    # Get unique CSV files to read
    all_csv_files_to_read = []
    for files in instruction_df['file_name']:
        file_list = files.split(',')
        all_csv_files_to_read.extend(file_list)
    all_csv_files_to_read = list(dict.fromkeys(all_csv_files_to_read))
    
    print(f'{date_dir}: {len(all_csv_files_to_read)} csv files to read')
    
    for csv_file in all_csv_files_to_read:
        csv_file_base = csv_file.split('/')[-1]
        csv_file_datedir = csv_file.split('/')[-2]
        ids_to_read = instruction_df[instruction_df['file_name'].str.contains(csv_file_base)]['id'].tolist()
        
        df = pd.read_csv(os.path.join(path, csv_file_datedir, f'{csv_file_base}.csv'), 
                        header=None, names=df_colnames)
        df['id'] = df['icao24'].str.upper() + df['callsign'].str.strip().str.upper()
        df = df[df['id'].isin(ids_to_read)]
        df_plain = pd.concat([df_plain, df], ignore_index=True)
    
    # Sort and save
    df_plain = df_plain.sort_values(by=['id', 'timestamp'])
    output_path = os.path.join(path, 'cs', date_dir, f'cs_{date_dir}.csv')
    df_plain.to_csv(output_path, index=False)
    return f"Completed processing {date_dir}"

def build_full_trajectory_for_callsigns(path: str):
    # Get and sort directories
    date_dirs = [d for d in os.listdir(path) if os.path.isdir(os.path.join(path, d))]
    date_dirs.sort()
    
    # Create output directory
    os.makedirs(os.path.join(path, 'cs'), exist_ok=True)
    
    # Set up parallel processing
    n_workers = os.cpu_count() - 1  # Leave one CPU free
    with WorkerPool(n_workers) as pool:
        results = pool.map(process_single_date_dir, 
                         [(d, path, df_colnames) for d in date_dirs],
                         progress_bar=True)
    
    for result in results:
        print(result)

# Call the function
build_full_trajectory_for_callsigns('summer23/raw')