### Deploy the registered AutoGluon (AG) model from MLflow for real-time (RT) inference

In [30]:
! pip install fastavro avro -q

In [61]:
import io
import json
import os
import shutil
import time

import boto3
import cloudpickle
import fastavro
import matplotlib.pyplot as plt
import pandas as pd
import sagemaker
from autogluon.timeseries import TimeSeriesDataFrame
from sagemaker import get_execution_role, Session
from sagemaker import image_uris
from sagemaker.deserializers import JSONDeserializer
from sagemaker.model import Model
from sagemaker.predictor import Predictor
from sagemaker.serializers import BaseSerializer

In [62]:
# --- Notebook-wide configuration ---
# S3 locations used by this notebook.
bucket = 'ag-example-timeseries'
avro_prefix = 'avro-inf-stream'
prefix = 'mlflow-packaged-models'

# Create S3 client
s3 = boto3.client("s3")

mlflow_experiment = "autogluon-timeseries"

# Build ONE SageMaker session and derive everything else from it, instead of
# constructing a throwaway Session just to read the region.
session = sagemaker.Session()
# The deployment and inference cells below refer to the session as
# `sagemaker_session`; bind that name too so they do not raise NameError
# when the notebook is run top-to-bottom in a fresh kernel.
sagemaker_session = session
region = session.boto_region_name
role = sagemaker.get_execution_role()

In [None]:
# --- Inference Script Content ---
# Source code of the endpoint entry point, kept as a string so this notebook
# can write it to `<source_dir>/ag_ts_inference.py` and hand that file to the
# SageMaker `Model` as `entry_point` further down.
#
# The script defines two hooks:
#   * model_fn(model_dir): loads a TimeSeriesPredictor from `<model_dir>/model`.
#   * transform_fn(predictor, data, content_type, accept_type): decodes the
#     request ("application/json" in pandas "split" orientation, or
#     "application/x-avro-bytes" read record-by-record with fastavro), runs
#     `predictor.predict`, and encodes the forecast as pandas-"split" JSON
#     ("application/json") or CSV ("text/csv").
#
# NOTE: everything between the triple quotes is runtime text. The docstring
# delimiters inside it are escaped (\"\"\") only so they do not terminate the
# enclosing string literal; editing the text changes the deployed script.
ag_ts_inference_script = """
import os
import json
import pandas as pd
import io
import fastavro
from autogluon.timeseries import TimeSeriesPredictor, TimeSeriesDataFrame

# Required by SageMaker for loading the model
def model_fn(model_dir):
    \"\"\"
    Loads the trained TimeSeriesPredictor from the model directory.
    \"\"\"
    try:
        model_path = os.path.join(model_dir, "model")
        print(f"MODEL_LOG - Attempting to load model from: {model_path}")
        predictor = TimeSeriesPredictor.load(model_path)
        print("MODEL_LOG - Model loaded successfully.")
        return predictor
    except Exception as e:
        print(f"MODEL_LOG - Error loading model: {e}")
        # Re-raise the exception to fail the health check
        raise

# Required by SageMaker for inference
def transform_fn(predictor, data, content_type, accept_type):
    \"\"\"
    Handles data deserialization, prediction, and serialization.
    \"\"\"
    print(f"TRANSFORM_LOG - Received data with content_type: {content_type}")
    try:
        # 1. Deserialize input based on the content_type
        if content_type == "application/json":
            df = pd.read_json(io.StringIO(data), orient="split")
        elif content_type == "application/x-avro-bytes":
            input_stream = io.BytesIO(data)
            reader = fastavro.reader(input_stream)
            records = [r for r in reader]
            df = pd.DataFrame.from_records(records)
        else:
            raise ValueError(f"Unsupported content type: {content_type}")
        
        # Log deserialization details for debugging
        print(f"TRANSFORM_LOG - Deserialized data shape: {df.shape}")
        print("TRANSFORM_LOG - Deserialized data head:")
        print(df.head())
        
        # 2. Perform prediction on the deserialized data
        ts_dataframe = TimeSeriesDataFrame(df)
        predictions = predictor.predict(ts_dataframe)
        
        # 3. Serialize output based on the accept_type
        if accept_type == "application/json":
            # Return pandas-split JSON and the correct content type
            return predictions.to_json(orient="split"), accept_type
        elif accept_type == "text/csv":
            # Return CSV string and the correct content type
            return predictions.to_csv(), accept_type
        else:
            raise ValueError(f"Unsupported accept type: {accept_type}")
    except Exception as e:
        print(f"TRANSFORM_LOG - Error during transformation: {e}")
        raise
"""

