In [1]:
#%reset -f 
import sys, os, pickle, random
import pandas as pd
import numpy as np
import pyarrow.parquet as pq
import matplotlib.pyplot as plt

from datetime import datetime
from pathlib import Path
from scipy.signal import savgol_filter
from scipy.stats import zscore, median_abs_deviation
from itertools import groupby

from IPython.core.magic import register_cell_magic
@register_cell_magic
def skip(line, cell): return  # cells can be skipped by using '%%skip' in the first line

In [2]:
# -----------> SET DATA FOLDER LOCATION HERE:
#DATA_PATH = Path('/home', 'sieglew', 'MA-Data')
DATA_PATH = Path(r'C:\Users\SIEGLEW\OneDrive - Daimler Truck\MA\Code\MA-Data')

# FILE SOURCES ---------------------------------------------------------------
original_trips_folder = Path(DATA_PATH, "processed") # original Trip parquet files

# OUTPUT LOCATIONS ---------------------------------------------------------------
trips_processed_final = Path(DATA_PATH, "new_folder") # Save new files here
trips_processed_cleaned = Path(DATA_PATH, "new_folder_2") # Save new files here


In [3]:
# GET DIRECTORY CONTENS:
def get_files(folder, ext, full = False): return [file if full else file.name for file in folder.glob(f'*{ext}')]
trips = get_files(trips_processed_final, ".parquet", full = False)
original_trips = get_files(original_trips_folder, ".parquet", full = False)
len(trips)

358

In [4]:
# get list of signals
df = pd.read_parquet(Path(trips_processed_final, trips[13]), engine='fastparquet')
all_columns = df.columns
assert len(all_columns) == 114

In [5]:
# SUMMARY OF ANALYSIS: 
#############################################################################################################################################
# SIGNALS  ###################################################################################################
# these signals always have more than 90% nans and will be removed from all trips:
sparse_signals = {'odometer','signal_date','vehicle_id', 'diff',
                  'chrgcoupproxydet_stat','hv_batmaxdischrgpwrlim_cval', 'hv_ptc2_pwr_cval',
                  'inv1_curr_cval_api1','inv1_pwr_cval_api1','inv1_pwr_cval_api3','inv1_spd_cval_api1','inv2_curr_cval_api2','inv2_pwr_cval_api2', 'inv2_spd_cval_api2'}

# these signals are either all-nans in a significant amount of trips or are too redundant/insignificant and will thus be removed from all trips:
remove_signals = {"minmoduletempindex_bms01", "maxmoduletempindex_bms01", "brc_stat_brc2", "maxpwr_contendrnbrkresist2","currtmp_brkresist2int_cval", "currpwr_contendrnbrkresist2",
                  "plugchrg_stat", "hv_chrgpwr_ecpc_cval", "chargestate", "dsrdpwr_contendrn_cval", "dsrdpwr_contendrn2_cval","maproadgrad_cval", "airtempoutsd_cval_sca", 
                  "meanmoduletemperature_bms01", "linkvoltage_bms05", "lv_conv_voltdmd_e2e_dcl1", "lv_convmaxcurr_cval_dcl1", "lv_convcurr_cval_dcl1","lv_conv_dc_momvolt_cval_dcl1", 
                  "hv_batlinkvoltage_sc01", "hv_dclink_volt_cval_brc", "hv_dclink_volt_cval_brc2", "oiltemp_ra_cval","hv_bat_intres_cval_bms1", "stringvoltage_bms01", 
                  "currtmp_brkresist1int_cval", "maxpwr_contendrnbrkresist_cval", "hv_bat_dc_volt_cval_bms1", "pt4_dcb_temp2_st3", "pt4_dcb_temp1_st3", "maxbrickvoltage_bms01",
                  'cc_actv_stat', 'cc_setspd_cval', 'pt4_dcb_hvdc_power_max_st3', 'hv_bathighcelltemp_cval_bms1', 'hv_batlowcelltemp_cval_bms1', 'hv_batmomavlchrgen_cval_bms1', 
                  'signal_ts', 'hv_bat_soh_cval_bms1', 'grocmbvehweight_cval'}

