In [None]:
import sagemaker
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.tuner import IntegerParameter, ContinuousParameter, CategoricalParameter, HyperparameterTuner
from sagemaker import get_execution_role
import os

print("--- ‚öôÔ∏è Step 1: Initializing SageMaker Environment ---")

# --- IAM execution role ------------------------------------------------------
# get_execution_role() is tried first. When it raises ValueError (the case the
# original author hit when running locally in VS Code), a fixed role ARN is
# used instead.
try:
    role = get_execution_role()
except ValueError:
    print("‚ö†Ô∏è Warning: Could not automatically retrieve role, please manually enter ARN.")
    # Find the right value under AWS Console -> IAM -> Roles; look for a role
    # named like AmazonSageMaker-ExecutionRole.
    role = "arn:aws:iam::137568342316:role/SageMakerExecutionRole"

# --- Session and storage -----------------------------------------------------
bucket_name = 'sleep-disorder-mlops-bucket'  # S3 bucket used throughout this notebook
sagemaker_session = sagemaker.Session()

# --- Code source -------------------------------------------------------------
# The training code is taken from this Git repository (branch `main`) rather
# than from local files; replace the URL with your own repository if needed.
git_repo = 'https://github.com/vinnie071015/sleeping-disorder-mlops.git'
git_config = dict(repo=git_repo, branch='main')

# Echo the resolved configuration so a misconfiguration is visible immediately.
print(f"‚úÖ Role: {role}")
print(f"‚úÖ Bucket: {bucket_name}")
print(f"‚úÖ Git Repo: {git_repo}")


print("\n--- üíæ Step 2: Defining S3 Data Input ---")

# S3 prefix holding the raw training data. It must be the *folder* that
# contains sleep_data.csv (note the trailing slash); per the original notes,
# SageMaker downloads everything under it to /opt/ml/input/data/train/ inside
# the training container.
s3_input_path = 's3://{}/raw_data/'.format(bucket_name)

# Channel definition handed to tuner.fit() later in the notebook.
train_input = sagemaker.inputs.TrainingInput(s3_data=s3_input_path, content_type='text/csv')

print(f"‚úÖ Training Data Source: {s3_input_path}")


print("\n--- üèóÔ∏è Step 3: Defining Common Estimator Configuration (Git Mode) ---")

# General-purpose instance type shared by every training job.
instance_type = 'ml.m5.large'

# Write model artifacts to the project bucket instead of the SDK's default
# bucket, so every tuning run lands under one known prefix.
model_output_s3_path = f's3://{bucket_name}/sagemaker-tuning-output/'
print(f"‚úÖ Model Artifacts Output Path (FIXED): {model_output_s3_path}")

# SECURITY: the Weights & Biases API key used to be hard-coded on this line.
# A credential must never live in a notebook or repository, so it is now read
# from the local environment and only forwarded to the training container when
# present. The previously committed key should be treated as leaked and rotated.
wandb_api_key = os.environ.get('WANDB_API_KEY')

training_environment = {
    'WANDB_PROJECT': 'sleep-disorder-mlops',  # optional: project name
    'WANDB_WATCH': 'false',                   # optional: disable model watching
}
if wandb_api_key:
    # The container reads this variable to log in to W&B.
    training_environment['WANDB_API_KEY'] = wandb_api_key
else:
    print("WARNING: WANDB_API_KEY is not set in the local environment; "
          "training containers will not be able to log in to Weights & Biases.")

# Keyword arguments shared by every SKLearn estimator built below.
common_estimator_args = {
    # With source_dir at the repository root, the entry point path must
    # include the src/ folder.
    'entry_point': 'src/train.py',
    # '.' is the root of the cloned Git repository.
    'source_dir': '.',

    'role': role,
    'instance_count': 1,
    'instance_type': instance_type,
    'framework_version': '1.2-1',
    'py_version': 'py3',
    'git_config': git_config,
    'sagemaker_session': sagemaker_session,

    # Explicit artifact destination (see model_output_s3_path above).
    'output_path': model_output_s3_path,
    # Environment variables injected into the training container.
    'environment': training_environment,
}