# --- Custom Serializer and Predictor Classes ---
# These are placed here so the notebook can use them
class TimeSeriesAvroSerializer(BaseSerializer):
    """Serialize a time-series frame into an Avro container file (bytes).

    The input is expected to be a DataFrame indexed by ``(item_id, timestamp)``
    with a ``target`` column. Each row becomes one Avro record with the fields
    ``item_id`` (string), ``timestamp`` (string) and ``target`` (double), which
    is the shape the endpoint's ``fastavro.reader`` loop turns back into a
    DataFrame.
    """

    CONTENT_TYPE = "application/x-avro-bytes"

    # Schema of ONE record. `fastavro.writer` applies the schema to each
    # element of the `records` iterable, so it must describe a single row;
    # wrapping it in an "array" type makes the writer reject dict records.
    # "double" keeps full float64 precision (Avro "float" is 32-bit).
    RECORD_SCHEMA = {
        "type": "record",
        "name": "timeseries",
        "fields": [
            {"name": "item_id", "type": "string"},
            {"name": "timestamp", "type": "string"},
            {"name": "target", "type": "double"},
        ],
    }

    def __init__(self):
        super().__init__()

    def serialize(self, data):
        """Return ``data`` encoded as Avro bytes.

        Args:
            data: DataFrame with a two-level ``(item_id, timestamp)`` index
                and a ``target`` column.

        Returns:
            bytes: an Avro object container file with one record per row.
        """
        # Iterating the target Series yields ((item_id, timestamp), value)
        # pairs directly, avoiding the per-row Series built by iterrows().
        records = [
            {
                "item_id": str(item_id),
                "timestamp": str(timestamp),
                "target": float(target),
            }
            for (item_id, timestamp), target in data["target"].items()
        ]

        with io.BytesIO() as out:
            fastavro.writer(out, self.RECORD_SCHEMA, records)
            return out.getvalue()

class CustomPredictor(Predictor):
    """Predictor that sends Avro-encoded time series and parses JSON replies.

    Requests are encoded with ``TimeSeriesAvroSerializer`` and responses are
    decoded with ``JSONDeserializer``, so ``predict`` returns an already-parsed
    Python object rather than raw bytes.
    """

    def __init__(self, endpoint_name, sagemaker_session=None):
        """Attach to an existing endpoint.

        Args:
            endpoint_name: name of the deployed SageMaker endpoint.
            sagemaker_session: session used for the invocation; passed
                through to ``Predictor`` unchanged (``None`` lets the base
                class pick its default).
        """
        # `content_type` / `accept` are intentionally not passed: in SageMaker
        # SDK v2 they are ignored no-op keyword arguments (with a deprecation
        # warning); the request Content-Type comes from the serializer's
        # CONTENT_TYPE and the Accept header from the deserializer.
        super().__init__(
            endpoint_name=endpoint_name,
            sagemaker_session=sagemaker_session,
            serializer=TimeSeriesAvroSerializer(),
            deserializer=JSONDeserializer(),
        )

# --- SageMaker Configuration ---
# The IAM execution role (`role`) and the AWS region (`region`) are defined in
# the notebook configuration cell above; only deployment-specific settings
# live here.

