In [1]:

# Import all datasets, helpers

import src.data.api.models as data_models
import src.helpers.helpers as helpers

# Import all the necessary libraries

from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GridSearchCV

from datetime import datetime
import pandas as pd
import joblib
from joblib import Parallel, delayed
from dask.distributed import Client, LocalCluster 
import os

output = 'trained_models' # Root output folder; each run saves its models and logs in a timestamped sub-folder
random_state = 42 # Seed used for reproducibility (passed to the dataset loaders)
n_jobs = 4 # Passed to GridSearchCV as n_jobs, i.e. how many jobs the parameter search runs in parallel


In [2]:

# Training configuration: which datasets are trained and which models run on each of them.
# Per dataset, the training cell reads the keys 'active', 'data_model', 'smote' and 'pred_models'.
# 'pred_models' maps a model key ('rf', 'rbf_svm' or 'dnn') to a dict of grid-search
# parameters that is merged over that model's default parameter grid.

# Load the settings from training_settings.json

datasets = helpers.load_json_file('training_settings.json')

# The loaded settings can be adjusted here before training.
# e.g. to stop the random forest ('rf') from being trained on the breast_cancer dataset:
# datasets['breast_cancer']['pred_models']['rf'] = None
# NOTE(review): setting an entry to None is meant to disable that model - confirm the training
# cell handles None entries; removing the key altogether has the same effect:
# datasets['breast_cancer']['pred_models'].pop('rf', None)

In [3]:


######################################
### Run the models on the datasets ###
######################################

# Loop through all the datasets

# Record the current date and time to the nearest minute
now_format = datetime.now().strftime("%d_%m_%Y__%H-%M")

for dataset in datasets:
    
    if datasets[dataset]['active'] == False:
        continue
    
    log = f"Model training evaluation for {dataset} on {now_format}"
    
    
    # Get the model name from the dataset and load it, else set it to None
    
    
    dataset_model_name = datasets[dataset]['data_model']
    dataset_model_class = getattr(data_models, dataset_model_name, None)
    
    if dataset_model_name == None:
        print(f"Dataset {dataset} has no model name")
        continue
    
    if dataset_model_class is not None:
        
        # Load the dataset with the smote parameter (e.g. False or 'auto')and random state
        
        smote = datasets[dataset]['smote']
        cleaned_data = dataset_model_class(smote=smote, random_state=random_state)
        
    else:
        print(f"Dataset {dataset} ({dataset_model_name}) not found")
        continue
    
    
    # Split the data into target variable and featurest
    
    X = cleaned_data['X']
    y = cleaned_data['y']
        
    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = cleaned_data.train_test_split(X, y)
    
    
    # Train all the models on the dataset
    for pred_model in datasets[dataset]['pred_models']:
        
        print(f"Running {pred_model} on {dataset}")
        
        # Set the steps and parameters for the pipeline
        pipe_params = {}

        match pred_model:
            
            case 'rf':
                steps = [('rf', RandomForestClassifier())]
                pipe_params.update({
                    'rf__n_estimators': [100, 200, 300],
                    'rf__max_depth': [2, 4, 8, 10, 20],
                    'rf__min_samples_split': [2, 5, 10],
                    'rf__min_samples_leaf': [1, 2, 4]
                })
                
            case 'rbf_svm':
                steps = [
                        ('scaler', StandardScaler()),
                        ('rbf_svm', SVC())]
                pipe_params.update({
                    'rbf_svm__C': [0.1, 1, 10, 100],
                    'rbf_svm__gamma': [1, 0.1, 0.01, 0.001],
                    'rbf_svm__kernel': ['rbf']
                })
                
            case 'dnn':
                steps = [('dnn', MLPClassifier())]
                pipe_params.update({
                    'dnn__hidden_layer_sizes': [(100,)],
                    'dnn__activation': ['relu', 'tanh'],
                    'dnn__alpha': [0.0001, 0.001, 0.01],
                    'dnn__learning_rate': ['constant', 'adaptive']
                })
                    
                
            case _:
                steps = None
                print(f"Model {pred_model} not found")
                
        # Apply override parameters
        if datasets[dataset]['pred_models'][pred_model] is not None or datasets[dataset]['pred_models'][pred_model] != {}:
            pipe_params.update(datasets[dataset]['pred_models'][pred_model])
                
        pipe = Pipeline(steps=steps)
        pipe.fit(X_train, y_train)
        
        search = GridSearchCV(pipe, pipe_params, cv=2, n_jobs=n_jobs, verbose=1)
        
        cluster = LocalCluster()  
        client = Client(cluster) 
        
        with joblib.parallel_backend("dask", scatter=[X_train, y_train]):  
            search.fit(X_train, y_train)
        
        
        # Create a folder with the datetime if it doesn't exist
        base = output + '/' + now_format
        if not os.path.exists(base):
            os.makedirs(base)
        
        # Save the model to the output folder with the dataset name and the model name
        joblib.dump(search.best_estimator_, f"{base}/{dataset}_{pred_model}.joblib")
        
        # Log the evaluation metrics to a log file, including the best parameters
        log += helpers.dataset_log(search, X_test, y_test, pred_model, cleaned_data.get_labels())
    
        
    # Save the log to a file
    with open(f"{base}/{dataset}_model_comparison.txt", 'w') as file:
        file.write(log)
        file.close()
        
        
        
        

Running rf on credit_score


ValueError: To use Joblib with Dask first create a Dask Client

    from dask.distributed import Client
    client = Client()
or
    client = Client('scheduler-address:8786')