# these signals are either identical or too correlated with one of the other signals and will be removed from all trips:
redundant_signals = {'hirestotalvehdist_cval_cpc', 'ambtemp_cval_pt', 'grshift_stat_pt', 'currgr_stat_edcu', 'edrvspd_cval', 'hv_bat_dc_maxvoltlim_cval',
                     'hv_bat_dc_minvoltlim_cval', 'ignsw_stat_sca'}
                     
drop_signals = list(sparse_signals | remove_signals | redundant_signals)

# interrelated signal pairs can be combined (mean with nans ignored) to only introduce one variable:
pair_signals = [('actualdcvoltage_pti1', 'actualdcvoltage_pti2'), # only the first variable of each pair will be kept
                ('actualspeed_pti1', 'actualspeed_pti2'), 
                ('actualtorque_pti1','actualtorque_pti2'),
                ('motortemperature_pti1','motortemperature_pti2'),
                ('powerstagetemperature_pti1', 'powerstagetemperature_pti2'),
                ('rmsmotorcurrent_pti1','rmsmotorcurrent_pti2'),
                ('brktempra_cval', 'brktempfa_cval'),
                ('roadgrad_cval_pt','bs_roadincln_cval')]

# these signals should be explicitly considered for outlier removal to clean and improve data quality
signal_remove_outliers = ["roadgrad_cval_pt","hirestotalvehdist_cval_icuc","vehspd_cval_cpc", 
                        "hv_batmomavldischrgen_cval_1", "hv_bat_soc_cval_bms1", "airtempinsd_cval_hvac", "airtempoutsd_cval_cpc","hv_bathighcelltemp_cval_bms1",
                        "hv_batlowcelltemp_cval_bms1", "hv_dclink_volt_cval_dcl1", "actualdcvoltage_pti1", "hv_bat_dc_momvolt_cval_bms1","hv_batmaxchrgpwrlim_cval_1", 
                        "hv_batmaxdischrgpwrlim_cval_1"]

# optional signals
# could be kept and filled with zeros/ones instead, to retain the ePTO feature if available in the truck
optional_or_zero = ['epto_pwr_cval', 'currpwr_contendrnbrkresist_cval', 'elcomp_pwrcons_cval'] # --> fill with: "0" if not available
optional_or_one = ['brc_stat_brc1'] #--> fill with "1" if not available

# FILES  ###################################################################################################
# these trips do not contain valid trips or are too short or are sorted out for other reasons
invalid_original_trips = ['v_id983V101_trip30.parquet', 'v_id983V101_trip70.parquet', 'v_id983V101_trip8.parquet', 'v_id983V101_trip94_2.parquet', 'v_id983V10_trip19.parquet', 
                           'v_id983V10_trip22.parquet', 'v_id983V10_trip32.parquet', 'v_id983V10_trip36.parquet', 'v_id983V13_trip199.parquet', 'v_id983V14_trip9.parquet', 
                           'v_id983V15_trip70.parquet', 'v_id983V16_trip1.parquet', 'v_id983V16_trip178.parquet', 'v_id983V16_trip185.parquet', 'v_id983V16_trip197.parquet', 
                           'v_id983V16_trip2.parquet', 'v_id983V17_trip132.parquet', 'v_id983V18_trip198.parquet', 'v_id983V18_trip2.parquet', 'v_id983V18_trip218.parquet', 
                           'v_id983V18_trip6.parquet', 'v_id983V19_trip111.parquet', 'v_id983V19_trip113.parquet', 'v_id983V19_trip87.parquet', 'v_id983V19_trip99.parquet', 
                           'v_id983V1_trip100.parquet', 'v_id983V1_trip109.parquet', 'v_id983V1_trip116.parquet', 'v_id983V1_trip117.parquet', 'v_id983V1_trip118.parquet', 
                           'v_id983V1_trip13.parquet', 'v_id983V1_trip132.parquet', 'v_id983V1_trip142.parquet', 'v_id983V1_trip162.parquet', 'v_id983V1_trip163.parquet', 
                           'v_id983V1_trip26.parquet', 'v_id983V1_trip31.parquet', 'v_id983V1_trip40.parquet', 'v_id983V1_trip41.parquet', 'v_id983V1_trip43.parquet', 
                           'v_id983V1_trip44.parquet', 'v_id983V1_trip46.parquet', 'v_id983V1_trip54.parquet', 'v_id983V1_trip6.parquet', 'v_id983V2_trip60.parquet', 
                           'v_id983V4_trip139.parquet', 'v_id983V4_trip36.parquet', 'v_id983V4_trip58.parquet', 'v_id983V1_trip118.parquet', 'v_id983V1_trip132.parquet', 
                           'v_id983V14_trip9.parquet', 'v_id983V10_trip19.parquet', 'v_id983V17_trip132.parquet', 'v_id983V1_trip163.parquet', 'v_id983V1_trip132.parquet', 
                           'v_id983V10_trip19.parquet', 'v_id983V1_trip163.parquet', 'v_id983V1_trip132.parquet', 'v_id983V1_trip163.parquet', 'v_id983V19_trip87.parquet', 
                           'v_id983V1_trip163.parquet']