# The S3 URI pointing directly to the existing model.tar.gz from your training job's output.
s3_uri_to_model_tar = "s3://sagemaker-us-east-1-543531862107/ag-ts-train-1757359159-6101/output/model.tar.gz"

# The instance type for your endpoint (also used to select the container image).
instance_type = "ml.g4dn.2xlarge"

# The name of the endpoint you are creating or updating
endpoint_name = "pytorch-inference-test"

# Local directory that will be packaged and uploaded to S3, and the file name
# of the inference entry point that is written into it.
source_dir = "ag_ts_source"
inference_script = "ag_ts_inference.py"

# --- Prepare Source Directory ---
# Start from an empty directory so stale files from an earlier run are never
# packaged together with the current script.
print("Preparing source directory for packaging...")
if os.path.exists(source_dir):
    shutil.rmtree(source_dir)
os.makedirs(source_dir, exist_ok=True)

requirements_path = os.path.join(source_dir, "requirements.txt")
script_path = os.path.join(source_dir, inference_script)

# Packages the inference script imports; installed in the container from
# this requirements.txt.
with open(requirements_path, "w") as requirements_file:
    requirements_file.writelines(
        ["autogluon.timeseries\n", "pandas\n", "fastavro\n"]
    )
print("requirements.txt created.")

# Materialise the entry-point script from the string defined earlier.
with open(script_path, "w") as script_file:
    script_file.write(ag_ts_inference_script)
print(f"'{inference_script}' created in '{source_dir}' successfully.")

# --- SageMaker Deployment ---
print("--- Starting Model Deployment ---")

# Retrieve the URI for a PyTorch GPU inference image
print(f"Retrieving GPU image URI for {instance_type} in {region}...")
gpu_image_uri = image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="2.2.0",
    py_version="py310",
    instance_type=instance_type,
    image_scope="inference",
)

# Create a SageMaker generic Model instance. `entry_point` / `source_dir`
# point at the script and requirements.txt prepared above.
print("Creating SageMaker Model object...")
model = Model(
    image_uri=gpu_image_uri,
    model_data=s3_uri_to_model_tar,
    role=role,
    entry_point=inference_script,
    source_dir=source_dir,
    sagemaker_session=sagemaker_session,
)

# Deploy the model with extended timeouts
print(f"Deploying model to endpoint: {endpoint_name} with custom timeouts...")
print("This may take several minutes. Check the SageMaker console for status.")
predictor = model.deploy(
    initial_instance_count=1,
    instance_type=instance_type,
    endpoint_name=endpoint_name,
    # The documented keyword is `container_startup_health_check_timeout`
    # (seconds). The previous `..._timeout_seconds` spelling is not a
    # `Model.deploy` parameter, so the 5-minute limit was never applied.
    container_startup_health_check_timeout=300,  # 5 minutes
    model_data_download_timeout=3600,  # 1 hour
    # NOTE(review): `max_payload_in_mb` is not a documented `Model.deploy`
    # parameter and appears to be ignored -- confirm and remove if so.
    max_payload_in_mb=100,
)

print("\n--- Model Deployment Complete ---")
print("You can now proceed with inference tests.")

# --- Inference Test ---

def generate_timeseries_df():
    """Build a two-row sample TimeSeriesDataFrame for a smoke test.

    The frame holds one item (``T000000``) with two half-hourly ``target``
    observations and is indexed by ``(item_id, timestamp)``.
    """
    timestamps = pd.to_datetime(['2013-03-10 00:00:00', '2013-03-10 00:30:00'])
    sample = pd.DataFrame(
        {
            'timestamp': timestamps,
            'target': [5200.0, 5220.0],
            'item_id': ['T000000', 'T000000'],
        }
    )
    indexed = sample.set_index(['item_id', 'timestamp'])
    return TimeSeriesDataFrame(indexed)

