<a href="https://colab.research.google.com/github/rezzie-rich/colab-notebooks/blob/main/RB_MisLoROpt_con_t1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Base model to fine-tune.
model:
  # Identifier the training script reads as `model.name` and loads the model and tokenizer from.
  name: 'mistral-7b-model'
  # Torch dtype written as a dotted string; the consumer has to map it to a real torch dtype.
  torch_dtype: 'torch.bfloat16'

# Top-level LoRA settings. Every entry under training.datasets_info also
# carries its own `lora` section with the same shape.
lora:
  enable: true
  config:
    # Presumably the LoRA rank and scaling factor (alpha) — confirm against the consumer.
    lora_r: 4
    lora_alpha: 32

# Training configuration: shared trainer settings plus one entry per dataset.
training:
  # Trainer settings that are not dataset-specific.
  global_args:
    output_dir: './model_output'
    logging_dir: './logs'
    # Kept equal to save_strategy: Transformers rejects load_best_model_at_end
    # when the evaluation and save strategies differ.
    evaluation_strategy: 'epoch'
    logging_steps: 500
    save_strategy: 'epoch'
    save_total_limit: 3
    load_best_model_at_end: true
    metric_for_best_model: 'loss'
    greater_is_better: false
  # One entry per dataset, processed in the order listed.
  # Floats in scientific notation are written with a decimal point (5.0e-5):
  # PyYAML follows YAML 1.1 and loads a bare `5e-5` as a string.
  datasets_info:
    - dataset_name: 'dataset1'
      split: 'train'
      # Text columns that are joined with a space before tokenization.
      text_fields: ['text_field1', 'text_field2']
      # Maps label column -> label type ('categorical', 'continuous' or 'multi_label').
      label_fields:
        label1: 'categorical'
        label2: 'continuous'
      max_token_length: 512
      lora:
        enable: true
        config:
          lora_r: 4
          lora_alpha: 32
      training_args:
        learning_rate: 5.0e-5
        num_train_epochs: 3
        per_device_train_batch_size: 8
        warmup_steps: 100
        weight_decay: 0.01
      # Hyperparameter search settings for this dataset.
      optuna:
        enable: true
        study_name: 'optuna_study_dataset1'
        direction: 'minimize'
        n_trials: 100
        param_ranges:
          learning_rate:
            low: 1.0e-5
            high: 1.0e-4
          num_train_epochs:
            low: 2
            high: 5
          per_device_train_batch_size:
            options: [8, 16, 32]

    - dataset_name: 'dataset2'
      # NOTE(review): this entry is trained on its 'test' split — confirm that is intended.
      split: 'test'
      text_fields: ['content']
      label_fields:
        category: 'categorical'
      max_token_length: 256
      lora:
        enable: true
        config:
          lora_r: 8
          lora_alpha: 16
      training_args:
        learning_rate: 3.0e-5
        num_train_epochs: 5
        per_device_train_batch_size: 16
        warmup_steps: 50
        weight_decay: 0.02
      optuna:
        enable: false

    - dataset_name: 'dataset3'
      # NOTE(review): this entry is trained on its 'validation' split — confirm that is intended.
      split: 'validation'
      text_fields: ['description']
      label_fields:
        score: 'continuous'
      max_token_length: 128
      lora:
        enable: false
      training_args:
        learning_rate: 2.0e-5
        num_train_epochs: 4
        per_device_train_batch_size: 32
        warmup_steps: 75
        weight_decay: 0.03
      optuna:
        enable: false

# Experience replay: mixes a share of previously seen data into the current dataset.
experience_replay:
  enable: true
  # Fraction of the previous data that is replayed (0.1 = 10%).
  replay_rate: 0.1


In [3]:
# Install necessary libraries
import subprocess
import sys
import pkg_resources

# Distributions needed by the training cell. 'peft' is listed because that cell
# imports LoraConfig / get_peft_model from it; without it the import fails.
packages = [
    'pyyaml',
    'pandas',
    'torch',
    'transformers',
    'datasets',
    'scikit-learn',
    'huggingface-hub',
    'optuna',
    'numpy',
    'peft',
]

