# Performance comparison for AWS RDS vs Redshift vs FeatureStore

First, we will create a simple pandas DataFrame to use as a feature repository. (Typically, a feature repository is created after preprocessing and transforming the source data stored in an S3 data lake, but for simplicity's sake I will use a pandas DataFrame.)

In [None]:
# Install all required Python packages: sqlalchemy==1.4.49 sqlalchemy-redshift

In [1]:
import uuid
import pandas as pd
from datetime import datetime
# Toy feature repository: four people and their ages.
data = {
    'Name': ['John', 'Anna', 'Peter', 'Linda'],
    'Age': [28, 22, 35, 32],
}
df = pd.DataFrame(data)

# Feature Store requires two additional features on every record:
# an event time and a unique record identifier.
row_count = len(df)
df = df.assign(
    eventTime=[datetime.utcnow().isoformat() + "Z" for _ in range(row_count)],
    recordId=[str(uuid.uuid4()) for _ in range(row_count)],  # one fresh UUID per record
)


In [13]:
# Preview the first rows of the feature repository, including the added eventTime and recordId columns
df.head()

Unnamed: 0,Name,Age,eventTime,recordId
0,John,28,2023-09-21T13:21:57.251165Z,ad957c5c-5cf9-4921-87cd-3167f77f5b0b
1,Anna,22,2023-09-21T13:21:57.251172Z,c99b6b11-6694-465d-99ef-0f454247ca67
2,Peter,35,2023-09-21T13:21:57.251173Z,f30f24bf-f951-4122-99bf-b5d389b896eb
3,Linda,32,2023-09-21T13:21:57.251175Z,89d53b95-8093-48d1-8e7e-eb40b22ae388


Next, we will evaluate the performance of each AWS Service by measuring the time required to insert the feature repository, represented by the dataframe 'df' in our case, into their platform. Subsequently, we will also measure the time it takes to retrieve a record from that dataset. Typically, this retrieved record is then used for real-time ML model inferencing or predictions.

# Sagemaker Feature Store

First, we need to create a feature group for the Feature Store.

In [2]:
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
from sagemaker.feature_store.feature_group import FeatureGroup, FeatureDefinition
from sagemaker.feature_store.feature_definition import FeatureTypeEnum
from sagemaker.session import Session

import time

# Assuming df is feature repository

# Setup
sagemaker_session = sagemaker.Session()
role_arn = get_execution_role()

# Define feature group name
feature_group_name = 'feature_repository'
bucket_name='Your_S3_bucket_name'


def _feature_type_for(dtype):
    """Map a pandas dtype to the matching Feature Store feature type.

    Integer columns become INTEGRAL, float columns become FRACTIONAL and
    everything else (e.g. text columns) is stored as STRING.
    """
    if pd.api.types.is_integer_dtype(dtype):
        return FeatureTypeEnum.INTEGRAL
    if pd.api.types.is_float_dtype(dtype):
        return FeatureTypeEnum.FRACTIONAL
    return FeatureTypeEnum.STRING


# Define feature definitions: one per dataframe column
feature_definitions = [
    FeatureDefinition(feature_name=col, feature_type=_feature_type_for(df[col].dtype))
    for col in df.columns
]

# Create FeatureGroup
feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=sagemaker_session,
    feature_definitions=feature_definitions
)

# Create the feature group
feature_group.create(
    s3_uri=f's3://{bucket_name}',  # replace with your S3 bucket URI
    record_identifier_name='recordId',  # replace with your record identifier feature name
    event_time_feature_name='eventTime',  # replace with your event time feature name
    role_arn=role_arn,  # use the role ARN you have
    enable_online_store=True #For faster retrieval in case of real time inferencing
)

# create() only starts the creation; poll until the feature group leaves the
# "Creating" state so that records can be ingested right afterwards.
status = feature_group.describe().get("FeatureGroupStatus")
while status == "Creating":
    time.sleep(5)
    status = feature_group.describe().get("FeatureGroupStatus")
if status != "Created":
    raise RuntimeError(
        f"Failed to create feature group {feature_group_name}: status is {status}"
    )

print("Feature Group Created Successfully!")


Feature Group Created Successfully!


