# Amazon SageMaker Batch Transform: Associate prediction results with their corresponding input records


In [1]:
import boto3
import sagemaker
from pyathena import connect
import pandas as pd
import numpy as np

# SageMaker session and the IAM execution role used by the jobs and endpoints below
sagemaker_session = sagemaker.Session()
role_arn = sagemaker.get_execution_role()

# boto3 session; its region is reused for Athena and for the container image lookup
session = boto3.Session()
region = session.region_name

# Every artifact of this notebook is stored under this prefix in SageMaker's default bucket
prefix = "flight-delay-prediction-xgboost"
bucket = sagemaker_session.default_bucket()



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [2]:
# Define the Feature Group name
feature_group_name = "airline_delay_features"

# Initialize SageMaker client
sagemaker_client = boto3.client("sagemaker")

# ✅ Step 1: Check if Feature Group Exists
# list_feature_groups is a paginated API: a single call only returns the first page,
# so an existing group could be reported as missing. Walk every page instead.
feature_group_pages = sagemaker_client.get_paginator("list_feature_groups").paginate()
existing_groups = [
    summary
    for page in feature_group_pages
    for summary in page["FeatureGroupSummaries"]
]
existing_group_names = [fg["FeatureGroupName"] for fg in existing_groups]

if feature_group_name in existing_group_names:
    print(f"✅ Feature Group '{feature_group_name}' already exists.")
else:
    # This cell only checks for existence; it does not create anything, so the
    # message must not claim that it does.
    print(f"🚀 Feature Group '{feature_group_name}' does NOT exist! Create it before running the cells below.")

✅ Feature Group 'airline_delay_features' already exists.


In [3]:
import time

def get_feature_store_table_name(feature_group_name, client=None, poll_interval=5, timeout=600):
    """Wait until a Feature Group is created and return its Glue table name.

    Polls ``describe_feature_group`` until the reported status is ``"Created"``,
    then reads the table name from the offline store's data catalog configuration.

    Args:
        feature_group_name: Name of the Feature Group to look up.
        client: Object exposing ``describe_feature_group(FeatureGroupName=...)``.
            Defaults to the notebook-level ``sagemaker_client``.
        poll_interval: Seconds to sleep between two status checks.
        timeout: Maximum number of seconds to keep polling; ``None`` disables
            the limit.

    Returns:
        The ``TableName`` found under ``OfflineStoreConfig.DataCatalogConfig``.

    Raises:
        RuntimeError: The Feature Group is in a state that will not turn into
            ``"Created"`` (failed or being deleted).
        TimeoutError: The Feature Group was still not created after ``timeout`` seconds.
        KeyError: The response carries no offline store / data catalog configuration.
    """
    if client is None:
        client = sagemaker_client

    # Polling on these states would never terminate, so they abort the wait immediately.
    unrecoverable_statuses = {"CreateFailed", "DeleteFailed", "Deleting"}

    print("⏳ Waiting for the Feature Group to be available in Glue...")
    deadline = None if timeout is None else time.monotonic() + timeout

    # Wait for Feature Group to be created
    while True:
        response = client.describe_feature_group(FeatureGroupName=feature_group_name)
        status = response["FeatureGroupStatus"]
        if status == "Created":
            print("✅ Feature Group is now active!")
            break
        if status in unrecoverable_statuses:
            reason = response.get("FailureReason", "no failure reason reported")
            raise RuntimeError(
                f"Feature Group '{feature_group_name}' is in status '{status}': {reason}"
            )
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Feature Group '{feature_group_name}' still in status '{status}' "
                f"after {timeout} seconds"
            )
        print(f"⏳ Current status: {status}, retrying in {poll_interval} seconds...")
        time.sleep(poll_interval)

    # Retrieve Glue Table Name
    table_name = response["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"]
    print(f"✅ Feature Store table registered in Glue: {table_name}")

    return table_name

# Resolve the Glue table name of the Feature Group named above. The Feature Group must
# already exist when this runs; the resulting table name is what the Athena query reads from.
correct_feature_store_table = get_feature_store_table_name(feature_group_name)

