In [11]:

import xml.etree.ElementTree as ET
import re
import os
import json
from collections import defaultdict
import pandas as pd
from difflib import get_close_matches


In [12]:
# PDU Metadata Extractor with Signal Breakdown

import xml.etree.ElementTree as ET
import json
import os
from collections import defaultdict

NS = {'autosar': 'http://autosar.org/schema/r4.0'}

SERVICE_FILE = "Service_Instance_A14_Ver_3.2 1.arxml"
RBS_FILE = "RBS_A14_Ver_3.2 3.arxml"
OUTPUT_JSON = "pdu_signal_metadata.json"
DEBUG_LOG = "debug_log.txt"

def log_debug(msg):
    with open(DEBUG_LOG, 'a') as f:
        f.write(msg + "\n")

def normalize_name(name):
    return name.replace("SomeIp", "").replace("_SI", "").replace("_", "").lower()



In [13]:

def parse_service_interfaces(service_path):
    service_map = {}
    tree = ET.parse(service_path)
    root = tree.getroot()
    for si in root.findall('.//autosar:SOMEIP-SERVICE-INTERFACE-DEPLOYMENT', NS):
        si_name = si.find('autosar:SHORT-NAME', NS).text
        sid = si.find('.//autosar:SERVICE-INTERFACE-ID', NS)

        # Use EVENT-DEPLOYMENTS instead of SOMEIP-EVENT to fetch EVENT-IDs correctly
        event_deployments = si.findall('.//autosar:EVENT-DEPLOYMENTS/autosar:SOMEIP-EVENT-DEPLOYMENT', NS)
        event_ids = []
        for ev in event_deployments:
            eid = ev.find('autosar:EVENT-ID', NS)
            if eid is not None:
                event_ids.append(eid.text)

        key = normalize_name(si_name)
        service_map[key] = {
            'service_interface': si_name,
            'service_id': sid.text if sid is not None else '',
            'event_ids': ','.join(event_ids)
        }
    return service_map

In [14]:
service_data = parse_service_interfaces(SERVICE_FILE)

In [15]:
service_data

