In [3]:
import os
import time
import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline_context import PipelineSession
import json
from sagemaker import ModelPackage
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.estimator import Estimator
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.processing import ScriptProcessor
from sagemaker import get_execution_role

In [4]:
sess = boto3.Session()
sm = sess.client("sagemaker")
role = get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

pipeline_session = PipelineSession()

print(bucket)

sagemaker-us-east-1-412510750633


In [5]:
GROUP_NAME = 'GROUP6' # CHANGE THIS TO YOUR FIRST NAME
DATA_PREFIX = f'group6/{GROUP_NAME}/data/' # S3 prefix to store data
MODEL_OUTPUT_S3_PATH = f's3://{bucket}/group6/{GROUP_NAME}/model/' # S3 prefix to store the XGBoost training information and model.

BASE_JOB_PROCESSING_NAME = f'{GROUP_NAME}-processing'  # base_job_name for preprocessing
BASE_JOB_TRAINING_NAME = f'{GROUP_NAME}-training'  # base_job_name for training
BASE_JOB_EVALUATION_NAME = f'{GROUP_NAME}-evaluation'  # base_job_name for evaluation

PIPELINE_NAME = f'{GROUP_NAME}-pipeline'  # SageMaker Pipeline name
MODEL_PACKAGE_GROUP_NAME = f'{GROUP_NAME}-ModelPackageGroup'  # Model package group name in the Model Registry

print(f'DATA_PREFIX: {DATA_PREFIX}')
print(f'PIPELINE_NAME: {PIPELINE_NAME}')
print(f'MODEL_PACKAGE_GROUP_NAME: {MODEL_PACKAGE_GROUP_NAME}')

DATA_PREFIX: group6/GROUP6/data/
PIPELINE_NAME: GROUP6-pipeline
MODEL_PACKAGE_GROUP_NAME: GROUP6-ModelPackageGroup


In [6]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString, ParameterFloat

# Define initial parameters
input_data_path1 = f'{DATA_PREFIX}/US_flights_2023.csv'
input_data_path2 = f'{DATA_PREFIX}/airports_geolocation.csv'
input_data_path3 = f'{DATA_PREFIX}/weather_meteo_by_airport.csv'

# raw input data
input_data1 = ParameterString(name="InputData1", default_value=input_data_path1)
input_data2 = ParameterString(name="InputData2", default_value=input_data_path2)
input_data3 = ParameterString(name="InputData3", default_value=input_data_path3)

# status of newly trained model in registry
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")

# processing step parameters
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.xlarge"
)
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
training_epochs = ParameterString(name="TrainingEpochs", default_value="100")

# model performance step parameters
accuracy_mse_threshold = ParameterFloat(name="AccuracyMseThreshold", default_value=0.75)

In [7]:
s3_bucket_path = 's3://' + sagemaker.Session().default_bucket() + '/'

s3_bucket_path

's3://sagemaker-us-east-1-412510750633/'

In [8]:
# Specify the local paths to your files and the S3 prefix (directory) to upload to
files = ['US_flights_2023.csv', 'airports_geolocation.csv', 'weather_meteo_by_airport.csv']

# Upload files to S3
for file in files:
    sagemaker_session.upload_data(path=file, key_prefix=DATA_PREFIX)
    
# Specify the paths to your uploaded files
file_paths = [f'{DATA_PREFIX}/US_flights_2023.csv', f'{DATA_PREFIX}/airports_geolocation.csv', f'{DATA_PREFIX}/weather_meteo_by_airport.csv']

In [8]:
!mkdir code

mkdir: cannot create directory ‘code’: File exists


In [9]:
%%writefile code/preprocess.py
import numpy as np
import pandas as pd
import os
import joblib
from io import StringIO
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.compose import ColumnTransformer
import tarfile
import logging

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

try:
    from sagemaker_containers.beta.framework import (
        content_types,
        encoders,
        env,
        modules,
        transformer,
        worker,
        server,
    )
except ImportError:
    pass

RANDOM_STATE = 2024
LABEL_COLUMN = 'Dep_Delay'
feature_columns = ['Day_Of_Week', 'Airline', 'Dep_Airport', 'Dep_CityName', 'DepTime_label', 'Distance_type', 'Manufacturer', 'Model', 'Aicraft_age',
                   'STATE', 'LATITUDE', 'LONGITUDE', 'tavg', 'tmin', 'tmax', 'prcp', 'snow', 'wdir', 'wspd', 'pres', 'FlightMonth']

one_hot_columns = ['Day_Of_Week', 'Airline', 'Dep_Airport', 'Dep_CityName', 'DepTime_label', 'Distance_type', 'Manufacturer', 'Model', 'STATE', 'FlightMonth']
non_one_hot_columns = ['Aicraft_age', 'LATITUDE', 'LONGITUDE', 'tavg', 'tmin', 'tmax', 'prcp', 'snow', 'wdir', 'wspd', 'pres']

