In [1]:
import csv
import importlib
import json
import numbers
import re
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List, Dict, Any, Tuple, Callable

import pandas as pd
from tqdm.auto import tqdm

import files
from files import *

from actions import (
    remove_typos,
    CalculateWeightAndTrimAction,
    CheckinMsgProcessor,
    CreateLoadingInstructionAction,
    CreateLoadsheetAction,
    CreateZFWMessageAction,
    EstimateStorePaxDataAction,
    RampFinalAction,
    SendFuelOrderAction,
    SendLoadingInstructionAction,
    SendLoadsheetAction,
    SetActualBagWeightIndicatorAction,
    SetCKIPaxDistributionAction,
    StoreAircraftDataAction,
    StorePaxDataAction,
    StorePaxDataGuiAction,
    StoreRegistrationAndConfigurationAc,
    TdmCreateLoadingInstructionAction,
    TransferCargoAction,
    TransferCheckinDataAction,
    UpdateEstimatesAction,
    UpdateFuelDataAction,
    UpdateLoadTableAction,
    UpdateTransitLoadTableAction,
)

In [3]:
class Process:
    """A deferred call: a callable bundled with the keyword arguments to invoke it with."""

    def __init__(self, func: Callable, **kwargs):
        # Every keyword argument besides ``func`` is kept verbatim for the later call.
        self.func, self.kwargs = func, kwargs

    def get_kwargs(self):
        """Return the dict of keyword arguments captured at construction time."""
        return self.kwargs

    def get_func(self) -> Callable:
        """Return the callable captured at construction time."""
        return self.func

In [4]:
def multiprocess(processes: List[Process], workers: int):
    """Run every ``Process`` concurrently on a thread pool and wait for all of them.

    Despite the name, the work is executed by a ``ThreadPoolExecutor``, i.e. in
    threads of the current interpreter, not in separate OS processes.

    Args:
        processes: The deferred calls to execute. Each one is submitted as
            ``process.get_func()(**process.get_kwargs())``.
        workers: Maximum number of worker threads of the pool.

    Raises:
        Exception: The first exception (in submission order) raised by any of
            the submitted calls is re-raised here. The pool is still shut down
            cleanly by the ``with`` block before the exception propagates.
    """
    with ThreadPoolExecutor(workers) as executor:
        futures = [
            executor.submit(process.get_func(), **process.get_kwargs())
            for process in processes
        ]

        # ``result()`` blocks until the task is done and re-raises its exception, if any.
        for future in futures:
            future.result()

# Fix broken CSV files

In [5]:
def _write_buffered_record(buffer, writer):
    """Join the buffered physical lines into one CSV record and write it.

    If the joined text contains an odd number of double quotes, a closing quote
    is appended so that the record can be parsed as a single row.
    """
    combined_line = "\n".join(buffer)
    if combined_line.count('"') % 2 != 0:
        combined_line += '"'
    writer.writerow(next(csv.reader([combined_line])))


def fix_broken_csv(csv_input_file, csv_output_file):
    """Rewrite a CSV file whose records are spread over several physical lines.

    The first record of the input is treated as the header and copied as is.
    After that, a physical line starting with ``<digits>,<YYYY-MM-DD HH:MM:SS>``
    begins a new record; every other line is treated as a continuation of the
    record currently being collected. Each collected record is re-parsed and
    written as one properly quoted row.

    Lines that appear between the header and the first record start are written
    as a row of their own instead of being discarded, so no input is lost
    silently.

    Args:
        csv_input_file: Path of the broken CSV file (read as UTF-8).
        csv_output_file: Path of the repaired CSV file (written as UTF-8,
            overwritten if it exists). For an empty input file an empty output
            file is created.

    Raises:
        csv.Error: If a collected record still cannot be parsed as a single CSV
            row after the quote repair.
    """
    id_timestamp_pattern = re.compile(r"^\d+,\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")

    with open(csv_input_file, "r", encoding="utf-8") as infile, open(
        csv_output_file, "w", encoding="utf-8", newline=""
    ) as outfile:
        writer = csv.writer(outfile)

        # The csv reader pulls lines lazily from ``infile``, so after reading the
        # header record the plain line iteration below continues right behind it.
        header = next(csv.reader(infile), None)
        if header is None:
            # Empty input: nothing to repair.
            return
        writer.writerow(header)

        buffer = []
        for line in infile:
            # Drop only the line terminator; any other whitespace is part of the data.
            line = line.rstrip("\n")

            if id_timestamp_pattern.match(line):
                # A new record starts: emit whatever was collected so far.
                if buffer:
                    _write_buffered_record(buffer, writer)
                buffer = [line]
            else:
                buffer.append(line)

        # Emit the record that was still being collected at end of file.
        if buffer:
            _write_buffered_record(buffer, writer)