for package in packages:
    try:
        # Install with the interpreter that runs this notebook so the package
        # lands in the active environment.
        subprocess.check_call([sys.executable, '-m', 'pip', 'install', package])

        # Look the distribution up afterwards so the installed version is reported.
        dist = pkg_resources.get_distribution(package)
        print(f"Successfully installed {package} with version {dist.version}")
    except subprocess.CalledProcessError as e:
        # pip exited with a non-zero status; report it and continue with the next package.
        print(f"Failed to install {package}. Error: {str(e)}")
    except pkg_resources.DistributionNotFound:
        print(f"{package} was not found after installation attempt. Please check for errors.")


Successfully installed pyyaml with version 6.0.1
Successfully installed pandas with version 1.5.3
Successfully installed torch with version 2.1.0+cu121
Successfully installed transformers with version 4.35.2
Successfully installed datasets with version 2.16.1
Successfully installed scikit-learn with version 1.2.2
Successfully installed huggingface-hub with version 0.20.3
Successfully installed optuna with version 3.5.0
Successfully installed numpy with version 1.23.5


In [None]:
# Modularized and Dynamic Training Script for Mistral 7B with LoRA, PEFT, and Hyperparameter Optimization using config file

import logging
import os
import shutil

import numpy as np
import optuna
import pandas as pd
import torch
import yaml
from datasets import load_dataset, Dataset, concatenate_datasets
from huggingface_hub import notebook_login, HfFolder
from optuna.integration import HuggingFacePruner
from peft import LoraConfig, prepare_model_for_int8_training, get_peft_model
from sklearn.model_selection import train_test_split
from transformers import (AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments, DataCollatorForSeq2Seq, TrainerCallback, trainer_utils)

# Authenticate with Hugging Face as soon as this cell runs.
# NOTE(review): main() authenticates again through HfAuthenticator.authenticate(),
# which calls the same notebook_login() — confirm both calls are wanted.
notebook_login()

# Configure root logging at INFO level; each record is formatted as
# "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 1. Configuration Management
class ConfigManager:
    """Loads a YAML configuration file and answers dotted-path lookups.

    The parsed document is stored in ``self.config``.
    """

    # Dotted paths under which the per-dataset list may live. The top-level key
    # is tried first; 'training.datasets_info' covers configs that nest the
    # list under a 'training' section.
    _DATASETS_PATHS = ('datasets_info', 'training.datasets_info')

    def __init__(self, config_path):
        """Parse the YAML file at ``config_path`` with the safe loader."""
        with open(config_path, 'r') as file:
            # An empty file parses to None; normalise to {} so lookups stay safe.
            self.config = yaml.safe_load(file) or {}

    @staticmethod
    def _lookup(mapping, path, default=None):
        """Walk ``mapping`` along the dot-separated ``path``.

        Returns ``default`` when a key is missing, when a value on the way is
        None, or when an intermediate value is not a mapping.
        """
        value = mapping
        for key in path.split('.'):
            if not isinstance(value, dict):
                return default
            value = value.get(key)
            if value is None:
                return default
        return value

    def get(self, path, default=None):
        """Return the value at the dot-separated ``path`` (e.g. 'model.name').

        Args:
            path: Dot-separated key path into the loaded configuration.
            default: Value returned when the path cannot be resolved.
        """
        return self._lookup(self.config, path, default)

    def get_dataset_config(self, dataset_name, config_type):
        """Return one section of the entry whose 'dataset_name' matches.

        Args:
            dataset_name: Value of the entry's 'dataset_name' key.
            config_type: Key inside the entry. A literal key is tried first;
                otherwise it is treated as a dot-separated path such as
                'lora.config'.

        Returns:
            The requested section, or None when the dataset or section is absent.
        """
        datasets_info = []
        for datasets_path in self._DATASETS_PATHS:
            found = self.get(datasets_path)
            if found:
                datasets_info = found
                break

        for dataset_info in datasets_info:
            if dataset_info.get('dataset_name') != dataset_name:
                continue
            if config_type in dataset_info:
                return dataset_info[config_type]
            return self._lookup(dataset_info, config_type)
        return None