def predictions_response_to_frame(response_json, value_column="mean"):
    """Flatten a pandas-"split" JSON forecast into a tidy DataFrame.

    The endpoint answers with ``predictions.to_json(orient="split")``, i.e. a
    dict with ``columns`` (forecast column names), ``index`` (one
    ``[item_id, timestamp]`` pair per row) and ``data`` (one list of forecast
    values per row). The item id and timestamp therefore live in ``index``,
    not in ``data``.

    Args:
        response_json: parsed response dict in the layout described above.
        value_column: forecast column to extract; falls back to the first
            column when it is not present.

    Returns:
        DataFrame with columns ``item_id``, ``timestamp`` (datetime) and
        ``prediction``; empty when the response carries no rows.
    """
    index_rows = response_json.get("index") or []
    data_rows = response_json.get("data") or []
    columns = response_json.get("columns") or []

    if not index_rows or not data_rows:
        return pd.DataFrame(columns=["item_id", "timestamp", "prediction"])

    value_pos = columns.index(value_column) if value_column in columns else 0
    frame = pd.DataFrame(
        {
            "item_id": [pair[0] for pair in index_rows],
            "timestamp": [pair[1] for pair in index_rows],
            "prediction": [row[value_pos] for row in data_rows],
        }
    )

    # pandas writes datetimes as epoch milliseconds by default; ISO strings
    # are handled too in case the server-side date format changes.
    stamps = frame["timestamp"]
    unit = "ms" if pd.api.types.is_numeric_dtype(stamps) else None
    frame["timestamp"] = pd.to_datetime(stamps, unit=unit)
    return frame


print("\n--- Running Inference Test ---")
# Notebook-level boundary: any failure of the smoke test is reported, not raised.
try:
    # 1. Instantiate the custom predictor
    custom_predictor = CustomPredictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)

    # 2. Generate and send the data to the endpoint
    data_to_send = generate_timeseries_df()
    print("Sending data to endpoint:")
    print(data_to_send)

    response = custom_predictor.predict(data_to_send)

    # 3. Process the response
    # CustomPredictor uses JSONDeserializer, so `response` is normally already
    # a parsed object; calling json.loads on it would raise TypeError. Only
    # decode when a raw payload comes back.
    if isinstance(response, (str, bytes, bytearray)):
        response_json = json.loads(response)
    else:
        response_json = response

    print("\nPrediction received successfully:")
    print(json.dumps(response_json, indent=2))

    # --- Plotting the result ---
    results_df = predictions_response_to_frame(response_json)

    if not results_df.empty:
        fig, ax = plt.subplots(figsize=(12, 6))
        for item_id, item_df in results_df.groupby("item_id", sort=False):
            ax.plot(item_df["timestamp"], item_df["prediction"], label=f"Item {item_id}")

        ax.set_title("Real-Time Autogluon TimeSeries Predictions")
        ax.set_xlabel("Time")
        ax.set_ylabel("Prediction")
        ax.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.show()
    else:
        print("No valid prediction data to plot.")

except Exception as e:
    print(f"\nAn unexpected error occurred: {e}")

INFO:sagemaker:Repacking model artifact (s3://sagemaker-us-east-1-543531862107/ag-ts-train-1757359159-6101/output/model.tar.gz), script artifact (ag_ts_source), and dependencies ([]) into single tar.gz file located at s3://sagemaker-us-east-1-543531862107/pytorch-inference-2025-09-09-23-00-47-296/model.tar.gz. This may take some time depending on model size...


Preparing source directory for packaging...
requirements.txt created.
'ag_ts_inference.py' created in 'ag_ts_source' successfully.
--- Starting Model Deployment ---
Retrieving GPU image URI for ml.g4dn.2xlarge in us-east-1...
Creating SageMaker Model object...
Deploying model to endpoint: pytorch-inference-test with custom timeouts...
This may take several minutes. Check the SageMaker console for status.


INFO:sagemaker:Creating model with name: pytorch-inference-2025-09-09-23-00-49-333
INFO:sagemaker:Creating endpoint-config with name pytorch-inference-test
INFO:sagemaker:Creating endpoint with name pytorch-inference-test