Now let's measure the time taken for data insertion and retrieval.

In [25]:
import time
import boto3
import sagemaker

# Initialize the SageMaker session and FeatureStore Runtime client
sagemaker_session = sagemaker.Session()
featurestore_runtime = boto3.client(service_name='sagemaker-featurestore-runtime', region_name=sagemaker_session.boto_region_name)

# Your feature group name
feature_group_name = 'feature_repository'

# Record to retrieve: the third row of the df dataframe. The id is read from
# df instead of being hard-coded because the recordId values are freshly
# generated UUIDs every time df is built.
record_id = str(df['recordId'].iloc[2])

# Measuring the time for inserting records.
# perf_counter() is a monotonic, high-resolution clock meant for timing intervals.
start_time = time.perf_counter()
feature_group.ingest(data_frame=df)
feature_store_insert_time = time.perf_counter() - start_time
print(f'Feature Store Insert Time: {feature_store_insert_time}')

# Measuring the time for retrieving records.
start_time = time.perf_counter()
response = featurestore_runtime.get_record(FeatureGroupName=feature_group_name,
                                           RecordIdentifierValueAsString=record_id)
feature_store_retrieve_time = time.perf_counter() - start_time
print(f'Feature Store Retrieve Time: {feature_store_retrieve_time}')

print(f'Total Time: {feature_store_insert_time + feature_store_retrieve_time}')

# A lookup that finds nothing would make the retrieve timing meaningless, so flag it.
if not response.get('Record'):
    print(f'Warning: no record found for recordId {record_id}; the retrieve time above measures a miss')


Feature Store Insert Time: 0.04401803016662598
Feature Store Retrieve Time: 0.03712344169616699
Total Time: 0.08114147186279297


# AWS RDS

In [40]:
import time
import pandas as pd
import mysql.connector
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Database connection details (replace the placeholders with your own values)
username = 'Your_RDS_Username'
password = 'Your_RDS_Password'
host = 'Your_RDS_Endpoint'
port = '3306'  # default port for MySQL
database = 'Your_Database_name'
table_name = 'Your_tablename'

# Create Engine and Session
# The port is part of the URL so that a non-default port configured above is honoured.
engine_url = f'mysql+mysqlconnector://{username}:{password}@{host}:{port}/{database}'
engine = create_engine(engine_url)
Session = sessionmaker(bind=engine)

def insert_and_fetch():
    """Time a bulk insert of ``df`` into MySQL and a single-row lookup.

    The insert replaces the table ``table_name`` with the contents of the
    global ``df`` dataframe. The fetch opens a new ``mysql.connector``
    connection and selects the row whose ``recordid`` equals the record id
    of the third row of ``df``; opening that connection is part of the
    measured fetch time.

    Returns:
        tuple: ``(insert_time, fetch_time, insert_time + fetch_time)``,
        all in seconds.

    Raises:
        Exception: any error raised during the insert or the fetch is
            printed, the session is rolled back and the error is re-raised
            (returning nothing would break the caller's tuple unpacking).
    """
    # Read the id from df instead of hard-coding it: the recordId values are
    # freshly generated UUIDs every time df is built.
    record_id = str(df['recordId'].iloc[2])

    session = Session()
    try:
        # Measure Insert Time
        start_time_insert = time.perf_counter()
        raw_connection = engine.raw_connection()
        try:
            raw_connection.rollback()  # Explicitly Rollback
        finally:
            raw_connection.close()  # release the raw connection instead of leaking it
        df.to_sql(name=table_name, con=engine, index=False, if_exists='replace')
        session.commit()
        insert_time = time.perf_counter() - start_time_insert

        # Measure Fetch Time
        start_time_fetch = time.perf_counter()
        connection = mysql.connector.connect(host=host, port=int(port), user=username,
                                             password=password, database=database)
        try:
            cursor = connection.cursor(dictionary=True)
            try:
                # The record id is passed as a bound parameter; only the table
                # name (which cannot be bound) is interpolated.
                cursor.execute(f"SELECT * FROM {table_name} where recordid=%s;", (record_id,))
                cursor.fetchall()  # pull the rows so the timing covers the full retrieval
            finally:
                cursor.close()
        finally:
            connection.close()
        fetch_time = time.perf_counter() - start_time_fetch

        # Return Insert Time, Fetch Time, and Total Time
        return insert_time, fetch_time, insert_time + fetch_time

    except Exception as e:
        print(e)
        session.rollback()
        raise
    finally:
        session.close()