In [6]:
trips = [_ for _ in trips if _ not in invalid_original_trips]
print('Keep no. trips: ', len(trips))

keep_signals = [col for col in all_columns if col not in drop_signals + [b for _, b in pair_signals]]
print('Keep no. signals: ', len(keep_signals))

Keep no. trips:  358
Keep no. signals:  44


In [7]:
# UTILS
####################################################################################################
def expand_indices(r_list, ws, offset, max_val):
    r_set = set(r_list).copy()
    for i in range(-ws,ws+1):
        r_set = r_set | set([x+i for x in r_list])
    r_set = [x+offset for x in list(r_set)]
    drops = [x for x in list(r_set) if (x>=0 and x<max_val)]
    return sorted(drops)

####################################################################################################   
def smooth_filter(X, ws, remove_outliers = True, smooth = True):

    def custom_filter(X,ws, remove_outliers, thresh = 8):
        X_filtered = X.copy()
        X_noise  = abs(zscore(X.ffill().bfill() - savgol_filter(X.ffill().bfill(), ws,1, mode='nearest'), nan_policy='omit'))
        if remove_outliers:
            cr = X.std() - median_abs_deviation(X, nan_policy='omit')
            thresh_new = max(abs(X.std() / cr), thresh) if cr != 0 else max(abs(X.std()), thresh)
            peaks = expand_indices(list(np.where((X_noise > thresh_new))[0]), int(ws), 0, len(X))
            X_filtered[peaks] = np.nan
            X_filtered[X_noise.isna()] = np.nan
        else:
            thresh_new = None

        if X_filtered.nunique() < 2:
            X_filtered.fillna(X_filtered.mean(), inplace=True)
        else:
            X_filtered.ffill(inplace=True)
            X_filtered.bfill(inplace=True)  

        return X_filtered, X_noise, thresh_new

    X_filtered, X_noise, thresh = custom_filter(X, ws, remove_outliers = remove_outliers)
    
    if (X - X_filtered).std() > 10:
        X_filtered, X_noise, thresh = custom_filter(X_filtered, ws, remove_outliers = remove_outliers)

    if smooth: X_filtered = savgol_filter(X_filtered, ws/2,1, mode='nearest')
    return X_filtered, X_noise, thresh

####################################################################################################
def get_slices(c_state, min_length=60, exp=lambda val: val == 0):
        slices = []
        start = None
        for i, val in enumerate(c_state):
            condition = exp(val)
            if condition and start is None:
                    start = i
            elif not condition and start is not None:
                    if (i - start) >= min_length:
                        slices.append(slice(start, i))
                    start = None
        if start is not None and (len(c_state) - start) >= min_length:
            slices.append(slice(start, len(c_state)))
        return slices

####################################################################################################
def intersect_slices(slices_A, slices_B):
    intersection_slices = []
    for v_slice in slices_A:
        for c_slice in slices_B:
            start = max(v_slice.start, c_slice.start)
            stop = min(v_slice.stop, c_slice.stop)
            if start < stop:
                intersection_slices.append(slice(start, stop))
    return intersection_slices