{'bms1fd20': {'service_interface': 'SomeIpBMS1_FD_20_SI',
  'service_id': '518',
  'event_ids': '32775'},
 'bms2fd100': {'service_interface': 'SomeIpBMS2_FD_100_SI',
  'service_id': '781',
  'event_ids': '32775'},
 'bms3fd100': {'service_interface': 'SomeIpBMS3_FD_100_SI',
  'service_id': '915',
  'event_ids': '32775'},
 'bmscannm': {'service_interface': 'SomeIpBMS_CANNM_SI',
  'service_id': '916',
  'event_ids': '32775'},
 'ccm11200': {'service_interface': 'SomeIpCCM11_200_SI',
  'service_id': '1250',
  'event_ids': '32771'},
 'ccm1200': {'service_interface': 'SomeIpCCM1_200_SI',
  'service_id': '768',
  'event_ids': '32771'},
 'ccm2200': {'service_interface': 'SomeIpCCM2_200_SI',
  'service_id': '770',
  'event_ids': '32771'},
 'ccm3200': {'service_interface': 'SomeIpCCM3_200_SI',
  'service_id': '771',
  'event_ids': '32771'},
 'ccm4200': {'service_interface': 'SomeIpCCM4_200_SI',
  'service_id': '772',
  'event_ids': '32771'},
 'ccm5test200': {'service_interface': 'SomeIpCCM5_TEST_

In [16]:

def infer_cycle_time_from_name(pdu_name):
    match = re.search(r'_([0-9]{2,4})$', pdu_name)
    if match:
        return str(float(match.group(1)) / 1000)
    return "0.0"
def parse_rbs_pdus(rbs_path):
    tree = ET.parse(rbs_path)
    root = tree.getroot()
    pdu_map = {}

    # Create a lookup for signal lengths
    signal_length_map = {}
    for signal in root.findall('.//autosar:I-SIGNAL', NS):
        signal_name = signal.find('autosar:SHORT-NAME', NS).text
        length_elem = signal.find('autosar:LENGTH', NS)
        signal_length = length_elem.text if length_elem is not None else '0'
        signal_length_map[signal_name] = signal_length

    for pdu in root.findall('.//autosar:I-SIGNAL-I-PDU', NS):
        pdu_name_elem = pdu.find('autosar:SHORT-NAME', NS)
        pdu_name = pdu_name_elem.text if pdu_name_elem is not None else 'Unnamed_PDU'

        length_elem = pdu.find('autosar:LENGTH', NS)
        length = length_elem.text if length_elem is not None else '0'

        timing = pdu.find('.//autosar:CYCLIC-TIMING', NS)
        cycle_time = infer_cycle_time_from_name(pdu_name)
        
        signals = {}
        signal_mappings = pdu.findall('.//autosar:I-SIGNAL-TO-PDU-MAPPINGS/autosar:I-SIGNAL-TO-I-PDU-MAPPING', NS)
        signal_count = 0

        for mapping in signal_mappings:
            sig_ref = mapping.find('autosar:I-SIGNAL-REF', NS)
            if sig_ref is not None:
                sig_name = sig_ref.text.split('/')[-1]
                start_pos = mapping.find('autosar:START-POSITION', NS)
                byte_order = mapping.find('autosar:PACKING-BYTE-ORDER', NS)
                # Fetch signal length from the signal_length_map
                signal_len = signal_length_map.get(sig_name, '0')
                signals[sig_name] = {
                    f"{sig_name}_value": 0,
                    f"{sig_name}_Byte_order": byte_order.text if byte_order is not None else 'Unknown',
                    f"{sig_name}_start_bit": int(start_pos.text) if start_pos is not None else -1,
                    f"{sig_name}_len": signal_len
                }
                signal_count += 1

        pdu_map[pdu_name] = {
            'length': length,
            'cycle_time': cycle_time,
            'signals': signals,
            'total_signals': signal_count
        }
    return pdu_map

In [17]:
pdu_data = parse_rbs_pdus(RBS_FILE)

In [18]:
pdu_data

{'BMS1_FD_20': {'length': '64',
  'cycle_time': '0.02',
  'signals': {'BMS_BATTERY_BUS_VOLTAGE': {'BMS_BATTERY_BUS_VOLTAGE_value': 0,
    'BMS_BATTERY_BUS_VOLTAGE_Byte_order': 'MOST-SIGNIFICANT-BYTE-LAST',
    'BMS_BATTERY_BUS_VOLTAGE_start_bit': 0,
    'BMS_BATTERY_BUS_VOLTAGE_len': '13'},
   'BMS_DISCONNECT_BY_BMS': {'BMS_DISCONNECT_BY_BMS_value': 0,
    'BMS_DISCONNECT_BY_BMS_Byte_order': 'MOST-SIGNIFICANT-BYTE-LAST',
    'BMS_DISCONNECT_BY_BMS_start_bit': 13,
    'BMS_DISCONNECT_BY_BMS_len': '1'},
   'BMS_BATTERY_PACK_CURRENT': {'BMS_BATTERY_PACK_CURRENT_value': 0,
    'BMS_BATTERY_PACK_CURRENT_Byte_order': 'MOST-SIGNIFICANT-BYTE-LAST',
    'BMS_BATTERY_PACK_CURRENT_start_bit': 16,
    'BMS_BATTERY_PACK_CURRENT_len': '14'},
   'BMS_FC_NEGATIVE_CONTACTOR_STATE': {'BMS_FC_NEGATIVE_CONTACTOR_STATE_value': 0,
    'BMS_FC_NEGATIVE_CONTACTOR_STATE_Byte_order': 'MOST-SIGNIFICANT-BYTE-LAST',
    'BMS_FC_NEGATIVE_CONTACTOR_STATE_start_bit': 32,
    'BMS_FC_NEGATIVE_CONTACTOR_STATE_len': '3'

In [19]:

def generate_pdu_metadata(service_data, pdu_data):
    messages = {}
    for pdu_name, pdu_info in pdu_data.items():
        key = normalize_name(pdu_name)
        matched_service = service_data.get(key, {'service_interface': 'N/A', 'service_id': '', 'event_ids': ''})
        messages[pdu_name] = {
            'pdu_name': pdu_name,
            'service_interface': matched_service['service_interface'],
            'service_id': matched_service['service_id'],
            'event_ids': matched_service['event_ids'],
            'length': pdu_info['length'],
            'cycle_time': pdu_info['cycle_time'],
            'total_signals': pdu_info['total_signals'],
            'signals': pdu_info['signals']
        }
    return {"Messages": messages}

In [20]:

# Execute
service_data = parse_service_interfaces(SERVICE_FILE)
pdu_data = parse_rbs_pdus(RBS_FILE)
final_metadata = generate_pdu_metadata(service_data, pdu_data)

with open(OUTPUT_JSON, 'w') as f:
    json.dump(final_metadata, f, indent=2)

print(f"✅ PDU metadata with signals saved to: {OUTPUT_JSON}")

✅ PDU metadata with signals saved to: pdu_signal_metadata.json