⏳ Waiting for the Feature Group to be available in Glue...
✅ Feature Group is now active!
✅ Feature Store table registered in Glue: airline_delay_features_1739588690


In [4]:
# Perform Athena query, returning the result as a pandas DataFrame
query = f"""
SELECT * FROM "sagemaker_featurestore"."{correct_feature_store_table}"
;
"""
# Set up Athena connection (query results are staged under this S3 location)
s3_staging_dir = f's3://{bucket}/athena-query-results/'
conn = connect(s3_staging_dir=s3_staging_dir, region_name=region)

# Execute the query using Pandas; always release the connection, even if the query fails
try:
    df_offline = pd.read_sql(query, conn)
finally:
    conn.close()

# df_offline now contains all the records from the Feature Store offline table.
# A bare last expression gives the notebook's rich table rendering instead of plain text.
df_offline.head()

  df_offline = pd.read_sql(query, conn)


     event_time  year  month carrier airport  arr_flights  arr_del15  \
0  1.739589e+09  2006      5      AS     EWR           62         16   
1  1.739589e+09  2008      6      B6     SYR          180         53   
2  1.739589e+09  2008      6      CO     ABQ           97         41   
3  1.739589e+09  2004      1      DL     SNA          265         56   
4  1.739589e+09  2008      6      CO     BWI          173         48   

   carrier_ct  weather_ct  nas_ct  ...  weather_delay  nas_delay  \
0           9           0       6  ...              0        230   
1          14           1      27  ...            401       1508   
2           7           1      17  ...            123        895   
3          10           0      33  ...             13        890   
4          10           1      25  ...            119       1651   

   security_delay  late_aircraft_delay  delay_rate  on_time  record_id  \
0               0                    0          25        0      39779   
1         

In [5]:
# Remove the columns that are not used as model inputs.
# Dropping in a single non-in-place call with errors="ignore" keeps this cell
# idempotent: re-running it no longer raises KeyError for already-removed columns.
non_feature_columns = ['record_id', 'write_time', 'api_invocation_time', 'is_deleted']
df_offline = df_offline.drop(columns=non_feature_columns, errors="ignore")

df_offline.head()

Unnamed: 0,event_time,year,month,carrier,airport,arr_flights,arr_del15,carrier_ct,weather_ct,nas_ct,...,arr_cancelled,arr_diverted,arr_delay,carrier_delay,weather_delay,nas_delay,security_delay,late_aircraft_delay,delay_rate,on_time
0,1739589000.0,2006,5,AS,EWR,62,16,9,0,6,...,0,0,799,569,0,230,0,0,25,0
1,1739589000.0,2008,6,B6,SYR,180,53,14,1,27,...,3,0,3057,696,401,1508,0,452,29,0
2,1739589000.0,2008,6,CO,ABQ,97,41,7,1,17,...,0,0,3546,873,123,895,0,1655,42,0
3,1739589000.0,2004,1,DL,SNA,265,56,10,0,33,...,1,1,1989,488,13,890,0,598,21,0
4,1739589000.0,2008,6,CO,BWI,173,48,10,1,25,...,1,0,2882,435,119,1651,0,677,27,0


In [6]:
# Encode categorical features
from sklearn.preprocessing import LabelEncoder

# Keep every fitted encoder: without it the category -> integer mapping is lost and
# new records cannot be encoded consistently (nor predictions decoded) later on.
label_encoders = {}
for col in ['carrier', 'airport']:
    le = LabelEncoder()
    df_offline[col] = le.fit_transform(df_offline[col])
    label_encoders[col] = le


In [7]:
# Handle missing values: every NaN becomes 0
df_offline = df_offline.fillna(0)

In [8]:
# Hold out 20% of the rows as a validation set (fixed seed, so the split is repeatable)
from sklearn.model_selection import train_test_split

y = df_offline['on_time']
X = df_offline.drop(columns=['on_time'])

