In [4]:
import numpy as np
from asammdf import MDF, Signal

# Parameters
duration_seconds = 300  # 5 minutes
sample_rate = 10  # Hz
total_samples = duration_seconds * sample_rate

# Generate time stamps
timestamps = np.arange(total_samples) / sample_rate

# Generate signal: ramp up from 10 to 95, then down to 10, repeat
half_cycle = total_samples // 2
ramp_up = np.linspace(10, 95, half_cycle)
ramp_down = np.linspace(95, 10, total_samples - half_cycle)
signal = np.concatenate((ramp_up, ramp_down))

# Generate signal: ramp up from 80 to 450, then down to 80, repeat
half_cycle2 = total_samples // 2
ramp_up2 = np.linspace(450, 80, half_cycle)
ramp_down2 = np.linspace(450, 80, total_samples - half_cycle2)
signal2 = np.concatenate((ramp_down2, ramp_up2))

pattern = np.tile([0]*sample_rate + [1]*sample_rate, duration_seconds // 2)
# Ensure we hit exactly 3000 samples
signal3 = pattern[:total_samples]
print(signal3)
# Create MDF object and add signal
mdf = MDF()
mdf.append(
    Signal(
        samples=signal,
        timestamps=timestamps,
        name="OilTemp_Cval_PT",
        unit="C",
        comment="Simulated oil temperature signal"
    )
)
mdf.append(
    Signal(
        samples=signal2,
        timestamps=timestamps,
        name="DPFTemp_Cval_PT",
        unit="F",
        comment="Simulated DPF temp signal"
    )
)
mdf.append(
    Signal(
        samples=signal3,
        timestamps=timestamps,
        name="test_flag",
        unit="N/a",
        comment="Simulated test flag signal"
    )
)

# Save to file
mdf.save("simulation.mf4")
print("MF4 file saved as 'simulation.mf4'")


[0 0 0 ... 1 1 1]
MF4 file saved as 'simulation.mf4'


In [3]:
import socket
from asammdf import MDF
import time
import json

#"signal_based_output_group3.mf4"
mdf = MDF("signal_based_output_group6.mf4")
print(mdf.channels_db.keys())

OilTemp_data = mdf.get("EngOilTemp_Cval")  
CoolTemp_data = mdf.get("EngCoolTemp_Cval")  
OilPres_data = mdf.get("EngOilPress_Cval_CPC")  
SysVolt_data = mdf.get("SysVolt_Stat")
EngCrk_data = mdf.get("EngCrk_Stat")
EngShtdn_data = mdf.get("EngProtShtdnEng_Stat_CPC")
ExhTemp_data = mdf.get("EngExhGasTemp_Cval_CPC")    


oiltempvalues = OilTemp_data.samples
cooltempvalues = CoolTemp_data.samples
oilpressvalues = OilPres_data.samples
sysvoltvalues = SysVolt_data.samples
engcrkvalues = EngCrk_data.samples
engshtdnvalues = EngShtdn_data.samples
exhtempvalues = ExhTemp_data.samples


sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
address = ('127.0.0.1', 6000)

for value1, value2, value3, value4, value5, value6, value7 in zip(oiltempvalues, cooltempvalues, oilpressvalues, sysvoltvalues, engcrkvalues, engshtdnvalues, exhtempvalues):
    packet_dict = {
            'EngOilTemp_Cval': float(value1),
            'EngCoolTemp_Cval': float(value2),
            'EngOilPress_Cval_CPC': float(value3),
            'SysVolt_Stat': float(value4),
            'EngCrk_Stat': float(value5),
            'EngProtShtdnEng_Stat_CPC': float(value6),
            'EngExhGasTemp_Cval_CPC': float(value7),
        }
    message = json.dumps(packet_dict).encode()
    print(f"Sending {packet_dict} to Local Host")
    sock.sendto(message, address)

dict_keys(['time', 'PTOOilTemp_Cval_SCA', 'PTOSpd_Cval_SCA', 'PTOSetSpd_Cval_SCA', 'PTOEnblSw_Rq_SCA', 'PTORemProSpdCntrlSw_Rq_SCA', 'PTORemVarSpdCntrlSw_Rq_SCA', 'PTOSetSw_Rq_SCA', 'PTOCstSw_Rq_SCA', 'PTORsmSw_Rq_SCA', 'PTOAccelSw_Rq_SCA', 'PTOGovMemSel_Stat_SCA', 'PTOGovProgSpdsw2_Stat_SCA', 'AuxInpIgnrSw_Stat_SCA', 'MOIS_Warn_Rq', 'MOIS_Error_Stat', 'MOIS_D_Actv_Stat', 'SupAccelPdlWarn_Rq', 'ASGA_ActiveSteerOcc_Rq', 'ASGA_AcstcSteerWarn_Rq', 'ASGA_DgrdMdSteer_Stat', 'ASGA_DrvMdSteer_Stat', 'TrffcStyleD_Stat_VRDU3', 'TrffcStyleAutoChg_Rq_VRDU3', 'TrffcStyleWarn_Rq_VRDU3', 'TrffcStyle_Cval_VRDU3', 'ABA_WarnLvl_Cval_VRDU3', 'ASGA_WarnLvl_Cval_VRDU3', 'LDP_WarnLvl_Cval_VRDU3', 'ABA_DrvAct_Stat_VRDU', 'ADASISRst_Rq_VRDU', 'OptWarnCrossTrffc_Rq', 'OptWarnOncTrffc_Rq', 'AcstcWarnLtFwdBrkFct_Rq', 'AcstcWarnRtFwdBrkFct_Rq', 'AcstcWarnCeFwdBrkFct_Rq', 'CTA_D_Actv_Stat', 'DST_D_Actv_Stat', 'CTA_Stat_Rq', 'DST_Stat_Rq', 'CTA_BrkOcc_Rq', 'DST_BrkOcc_Rq', 'SpdFtAxleLtWhl_Cval', 'SpdFtAxleRtWhl_Cv

KeyboardInterrupt: 

In [3]:
import socket
from asammdf import MDF
import time

mdf = MDF("signal_based_output_group6.mf4")
list =[]
for name in mdf.iter_channels():
    list.append(name.name)
print(list)    

['PTOOilTemp_Cval_SCA', 'PTOSpd_Cval_SCA', 'PTOSetSpd_Cval_SCA', 'PTOEnblSw_Rq_SCA', 'PTORemProSpdCntrlSw_Rq_SCA', 'PTORemVarSpdCntrlSw_Rq_SCA', 'PTOSetSw_Rq_SCA', 'PTOCstSw_Rq_SCA', 'PTORsmSw_Rq_SCA', 'PTOAccelSw_Rq_SCA', 'PTOGovMemSel_Stat_SCA', 'PTOGovProgSpdsw2_Stat_SCA', 'AuxInpIgnrSw_Stat_SCA', 'MOIS_Warn_Rq', 'MOIS_Error_Stat', 'MOIS_D_Actv_Stat', 'SupAccelPdlWarn_Rq', 'ASGA_ActiveSteerOcc_Rq', 'ASGA_AcstcSteerWarn_Rq', 'ASGA_DgrdMdSteer_Stat', 'ASGA_DrvMdSteer_Stat', 'TrffcStyleD_Stat_VRDU3', 'TrffcStyleAutoChg_Rq_VRDU3', 'TrffcStyleWarn_Rq_VRDU3', 'TrffcStyle_Cval_VRDU3', 'ABA_WarnLvl_Cval_VRDU3', 'ASGA_WarnLvl_Cval_VRDU3', 'LDP_WarnLvl_Cval_VRDU3', 'ABA_DrvAct_Stat_VRDU', 'ADASISRst_Rq_VRDU', 'OptWarnCrossTrffc_Rq', 'OptWarnOncTrffc_Rq', 'AcstcWarnLtFwdBrkFct_Rq', 'AcstcWarnRtFwdBrkFct_Rq', 'AcstcWarnCeFwdBrkFct_Rq', 'CTA_D_Actv_Stat', 'DST_D_Actv_Stat', 'CTA_Stat_Rq', 'DST_Stat_Rq', 'CTA_BrkOcc_Rq', 'DST_BrkOcc_Rq', 'SpdFtAxleLtWhl_Cval', 'SpdFtAxleRtWhl_Cval', 'SpdR_AxleLtW

In [1]:
from asammdf import MDF, Signal
import cantools
import cantools.database

from collections import defaultdict
import numpy as np

#"ZZ5420_Data2_F001_2025-06-16_16-16-47.mf4"
mdf = MDF("ZZ5420_Data2_F019_2025-07-24_23-39-58.mf4")
db = cantools.database.load_file("CHASSIS_667kB_dbc_2024_20a.dbc")
#Using asammdf to take in the frame-based mf4 file and then cantools to take in the dbc file.

#.get helps us extract Signal objects from the loaded MDF file, such as CAN ID, CAN data length code, data bytes, and timestamps
ids = mdf.get('CAN_DataFrame.ID', group=6).samples.astype(int) # group=1 is specifying to use CAN group 1
dlcs = mdf.get('CAN_DataFrame.DLC', group=6).samples.astype(int)
data_bytes = mdf.get('CAN_DataFrame.DataBytes', group=6).samples
timestamps = mdf.get('t', group=6).samples


#--------- build a frame list to put together CAN IDs, timestamps, data length codes, and data bytes ------
frames = []
for i in range(len(timestamps)): 
    can_id = ids[i]
    dlc = dlcs[i]
    data = bytes(data_bytes[i][:dlc])
    frames.append((timestamps[i], can_id, data))
#----------------------------------------------------------------------------------------------------------
    
#------------  decoding using the dbc file -----------------
decoded_signals = []
for ts, can_id, data in frames:
    try:
        msg = db.get_message_by_frame_id(can_id) # try and get the message definition from dbc
        signals = msg.decode(data) # decode byte data into the signal's values
        decoded_signals.append((ts, signals)) # stored as (timestamp, {signal_name: value, ...})
    except Exception:# Frame not in DBC or decode error, skip as needed
        pass
#-----------------------------------------------------------

#------------ Group each value into a time series for its signal --------------
signal_times = defaultdict(list) # no key error will be raised, default to empty list
signal_values = defaultdict(list)

for ts, sig_dict in decoded_signals:
    for sig_name, val in sig_dict.items():
        signal_times[sig_name].append(ts)
        signal_values[sig_name].append(val)
#------------------------------------------------------------------------------

#------------ Convert into asammdf Signal objects --------------
signals = [] 
for sig_name in signal_times:
    times = np.array(signal_times[sig_name])
    values = np.array(signal_values[sig_name])
    
    # Sort time stamps to be in order
    sort_idx = np.argsort(times)
    times = times[sort_idx]
    values = values[sort_idx]

    sig = Signal(
        samples=values,
        timestamps=times,
        name=sig_name,
        unit=''  
    )
    if sig.samples.dtype == object:
    # Extract float values from NamedSignalValue objects, this is for any can signal that isn't a float already, need to figure that out in the future
        sig.samples = np.array([v.value if hasattr(v, 'value') else v for v in sig.samples], dtype=np.float64)

    signals.append(sig)
#---------------------------------------------------------------


new_mdf = MDF()
new_mdf.append(signals)
new_mdf.save('signal_based_output_group6.mf4')# generate a signal-based output file



Overwriting message 'DI_Shrt_TCO' with 'DI_TCO' in the frame id to message dictionary because they have identical masked frame ids 0x98fe6bee.


WindowsPath('signal_based_output_group6.mf4')

In [4]:
import socket
import time
import json
import numpy as np

# Setup
duration = 60  # seconds
sample_rate = 10  # Hz
total_samples = duration * sample_rate
t = np.linspace(0, duration, total_samples)

# Simulate RPM ramp-up and down
cycle_time = 5  # seconds per cycle
rpm = 600 + (1300 * ((t % cycle_time) / (cycle_time / 2)))
rpm = np.where((t % cycle_time) > (cycle_time / 2),
               1900 - (900 * ((t % cycle_time - (cycle_time / 2)) / (cycle_time / 2))),
               rpm)
rpm = np.clip(rpm, 600, 1900)

# Detect RPM trend (rising/falling)
rpm_diff = np.diff(rpm, prepend=rpm[0])
rpm_rising = rpm_diff > 0.5  # small threshold to ignore flat noise

# Oil pressure: 70 PSI during rise, 30 PSI during fall or idle
oil_pressure = np.where(rpm_rising,
                        np.random.normal(70, 2, total_samples),  # 70 ±2 PSI when accelerating
                        np.random.normal(30, 3, total_samples))  # 30 ±3 PSI when falling/steady

# RPM flag
rpm_flag = (rpm > 1700).astype(int)

# Battery voltage
battery_voltage = 14.3 + 0.1 * np.sin(0.5 * np.pi * t)

# Socket setup
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
address = ('127.0.0.1', 6000)

# Send packets
for i in range(total_samples):
    packet_dict = {
        'RPM': float(round(rpm[i], 2)),
        'OilPress': float(round(oil_pressure[i], 2)),
        'RPM_Above_1700': int(rpm_flag[i]),
        'BatteryVoltage': float(round(battery_voltage[i], 3)),
    }
    message = json.dumps(packet_dict).encode()
    print(f"Sending {packet_dict}")
    sock.sendto(message, address)
    time.sleep(1 / sample_rate)


Sending {'RPM': 600.0, 'OilPress': 25.75, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.3}
Sending {'RPM': 652.09, 'OilPress': 75.71, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.316}
Sending {'RPM': 704.17, 'OilPress': 70.46, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.331}
Sending {'RPM': 756.26, 'OilPress': 70.52, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.345}
Sending {'RPM': 808.35, 'OilPress': 73.26, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.359}
Sending {'RPM': 860.43, 'OilPress': 66.77, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.371}
Sending {'RPM': 912.52, 'OilPress': 71.17, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.381}
Sending {'RPM': 964.61, 'OilPress': 71.61, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.389}
Sending {'RPM': 1016.69, 'OilPress': 70.81, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.395}
Sending {'RPM': 1068.78, 'OilPress': 67.56, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.399}
Sending {'RPM': 1120.87, 'OilPress': 70.26, 'RPM_Above_1700': 0, 'BatteryVoltage': 14.4}
Sending {'RP