# Formatting X-CANIDS Dataset like SynCAN

https://dx.doi.org/10.21227/epsj-y384

1. Download Raw CAN messages (raw.zip) from above URL
2. Unzip raw.zip and place the parquet files at `Dataset/X-CANIDS/raw` directory.
3. Run this code. It converts the X-CANIDS data files into the same format of SynCAN dataset. In the end of the code, it will save the formatted result files at `Dataset/X-CANIDS/canet` directory. It will also save signal-extracted intermediate files at `Dataset/X-CANIDS/signal` directory.

## Libraries and functions


In [1]:
import gc
import glob
import struct
from pathlib import Path

import cantools
import pandas as pd
from tqdm.auto import tqdm

pd.set_option('display.float_format', str)
dataset_dir = '../../Dataset/X-CANIDS'
dbc = cantools.database.load_file(f'{dataset_dir}/hyundai_2015_ccan.dbc')

### Key functions

In [2]:
def bytes_to_list(data_bytes: bytes) -> list:
    l = len(data_bytes)
    decimal_values = struct.unpack(f'{l}B', data_bytes)
    return list(decimal_values)

def load_arrange_data(file_path, print_option=True):
    df = pd.read_parquet(file_path)
    # make Session labels (Note: Attacks in X-CANIDS Dataset were performed without a pause)
    df['Session'] = 0
    splits = Path(file_path).stem.split('-')
    attack = None
    if len(splits) > 1:  # if it's an attack dataset
        attack = splits[1]
        msgs = df.loc[df['label'] == 1]
        t_start, t_end = 0, 0
        if msgs.shape[0] > 0:  # if the dataset includes attack messages
            t_start, t_end = msgs.index.min(), msgs.index.max()
            df.loc[t_start:t_end, 'Session'] = 1
            assert df.query('label == 1 and Session == 0').shape[0] == 0   
        if attack == 'susp':  # it doens't have label=1 rows, so apply a rougher approach
            assert len(df.loc[(480 < df.index.total_seconds()) & (df.index.total_seconds() <= 1440) & (df['label'] == 1)]) == 0
            df.loc[(480 < df.index.total_seconds()) & (df.index.total_seconds() <= 1440), 'Session'] = 1
    # Format columns
    df.reset_index(inplace=True)
    df['Time'] = df['timestamp'].dt.total_seconds()
    df.rename(columns={'arbitration_id': 'ID', 'dlc': 'DLC', 'label': 'Label', 'data': 'Data'}, inplace=True)
    df['Session'] = df['Session'].astype(int)
    df['Label'] = df['Label'].astype(int)
    if print_option:
        print(f'# rows: {df.shape[0]:,}')
        print(pd.concat([df['Label'].value_counts().rename('Label'), df['Session'].value_counts().rename('Session')], axis=1))
    return df[['Session', 'Label', 'Time', 'ID', 'DLC', 'Data']]

In [3]:
def iterchunk(dataset: pd.DataFrame, n: int):
    prev = 0
    while True:
        chunk = dataset.iloc[prev:prev+n].copy()
        if chunk.shape[0] == 0:
            break
        prev += n
        yield chunk

def decode(record) -> dict:
    try:
        message = dbc.get_message_by_frame_id(record['ID'])
        decoded = message.decode(record['Data'], decode_choices=False, allow_truncated=False, allow_excess=False)
        '''
        decode_choices=False does not convert the decoded values to choice strings
        allow_truncated=False and allow_excess=False do not accept a longer or shorter Data field than specified
        '''
        decoded_v2 = {}
        for key, value in decoded.items():
            decoded_v2[f"{str(record['ID'])}+{key}"] = value
        return decoded_v2
    except KeyError:
        return {}

def parse_with_DBC(data: pd.DataFrame, label=True) -> pd.DataFrame:
    data['decoded'] = data.apply(decode, axis=1)
    parsed_data = pd.DataFrame.from_records(data.decoded.reset_index(drop=True))
    columns = parsed_data.columns.to_list()
    if label:
        default_columns = ['Session', 'Label', 'Time']
    else:
        default_columns = ['Time']
    parsed_data[default_columns] = data[default_columns].to_numpy()     # to_numpy() is applied to bypass index alignment
    parsed_data = parsed_data[default_columns + columns]    # rearrange the column order (monotime and labels to the first)
    return parsed_data