X_train, X_val, y_train, y_val = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# CSV layout for SageMaker: label column first, no header row, no index column
train_combined = pd.concat([y_train, X_train], axis=1)
val_combined = pd.concat([y_val, X_val], axis=1)

train_csv_path = "train.csv"
val_csv_path = "validation.csv"

for frame, csv_path in ((train_combined, train_csv_path), (val_combined, val_csv_path)):
    frame.to_csv(csv_path, index=False, header=False)

# Push both files to S3, each under its own key prefix
train_s3_path = sagemaker_session.upload_data(
    path=train_csv_path,
    bucket=bucket,
    key_prefix=f"{prefix}/train",
)
val_s3_path = sagemaker_session.upload_data(
    path=val_csv_path,
    bucket=bucket,
    key_prefix=f"{prefix}/validation",
)

---

## Training job and model creation

The cell below uses the [SageMaker Python SDK](https://github.com/aws/sagemaker-python-sdk) to launch the training job using both the training set and the validation set. Note that the objective is set to 'binary:logistic', which trains a model to output a probability between 0 and 1 (here, the probability that the `on_time` label is 1).

In [10]:
# Look up the XGBoost container image (version 1.5-1) for the current region
from sagemaker.image_uris import retrieve
from sagemaker.inputs import TrainingInput
xgboost_image_uri = retrieve(framework="xgboost", region=region, version="1.5-1")

# One ml.m5.xlarge training instance; the model artifact is written under <prefix>/output
xgb = sagemaker.estimator.Estimator(
    image_uri=xgboost_image_uri,
    role=role_arn,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=sagemaker_session,
    output_path=f"s3://{bucket}/{prefix}/output",
)

# Binary classifier producing a probability; AUC is the evaluation metric
hyperparameters = {
    "objective": "binary:logistic",
    "num_round": 100,
    "max_depth": 5,
    "eta": 0.2,
    "subsample": 0.8,
    "eval_metric": "auc",
}
xgb.set_hyperparameters(**hyperparameters)

# Both channels point at the CSV files uploaded earlier
train_input = TrainingInput(s3_data=train_s3_path, content_type="text/csv")
val_input = TrainingInput(s3_data=val_s3_path, content_type="text/csv")

# Launch the training job on the "train" and "validation" channels
xgb.fit(inputs={"train": train_input, "validation": val_input})

2025-02-15 04:40:52 Starting - Starting the training job...
2025-02-15 04:41:07 Starting - Preparing the instances for training...
2025-02-15 04:41:48 Downloading - Downloading the training image......
2025-02-15 04:42:54 Training - Training image download completed. Training in progress.
  from pandas import MultiIndex, Int64Index[0m
[34m[2025-02-15 04:42:45.130 ip-10-2-254-12.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2025-02-15 04:42:45.158 ip-10-2-254-12.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2025-02-15:04:42:45:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2025-02-15:04:42:45:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2025-02-15:04:42:45:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2025-02-15:04:42:45:INFO] No GPUs detect

In [13]:
# Host the trained model on a real-time endpoint backed by one ml.m5.xlarge instance
endpoint_name_single_request = "flight-delay-xgboost-endpoint-single-request"
single_request_deploy_args = {
    "initial_instance_count": 1,
    "instance_type": "ml.m5.xlarge",
    "endpoint_name": endpoint_name_single_request,
}
predictor_single_request = xgb.deploy(**single_request_deploy_args)

------!

In [15]:
import io
import numpy as np

# Serialize the first validation row as a single CSV line
csv_buffer = io.StringIO()
single_sample = X_val.iloc[0].to_numpy().reshape(1, -1)
np.savetxt(csv_buffer, single_sample, delimiter=",")
csv_text = csv_buffer.getvalue().strip()  # str

# The request content type is passed explicitly as text/csv
request_args = {"ContentType": "text/csv"}
response = predictor_single_request.predict(csv_text, initial_args=request_args)
print("Response:", response)


Response: b'2.7848066110891523e-06\n'


---

## Batch Transform



In [18]:
# Only the features (no label)
features_only = df_offline.drop(columns=['on_time'])

# Seed the sample so the batch input is reproducible across runs, and cap the size at
# the number of available rows (sampling more rows than exist raises ValueError).
small_test_set = features_only.sample(n=min(500, len(features_only)), random_state=42)

small_test_csv_path = "small_test.csv"
small_test_set.to_csv(small_test_csv_path, index=False, header=False)

# Upload smaller dataset to S3
small_test_s3_path = sagemaker_session.upload_data(
    path=small_test_csv_path,
    bucket=bucket,
    key_prefix=prefix + "/small_test"
)


In [19]:
batch_input_s3 = small_test_s3_path
batch_output_s3 = f"s3://{bucket}/{prefix}/batch-output"

# accept / assemble_with are required for joining CSV output back to the input:
# the result is emitted as CSV, one joined record per line.
transformer = xgb.transformer(
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path=batch_output_s3,
    accept="text/csv",
    assemble_with="Line",
)

# join_source="Input" makes every output line contain the input record followed by
# its prediction, so each result is associated with the record that produced it.
transformer.transform(
    data=batch_input_s3,
    content_type="text/csv",
    split_type="Line",
    join_source="Input",
)
transformer.wait()

print("Batch transform job complete.")

  from pandas import MultiIndex, Int64Index[0m
[34m[2025-02-15:05:12:52:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2025-02-15:05:12:52:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2025-02-15:05:12:52:INFO] nginx config: [0m
[34mworker_processes auto;[0m
  from pandas import MultiIndex, Int64Index[0m
[35m[2025-02-15:05:12:52:INFO] No GPUs detected (normal if no gpus installed)[0m
[35m[2025-02-15:05:12:52:INFO] No GPUs detected (normal if no gpus installed)[0m
[35m[2025-02-15:05:12:52:INFO] nginx config: [0m
[35mworker_processes auto;[0m
[34mdaemon off;[0m
[34mpid /tmp/nginx.pid;[0m
[34merror_log  /dev/stderr;[0m
[34mworker_rlimit_nofile 4096;[0m
[34mevents {
  worker_connections 2048;[0m
[34m}[0m
[34mhttp {
  include /etc/nginx/mime.types;
  default_type application/octet-stream;
  access_log /dev/stdout combined;
  upstream gunicorn {
    server unix:/tmp/gunicorn.sock;
  }
  server {
    listen 8080 deferred;
    client_

In [20]:
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

endpoint_name_batch_transform = "flight-delay-xgboost-endpoint-with-batch-transform"

# Second endpoint for the same model. The predictor is configured with a CSV
# serializer for requests and a JSON deserializer for responses.
predictor_batch_request = xgb.deploy(
    endpoint_name=endpoint_name_batch_transform,
    instance_type="ml.m5.xlarge",
    initial_instance_count=1,
    serializer=CSVSerializer(),
    deserializer=JSONDeserializer(),
)


------!

In [21]:
# Test inference with valid data (multiple samples)
import io

# Seeded so the same 5 validation rows are scored on every run
test_samples = X_val.sample(5, random_state=42).values
csv_buffer = io.StringIO()
np.savetxt(csv_buffer, test_samples, delimiter=",")
test_samples_csv = csv_buffer.getvalue().strip()

# One CSV line per sample; the predictor's own serializer/deserializer handle the payload
response = predictor_batch_request.predict(test_samples_csv)
print("Predicted probabilities of delay for test samples:", response)

# Cleanup: Uncomment to delete this endpoint when done
# predictor_batch_request.delete_endpoint()

Predicted probabilities of delay for test samples: {'predictions': [{'score': 2.7848066110891523e-06}, {'score': 6.739716809534002e-06}, {'score': 2.7848066110891523e-06}, {'score': 2.7848066110891523e-06}, {'score': 2.7848066110891523e-06}]}


In [None]:
# Cleanup: running this cell deletes both endpoints deployed earlier in this notebook.
# Only run it once you are done sending requests to them.
predictor_single_request.delete_endpoint()
predictor_batch_request.delete_endpoint()