# Amazon SageMaker Batch Transform: Associate prediction results with their corresponding input records


In [1]:
%store

Stored variables and their in-db values:
account_id                               -> '607916531205'
baseline_model_logistic_path             -> 'baseline_model_logistic.pkl'
baseline_model_path                      -> 'baseline_model.pkl'
create_base_csv_athena_db                -> True
create_base_csv_athena_table             -> True
database_name                            -> 'db_airline_delay_cause'
dev_feature_group_name                   -> 'airline_delay_features_dev'
dev_feature_store_table                  -> 'airline_delay_features_dev_1740273029'
dev_s3_path                              -> 's3://sagemaker-us-east-1-607916531205/data/develo
dev_s3_uri                               -> 's3://sagemaker-us-east-1-607916531205/feature-sto
dev_table_name                           -> 'development_data'
packages_installed                       -> True
prod_feature_group_name                  -> 'airline_delay_features_prod'
prod_feature_store_table                 -> 'airline_delay_fe

In [2]:
import boto3
import sagemaker
from pyathena import connect
import pandas as pd
import numpy as np

# ✅ Retrieve stored variables
%store -r region
%store -r role

# ✅ Initialize AWS Session
session = boto3.session.Session()
sagemaker_session = sagemaker.Session()

# ✅ Use SageMaker's default bucket
bucket = sagemaker_session.default_bucket()
prefix = "flight-delay-prediction-xgboost"


sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [3]:
import time

# ✅ Retrieve stored feature group names and table names
%store -r dev_feature_group_name
%store -r prod_feature_group_name
%store -r dev_feature_store_table
%store -r prod_feature_store_table

# ✅ Initialize SageMaker client
sagemaker_client = boto3.client("sagemaker")

# ✅ Function to Get Feature Store Table Name (Skipping if already stored)
def get_feature_store_table_name(feature_group_name, poll_interval=5, timeout=600, client=None):
    """Wait until a Feature Group is ``Created`` and return its Glue table name.

    Args:
        feature_group_name: Name of the SageMaker Feature Group to poll.
        poll_interval: Seconds to sleep between ``describe_feature_group`` calls.
        timeout: Maximum number of seconds to wait; ``None`` waits indefinitely.
        client: Object exposing ``describe_feature_group``; defaults to the
            notebook-level ``sagemaker_client``.

    Returns:
        The ``TableName`` found under ``OfflineStoreConfig.DataCatalogConfig``
        in the describe response.

    Raises:
        RuntimeError: The Feature Group reached a status it cannot leave for
            ``Created`` (``CreateFailed``, ``Deleting`` or ``DeleteFailed``).
        TimeoutError: The Feature Group was still not ``Created`` after
            ``timeout`` seconds.
        KeyError: The describe response carries no offline-store Glue table.
    """
    # Resolve lazily so the notebook-level client is only required when none is passed in
    if client is None:
        client = sagemaker_client

    print(f"⏳ Waiting for Feature Group '{feature_group_name}' to be available in Glue...")

    # Statuses from which polling can never reach "Created"; looping on them would hang forever
    terminal_statuses = ("CreateFailed", "Deleting", "DeleteFailed")
    deadline = None if timeout is None else time.monotonic() + timeout

    # Wait for Feature Group to be created
    while True:
        response = client.describe_feature_group(FeatureGroupName=feature_group_name)
        status = response["FeatureGroupStatus"]
        if status == "Created":
            print(f"✅ Feature Group '{feature_group_name}' is now active!")
            break
        if status in terminal_statuses:
            reason = response.get("FailureReason", "no failure reason reported")
            raise RuntimeError(
                f"Feature Group '{feature_group_name}' is in status {status}: {reason}"
            )
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Feature Group '{feature_group_name}' still in status {status} after {timeout} seconds"
            )
        print(f"⏳ Current status: {status}, retrying in {poll_interval} seconds...")
        time.sleep(poll_interval)

    # Retrieve Glue Table Name
    table_name = response["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"]
    print(f"✅ Feature Store table registered in Glue: {table_name}")

    return table_name