####################################################################################################
def split_dataframe_by_slices(df, slices):
    return [df.iloc[s] for s in slices]

####################################################################################################
idx_dict = {}
def save_dataframes_to_parquet(dataframes, base_path, idx_dict, veh_id):
    if f"{veh_id}" not in idx_dict: idx_dict[f"{veh_id}"] = 0
    start_idx = idx_dict[f"{veh_id}"]+1
    new_files = []
    for i, df in enumerate(dataframes):
        filename = f"V{veh_id}_T{i+start_idx}.parquet"
        new_files.append(filename)
        df.to_parquet(Path(base_path, filename), engine='fastparquet')
    idx_dict[f"{veh_id}"] = start_idx + i
    return new_files, idx_dict

####################################################################################################
def find_nan_sections(series):
    nan_sections = []
    start_idx = None

    for idx, value in series.isna().items():
        if value and start_idx is None:
            start_idx = idx
        elif not value and start_idx is not None:
            if idx > start_idx + 2:
                nan_sections.append(start_idx)
            start_idx = None

    if start_idx is not None:
        nan_sections.append(start_idx)

    # Remove neighboring numbers
    nan_sections = [nan_sections[i] for i in range(len(nan_sections)) if i == 0 or nan_sections[i] != nan_sections[i-1] + 1]

    return nan_sections

In [8]:
from collections import Counter
vehicle_counts = Counter()

for trip in trips:
    vehicle_id = trip[1:4].strip("_TV."); file_code = trip[:9].strip(".parquet")
    #print(f"\tVehicle ID: {vehicle_id}, File Code: {file_code}")
    vehicle_counts[vehicle_id] += 1
vehicle_counts

Counter({'101': 358})