# def select_meaningful_columns(dataset: pd.DataFrame, chunk_size: int) -> list:
#     n_chunk = dataset.shape[0] // chunk_size + 1
#     minmax_list = list()
#     for chunk in tqdm(iterchunk(dataset, n=chunk_size), desc=' - Checking min/max values of all signals', total=n_chunk):
#         parsed_data = parse_with_DBC(chunk)
#         minmax = pd.concat([parsed_data.min(numeric_only=True).rename('min'),
#                             parsed_data.max(numeric_only=True).rename('max')],
#                            axis=1)
#         minmax_list.append(minmax)
#     final_minmax = pd.concat(minmax_list, join='outer', axis=1)
#     final_minmax = pd.concat([final_minmax.min(axis=1).rename('min'), final_minmax.max(axis=1).rename('max')], axis=1)
#     final_minmax['constant'] = 0
#     final_minmax.loc[final_minmax['min'] == final_minmax['max'], 'constant'] = 1
#     columns = final_minmax.query('constant == 0').index.to_list()
#     print(f' - {len(columns)} columns are valid.')
#     return columns

## Parse selected signals

In [4]:
def parse_selected_signals(dataset: pd.DataFrame, chunk_size: int, signals: list) -> pd.DataFrame:
    n_chunk = dataset.shape[0] // chunk_size + 1
    default_columns = ['Session', 'Label', 'Time']
    data = pd.DataFrame(columns=default_columns + signals)    # Define an empty DataFrame with signal names
    for chunk in tqdm(iterchunk(dataset, n=chunk_size), desc=' - Parsing signals', total=n_chunk):
        parsed_data = parse_with_DBC(chunk)
        subset = default_columns + list(set(signals) & set(parsed_data.columns))
        data = pd.concat([data, parsed_data[subset]], join='outer', axis=0, ignore_index=True)
    return data

In [9]:
selected_signals, continuous_signals = [], []
with open(f'{dataset_dir}/canet_signals_continuous.txt', 'r') as f:
    for line in f.readlines():
        continuous_signals.append(line.strip())
selected_signals += continuous_signals
with open(f'{dataset_dir}/canet_signals_categorical.txt', 'r') as f:
    for line in f.readlines():
        selected_signals.append(line.strip())

print(f'# selected signals: {len(selected_signals)}')
ids = set([int(x.split('+')[0]) for x in selected_signals])
ids_hex = [f'{hex(x)[2:].upper().zfill(3)}h' for x in ids]
ids_hex.sort()
print(f'# CAN IDs: {len(ids)}')
print(f'CAN IDs: {ids}')
print(f'CAN IDs in hex: {ids_hex}')

can_datasets = glob.glob(f'{dataset_dir}/raw/dump*.parquet')
can_datasets.sort()
print(f'Raw data files ({len(can_datasets)}):')
for f in can_datasets[:3] + ['...'] + can_datasets[-3:]:
    print(f)

# selected signals: 107
# CAN IDs: 35
CAN IDs: {128, 129, 512, 897, 899, 902, 903, 1419, 1292, 399, 273, 274, 1427, 275, 790, 1440, 544, 809, 1322, 1456, 688, 1345, 68, 1349, 1351, 1353, 593, 1363, 1365, 1366, 1367, 608, 354, 1265, 1151}
CAN IDs in hex: ['044h', '080h', '081h', '111h', '112h', '113h', '162h', '18Fh', '200h', '220h', '251h', '260h', '2B0h', '316h', '329h', '381h', '383h', '386h', '387h', '47Fh', '4F1h', '50Ch', '52Ah', '541h', '545h', '547h', '549h', '553h', '555h', '556h', '557h', '58Bh', '593h', '5A0h', '5B0h']
Raw data files (133):
../../Dataset/X-CANIDS/raw/dump1.parquet
../../Dataset/X-CANIDS/raw/dump2.parquet
../../Dataset/X-CANIDS/raw/dump3.parquet
...
../../Dataset/X-CANIDS/raw/dump6-susp-5B0h.parquet
../../Dataset/X-CANIDS/raw/dump6.parquet
../../Dataset/X-CANIDS/raw/dump7.parquet


In [13]:
chunk_size = 500000  ####### REDUCE THIS VALUE if you get a memory error #######