In [7]:
# Repair the three raw CSV exports concurrently (one pool worker per file).
multiprocess(
    processes=[
        Process(fix_broken_csv, csv_input_file=broken_csv, csv_output_file=repaired_csv)
        for broken_csv, repaired_csv in (
            (CSV_FILE_AB, CSV_FILE_AB_FIXED),
            (CSV_FILE_MN, CSV_FILE_MN_FIXED),
            (CSV_FILE_ZY, CSV_FILE_ZY_FIXED),
        )
    ],
    workers=3,
)

# Read CSV files and convert them to Parquet files


In [13]:
def csv_to_parquet_cleaning(csv_file: Path, parquet_file: Path) -> pd.DataFrame:

    # read the CSV file
    df = pd.read_csv(csv_file)

    # remove leading and trailing linebreaks and whitespaces
    def custom_strip(text):
        if isinstance(text, str):
            return text.strip("\n\r").strip()
        return text

    df = df.map(custom_strip, na_action="ignore")

    # Drop duplicates ignoring the index
    df.drop_duplicates(subset=list(df.columns).remove("id"), inplace=True)

    # Set the unique identifier for every flight
    df["flightid"] = df.apply(
        lambda x: f"{x['airline_code']}_{x['flight_number']}_{x['flight_date']}_{x['departure_airport']}",
        axis=1,
    )

    # Convert creation_time to a datetime object
    df["creation_time"] = pd.to_datetime(df["creation_time"])

    # Write the dataframe to parquet
    df.to_parquet(parquet_file, engine="pyarrow", compression="brotli")
    return df

In [14]:
# Clean each repaired CSV file and store it as Parquet, all three concurrently.
multiprocess(
    processes=[
        Process(csv_to_parquet_cleaning, csv_file=repaired_csv, parquet_file=parquet_target)
        for repaired_csv, parquet_target in (
            (CSV_FILE_AB_FIXED, PARQUET_FILE_AB),
            (CSV_FILE_MN_FIXED, PARQUET_FILE_MN),
            (CSV_FILE_ZY_FIXED, PARQUET_FILE_ZY),
        )
    ],
    workers=3,
)

# Extract Action Data


In [31]:
import importlib

# Re-import the helper and every action module (in this fixed order) so that
# the extractor table below is built from the freshly loaded code.
for _action_module in (
    remove_typos,
    CalculateWeightAndTrimAction,
    CheckinMsgProcessor,
    CreateLoadingInstructionAction,
    CreateLoadsheetAction,
    CreateZFWMessageAction,
    EstimateStorePaxDataAction,
    RampFinalAction,
    SendFuelOrderAction,
    SendLoadingInstructionAction,
    SendLoadsheetAction,
    SetActualBagWeightIndicatorAction,
    SetCKIPaxDistributionAction,
    StoreAircraftDataAction,
    StorePaxDataAction,
    StorePaxDataGuiAction,
    StoreRegistrationAndConfigurationAc,
    TdmCreateLoadingInstructionAction,
    TransferCargoAction,
    TransferCheckinDataAction,
    UpdateEstimatesAction,
    UpdateFuelDataAction,
    UpdateLoadTableAction,
    UpdateTransitLoadTableAction,
):
    importlib.reload(_action_module)
del _action_module  # do not leave the loop variable behind in the notebook namespace