# ✅ Use the stored feature store table names
dev_feature_store_table = dev_feature_store_table if "dev_feature_store_table" in locals() else get_feature_store_table_name(dev_feature_group_name)
prod_feature_store_table = prod_feature_store_table if "prod_feature_store_table" in locals() else get_feature_store_table_name(prod_feature_group_name)

# ✅ Store feature store table names for later use (if they were retrieved or new)
%store dev_feature_store_table
%store prod_feature_store_table


Stored 'dev_feature_store_table' (str)
Stored 'prod_feature_store_table' (str)


In [4]:
# ✅ Retrieve stored variables
%store -r dev_feature_store_table
%store -r s3_staging_dir
%store -r region

# ✅ Perform Athena query, returning the result as a pandas DataFrame
query = f"""
SELECT * FROM "sagemaker_featurestore"."{dev_feature_store_table}"
;
"""

# ✅ Set up Athena connection
conn = connect(s3_staging_dir=s3_staging_dir, region_name=region)

# ✅ Execute the query using Pandas
df_offline = pd.read_sql(query, conn)

# ✅ Now df_offline contains all the records from the Feature Store offline table
print(df_offline.head())

  df_offline = pd.read_sql(query, conn)


     event_time  year  month carrier airport  arr_flights  arr_del15  \
0  1.740273e+09  2015      5      OO     MCI          161         30   
1  1.740273e+09  2013      4      DL     OGG           60          9   
2  1.740273e+09  2005     12      UA     MCO          674        188   
3  1.740273e+09  2011      1      B6     BTV          137         31   
4  1.740273e+09  2015      5      OO     TUL          187         37   

   carrier_ct  weather_ct  nas_ct  ...  weather_delay  nas_delay  \
0          12           0       6  ...              0        187   
1           6           0       3  ...              0         86   
2          65           5      49  ...            272       1729   
3          13           0       7  ...              0        372   
4          12           0      16  ...             98        616   

   security_delay  late_aircraft_delay  delay_rate  on_time  record_id  \
0               0                  817          18        0     123246   
1         

In [5]:
# Identifier / bookkeeping columns that are removed before modelling.
metadata_columns = ['record_id', 'write_time', 'api_invocation_time', 'is_deleted']

# One non-in-place drop keeps the cell re-runnable: `errors='ignore'` makes a second
# execution a no-op instead of a KeyError, and the frame is never left half-modified.
df_offline = df_offline.drop(columns=metadata_columns, errors='ignore')

df_offline.head()

Unnamed: 0,event_time,year,month,carrier,airport,arr_flights,arr_del15,carrier_ct,weather_ct,nas_ct,...,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,delay_rate,on_time
0,1740273000.0,2015,5,OO,MCI,161,30,12,0,6,...,3,1,1782,778,0,187,0,817,18,0
1,1740273000.0,2013,4,DL,OGG,60,9,6,0,3,...,0,0,753,667,0,86,0,0,15,0
2,1740273000.0,2005,12,UA,MCO,674,188,65,5,49,...,3,2,10356,3507,272,1729,0,4848,27,0
3,1740273000.0,2011,1,B6,BTV,137,31,13,0,7,...,7,0,1659,711,0,372,0,576,22,0
4,1740273000.0,2015,5,OO,TUL,187,37,12,0,16,...,0,0,1794,701,98,616,0,379,19,0


In [6]:
# Encode categorical features
from sklearn.preprocessing import LabelEncoder

# Keep each fitted encoder, keyed by column name, so the same string -> integer mapping
# can be re-applied to new data (or inverted) later instead of being thrown away.
label_encoders = {}
for col in ['carrier', 'airport']:
    le = LabelEncoder()
    df_offline[col] = le.fit_transform(df_offline[col])
    label_encoders[col] = le


In [7]:
# Replace every missing value with 0
df_offline = df_offline.fillna(0)

In [None]:

import boto3
import joblib
import os
import pandas as pd

# Define S3 paths for best model and metrics
s3_client = boto3.client("s3")
bucket_name = "<your-s3-bucket>"  # Replace with your bucket
model_key = "best_model.pkl"
metrics_key = "best_model_metrics.csv"

