In [None]:
from candump_utils.can_id import write_candump_line
from candump_utils.packet_header import process_first_message_data
from candump_utils.data_field_tm import process_data_field_tm
from candump_utils.data_field_tc import process_data_field_tc

# Librairies and Dictionnaries

# 0. Test Message

In [None]:
message = ["can0  00110400   [8]  00001111 00000111 00000000 00000010 11000000 00001010 00011000 00000001"]

In [None]:
def convert_big_to_little_endian(bit_message: str) -> str:
    """
    Converts a bit message from big-endian to little-endian format.

    Args:
        bit_message (str): A string of bits separated by spaces, representing octets in big-endian order.

    Returns:
        str: The bit message converted to little-endian format, with spaces between octets.
    """
    # Split the input message into a list of 8-bit groups (octets)
    octets: list[str] = bit_message.strip().split()

    # Validate that each octet is exactly 8 bits and contains only '0' or '1'
    if not all(len(octet) == 8 and set(octet).issubset({"0", "1"}) for octet in octets):
        raise ValueError("Input must be a space-separated string of valid 8-bit binary numbers.")

    # Reverse the order of the octets for little-endian format
    reversed_octets: list[str] = octets[::-1]

    # Join the reversed octets with spaces
    return " ".join(reversed_octets)

# 1. Decode CAN

In [None]:
cpt = 0

for line in message:
    print("#-----------------------------------------------------------#")
    print(line)

    # Call write_candump_line for each line
    can_id, supervisor = write_candump_line(line)
    sbr = can_id["sbr_type"]

    if supervisor:
        print("/**-------------------------SUPERVISOR-------------------------**/")
        continue

    # For the first line, process the first message and get the length
    if (sbr == "Set Block Request") & (can_id["sb_type"] != "Unsolicited Telemetry"):
        x, _ = write_candump_line(line, doplot=False)
        nb_blocks = int(x["block_to_transfert"])

        packet_header = process_first_message_data(line)
        tmtc = packet_header["str_type"]

    # For the second line, process the TM data field
    elif sbr == "Transfer":
        if tmtc == "TM":
            if cpt == 0:
                process_data_field_tm(line)
                cpt += 1
        elif tmtc == "TC":
            if cpt == 0:
                process_data_field_tc(line)
                cpt += 1
        else:
            raise ValueError
    else:
        cpt = 0

    print("#-----------------------------------------------------------#\n")

In [None]:
line

In [None]:
process_first_message_data(line)

In [None]:
test = line.split(None, 3)[3]

In [None]:
test

In [None]:
convert_big_to_little_endian(test)

In [None]:
process_first_message_data(convert_big_to_little_endian(test))

In [None]:
def get_cparameters(ccf_type: int = 0, ccf_stype: int = 0) -> Optional[list]:
    """
    Retrieve the CCF_CNAME based on CCF_TYPE and CCF_STYPE from a CSV file.

    Args:
    dat_file (str): The path to the CSV file containing the data.
    ccf_type (int): The type to search for.
    ccf_stype (int): The subtype to search for.

    Returns:
    str: The CCF_CNAME if a match is found, otherwise an empty string.
    """

    name = get_cname(ccf_type, ccf_stype)

    config = read_config({"Submodule": ["name", "commit"]})
    expected_commit = config["Submodule.commit"]
    dat_file = f"cdf_table_{expected_commit}.dat"

    # Lire le fichier CSV dans un DataFrame
    path_df = os.path.join(get_project_root(), "etc/config/", dat_file)
    df = pd.read_csv(path_df, sep="\t")

    # Filtrer les résultats en fonction de CCF_TYPE et CCF_STYPE
    result = df[(df["CDF_CNAME"] == name)]

    if result.value_counts().count() == 0:
        return None

    return list(result.CDF_PNAME.to_dict().values())

In [None]:
test = get_cparameters(20, 1)

In [None]:
test[0]

In [None]:
name = get_cname(17, 1)


config = read_config({"Submodule": ["name", "commit"]})
expected_commit = config["Submodule.commit"]
dat_file = f"cdf_table_{expected_commit}.dat"

# Lire le fichier CSV dans un DataFrame
path_df = os.path.join(get_project_root(), "etc/config/", dat_file)
df = pd.read_csv(path_df, sep="\t")

# Filtrer les résultats en fonction de CCF_TYPE et CCF_STYPE
result = df[(df["CDF_CNAME"] == name)]

In [None]:
result.value_counts().count() == 0