# Create a Sample MDF File

The code creates a sample MDF-4 file that contains four different signals using the asammdf library.

The sample MDF file has four signals related to the Engine Control Unit:
- EngineRPM
- Speed
- EnginePower
- Gear

The file will be stored in the lakehouse filesystem at `/lakehouse/default/Files/sample/test.mf4`.

In [14]:
import asammdf
from asammdf import Source
import numpy as np
from asammdf.blocks import v4_constants as v4c
import numpy as np
import uuid


# How many samples we will generate
numberOfValues = 10000

# Create an empty MDF file
mdf = asammdf.MDF()

# Generate time array: evenly spaced timestamps from 0 to 100, shared by all signals
time = np.linspace(0, 100, numberOfValues)

signals = []

# Every generated signal is attributed to the same source
source = Source(source_type=Source.SOURCE_TOOL, bus_type=Source.BUS_TYPE_NONE, name="EngineControlUnit", path="PT_CAN.Powertrain", comment="Generated" )

# Generate vehicle RPM signal: sine wave around 3000 plus gaussian noise
rpm_amplitude = 500
rpm_frequency = 10
rpm_phase = np.pi/4
vehicle_RPM = 3000 + rpm_amplitude * np.sin(2 * np.pi * rpm_frequency * time + rpm_phase) + np.random.normal(0, 50, size=numberOfValues)
signals.append(asammdf.Signal(name="EngineRPM", samples=vehicle_RPM, timestamps=time, unit="RPM", source=source))

# Generate vehicle speed signal: sine wave around 60 plus gaussian noise
speed_amplitude = 10
speed_frequency = 10
speed_phase = np.pi/6
vehicle_speed = 60 + speed_amplitude * np.sin(2 * np.pi * speed_frequency * time + speed_phase) + np.random.normal(0, 2, size=numberOfValues)
signals.append(asammdf.Signal(name="Speed", samples=vehicle_speed, timestamps=time, unit="km/h", source=source))

# Generate engine power signal, derived from RPM and speed plus gaussian noise
engine_power = vehicle_RPM * vehicle_speed / 1000 + np.random.normal(0, 50, size=numberOfValues)
signals.append(asammdf.Signal(name="EnginePower", samples=engine_power, timestamps=time, unit="kW", source=source))

# Generate gear signal from the speed band in a single vectorized pass.
# Conditions are checked in order and the first match wins; speeds that are
# not below 60 fall through to gear 4. Float choices keep the float64 dtype.
gear = np.select(
    [vehicle_speed < 20, vehicle_speed < 40, vehicle_speed < 60],
    [1.0, 2.0, 3.0],
    default=4.0,
)
signals.append(asammdf.Signal(name="Gear", samples=gear, timestamps=time, unit="-", source=source))

# All signals share the same timestamps, so they are appended as one group
mdf.append(signals, common_timebase=True)

mdf.save(dst="/lakehouse/default/Files/sample/test.mf4", overwrite=True)

# Release the resources held by the in-memory MDF object once it is on disk
mdf.close()

# Identifier used by the following cells to tag everything derived from this file
file_uuid = str(uuid.uuid4())


StatementMeta(, 5b391492-66c4-41d7-ab89-85afb0471038, 18, Finished, Available)

# Create metadata file
This code creates a JSON metadata file based on the MDF file. The metadata file contains information about the file and, for each signal, its name, unit, data type and source.

In [None]:
import os
from asammdf import MDF
from datetime import datetime
from asammdf.blocks import v4_constants as v4c
import json

def _valueOrEmpty(getter):
    '''
        Calls getter() and returns its result, or an empty string if it raises
    '''
    try:
        return getter()
    except Exception:
        # Best-effort lookup: missing groups / attributes are reported as ""
        return ""


def getSource(mdf, signal):
    '''
        Extracts the source information from the MDF-4 file for a given signal

        mdf: object exposing groups[group_index].channel_group
        signal: object exposing source and group_index

        Returns a tuple of nine values:
        source_name, source_type, bus_type, channel_group_acq_name, acq_source_name,
        acq_source_path, channel_group_acq_source_comment, channel_group_comment, signal_source_path

        The first three values are "Unknown" when the signal has no source.
        Any of the remaining values that cannot be read is returned as an empty string.
    '''

    source = signal.source

    if source is not None:
        source_name = source.name
        # Use "Unknown" for ids that are missing from the lookup tables instead of raising KeyError
        source_type = v4c.SOURCE_TYPE_TO_STRING.get(source.source_type, "Unknown")
        bus_type = v4c.BUS_TYPE_TO_STRING.get(source.bus_type, "Unknown")
    else:
        source_name = "Unknown"
        source_type = "Unknown"
        bus_type = "Unknown"

    def channelGroup():
        # Resolved lazily so that a bad group index is handled by _valueOrEmpty
        return mdf.groups[signal.group_index].channel_group

    channel_group_acq_name = _valueOrEmpty(lambda: channelGroup().acq_name)
    acq_source_name = _valueOrEmpty(lambda: channelGroup().acq_source.name)
    acq_source_path = _valueOrEmpty(lambda: channelGroup().acq_source.path)
    channel_group_acq_source_comment = _valueOrEmpty(lambda: channelGroup().acq_source.comment)
    channel_group_comment = _valueOrEmpty(lambda: channelGroup().comment)
    signal_source_path = _valueOrEmpty(lambda: signal.source.path)

    return source_name, source_type, bus_type, channel_group_acq_name, acq_source_name, acq_source_path, channel_group_acq_source_comment, channel_group_comment, signal_source_path