# Local paths
best_model_path = "best_model.pkl"
best_metrics_path = "best_model_metrics.csv"

# S3 error codes that mean "the object (or bucket) is not there", i.e. a genuine first run
_missing_object_codes = {"404", "NoSuchKey", "NoSuchBucket"}

# Check if a previous best model exists in S3
try:
    print("🔄 Loading previous best model and metrics...")
    s3_client.download_file(bucket_name, model_key, best_model_path)
    s3_client.download_file(bucket_name, metrics_key, best_metrics_path)

    # Load previous best model
    # NOTE: joblib.load unpickles the file, which can execute arbitrary code —
    # only point this at a bucket whose contents you trust.
    best_model = joblib.load(best_model_path)

    # Load previous best metrics
    best_metrics = pd.read_csv(best_metrics_path)
    prev_best_accuracy = best_metrics["accuracy"].values[0]

    print(f"📊 Previous Best Model Accuracy: {prev_best_accuracy:.4f}")

except s3_client.exceptions.ClientError as e:
    # Only a missing object counts as "no previous model". Permission problems,
    # throttling and similar failures are re-raised so they are not mistaken for a
    # first run (which would let any new model overwrite the stored best one).
    error_code = e.response.get("Error", {}).get("Code", "")
    if error_code not in _missing_object_codes:
        raise
    print("⚠️ No previous best model found. This will be the first model.")
    best_model = None
    prev_best_accuracy = 0.0  # Default low accuracy for first-time run


In [8]:
# Hold out 20% of the rows for validation (fixed seed for a repeatable split)
from sklearn.model_selection import train_test_split

y = df_offline['on_time']
X = df_offline.drop(columns='on_time')

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

train_csv_path = "train.csv"
val_csv_path = "validation.csv"

# Put the label in the first column, ahead of the features
train_combined = pd.concat([y_train, X_train], axis=1)
val_combined = pd.concat([y_val, X_val], axis=1)

# Write both files without a header row or index column
train_combined.to_csv(train_csv_path, header=False, index=False)
val_combined.to_csv(val_csv_path, header=False, index=False)

# Upload each file under its own key prefix in the notebook bucket
train_s3_path = sagemaker_session.upload_data(
    path=train_csv_path,
    bucket=bucket,
    key_prefix=f"{prefix}/train",
)
val_s3_path = sagemaker_session.upload_data(
    path=val_csv_path,
    bucket=bucket,
    key_prefix=f"{prefix}/validation",
)

---

## Training job and model creation

The cell below uses the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to kick off the training job using both our training set and validation set. Note that the objective is set to 'binary:logistic', which trains a model to output a probability between 0 and 1 (here, the predicted probability that the `on_time` label is 1).

In [None]:

import joblib
import json
import os

# Local files holding the current best model and its accuracy
model_store_path = "best_model.pkl"
metrics_store_path = "best_model_metrics.json"

# NOTE(review): LogisticRegression, accuracy_score, X_train_scaled, X_test_scaled and
# y_test are not defined anywhere in the visible notebook — confirm where they come from
# before relying on this cell after a kernel restart.

# Both files must be present for a previous best to count
if not (os.path.exists(model_store_path) and os.path.exists(metrics_store_path)):
    print("⚠️ No previous best model found. The first model will automatically be set as the best.")
    best_model = None
    best_model_metrics = {"accuracy": 0.0}  # Default low value
else:
    print("🔄 Loading the last best model for comparison...")
    best_model = joblib.load(model_store_path)

    with open(metrics_store_path, "r") as f:
        best_model_metrics = json.load(f)

    print(f"📊 Last best model accuracy: {best_model_metrics['accuracy']:.4f}")

# Fit the challenger and score it on the held-out set
print("🚀 Training new model...")
new_model = LogisticRegression()
new_model.fit(X_train_scaled, y_train)
y_test_pred = new_model.predict(X_test_scaled)
new_model_accuracy = accuracy_score(y_test, y_test_pred)

print(f"📊 New model accuracy: {new_model_accuracy:.4f}")