print(f"‚úÖ Instance Type: {instance_type}")
print(f"‚úÖ Path Correction: source_dir='.', entry_point='src/train.py'")


print("\n--- üéõÔ∏è Step 4: Defining Tuners for 3 Models ---")

# Regexes SageMaker applies to the training log to capture metrics; they must
# stay in sync with the print statements in src/train.py.
metric_definitions = [
    {'Name': 'accuracy', 'Regex': '‚úÖ Accuracy: ([0-9\\.]+)'},
    {'Name': 'f1', 'Regex': '‚úÖ F1 Score: ([0-9\\.]+)'}
]


def _build_estimator_and_tuner(model_type, hyperparameter_ranges, base_tuning_job_name):
    """Create one SKLearn estimator and the tuner that searches over it.

    Args:
        model_type: Value passed to the training script as the fixed
            ``model_type`` hyperparameter.
        hyperparameter_ranges: Search space handed to ``HyperparameterTuner``.
        base_tuning_job_name: Prefix for the tuning job name.

    Returns:
        An ``(estimator, tuner)`` tuple.
    """
    # common_estimator_args already carries output_path and the environment.
    estimator = SKLearn(**common_estimator_args)
    estimator.set_hyperparameters(model_type=model_type)
    tuner = HyperparameterTuner(
        estimator=estimator,
        objective_metric_name='accuracy',
        metric_definitions=metric_definitions,
        hyperparameter_ranges=hyperparameter_ranges,
        max_jobs=2,           # two jobs in total, to keep cost down
        max_parallel_jobs=1,  # run them one after the other
        base_tuning_job_name=base_tuning_job_name,
    )
    return estimator, tuner


# A. Random Forest
rf_estimator, rf_tuner = _build_estimator_and_tuner(
    'random_forest',
    {
        'n_estimators': IntegerParameter(50, 150),
        'max_depth': IntegerParameter(5, 15),
    },
    'rf-tuning',
)

# B. SVM
svm_estimator, svm_tuner = _build_estimator_and_tuner(
    'svm',
    {
        'C': ContinuousParameter(0.1, 5.0),
        'kernel': CategoricalParameter(['rbf', 'linear']),
    },
    'svm-tuning',
)

# C. Logistic Regression
lr_estimator, lr_tuner = _build_estimator_and_tuner(
    'logistic_regression',
    {
        'C': ContinuousParameter(0.1, 5.0),
    },
    'lr-tuning',
)

print("‚úÖ Tuners for RF, SVM, and LR are ready.")

In [None]:
import boto3
import time
from datetime import datetime
import sagemaker

# ==========================================
# 🛠️ Helper function definitions (bug fixed)
# ==========================================

def get_tuning_job_status(tuner_obj):
    """Return the status string of the tuner's latest tuning job.

    The job name is read from ``tuner_obj.latest_tuning_job.name`` and looked
    up with ``describe_hyper_parameter_tuning_job``.

    Any exception along the way (no job started yet, API error, missing
    response key, ...) is swallowed and reported as ``"Initializing"``, so
    this function never raises.
    """
    try:
        latest_job = tuner_obj.latest_tuning_job
        tuning_job_name = latest_job.name
        sm_client = boto3.client('sagemaker')
        description = sm_client.describe_hyper_parameter_tuning_job(
            HyperParameterTuningJobName=tuning_job_name
        )
        return description['HyperParameterTuningJobStatus']
    except Exception:
        # Deliberate best-effort: the monitoring table shows a placeholder.
        return "Initializing"

