### Define message processing functions

In [None]:
from collections.abc import Iterable

import polars as pl
import sqlalchemy as db

from afft.services.sirius import Message
from afft.utils.result import Ok, Err


def collect_messages_by_type(messages: Iterable[Message]) -> dict[type, list[Message]]:
    """Group messages by their concrete type.

    The input is traversed exactly once, so one-shot iterables such as
    generators are handled correctly.

    Args:
        messages: The messages to group. May be any iterable, including an
            iterator that can only be consumed once.

    Returns:
        A dictionary mapping each type found in ``messages`` to the list of
        messages of exactly that type (as reported by ``type(message)``), in
        their original relative order. An empty input gives an empty
        dictionary.
    """
    type_to_messages: dict[type, list[Message]] = {}

    for message in messages:
        # Group on the exact class, not on isinstance, so that subclasses
        # end up in their own group.
        type_to_messages.setdefault(type(message), []).append(message)

    return type_to_messages


def frame_messages(messages: Iterable[Message]) -> pl.DataFrame:
    """Build a polars data frame from a collection of messages.

    Every message is converted with its ``to_dict`` method and the resulting
    dictionaries are handed to ``pl.DataFrame``. The messages are assumed to
    share the same fields.
    """
    rows = []
    for message in messages:
        rows.append(message.to_dict())
    return pl.DataFrame(rows)
    

In [None]:
from functools import reduce
from pathlib import Path
from typing import Optional


from dotenv import dotenv_values

from afft.io import read_config, read_lines
from afft.io.sql import insert_data_frame_into
from afft.services.sirius import Message, MessageProtocol
from afft.services.sirius import build_message_protocol, parse_message_lines
from afft.utils.log import logger

# NOTE: Collective import of all Sirius message types
from afft.services.sirius import message_types

# Alias for messages keyed by topic (string key), used for the output of
# parse_message_lines in main().
TopicMessages = dict[str, list[Message]]
# Alias for messages keyed by message class, used for the output of
# collect_messages_by_type in main().
TypeMessages = dict[type, list[Message]]

def main() -> None:
    """Main function."""
    
    values = dotenv_values("/home/martin/dev/afft/.env")
    
    MESSAGE_DIR: Path = Path("/data/kingston_snv_01/acfr_messages_merged")
    MESSAGE_FILE: Path = MESSAGE_DIR / Path("r23685bc_20100605_021022_messages.txt")
    PROTOCOL_FILE: Path = Path("/home/martin/dev/afft/config/protocol/protocol_v1.toml")

    url: db.engine.URL = db.engine.URL.create(
        drivername="postgresql",
        database="acfr_auv_messages",
        host="localhost",
        port=5432,
        username=values.get("PG_USER"),
        password=values.get("PG_PASSWORD"),
    )

    # Create database from URL
    engine: db.Engine = db.create_engine(url)
    
    # Read message lines and configuration from file
    lines: list[str] = read_lines(Path(MESSAGE_FILE)).unwrap()
    config: dict = read_config(Path(PROTOCOL_FILE)).unwrap()

    protocol: MessageProtocol = build_message_protocol(config.get("message_maps"))

    # Parse message lines to types per topic
    topic_messages: TopicMessages = parse_message_lines(lines, protocol)

    for topic, messages in topic_messages.items():
        logger.info(f"Topic: {topic}, message count: {len(messages)}")

    # Flatten all message collections into a single list
    messages: list[Message] = reduce(lambda left, right: left + right, topic_messages.values())

    # Group messages by type, since they can be inserted into the same table
    type_messages: TypeMessages = collect_messages_by_type(messages)

    for message_type, messages in type_messages.items():
        logger.info(f"Type: {message_type.__name__}, message count: {len(messages)}")
    
    
    # Mapping from message type to table name
    table_names: dict[type, str] = {
        message_types.ImageCaptureMessage: "image_capture",
        message_types.SeabirdCTDMessage: "ctd_seabird",
        message_types.AanderaaCTDMessage: "ctd_aanderaa",
        message_types.EcopuckMessage: "water_ecopuck",
        message_types.ParosciPressureMessage: "pressure_parosci",
        message_types.TeledyneDVLMessage: "dvl_teledyne",
        message_types.TrackLinkModemMessage: "modem_track_link",
        message_types.EvologicsModemMessage: "modem_evologics",
        message_types.MicronSonarMessage: "sonar_micron",
        message_types.OASonarMessage: "sonar_obstacle_avoidance",
        message_types.BatteryMessage: "batteries",
        message_types.ThrusterMessage: "thrusters",
    }

    
    for message_type, messages in type_messages.items():
        data_frame: pl.DataFrame = frame_messages(messages)
        table_name: Optional[str] = table_names.get(message_type)

        if not table_name:
            logger.warning(f"missing table name for type: {message_type}")
            
        match insert_data_frame_into(engine, table_name, data_frame):
            case Ok(_status):
                pass
            case Err(message):
                logger.error(message)

# Invoke main function: runs the whole import pipeline as soon as this code
# is executed (there is no __main__ guard).
main()