# Execute the RDS benchmark once and report each measured duration
insert_time, fetch_time, total_time = insert_and_fetch()
for label, seconds in (
    ("Insert Time", insert_time),
    ("Fetch Time", fetch_time),
    ("Total Execution Time", total_time),
):
    print(f"{label}: {seconds} seconds")


Insert Time: 0.1189734935760498 seconds
Fetch Time: 0.039176225662231445 seconds
Total Execution Time: 0.15814971923828125 seconds


# AWS Redshift

In [41]:
import time
import pandas as pd
import psycopg2
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError

# Connection settings for the Redshift cluster (replace the placeholders)
host = 'Your_Redshift_Host'
port = '5439'  # default port for Redshift
database = 'Your_redshift_database'
table_name = 'Your_redshift_tablename'
username = 'Your_Redshift_username'
password = 'Your_Redshift_password'

# Assemble the SQLAlchemy URL, then build the engine and a session factory bound to it
engine_url = 'redshift+psycopg2://{user}:{pwd}@{host}:{port}/{db}'.format(
    user=username,
    pwd=password,
    host=host,
    port=port,
    db=database,
)
engine = create_engine(engine_url)
Session = sessionmaker(bind=engine)

def insert_and_fetch():
    """Time a bulk insert of ``df`` into Redshift and a single-row lookup.

    The insert replaces the table ``public.<table_name>`` with the contents
    of the global ``df`` dataframe. The fetch opens a new ``psycopg2``
    connection and selects the row whose ``recordid`` equals the record id
    of the third row of ``df``; opening that connection is part of the
    measured fetch time.

    Returns:
        tuple: ``(insert_time, fetch_time, insert_time + fetch_time)``,
        all in seconds.

    Raises:
        SQLAlchemyError: printed, the session is rolled back and the error
            is re-raised (returning nothing would break the caller's tuple
            unpacking). Other exceptions propagate unchanged.
    """
    # Read the id from df instead of hard-coding it: the recordId values are
    # freshly generated UUIDs every time df is built.
    record_id = str(df['recordId'].iloc[2])

    session = Session()
    try:
        # Measure Insert Time
        start_time_insert = time.perf_counter()
        df.to_sql(name=table_name, con=engine, index=False, if_exists='replace', schema='public')
        session.commit()
        insert_time = time.perf_counter() - start_time_insert

        # Measure Fetch Time
        start_time_fetch = time.perf_counter()
        connection = psycopg2.connect(user=username, password=password, host=host, port=port, database=database)
        try:
            cursor = connection.cursor()
            try:
                # The record id is passed as a bound parameter; only the table
                # name (which cannot be bound) is interpolated.
                cursor.execute(f"SELECT * FROM public.{table_name} where recordid=%s;", (record_id,))
                cursor.fetchall()  # pull the rows so the timing covers the full retrieval
            finally:
                cursor.close()
        finally:
            connection.close()
        fetch_time = time.perf_counter() - start_time_fetch

        # Return Insert Time, Fetch Time, and Total Time
        return insert_time, fetch_time, insert_time + fetch_time

    except SQLAlchemyError as e:
        print(e)
        session.rollback()
        raise
    finally:
        session.close()

# Execute the Redshift benchmark once and report each measured duration
insert_time, fetch_time, total_time = insert_and_fetch()
for label, seconds in (
    ("Insert Time", insert_time),
    ("Fetch Time", fetch_time),
    ("Total Execution Time", total_time),
):
    print(f"{label}: {seconds} seconds")


Insert Time: 1.014460802078247 seconds
Fetch Time: 0.04340100288391113 seconds
Total Execution Time: 1.0578618049621582 seconds


### As you can see, it makes sense to use SageMaker Feature Store for feature storage and retrieval: based on the total times measured above, it is about 49% faster than RDS and about 92% faster than Redshift