# 2. Authentication and Setup
class HfAuthenticator:
    """Groups the Hugging Face authentication step behind a static method."""
    @staticmethod
    def authenticate():
        """Call ``notebook_login()``; takes no arguments and returns nothing itself."""
        notebook_login()

# 3. Model and Tokenizer Initialization
class ModelInitializer:
    """Loads a causal language model and its tokenizer by name.

    After ``initialize()`` the results are available as ``self.model`` and
    ``self.tokenizer``; both are None until then.
    """

    def __init__(self, model_name, torch_dtype=None):
        """Store the model name and resolve the dtype used for loading.

        Args:
            model_name: Identifier passed to the ``from_pretrained`` loaders.
            torch_dtype: A ``torch.dtype``, or its name as a string
                ('torch.bfloat16' or 'bfloat16'). None keeps the previous
                behaviour of loading in ``torch.bfloat16``.

        Raises:
            ValueError: If a string does not name a torch dtype.
        """
        self.model_name = model_name
        self.torch_dtype = self._resolve_dtype(torch_dtype)
        self.model = None
        self.tokenizer = None

    @staticmethod
    def _resolve_dtype(torch_dtype):
        """Map None / a dtype name / a torch.dtype to a torch.dtype."""
        if torch_dtype is None:
            return torch.bfloat16
        if isinstance(torch_dtype, str):
            # Accept both 'torch.bfloat16' and 'bfloat16'.
            resolved = getattr(torch, torch_dtype.rsplit('.', 1)[-1], None)
            if not isinstance(resolved, torch.dtype):
                raise ValueError(f"Unknown torch dtype: {torch_dtype!r}")
            return resolved
        return torch_dtype

    def initialize(self):
        """Load the model and tokenizer; log and re-raise any failure."""
        try:
            self.model = AutoModelForCausalLM.from_pretrained(self.model_name, torch_dtype=self.torch_dtype)
            self.tokenizer = AutoTokenizer.from_pretrained(self.model_name)
            # Preprocessing pads to a fixed length, which needs a pad token.
            # Reuse EOS when the tokenizer does not define one.
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
        except Exception as e:
            logging.error(f"Error initializing model and tokenizer: {e}")
            raise

# 4. LoRA Configuration
class LoraConfigurer:
    """Wraps a model with LoRA adapters through peft.

    The wrapped model replaces ``self.model`` after each ``configure()`` call.
    """

    def __init__(self, model):
        """Keep a reference to the model that will be wrapped."""
        self.model = model

    @staticmethod
    def _to_peft_config(lora_config):
        """Return a peft config object for ``lora_config``.

        A plain mapping (as loaded from the YAML file) is converted to a
        ``LoraConfig``; anything else is assumed to be a peft config already
        and is returned unchanged.
        """
        if not isinstance(lora_config, dict):
            return lora_config
        params = dict(lora_config)
        # The config file names the rank 'lora_r'; LoraConfig calls it 'r'.
        if 'lora_r' in params:
            params.setdefault('r', params.pop('lora_r'))
        # The model is loaded with AutoModelForCausalLM elsewhere in this file.
        params.setdefault('task_type', 'CAUSAL_LM')
        return LoraConfig(**params)

    def configure(self, lora_config=None):
        """Prepare the model and attach LoRA adapters.

        Args:
            lora_config: A peft config object, or a mapping of LoraConfig
                keyword arguments ('lora_r' is accepted as an alias for 'r').

        Returns:
            The peft-wrapped model (also stored on ``self.model``).

        Raises:
            ValueError: If ``lora_config`` is None.
        """
        if lora_config is None:
            raise ValueError("No LoRA configuration provided.")
        try:
            peft_config = self._to_peft_config(lora_config)
            # NOTE(review): this helper targets 8-bit training, but the model in
            # this file is loaded in bf16 — confirm the step is still wanted.
            self.model = prepare_model_for_int8_training(self.model)
            # NOTE(review): a second configure() call wraps the already-wrapped
            # model again — confirm that is intended.
            self.model = get_peft_model(self.model, peft_config)
        except Exception as e:
            logging.error(f"Error configuring LoRA: {e}")
            raise
        return self.model