In [9]:
# PREPROCESSING #########################################################
save = True
#########################################################################
# loop through every file:
v_idx = {}
cut_list = []
nan_files = []
discard_files = []
for n, f in enumerate(trips):
    #if n>0: break
    # get file info
    vehicle_id = f[1:4].strip("_TV."); #file_code = f[:9].strip(".parquet")
    if vehicle_id not in v_idx: v_idx[vehicle_id] = 0
    v_idx[vehicle_id] += 1
    # new file numbering
    new_name = f"V{vehicle_id}_T{v_idx[vehicle_id]}"
    print(f"{n+1}/{len(trips)}, Reading File: {f}, New File Name: {new_name}")

    # read to dataframe
    keep_columns = [col for col in all_columns if col not in drop_signals]
    df = pd.read_parquet(Path(trips_processed_final, f), columns = keep_columns, engine='fastparquet')
    # drop certain signals according to previous analysis (not necessary if keep_columns is used)
    # df.drop(columns = drop_signals, inplace=True)

    # concatenate signal pairs
    for pair in pair_signals:
        df[pair[0]] = df[[pair[0],pair[1]]].mean(axis=1, skipna=True)
        df.drop(columns=[pair[1]], inplace=True)

    # correct timestamp if necessary:
    if max(df.signal_time).year < 2000: df.signal_time = pd.to_datetime(df.signal_ts * (10**3))
    if not ((df['signal_time'].dt.year >= 2021) & (df['signal_time'].dt.year <= 2024)).all(): 
        raise ValueError("signal_time values are not all between 2021 and 2024")
    df['signal_time'] = df['signal_time'].dt.round('1s')

    #####################################################################################
    # df column structure: [i,      0,              1,                  2, 3, 4,                    5                               -2, -1          ]
    #                      [row,    Date/Time,      Dist. from start,   Altitude + GPS,             features 1,2,...,n              targets,        ]
    #                      [index,  'signal_time',  'hirestotal...',    'altitude','lat','long',    signal_a, signal_b, c, ...      'bat_en','SOC'  ]
    #####################################################################################
    #  --> move signal_time as first column, then sort remaining columns alphabetically
    df.reset_index(drop=True, inplace=True)  # --> reset index to start from 0
    #df.sort_index(axis=1, inplace=True)
    # Reorder columns based on structure above:
    start_cols = ['signal_time','hirestotalvehdist_cval_icuc','altitude_cval_ippc', 'latitude_cval_ippc','longitude_cval_ippc']
    target_cols = ['hv_batmomavldischrgen_cval_1', 'hv_bat_soc_cval_bms1']
    feature_cols = sorted([col for col in df.columns if col not in start_cols + target_cols])
    columns_order = start_cols + feature_cols + target_cols
    df = df[columns_order]

    #####################################
    # remove distance offset, so each sequence starts at 0 km
    dist_offset = abs(df["hirestotalvehdist_cval_icuc"].iloc[df["hirestotalvehdist_cval_icuc"].first_valid_index()])
    df["hirestotalvehdist_cval_icuc"] -= dist_offset

    #####################################
    signal_only_nans = [sig for sig in list(df.columns[np.array(df.isnull().all())])]
    signal_with_nans = [sig for sig in list(df.columns[np.array(df.isnull().any())])]
    constants = {}

    # fill optional signals apporpriately
    for sig in signal_only_nans:
        if sig in optional_or_zero: df[sig] = 0.0
        elif sig in optional_or_one: df[sig] = 1.0

    for sig in df.columns:
        if True:
            # extract constant values if available:
            if len({x for x in df[sig] if x==x}) == 1:      # number of distinct non-NaN-values
                constants[sig] = df[sig].iloc[df[sig].first_valid_index()] 
            # Filtering, smoothing and outlier removal:
        if sig in signal_remove_outliers:   # DO NOT USE REMOVE OUTLIER FUNCTION
                df[sig], _, _ = smooth_filter(df[sig], 60, remove_outliers = True, smooth = False)

    #####################################
    # clip value limits for specified value ranges: --> replace with nans instead 
    #clip after filtering!!
    value_ranges = {'vehspd_cval_cpc': (0, 110),    # set ranges
                    'hv_batmomavldischrgen_cval_1': (2, 440),
                    'altitude_cval_ippc': (-20, 2800),
                    'roadgrad_cval_pt': (-20, 20),
                    'longitude_cval_ippc': (-5, 35),
                    'latitude_cval_ippc': (35, 68),
                    'vehweight_cval_pt': (7, 44),
                    'airtempoutsd_cval_cpc': (-30, 48),
                    'actualtorque_pti1': (-100, 100),
                    'elcomp_pwrcons_cval': (0, 14),
                    'hv_bat_soc_cval_bms1': (4.9, 100),
                    'hv_batavcelltemp_cval_bms1': (-10, 48),
                    'epto_pwr_cval': (0, 20), 
                    'currpwr_contendrnbrkresist_cval': (0, 90),
                    'brc_stat_brc1': [1.0, 2.0, 3.0], # set values
                    'selgr_rq_pt': [0.0, 1.0, 2.0]} # set values

    for signal, value_range in value_ranges.items():
        if isinstance(value_range, tuple):
            if (df[signal] < value_range[0]).any() or (df[signal] > value_range[1]).any():
                df.loc[(df[signal] < value_range[0]) | (df[signal] > value_range[1]), signal] = np.nan
                print(f"\tSignal {signal} clipped to range {value_range}")
        elif isinstance(value_range, list): df[signal] = df[signal].apply(lambda x: x if any(abs(x - y) <= 0.1 for y in value_range) else np.nan)


    # check if after clipping any column is all nans. if so, discard the file
    if df.isnull().all().any():
        all_nan_columns = df.columns[df.isnull().all()].tolist()
        print(f'Warning: All NaN values in {f}, columns: {all_nan_columns}\n{"-"*120}\n')
        discard_files.append(f)
        continue


    #####################################
    # cut first section, if altitude jump is still detected after preprocessing:
    alt_diff = pd.DataFrame(abs(np.diff(df["altitude_cval_ippc"])))
    lat_diff = pd.DataFrame(abs(np.diff(df["latitude_cval_ippc"])))

    exceed_indices = alt_diff[alt_diff[0] > 100].index.union(lat_diff[lat_diff[0] > 10].index) # sudden altitude jumps of more than 50m or latitude jumps of more than 10 degrees
    exceed_indices = sorted(set(exceed_indices))  # remove duplicates and sort


    if exceed_indices:
        print(alt_diff[0].max())
        print(lat_diff[0].max())
        print(f"\t exceed indices: {exceed_indices}")
        print(f"\tAltitude jump of {alt_diff[0].iloc[exceed_indices[-1]]} detected at index {exceed_indices[-1]}")
        l = len(df) # old length
        if len(df.loc[0:exceed_indices[0]-3]) > len(df.loc[exceed_indices[-1]+3:]):
            df = df.loc[0:exceed_indices[0]-3]  # cut last section
        else:
            df = df.loc[exceed_indices[-1]+3:]  # cut first section
        df.reset_index(drop=True, inplace=True) # reset index
        print(f"\t --> cut: {((l-len(df))/l*100):.2f} %")    # percentage of cut
        if ((l-len(df))/l*100) > 10: cut_list.append((f, ((l-len(df))/l*100)))
        if ((l-len(df))/l*100) > 50: 
            discard_files.append(f)
            print(f"\t --> discarded due to cut:")
            continue

    #####################################
    # check again for nans and fill only columns with nans:
    columns_with_nans = [sig for sig in df.columns[df.isna().any()].tolist()]
    if columns_with_nans: 
        df[columns_with_nans] = df[columns_with_nans].ffill().bfill()
    # check if there are still any nans:
    if df[columns_with_nans].isna().any().any(): 
        columns_with_nans = [sig for sig in df.columns[df.isna().any()].tolist()]
        if df[columns_with_nans].isnull().all().any():
            print(f'Error: All values are NaN in columns: {columns_with_nans} for file {f}\n{"-"*120}\n')
            discard_files.append(f)
            continue
        else:
            print(f'Warning: Still some NaN values in {f}, columns_with_nans: {columns_with_nans}\n{"-"*120}\n')
            print(f"number of nans: {df[columns_with_nans].isna().sum()}")
            nan_files.append(f)

    #####################################
    # Convert each column to datatype "float64", except for "signal_time"
    for col in df.columns: 
        if col not in ['signal_time']: df[col] = df[col].astype('float64')

    ############################################################################################
    # Save new dataframes
    if save:
        df.to_parquet(f'{trips_processed_cleaned}/{new_name}.parquet')
        print(f'\t --> "{new_name}.parquet" saved.\n{"-"*60}\n')