def calculateMetadata(filename, uuid):
    '''
        Builds the metadata dictionary for an MDF file

        filename: path of the MDF file to open
        uuid: identifier stored as "source_uuid" and used in the log messages

        Returns a dictionary with file level information and four lists that are
        aligned by position, one entry per channel: "signals", "signals_comment",
        "signals_decoding" and "group_comment".
    '''

    mdf = MDF(filename)

    # Close the file even if reading one of the channels fails
    try:
        print(f"Generating metadata file {filename}-{uuid}")

        metadata = {
            "name": filename,
            "source_uuid": str(uuid),
            "preparation_startDate": str(datetime.utcnow()),
            "signals": [],
            "signals_comment": [],
            "signals_decoding": [],
            "group_comment": [],
            "comments": mdf.header.comment,
            "mdf_start_time": str(mdf.start_time),
        }

        for signal in mdf.iter_channels(raw=True):

            source_name, source_type, bus_type, channel_group_acq_name, acq_source_name, acq_source_path, channel_group_acq_source_comment, channel_group_comment, signal_source_path = getSource(mdf, signal)

            metadata["signals"].append(
                {
                    "name": signal.name,
                    "unit": signal.unit,
                    "group_index": signal.group_index,
                    "channel_index": signal.channel_index,
                    "channel_group_acq_name": channel_group_acq_name,
                    "acq_source_name": acq_source_name,
                    "acq_source_path": acq_source_path,
                    "source" : source_name,
                    "source_type": source_type,
                    "bus_type": bus_type,
                    "datatype": signal.samples.dtype.name,
                    "signal_source_path": signal_source_path,
                }
            )

            metadata["signals_comment"].append(signal.comment)

            metadata["signals_decoding"].append(str(signal.conversion))

            metadata["group_comment"].append(
                {
                    "channel_group_acq_source_comment": channel_group_acq_source_comment,
                    "channel_group_comment": channel_group_comment
                }
            )

        # Report the uuid that was passed in, not the notebook level global
        print(f"Finished calculating metadata {uuid} with {len(metadata['signals'])} signals")
    finally:
        mdf.close()

    return metadata

target = "/lakehouse/default/Files/metadata"

# Calculate the metadata before opening the output file, so that a failure
# while reading the MDF file does not leave an empty metadata file behind
metadata = calculateMetadata("/lakehouse/default/Files/sample/test.mf4", file_uuid)

# Make sure the target folder exists before writing into it
os.makedirs(target, exist_ok=True)

with open(os.path.join(target, f"{file_uuid}.metadata.json"), 'w') as metadataFile:
    metadataFile.write(json.dumps(metadata))
    print(f"Finished writing metadata file {file_uuid} with {len(metadata['signals'])} signals")


# Decode the MDF file to parquet
This script takes the generated MDF file and writes its signals as a partitioned parquet dataset (partitioned by `source_uuid` and `name`).

In [None]:
from asammdf import MDF
import pyarrow as pa
import pyarrow.parquet as pq
import re
import os


def _asText(samples):
    '''
        Returns the samples as text: unicode samples are returned unchanged,
        anything else is decoded as utf-8 bytes (raises for non string arrays)
    '''
    if np.issubdtype(samples.dtype, np.str_):
        return samples
    return np.char.decode(samples, 'utf-8')