# 5. Dataset Management
class DatasetManager:
    """Loads one configured dataset and tokenizes it for training.

    ``dataset_info`` is one entry of the configuration's dataset list; the keys
    read here are 'dataset_name', 'split', 'text_fields', 'label_fields' and
    'max_token_length'.
    """

    def __init__(self, dataset_info, tokenizer):
        """Store the dataset entry and the tokenizer used for preprocessing."""
        self.dataset_info = dataset_info
        self.tokenizer = tokenizer

    def load_and_prepare(self):
        """Load the configured split (default 'train') and preprocess it.

        Returns:
            The mapped dataset produced by ``auto_preprocess``.

        Raises:
            Any exception from loading or preprocessing, after it is logged.
        """
        try:
            dataset = load_dataset(self.dataset_info['dataset_name'], split=self.dataset_info.get('split', 'train'))
            return self.auto_preprocess(dataset)
        except Exception as e:
            logging.error(f"Error loading and preparing dataset: {e}")
            raise

    def auto_preprocess(self, dataset):
        """Tokenize the text fields and attach labels, batch by batch.

        Args:
            dataset: Object exposing ``map(function, batched=True)``.

        Returns:
            Whatever ``dataset.map`` returns.

        Raises:
            ValueError: If the dataset entry lists no text fields.
        """
        # `or` guards against keys that are present but empty (parsed as None).
        text_fields = self.dataset_info.get('text_fields') or []
        label_fields = self.dataset_info.get('label_fields') or {}
        max_length = self.dataset_info.get('max_token_length', 512)

        if not text_fields:
            raise ValueError(f"No text fields provided in the dataset_info for dataset: {self.dataset_info['dataset_name']}.")

        def preprocess_function(examples):
            # In batched mode every column is a list, so the fields are joined
            # row by row; missing (None) values are skipped.
            batch_size = len(examples[text_fields[0]])
            concatenated_text = [
                ' '.join(
                    str(examples[field][row])
                    for field in text_fields
                    if examples[field][row] is not None
                )
                for row in range(batch_size)
            ]
            tokenized_examples = self.tokenizer(concatenated_text, padding='max_length', truncation=True, max_length=max_length)

            # Every label field writes the same 'labels' key, so when several
            # are configured the last one listed wins.
            for label_field, label_type in label_fields.items():
                if label_type == 'categorical':
                    tokenized_examples['labels'] = examples[label_field]
                elif label_type == 'continuous':
                    tokenized_examples['labels'] = [[float(label)] for label in examples[label_field]]
                elif label_type == 'multi_label':
                    # Labels arrive as comma-separated strings, e.g. "1,0,1".
                    tokenized_examples['labels'] = [list(map(float, label.split(','))) for label in examples[label_field]]

            return tokenized_examples

        return dataset.map(preprocess_function, batched=True)

# 6. Training Loop and Evaluation
class TrainerWrapper:
    """Builds and runs a ``Trainer`` for the wrapped model and tokenizer.

    ``self.trainer`` stays None until ``train()`` has constructed one.
    """

    def __init__(self, model, tokenizer):
        self.model, self.tokenizer = model, tokenizer
        # Created lazily inside train().
        self.trainer = None

    def train(self, train_dataset, test_dataset, training_args):
        """Run one training pass.

        Args:
            train_dataset: Dataset handed to the Trainer for training.
            test_dataset: Dataset handed to the Trainer for evaluation.
            training_args: Mapping expanded into ``TrainingArguments``.

        Raises:
            ValueError: If ``training_args`` is None.
            Exception: Any failure while building or running the Trainer is
                logged and re-raised.
        """
        if training_args is None:
            raise ValueError("No training arguments provided.")

        hf_args = TrainingArguments(**training_args)
        try:
            collator = DataCollatorForSeq2Seq(
                self.tokenizer,
                model=self.model,
                label_pad_token_id=-100,
                pad_to_multiple_of=8,
            )
            checkpointing = CustomCheckpointCallback(self.model, hf_args.output_dir)
            trainer_kwargs = {
                'model': self.model,
                'args': hf_args,
                'train_dataset': train_dataset,
                'eval_dataset': test_dataset,
                'data_collator': collator,
                'callbacks': [checkpointing],
            }
            self.trainer = Trainer(**trainer_kwargs)
            self.trainer.train()
        except Exception as err:
            logging.error(f"Error during model training: {err}")
            raise

# 7. Hyperparameter Optimization
class HyperparameterOptimizer:
    """Searches learning rate, epoch count and batch size with Optuna.

    Each trial trains on every entry of ``datasets_info`` in turn and is scored
    by the evaluation loss reported after the last one.
    """

    def __init__(self, model, tokenizer, training_args_template, datasets_info):
        """Store the shared model, tokenizer, base arguments and dataset list."""
        self.model = model
        self.tokenizer = tokenizer
        self.training_args_template = training_args_template
        self.datasets_info = datasets_info
        self.study = None  # Initialized in optimize()

    def objective(self, trial, training_args_template, optuna_config):
        """Train with the hyperparameters suggested for ``trial``.

        Args:
            trial: Optuna trial used to draw the suggestions.
            training_args_template: Mapping of base training arguments; the
                template given to the constructor is used when this is None.
            optuna_config: The dataset's 'optuna' section; its 'param_ranges'
                entry supplies the search space.

        Returns:
            The 'eval_loss' value from the final evaluation.

        Raises:
            ValueError: If ``optuna_config`` is None or no datasets are set.
        """
        if optuna_config is None:
            raise ValueError("No Optuna configuration provided.")
        if not self.datasets_info:
            raise ValueError("No datasets provided for hyperparameter optimization.")
        if training_args_template is None:
            training_args_template = self.training_args_template

        # Bounds are cast explicitly: PyYAML loads values such as `1e-5`
        # (no decimal point) as strings.
        ranges = optuna_config['param_ranges']
        learning_rate = trial.suggest_float(
            "learning_rate",
            float(ranges['learning_rate']['low']),
            float(ranges['learning_rate']['high']),
            log=True,
        )
        num_train_epochs = trial.suggest_int(
            "num_train_epochs",
            int(ranges['num_train_epochs']['low']),
            int(ranges['num_train_epochs']['high']),
        )
        per_device_train_batch_size = trial.suggest_categorical(
            "per_device_train_batch_size",
            ranges['per_device_train_batch_size']['options'],
        )

        # Overlay the suggestions on a copy of the template. The template may
        # already define these keys, so they must not be passed twice.
        trial_args = dict(training_args_template or {})
        trial_args.update(
            learning_rate=learning_rate,
            num_train_epochs=num_train_epochs,
            per_device_train_batch_size=per_device_train_batch_size,
        )

        # NOTE(review): every trial continues training the same model object,
        # so trials are not independent — confirm this is acceptable.
        trainer_wrapper = TrainerWrapper(self.model, self.tokenizer)
        for dataset_info in self.datasets_info:
            dataset_manager = DatasetManager(dataset_info, self.tokenizer)
            dataset = dataset_manager.load_and_prepare()
            # Use the dataset's own splitter so both halves stay datasets.
            split = dataset.train_test_split(test_size=0.2)
            trainer_wrapper.train(split['train'], split['test'], trial_args)

        # Evaluate with the trainer built for the last dataset.
        eval_result = trainer_wrapper.trainer.evaluate()
        return eval_result['eval_loss']  # or another metric that you prefer

    def optimize(self, training_args_template, optuna_config):
        """Run the study and return the best trial's parameters.

        Args:
            training_args_template: Mapping of base training arguments.
            optuna_config: Mapping with 'study_name', 'direction', 'n_trials'
                and 'param_ranges'.

        Returns:
            The parameter dict of the best trial.

        Raises:
            ValueError: If ``optuna_config`` is None.
        """
        if optuna_config is None:
            raise ValueError("No Optuna configuration provided.")
        # No pruner is passed: optuna.integration offers no HuggingFacePruner,
        # and objective() reports no intermediate values for a pruner to use.
        self.study = optuna.create_study(
            study_name=optuna_config['study_name'],
            direction=optuna_config['direction'],
        )
        self.study.optimize(
            lambda trial: self.objective(trial, training_args_template, optuna_config),
            n_trials=optuna_config['n_trials']
        )
        return self.study.best_trial.params

# 8. Logging and Debugging - Already integrated using Python's logging module.

# 9. Utility Functions
def input_with_validation(prompt, type_=None, validation=None, error_msg='Invalid input'):
    """Prompt repeatedly until the answer converts and validates.

    Args:
        prompt: Text shown by ``input``.
        type_: Optional callable applied to the raw answer.
        validation: Optional predicate the converted answer must satisfy.
        error_msg: Printed after every rejected answer.

    Returns:
        The converted answer (or the raw string when ``type_`` is not given).
    """
    while True:
        try:
            answer = input(prompt)
            answer = type_(answer) if type_ else answer
            accepted = not validation or validation(answer)
        except ValueError:
            # Conversion (or the validator) raised ValueError: treat as rejected.
            accepted = False
        if accepted:
            return answer
        print(error_msg)

def save_model(model, tokenizer, output_dir):
    """Write the model, then the tokenizer, to ``output_dir``.

    Both objects must provide ``save_pretrained(path)``. Any failure is logged
    and re-raised.
    """
    try:
        for artifact in (model, tokenizer):
            artifact.save_pretrained(output_dir)
    except Exception as err:
        logging.error(f"Error saving model and tokenizer: {err}")
        raise

# Custom Callback for Checkpointing during training
class CustomCheckpointCallback(TrainerCallback):
    """Saves the model every ``save_step`` steps and keeps the newest few.

    Derives from ``TrainerCallback``, the base class the Trainer expects for
    entries of its ``callbacks`` list.
    """

    def __init__(self, model, output_dir, save_step=100, max_checkpoints=3):
        """Configure the checkpoint cadence and retention.

        Args:
            model: Object exposing ``save_pretrained(path)``.
            output_dir: Directory under which 'checkpoint-<step>' folders go.
            save_step: Save whenever the global step is a multiple of this.
            max_checkpoints: Number of checkpoints written by this callback to keep.
        """
        self.model = model
        self.output_dir = output_dir
        self.save_step = save_step
        self.max_checkpoints = max_checkpoints
        # Paths written by this callback, oldest first.
        self.saved_checkpoints = []

    def on_step_end(self, args, state, control, **kwargs):
        """Save a checkpoint on matching steps and drop the oldest surplus one."""
        # A non-positive save_step disables saving (and avoids modulo by zero).
        if self.save_step <= 0 or state.global_step % self.save_step != 0:
            return

        # NOTE(review): 'checkpoint-<step>' looks like the folder name the
        # Trainer uses for its own checkpoints in the same output_dir — confirm
        # the two cannot delete each other's folders.
        checkpoint_dir = os.path.join(self.output_dir, f'checkpoint-{state.global_step}')
        self.model.save_pretrained(checkpoint_dir)
        self.saved_checkpoints.append(checkpoint_dir)

        # Remove older checkpoints
        if len(self.saved_checkpoints) > self.max_checkpoints:
            oldest_checkpoint = self.saved_checkpoints.pop(0)
            if os.path.isdir(oldest_checkpoint):
                shutil.rmtree(oldest_checkpoint)

# Experience Replay
def experience_replay(previous_dataset, current_dataset, replay_rate=0.1):
    """Blend a sample of earlier data into the current dataset.

    A ``replay_rate`` share of ``previous_dataset`` (taken after a seeded
    shuffle) is appended to ``current_dataset``, and the combined dataset is
    shuffled with the same seed before it is returned. Re-training on part of
    the earlier data is meant to counter catastrophic forgetting.
    """
    shuffled_history = previous_dataset.shuffle(seed=42)
    replay_count = int(replay_rate * len(previous_dataset))
    replay_samples = shuffled_history.select(range(replay_count))
    mixed = concatenate_datasets([current_dataset, replay_samples])
    return mixed.shuffle(seed=42)

# Main Script Execution
def _build_lora_config(settings):
    """Turn a dataset's `lora.config` mapping into a peft LoraConfig."""
    params = dict(settings)
    # The config file names the rank 'lora_r'; LoraConfig calls it 'r'.
    if 'lora_r' in params:
        params['r'] = params.pop('lora_r')
    # The model is loaded with AutoModelForCausalLM in this file.
    params.setdefault('task_type', 'CAUSAL_LM')
    return LoraConfig(**params)


def main():
    """Run the configured training pipeline end to end.

    Reads 'config.yaml', authenticates, loads the model, then for every
    configured dataset: applies LoRA when enabled, optionally tunes
    hyperparameters with Optuna, mixes in replayed samples from earlier
    datasets and trains. Finally the model is saved and pushed to the Hub.

    Any exception is logged (with traceback) instead of propagating.
    """
    try:
        # Read configuration from YAML file
        config_manager = ConfigManager('config.yaml')
        # The dataset list lives under 'training'; a top-level list is still accepted.
        datasets_info = (
            config_manager.get('training.datasets_info')
            or config_manager.get('datasets_info')
            or []
        )
        if not datasets_info:
            raise ValueError("No datasets configured under 'training.datasets_info'.")
        # Shared trainer settings (output_dir, strategies, ...) merged into every run.
        global_args = config_manager.get('training.global_args') or {}

        # Authenticate with Hugging Face
        HfAuthenticator.authenticate()

        # Model and tokenizer initialization
        model_name = config_manager.get('model.name')
        model_initializer = ModelInitializer(model_name)
        model_initializer.initialize()

        lora_configurer = LoraConfigurer(model_initializer.model)
        trainer_wrapper = TrainerWrapper(model_initializer.model, model_initializer.tokenizer)

        previous_datasets = []
        # Defaults match the previously hard-coded behaviour (always on, 10%).
        replay_enabled = config_manager.get('experience_replay.enable', True)
        replay_rate = config_manager.get('experience_replay.replay_rate', 0.1)

        training_args = None
        for dataset_info in datasets_info:
            # Dataset-specific sections are read straight from the entry.
            lora_section = dataset_info.get('lora') or {}
            optuna_config = dataset_info.get('optuna')
            # Dataset-level arguments override the shared ones.
            training_args = {**global_args, **(dataset_info.get('training_args') or {})}

            # Configure LoRA only when the dataset enables it and supplies settings.
            # NOTE(review): adapters are applied per dataset on the same model, so
            # a later dataset wraps the already-adapted model — confirm intended.
            if lora_section.get('enable') and lora_section.get('config'):
                lora_configurer.configure(_build_lora_config(lora_section['config']))
                # Train the adapted model rather than the original reference.
                trainer_wrapper.model = lora_configurer.model

            dataset_manager = DatasetManager(dataset_info, model_initializer.tokenizer)
            current_dataset = dataset_manager.load_and_prepare()

            # If Optuna optimization is enabled for the dataset
            if optuna_config and optuna_config.get('enable'):
                hyperparam_optimizer = HyperparameterOptimizer(
                    trainer_wrapper.model,
                    model_initializer.tokenizer,
                    training_args,
                    datasets_info
                )
                best_params = hyperparam_optimizer.optimize(training_args, optuna_config)
                # Train with the best hyperparameters found.
                training_args.update(best_params)

            # Experience Replay and Training
            training_dataset = current_dataset
            if replay_enabled and previous_datasets:
                # NOTE(review): concatenation needs all datasets to share one
                # schema after preprocessing — confirm for the configured datasets.
                training_dataset = experience_replay(
                    concatenate_datasets(previous_datasets),
                    current_dataset,
                    replay_rate=replay_rate,
                )
            # Use the dataset's own splitter so both halves stay datasets.
            split = training_dataset.train_test_split(test_size=0.2)
            trainer_wrapper.train(split['train'], split['test'], training_args)

            # Remember the dataset without replayed rows so they are not
            # replayed again (and duplicated) in later rounds.
            previous_datasets.append(current_dataset)

        # Save locally, then upload the same model to Hugging Face.
        save_model(trainer_wrapper.model, model_initializer.tokenizer, training_args['output_dir'])
        if trainer_wrapper.trainer.is_world_process_zero():
            trainer_wrapper.model.push_to_hub(f"{model_name}_trained_model")

    except Exception as e:
        logging.exception(f"An error occurred in the main script: {e}")

if __name__ == "__main__":
    main()