# Promote the challenger only when it is strictly more accurate than the stored best
if new_model_accuracy > best_model_metrics["accuracy"]:
    print("✅ New model is better! Updating the model store...")

    # Overwrite the stored model with the challenger
    joblib.dump(new_model, model_store_path)

    # Overwrite the stored metrics with the challenger's accuracy
    new_model_metrics = {"accuracy": new_model_accuracy}
    with open(metrics_store_path, "w") as f:
        json.dump(new_model_metrics, f)

    print(f"✅ New best model saved as {model_store_path}")
    print(f"✅ New best model metrics updated in {metrics_store_path}")
else:
    print("❌ New model did NOT outperform the best model. Keeping the previous model.")


In [None]:

# Evaluate the new model
# NOTE(review): accuracy_score, y_test and y_test_pred are not defined in this cell —
# confirm the cell that produces them has been run first.
new_model_accuracy = accuracy_score(y_test, y_test_pred)

print(f"📊 New Model Accuracy: {new_model_accuracy:.4f}")

# Compare new model against the previous best
if new_model_accuracy > prev_best_accuracy:
    print("✅ New model outperforms the previous best! Updating model store.")

    # Save the model that was just evaluated. `new_model` is the estimator that produced
    # `y_test_pred`; the previous code pickled `model`, which is a different object
    # (or undefined on a fresh kernel).
    joblib.dump(new_model, best_model_path)

    # Save new metrics
    best_metrics = pd.DataFrame({"accuracy": [new_model_accuracy]})
    best_metrics.to_csv(best_metrics_path, index=False)

    # Upload new best model and metrics to S3
    s3_client.upload_file(best_model_path, bucket_name, model_key)
    s3_client.upload_file(best_metrics_path, bucket_name, metrics_key)

    print("✅ Best model and metrics updated in S3.")

else:
    print("🚨 New model does not outperform previous best. Discarding new model.")
    # Raising here deliberately stops a "Run All" so later cells do not run
    raise ValueError("❌ New model is worse than or equal to the previous best. Stopping pipeline.")


In [9]:
# Look up the XGBoost 1.5-1 container image for this region
from sagemaker.image_uris import retrieve
from sagemaker.inputs import TrainingInput
xgboost_image_uri = retrieve(framework="xgboost", region=region, version="1.5-1")

