In [None]:
# # Optional helper (kept commented out): download the dataset from Google Drive and upload it to S3

# !pip install --upgrade gdown


# import gdown
# import boto3

# # Step 1: Google Drive file ID
# file_id = "1jMxcLF5uYMeJGn2rSEOy0YwficN7-2TL"  # Replace with actual ID
# download_url = f"https://drive.google.com/uc?id={file_id}"

# # Step 2: Local path
# local_file_path = "log_data.json"

# # Step 3: Download JSON file from Google Drive
# gdown.download(download_url, local_file_path, quiet=False)

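# # Step 4: Upload to S3
# # NOTE: the pipeline's feature-engineering step reads its input from
# # s3://thmanyah-bucket/customer-churn-pipeline/raw-dataset/, so adjust s3_key
# # below if the uploaded file should be picked up by that step.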
# s3 = boto3.client('s3')
# bucket_name = "thmanyah-bucket"
# s3_key = "raw-dataset/log_data.json"

# s3.upload_file(local_file_path, bucket_name, s3_key)

# print(f"✅ Uploaded to s3://{bucket_name}/{s3_key}")

In [148]:
# Setup

import os

import boto3
import sagemaker
from sagemaker import image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.lambda_helper import Lambda
from sagemaker.model import Model
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.processing import ScriptProcessor
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.tuner import HyperparameterTuner, IntegerParameter, ContinuousParameter
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.functions import Join
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.model_step import CreateModelStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep ,CreateModelStep
from sagemaker.workflow.steps import TuningStep
from sagemaker.xgboost.estimator import XGBoost

region = boto3.Session().region_name
# sagemaker_session = sagemaker.Session()

sagemaker_session =PipelineSession()
role = sagemaker.get_execution_role()
bucket = 'thmanyah-bucket'
prefix = 'customer-churn-pipeline'
prepared_data='prepared-data'
model_artifact='model-artifact'
# Parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.large")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")

instance='ml.t3.medium'
endpoint_name = "churn-endpoint"




# Step 1: Download GitHub Scripts to S3
script_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role=role,
    instance_type=instance,
    instance_count=1,
    sagemaker_session=sagemaker_session
)



step_download_scripts = ProcessingStep(
    name="DownloadScriptsFromGitHub",
    processor=script_processor,
    code="download_scripts.py",
    outputs=[
        ProcessingOutput(
            source="/opt/ml/processing/output/scripts",
            destination=f"s3://{bucket}/{prefix}/scripts"
        )
    ]
)




# Step 2: Feature Engineering
sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role=role,
    instance_type='ml.t3.large',
    instance_count=1,
    sagemaker_session=sagemaker_session
)



step_process = ProcessingStep(
    name='ChurnFeatureEngineering',
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=f's3://{bucket}/{prefix}/raw-dataset/', destination='/opt/ml/processing/input')
    ],
    outputs=[
        ProcessingOutput(source="/opt/ml/processing/train", destination=f's3://{bucket}/{prefix}/{prepared_data}/train'),
        ProcessingOutput(source="/opt/ml/processing/validation", destination=f's3://{bucket}/{prefix}/{prepared_data}/validation'),
    ],
    code=f's3://{bucket}/{prefix}/scripts/feature_engineering.py',
    job_arguments=[
        '--input_data', '/opt/ml/processing/input',
        '--output_data_train', '/opt/ml/processing/train',
        '--output_data_validation', '/opt/ml/processing/validation'
    ],
    depends_on=[step_download_scripts]
)



# Step 3: Hyperparameter Tuning
xgb_estimator = XGBoost(
    entry_point='train.py',
    framework_version='1.3-1',
    instance_type=training_instance_type.default_value,
    instance_count=1,
    output_path=f's3://{bucket}/{prefix}/{model_artifact}',
    role=role,
    sagemaker_session=sagemaker_session
)

hyperparameter_ranges = {
    "max_depth": IntegerParameter(3, 10),
    "eta": ContinuousParameter(0.1, 0.5),
    "gamma": ContinuousParameter(0, 10),
    "min_child_weight": IntegerParameter(1, 10),
    "subsample": ContinuousParameter(0.5, 1.0),
}

tuner = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name="validation:recall",
    hyperparameter_ranges=hyperparameter_ranges,
    objective_type="Maximize",
    max_jobs=4,
    max_parallel_jobs=2
)


# ✅ Use TuningStep instead of TrainingStep
step_tuning = TuningStep(
    name="ChurnModelTuning",
    tuner=tuner,
    depends_on=[step_process],

    inputs={
        "train": TrainingInput(f's3://{bucket}/{prefix}/{prepared_data}/train', content_type="csv"),
        "validation": TrainingInput(f's3://{bucket}/{prefix}/{prepared_data}/validation', content_type="csv"),
    
    }
)





# # Step 4: Evaluation


validation_data_s3_path = f"s3://{bucket}/{prefix}/prepared-data/validation"
evaluate_script_s3_path = f"s3://{bucket}/{prefix}/scripts/evaluate.py"


image_uri = image_uris.retrieve(
    framework="sklearn",
    region=region,
    version="1.2-1",
    instance_type="ml.m5.large",
    # image_scope="scriptprocessor"  # required for use with ScriptProcessor
)

print("Image URI:", image_uri)
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="evaluation",
    output_name="evaluation",
    path="evaluation.json"
)
top_model_prefix_path=prefix+"/"+model_artifact+""

evaluation_processor = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    role=role,
    instance_type=instance,
    instance_count=1,
    sagemaker_session=sagemaker_session
)

step_eval = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    depends_on=[step_tuning],
    inputs=[
        
        ProcessingInput(source=validation_data_s3_path, destination="/opt/ml/processing/validation"),
        ProcessingInput(source=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=bucket,prefix=top_model_prefix_path),
                        destination="/opt/ml/processing/model"),
    ],
    code=f's3://{bucket}/{prefix}/scripts/evaluate.py',
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")
    ],
    property_files=[evaluation_report],
    job_arguments=[
        "--model_dir", "/opt/ml/processing/model",
        "--test_data", "/opt/ml/processing/validation",
        "--output_dir", "/opt/ml/processing/evaluation"
    ]

)



# Step 5: Register Model
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=f's3://{bucket}/{prefix}/eval-metrics/metrics.json',
        content_type="application/json"
    )
)

step_register = RegisterModel(
    name="ChurnModelRegister",
    estimator=xgb_estimator,
    model_data=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=bucket,prefix=top_model_prefix_path),
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_metrics=model_metrics
)



# Define model (with pipeline placeholder for model_data)
model = Model(
    image_uri=xgb_estimator.training_image_uri(),
    model_data=step_tuning.get_top_model_s3_uri(top_k=0,s3_bucket=bucket,prefix=top_model_prefix_path),
    role=role,
    sagemaker_session=sagemaker_session
)

# CreateModelStep
step_create_model = CreateModelStep(
    name="CreateChurnModel",
    model=model,
    depends_on=[step_eval]

)




step_deploy_model = LambdaStep(
    name="DeployChurnModelToEndpoint",
    depends_on=[step_create_model],
     lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-east-1:767397763254:function:deploy_lambda",
        session=PipelineSession(),
         execution_role_arn=role,      
    ),
    inputs={
        "model_name": step_create_model.properties.ModelName,
        "endpoint_name": endpoint_name
    }
)


sns_lambda_func = Lambda(
    function_arn="arn:aws:lambda:us-east-1:767397763254:function:notify-pipeline-status",
    session=sagemaker_session
)

# LambdaStep
sns_notify_step = LambdaStep(
    name="NotifyIfConditionFails",
    depends_on=[step_create_model],
    lambda_func=sns_lambda_func,
    inputs={"message": "Model performance below threshold."}
)


# Condition Step
step_cond = ConditionStep(
    name="CheckAUCCondition",
    conditions=[
        ConditionGreaterThan(
            left=JsonGet(step_name=step_eval.name, property_file="evaluation", json_path="metrics.recall"),
            right=0.6
        )
    ],

        
    if_steps=[step_register, step_create_model,step_deploy_model],
    else_steps=[sns_notify_step]
)




INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.xlarge.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


Image URI: 683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3


In [None]:
# Pipeline
pipeline = Pipeline(
    name="CustomerChurnPipeline",
    parameters=[processing_instance_type, training_instance_type],
    # steps=[ step_tuning, step_eval, step_cond],
    steps=[step_download_scripts, step_process, step_tuning, step_eval, step_cond],

    sagemaker_session=sagemaker_session
)

pipeline.upsert(role_arn=role)
pipeline.start()