def extractSignalsByType(decodedSignal, rawSignal):
    '''
        Extracts the signals from the MDF-4 file and converts them to a numeric or string representation
        Takes into consideration numbers, strings and records (rendered as a string)

        We have to make sure that we have the right type / storage based on the datatype.
        Trying to use the wrong type will create issues related to loss of precision.

        ADX real datatype is a 64 bit float.
        This means that all integer types except uint64 and int64 can be stored without loss of precision

        decodedSignal: signal with converted samples (needs name, samples and timestamps)
        rawSignal: the same signal with raw samples (needs samples)

        Returns the tuple (floatSignals, stringSignals), one entry per sample.
        The representation that does not apply keeps its default: nan for floats, "" for strings.
        Any exception is printed together with its traceback and raised again.
    '''
    numberOfSamples = len(decodedSignal.timestamps)

    # Defaults for each representation. np.full is used for the strings because
    # np.empty does not guarantee the initial content of the array.
    floatSignals = np.full(numberOfSamples, np.nan, dtype=np.double)
    stringSignals = np.full(numberOfSamples, "", dtype=str)

    try:
        decodedType = decodedSignal.samples.dtype

        # If it is a record we will decompose its contents on the string field
        # we will not store a value in floatSignals
        if np.issubdtype(decodedType, np.record):
            stringSignals = [record.pprint() for record in decodedSignal.samples]

        # If the value can be represented as a float is the only thing we need.
        # String will be empty
        elif np.issubdtype(decodedType, np.floating):
            floatSignals = decodedSignal.samples

        # 64 bit integers are only stored as string
        # Floats will not be stored as there is a loss of precision
        elif np.issubdtype(decodedType, np.uint64) or np.issubdtype(decodedType, np.int64):
            stringSignals = decodedSignal.samples.astype(str)

        # We will store all ints smaller or equal to 32 bits in floats only, as we have no loss of precision
        elif np.issubdtype(decodedType, np.integer):
            floatSignals = decodedSignal.samples

        # If we have a pure string as raw signal, we will store it as a string.
        # np.bytes_ / np.str_ replace the np.string_ / np.unicode_ aliases removed in NumPy 2.0.
        elif np.issubdtype(rawSignal.samples.dtype, np.bytes_) or np.issubdtype(rawSignal.samples.dtype, np.str_):
            stringSignals = _asText(rawSignal.samples)

        # For everything else (e.g. text tables) keep the raw value as float and the decoded text as string.
        # The text is decoded as utf-8: astype(string) was causing issues with special characters,
        # and S32 would have truncated results.
        else:
            floatSignals = rawSignal.samples.astype(float)
            stringSignals = _asText(decodedSignal.samples)

    except Exception as e:
        # Imported here because the module is only needed on the error path
        import traceback

        print(f"Exception for {decodedSignal.name}: {e}")
        traceback.print_exc()
        raise

    return floatSignals, stringSignals


# Root folder of the parquet dataset. It does not change per signal, so it is created once up front.
root_path = "/lakehouse/default/Files/raw"
os.makedirs(root_path, exist_ok=True)

# Open the MDF file and export every channel to the parquet dataset
mdf = MDF("/lakehouse/default/Files/sample/test.mf4")

# Close the MDF file even if one of the channels fails to export
try:
    for signal in mdf.iter_channels(raw=True):

        group_index = signal.group_index
        channel_index = signal.channel_index

        # We select a specific signal, both decoded and raw
        decodedSignal = mdf.select(channels=[(None, group_index, channel_index)])[0]
        rawSignal = mdf.select(channels=[(None, group_index, channel_index)], raw=True)[0]

        numberOfSamples = len(decodedSignal.timestamps)

        floatSignals, stringSignals = extractSignalsByType(decodedSignal=decodedSignal, rawSignal=rawSignal)

        # One row per sample; the identifying columns repeat the same value on every row
        table = pa.table (
            {
                "source_uuid": np.full(numberOfSamples, file_uuid, dtype=object),
                "group_index": np.full(numberOfSamples, group_index, dtype=np.int32),
                "channel_index": np.full(numberOfSamples, channel_index, dtype=np.int32),
                "name": np.full(numberOfSamples, decodedSignal.name, dtype=object),
                "timestamp": decodedSignal.timestamps,
                "value": floatSignals,
                "value_string": stringSignals,
                "valueRaw" : rawSignal.samples,
            }
        )

        # Escape all characters from the decodedSignal.name and use only alphanumeric and underscore for the basename
        # This is to avoid issues with the basename_template and parquet
        parquetFileName = re.sub(r"[^a-zA-Z0-9_]", "_", decodedSignal.name)

        pq.write_to_dataset(
            table,
            root_path=root_path,
            partition_cols=["source_uuid", "name"],
            basename_template=f"{file_uuid}-{group_index}-{channel_index}-{parquetFileName}-{{i}}.parquet",
            use_threads=True,
            compression="snappy")
finally:
    mdf.close()

# Query the Kusto database

The following cell queries the Kusto database through the `raw` external table and counts the rows stored for the generated file. It requires mounting the generated parquet files as a shortcut.

In [None]:
# Connection settings for the Kusto database
kustoUri = "https://xxxxxxxx.kusto.fabric.microsoft.com"
database = "VehicleData"

# Count the rows of the 'raw' external table that belong to the generated file
kustoQuery = f"external_table('raw') | where source_uuid == guid({file_uuid}) | summarize count() by source_uuid"

# Configure the reader one option at a time, then load the query result
kustoReader = spark.read.format("com.microsoft.kusto.spark.synapse.datasource")
for optionName, optionValue in (
    ("accessToken", mssparkutils.credentials.getToken(kustoUri)),
    ("kustoCluster", kustoUri),
    ("kustoDatabase", database),
    ("kustoQuery", kustoQuery),
):
    kustoReader = kustoReader.option(optionName, optionValue)

kustoDf = kustoReader.load()

kustoDf.show()

