In [6]:
import pandas as pd
import numpy as np
import ezodf

MAIN_SPORT_DOCUMENT_FILE_PATH = 'SPORT_documents//sport_ttc_20220814.ods'

def read_document_by_sheet(file_path, sheet_name):
    ods = ezodf.opendoc(file_path)
    
    sheet = ods.sheets[sheet_name]
    
    data = []
    for row in sheet.rows():
        data_row = [cell.value for cell in row]
        data.append(data_row)
    
    df = pd.DataFrame(data)
    return df

def get_specific_sheet_names_from_document(file_path: str, starts_with: str) -> list:
    """Filter the sheets based on the first characters."""
    ods = ezodf.opendoc(file_path)
    sheets = [name for name in ods.sheets.names() if name.startswith(starts_with)]

    return sheets



Reading the TM'S

In [7]:

def get_all_tms_on_the_document(file_path) -> pd.DataFrame:
    """Specific to read all TM sheets in the document."""
    tm_sheets = get_specific_sheet_names_from_document(file_path, "TM")
    columns = ['identification','name','version_number', 'pkt_type', 'sec_hdr_flag', 'apid', 'seq_flags', 'seq_count', 'pkt_data_length', 'secondary_header', 'data', 'checksum']

    main_tm_df = pd.DataFrame()
    for sheet_name in tm_sheets:
        df = read_document_by_sheet(file_path, sheet_name)
        df = df.iloc[6:, 0:12].dropna(axis = 0, how = 'all')

        main_tm_df = pd.concat([main_tm_df, df], ignore_index=True)

    main_tm_df.columns = columns
    main_tm_df['apid'] = main_tm_df['apid'].apply(lambda x: hex(int(x, 16)))

    return main_tm_df

main_tm_df = get_all_tms_on_the_document(MAIN_SPORT_DOCUMENT_FILE_PATH)

Reading the DD's

In [8]:
def single_data_field_dict(
        field_name: str, 
        length_bits: int, 
        format_str: str, 
        nominal_min: int| float | None, 
        nominal_max: int| float | None, 
        conversion: str | None, 
        unit: str | None,
        observation = None
    ) -> dict:

    single_data_dict = {
        "field": field_name,
        "lenght(bits)": length_bits,
        "format": format_str,
        "nominal_minimum": nominal_min,
        "nominal_maximum": nominal_max,
        "conversion": conversion,
        "unit": unit,
        "observation": observation
    }

    return single_data_dict

def single_data_field_dict_from_row(row: np.ndarray | list) -> dict:
    """a data field has to have at least the field, bit length and format, and it varies how much columns it has."""

    assert len(row) >= 3
    field_name, bit_length, format_str = row[0], row[1], row[2]

    if len(row) >= 8:
        nominal_min, nominal_max, conversion, unit, observation = row[3], row[4], row[5], row[6], row[7]
    elif len(row) == 7:
        nominal_min, nominal_max, conversion, unit, observation = row[3], row[4], row[5], row[6], None
    elif len(row) == 6:
       nominal_min, nominal_max, conversion, unit, observation = row[3], row[4], 'N/A', 'N/A', row[-1]
    elif len(row) == 5:
        nominal_min, nominal_max, conversion, unit, observation = row[3], row[4], 'N/A', 'N/A', None
    elif len(row) == 4:
        nominal_min, nominal_max, conversion, unit, observation = 'N/A', 'N/A', 'N/A', 'N/A', row[-1]
    else:
        nominal_min, nominal_max, conversion, unit, observation = 'N/A', 'N/A', 'N/A', 'N/A', None

    try:
        field_name = field_name.strip()
    except AttributeError:
        field_name = ''

    return single_data_field_dict(field_name, bit_length, format_str, nominal_min, nominal_max, conversion, unit, observation)

In [9]:

def find_tm_for_given_dd_name(dd_name: str, main_tm_df: pd.DataFrame) -> tuple[str, str]:

    filtered_tm_df = main_tm_df.drop_duplicates().query(f"""data == '{dd_name}'""")

    if filtered_tm_df.empty:
        return '', ''
    else:
        return filtered_tm_df['identification'].item(), filtered_tm_df['apid'].item()

def read_inner_dd_from_document_array(data_array: np.ndarray):
    assert data_array[0,0] is not None
    assert data_array[1,0] == 'Field'
    assert data_array[-1,0] == 'Total'
    
    data_name = data_array[0,0]
    identification, apid = find_tm_for_given_dd_name(data_name, main_tm_df)

    main_dict = {
        "identification": identification,
        "apid": apid,
        "data_name": data_array[0,0]
    }
    
    data_packets = []
    for i in range(2, len(data_array)):
        row = data_array[i,:]
        data_packets.append(single_data_field_dict_from_row(row))
    main_dict["data_packets"] = data_packets

    return main_dict