# Maps an action name to the ``extract`` callable of the module of the same name.
action_extractors = dict(
    CalculateWeightAndTrimAction=CalculateWeightAndTrimAction.extract,
    CheckinMsgProcessor=CheckinMsgProcessor.extract,
    CreateLoadingInstructionAction=CreateLoadingInstructionAction.extract,
    CreateLoadsheetAction=CreateLoadsheetAction.extract,
    CreateZFWMessageAction=CreateZFWMessageAction.extract,
    EstimateStorePaxDataAction=EstimateStorePaxDataAction.extract,
    RampFinalAction=RampFinalAction.extract,
    SendFuelOrderAction=SendFuelOrderAction.extract,
    SendLoadingInstructionAction=SendLoadingInstructionAction.extract,
    SendLoadsheetAction=SendLoadsheetAction.extract,
    SetActualBagWeightIndicatorAction=SetActualBagWeightIndicatorAction.extract,
    SetCKIPaxDistributionAction=SetCKIPaxDistributionAction.extract,
    StoreAircraftDataAction=StoreAircraftDataAction.extract,
    StorePaxDataAction=StorePaxDataAction.extract,
    StorePaxDataGuiAction=StorePaxDataGuiAction.extract,
    StoreRegistrationAndConfigurationAc=StoreRegistrationAndConfigurationAc.extract,
    TdmCreateLoadingInstructionAction=TdmCreateLoadingInstructionAction.extract,
    TransferCargoAction=TransferCargoAction.extract,
    TransferCheckinDataAction=TransferCheckinDataAction.extract,
    UpdateEstimatesAction=UpdateEstimatesAction.extract,
    UpdateFuelDataAction=UpdateFuelDataAction.extract,
    UpdateLoadTableAction=UpdateLoadTableAction.extract,
    UpdateTransitLoadTableAction=UpdateTransitLoadTableAction.extract,
)