1/358, Reading File: V101_T1.parquet, New File Name: V101_T1
	 --> "V101_T1.parquet" saved.
------------------------------------------------------------

2/358, Reading File: V101_T10.parquet, New File Name: V101_T2
	 --> "V101_T2.parquet" saved.
------------------------------------------------------------

3/358, Reading File: V101_T100.parquet, New File Name: V101_T3
	 --> "V101_T3.parquet" saved.
------------------------------------------------------------

4/358, Reading File: V101_T101.parquet, New File Name: V101_T4
	 --> "V101_T4.parquet" saved.
------------------------------------------------------------

5/358, Reading File: V101_T102.parquet, New File Name: V101_T5
	 --> "V101_T5.parquet" saved.
------------------------------------------------------------

6/358, Reading File: V101_T103.parquet, New File Name: V101_T6
	 --> "V101_T6.parquet" saved.
------------------------------------------------------------

7/358, Reading File: V101_T104.parquet, New File Name: V101_T7
	 --

In [10]:
import pandas as pd
from pathlib import Path

# Function to check for NaN values in parquet files
def check_nan_files(folder_path):
    nan_files = []
    for file in folder_path.glob("*.parquet"):
        df = pd.read_parquet(file, engine='fastparquet')
        if df.isna().any().any():
            nan_files.append(file.name)
    return nan_files

# Define the folder paths
folders = [trips_processed_cleaned]

# Check for NaN files in each folder
for folder in folders:
    nan_files = check_nan_files(folder)
    print(f"Files with NaN values in {folder}:")
    for file in nan_files:
        print(file)

Files with NaN values in C:\Users\SIEGLEW\OneDrive - Daimler Truck\MA\Code\MA-Data\new_folder_2:
