# LLM Fine-Tuning for Job Description Classification with SageMaker & MLflow

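This notebook demonstrates fine-tuning Llama 3 for job description classification, using SageMaker Pipelines for orchestration and MLflow for experiment tracking. It assumes the raw dataset has already been generated and uploaded to S3 (see the `generate_and_upload_raw_data.py` script mentioned in section 7).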

## 1. Setup and Dependencies

In [1]:
# Install exactly fsspec 2023.6.0 over whatever is present, without resolving its
# dependencies and without using the pip cache.
# NOTE(review): the reason for this specific pin is not stated in the notebook — confirm it is still needed.
!pip install --ignore-installed --no-deps --no-cache-dir fsspec==2023.6.0

Looking in indexes: https://pypi.org/simple, https://pip.repos.neuron.amazonaws.com
Collecting fsspec==2023.6.0
  Downloading fsspec-2023.6.0-py3-none-any.whl.metadata (6.7 kB)
Downloading fsspec-2023.6.0-py3-none-any.whl (163 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m163.8/163.8 kB[0m [31m38.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: fsspec
Successfully installed fsspec-2023.6.0


In [2]:
!pip install sagemaker==2.225.0  datasets transformers>=4.40.0 mlflow peft>=0.9.0 sagemaker-mlflow --quiet

In [3]:
# Enable IPython's autoreload extension so edits to imported modules
# (e.g. the local `steps/` scripts) are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [4]:
import sagemaker
import boto3
import os
import json
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.function_step import step
from sagemaker.workflow.parameters import ParameterString, ParameterInteger, ParameterFloat
from sagemaker.workflow.execution_variables import ExecutionVariables

import sys

# Make the parent of the current working directory importable so that the
# `steps` package (imported just below) can be resolved.
notebook_dir = os.getcwd()
project_root = os.path.abspath(os.path.join(notebook_dir, '..'))

if project_root in sys.path:
    print(f"Project root already in sys.path: {project_root}")
else:
    print(f"Adding project root to sys.path: {project_root}")
    # Prepend so the project's modules take precedence over same-named installed ones.
    sys.path.insert(0, project_root)

# These imports refer to the scripts in your 'steps/' directory.
from steps.preprocess_job_descriptions import preprocess_data
from steps.finetune_llama3_classifier import finetune_model # Assuming this is the main function in your finetune script
from steps.evaluation_classifier import evaluate_model

# Point the SageMaker SDK's user-config override at the current working directory.
# NOTE(review): presumably so a config.yaml placed next to this notebook is applied
# to the @step functions below — confirm such a file exists in this directory.
os.environ["SAGEMAKER_USER_CONFIG_OVERRIDE"] = os.getcwd()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


INFO:datasets:PyTorch version 2.7.0+cu118 available.
INFO:datasets:TensorFlow version 2.12.1 available.
INFO:datasets:JAX version 0.4.20 available.


Adding project root to sys.path: /home/sagemaker-user/job-classification-sagemaker-mlflow


2025-05-29 21:18:46.369485: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE4.1 SSE4.2 AVX AVX2 AVX512F FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


ImportError: cannot import name 'EncoderDecoderCache' from 'transformers' (/opt/conda/lib/python3.10/site-packages/transformers/__init__.py)

## 2. SageMaker Session and IAM Role

In [None]:
# Resolve the IAM role used to upsert/run the pipeline. Outside a SageMaker-managed
# environment `get_execution_role()` raises ValueError; in that case we stop with an
# actionable message instead of continuing with an undefined `role`.
try:
    role = sagemaker.get_execution_role()
except ValueError as err:
    print("Could not automatically get SageMaker execution role. Please ensure it's configured or specify manually.")
    # Fallback: replace the raise below with an explicit lookup if necessary, e.g.
    # role_name = "YourSageMakerExecutionRoleName"
    # role = boto3.client("iam").get_role(RoleName=role_name)["Role"]["Arn"]
    # (The IAM client is intentionally NOT created here: it was unused, and creating it
    # can itself fail without region/credentials, hiding the real problem.)
    raise ValueError(
        "SageMaker execution role not found. Please create one or ensure your environment is configured."
    ) from err
else:
    print(f"SageMaker Execution Role: {role}")

# Session-derived defaults reused by the configuration and pipeline cells below.
sess = sagemaker.Session()
region = sess.boto_region_name
default_bucket = sess.default_bucket()
print(f"SageMaker Session region: {region}, bucket: {default_bucket}")

## 3. Configuration

In [None]:
# --- Naming ---------------------------------------------------------------
pipeline_name = "JobDescClassification-Llama3-Pipeline-V4"
base_job_prefix = "job-desc-classify"

# --- MLflow tracking --------------------------------------------------------
# <--- REPLACE THIS with the ARN of your own SageMaker MLflow tracking server.
mlflow_tracking_server_arn = "arn:aws:sagemaker:your-region:your-aws-account-id:mlflow-tracking-server/your-tracking-server-name"
mlflow_experiment_name = "JobDescriptionClassification-Llama3-FineTuning"

# --- Model ------------------------------------------------------------------
model_id_default = "meta-llama/Meta-Llama-3-8B"

# --- Data -------------------------------------------------------------------
# Example S3 key prefix (inside the session's default bucket) for processed data.
processed_data_s3_prefix = "/".join([base_job_prefix, "processed_data", "v2"])

# --- Compute ----------------------------------------------------------------
preprocess_instance_type = "ml.m5.large"
finetune_instance_type = "ml.g5.12xlarge"
evaluation_instance_type = "ml.g5.2xlarge"

# Remind the reader early if the tracking-server ARN was left untouched.
if mlflow_tracking_server_arn.find("your-region") >= 0:
    print("WARNING: MLflow Tracking Server ARN is a placeholder. Please replace it.")

## 4. Pipeline Parameters

In [None]:
# ---- Data / preprocessing parameters ---------------------------------------
param_raw_data_s3_uri = ParameterString(
    name="RawDatasetS3URI",
    # Example default; override at pipeline start if your data lives elsewhere.
    default_value=f"s3://{default_bucket}/raw_job_description_data/v1_translated/raw_jds_translated.jsonl",
)
param_job_desc_column = ParameterString(
    name="JobDescriptionColumn",
    default_value="job_description_text",
)
param_category_column = ParameterString(
    name="CategoryColumn",
    default_value="category_label",
)
param_test_split_fraction = ParameterFloat(
    name="TestSplitFraction",
    default_value=0.15,
)
param_validation_split_fraction = ParameterFloat(
    name="ValidationSplitFraction",
    default_value=0.15,
)
# -1 means "no limit"; the preprocessing step converts negative values to None.
param_max_samples_per_split = ParameterInteger(
    name="MaxSamplesPerSplit",
    default_value=-1,
)

# ---- Fine-tuning parameters -------------------------------------------------
param_model_id = ParameterString(
    name="ModelIdentifier",
    default_value=model_id_default,
)
param_finetune_epochs = ParameterInteger(
    name="FineTuneEpochs",
    default_value=1,
)
param_per_device_train_batch_size = ParameterInteger(
    name="PerDeviceTrainBatchSize",
    default_value=1,
)
param_lora_r = ParameterInteger(
    name="LoraR",
    default_value=8,
)
param_lora_alpha = ParameterInteger(
    name="LoraAlpha",
    default_value=16,
)
param_lora_dropout = ParameterFloat(
    name="LoraDropout",
    default_value=0.05,
)
param_learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.0002,
)
# Boolean-like flag carried as a string ("True"/"False"); parsed inside the fine-tune step.
param_merge_weights = ParameterString(
    name="MergeWeights",
    default_value="True",
)
# The placeholder default makes the fine-tune step fall back to the HF_TOKEN env var.
# NOTE(review): a real token passed here is an ordinary pipeline parameter — confirm
# that exposing it this way is acceptable, or source it from a secrets store instead.
param_hf_token = ParameterString(
    name="HuggingFaceToken",
    default_value="OPTIONAL_HF_TOKEN_PLACEHOLDER",
)

# ---- Evaluation parameters --------------------------------------------------
param_eval_batch_size = ParameterInteger(
    name="EvaluationBatchSize",
    default_value=4,
)

## 5. Define Pipeline Steps Using the `@step` Decorator

In [None]:
from steps.finetune_llama3_classifier import launch_hf_training_job

# A. Preprocessing Step
@step(
    name="PreprocessRawJobData",
    instance_type=preprocess_instance_type,
    keep_alive_period_in_seconds=300,
)
def sm_pipeline_preprocess_step(
    raw_data_s3_identifier: str,
    output_s3_bucket_name: str,
    output_s3_prefix_val: str,
    jd_col: str,
    cat_col: str,
    test_frac: float,
    val_frac: float,
    max_samples: int,  # Corresponds to max_samples_per_split
    mlflow_arn_tracking: str,
    mlflow_exp_name: str,
    pipeline_exec_id: str,
):
    """Pipeline step wrapping `preprocess_data` from steps/preprocess_job_descriptions.py.

    Forwards every argument to `preprocess_data` under that function's own keyword
    names; the pipeline execution id is used as the MLflow run name. A negative
    `max_samples` is translated to `None` before being forwarded.

    Returns:
        Whatever `preprocess_data` returns, unchanged. Downstream steps in this
        notebook index it with the keys 'train', 'validation', 'test' and
        'categories_s3_path'.
    """
    # Negative values are the pipeline-level sentinel for "no per-split cap".
    if max_samples < 0:
        sample_cap = None
    else:
        sample_cap = max_samples

    return preprocess_data(
        raw_dataset_identifier=raw_data_s3_identifier,
        s3_output_bucket=output_s3_bucket_name,
        s3_output_prefix=output_s3_prefix_val,
        job_desc_column=jd_col,
        category_column=cat_col,
        test_split_fraction=test_frac,
        validation_from_train_fraction=val_frac,
        max_samples_per_split=sample_cap,
        mlflow_arn=mlflow_arn_tracking,
        experiment_name=mlflow_exp_name,
        run_name=pipeline_exec_id,
    )

# B. Fine-tuning Step
# Look up the container image used by both the fine-tuning and evaluation steps.
# NOTE(review): confirm "huggingface-pytorch-training" is a framework key accepted by
# `sagemaker.image_uris.retrieve`, and that image version "4.31.0" is compatible with
# the `transformers>=4.40.0` requirement installed at the top of this notebook.
_hf_image_request = dict(
    framework="huggingface-pytorch-training",
    region=region,
    version="4.31.0",
    py_version="py310",
    instance_type=finetune_instance_type,
    image_scope="training",
)
hf_pytorch_image_uri = sagemaker.image_uris.retrieve(**_hf_image_request)
print(f"Using HuggingFace PyTorch image for fine-tuning: {hf_pytorch_image_uri}")

@step(
    name="FineTuneLlama3Classifier",
    instance_type=finetune_instance_type,
    image_uri=hf_pytorch_image_uri,
    keep_alive_period_in_seconds=3600,
)
def sm_pipeline_finetune_step(
    processed_data_info: dict,
    model_identifier_str: str,
    epochs_ft: int,
    batch_size_ft: int,
    lora_r_val: int,
    lora_alpha_val: int,
    lora_dropout_val: float,
    lr_val: float,
    merge_w: str,  # String 'True' or 'False'
    mlflow_arn_tracking: str,
    mlflow_exp_name: str,
    pipeline_exec_id: str,
    hf_auth_token: str
):
    """Pipeline step wrapping `finetune_model` from steps/finetune_llama3_classifier.py.

    Reads the 'train' and 'validation' entries of `processed_data_info`, resolves the
    Hugging Face token and the merge flag, and forwards everything to `finetune_model`.
    The pipeline execution id is passed as `run_id`.

    Returns:
        A dict with a fixed placeholder under 'model_s3_path_implicit' and the
        pipeline execution id under 'mlflow_run_id'. The value returned by
        `finetune_model` itself is not propagated.
    """
    # An empty value or the pipeline's placeholder means "no token supplied":
    # fall back to the HF_TOKEN environment variable (may be None).
    if hf_auth_token and hf_auth_token != "OPTIONAL_HF_TOKEN_PLACEHOLDER":
        resolved_token = hf_auth_token
    else:
        resolved_token = os.environ.get("HF_TOKEN")

    # The flag travels as a string parameter; only a case-insensitive "true" enables it.
    should_merge = merge_w.lower() == 'true'

    # NOTE(review): the original comments state the script writes to /opt/ml/model by
    # default and that SageMaker uploads it — not verifiable from this notebook; confirm.
    finetune_model(
        model_id=model_identifier_str,
        train_data_s3_path=processed_data_info['train'],
        eval_data_s3_path=processed_data_info['validation'],
        # output_dir is left to the script's own default
        epochs=epochs_ft,
        per_device_train_batch_size=batch_size_ft,
        learning_rate=lr_val,
        lora_r=lora_r_val,
        lora_alpha=lora_alpha_val,
        lora_dropout=lora_dropout_val,
        merge_weights=should_merge,
        hf_token=resolved_token,
        mlflow_arn=mlflow_arn_tracking,
        experiment_name=mlflow_exp_name,
        run_id=pipeline_exec_id,
    )

    return {
        "model_s3_path_implicit": "s3_path_managed_by_sagemaker_for_opt_ml_model",
        "mlflow_run_id": pipeline_exec_id,
    }

# C. Evaluation Step
@step(
    name="EvaluateFineTunedClassifier",
    instance_type=evaluation_instance_type,
    image_uri=hf_pytorch_image_uri,
    keep_alive_period_in_seconds=600
)
def sm_pipeline_evaluate_step(
    finetune_step_output: dict, # Contains mlflow_run_id for constructing MLflow model URI
    processed_data_info: dict, # Contains 'test' and 'categories_s3_path'
    eval_bs: int,
    mlflow_arn_tracking: str,
    mlflow_exp_name: str,
    pipeline_exec_id: str # This is the overall pipeline execution ID
):
    """Pipeline step wrapping `evaluate_model` from steps/evaluation_classifier.py.

    Builds an MLflow `runs:/<run_id>/fine_tuned_classifier_model` URI and passes it,
    together with the test-set and category paths from `processed_data_info`, to
    `evaluate_model`.

    The run id is taken from `finetune_step_output['mlflow_run_id']` when present
    (previously this argument was accepted but never read); if it is missing or
    empty, the pipeline execution id is used, which matches the earlier behaviour.

    Args:
        finetune_step_output: Output of the fine-tuning step; expected to carry
            'mlflow_run_id'.
        processed_data_info: Output of the preprocessing step; must contain 'test'
            and 'categories_s3_path'.
        eval_bs: Batch size forwarded to `evaluate_model`.
        mlflow_arn_tracking: MLflow tracking server ARN.
        mlflow_exp_name: MLflow experiment name.
        pipeline_exec_id: Pipeline execution id; passed as `run_id` to `evaluate_model`.

    Returns:
        Whatever `evaluate_model` returns, unchanged.

    Raises:
        KeyError: If `processed_data_info` lacks 'test' or 'categories_s3_path'.
    """
    # Prefer the run id reported by the fine-tuning step so the two steps cannot
    # drift apart; tolerate a missing/non-dict output by falling back.
    reported_run_id = (
        finetune_step_output.get("mlflow_run_id")
        if isinstance(finetune_step_output, dict)
        else None
    )
    finetune_run_id = reported_run_id or pipeline_exec_id

    # Assumes finetune_model logged the model under the 'fine_tuned_classifier_model'
    # artifact path — TODO confirm against steps/finetune_llama3_classifier.py.
    mlflow_model_uri_to_load = f"runs:/{finetune_run_id}/fine_tuned_classifier_model"

    eval_results = evaluate_model(
        model_s3_path_or_mlflow_uri=mlflow_model_uri_to_load,
        test_data_s3_path=processed_data_info['test'],
        poc_categories_s3_path=processed_data_info['categories_s3_path'],
        batch_size=eval_bs,
        mlflow_arn=mlflow_arn_tracking,
        experiment_name=mlflow_exp_name,
        run_id=pipeline_exec_id # Evaluate logs under the same parent run_id
    )
    return eval_results

## 6. Construct the Pipeline

In [None]:
# Arguments that every step receives identically: where to log in MLflow and
# which pipeline execution the work belongs to.
_shared_tracking_args = dict(
    mlflow_arn_tracking=mlflow_tracking_server_arn,
    mlflow_exp_name=mlflow_experiment_name,
    pipeline_exec_id=ExecutionVariables.PIPELINE_EXECUTION_ID,
)

# Step 1: preprocessing — fed directly by pipeline parameters.
preprocess_outputs = sm_pipeline_preprocess_step(
    raw_data_s3_identifier=param_raw_data_s3_uri,
    output_s3_bucket_name=default_bucket,
    output_s3_prefix_val=processed_data_s3_prefix,
    jd_col=param_job_desc_column,
    cat_col=param_category_column,
    test_frac=param_test_split_fraction,
    val_frac=param_validation_split_fraction,
    max_samples=param_max_samples_per_split,
    **_shared_tracking_args,
)

# Step 2: fine-tuning — consumes the preprocessing step's output.
finetune_outputs = sm_pipeline_finetune_step(
    processed_data_info=preprocess_outputs,
    model_identifier_str=param_model_id,
    epochs_ft=param_finetune_epochs,
    batch_size_ft=param_per_device_train_batch_size,
    lora_r_val=param_lora_r,
    lora_alpha_val=param_lora_alpha,
    lora_dropout_val=param_lora_dropout,
    lr_val=param_learning_rate,
    merge_w=param_merge_weights,
    hf_auth_token=param_hf_token,
    **_shared_tracking_args,
)

# Step 3: evaluation — consumes the outputs of both earlier steps.
evaluate_outputs = sm_pipeline_evaluate_step(
    finetune_step_output=finetune_outputs,
    processed_data_info=preprocess_outputs,
    eval_bs=param_eval_batch_size,
    **_shared_tracking_args,
)

# Every ParameterString/Integer/Float defined above, in declaration order.
_pipeline_parameters = [
    # data / preprocessing
    param_raw_data_s3_uri,
    param_job_desc_column,
    param_category_column,
    param_test_split_fraction,
    param_validation_split_fraction,
    param_max_samples_per_split,
    # fine-tuning
    param_model_id,
    param_finetune_epochs,
    param_per_device_train_batch_size,
    param_lora_r,
    param_lora_alpha,
    param_lora_dropout,
    param_learning_rate,
    param_merge_weights,
    param_hf_token,
    # evaluation
    param_eval_batch_size,
]

# Only the final step is listed; it references the earlier steps through its inputs.
pipeline = Pipeline(
    name=pipeline_name,
    parameters=_pipeline_parameters,
    steps=[evaluate_outputs],
    sagemaker_session=sess,
)

## 7. Upsert and Execute Pipeline

In [None]:
# Fail fast on placeholder configuration: previously this only printed "ERROR" and then
# went on to upsert and start the pipeline anyway, launching jobs that cannot reach MLflow.
if "your-region" in mlflow_tracking_server_arn:
    raise ValueError(
        "MLflow Tracking Server ARN is a placeholder. Update it in section 3 (Configuration)."
    )

if "raw_job_description_data/v1_translated/raw_jds_translated.jsonl" in param_raw_data_s3_uri.default_value:
    print(f"INFO: Using default RawDatasetS3URI: '{param_raw_data_s3_uri.default_value}'.")
    print("       Ensure this S3 URI points to your generated raw dataset or override it when starting the pipeline.")
    print("       Run 'generate_and_upload_raw_data.py' script if you haven't already.")

print("\nUpserting the pipeline...")
try:
    pipeline.upsert(role_arn=role)
    print(f"Pipeline '{pipeline_name}' upserted successfully.")

    print("\nStarting pipeline execution with default parameters...")
    # To override parameters:
    # execution = pipeline.start(
    #     parameters={
    #         "RawDatasetS3URI": "s3://your-bucket/your-actual-raw-data.jsonl",
    #         "FineTuneEpochs": 2
    #     }
    # )
    execution = pipeline.start()

    print(f"Pipeline execution started with ARN: {execution.arn}")
    # describe() returns the DescribePipelineExecution response; surface the status
    # instead of discarding the result.
    execution_status = execution.describe()["PipelineExecutionStatus"]
    print(f"Pipeline execution status: {execution_status}")
except Exception as e:
    print(f"An error occurred during pipeline upsert or start: {e}")
    # Re-raise so "Run All" stops here and later cells don't hit an undefined `execution`.
    raise

## 8. Clean up (Optional)

In [None]:
# To delete the pipeline definition from SageMaker:
# try:
#     pipeline.delete()
#     print(f"Pipeline '{pipeline_name}' deleted.")
# except Exception as e:
#     print(f"Error deleting pipeline '{pipeline_name}': {e}")