In [20]:
def extract_action_data(
    source_file: Path,
    target_file: Path,
    label: str | None = None,
    df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Extract specific data based on predefined action extractors, optionally displaying a progress bar and labels.

    This function iterates over a dictionary of action names and their associated extractor functions, applying each
    extractor to the relevant entries in the DataFrame. The results are stored in new columns in the DataFrame.

    Args:
        df (pd.DataFrame): The DataFrame from which data will be extracted.
            It must contain columns that match the keys in the action_extractors dictionary.
        progress_bar (bool, optional): If True, displays a progress bar during the data extraction process.
            Useful for visual feedback during long operations. Defaults to False.
        label (str | None, optional): An optional label that prefixes the print statements for better traceability during debugging.
            If None, only the action name is printed. Defaults to None.

    Returns:
        pd.DataFrame: The original DataFrame with additional columns containing the extracted data.
    """
    if df is None:
        df = pd.read_parquet(source_file)

    for action_name, extractor in action_extractors.items():
        if extractor is not None:
            if len(df[df.action_name == action_name]) == 0:
                print(label, action_name, "not found in DataFrame")
                continue

            if label:
                print(label, action_name)
            else:
                print(action_name)

            df[f"data_{action_name}"] = df[df.action_name == action_name][
                "entry_details"
            ].apply(extractor)

    df.to_parquet(target_file, engine="pyarrow", compression="brotli")
    return df

In [32]:
# Run the action extraction for all three Parquet files concurrently; the label
# prefixes the progress output of each run.
multiprocess(
    processes=[
        Process(
            extract_action_data,
            source_file=parquet_source,
            target_file=parquet_target,
            label=run_label,
        )
        for parquet_source, parquet_target, run_label in (
            (PARQUET_FILE_AB, PARQUET_FILE_AB_CONV, "ABCD"),
            (PARQUET_FILE_MN, PARQUET_FILE_MN_CONV, "MNOP"),
            (PARQUET_FILE_ZY, PARQUET_FILE_ZY_CONV, "ZYXW"),
        )
    ],
    workers=3,
)

ZYXW CalculateWeightAndTrimAction
ABCD CalculateWeightAndTrimAction
MNOP CalculateWeightAndTrimAction
ZYXW CheckinMsgProcessor
ZYXW CreateLoadingInstructionAction
ZYXW CreateLoadsheetAction
ZYXW CreateZFWMessageAction
ZYXW EstimateStorePaxDataAction
ABCD CheckinMsgProcessor
ZYXW RampFinalAction
ZYXW SendFuelOrderAction
ZYXW SendLoadingInstructionAction
ZYXW SendLoadsheetAction
ZYXW SetActualBagWeightIndicatorAction
ZYXW SetCKIPaxDistributionAction
ABCD CreateLoadingInstructionAction
ZYXW StoreAircraftDataAction
ABCD CreateLoadsheetAction
ZYXW StorePaxDataAction
ABCD CreateZFWMessageAction
ABCD EstimateStorePaxDataAction
ZYXW StorePaxDataGuiAction
ABCD RampFinalAction
ZYXWABCD SendFuelOrderAction not found in DataFrame
 StoreRegistrationAndConfigurationAc
ABCD SendLoadingInstructionAction
ABCD SendLoadsheetAction
ABCD SetActualBagWeightIndicatorAction
ZYXW TdmCreateLoadingInstructionAction
ABCD SetCKIPaxDistributionAction not found in DataFrame
ABCD StoreAircraftDataAction
ZYXW Transfer

# Extract Weight Data

In [None]:
class Weight:
    """A weight quantity identified by a short key, with an optional description.

    Instances use the default identity-based equality and hashing, so two
    ``Weight`` objects with the same key are still distinct dictionary keys.
    """

    def __init__(self, key, description=None):
        self.description = description
        self.key = key

    def get_description(self):
        """Return the description, or ``None`` if none was given."""
        return self.description

    def get_key(self):
        """Return the short key of this weight."""
        return self.key

In [None]:
class WeightCluster:
    """A named mapping from ``Weight`` objects to strings."""

    def __init__(self, name: str, weights: Dict[Weight, str]):
        # The name is stored but not exposed through an accessor.
        self._name, self._weights = name, weights

    def get_weight_cluster(self):
        """Return the ``Weight`` -> string mapping passed at construction."""
        return self._weights

In [None]:
class Action:
    """An action name together with a mapping from ``Weight`` objects to strings."""

    def __init__(self, name: str, weights: Dict[Weight, str]):
        self._name, self._weights = name, weights

    def get_name(self):
        """Return the action name."""
        return self._name

    def get_weights(self):
        """Return the ``Weight`` -> string mapping passed at construction."""
        return self._weights

    def get_weight_items(self):
        """Return the ``(Weight, string)`` items view of the mapping."""
        return self._weights.items()

In [None]:
# The weight quantities referenced by the action/weight tables in this notebook.
EZFW = Weight(key="EZFW", description="Estimated Zero Fuel Weight")
AZFW = Weight(key="AZFW", description="Actual Zero Fuel Weight")
ETOW = Weight(key="ETOW", description="Estimated Takeoff Weight")
ATOW = Weight(key="ATOW", description="Actual Takeoff Weight")
TTL = Weight(key="TTL", description="Total Traffic Load")
DOW = Weight(key="DOW", description="Dry Operating Weight")
TOF = Weight(key="TOF", description="Take Off Fuel")
TF = Weight(key="TF", description="Trip Fuel")
ALW = Weight(key="ALW", description="LANDING WEIGHT ACTUAL")

In [None]:
# Maps each Weight to the string associated with it for the loadsheet actions.
# NOTE(review): the strings are presumably the keys under which the values
# appear in the extracted loadsheet data — confirm against the extractors.
LOADSHEETACTION = {
    TTL: "TOTAL TRAFFIC LOAD",
    DOW: "DRY OPERATING WEIGHT",
    AZFW: "ZERO FUEL WEIGHT ACTUAL",
    TOF: "TAKE OFF FUEL",
    ATOW: "TAKE OFF WEIGHT ACTUAL",
    TF: "TRIP",
    ALW: "LANDING WEIGHT ACTUAL",
}

In [None]:
# RampFinalAction is deliberately left out: EZFW is not a value there, just a status.
# Action(name="RampFinalAction", weights={EZFW: "EZFW"})
ACTION_WEIGHTS = [
    Action(name=loadsheet_action, weights=LOADSHEETACTION)
    for loadsheet_action in ("CreateLoadsheetAction", "SendLoadsheetAction")
]

In [None]:
# Recursive function to find the value for a given key
def find_value(data: dict | list, key: str):
    if isinstance(data, dict):
        for k, v in data.items():
            if k == key:
                if not isinstance(v, numbers.Number):
                    if isinstance(v, str) and v.isdigit():
                        v = eval(v)
                    else:
                        raise ValueError("Value not a number string or a number")
                if v < 1_000_000:
                    return v
                return None
            else:
                found = find_value(v, key)
                if found is not None:
                    return found
    elif isinstance(data, list):
        for item in data:
            found = find_value(item, key)
            if found is not None:
                return found
    return None


# Function to apply the recursive search to JSON data
def extract_key(json_str: str, key: str):
    """Parse ``json_str`` as JSON and return what ``find_value`` finds for ``key``."""
    return find_value(json.loads(json_str), key)