In [8]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.s3 import S3Downloader
from sagemaker.s3_utils import s3_path_join
from sagemaker.utils import name_from_base
from sagemaker.workflow.notebook_job_step import NotebookJobStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterBoolean, ParameterString
from sagemaker.workflow.parallelism_config import ParallelismConfiguration
from sagemaker.workflow.pipeline import Pipeline
from sagemaker import session
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.triggers import PipelineSchedule
import s3fs
import datetime
import pickle as pkl
import inspect
# S3 filesystem handle (s3fs).
# NOTE(review): `s3` is not referenced in any cell visible in this notebook — drop it if unused.
s3 = s3fs.S3FileSystem()

## FULL pipeline meta

**Create a pipeline meta to resolve step dependencies and avoid repeating shared configuration.**\
Note: if your pipeline is not very complicated, you don't need this part — it can be quite confusing.

Problems:

- Dependency ordering: when steps depend on each other, you have to configure them bottom-up (dependencies first); otherwise there is nothing to put into a step's **"depends_on"** field.

- Some configuration can be shared between steps, e.g. 'image_uri', 'kernel_name', 's3_root_uri', 'role', 'subnets' and 'security_group_ids'. In most cases these are the same across one project.

----------------------------------------------------------------------------------------

To solve these issues, I create a meta description (the "pipeline meta") for the pipeline.

It has two major parts:

- the common configuration shared by all steps;
    
- the jobs:

> jobs:{\
>> <job_name>:{\
>>> **step_configuration** + **"depends_on_"**}}.

The field **"depends_on_"** takes **job names (keys under `jobs`), NOT step names**.

**Finally, the `pipeline_config` function takes the pipeline meta, builds the pipeline, and recursively resolves the dependencies.**

In [9]:
# sagemaker configs
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Today's date (YYYY-MM-DD), echoed so the run date is visible in the cell output.
_date = str(datetime.date.today())
print(_date)

# Pipeline meta for the driving-distance project:
#   - top-level keys other than 'jobs' are NotebookJobStep arguments shared by every step;
#   - each entry under 'jobs' describes one step; its 'depends_on_' lists the *job names*
#     (keys of 'jobs') it must run after, or None when it has no upstream job.
nfadd_pipeline_meta = {
    # ---- common configuration for the project ----
    'image_uri': "056156273734.dkr.ecr.us-east-1.amazonaws.com/pr-home-ds/model-340:1.0.5",
    'kernel_name': "model-340",
    's3_root_uri': 's3://pr-home-datascience/Projects/AdHoc/InternProjects/2025/2025InternSummer Driving distance for Prospect Table/pipeline_logs',
    'role': role,
    'subnets': ["subnet-0ae5d4074bd33204f", "subnet-02474270a2e11662a"],
    'security_group_ids': ["sg-08336f3ee80233920"],

    # ---- jobs of the project; jobs without dependencies between them can run in parallel ----
    'jobs': {
        'drivingdist': {
            'name': 'driving_dist_time'.replace('_', '-'),
            'depends_on_': None,
            'display_name': 'driving_dist_time'.replace('_', '-'),
            'description': 'Calculate the driving distance and time from properties to fire stations',
            'input_notebook': "property_FS.ipynb",
            'notebook_job_name': "Driving-distance-time_Wei",
            'instance_type': 'ml.r5.xlarge',
            'additional_dependencies': ['./read_data.py', './functions.py'],  # TODO
            'max_runtime_in_seconds': 259200,  # 3 days
        },
    },
}

2025-08-04