base_dir = "/opt/ml/processing"
base_output_dir = "/opt/ml/output/"

if __name__ == "__main__":
    logger.debug("Starting preprocessing script")

    # Define the input data path within the processing environment
    input_data_path1 = f'{base_dir}/input/US_flights_2023.csv'
    input_data_path2 = f'{base_dir}/input/airports_geolocation.csv'
    input_data_path3 = f'{base_dir}/input/weather_meteo_by_airport.csv'
    
    # Read the CSV file from the input path
    logger.debug("Reading input data")
    df1 = pd.read_csv(input_data_path1)
    df2 = pd.read_csv(input_data_path2)
    df3 = pd.read_csv(input_data_path3)
    
    # Sample the data to reduce size (e.g., take a 2% sample)
    logger.debug("Sampling data to reduce size")
    df1_sample = df1.sample(frac=0.02, random_state=RANDOM_STATE)

    # Merge sampled data
    logger.debug("Merging dataframes")
    merged_df = pd.merge(df1_sample, df2, left_on='Dep_Airport', right_on='IATA_CODE', how='left')
    
    merged_df['FlightDate'] = pd.to_datetime(merged_df['FlightDate'])
    df3['time'] = pd.to_datetime(df3['time'])
    merged_df['FlightMonth'] = merged_df['FlightDate'].dt.month
    
    df = pd.merge(merged_df, df3, left_on=['Dep_Airport', 'FlightDate'], right_on=['airport_id', 'time'], how='left')

    # Include only relevant columns in feature_data
    feature_data = df[feature_columns]
    label_data = df[LABEL_COLUMN]

    preprocessor = ColumnTransformer(
        transformers=[
            ('onehot', OneHotEncoder(), one_hot_columns),
            ('scaler', StandardScaler(), non_one_hot_columns)
        ],
        remainder='passthrough'
    )

    logger.debug("Applying transformations to the data")
    feature_data_transformed = preprocessor.fit_transform(feature_data)

    # Ensure transformed features have correct shape
    logger.debug(f"Transformed feature shape: {feature_data_transformed.shape}")

    logger.debug("Splitting data into train, validation, and test sets")
    x_train, x_temp, y_train, y_temp = train_test_split(feature_data_transformed, label_data, test_size=0.2, random_state=42)
    x_val, x_test, y_val, y_test = train_test_split(x_temp, y_temp, test_size=0.5, random_state=42)

    # Ensure y arrays are 2D
    y_train = y_train.values.reshape(-1, 1)
    y_val = y_val.values.reshape(-1, 1)
    y_test = y_test.values.reshape(-1, 1)

    # Verify the shapes
    logger.debug(f"x_train shape: {x_train.shape}, y_train shape: {y_train.shape}")
    logger.debug(f"x_val shape: {x_val.shape}, y_val shape: {y_val.shape}")
    logger.debug(f"x_test shape: {x_test.shape}, y_test shape: {y_test.shape}")

    # Print a few rows of each to verify
    logger.debug(f"First 5 rows of x_train: {x_train[:5]}")
    logger.debug(f"First 5 rows of y_train: {y_train[:5]}")
    
    logger.debug("Combining features and labels")
    train_dataset = pd.DataFrame(np.hstack((y_train, x_train.toarray())))
    val_dataset = pd.DataFrame(np.hstack((y_val, x_val.toarray())))
    test_dataset = pd.DataFrame(np.hstack((y_test, x_test.toarray())))

    # Get feature columns after transformation
    transformed_feature_columns = preprocessor.get_feature_names_out()

    train_dataset.columns = [LABEL_COLUMN] + list(transformed_feature_columns)
    val_dataset.columns = [LABEL_COLUMN] + list(transformed_feature_columns)
    test_dataset.columns = [LABEL_COLUMN] + list(transformed_feature_columns)
    
    logger.debug("Creating output directories if they don't exist")
    os.makedirs('/opt/ml/processing/train', exist_ok=True)
    os.makedirs('/opt/ml/processing/validation', exist_ok=True)
    os.makedirs('/opt/ml/processing/test', exist_ok=True)
    os.makedirs('/opt/ml/processing/scaler_model', exist_ok=True)

    logger.debug("Saving datasets to CSV")
    train_dataset.to_csv(f'/opt/ml/processing/train/train.csv', header=True, index=False)
    val_dataset.to_csv(f'/opt/ml/processing/validation/validation.csv', header=True, index=False)
    test_dataset.to_csv(f'/opt/ml/processing/test/test.csv', header=True, index=False)

    logger.debug("Saving preprocessor model")
    joblib.dump(preprocessor, f'/opt/ml/processing/scaler_model/preprocessor.joblib')
    
    with tarfile.open(f'/opt/ml/processing/scaler_model/preprocessor.tar.gz', 'w:gz') as tar_handle:
        tar_handle.add(f'/opt/ml/processing/scaler_model/preprocessor.joblib', arcname='preprocessor.joblib')

    logger.debug("Preprocessing script completed successfully")

