# From Delta Lake to Amazon SageMaker

[Delta Lake](https://delta.io/) is a common open-source framework used for storing data in Lakehouse architectures.

In this sample we demonstrate how to integrate Delta Tables with Amazon SageMaker for performing data exploration, ingestion, processing, training, and hosting for Machine Learning.

---

## 2 - Feature Engineering and Ingestion

In this notebook, we will ingest data from our Delta Tables, transform it with code using **SageMaker Processing**, and ingest the resulting features into **SageMaker Feature Store**. For this purpose we will:
* Prepare a processing script for our feature engineering, including the configuration for relying on Delta Sharing for connecting to our Delta Table
* Run a SageMaker Processing job pointing towards our sample Delta Table profile file URL
* Create a SageMaker Feature Store Feature Group, both offline and online
* Work with the processed data for ingesting it to our Feature Group

<center><img src="../images/DeltaLake_to_SageMaker_2.png" width="60%"></center>

Note that the transformations can also be performed with other AWS services. For example, for low-code/no-code processing you can rely on **SageMaker Data Wrangler**, which at the time of writing supports direct connections to Delta Lake via JDBC for data exploration, analysis, and feature engineering. You can find more details about this method in this blog post:

https://aws.amazon.com/blogs/machine-learning/prepare-data-from-databricks-for-machine-learning-using-amazon-sagemaker-data-wrangler/

### Processing data from Delta Lake with SageMaker Processing

In [42]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
import pandas as pd
import os
from urllib.parse import urlparse

# S3 bucket for saving processing job outputs
sm_session = sagemaker.Session()
bucket = sm_session.default_bucket()
region = sm_session.boto_region_name

# Delta Sharing profile file location - Replace these with your own if you want to customize this example
profile_file_url_s3 = f's3://{bucket}/delta_to_sagemaker/delta_sharing/profile/open-datasets.share'
table = '#delta_sharing.default.boston-housing'

profile_file_name = os.path.basename(urlparse(profile_file_url_s3).path)
role = get_execution_role()
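
Delta Sharing addresses a table with a URL of the form `<profile-file-path>#<share>.<schema>.<table>`, which is why the `table` value above starts with `#`. The following is a minimal sketch of how that URL is assembled and of the fields a bearer-token profile file is expected to contain. The helper names, the endpoint, and the token are illustrative placeholders, not part of the `delta_sharing` library.

```python
import json

# Fields expected in a bearer-token Delta Sharing profile file
REQUIRED_PROFILE_KEYS = {"shareCredentialsVersion", "endpoint", "bearerToken"}


def validate_profile(profile_text):
    """Parse a profile file's JSON text and check the expected keys are present."""
    profile = json.loads(profile_text)
    missing = REQUIRED_PROFILE_KEYS - profile.keys()
    if missing:
        raise ValueError(f"Profile file is missing keys: {sorted(missing)}")
    return profile


def build_table_url(profile_path, share, schema, table):
    """Join the profile path and the table coordinates with '#'."""
    return f"{profile_path}#{share}.{schema}.{table}"


# Placeholder profile content, for illustration only
example_profile = json.dumps(
    {
        "shareCredentialsVersion": 1,
        "endpoint": "https://sharing.example.com/delta-sharing/",
        "bearerToken": "<token>",
    }
)
validate_profile(example_profile)

table_url = build_table_url(
    "/opt/ml/processing/profile/open-datasets.share",
    "delta_sharing",
    "default",
    "boston-housing",
)
print(table_url)
```

The processing script later in this notebook builds the same string by concatenating the local profile file path with the `table` argument.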

In [24]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.20.0",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1
)

In [59]:
%%writefile ./code/preprocessing.py
#  Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#  Licensed under the Apache License, Version 2.0 (the "License").
#  You may not use this file except in compliance with the License.
#  A copy of the License is located at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  or in the "license" file accompanying this file. This file is distributed
#  on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#  express or implied. See the License for the specific language governing
#  permissions and limitations under the License.

from __future__ import print_function

# Import generic functions...
import argparse
import os
import warnings
import pandas as pd
import numpy as np

import sys
import subprocess

# Install and import dependencies...
subprocess.check_call([
    sys.executable, "-m", "pip", "install", "-r",
    "/opt/ml/processing/input/code/my_package/requirements.txt",
])
import delta_sharing

# Import SKLearn libraries...
from sklearn.compose import make_column_transformer

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--profile-file", type=str)
    parser.add_argument("--table", type=str)
    args, _ = parser.parse_known_args()
    print("Received arguments {}".format(args))

    # Take the Delta Sharing profile file and create a SharingClient...
    profile_files = [os.path.join(args.profile_file, file) for file in os.listdir(args.profile_file)]
    if len(profile_files) == 0:
        raise ValueError(
            "There are no files in {}.\n".format(args.profile_file)
        )
    profile_file = profile_files[0]
    print(f'Found profile file: {profile_file}')
    client = delta_sharing.SharingClient(profile_file)
    table_url = profile_file + args.table

    # Load the Delta Table as a Pandas DataFrame...
    print(f'Loading {args.table} table from Delta Lake')
    df = delta_sharing.load_as_pandas(table_url)
    print(f'Train data shape: {df.shape}')

    # Perform some sample transformations - Replace here with your own transformations...
    # ---
    
    df.dropna(inplace=True)
    df.drop_duplicates(inplace=True)

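    # Cast to int and use minlength=2 so the unpacking works even if one class is absent
    negative_examples, positive_examples = np.bincount(df["chas"].astype(int), minlength=2)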
    print(
        "Data after cleaning: {}, {} positive examples, {} negative examples".format(
            df.shape, positive_examples, negative_examples
        )
    )

    processed_features = df

    # ---
    # Write processed data after transformations...
    processed_features_output_path = os.path.join("/opt/ml/processing/processed", "processed_features.csv")

    print("Saving processed features to {}".format(processed_features_output_path))
    pd.DataFrame(processed_features).to_csv(processed_features_output_path, header=True, index=False)

Overwriting ./code/preprocessing.py
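
The script above takes the first entry returned by `os.listdir`, whose order is not guaranteed. If the profile folder may contain more than one file, a slightly stricter lookup can help. This is a minimal sketch, assuming the profile file uses the `.share` extension. The function name `find_profile_file` is hypothetical.

```python
import os
import tempfile


def find_profile_file(directory, suffix=".share"):
    """Return the first file in `directory` (sorted by name) ending with `suffix`."""
    candidates = sorted(
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.endswith(suffix)
    )
    if not candidates:
        raise FileNotFoundError(f"No '{suffix}' profile file found in {directory}")
    return candidates[0]


# Demonstration on a temporary folder standing in for /opt/ml/processing/profile/
with tempfile.TemporaryDirectory() as tmp:
    for name in ("open-datasets.share", "notes.txt"):
        open(os.path.join(tmp, name), "w").close()
    found = os.path.basename(find_profile_file(tmp))

print(found)
```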


In [60]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

sklearn_processor.run(
    code="./code/preprocessing.py",
    inputs=[
        ProcessingInput(source=profile_file_url_s3, destination='/opt/ml/processing/profile/'),
        ProcessingInput(source='code/', destination="/opt/ml/processing/input/code/my_package/")
    ],
    outputs=[
        ProcessingOutput(output_name="processed_data", source="/opt/ml/processing/processed"),
    ],
    arguments=['--profile-file', '/opt/ml/processing/profile/', '--table', table],
)


Job Name:  sagemaker-scikit-learn-2022-09-01-10-37-12-320
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-889960878219/delta_to_sagemaker/delta_sharing/profile/open-datasets.share', 'LocalPath': '/opt/ml/processing/profile/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'input-2', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-889960878219/sagemaker-scikit-learn-2022-09-01-10-37-12-320/input/input-2', 'LocalPath': '/opt/ml/processing/input/code/my_package/', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-west-1-889960878219/sagemaker-scikit-learn-2022-09-01-10-37-12-320/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': '
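
The `arguments` list passed to `run` reaches the script as command-line arguments. The sketch below reproduces the script's parser locally to show how the values are read. The `--extra-flag` option is a made-up example of an argument the parser does not know, which `parse_known_args` sets aside instead of failing.

```python
import argparse


def parse_job_arguments(argv):
    """Parse the two arguments used by preprocessing.py, tolerating unknown ones."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--profile-file", type=str)
    parser.add_argument("--table", type=str)
    return parser.parse_known_args(argv)


args, unknown = parse_job_arguments(
    [
        "--profile-file", "/opt/ml/processing/profile/",
        "--table", "#delta_sharing.default.boston-housing",
        "--extra-flag", "ignored",  # hypothetical, not used by the notebook
    ]
)
print(args.profile_file, args.table, unknown)
```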

In [61]:
processing_job_description = sklearn_processor.jobs[-1].describe()

output_config = processing_job_description["ProcessingOutputConfig"]
for output in output_config["Outputs"]:
    if output["OutputName"] == "processed_data":
        processed_data = output["S3Output"]["S3Uri"]
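
If no output matches the name, the loop above leaves `processed_data` undefined. A small helper that raises a clear error instead could look like the sketch below. The function name and the sample description (including the bucket name) are illustrative. Only the key names follow the job description used above.

```python
def get_output_s3_uri(job_description, output_name):
    """Return the S3 URI of the named output from a processing job description."""
    outputs = job_description["ProcessingOutputConfig"]["Outputs"]
    for output in outputs:
        if output["OutputName"] == output_name:
            return output["S3Output"]["S3Uri"]
    available = [output["OutputName"] for output in outputs]
    raise KeyError(f"Output '{output_name}' not found; available outputs: {available}")


# Hand-written description with a placeholder bucket, for illustration only
sample_description = {
    "ProcessingOutputConfig": {
        "Outputs": [
            {
                "OutputName": "processed_data",
                "S3Output": {"S3Uri": "s3://example-bucket/job/output/processed_data"},
            }
        ]
    }
}
print(get_output_s3_uri(sample_description, "processed_data"))
```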

In [62]:
processed_features = pd.read_csv(processed_data + "/processed_features.csv")
print("Processed features shape: {}".format(processed_features.shape))
processed_features.head()

Processed features shape: (333, 15)


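| | ID | crim | zn | indus | chas | nox | rm | age | dis | rad | tax | ptratio | black | lstat | medv |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 0.00632 | 18.0 | 2.31 | 0 | 0.538 | 6.575 | 65.2 | 4.09 | 1 | 296 | 15.3 | 396.9 | 4.98 | 24.0 |
| 1 | 2 | 0.02731 | 0.0 | 7.07 | 0 | 0.469 | 6.421 | 78.9 | 4.9671 | 2 | 242 | 17.8 | 396.9 | 9.14 | 21.6 |
| 2 | 4 | 0.03237 | 0.0 | 2.18 | 0 | 0.458 | 6.998 | 45.8 | 6.0622 | 3 | 222 | 18.7 | 394.63 | 2.94 | 33.4 |
| 3 | 5 | 0.06905 | 0.0 | 2.18 | 0 | 0.458 | 7.147 | 54.2 | 6.0622 | 3 | 222 | 18.7 | 396.9 | 5.33 | 36.2 |
| 4 | 7 | 0.08829 | 12.5 | 7.87 | 0 | 0.524 | 6.012 | 66.6 | 5.5605 | 5 | 311 | 15.2 | 395.6 | 12.43 | 22.9 |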


----

### Ingesting processed data into SageMaker Feature Store

Now that the data is pre-processed, we will proceed to ingest it into SageMaker Feature Store. For this we will:
* Create a Feature Group for our processed data
* Ingest the records to the Feature Group
* Query the Feature Group for ensuring the data was properly ingested

In [67]:
from sagemaker.feature_store.feature_group import FeatureGroup

feature_group_name = 'boston-housing-fg'

feature_group = FeatureGroup(
    name=feature_group_name, sagemaker_session=sm_session
)

In [69]:
import time

current_time_sec = int(round(time.time()))

record_identifier_feature_name = "ID"
processed_features["EventTime"] = pd.Series([current_time_sec] * len(processed_features), dtype="float64")
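
Feature Store accepts the event-time feature either as a `Fractional` value holding Unix epoch seconds, as in the cell above, or as a `String` holding an ISO-8601 UTC timestamp. The sketch below shows both encodings for the same instant using only the standard library. The helper names are hypothetical.

```python
from datetime import datetime, timezone


def event_time_as_epoch(moment):
    """Encode a timezone-aware datetime as Unix epoch seconds (Fractional)."""
    return float(moment.timestamp())


def event_time_as_iso8601(moment):
    """Encode a timezone-aware datetime as an ISO-8601 UTC string (String)."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


moment = datetime(2022, 9, 1, 10, 43, 17, tzinfo=timezone.utc)
epoch_value = event_time_as_epoch(moment)
iso_value = event_time_as_iso8601(moment)
print(epoch_value, iso_value)
```

Whichever encoding you choose, the type inferred for the `EventTime` column must match it when the feature definitions are loaded.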

In [70]:
feature_group.load_feature_definitions(data_frame=processed_features)

[FeatureDefinition(feature_name='ID', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='crim', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='zn', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='indus', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='chas', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='nox', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='rm', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='age', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='dis', feature_type=<FeatureTypeEnum.FRACTIONAL: 'Fractional'>),
 FeatureDefinition(feature_name='rad', feature_type=<FeatureTypeEnum.INTEGRAL: 'Integral'>),
 FeatureDefinition(feature_name='tax', fe
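
`load_feature_definitions` infers each feature type from the DataFrame column dtype: integer columns become `Integral`, floating-point columns become `Fractional`, and text columns become `String`. The following sketch approximates that mapping on plain Python values so the rule is easy to see. It is an illustration, not the SDK's implementation, and `infer_feature_type` is a hypothetical name.

```python
def infer_feature_type(values):
    """Return 'Integral', 'Fractional', or 'String' for a column of values."""
    non_null = [value for value in values if value is not None]

    def is_int(value):
        # bool is a subclass of int in Python, so exclude it explicitly
        return isinstance(value, int) and not isinstance(value, bool)

    def is_number(value):
        return is_int(value) or isinstance(value, float)

    if non_null and all(is_int(value) for value in non_null):
        return "Integral"
    if non_null and all(is_number(value) for value in non_null):
        return "Fractional"
    return "String"


# A few columns taken from the first rows of the processed features
sample_columns = {
    "ID": [1, 2, 4],
    "crim": [0.00632, 0.02731, 0.03237],
    "chas": [0, 0, 0],
}
inferred = {name: infer_feature_type(column) for name, column in sample_columns.items()}
print(inferred)
```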

In [71]:
feature_group.create(
    s3_uri=f"s3://{bucket}/delta_to_sagemaker/",
    record_identifier_name=record_identifier_feature_name,
    event_time_feature_name="EventTime",
    role_arn=role,
    enable_online_store=True,
)

{'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/boston-housing-fg',
 'ResponseMetadata': {'RequestId': '04520484-4319-4caa-ab59-7f5155c6b5c0',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '04520484-4319-4caa-ab59-7f5155c6b5c0',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '94',
   'date': 'Thu, 01 Sep 2022 10:43:17 GMT'},
  'RetryAttempts': 0}}

To confirm that the FeatureGroup has been created, we use the `DescribeFeatureGroup` and `ListFeatureGroups` APIs to display it.

In [72]:
feature_group.describe()

{'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/boston-housing-fg',
 'FeatureGroupName': 'boston-housing-fg',
 'RecordIdentifierFeatureName': 'ID',
 'EventTimeFeatureName': 'EventTime',
 'FeatureDefinitions': [{'FeatureName': 'ID', 'FeatureType': 'Integral'},
  {'FeatureName': 'crim', 'FeatureType': 'Fractional'},
  {'FeatureName': 'zn', 'FeatureType': 'Fractional'},
  {'FeatureName': 'indus', 'FeatureType': 'Fractional'},
  {'FeatureName': 'chas', 'FeatureType': 'Integral'},
  {'FeatureName': 'nox', 'FeatureType': 'Fractional'},
  {'FeatureName': 'rm', 'FeatureType': 'Fractional'},
  {'FeatureName': 'age', 'FeatureType': 'Fractional'},
  {'FeatureName': 'dis', 'FeatureType': 'Fractional'},
  {'FeatureName': 'rad', 'FeatureType': 'Integral'},
  {'FeatureName': 'tax', 'FeatureType': 'Integral'},
  {'FeatureName': 'ptratio', 'FeatureType': 'Fractional'},
  {'FeatureName': 'black', 'FeatureType': 'Fractional'},
  {'FeatureName': 'lstat', 'FeatureType': 'Fractio

In [73]:
sm_session.boto_session.client(
    "sagemaker", region_name=region
).list_feature_groups()  # We use the boto client to list FeatureGroups

{'FeatureGroupSummaries': [{'FeatureGroupName': 'transaction-feature-group-25-12-30-54',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/transaction-feature-group-25-12-30-54',
   'CreationTime': datetime.datetime(2021, 1, 25, 12, 32, 5, 865000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created',
   'OfflineStoreStatus': {'Status': 'Active'}},
  {'FeatureGroupName': 'synthetic-housing-data-2',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/synthetic-housing-data-2',
   'CreationTime': datetime.datetime(2022, 4, 11, 10, 14, 50, 318000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Created',
   'OfflineStoreStatus': {'Status': 'Active'}},
  {'FeatureGroupName': 'orders-feature-group-14-13-34-20',
   'FeatureGroupArn': 'arn:aws:sagemaker:eu-west-1:889960878219:feature-group/orders-feature-group-14-13-34-20',
   'CreationTime': datetime.datetime(2021, 6, 14, 13, 34, 21, 316000, tzinfo=tzlocal()),
   'FeatureGroupStatus': 'Creat
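
In accounts with many Feature Groups, `list_feature_groups` returns results in pages, with a `NextToken` to request the next one. The sketch below walks through all pages and yields the group names. It is written against any object exposing `list_feature_groups`, and it is exercised here with a small fake client so that it runs without AWS access. `FakeSageMakerClient` and `iter_feature_group_names` are hypothetical names.

```python
def iter_feature_group_names(client, name_contains=None):
    """Yield every Feature Group name, following NextToken across pages."""
    kwargs = {}
    if name_contains:
        kwargs["NameContains"] = name_contains
    while True:
        response = client.list_feature_groups(**kwargs)
        for summary in response.get("FeatureGroupSummaries", []):
            yield summary["FeatureGroupName"]
        token = response.get("NextToken")
        if not token:
            break
        kwargs["NextToken"] = token


class FakeSageMakerClient:
    """Stand-in for the boto3 SageMaker client that serves fixed pages."""

    def __init__(self, pages):
        self._pages = pages

    def list_feature_groups(self, **kwargs):
        index = int(kwargs.get("NextToken", 0))
        response = {
            "FeatureGroupSummaries": [
                {"FeatureGroupName": name} for name in self._pages[index]
            ]
        }
        if index + 1 < len(self._pages):
            response["NextToken"] = str(index + 1)
        return response


fake_client = FakeSageMakerClient([["synthetic-housing-data-2"], ["boston-housing-fg"]])
group_names = list(iter_feature_group_names(fake_client))
print("boston-housing-fg" in group_names)
```

With a real client, you would pass `sm_session.boto_session.client("sagemaker", region_name=region)` instead of the fake one.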

After the FeatureGroup has been created, we can put data into it by using the `PutRecord` API.

Ingesting the data into the FeatureGroup takes less than one minute.
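
`PutRecord` expects each record as a list of `FeatureName` / `ValueAsString` pairs, the same shape that `get_record` returns further below. As a minimal sketch, the hypothetical helper `row_to_record` converts one row (a plain dictionary here) into that shape. Skipping `None` values is an assumption made for this example.

```python
def row_to_record(row):
    """Convert a {feature name: value} mapping into the PutRecord 'Record' list."""
    return [
        {"FeatureName": name, "ValueAsString": str(value)}
        for name, value in row.items()
        if value is not None  # assumption: missing values are simply omitted
    ]


record = row_to_record({"ID": 57, "crim": 0.02055, "chas": 0})
print(record)

# With AWS access, the record could then be sent with the runtime client, e.g.:
# runtime_client.put_record(FeatureGroupName=feature_group_name, Record=record)
```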

In [74]:
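def check_feature_group_status(feature_group):
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group to be Created")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    # Any final status other than "Created" (e.g. "CreateFailed") is an error
    if status != "Created":
        raise RuntimeError(f"Failed to create FeatureGroup {feature_group.name}: status {status}")
    print(f"FeatureGroup {feature_group.name} successfully created.")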


check_feature_group_status(feature_group)

FeatureGroup boston-housing-fg successfully created.


In [75]:
feature_group.ingest(data_frame=processed_features, max_workers=3, wait=True)

IngestionManagerPandas(feature_group_name='boston-housing-fg', sagemaker_fs_runtime_client_config=<botocore.config.Config object at 0x7fb23149e370>, max_workers=3, max_processes=1, profile_name=None, _async_result=<multiprocess.pool.MapResult object at 0x7fb22e30a400>, _processing_pool=<pool ProcessPool(ncpus=1)>, _failed_indices=[])

Using an arbitrary record ID, 57, we use `get_record` to check that the data has been ingested into the feature group.

In [82]:
record_id = 57  # value of the "ID" record identifier feature
sample_record = sm_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).get_record(
    FeatureGroupName=feature_group_name, RecordIdentifierValueAsString=str(record_id)
)

In [83]:
sample_record

{'ResponseMetadata': {'RequestId': '201e5cb2-20bb-48d6-bfb6-dad026d4adb7',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '201e5cb2-20bb-48d6-bfb6-dad026d4adb7',
   'content-type': 'application/json',
   'content-length': '754',
   'date': 'Thu, 01 Sep 2022 10:48:33 GMT'},
  'RetryAttempts': 0},
 'Record': [{'FeatureName': 'ID', 'ValueAsString': '57'},
  {'FeatureName': 'crim', 'ValueAsString': '0.02055'},
  {'FeatureName': 'zn', 'ValueAsString': '85.0'},
  {'FeatureName': 'indus', 'ValueAsString': '0.74'},
  {'FeatureName': 'chas', 'ValueAsString': '0'},
  {'FeatureName': 'nox', 'ValueAsString': '0.41'},
  {'FeatureName': 'rm', 'ValueAsString': '6.383'},
  {'FeatureName': 'age', 'ValueAsString': '35.7'},
  {'FeatureName': 'dis', 'ValueAsString': '9.1876'},
  {'FeatureName': 'rad', 'ValueAsString': '2'},
  {'FeatureName': 'tax', 'ValueAsString': '313'},
  {'FeatureName': 'ptratio', 'ValueAsString': '17.3'},
  {'FeatureName': 'black', 'ValueAsString': '396.9'},
  {'Featu
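
The `Record` list in the response is easier to work with as a dictionary. The sketch below flattens it, using a hand-written response that mirrors a few of the values shown above. `record_to_dict` is a hypothetical helper, and returning an empty dictionary when the `Record` key is absent is an assumption about how a missing record should be handled.

```python
def record_to_dict(response):
    """Flatten a get_record response into {feature name: value as string}."""
    return {
        feature["FeatureName"]: feature["ValueAsString"]
        for feature in response.get("Record", [])
    }


# Shortened, hand-written response for illustration
example_response = {
    "Record": [
        {"FeatureName": "ID", "ValueAsString": "57"},
        {"FeatureName": "crim", "ValueAsString": "0.02055"},
        {"FeatureName": "zn", "ValueAsString": "85.0"},
    ]
}
features = record_to_dict(example_response)
print(features["ID"], float(features["crim"]))
```

All values come back as strings, so numeric features need to be cast before use.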

We use `batch_get_record` to retrieve several records from the feature group in a single call by providing a list of record IDs.

In [85]:
all_records = sm_session.boto_session.client(
    "sagemaker-featurestore-runtime", region_name=region
).batch_get_record(
    Identifiers=[
        {
            "FeatureGroupName": feature_group_name,
            "RecordIdentifiersValueAsString": ["57", "10", "8", "124"],
        }
    ]
)

In [86]:
all_records

{'ResponseMetadata': {'RequestId': 'ec918e5e-d924-4c9a-9780-5aa167ccc476',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ec918e5e-d924-4c9a-9780-5aa167ccc476',
   'content-type': 'application/json',
   'content-length': '1720',
   'date': 'Thu, 01 Sep 2022 10:49:34 GMT'},
  'RetryAttempts': 0},
 'Records': [{'FeatureGroupName': 'boston-housing-fg',
   'RecordIdentifierValueAsString': '124',
   'Record': [{'FeatureName': 'ID', 'ValueAsString': '124'},
    {'FeatureName': 'crim', 'ValueAsString': '0.15038'},
    {'FeatureName': 'zn', 'ValueAsString': '0.0'},
    {'FeatureName': 'indus', 'ValueAsString': '25.65'},
    {'FeatureName': 'chas', 'ValueAsString': '0'},
    {'FeatureName': 'nox', 'ValueAsString': '0.581'},
    {'FeatureName': 'rm', 'ValueAsString': '5.856'},
    {'FeatureName': 'age', 'ValueAsString': '97.0'},
    {'FeatureName': 'dis', 'ValueAsString': '1.9444'},
    {'FeatureName': 'rad', 'ValueAsString': '2'},
    {'FeatureName': 'tax', 'ValueAsString': '18
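
To verify that every requested ID came back, the returned identifiers can be compared with the requested ones. The following is a minimal sketch on a hand-written response. `find_missing_ids` is a hypothetical helper, and the sample response is illustrative rather than actual output.

```python
def find_missing_ids(response, requested_ids):
    """Return the requested record IDs that are absent from a batch_get_record response."""
    returned = {
        record["RecordIdentifierValueAsString"]
        for record in response.get("Records", [])
    }
    return [record_id for record_id in requested_ids if record_id not in returned]


# Illustrative response in which ID "8" was not returned
example_batch_response = {
    "Records": [
        {"FeatureGroupName": "boston-housing-fg", "RecordIdentifierValueAsString": "124", "Record": []},
        {"FeatureGroupName": "boston-housing-fg", "RecordIdentifierValueAsString": "57", "Record": []},
        {"FeatureGroupName": "boston-housing-fg", "RecordIdentifierValueAsString": "10", "Record": []},
    ]
}
missing_ids = find_missing_ids(example_batch_response, ["57", "10", "8", "124"])
print(missing_ids)
```

Note that the records are not necessarily returned in the order in which the IDs were requested, so matching by identifier is safer than matching by position.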