# Generic Estimator wrapping that image; artifacts are written below `output_path`
xgb = sagemaker.estimator.Estimator(
    image_uri=xgboost_image_uri,
    role=role,
    sagemaker_session=sagemaker_session,
    output_path=f"s3://{bucket}/{prefix}/output",
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

# Binary classifier emitting a probability, evaluated with AUC
xgb.set_hyperparameters(
    **{
        "objective": "binary:logistic",
        "num_round": 100,
        "max_depth": 5,
        "eta": 0.2,
        "subsample": 0.8,
        "eval_metric": "auc",
    }
)

# Both channels are headerless CSV files
train_input = TrainingInput(train_s3_path, content_type="text/csv")
val_input = TrainingInput(val_s3_path, content_type="text/csv")

# Launch the training job with the train and validation channels
xgb.fit(inputs={"train": train_input, "validation": val_input})

INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2025-02-23-01-46-51-127


2025-02-23 01:46:53 Starting - Starting the training job...
2025-02-23 01:47:07 Starting - Preparing the instances for training...
2025-02-23 01:47:33 Downloading - Downloading input data...
2025-02-23 01:47:58 Downloading - Downloading the training image...
  from pandas import MultiIndex, Int64Index[0m
[34m[2025-02-23 01:48:50.518 ip-10-0-220-172.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2025-02-23 01:48:50.544 ip-10-0-220-172.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2025-02-23:01:48:50:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2025-02-23:01:48:50:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2025-02-23:01:48:50:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2025-02-23:01:48:50:INFO] No GPUs detected (normal if no gpus install

In [None]:

import os

# Marker file whose presence signals that evaluation succeeded
flag_path = "model_ready.flag"

# NOTE(review): no cell in the visible notebook creates this file — confirm what writes it.
# Stop here unless the marker exists; otherwise announce that registration may proceed.
if os.path.exists(flag_path):
    print("✅ Model passed evaluation, proceeding with registration...")
else:
    raise ValueError("🚨 Model has not passed evaluation! Registration aborted.")


In [None]:

import sagemaker
from sagemaker.model import Model

# ✅ If the model passed evaluation, register it in SageMaker
model_name = "my-trained-model"

# Take the artifact location from the fitted estimator. The training job stores its
# model archive under a job-specific key below `output_path`, so a hand-built
# ".../output/<model_name>.tar.gz" path does not point at an existing object.
model_artifact = xgb.model_data

# Define model registration
model = Model(
    image_uri=xgboost_image_uri,
    model_data=model_artifact,
    role=role,
    sagemaker_session=sagemaker_session
)

# Register the model
# (`deploy` creates a real-time endpoint named `model_name` backed by this model.)
model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name=model_name
)

print(f"✅ Model successfully registered: {model_name}")


In [None]:

import sagemaker
from sagemaker.model import Model

# Deploys the object stored at `best_model.pkl` in `bucket_name` behind a real-time endpoint.
# NOTE(review): `bucket_name`, `xgboost_image_uri`, `role` and `sagemaker_session` are
# defined in other cells — confirm they have been run before this one.
print("🚀 Registering the new best model in SageMaker...")

# Endpoint name and S3 location of the model file
# NOTE(review): `model_data` normally points at a `.tar.gz` model archive, and this `.pkl`
# is served with the XGBoost image — confirm the container can actually load it.
model_name = "best-performing-model"
model_artifact = f"s3://{bucket_name}/best_model.pkl"

# Build the SageMaker Model object (nothing is created in the account until `deploy`)
best_model_sagemaker = Model(
    image_uri=xgboost_image_uri,
    model_data=model_artifact,
    role=role,
    sagemaker_session=sagemaker_session
)

# Deploy the model to one ml.m5.xlarge instance behind an endpoint named `model_name`
best_model_sagemaker.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name=model_name
)

print(f"✅ Model successfully registered: {model_name}")


In [12]:
from sagemaker.model_monitor import DataCaptureConfig

# ✅ Enable data capture for monitoring
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,  # Capture 100% of inference data
    destination_s3_uri=f"s3://{bucket}/data_capture/",
    capture_options=["Input", "Output"]
)

# Deploy the model as a SageMaker endpoint with data capture enabled
endpoint_name_single_request = "flight-delay-xgboost-endpoint-single-request"
predictor_single_request = xgb.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name=endpoint_name_single_request,
    data_capture_config=data_capture_config  # ✅ Added data capture config
)

# ✅ Store the endpoint name for later retrieval (especially for cleanup)
%store endpoint_name_single_request
print(f"✅ Stored SageMaker endpoint name: {endpoint_name_single_request}")


INFO:sagemaker:Creating model with name: sagemaker-xgboost-2025-02-23-01-51-13-753
INFO:sagemaker:Creating endpoint-config with name flight-delay-xgboost-endpoint-single-request
INFO:sagemaker:Creating endpoint with name flight-delay-xgboost-endpoint-single-request


-----!Stored 'endpoint_name_single_request' (str)
✅ Stored SageMaker endpoint name: flight-delay-xgboost-endpoint-single-request


In [13]:
import io
import numpy as np

# Serialise the first validation row as a single CSV line
csv_buffer = io.StringIO()
single_sample = X_val.iloc[0].values.reshape(1, -1)
np.savetxt(csv_buffer, single_sample, delimiter=",")
csv_text = csv_buffer.getvalue().strip()  # str

# Send the CSV text and declare its content type explicitly on the request
response = predictor_single_request.predict(
    data=csv_text,
    initial_args={"ContentType": "text/csv"},
)
print("Response:", response)


Response: b'4.337779500929173e-06\n'


---

## Batch Transform



In [14]:
# Only the features (no label)
features_only = df_offline.drop(columns='on_time')

