# grid_search

**Short Introduction** 

This notebook runs parameter grid searches. Every type of pipeline has a corresponding grid search config in which the parameter ranges can be configured. The grid search is then performed and the results are saved to a log file.
The number of workers should be chosen according to the number of available CPU cores and the memory consumption of the pipeline. Pipeline functions have been optimized for concurrency to limit memory consumption and to reuse cached data across repeated calls. On DDR4 systems, RAM speed can become a bottleneck when using compute-intensive regressors.
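
As a rough illustration of the trade-off between CPU cores and memory, the following sketch picks a worker count bounded by both. The helper `suggest_num_workers` and its parameters are hypothetical and not part of this project; the per-worker memory figure is something you would have to measure for your own pipeline.

```python
import os


def suggest_num_workers(per_worker_gb, available_gb, cpu_count=None, reserve_cores=1):
    """Return a worker count bounded by both CPU cores and memory (hypothetical helper)."""
    cores = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    by_cpu = max(1, cores - reserve_cores)               # keep some cores free for the OS
    by_mem = max(1, int(available_gb // per_worker_gb))  # workers that fit into RAM
    return min(by_cpu, by_mem)


# Example with assumed figures: 128 logical cores, 64 GB free, about 2 GB per worker
print(suggest_num_workers(per_worker_gb=2.0, available_gb=64.0, cpu_count=128))
```

In this example memory is the limiting factor, so the suggested count is well below the core count.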

---
The `execute_pipeline` function manages **parallel execution** of a pipeline function (e.g., a hyperparameter sweep) over multiple parameter combinations. It uses a **ProcessPoolExecutor** to run jobs concurrently, and **logs results to CSV** in consistent columns. The function also includes **basic error tracking** and can handle large workloads in a batched manner to reduce file I/O overhead.
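
The general pattern described above can be sketched as follows. This is not the actual `execute_pipeline` implementation: `run_grid`, `toy_pipeline` and the `flush` callback are hypothetical names, and the sketch assumes the pipeline function accepts its parameters as keyword arguments and returns a dictionary of metrics. The demo uses a `ThreadPoolExecutor` so that it runs in any notebook cell; a `ProcessPoolExecutor` can be passed instead, provided the pipeline function and its arguments are picklable.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_grid(pipeline_function, param_combinations, param_keys, executor,
             batch_size=100, flush=print):
    """Submit one job per combination and pass finished rows to `flush` in batches."""
    errors = Counter()  # error message -> number of occurrences
    n_ok = 0
    buffer = []
    futures = {
        executor.submit(pipeline_function, **dict(zip(param_keys, combo))): combo
        for combo in param_combinations
    }
    for future in as_completed(futures):
        row = dict(zip(param_keys, futures[future]))
        try:
            row.update(future.result())  # assumed: the pipeline returns a dict of metrics
            n_ok += 1
        except Exception as exc:
            row["error"] = repr(exc)
            errors[repr(exc)] += 1
        buffer.append(row)
        if len(buffer) >= batch_size:
            flush(buffer)  # e.g. append the batch to a CSV file
            buffer = []
    if buffer:
        flush(buffer)  # write the remaining rows
    return n_ok, errors


def toy_pipeline(a, b):
    """Stand-in for a real pipeline; fails for b == 0."""
    if b == 0:
        raise ZeroDivisionError("b must not be zero")
    return {"ratio": a / b}


batches = []
with ThreadPoolExecutor(max_workers=2) as pool:
    n_ok, errors = run_grid(toy_pipeline, [(1, 2), (3, 0), (4, 4)], ["a", "b"],
                            pool, batch_size=2, flush=batches.append)
print(n_ok, errors.most_common(1))
```

Buffering rows and flushing them in batches keeps the number of file writes low, which is the motivation for the `batch_size` parameter described below.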

## Parameters

- **pipeline_function**  
  The function to apply to each parameter combination. Generally, this is a pipeline or model evaluation function.

- **param_combinations** (`list`)  
  A list of tuples (or lists) representing the different combinations of parameters to evaluate.

- **param_keys** (`list` of `str`)  
  Names corresponding to each position in the parameter combinations, ensuring each combination can be unpacked into a dictionary.

- **log_path** (`str`)  
  File path to which results are written as CSV. Automatically creates the parent directory if needed.

- **num_workers** (`int`, default=6)  
  Maximum number of worker processes for parallel execution. Limited by the number of CPU threads available.

- **data_cache** (any, optional)  
  Shared data or resources to be passed into each pipeline call if needed.

- **batch_size** (`int`, default=100)  
  Number of results to buffer in memory before writing to the CSV file in bulk.
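
To illustrate how `param_keys` and `param_combinations` relate, here is a small sketch that pairs each combination with its keys. The helper `combos_to_dicts` is hypothetical and only shows the idea; the parameter names and values are toy examples.

```python
from itertools import product


def combos_to_dicts(param_keys, param_combinations):
    """Pair each value with its key and fail early on a length mismatch."""
    dicts = []
    for combo in param_combinations:
        if len(combo) != len(param_keys):
            raise ValueError(
                f"expected {len(param_keys)} values, got {len(combo)}: {combo!r}"
            )
        dicts.append(dict(zip(param_keys, combo)))
    return dicts


param_keys = ["gear", "window_size", "verbose"]
param_combinations = list(product([1, 2], [3, 5], [False]))
as_dicts = combos_to_dicts(param_keys, param_combinations)
print(as_dicts[0])
```

Because the pairing is purely positional, the order of the value lists passed to `product` has to match the order of `param_keys` exactly.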

---

## Return Values

**No direct return**. The function writes results to a CSV at `log_path`. It also:
- Tracks the number of successful vs. failed runs  
- Logs the most frequently occurring error message  
- Optionally sends progress and completion notifications  

Under the hood, it organizes the final CSV so that columns appear in a fixed, consistent order, even if new metrics or errors appear in later tasks.
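
One way to obtain a consistent column order is to write rows through `csv.DictWriter` with a fixed list of field names. The sketch below is an illustration only, not the project's actual logging code. The helper `write_rows` and the column names are hypothetical, and it assumes all column names are known up front; handling columns that first appear in later tasks would require rewriting the file.

```python
import csv
import io


def write_rows(rows, fieldnames, stream, write_header=True):
    """Write dict rows in a fixed column order; missing values become empty cells."""
    writer = csv.DictWriter(stream, fieldnames=fieldnames, restval="",
                            extrasaction="ignore")
    if write_header:
        writer.writeheader()
    writer.writerows(rows)


fieldnames = ["comment", "gear", "rmse", "error"]
rows = [
    {"gear": 1, "comment": "Test Run", "rmse": 0.12},  # successful run
    {"comment": "Test Run", "gear": 2, "error": "ValueError('bad window')"},  # failed run
]
buffer = io.StringIO()  # stands in for the log file
write_rows(rows, fieldnames, buffer)
print(buffer.getvalue())
```

The key order inside each row dictionary does not matter; only `fieldnames` determines the column order.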

In [None]:
# Check how many workers are available
import os
print(f'Available workers: {os.cpu_count()}')

---
# Efficiency Map Config

The following cell contains the configuration for a grid search over the efficiency map pipeline. The first part of the config dictionary should not be changed; only the `param_values` entry should be adjusted to the desired parameter ranges. Provide a list of values for every parameter. If a parameter should not be varied, provide a list with a single value. Specify a comment to make the current grid search easier to identify in the log file.

For this pipeline, certain `twoD_smoothing_kwargs`, such as specific regressor configs or other lazy evaluation models, can lead to excessive memory usage. Reduce the number of workers accordingly if necessary.
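
Since the grid is the Cartesian product of all value lists, its size grows multiplicatively with every list that holds more than one value. The sketch below, using toy value lists, shows how the number of runs can be checked before anything is executed. With the values as listed in the next cell, the same calculation gives 2 × 12 × 4 × 17 × 3 × 5 = 24,480 combinations.

```python
from itertools import product
from math import prod

# Toy value lists in the same style as the config; single-value lists do not grow the grid
value_lists = [
    ["Test Run"],     # comment
    [1, 2],           # gear
    [0.0, 0.1, 0.2],  # efficiency_limit_lower
    [{"filter_type": "moving_average", "window_size": 3}, None],  # smoothing_kwargs
]

# The grid size is the product of the list lengths
n_runs = prod(len(values) for values in value_lists)

# Cross-check against the materialized product (cheap here, costly for large grids)
assert n_runs == len(list(product(*value_lists)))
print(f"{n_runs} parameter combinations")
```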

In [None]:
from itertools import product
from modules.parametric_pipelines import efficiencymap_pipeline
from modules.threadpool_executer import execute_pipeline
from modules.data_handler import get_can_files


# Define the parameter space for Efficiency Map Pipeline
efficiencymap_config = {
    "comment": "Comment for the run",
    "param_keys": [
        'comment', 'files', 'gear', 'efficiency_limit_lower', 'efficiency_limit_upper', 'soc_limit_lower', 'soc_limit_upper', 'remove_neutral_gear',
        'smoothing_kwargs', 'columns_to_smooth', 'substract_auxiliary_power', 'which_full_load_curve', 'twoD_smoothing_kwargs', 'high_fidelity_interpolation',
        'n_quantize_bins', 'at_middle_of_bin', 'n_interpolation_bins', 'global_offset', 'generate_plots', 'verbose'
    ],
    "param_values": list(product(
        ["Test Run"],  # Supply comment
        [   # files
            get_can_files(folder='data/', exclude_keywords=['coastdown', 'sascha', 'RLC_Konstantfahrt'])
            # or use 'all' to use all files
        ],
        [1, 2],  # gear
        [0, 0.02, 0.04, 0.06, 0.08, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4],  # efficiency_limit_lower
        [0.97, 0.98, 0.99, 1],  # efficiency_limit_upper
        [0],  # soc_limit_lower
        [100],  # soc_limit_upper
        [True],  # remove_neutral_gear
        [  # smoothing_kwargs
            {'filter_type': 'moving_average', 'window_size': 3},
            {'filter_type': 'moving_average', 'window_size': 5},
            {'filter_type': 'moving_average', 'window_size': 7},
            {'filter_type': 'moving_average', 'window_size': 10},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.4},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.6},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.8},
            {'filter_type': 'savitzky_golay', 'window_length': 5, 'polyorder': 3},
            {'filter_type': 'savitzky_golay', 'window_length': 10, 'polyorder': 3},
            {'filter_type': 'savitzky_golay', 'window_length': 15, 'polyorder': 3},
            {'filter_type': 'savitzky_golay', 'window_length': 5, 'polyorder': 5},
            {'filter_type': 'savitzky_golay', 'window_length': 10, 'polyorder': 5},
            {'filter_type': 'savitzky_golay', 'window_length': 15, 'polyorder': 5},
            {'filter_type': 'savitzky_golay', 'window_length': 5, 'polyorder': 1},
            {'filter_type': 'savitzky_golay', 'window_length': 10, 'polyorder': 1},
            {'filter_type': 'savitzky_golay', 'window_length': 15, 'polyorder': 1},
            None
        ],
        [   # columns_to_smooth
            ['hv_battery_current', 'hv_battery_voltage', 'rear_motor_torque', 'engine_rpm', 'dcdc_power_hv'],
            ['dcdc_power_hv', 'hv_battery_current', 'hv_battery_voltage'],
            ['rear_motor_torque', 'engine_rpm']
        ],
        [True],  # substract_auxiliary_power
        ['adjusted'],  # which_full_load_curve
        [  # twoD_smoothing_kwargs
            {  # IDW config
                'method': 'idw',
                'power': 1,
                'num_neighbors': 10,
                'outlier_detection': True,
                'threshold_multiplier': 2
            },
            {  # Gaussian filter config
                'method': 'gaussian_filter',
                'sigma': 2,
                'grid_size': 50
            },
            {  # Griddata config
                'method': 'griddata',
                'interp_method': 'nearest'
            },
            {  # Regression config
                'method': 'regression',
                'model': 'random_forest',
                'model_params': {
                    'n_estimators': 50,
                    'max_depth': 5,
                    'random_state': 42
                }
            },
            None
        ],
        [False],  # high_fidelity_interpolation
        [10],  # n_quantize_bins
        [True],  # at_middle_of_bin
        [10],  # n_interpolation_bins
        [0],  # global_offset
        [False],  # generate_plots (leave off for grid search)
        [False]  # verbose (leave off for grid search)
    )),
    "log_path": "data/logs/efficiencymap_log.csv",
    "pipeline_function": efficiencymap_pipeline
}


config = efficiencymap_config

# Preload data to cache for disk I/O optimization
from modules.data_handler import load_can_data


def collect_all_filenames(param_values, param_keys):
    """Return the unique CAN files referenced by any parameter combination."""
    files_index = param_keys.index('files')
    unique_files = set()
    for params in param_values:
        files_param = params[files_index]
        if files_param == 'all':
            # 'all' selects every available file, so no further collection is needed
            return get_can_files()
        unique_files.update(files_param)
    return list(unique_files)

# Collect all files
files_to_load = collect_all_filenames(config['param_values'], config['param_keys'])
data_cache = {}
for filename in files_to_load:
    data_cache[filename] = load_can_data(filename, verbose=False)


num_workers = 96 # Adjust to available workers
# Execute the pipeline
execute_pipeline(
    config['pipeline_function'],
    config['param_values'],
    config['param_keys'],
    config['log_path'],
    num_workers=num_workers,
    data_cache=data_cache,
    batch_size=num_workers*2
)

---
# Coastdown Config

The following cell contains the configuration for a grid search over the coastdown pipeline. It works the same way as the efficiency map config: the first part of the config dictionary should not be changed; only the `param_values` entry should be adjusted to the desired parameter ranges.

In [None]:
from itertools import product
from modules.parametric_pipelines import coastdown_pipeline
from modules.threadpool_executer import execute_pipeline
from modules.data_handler import get_can_files, load_can_data
import numpy as np

path = 'data/path_to_coastdown_files'  # Adjust to your path
files = get_can_files(folder=path)

# Define the parameter space for Coastdown Pipeline
coastdown_config = {
    "comment": "Coastdown Grid Search",
    "param_keys": [
        'comment',
        'files',
        'do_pitch_correction',
        'speed_signal',
        'bucket_size',
        'vehicle_mass',
        'rotating_mass_eq',
        'frontal_area',
        'smoothing_kwargs',
        'columns_to_smooth',
        'steering_angle_limit',
        'select_suspension_level',
        'deriv_lower_limit',
        'deriv_upper_limit',
        'cut_time',
        'seed',
        'target_n_segments',
        'outlier_threshold',
        'loss_type',
        'generate_plots',
        'verbose'
    ],
    "param_values": list(product(
        ["Run 1"],  # Supply comment
        [   # files
            files
        ],
        [True],  # do_pitch_correction
        [   # speed_signal
            'vehicle_speed',
            'vehicle_speed_pitch_corrected',
            'vehicle_speed_gps',
            'vehicle_speed_gps_pitch_corrected'
        ],
        [2, 4, 6, 10, 15],  # bucket_size
        [2300],  # vehicle_mass
        [50],  # rotating_mass_eq
        [2.33],  # frontal_area
        [   # smoothing_kwargs
            {'filter_type': 'moving_average', 'window_size': 5},
            {'filter_type': 'moving_average', 'window_size': 20},
            {'filter_type': 'moving_average', 'window_size': 50},
            {'filter_type': 'moving_average', 'window_size': 100},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.03},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.05},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.1},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.2},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.4},
            {'filter_type': 'savitzky_golay', 'window_length': 5, 'polyorder': 3},
            {'filter_type': 'savitzky_golay', 'window_length': 25, 'polyorder': 3},
            {'filter_type': 'savitzky_golay', 'window_length': 50, 'polyorder': 3},
            {'filter_type': 'savitzky_golay', 'window_length': 100, 'polyorder': 3},
            {'filter_type': 'savitzky_golay', 'window_length': 5, 'polyorder': 5},
            {'filter_type': 'savitzky_golay', 'window_length': 25, 'polyorder': 5},
            {'filter_type': 'savitzky_golay', 'window_length': 50, 'polyorder': 5},
            {'filter_type': 'savitzky_golay', 'window_length': 100, 'polyorder': 5},
            None
        ],
        [   # columns_to_smooth
            ['accelerator_pedal', 'steering_wheel_angle', 'vehicle_speed']
        ],
        [1, 3, 10],  # steering_angle_limit
        [None],  # select_suspension_level
        [-0.5, -0.4, -0.3, -0.2],  # deriv_lower_limit
        [-0.01, 0, 0.01, 0.02, 0.03, 0.04],  # deriv_upper_limit
        [1, 3, 5, 10],  # cut_time
        [42],  # seed
        [None],  # target_n_segments
        [None, 0.5, 1],  # outlier_threshold
        ['sec_rmse'],  # loss_type
        [False],  # generate_plots
        [False]  # verbose
    )),
    "log_path": "data/logs/coastdown_log.csv",
    "pipeline_function": coastdown_pipeline
}

config = coastdown_config

def collect_all_filenames(param_values, param_keys):
    files_index = param_keys.index('files')
    unique_files = set()
    for params in param_values:
        files_param = params[files_index]
        if files_param == 'all':
            unique_files.update(get_can_files())
        else:
            unique_files.update(files_param)
    return list(unique_files)

# Preload data to cache for disk I/O optimization
files_to_load = collect_all_filenames(config['param_values'], config['param_keys'])
data_cache = {}
for filename in files_to_load:
    data = load_can_data(filename, verbose=False)
    data_cache[filename] = data

num_workers = 52 # Adjust to available workers
# Execute the pipeline with data_cache
execute_pipeline(
    config['pipeline_function'],
    config['param_values'],
    config['param_keys'],
    config['log_path'],
    num_workers=num_workers,
    data_cache=data_cache,
    batch_size=num_workers*2
)

---
# Constspeed Config

In [None]:
from itertools import product
from modules.parametric_pipelines import constspeed_pipeline
from modules.threadpool_executer import execute_pipeline
from modules.data_handler import get_can_files, load_can_data

files_neubiberg = get_can_files(folder='data/path_to_constant_speed_files')  # Adjust to your path

# Define the parameter space for Constant Speed Pipeline
constspeed_config = {
    "comment": "Constant Speed Grid Search",
    "param_keys": [
        'comment',
        'files',
        'speed_signal',
        'speed_threshold',
        'min_n_samples',
        'min_avg_speed',
        'do_pitch_correction',
        'cut_time',
        'smoothing_kwargs',
        'columns_to_smooth',
        'steering_angle_limit',
        'select_suspension_level',
        'vehicle_mass',
        'frontal_area',
        'outlier_threshold',
        'loss_type',
        'generate_plots',
        'verbose'
    ],
    "param_values": list(product(
        ["Run 1"],  # comment
        [files_neubiberg],  # files
        [   # speed_signal
            'vehicle_speed'
        ],
        [1, 2, 4, 8],  # speed_threshold
        [10, 20, 40],  # min_n_samples
        [2, 4, 10],  # min_avg_speed
        [True],  # do_pitch_correction
        [None, 1, 4, 8],  # cut_time
        [   # smoothing_kwargs
            {'filter_type': 'moving_average', 'window_size': 5},
            {'filter_type': 'moving_average', 'window_size': 20},
            {'filter_type': 'moving_average', 'window_size': 50},
            {'filter_type': 'moving_average', 'window_size': 100},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.03},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.05},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.1},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.2},
            {'filter_type': 'exponential_moving_average', 'alpha': 0.4},
            None
        ],
        [   # columns_to_smooth
            ['vehicle_speed']
        ],
        [1, 3, 5, 10],  # steering_angle_limit
        [None],  # select_suspension_level
        [2300],  # vehicle_mass
        [2.33],  # frontal_area
        [None, 0.5, 1],  # outlier_threshold
        ['sec_rmse'],  # loss_type
        [False],  # generate_plots
        [False]  # verbose
    )),
    "log_path": "data/logs/constspeed_pipeline_log_thesis.csv",
    "pipeline_function": constspeed_pipeline
}

config = constspeed_config

def collect_all_filenames(param_values, param_keys):
    files_index = param_keys.index('files')
    unique_files = set()
    for params in param_values:
        files_param = params[files_index]
        if files_param == 'all':
            unique_files.update(get_can_files())
        else:
            unique_files.update(files_param)
    return list(unique_files)

# Preload data to cache for disk I/O optimization
files_to_load = collect_all_filenames(config['param_values'], config['param_keys'])
data_cache = {}
for filename in files_to_load:
    data = load_can_data(filename, verbose=False)
    data_cache[filename] = data

# Execute the pipeline with data_cache
num_workers = 116 # Adjust to available workers
execute_pipeline(
    config['pipeline_function'],
    config['param_values'],
    config['param_keys'],
    config['log_path'],
    num_workers=num_workers,
    data_cache=data_cache,
    batch_size=num_workers*2
)

---
# Gear Strategy Config


In [None]:
from itertools import product
from modules.parametric_pipelines import gearstrategy_pipeline
from modules.threadpool_executer import execute_pipeline
from modules.data_handler import get_can_files, load_can_data
import pickle

# Define the parameter space for Gear Strategy Pipeline
gearstrategy_config = {
    "comment": "Gear Strategy Grid Search",
    "param_keys": [
        'comment',
        'smoothing_kwargs',
        'columns_to_smooth',
        'gear_change_offset_samples',
        'outlier_eps',
        'outlier_min_samples',
        'close_points_merge_thr',
        'n_clusters',
        'attach_endpoints',
        'anchor_points',
        'spline_order',
        'num_knots',
        'cluster_weight',
        'normal_weight',
        'knot_distr_method',
        'verbose',
        'generate_comparison_plots',
        'generate_plots'
    ],
    "param_values": list(product(
        ["Run 1"], # comment
        [   # smoothing_kwargs
            None
        ],
        [   # columns_to_smooth
            None
        ],
        [0], # gear_change_offset_samples
        [0.025, 0.05, 0.1, 0.125], # outlier_eps
        [2, 3, 4], # outlier_min_samples
        [0.0005, 0.001, 0.005, 0.01, 0.025, 0.05], # close_points_merge_thr
        [5, 6, 7, 8], # n_clusters
        [True, False], # attach_endpoints
        [True], # anchor_points
        [1, 2, 3], # spline_order
        [3, 5, 7, 9], # num_knots
        [0.1, 0.3, 0.5, 0.7, 0.9], # cluster_weight
        [None], # normal_weight set to None to keep it inverse to cluster_weight
        ['cluster'], # knot_distr_method
        [False], # verbose
        [False], # generate_comparison_plots
        [False] # generate_plots
    )),
    "log_path": "data/logs/gear_strategy_log.csv",
    "pipeline_function": gearstrategy_pipeline
}

config = gearstrategy_config

# Initialize data_cache
data_cache = {}

# Load the ground truth results_normal dictionary
with open('data/path_to_results_normal.pkl', 'rb') as f:
    results_normal_loaded = pickle.load(f)

# Load the results_sport dictionary
with open('data/path_to_results_sport.pkl', 'rb') as f:
    results_sport_loaded = pickle.load(f)

# Remove 'line_function' from results dictionaries if present
for axis2 in results_normal_loaded:
    results_normal_loaded[axis2].pop('line_function', None)

for axis2 in results_sport_loaded:
    results_sport_loaded[axis2].pop('line_function', None)

# Load the gear change data
normal_files = get_can_files('data/path_to_files_recorded_in_normal_mode')  # Adjust to your path
normal_data = load_can_data(normal_files[0], verbose=False)

sport_files = get_can_files('data/path_to_files_recorded_in_sport_mode')  # Adjust to your path
sport_data = load_can_data(sport_files[0], verbose=False)

# Store loaded data into data_cache
data_cache['results_normal_loaded'] = results_normal_loaded
data_cache['results_sport_loaded'] = results_sport_loaded
data_cache['normal_data'] = normal_data
data_cache['sport_data'] = sport_data

num_workers = 7 # Adjust to available workers
# Execute the pipeline with data_cache
execute_pipeline(
    config['pipeline_function'],
    config['param_values'],
    config['param_keys'],
    config['log_path'],
    num_workers=num_workers,
    data_cache=data_cache,
    batch_size=num_workers*2
)