def fetch_latest_error_log(bucket_name, prefix="debug_logs/"):
    """Print the most recently modified object under an S3 prefix.

    Intended to be called after a tuning job fails, to dump the newest debug
    log to the notebook output.

    Args:
        bucket_name: Name of the S3 bucket to search.
        prefix: Key prefix under which the log files live.

    Returns:
        None. Errors while listing or reading are printed, not raised.
    """
    print(f"\nüîç [Automatic Diagnosis] Failure detected, retrieving latest log from s3://{bucket_name}/{prefix}...")
    s3 = boto3.client('s3')
    try:
        # list_objects_v2 returns one page at a time, in key order. Walk every
        # page so the newest object is found even when the prefix holds more
        # keys than fit in a single response.
        paginator = s3.get_paginator('list_objects_v2')
        latest_file = None
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            for candidate in page.get('Contents', []):
                # ">=" keeps the later-listed object on a timestamp tie, as
                # the previous sort-and-take-last did.
                if latest_file is None or candidate['LastModified'] >= latest_file['LastModified']:
                    latest_file = candidate

        if latest_file is None:
            print("‚ö†Ô∏è No log file found in S3.")
            return

        key = latest_file['Key']
        print(f"üìÑ Latest log found: {key} (Time: {latest_file['LastModified']})")
        print("="*60)
        file_obj = s3.get_object(Bucket=bucket_name, Key=key)
        # errors='replace' so one undecodable byte does not hide the whole log.
        print(file_obj['Body'].read().decode('utf-8', errors='replace'))
        print("="*60)
    except Exception as e:
        print(f"‚ùå Failed to retrieve log: {e}")

def print_best_model_info(tuner_obj, model_name):
    """Print the accuracy and S3 artifact location of a tuner's best job.

    Args:
        tuner_obj: Object exposing ``best_training_job()``, whose return value
            is used as a training job name.
        model_name: Human-readable label used in the printed messages.

    Returns:
        None. Any exception is caught and reported with a printed message.
    """
    try:
        winning_job = tuner_obj.best_training_job()
        if not winning_job:
            print(f"‚ö†Ô∏è {model_name}: Task showed completion, but no best Training Job was found.")
            return

        # The best job is only a name (a string), so its details are fetched
        # through the SageMaker API.
        job_details = boto3.client('sagemaker').describe_training_job(
            TrainingJobName=winning_job
        )

        # First 'accuracy' entry among the final metrics, or "N/A" if absent.
        score = next(
            (
                metric['Value']
                for metric in job_details.get('FinalMetricDataList', [])
                if metric['MetricName'] == 'accuracy'
            ),
            "N/A",
        )

        artifact_uri = job_details['ModelArtifacts']['S3ModelArtifacts']

        print(f"\nüèÜ {model_name} Training Completed!")
        print(f"   ‚úÖ Best Accuracy: {score}")
        print(f"   üíæ Best Model Save Location: {artifact_uri}")

    except Exception as e:
        print(f"‚ö†Ô∏è Failed to get {model_name} model information: {e}")

# ==========================================
# 🚀 Main flow: parallel launch
# ==========================================

print("\n--- üöÄ Step 5: Launching All Tuning Jobs in Parallel (Smart Monitor V2) ---")
launch_clock = datetime.now().strftime('%H:%M:%S')
print(f"üïí Start Time: {launch_clock}\n")

# To skip retraining, comment out this loop and run the monitoring loop below
# directly; it reports the status of jobs that have already finished.
# wait=False returns immediately, so the three tuning jobs run concurrently.
for pending_tuner in (rf_tuner, svm_tuner, lr_tuner):
    pending_tuner.fit({'train': train_input}, wait=False)

print("‚úÖ All tasks are running in parallel in the cloud (or are already running)! Starting smart monitoring panel...\n")

# Header of the status table printed by the monitoring loop.
table_rule = "-" * 90
print(table_rule)
print(f"{'Time':<10} | {'Random Forest':<20} | {'SVM':<20} | {'Logistic Regression':<20}")
print(table_rule)

# ==========================================
# 🕵️ Real-time monitoring loop (includes result handling)
# ==========================================

# Tracks which tuners have already had their final result reported, so each
# result is printed exactly once.
processed_jobs = {'RF': False, 'SVM': False, 'LR': False}