# Seeded so the same rows are selected on every run, and capped at the frame's length so
# the cell still works when fewer than 500 rows are available.
small_test_set = features_only.sample(n=min(500, len(features_only)), random_state=42)

# Headerless CSV without the index column
small_test_csv_path = "small_test.csv"
small_test_set.to_csv(small_test_csv_path, index=False, header=False)

# Upload smaller dataset to S3
small_test_s3_path = sagemaker_session.upload_data(
    path=small_test_csv_path,
    bucket=bucket,
    key_prefix=prefix + "/small_test"
)


In [15]:
# Results are written next to the other artifacts of this notebook; input is the uploaded sample
batch_output_s3 = f"s3://{bucket}/{prefix}/batch-output"
batch_input_s3 = small_test_s3_path

# Transformer built from the trained estimator, running on one ml.m5.xlarge instance
transformer = xgb.transformer(
    output_path=batch_output_s3,
    instance_type="ml.m5.xlarge",
    instance_count=1,
)

# Treat the input as CSV and split it line by line
transformer.transform(
    data=batch_input_s3,
    split_type="Line",
    content_type="text/csv",
)
transformer.wait()

print("Batch transform job complete.")

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2025-02-23-01-55-29-918
INFO:sagemaker:Creating transform job with name: sagemaker-xgboost-2025-02-23-01-55-30-697


..............................
  from pandas import MultiIndex, Int64Index[0m
[34m[2025-02-23:02:00:30:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2025-02-23:02:00:30:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2025-02-23:02:00:30:INFO] nginx config: [0m
[34mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_max_body_size 0;
    keepalive_timeout 3;
    location ~ ^/(ping|invocations|execution-parameters) {
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_set_header Host $http_host;
      proxy_redirect off;
      proxy_read_timeout 60s;
  

In [18]:
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

# ✅ Enable data capture for batch transform monitoring
data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,  # Capture 100% of inference data
    destination_s3_uri=f"s3://{bucket}/data_capture/",
    capture_options=["Input", "Output"]
)

endpoint_name_batch_transform = "flight-delay-xgboost-endpoint-with-batch-transform"

predictor_batch_request = xgb.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    endpoint_name=endpoint_name_batch_transform,
    serializer=CSVSerializer(),  # <--- important
    deserializer=JSONDeserializer(),  # or StringDeserializer() depending on your output
    data_capture_config=data_capture_config  # ✅ Added data capture config
)

# ✅ Store the batch transform endpoint name for later retrieval (especially for cleanup)
%store endpoint_name_batch_transform
print(f"✅ Stored batch transform SageMaker endpoint name: {endpoint_name_batch_transform}")


INFO:sagemaker:Creating model with name: sagemaker-xgboost-2025-02-23-02-02-46-115
INFO:sagemaker:Creating endpoint-config with name flight-delay-xgboost-endpoint-with-batch-transform
INFO:sagemaker:Creating endpoint with name flight-delay-xgboost-endpoint-with-batch-transform


------!Stored 'endpoint_name_batch_transform' (str)
✅ Stored batch transform SageMaker endpoint name: flight-delay-xgboost-endpoint-with-batch-transform


In [19]:
# Test inference with valid data (multiple samples)
import io

# Seeded so the same validation rows are scored on every run; capped at the frame's
# length so the cell still works when fewer than 5 rows are available.
test_samples = X_val.sample(n=min(5, len(X_val)), random_state=42).values

# One CSV line per sample
csv_buffer = io.StringIO()
np.savetxt(csv_buffer, test_samples, delimiter=",")
test_samples_csv = csv_buffer.getvalue().strip()

response = predictor_batch_request.predict(test_samples_csv)
# NOTE(review): the model is trained on the `on_time` label — confirm that "delay" is the
# right reading of a high score.
print("Predicted probabilities of delay for test samples:", response)


Predicted probabilities of delay for test samples: {'predictions': [{'score': 4.337779500929173e-06}, {'score': 4.337779500929173e-06}, {'score': 1.0346633644076064e-05}, {'score': 4.337779500929173e-06}, {'score': 1.0346633644076064e-05}]}