print(f'### Extracting CAN features ###')
for i, p in enumerate(can_datasets):
    print(f' [{i+1}/{len(can_datasets)}] Processing dataset: {p}')
    savepath = Path(f"{dataset_dir}/signal/{Path(p).name}")
    if savepath.exists():
        print(' - Skip extracting because the output already exists.')
        continue
    else:
        data = load_arrange_data(p, print_option=False)
        features = parse_selected_signals(data, chunk_size=chunk_size, signals=selected_signals)
        features.to_parquet(savepath, engine='pyarrow', compression='snappy')
        print(f' - Saved: {savepath}')
        del features
        gc.collect()

### Extracting CAN features ###
 [1/133] Processing dataset: ../../Dataset/X-CANIDS/raw/dump1.parquet


 - Parsing signals:   0%|          | 0/7 [00:00<?, ?it/s]

  data = pd.concat([data, parsed_data[subset]], join='outer', axis=0, ignore_index=True)


 - Saved: ../../Dataset/X-CANIDS/signal/dump1.parquet
 [2/133] Processing dataset: ../../Dataset/X-CANIDS/raw/dump2.parquet


 - Parsing signals:   0%|          | 0/9 [00:00<?, ?it/s]

  data = pd.concat([data, parsed_data[subset]], join='outer', axis=0, ignore_index=True)


 - Saved: ../../Dataset/X-CANIDS/signal/dump2.parquet


## Scaling & fomatting like a SynCAN data file
Session,    Label,  Time,           ID,     Signal1_of_ID,      Signal2_of_ID,  Signal3_of_ID,  Signal4_of_ID, ...

Normal,     Normal, 63006572.0314,  id1,    0.3365969165117973, 0.75

### Check and revise DBC min & max values

In [14]:
# Get signal dictionary from DBC
signal_info = dict()
for message in dbc.messages:
    for signal in message.signals:
        signal_info[f'{message.frame_id}+{signal.name}'] = {'scale': signal.scale, 'min': signal.minimum, 'max': signal.maximum}

# Distribution of selected signals in normal conditions
sig_datasets = glob.glob(f'{dataset_dir}/signal/dump[1-9].parquet')
for dataset in sig_datasets:
    print(f'Checking {dataset}')
    df = pd.read_parquet(dataset, columns=['Session', 'Label', 'Time'] + selected_signals)
    df_describe = df.describe()
    # display(df_describe)

    # Compare DBC specifications and real data
    print('**** Wrong specifications in DBC ****')
    for sig, info in signal_info.items():
        try:
            real_min, real_max = round(df_describe.loc['min', sig], 6), round(df_describe.loc['max', sig], 6)
        except KeyError:
            continue
        if info['min'] > real_min:
            print(f'{[sig]} DBC min = {info["min"]}, Real min = {real_min}')
        if info['max'] < real_max:
            print(f'{[sig]} DBC max = {info["max"]}, Real max = {real_max}')
    print('\n')

Checking ../../Dataset/X-CANIDS/signal/dump7.parquet
**** Wrong specifications in DBC ****
['354+Clutch_Driving_Tq'] DBC min = 0.0, Real min = -20.0
['354+Clutch_Driving_Tq'] DBC max = 0.0, Real max = 24.0
['354+Cluster_Engine_RPM'] DBC max = 0.0, Real max = 935.5828


Checking ../../Dataset/X-CANIDS/signal/dump6.parquet
**** Wrong specifications in DBC ****
['354+Clutch_Driving_Tq'] DBC min = 0.0, Real min = -20.0
['354+Clutch_Driving_Tq'] DBC max = 0.0, Real max = 105.0
['354+Cluster_Engine_RPM'] DBC max = 0.0, Real max = 2463.9618
['354+Cluster_Engine_RPM_Flag'] DBC max = 0.0, Real max = 1.0


Checking ../../Dataset/X-CANIDS/signal/dump4.parquet
**** Wrong specifications in DBC ****
['354+Clutch_Driving_Tq'] DBC min = 0.0, Real min = -20.0
['354+Clutch_Driving_Tq'] DBC max = 0.0, Real max = 126.0
['354+Cluster_Engine_RPM'] DBC max = 0.0, Real max = 2422.9446
['354+Cluster_Engine_RPM_Flag'] DBC max = 0.0, Real max = 1.0