# (key in processed_jobs, tuner, display label) for every monitored model.
monitored_tuners = [
    ('RF', rf_tuner, "Random Forest"),
    ('SVM', svm_tuner, "SVM"),
    ('LR', lr_tuner, "Logistic Regression"),
]

# Statuses after which a tuning job is treated as finished.
terminal_statuses = ('Completed', 'Failed', 'Stopped')

try:
    while True:
        # 1. Poll every tuner once per cycle.
        statuses = {key: get_tuning_job_status(tuner) for key, tuner, _ in monitored_tuners}

        current_time = datetime.now().strftime('%H:%M:%S')
        print(f"{current_time:<10} | {statuses['RF']:<20} | {statuses['SVM']:<20} | {statuses['LR']:<20}")

        # 2. Report each tuner the first time it reaches a terminal status.
        for key, tuner, label in monitored_tuners:
            status = statuses[key]
            if processed_jobs[key] or status not in terminal_statuses:
                continue
            if status == 'Completed':
                print_best_model_info(tuner, label)
            elif status == 'Failed':
                print(f"\n‚ùå {label} Task Failed!")
                # Uses the bucket_name defined in the setup cell. The previous
                # LOG_BUCKET_NAME was never defined, so the first failed job
                # raised NameError and killed the monitor.
                fetch_latest_error_log(bucket_name)
            # 'Stopped' is recorded as finished without further output.
            processed_jobs[key] = True

        # 3. Exit once every tuner has been reported.
        if all(processed_jobs.values()):
            print("-" * 90)
            print("\nüéâ All hyperparameter tuning tasks have finished!")
            break

        time.sleep(30)

except KeyboardInterrupt:
    # Interrupting only stops the monitoring; nothing here stops the jobs.
    print("\n‚ö†Ô∏è Monitoring manually stopped.")

In [None]:
import boto3

def print_best_model_info_fixed(tuner_obj, model_name):
    """Print a short result report for the best training job of a tuner.

    Args:
        tuner_obj: Object exposing ``best_training_job()``; the returned
            value is used as the training job name.
        model_name: Label shown in the report.

    Returns:
        None. Failures are caught and printed rather than raised.
    """
    try:
        top_job = tuner_obj.best_training_job()  # a plain string
        if not top_job:
            print(f"‚ö†Ô∏è {model_name}: No best Training Job found (The task may not be completed or all failed).")
            return

        # Look the job up by name through the SageMaker API.
        details = boto3.client('sagemaker').describe_training_job(TrainingJobName=top_job)

        # The metric name must match the one in the tuner's metric definitions.
        accuracy_values = (
            item['Value']
            for item in details.get('FinalMetricDataList', [])
            if item['MetricName'] == 'accuracy'
        )
        score = next(accuracy_values, "N/A")

        model_uri = details['ModelArtifacts']['S3ModelArtifacts']

        print(f"\nüèÜ {model_name} Result Report")
        print(f"   üîπ Best Job Name: {top_job}")
        print(f"   ‚úÖ Best Accuracy: {score}")
        print(f"   üíæ Model Download Link: {model_uri}")

    except Exception as e:
        print(f"‚ùå Failed to retrieve {model_name} information: {e}")

# ==========================================
# Run Remedial Report
# ==========================================
print("--- üìä Final Scorecard (Extracted from Completed Jobs) ---")

# Works as long as the tuner objects from the earlier cells are still in memory.
for finished_tuner, display_name in (
    (rf_tuner, "Random Forest"),
    (svm_tuner, "SVM"),
    (lr_tuner, "Logistic Regression"),
):
    print_best_model_info_fixed(finished_tuner, display_name)

In [None]:
import boto3
import sagemaker
import os
import joblib
import tarfile
import shutil

# Champion training job selected for in-depth analysis (the SVM run was chosen
# because it had the highest accuracy).
BEST_JOB_NAME = "svm-tuning-251203-2103-002-c8ccdacc"