In [10]:
# Build a SageMaker pipeline from a pipeline meta, resolving job dependencies recursively.
def pipeline_config(pipeline_job_name: str,
                    pipeline_meta: dict,
                    pipeline_affix='default'):
    """Build a ``Pipeline`` of ``NotebookJobStep``s from a pipeline meta dict.

    Each entry under ``pipeline_meta['jobs']`` becomes one ``NotebookJobStep``.
    Top-level keys of ``pipeline_meta`` that are ``NotebookJobStep`` arguments
    (e.g. ``image_uri``, ``role``, ``subnets``) are shared by every step, and a job
    may override any of them with its own value. A job's ``depends_on_`` field holds
    other *job names* (keys of ``pipeline_meta['jobs']``, not step names), either as
    a single string or an iterable. Those jobs are built first and passed to the
    step as ``depends_on``. Each job is built exactly once, even if several jobs
    depend on it.

    Args:
        pipeline_job_name: base name for the pipeline.
        pipeline_meta: the pipeline meta (common configuration + ``'jobs'``).
            It is read only; this function does not modify it.
        pipeline_affix: suffix appended to the pipeline name, because pipeline names
            must be unique in SageMaker. Underscores in the final name become hyphens.

    Returns:
        Pipeline: the assembled pipeline, with one step per job in ``jobs`` order.

    Raises:
        KeyError: if ``depends_on_`` names a job missing from ``pipeline_meta['jobs']``.
        ValueError: if the job dependencies form a cycle.
    """
    jobs = pipeline_meta['jobs']
    # Forward only keys NotebookJobStep accepts; helper keys such as 'jobs' and
    # 'depends_on_' are dropped. Computed once rather than once per step.
    nb_args = set(inspect.signature(NotebookJobStep).parameters)
    project_args = {k: v for k, v in pipeline_meta.items() if k in nb_args}

    built = {}           # job name -> NotebookJobStep already created (shared by dependents)
    in_progress = set()  # job names on the current recursion path, for cycle detection

    def _build(job_name):
        # Return the step for job_name, building its dependencies first.
        if job_name in built:
            return built[job_name]
        if job_name not in jobs:
            raise KeyError(f"Unknown job '{job_name}' referenced in 'depends_on_'")
        if job_name in in_progress:
            raise ValueError(f"Circular dependency detected involving job '{job_name}'")
        in_progress.add(job_name)

        job_meta = jobs[job_name]
        dependency = job_meta.get('depends_on_')
        if isinstance(dependency, str):
            # A single job name; without this, iteration would walk its characters.
            dependency = [dependency]

        # Job-level values override the shared project-level ones.
        step_kwargs = {**project_args,
                       **{k: v for k, v in job_meta.items() if k in nb_args}}
        if dependency:
            step_kwargs['depends_on'] = [_build(dep) for dep in dependency]

        step = NotebookJobStep(**step_kwargs)
        in_progress.discard(job_name)
        built[job_name] = step
        return step

    pipeline_name = f'{pipeline_job_name}-{pipeline_affix}'.replace('_', '-')
    steps = [_build(job_name) for job_name in jobs]
    return Pipeline(name=pipeline_name, steps=steps)

In [11]:
# Build the driving-distance pipeline; with the default affix its name becomes
# "driving-distance-time-default".
pipeline = pipeline_config(
    'driving_distance_time',
    nfadd_pipeline_meta,
)

In [12]:
# Register the pipeline definition with SageMaker (create it, or update it if it already exists).
pipeline.upsert(role_arn=session.get_execution_role())


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:056156273734:pipeline/driving-distance-time-default',
 'ResponseMetadata': {'RequestId': 'cf61b195-f22b-4f51-9d1c-0800b5281e91',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'cf61b195-f22b-4f51-9d1c-0800b5281e91',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '120',
   'date': 'Mon, 04 Aug 2025 12:09:38 GMT'},
  'RetryAttempts': 0}}

In [13]:
# Launch one execution of the pipeline registered above; the cell displays the execution handle.
pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:056156273734:pipeline/driving-distance-time-default/execution/47tecpt4w85x', sagemaker_session=<sagemaker.session.Session object at 0x7fb4a7a546b0>)

In [25]:
# Build the address-match pipeline and register it with SageMaker.
# NOTE(review): `add2qpid_pipeline_meta` is not defined in any cell visible in this notebook,
# so this cell fails on Restart & Run All. TODO: define it (same shape as
# `nfadd_pipeline_meta`) in the config cell.
# NOTE: this rebinds `pipeline`, replacing the driving-distance pipeline built above.
pipeline = pipeline_config(pipeline_job_name='address-match-w-qpid',
                           pipeline_meta=add2qpid_pipeline_meta,
                           pipeline_affix='scheduled')

# upsert creates the pipeline on the first run and updates it on later runs, so there is
# no need to switch manually between create() and update().
pipeline.upsert(role_arn=session.get_execution_role())

