In [1]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, CacheConfig
from sagemaker.workflow.parameters import ParameterString
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
import boto3



sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\tochi\AppData\Local\sagemaker\sagemaker\config.yaml


In [2]:
# Set up the AWS / SageMaker sessions and the pipeline execution role.
# Every client is pinned to `region` so the pipeline, its processing jobs and the
# EventBridge schedule created later all live in the same region. Previously
# `region` was defined but never used, so everything followed the machine's
# default AWS region.
region = "us-east-2"
boto_session = boto3.Session(region_name=region)
sagemaker_session = sagemaker.Session(boto_session=boto_session)

# Resolve the account ID from the active credentials instead of hard-coding it.
account = boto_session.client("sts").get_caller_identity()["Account"]
role = f"arn:aws:iam::{account}:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole"

In [3]:
# Define pipeline parameters.
# Passed to the ingestion script as `--years-to-filter`. ParameterString values
# are strings; presumably the script converts this to an int (confirm there).
years_to_filter = ParameterString(name="Historical_Years", default_value="10")

# Step caching.
# `expire_after` must be an ISO 8601 duration ("1d" is not one).
# The window is kept shorter than the weekday schedule interval on purpose.
# Step arguments and S3 destinations are identical on every run, so a full-day
# window could let the next scheduled run reuse the previous day's cached outputs.
# Same-day re-runs can still hit the cache.
cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")

In [4]:
# S3 bucket for storing intermediate data
# inference_bucket = "s3://aws-portfolio-projects/snp500-data/inference_data/"

## Data Ingestion Step 

In [6]:
# Data ingestion.
# Runs inference_scripts/data_ingestion.py in a managed scikit-learn container.
# Whatever the script writes to /opt/ml/processing/output is published as the
# step output "ingested".

data_ingestion_processor = SKLearnProcessor(
    framework_version="1.0-1",
    command=["python3"],
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    # Use the notebook's session so the script upload and the job share the
    # pipeline's region/account instead of relying on the SDK's default session.
    sagemaker_session=sagemaker_session,
)

# Data ingestion step
ingestion_step = ProcessingStep(
    name="DataIngestion",
    processor=data_ingestion_processor,
    inputs=[],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="ingested",
            source="/opt/ml/processing/output",
            destination="s3://aws-portfolio-projects/snp500-data/inference_data/input/",
        )
    ],
    code="inference_scripts/data_ingestion.py",
    cache_config=cache_config,
    # The pipeline parameter is resolved at execution time, so each run can
    # override the number of historical years.
    job_arguments=["--years-to-filter", years_to_filter],
)

## Data Processing Step

In [7]:
# Data preprocessing.
# Reads the ingestion step's "ingested" output and publishes the script's
# results as the step output "processed". Because the input source is a property
# of `ingestion_step`, SageMaker also orders this step after ingestion.
data_preprocessor = SKLearnProcessor(
    framework_version="1.0-1",
    role=role,
    instance_type="ml.m5.large",
    instance_count=1,
    # Same session as the pipeline, so the script upload and the job use its region/account.
    sagemaker_session=sagemaker_session,
)