def input_fn(input_data, content_type):
    if content_type == "text/csv":
        df = pd.read_csv(StringIO(input_data), header=None)
        df.columns = transformed_feature_columns if len(df.columns) == len(transformed_feature_columns) else transformed_feature_columns + [LABEL_COLUMN]
        return df
    else:
        raise ValueError("{} not supported by script!".format(content_type))

def output_fn(prediction, accept):
    if accept == "application/json":
        instances = [row.tolist() for row in prediction]
        json_output = {"instances": instances}
        return worker.Response(json.dumps(json_output), mimetype=accept)
    elif accept == "text/csv":
        return worker.Response(encoders.encode(prediction, accept), mimetype=accept)
    else:
        print(f"Warning: {accept} accept type is not supported by this script. Defaulting to text/csv.")
        return worker.Response(encoders.encode(prediction, "text/csv"), mimetype="text/csv")

def predict_fn(input_data, model):
    features = model.transform(input_data)
    if LABEL_COLUMN in input_data:
        return np.insert(features, 0, input_data[LABEL_COLUMN], axis=1)
    else:
        return features

def model_fn(model_dir):
    preprocessor = joblib.load(os.path.join(model_dir, "preprocessor.joblib"))
    return preprocessor


Overwriting code/preprocess.py


In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role=role,
    instance_type='ml.c5.xlarge',
    instance_count=1 
)

sklearn_processor.run(
    code="code/preprocess.py",
    inputs=[
        ProcessingInput(
            source=s3_bucket_path + DATA_PREFIX,
            destination="/opt/ml/processing/input"
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="scaler_model",
            source="/opt/ml/processing/scaler_model",
            destination=MODEL_OUTPUT_S3_PATH
        ),
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",
            destination=s3_bucket_path + DATA_PREFIX
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation",
            destination=s3_bucket_path + DATA_PREFIX
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination=s3_bucket_path + DATA_PREFIX
        ),
    ],
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker:Creating processing-job with name sagemaker-scikit-learn-2024-06-18-16-31-58-144