def read_dd_sheet_from_document(file_path: str, sheet_name: str):

    df = read_document_by_sheet(file_path, sheet_name)
    df = df.dropna(axis = 0, how = 'all').dropna(axis = 1, how = 'all')
    data_array = df.to_numpy()

    pointer = 0
    all_sheet_data = []
    for i in range(0, len(data_array)):
        first_column_value = data_array[i,0]
        if first_column_value == 'Total':
            inner_data_array = data_array[pointer:(i+1), :]
            all_sheet_data.append(read_inner_dd_from_document_array(inner_data_array))
            pointer = (i+1)

    return pd.DataFrame(all_sheet_data)

def get_all_dds_from_document(file_path):
    dd_sheets = get_specific_sheet_names_from_document(file_path, "DD")

    all_dd_from_sheets_df = pd.DataFrame()
    for sheet_name in dd_sheets:
        inner_dd_df = read_dd_sheet_from_document(file_path, sheet_name)
        all_dd_from_sheets_df = pd.concat([all_dd_from_sheets_df, inner_dd_df], ignore_index=True)

    return all_dd_from_sheets_df

main_dd_df = get_all_dds_from_document(MAIN_SPORT_DOCUMENT_FILE_PATH)

In [10]:
from CatalogDataReader import CatalogDataReader

catalog_data = CatalogDataReader()

catalog_data.get_all_tms_on_the_document('sport_ttc_20220814.ods')

Unnamed: 0,identification,name,version_number,pkt_type,sec_hdr_flag,apid,seq_flags,seq_count,pkt_data_length,secondary_header,data,checksum
0,TM_CDH_001,C&DH: get time,0.0,0.0,1.0,0x201,0x3,0.0,0x1A,Time,DD_CDH_001,CRC-16
1,TM_CDH_005,TT&C: get latest TC headers,0.0,0.0,1.0,0x205,0x3,0.0,0x4F,Time,DD_CDH_005,CRC-16
2,TM_CDH_006,TT&C: get latest standard TC response,0.0,0.0,1.0,0x206,0x3,0.0,0x1E,Time,DD_CDH_006,CRC-16
3,TM_CDH_011,TT&C: initiate crypto key exchange,0.0,0.0,1.0,0x20b,0x3,0.0,0x29,Time,DD_CDH_011,CRC-16
4,TM_CDH_012,TT&C: validate candidate crypto key (pt. 1),0.0,0.0,1.0,0x20c,0x3,0.0,0x11,Time,DD_CDH_012,CRC-16
...,...,...,...,...,...,...,...,...,...,...,...,...
176,TM_HOU_021_C,HK ADCS_OBS_NM_EXT (compressed),0.0,0.0,1.0,0x3d,0x3,Varies,Variable,Time,DC_HOU_OBS_4E,CRC-16
177,TM_HOU_022_C,HK ADCS_OBS_SPM_EXT (compressed),0.0,0.0,1.0,0x3e,0x3,Varies,Variable,Time,DC_HOU_OBS_5E,CRC-16
178,TM_HOU_023_C,HK ADCS_OBS_MICM_EXT (compressed),0.0,0.0,1.0,0x3f,0x3,Varies,Variable,Time,DC_HOU_OBS_6E,CRC-16
179,TM_STD_001,Standard uplink frame response,0.0,0.0,1.0,0x1,0x3,Varies,0xE,Time,Varies,ErrCode


In [11]:
catalog_data.get_all_dds_from_document('sport_ttc_20220814.ods')

Unnamed: 0,identification,apid,data_name,data_packets
0,TM_CDH_001,0x201,DD_CDH_001,"[{'field': 'GPS time (C&DH view)', 'lenght(bit..."
1,TM_CDH_005,0x205,DD_CDH_005,[{'field': 'Header of most recent uplink frame...
2,TM_CDH_006,0x206,DD_CDH_006,"[{'field': 'Latest TM_STD_001', 'lenght(bits)'..."
3,TM_CDH_011,0x20b,DD_CDH_011,"[{'field': 'Diffie-Hellman public integer QB',..."
4,TM_CDH_012,0x20c,DD_CDH_012,"[{'field': 'Validation number a', 'lenght(bits..."
...,...,...,...,...
172,TM_HOU_022,0x37,DD_HOU_OBS_5E,"[{'field': 'sport_adcs_get_day_fraction()', 'l..."
173,TM_HOU_022_C,0x3e,DC_HOU_OBS_5E,"[{'field': 'Compressed DD_HOU_OBS_5E', 'lenght..."
174,TM_HOU_023,0x38,DD_HOU_OBS_6E,"[{'field': 'sgp4_kep()', 'lenght(bits)': 192.0..."
175,TM_HOU_023_C,0x3f,DC_HOU_OBS_6E,"[{'field': 'Compressed DD_HOU_OBS_6E', 'lenght..."