# Data processing step
processing_step = ProcessingStep(
    name="DataPreprocessing",
    processor=data_preprocessor,
    inputs=[
        sagemaker.processing.ProcessingInput(
            source=ingestion_step.properties.ProcessingOutputConfig.Outputs[
                "ingested"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/input",
        )
    ],
    outputs=[
        sagemaker.processing.ProcessingOutput(
            output_name="processed",
            source="/opt/ml/processing/output/train",
            destination="s3://aws-portfolio-projects/snp500-data/inference_data/processed/",
        )
    ],
    code="inference_scripts/data_processing.py",
    cache_config=cache_config,
    # Container paths matching the input destination / output source above;
    # presumably parsed by data_processing.py (confirm flag names there).
    job_arguments=[
        "--input_dir",
        "/opt/ml/processing/input/",
        "--output_dir",
        "/opt/ml/processing/output/train",
    ],
)

## Inference Step

In [7]:
# Inference step.
# A Lambda function defined from inference_scripts/lambda_prediction.py is
# invoked with the processed-data location, the endpoint name and the DynamoDB
# table name below.
# Lambda execution roles must live in the caller's account, so derive the ARN
# from `account` instead of hard-coding an account ID.
lambda_role = f"arn:aws:iam::{account}:role/sagemaker-pipeline-lambda-role"
lambda_client = boto3.client("lambda", region_name=region)  # NOTE(review): not used below
lambda_function_name = "sagemaker-stock-prediction-lambda"

# Lambda function for invoking the SageMaker endpoint
lambda_step = LambdaStep(
    name="InvokeSageMakerEndpoint",
    lambda_func=Lambda(
        function_name=lambda_function_name,
        execution_role_arn=lambda_role,
        script="inference_scripts/lambda_prediction.py",
        handler="lambda_prediction.lambda_handler",
        # Create/update the function through the same session as the pipeline.
        session=sagemaker_session,
    ),
    inputs={
        "processed_data_s3_uri": "s3://aws-portfolio-projects/snp500-data/inference_data/processed/sp500_processed.csv",
        "endpoint_name": "stock-market-prediction-endpoint",
        "dynamodb_table_name": "stock-prediction-data",
    },
    # `processed_data_s3_uri` is a literal string, not a property of
    # `processing_step`, so SageMaker sees no data dependency between the two.
    # Declare the dependency explicitly so that, once this step is part of a
    # pipeline, the endpoint is never invoked before preprocessing finishes.
    depends_on=[processing_step],
)

In [8]:
# Assemble the inference pipeline and register it under `role` (create, or update
# the definition if it already exists). Then start one execution and block until
# it finishes.
# NOTE(review): `lambda_step` (InvokeSageMakerEndpoint) is defined above but is
# not listed here, so executions only ingest and preprocess data. Confirm whether
# the inference step should be wired in.
pipeline_steps = [ingestion_step, processing_step]

pipeline = Pipeline(
    name="StockInferencePipeline",
    parameters=[years_to_filter],
    steps=pipeline_steps,
    sagemaker_session=sagemaker_session,
)

pipeline.upsert(role_arn=role)

# Kick off a run with default parameter values and wait for it to complete.
execution = pipeline.start()
execution.wait()

## Schedule Inference Pipeline

In [9]:
import pytz
from datetime import datetime

# Schedule the inference pipeline with an EventBridge rule that starts it on
# weekdays (Monday to Friday) at 9 PM US Central Time.

PIPELINE_SCHEDULE_RULE = "SageMakerInferencePipelineSchedule"
LOCAL_RUN_HOUR = 21  # 9 PM in US/Central

central_tz = pytz.timezone("US/Central")

# EventBridge rule cron expressions are evaluated in UTC.
# 9 PM Central falls on the *next* UTC day: 02:00 UTC in CDT, 03:00 UTC in CST.
# Both the hour and the weekday range must therefore be shifted. A hard-coded
# "0 3 ? * MON-FRI" fires on Sunday-Thursday evenings in Central time, and an hour
# late during daylight saving time.
now_local = datetime.now(central_tz)
run_local = central_tz.localize(
    now_local.replace(hour=LOCAL_RUN_HOUR, minute=0, second=0, microsecond=0, tzinfo=None)
)
run_utc = run_local.astimezone(pytz.utc)
day_shift = (run_utc.date() - run_local.date()).days  # +1 for 9 PM Central

weekday_names = ["MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"]
first_day = weekday_names[(0 + day_shift) % 7]  # local Monday, in UTC
last_day = weekday_names[(4 + day_shift) % 7]  # local Friday, in UTC
cron_expression = f"cron({run_utc.minute} {run_utc.hour} ? * {first_day}-{last_day} *)"
# NOTE: the UTC offset is captured when this cell runs, so the local run time
# drifts by one hour after the next DST change. For a DST-aware schedule, use
# EventBridge Scheduler with ScheduleExpressionTimezone="America/Chicago".

# Create the EventBridge client from the session that owns the pipeline, so the
# rule lives in the pipeline's region.
events_client = sagemaker_session.boto_session.client("events")

# Create (or update) the EventBridge rule
rule_response = events_client.put_rule(
    Name=PIPELINE_SCHEDULE_RULE,
    ScheduleExpression=cron_expression,
    State="ENABLED",
    Description="Trigger SageMaker pipeline every Monday to Friday at 9 PM US Central Time",
)

# Get the rule ARN
rule_arn = rule_response["RuleArn"]

# Target the SageMaker pipeline itself (not a Lambda function). Ask SageMaker for
# its ARN instead of formatting one with a hard-coded region.
sagemaker_inference_arn = pipeline.describe()["PipelineArn"]

# EventBridge must assume an IAM role to call sagemaker:StartPipelineExecution.
# NOTE(review): this reuses the SageMaker execution role. Confirm that it trusts
# events.amazonaws.com and allows StartPipelineExecution, or substitute a
# dedicated EventBridge role.
eventbridge_role_arn = role

# Add the target to the rule
put_targets_response = events_client.put_targets(
    Rule=PIPELINE_SCHEDULE_RULE,
    Targets=[
        {
            "Id": "SageMakerInferencePipelineTarget",
            "Arn": sagemaker_inference_arn,
            "RoleArn": eventbridge_role_arn,
        }
    ],
)

# put_targets reports per-target failures in the response instead of raising.
if put_targets_response.get("FailedEntryCount", 0):
    raise RuntimeError(
        f"Failed to attach pipeline target: {put_targets_response['FailedEntries']}"
    )

put_targets_response