# Model archive of that job; the path is derived from the job name so the two
# constants cannot drift apart.
MODEL_S3_URI = (
    "s3://sleep-disorder-mlops-bucket/sagemaker-tuning-output/"
    f"{BEST_JOB_NAME}/output/model.tar.gz"
)

def analyze_best_job(job_name):
    """Print the hyperparameters and training image of a training job.

    Args:
        job_name: Training job name passed to ``describe_training_job``.

    Returns:
        The ``describe_training_job`` response.
    """
    print(f"\n--- üïµÔ∏è‚Äç‚ôÄÔ∏è Analyzing Champion Job: {job_name} ---")

    job_description = boto3.client('sagemaker').describe_training_job(TrainingJobName=job_name)

    # Hyperparameters the job ran with.
    print("\n‚úÖ Best Hyperparameter Configuration (Secret Sauce):")
    for hp_name, hp_value in job_description['HyperParameters'].items():
        # Drop embedded double quotes so the values read cleanly.
        unquoted = hp_value.replace('"', '')
        print(f"   - {hp_name}: {unquoted}")

    # Container image the job was trained in.
    training_image = job_description['AlgorithmSpecification']['TrainingImage']
    print(f"\n‚úÖ Training Image: {training_image}")

    return job_description

def download_and_test_model(s3_uri):
    """Download a model archive from S3, extract it, and try to load the model.

    The archive is downloaded into the current directory (overwriting any
    file of the same name), extracted into ``./best_model_extracted`` (which
    is wiped first), and ``model.joblib`` inside it is loaded as a sanity
    check.

    Args:
        s3_uri: S3 URI of the gzip-compressed model archive.

    Returns:
        None. A failure while loading the model is printed, not raised;
        download and extraction errors propagate to the caller.
    """
    print(f"\n--- ‚¨áÔ∏è Downloading Model for Local Verification ---")

    extract_dir = "./best_model_extracted"
    # The download keeps the object's own file name, so derive the local path
    # from the URI instead of assuming a name. Previously a different name
    # ("best_model.tar.gz") was printed than the file actually opened.
    local_tar = os.path.basename(s3_uri.rstrip("/")) or "model.tar.gz"

    # Start from a clean extraction directory.
    if os.path.exists(extract_dir):
        shutil.rmtree(extract_dir)

    # 1. Download into the current directory.
    sagemaker.s3.S3Downloader.download(s3_uri, ".")
    print(f"‚úÖ Model compressed package downloaded: {local_tar}")

    # 2. Extract. Where the interpreter supports extraction filters, use the
    #    "data" filter so archive members cannot escape extract_dir.
    with tarfile.open(local_tar, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            tar.extractall(path=extract_dir, filter="data")
        else:
            tar.extractall(path=extract_dir)
    print(f"‚úÖ Extraction complete, directory contents: {os.listdir(extract_dir)}")

    # 3. Load the model (sanity check).
    try:
        model_path = os.path.join(extract_dir, "model.joblib")
        # SECURITY NOTE: joblib.load unpickles the file and can execute
        # arbitrary code. Only load artifacts from a bucket you trust.
        pipeline = joblib.load(model_path)
        print("\nüéâ Model loaded successfully!")
        print(f"   - Pipeline Structure: {pipeline}")

        # Show the classifier's parameters when the loaded object has a
        # 'classifier' step. getattr avoids reporting a load failure when the
        # artifact is a bare estimator without named_steps.
        named_steps = getattr(pipeline, "named_steps", {})
        if 'classifier' in named_steps:
            clf = named_steps['classifier']
            print(f"   - Classifier Parameters: {clf.get_params()}")

    except Exception as e:
        print(f"‚ùå Model loading failed: {e}")

if __name__ == "__main__":
    # 1. Print the hyperparameters and training image of the champion job.
    analyze_best_job(BEST_JOB_NAME)
    
    # 2. Download the champion's model archive, extract it, and load it as a
    #    local sanity check.
    download_and_test_model(MODEL_S3_URI)