# Data ingestion pipeline

<div class="alert alert-block alert-info">
<b>Objective</b> 
    
In this notebook, we show how to automate the data ingestion process. The pipeline takes raw data, parses it, transforms it, validates it and merges it, then stores the result in an S3 bucket.
</div>
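
To make the stages concrete, here is a minimal, self-contained sketch of a parse, transform, validate, merge chain. It is not the code inside the pre-built image: the text-line input format, the `load_mw` field and all function names are assumptions chosen for illustration only.

```python
from typing import Dict, Iterable, List

Record = Dict[str, object]  # hypothetical row shape: {"timestamp": str, "load_mw": float}


def parse(lines: Iterable[str]) -> List[Record]:
    """Turn 'timestamp,value' text lines into records (toy stand-in for Excel parsing)."""
    records = []
    for line in lines:
        timestamp, value = line.strip().split(",")
        records.append({"timestamp": timestamp, "load_mw": float(value)})
    return records


def transform(records: List[Record]) -> List[Record]:
    """Normalize the batch by sorting it chronologically."""
    return sorted(records, key=lambda r: r["timestamp"])


def validate(records: List[Record]) -> List[Record]:
    """Raise if a basic integrity rule is violated, otherwise pass the batch through."""
    if any(r["load_mw"] < 0 for r in records):
        raise ValueError("negative load found")
    return records


def merge(*batches: List[Record]) -> List[Record]:
    """Combine batches; for a repeated timestamp the later batch wins."""
    merged = {}
    for batch in batches:
        for record in batch:
            merged[record["timestamp"]] = record
    return [merged[key] for key in sorted(merged)]


def run_pipeline(sources: Iterable[Iterable[str]]) -> List[Record]:
    """Run every source through parse -> transform -> validate, then merge the results."""
    return merge(*(validate(transform(parse(source))) for source in sources))
```

A call such as `run_pipeline([["2022-01-02,10.0", "2022-01-01,5.0"], ["2022-01-02,11.0"]])` returns two records, with the second source overriding the value for `2022-01-02`.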

We will use a plain SageMaker `Processor` with a custom Docker image that we have pre-built for you.

(You can always build it yourself, but not here: SageMaker Studio does not provide Docker, so you would have to build it on your own machine or on a SageMaker Notebook Instance.)

<div class="alert alert-block alert-info">
<b>Data integrity</b> 
    
We will make use of **deepchecks** to validate the data and to mark the pipeline as successful.
</div>
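
A validation step can mark a job as failed through the container's exit status: a SageMaker Processing job whose container exits with a non-zero code ends in the `Failed` state. The sketch below shows that gating idea only. It does not use the deepchecks API; `run_checks`, the three rules and the `load_mw` field are hypothetical stand-ins for a real check suite.

```python
import sys
from typing import Dict, List


def run_checks(rows: List[Dict[str, object]]) -> List[str]:
    """Return the names of failed checks (hypothetical stand-in for a deepchecks suite)."""
    failures = []
    if not rows:
        failures.append("dataset is empty")
    if any(row.get("load_mw") is None for row in rows):
        failures.append("missing values in load_mw")
    timestamps = [row.get("timestamp") for row in rows]
    if len(timestamps) != len(set(timestamps)):
        failures.append("duplicate timestamps")
    return failures


def exit_code(rows: List[Dict[str, object]]) -> int:
    """Report failed checks on stderr and return 0 on success, 1 otherwise."""
    failures = run_checks(rows)
    for name in failures:
        print(f"CHECK FAILED: {name}", file=sys.stderr)
    return 1 if failures else 0


# In a container entrypoint this would end with: sys.exit(exit_code(rows))
```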

# Setup

In [1]:
! pip install stepfunctions



In [2]:
INSTANCE_TYPE = 'ml.m5.xlarge'

In [3]:
import boto3
from sagemaker.processing import ProcessingInput, ProcessingOutput
import sagemaker
import stepfunctions
import json
from sagemaker import Processor
import uuid

In [4]:
data_output_path = "data"
main_path = "amld22-workshop-sagemaker"

In [5]:
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)
role = sagemaker.get_execution_role()

sessions = sagemaker.Session()
account = boto_session.client('sts').get_caller_identity()['Account']
default_bucket = sessions.default_bucket()
s3_client = boto3.client("s3", region_name=region)
sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)

print(f"Region: {region} | Default bucket: {default_bucket} | Role: {role}")

Region: eu-west-1 | Default bucket: sagemaker-eu-west-1-919788038405 | Role: arn:aws:iam::919788038405:role/service-role/AmazonSageMaker-ExecutionRole-20210604T120344


First, we upload all the raw data (available locally) to the S3 bucket.

In [6]:
from pathlib import Path

s3 = boto3.client('s3')
for file in Path('raw').glob('**/*.xlsx'):
    s3.upload_file(
        f'{file}',
        default_bucket,
        f'{main_path}/{data_output_path}/{file}'
    )
    print(f"File uploaded: {main_path}/{data_output_path}/{file}")

File uploaded: amld22-workshop-sagemaker/data/raw/ingestion/faulty_ingestion_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/ingestion/good_ingestion_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/entsoe-2016/2016_2017_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/terna/2017_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/terna/2022_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/terna/2019_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/terna/2020_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/terna/2018_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/terna/2021_data.xlsx
File uploaded: amld22-workshop-sagemaker/data/raw/entsoe-2006/2006_2015_data.xlsx
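
The upload loop above builds each object key by formatting the local path into a string, which uses the operating system's path separator. As a hedged sketch, a small helper (the name `s3_key_for` is an assumption) can build the key with forward slashes on any platform by calling `as_posix()`:

```python
from pathlib import PurePath, PurePosixPath, PureWindowsPath


def s3_key_for(local_path: PurePath, prefix: str) -> str:
    """Build an S3 object key from a relative local path, always using '/' separators."""
    return f"{prefix.rstrip('/')}/{local_path.as_posix()}"


# The same relative file yields the same key regardless of the local path flavour.
posix_key = s3_key_for(PurePosixPath("raw/terna/2017_data.xlsx"), "amld22-workshop-sagemaker/data")
windows_key = s3_key_for(PureWindowsPath(r"raw\terna\2017_data.xlsx"), "amld22-workshop-sagemaker/data")
```

Inside the loop, the key argument could then be written as `s3_key_for(file, f'{main_path}/{data_output_path}')`.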


In [7]:
raw_input_data_path = f's3://{default_bucket}/{main_path}/{data_output_path}/raw'
output_data_path = f's3://{default_bucket}/{main_path}/{data_output_path}/normalized_data'

You need to build your Docker image. To start, you create the ECR (Elastic Container Registry) repository. Then, you build the Docker image containing the code that processes the dataset and push it to that repository.

Once again, your favourite ML Engineers have already done it for you, and you can use the image directly.

In [8]:
#import subprocess
#docker_build = subprocess.check_output(f'bash ./build_push.sh prepare_data eu-west-1', shell=True)
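
Once an image has been pushed, its URI follows a fixed pattern made of the account ID, the region, the repository name and the tag. The helper below is a small sketch of that pattern for the standard commercial regions; the function name `ecr_image_uri` is an assumption, and the contents of `build_push.sh` are not shown in this notebook.

```python
def ecr_image_uri(account_id: str, region: str, repository: str, tag: str = "latest") -> str:
    """Compose the URI of an image stored in a private ECR repository."""
    return f"{account_id}.dkr.ecr.{region}.amazonaws.com/{repository}:{tag}"


# Reproduces the URI of the pre-built image used in this notebook.
example_uri = ecr_image_uri("919788038405", "eu-west-1", "prepare_data")
```

With the variables defined earlier, `ecr_image_uri(account, region, "prepare_data")` would point to an image in your own account instead of the pre-built one.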

In [9]:
image_uri = "919788038405.dkr.ecr.eu-west-1.amazonaws.com/prepare_data:latest"

processor = Processor(
    image_uri=image_uri,
    role=role,
    instance_type=INSTANCE_TYPE,
    instance_count=1
)

In [19]:
processor.run(
    inputs=[
        ProcessingInput(
            source=raw_input_data_path,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="normalized_data",
            destination=output_data_path,
            source="/opt/ml/processing/normalized_data"
        )
    ],
    job_name=f"docker-data-processor-{uuid.uuid4().hex}"
)


Job Name:  858af98a01f94abba83492e0d455a472-docker-data-processor
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-919788038405/amld22-workshop-sagemaker/data/raw', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'normalized_data', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-eu-west-1-919788038405/amld22-workshop-sagemaker/data/normalized_data', 'LocalPath': '/opt/ml/processing/normalized_data', 'S3UploadMode': 'EndOfJob'}}]
.......................pandas version: 1.4.1
{'ProcessingJobArn': 'arn:aws:sagemaker:eu-west-1:919788038405:processing-job/858af98a01f94abba83492e0d455a472-docker-data-processor', 'ProcessingJobName': '858af98a01f94abba83492e0d455a472-docker-data-processor', 'AppSpecification': {'ImageUri': '919788038405.dkr.ecr.eu-west-1.amazonaws.com/prepa

We can see that the deepchecks validation has passed, so the data has been processed and is ready to be used.
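
Instead of reading the logs, the outcome can also be checked from the job description. The sketch below interprets the dictionary returned by `processor.latest_job.describe()`, using the `ProcessingJobStatus`, `FailureReason` and `ExitMessage` fields of that response. The helper name and the wording of the messages are assumptions, and reading `Completed` as "checks passed" assumes the container exits with a non-zero code when validation fails.

```python
def summarize_processing_job(description: dict) -> str:
    """Turn a DescribeProcessingJob-style response into a one-line summary."""
    status = description["ProcessingJobStatus"]
    if status == "Completed":
        return "Completed: the container exited successfully and outputs were uploaded"
    if status == "Failed":
        reason = description.get("FailureReason", "no failure reason reported")
        exit_message = description.get("ExitMessage")
        suffix = f" (exit message: {exit_message})" if exit_message else ""
        return f"Failed: {reason}{suffix}"
    if status in ("Stopping", "Stopped"):
        return f"{status}: job was stopped before completion"
    return f"{status}: job has not finished yet"
```

Usage in the notebook would be `print(summarize_processing_job(processor.latest_job.describe()))`.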

Before continuing, we need to add some permissions to the role we're using to run this notebook.
Please add the following services to the role's Trust Relationship:

`
[
    "sagemaker.amazonaws.com",
    "events.amazonaws.com",
    "lambda.amazonaws.com",
    "states.amazonaws.com"
]
`    

Also, just to be sure, add the AdministratorAccess policy to the same role (**NEVER EVER DO THIS IN PRODUCTION**).
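
The trust relationship mentioned above is a JSON policy document that lists the services allowed to assume the role. As a sketch, the helper below (its name is an assumption) builds such a document for the four services:

```python
import json
from typing import Iterable


def build_trust_policy(services: Iterable[str]) -> dict:
    """Build an IAM trust policy that lets the given AWS services assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": sorted(services)},
                "Action": "sts:AssumeRole",
            }
        ],
    }


trust_policy = build_trust_policy([
    "sagemaker.amazonaws.com",
    "events.amazonaws.com",
    "lambda.amazonaws.com",
    "states.amazonaws.com",
])
trust_policy_json = json.dumps(trust_policy, indent=2)
```

The resulting JSON can be pasted into the IAM console, or applied with `boto3.client("iam").update_assume_role_policy(RoleName=..., PolicyDocument=trust_policy_json)`, where the role name is the last segment of the role ARN printed earlier.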

Let us now create some data that will make the pipeline fail. This will show the usefulness of the data checks.

First, we set up the recipients of the emails sent when the pipeline fails.

In [10]:
sns = boto3.resource('sns')

try:
    for topic in sns.topics.all():
        if topic.arn.split(':')[-1] == 'failure-topic':
            topic.delete()
except Exception as e:
    print(e)

sns_topic_arn = sns.meta.client.create_topic(Name='failure-topic')['TopicArn']

topic = sns.Topic(sns_topic_arn)

for email in [
    'gabriele.mazzola@xtreamers.io', 
    'emanuele.fabbiani@xtreamers.io', 
    'gabriele.orlandi@xtreamers.io', 
    'matteo.moroni@besharp.it'
]:
    topic.subscribe(
        Protocol='email',
        Endpoint=email
    )

## **Before continuing**, make sure that the recipients accept the subscription so that they can receive the notifications
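
One way to check this from the notebook: while an email subscription is waiting for confirmation, SNS reports its `SubscriptionArn` as the literal string `PendingConfirmation`. The sketch below filters a list of subscriptions on that value; the helper name and the example addresses are assumptions.

```python
from typing import Dict, List


def pending_endpoints(subscriptions: List[Dict[str, str]]) -> List[str]:
    """Return the endpoints whose subscription has not been confirmed yet."""
    return [
        sub["Endpoint"]
        for sub in subscriptions
        if sub["SubscriptionArn"] == "PendingConfirmation"
    ]


# Example input shaped like the "Subscriptions" list returned by SNS (values are made up).
example_subscriptions = [
    {"Endpoint": "alice@example.com", "Protocol": "email", "SubscriptionArn": "PendingConfirmation"},
    {
        "Endpoint": "bob@example.com",
        "Protocol": "email",
        "SubscriptionArn": "arn:aws:sns:eu-west-1:123456789012:failure-topic:0a1b2c3d",
    },
]
still_pending = pending_endpoints(example_subscriptions)
```

In the notebook, the input would come from `sns.meta.client.list_subscriptions_by_topic(TopicArn=sns_topic_arn)["Subscriptions"]`.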

Then, we create a step function which is triggered by an S3 upload.

https://docs.aws.amazon.com/step-functions/latest/dg/tutorial-cloudwatch-events-s3.html

In [11]:
from stepfunctions.inputs import Placeholder
from stepfunctions.workflow import Workflow
from stepfunctions.steps import ProcessingStep, Catch, Fail, Task
# generate step function
from stepfunctions.steps import Succeed, SnsPublishStep

step_function_name = 'data_ingestion_step_function'

end_stepfunction = Succeed(
            state_id="Success"
)

fail_state = Fail(
    state_id="Fail",
)

end_stepfunction.update_parameters({"end": True})

error_step = SnsPublishStep(
    state_id="Fail state message",
    parameters={
        "TopicArn": sns_topic_arn,
        "Message.$": "$",
    }
)
error_step.next(fail_state)


ingest_data_step = Task(
    state_id='Ingest data step',
    resource="arn:aws:states:::sagemaker:createProcessingJob.sync",
    parameters={
        "ProcessingJobName.$": "$$.Execution.Input.id",
        "AppSpecification": {
            "ImageUri": image_uri,
        },
        "RoleArn": role,
        "ProcessingInputs": [
            ProcessingInput(
                input_name="data",
                source=raw_input_data_path,
                destination="/opt/ml/processing/input"
            )._to_request_dict()
        ],
        "ProcessingOutputConfig": {
            "Outputs": [
                ProcessingOutput(
                    output_name="normalized_data",
                    destination=output_data_path,
                    source="/opt/ml/processing/normalized_data"
                )._to_request_dict()
            ]
        },
        "ProcessingResources": {
            "ClusterConfig": {
                "InstanceCount": 1,
                "InstanceType": INSTANCE_TYPE,
                "VolumeSizeInGB": 30
            }
        },
        "StoppingCondition": {
            "MaxRuntimeInSeconds": 600
        }
    }
)

ingest_data_step.add_catch(Catch(
    error_equals=["States.TaskFailed"],
    next_step=error_step
))

ingest_data_step.next(end_stepfunction)

workflow = Workflow(
    name=step_function_name,
    definition=ingest_data_step,
    role=role
)
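
For orientation, the workflow built above corresponds roughly to the Amazon States Language definition sketched below. This is a hand-written approximation under stated assumptions, not the exact JSON emitted by the `stepfunctions` SDK: the function name is hypothetical, and the processing job parameters are passed in as an opaque dictionary.

```python
import json


def build_definition(topic_arn: str, processing_parameters: dict) -> dict:
    """Sketch of the state machine: run the job, publish to SNS and fail on error."""
    return {
        "StartAt": "Ingest data step",
        "States": {
            "Ingest data step": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync",
                "Parameters": processing_parameters,
                # Any task failure is routed to the notification state.
                "Catch": [{"ErrorEquals": ["States.TaskFailed"], "Next": "Fail state message"}],
                "Next": "Success",
            },
            "Fail state message": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sns:publish",
                "Parameters": {"TopicArn": topic_arn, "Message.$": "$"},
                "Next": "Fail",
            },
            "Fail": {"Type": "Fail"},
            "Success": {"Type": "Succeed"},
        },
    }


example_definition = build_definition(
    "arn:aws:sns:eu-west-1:123456789012:failure-topic",  # made-up topic ARN
    {"ProcessingJobName.$": "$$.Execution.Input.id"},
)
example_definition_json = json.dumps(example_definition, indent=2)
```

Comparing this with the output of `workflow.definition.to_json(pretty=True)` is a quick way to verify that the catch and the failure path are wired as intended.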

In [12]:
# workflow.delete()
# Update the state machine in place if it already exists; otherwise create it.
sfn = boto3.client('stepfunctions')
workflow_arn = f'arn:aws:states:{region}:{account}:stateMachine:{step_function_name}'
try:
    sfn.update_state_machine(
        stateMachineArn=workflow_arn,
        definition=json.dumps(workflow.definition.to_dict())
    )
except sfn.exceptions.StateMachineDoesNotExist:
    # First run: the state machine has not been created yet.
    workflow_arn = workflow.create()


In [13]:
s3 = boto3.resource('s3')

s3.meta.client.put_bucket_notification_configuration(
    Bucket=default_bucket,
    NotificationConfiguration={
        'EventBridgeConfiguration': {}
    }
)

{'ResponseMetadata': {'RequestId': 'V48RC5CJQ5KJJ0FN',
  'HostId': 'IcbMGw1dW0r1q90HXDDO5FvGwe5yF3EWV5ZJoPuFYvJ1rc9Qj99p5GSwkbvU86GyiWfsVb8ZMLA=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'IcbMGw1dW0r1q90HXDDO5FvGwe5yF3EWV5ZJoPuFYvJ1rc9Qj99p5GSwkbvU86GyiWfsVb8ZMLA=',
   'x-amz-request-id': 'V48RC5CJQ5KJJ0FN',
   'date': 'Sat, 26 Mar 2022 00:14:25 GMT',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0}}
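
Note that `put_bucket_notification_configuration` replaces the bucket's whole notification configuration, so the call above would drop any topic, queue or Lambda notifications already configured on the bucket. A hedged sketch of a safer variant reads the current configuration first and only adds the EventBridge entry; the helper name is an assumption.

```python
def with_eventbridge(existing: dict) -> dict:
    """Copy an existing notification configuration and enable EventBridge delivery."""
    # The response of get_bucket_notification_configuration carries metadata
    # that must not be sent back in the put call.
    config = {key: value for key, value in existing.items() if key != "ResponseMetadata"}
    config.setdefault("EventBridgeConfiguration", {})
    return config


# Example with a made-up existing configuration.
current = {
    "ResponseMetadata": {"HTTPStatusCode": 200},
    "TopicConfigurations": [
        {"TopicArn": "arn:aws:sns:eu-west-1:123456789012:some-topic", "Events": ["s3:ObjectCreated:*"]}
    ],
}
updated = with_eventbridge(current)
```

In the notebook this would be used as `with_eventbridge(s3.meta.client.get_bucket_notification_configuration(Bucket=default_bucket))`, with the result passed as `NotificationConfiguration`.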

In [14]:
import uuid
import json

events = boto3.client('events')
events.put_rule(
    Name='amld22-data-ingestion-rule',
    EventPattern=json.dumps({
        "source": ["aws.s3"],
        "detail-type": ["Object Created"],
        "detail": {
            "bucket": {"name": [default_bucket]},
            "object": {
                "key": [{"prefix": f"{main_path}/{data_output_path}/raw"}]
            }
        }
    }),
    State='ENABLED',
    Description='Start ingestion step function when data is uploaded in s3',
    RoleArn=role
)

print(f"Step function deployed to trigger on upload at: {main_path}/{data_output_path}/raw")

Step function deployed to trigger on upload at: amld22-workshop-sagemaker/data/raw
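
To reason about which uploads start the pipeline, the sketch below mimics the rule's pattern locally: source `aws.s3`, detail type `Object Created`, a given bucket and a key prefix. It is a simplified local check, not EventBridge's matching engine, and the function name and sample events are assumptions.

```python
def matches_rule(event: dict, bucket: str, key_prefix: str) -> bool:
    """Return True if an S3 'Object Created' event would match the ingestion rule."""
    detail = event.get("detail", {})
    return (
        event.get("source") == "aws.s3"
        and event.get("detail-type") == "Object Created"
        and detail.get("bucket", {}).get("name") == bucket
        and detail.get("object", {}).get("key", "").startswith(key_prefix)
    )


def make_event(bucket: str, key: str) -> dict:
    """Build a minimal made-up event with only the fields the rule inspects."""
    return {
        "source": "aws.s3",
        "detail-type": "Object Created",
        "detail": {"bucket": {"name": bucket}, "object": {"key": key}},
    }


BUCKET = "sagemaker-eu-west-1-919788038405"
PREFIX = "amld22-workshop-sagemaker/data/raw"

raw_upload = make_event(BUCKET, f"{PREFIX}/terna/2022_data.xlsx")
job_output = make_event(BUCKET, "amld22-workshop-sagemaker/data/normalized_data/out.csv")
lookalike = make_event(BUCKET, "amld22-workshop-sagemaker/data/raw_backup/2022_data.xlsx")
```

Files written by the job under `normalized_data` do not match the prefix, so the pipeline does not retrigger itself. A prefix without a trailing slash also matches sibling paths such as `raw_backup/`; using `.../raw/` as the prefix avoids that.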


In [15]:
targets = events.list_targets_by_rule(Rule='amld22-data-ingestion-rule')['Targets']
if targets:
    print("Rule already has some targets. Let's clean it up.")
    ids_to_remove = [t['Id'] for t in targets]
    events.remove_targets(Rule='amld22-data-ingestion-rule', Ids=ids_to_remove)
    
events.put_targets(
    Rule='amld22-data-ingestion-rule',
    Targets=[
        {
            'Id': uuid.uuid4().hex,
            'Arn': workflow_arn,
            'RoleArn': role
        }
    ]
)

{'FailedEntryCount': 0,
 'FailedEntries': [],
 'ResponseMetadata': {'RequestId': 'ce611e19-0a58-4ed2-914e-3bdf06977206',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ce611e19-0a58-4ed2-914e-3bdf06977206',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '41',
   'date': 'Sat, 26 Mar 2022 00:14:28 GMT'},
  'RetryAttempts': 0}}
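
When the rule fires, the event becomes the execution input, and the state machine uses its `id` field as `ProcessingJobName`. SageMaker requires job names of at most 63 characters, made of alphanumerics and hyphens, starting and ending with an alphanumeric. The sketch below checks a candidate name against that rule; the helper name and the sample id are assumptions.

```python
import re

# Name rule for SageMaker processing jobs: 1 to 63 characters, alphanumerics
# separated by optional hyphens, no leading or trailing hyphen.
JOB_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}")


def is_valid_job_name(name: str) -> bool:
    """Return True if the string is acceptable as a processing job name."""
    return JOB_NAME_PATTERN.fullmatch(name) is not None


# EventBridge event ids are UUID-like strings (this one is a made-up sample).
sample_event_id = "17793124-05d4-b198-2fde-7ededc63b103"
sample_is_valid = is_valid_job_name(sample_event_id)
```

Because every event carries a different id, each upload gets its own job name, which matters since job names must be unique within an account and region.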

In [33]:
default_bucket

'sagemaker-eu-west-1-919788038405'