.............[34mINFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)[0m
[34mDEBUG:__main__:Starting preprocessing script[0m
[34mDEBUG:__main__:Reading input data[0m
[34mDEBUG:__main__:Sampling data to reduce size[0m
[34mDEBUG:__main__:Merging dataframes[0m
[34mDEBUG:__main__:Applying transformations to the data[0m
[34mDEBUG:__main__:Transformed feature shape: (134868, 813)[0m
[34mDEBUG:__main__:Splitting data into train, validation, and test sets[0m
[34mDEBUG:__main__:x_train shape: (107894, 813), y_train shape: (107894, 1)[0m
[34mDEBUG:__main__:x_val shape: (13487, 813), y_val shape: (13487, 1)[0m
[34mDEBUG:__main__:x_test shape: (13487, 813), y_test shape: (13487, 1)[0m
[34mDEBUG:__main__:First 5 rows of x_train:   (0, 2)#0111.0
  (0, 18)#0111.0
  (0, 120)#0111.0
  (0, 452)#0111.0
  (0, 706)#0111.0
  (0, 712)#0111.0
  (0, 715)#0111.0
  (0, 735)#0111.0
  (0, 758)#0111.0
  (0, 790)#0111.0
  (0, 802)#0111.3382639788382422
  (0, 803)#0110.93544

In [23]:
sklearn_processor.jobs[0].describe()

{'ProcessingInputs': [{'InputName': 'input-1',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-412510750633/group6/GROUP6/data/',
    'LocalPath': '/opt/ml/processing/input',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}},
  {'InputName': 'code',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-412510750633/sagemaker-scikit-learn-2024-06-17-02-29-32-754/input/code/preprocess.py',
    'LocalPath': '/opt/ml/processing/input/code',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingOutputConfig': {'Outputs': [{'OutputName': 'scaler_model',
    'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-412510750633/group6/GROUP6/model/',
     'LocalPath': '/opt/ml/processing/scaler_model',
     'S3UploadMode': 'EndOfJob'},
    'AppManaged': False},
   {'Outp

In [24]:
s3_client = boto3.client("s3")
default_bucket = sagemaker.Session().default_bucket()
for i in range(1, 4):
    prefix = s3_client.list_objects(
        Bucket=default_bucket, Prefix="sagemaker-scikit-learn"
    )["Contents"][-i]["Key"]
    print("s3://" + default_bucket + "/" + prefix)

s3://sagemaker-us-east-1-412510750633/sagemaker-scikit-learn-2024-06-17-02-29-32-754/input/code/preprocess.py
s3://sagemaker-us-east-1-412510750633/sagemaker-scikit-learn-2024-06-17-02-14-44-085/input/code/preprocess.py
s3://sagemaker-us-east-1-412510750633/sagemaker-scikit-learn-2024-06-17-02-05-46-473/input/code/preprocess.py


In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.image_uris import retrieve
from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner
from time import gmtime, strftime

sess = sagemaker.Session()

container = retrieve("xgboost", region, "latest")

xgb = sagemaker.estimator.Estimator(
    container,
    role,
    base_job_name="xgboost-random-search",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=MODEL_OUTPUT_S3_PATH + 'xgboost',
    sagemaker_session=sess,
)

xgb.set_hyperparameters(objective='reg:squarederror',eval_metric='rmse')

hyperparameter_ranges = {
    'max_depth': IntegerParameter(100,300),
    'eta': ContinuousParameter(0.001, 0.2),
    'max_depth': IntegerParameter(3, 20),
    'min_child_weight': IntegerParameter(1, 10),
    'colsample_bytree': ContinuousParameter(0.5, 0.9)
}

objective_metric_name = 'validation:rmse'

tuner = HyperparameterTuner(estimator=xgb,
                            objective_metric_name=objective_metric_name,
                            hyperparameter_ranges=hyperparameter_ranges,
                            objective_type='Minimize',
                            max_jobs=20,
                            max_parallel_jobs=4,
                            strategy='Random',
                            output_path=MODEL_OUTPUT_S3_PATH + 'xgboost_tuned')

# Specify training data location
train_data = 's3://your-bucket/path/to/train/data'
val_data = 's3 - path to val data'

# Fit the tuner
tuner.fit(
    inputs={
        'train': train_data,
        'validation': val_data
    },
    job_name="xgb-randsearch-" + strftime("%Y%m%d-%H-%M-%S", gmtime())
)

# train_args = xgb.fit(
#     inputs={
#         "train": TrainingInput(
#             s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
#             content_type="text/csv",
#         ),
#         "validation": TrainingInput(
#             s3_data=step_process.properties.ProcessingOutputConfig.Outputs["val"].S3Output.S3Uri,
#             content_type="text/csv",
#         )
#     }
# )

# step_train_model = TrainingStep(name="TrainXGBModel1", step_args=train_args)

In [None]:
%%writefile code/evaluate_xgboost.py

import json
import logging
import math
import pickle
import tarfile

import numpy as np
import pandas as pd
import xgboost
from sklearn.metrics import mean_squared_error

from pathlib import Path

label_column = 'aveOralM'

model_tar_path = '/opt/ml/processing/model/model.tar.gz'

if __name__ == "__main__":
    ## Your code to perform model evaluation on testing dataset, and 
    ## store the evaluation report
    with tarfile.open(model_tar_path, 'r:gz') as tar:
        tar.extractall(path='./model')
    
    xgb_model = xgb_model = xgboost.Booster()
    xgb_model.load_model('./model/xgboost-model')
    
    test_path = "/opt/ml/processing/test/"
    df = pd.read_csv(test_path + "/test.csv")

    x_test = xgboost.DMatrix(df.iloc[:,1:])
    y_test = df.iloc[:,0]
    y_pred = xgb_model.predict(x_test)
    scores = np.sqrt(mean_squared_error(y_test, y_pred))
    print("\nTest RMSE :", scores)

    # Available metrics to add to model: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
    report_dict = {
        "regression_metrics": {
            "rmse": {"value": scores, "standard_deviation": "NaN"},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

In [None]:
from sagemaker.workflow.properties import PropertyFile
from sagemaker.sklearn.processing import ScriptProcessor

test_data = 's3 path to test data'

xgb_eval_image = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version='1.7-1'
)

evaluate_model_processor = ScriptProcessor(
    role=role,
    image_uri=xgb_eval_image,
    command=["python3"],
    instance_count=1,
    instance_type=processing_instance_type,
    sagemaker_session=pipeline_session,
)

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

eval_args = evaluate_model_processor.run(
    inputs=[
        ProcessingInput(
            source=MODEL_OUTPUT_S3_PATH + 'xgboost_tuned',
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=test_data,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination='s3 path to evaluation output'),
    ],
    code="code/evaluate.py",
)

step_evaluate_model = ProcessingStep(
    name="EvaluateXgboost",
    step_args=eval_args,
    property_files=[evaluation_report],
)