Checking ../../Dataset/X-CANIDS/signal/dump5.parquet
**** Wron

In [15]:
# Revise few wrong value ranges heuristically
signal_info['354+Clutch_Driving_Tq']['min'], signal_info['354+Clutch_Driving_Tq']['max'] = -512, 511
signal_info['354+Cluster_Engine_RPM']['min'], signal_info['354+Cluster_Engine_RPM']['max'] = 0, 8191
signal_info['354+Cluster_Engine_RPM_Flag']['min'], signal_info['354+Cluster_Engine_RPM_Flag']['max'] = 0, 1

In [17]:
# Check it again - if there's nothing stated under "Wrong specifications", you're good to go.
for dataset in sig_datasets:
    print(f'Checking {dataset}')
    df = pd.read_parquet(dataset, columns=['Session', 'Label', 'Time'] + selected_signals)
    df_describe = df.describe()
    print('**** Wrong specifications in DBC ****')
    for sig, info in signal_info.items():
        try:
            real_min, real_max = round(df_describe.loc['min', sig], 6), round(df_describe.loc['max', sig], 6)
        except KeyError:
            continue
        if info['min'] > real_min:
            print(f'{[sig]} DBC min = {info["min"]}, Real min = {real_min}')
        if info['max'] < real_max:
            print(f'{[sig]} DBC max = {info["max"]}, Real max = {real_max}')
    print('\n')

Checking ../../Dataset/X-CANIDS/signal/dump7.parquet
**** Wrong specifications in DBC ****


Checking ../../Dataset/X-CANIDS/signal/dump6.parquet
**** Wrong specifications in DBC ****


Checking ../../Dataset/X-CANIDS/signal/dump4.parquet
**** Wrong specifications in DBC ****


Checking ../../Dataset/X-CANIDS/signal/dump5.parquet
**** Wrong specifications in DBC ****


Checking ../../Dataset/X-CANIDS/signal/dump1.parquet
**** Wrong specifications in DBC ****


Checking ../../Dataset/X-CANIDS/signal/dump3.parquet
**** Wrong specifications in DBC ****


Checking ../../Dataset/X-CANIDS/signal/dump2.parquet
**** Wrong specifications in DBC ****




### Min-max normalization (based on DBC) and format like SynCAN

In [19]:
sig_datasets = glob.glob(f'{dataset_dir}/signal/dump*.parquet')

for dataset in tqdm(sig_datasets, desc=f'Processing {len(selected_signals)}-signal version'):
    df = pd.read_parquet(dataset, columns=['Session', 'Label', 'Time'] + selected_signals)

    # Min-max normalization
    for signal in selected_signals:
        min, max = signal_info[signal]['min'], signal_info[signal]['max']
        df[signal] = ((df[signal] - min) / (max - min)).round(8)

    # Formatting
    df = df.reset_index().rename(columns={'index': 'MsgIndex'})
    df_ids = []
    for id in ids:
        df_id = df.filter(like=f'{id}+', axis=1).dropna(how='all', axis=0)
        signals = list(df_id.columns)
        df_id = pd.concat([df.loc[:, ['MsgIndex', 'Session', 'Label', 'Time']], df_id], join='inner', axis=1)
        df_id['ID'] = id
        for i, signal in enumerate(signals):
            df_id[f'Signal{i+1}'] = df_id.loc[:, signal]
        df_id.drop(columns=signals, inplace=True)
        df_ids.append(df_id)
    df_canet = pd.concat(df_ids, axis=0).sort_values('MsgIndex', ignore_index=True)
    df_canet = df_canet.astype({"Session": 'int', "Label": 'int'})
    # display(df_canet.head(1))
    assert df_canet['Signal1'].isna().sum() == 0, 'Some values in Signal1 is empty.'
    filename = Path(dataset).name
    save_path = f'{dataset_dir}/canet/sig{len(selected_signals):03}_{filename}'
    df_canet.to_parquet(save_path)
    print(f' - Saved: {savepath}')

Processing 107-signal version:   0%|          | 0/2 [00:00<?, ?it/s]

 - Saved: ../../Dataset/X-CANIDS/signal/dump2.parquet
 - Saved: ../../Dataset/X-CANIDS/signal/dump2.parquet