In [10]:
# Run daily at 02:00 UTC: 10 PM US Eastern while daylight time is in effect, 9 PM under standard time.
add_qpid_scheduler = PipelineSchedule(
    name="add_match_w_qpid",
    cron="0 2 * * ? *",
)
response = pipeline.put_triggers(
    triggers=[add_qpid_scheduler],
    role_arn=session.get_execution_role(),
)

In [13]:
# Trigger one run now, in addition to the scheduled runs; the cell displays the execution handle.
pipeline.start()

_PipelineExecution(arn='arn:aws:sagemaker:us-east-1:056156273734:pipeline/address-match-w-qpid-scheduled/execution/nlqrwsvjnq74', sagemaker_session=<sagemaker.session.Session object at 0x7fa1a0c0bec0>)

# Fetch the training log

Here is some code to retrieve the processed notebooks from S3.\
You can find the output notebooks under the `s3_root_uri` that you configured in your pipeline.

In [2]:
from sagemaker.s3_utils import s3_path_join
from sagemaker.utils import _tmpdir
from sagemaker.s3 import S3Downloader
import tarfile
import os
# Session used by `_get_training_job_details` and `_download_notebook` below.
# NOTE(review): re-creates the session already built in the config cell — presumably so this
# section can be run on its own.
sagemaker_session = sagemaker.Session()


# get job details
def _get_training_job_details(notebook_job_step):
    """Describe the SageMaker training job that backs a notebook job step.

    Args:
        notebook_job_step: mapping whose ``["Metadata"]["TrainingJob"]["Arn"]`` holds the
            training-job ARN (presumably one entry of a pipeline execution's step list —
            TODO confirm against the caller).

    Returns:
        The ``describe_training_job`` response for that job.
    """
    arn = notebook_job_step["Metadata"]["TrainingJob"]["Arn"]
    client = sagemaker_session.sagemaker_client
    # The job name is the path segment after the first "/" of the ARN.
    job_name = arn.split("/")[1]
    return client.describe_training_job(TrainingJobName=job_name)


def _download_notebook(output_s3_uri, kms_key=None, download_folder="outputs"):
    """Download a notebook job's ``output.tar.gz`` from S3 and extract it locally.

    Args:
        output_s3_uri: S3 prefix that contains the job's ``output.tar.gz``.
        kms_key: optional KMS key, passed through to ``S3Downloader.download``.
        download_folder: local directory to extract into; created if missing.
            Defaults to ``"outputs"`` (relative to the current working directory).

    Raises:
        FileNotFoundError: if no ``output.tar.gz`` was downloaded from ``output_s3_uri``.
    """
    # exist_ok avoids the check-then-create race of os.path.exists + os.makedirs.
    os.makedirs(download_folder, exist_ok=True)

    with _tmpdir() as temp_output_folder:
        S3Downloader.download(
            output_s3_uri,
            temp_output_folder,
            sagemaker_session=sagemaker_session,
            kms_key=kms_key,
        )

        archive_path = os.path.join(temp_output_folder, "output.tar.gz")
        if not os.path.isfile(archive_path):
            raise FileNotFoundError(f"No output.tar.gz found under {output_s3_uri}")

        with tarfile.open(archive_path, "r:gz") as tar:
            # The "data" filter rejects absolute paths, '..' traversal and special files;
            # it is only available on interpreters that ship tarfile extraction filters.
            if hasattr(tarfile, "data_filter"):
                tar.extractall(download_folder, filter="data")
            else:
                tar.extractall(download_folder)
            print(f"Downloaded to {download_folder}")

In [4]:

# Pull down one job's processed notebook into ./outputs.
# NOTE(review): this URI is not under `nfadd_pipeline_meta['s3_root_uri']`; replace it with the
# output prefix of the job you want to inspect.
_download_notebook(
    output_s3_uri='s3://pr-home-datascience/Projects/Underwriting/UnconstraintModels/HO_UNCS_V0340_2024/production/scoring_mon/model340-scoring-testt/tqgab22u0pvv/sc-raw/m340-data-processed-tqgab22u0pvv-WobkA1r1fu/output/',
)

Downloaded to outputs
