In [1]:
!pip install pandas sagemaker boto3



In [2]:
import json
import logging
import os
import subprocess
import sys
import tempfile
import time

import boto3
import pandas as pd
from botocore.exceptions import ClientError
from sagemaker import Session, get_execution_role
from sagemaker.deserializers import JSONDeserializer
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.serializers import JSONSerializer
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn.model import SKLearnModel

sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\Praveen\AppData\Local\sagemaker\sagemaker\config.yaml


In [3]:
# One module-level logger shared by every helper function in this notebook.
logger = logging.getLogger(__name__)
# Root configuration: emit records at INFO and above. basicConfig does nothing
# when the root logger already has handlers attached.
logging.basicConfig(level="INFO")

In [4]:
# Tunable settings for the whole example.
CONFIG = {
    'feature_group_name': 'rr-user-events-v2',
    'endpoint_name': 'rr-featurestore-endpoint-v2',
    # ml.t2.* instances can host endpoints but are not offered for training jobs.
    'instance_type_training': 'ml.m5.large',
    'instance_type_endpoint': 'ml.t2.medium',
    'framework_version': '1.0-1',
    'sample_size': 1000,   # number of events read from events.csv
    'max_workers': 3,      # worker threads for Feature Store ingestion
    # IAM role ARN that SageMaker assumes. Required when running outside SageMaker
    # (e.g. with IAM *user* credentials); when None, get_execution_role() is used.
    'role_arn': os.environ.get('SAGEMAKER_ROLE_ARN'),
    # Set True to delete the endpoint (and feature group) at the end of main().
    'cleanup_after_run': False,
}

In [5]:
def setup_environment(data_dir='data'):
    """Make sure the RetailRocket dataset is available locally and return a typed sample.

    The dataset is downloaded with the Kaggle CLI only when ``<data_dir>/events.csv``
    is missing, so re-running the notebook does not download it again.

    Args:
        data_dir: Directory that holds (or will receive) the unzipped dataset.

    Returns:
        DataFrame with the first ``CONFIG['sample_size']`` events and the columns
        ``user_id`` (str), ``event_time`` (int64, Unix epoch seconds) and ``event`` (str).

    Raises:
        subprocess.CalledProcessError: if installing the Kaggle CLI or the download fails.
        Any error (including CSV parsing) is logged and re-raised.
    """
    try:
        events_csv = os.path.join(data_dir, 'events.csv')
        if os.path.exists(events_csv):
            logger.info(f"Using cached dataset: {events_csv}")
        else:
            _download_retailrocket(data_dir)

        # Reading only the first N rows is equivalent to .head(N) on the full file,
        # but avoids parsing the entire events table.
        raw = pd.read_csv(
            events_csv,
            usecols=['timestamp', 'visitorid', 'event'],
            nrows=CONFIG['sample_size'],
        )

        # Build a new frame instead of mutating a slice (avoids SettingWithCopyWarning).
        df_events = pd.DataFrame({
            'user_id': raw['visitorid'].astype(str),
            # RetailRocket timestamps are epoch *milliseconds* (13-digit values);
            # Feature Store event times must be Unix epoch *seconds*. Use int64
            # explicitly: plain `int` is 32-bit on Windows with NumPy < 2 and overflows.
            'event_time': raw['timestamp'].astype('int64') // 1000,
            'event': raw['event'].astype(str),
        })

        logger.info(f"Loaded {len(df_events)} events")
        return df_events
    except Exception as e:
        logger.error(f"Failed to setup environment: {e}")
        raise


def _download_retailrocket(data_dir):
    """Install the Kaggle CLI for this interpreter and download/unzip the dataset into data_dir."""
    # check=True surfaces failures instead of silently continuing like os.system did.
    subprocess.run([sys.executable, '-m', 'pip', 'install', '--quiet', 'kaggle'], check=True)
    # Respect a KAGGLE_CONFIG_DIR the user already set; otherwise use ~/.kaggle.
    os.environ.setdefault('KAGGLE_CONFIG_DIR', os.path.expanduser('~/.kaggle'))
    # os.makedirs works on every OS, unlike `mkdir -p` (not supported by Windows cmd).
    os.makedirs(data_dir, exist_ok=True)
    subprocess.run(
        ['kaggle', 'datasets', 'download', 'retailrocket/ecommerce-dataset',
         '-p', data_dir, '--unzip'],
        check=True,
    )

In [6]:
def create_feature_group(sagemaker_session, role, bucket):
    """Create (or reuse) the RetailRocket events Feature Group and wait until it is ready.

    Args:
        sagemaker_session: SageMaker ``Session`` used for all Feature Store calls.
        role: IAM role ARN that Feature Store assumes to write the offline store.
        bucket: S3 bucket that receives the offline store under ``retailrocket/offline-store/``.

    Returns:
        The ``FeatureGroup`` handle, in ``Created`` status.

    Raises:
        RuntimeError: if the group ends in any status other than ``Created``.
        TimeoutError: if creation does not finish within the polling timeout.
        Any SDK/botocore error is logged and re-raised.
    """
    try:
        # The FeatureGroup constructor only takes name, session and feature definitions;
        # record id, event time and store settings are arguments of create().
        feature_group = FeatureGroup(
            name=CONFIG['feature_group_name'],
            sagemaker_session=sagemaker_session,
            feature_definitions=[
                FeatureDefinition('user_id', FeatureTypeEnum.STRING),
                # The event-time feature must be Fractional (Unix epoch seconds) or
                # String (ISO-8601); an Integral event-time feature is rejected.
                FeatureDefinition('event_time', FeatureTypeEnum.FRACTIONAL),
                FeatureDefinition('event', FeatureTypeEnum.STRING),
            ],
        )

        if _feature_group_exists(feature_group):
            # Makes the cell safe to re-run: creating an existing group fails.
            logger.info(f"Feature Group '{CONFIG['feature_group_name']}' already exists; reusing it")
        else:
            # s3_uri configures the offline store; enable_online_store adds the online store.
            feature_group.create(
                s3_uri=f's3://{bucket}/retailrocket/offline-store/',
                record_identifier_name='user_id',
                event_time_feature_name='event_time',
                role_arn=role,
                enable_online_store=True,
            )

        logger.info("Waiting for feature group creation to complete...")
        _wait_for_feature_group_created(feature_group)
        logger.info(f"Feature Group '{CONFIG['feature_group_name']}' is ready")

        return feature_group
    except Exception as e:
        logger.error(f"Failed to create feature group: {e}")
        raise


def _feature_group_exists(feature_group):
    """Return True if a feature group with this name already exists, False if it is not found."""
    try:
        feature_group.describe()
    except ClientError as err:
        if err.response.get('Error', {}).get('Code') == 'ResourceNotFound':
            return False
        raise
    return True


def _wait_for_feature_group_created(feature_group, poll_seconds=5, timeout_seconds=900):
    """Poll describe() until the group leaves 'Creating'; raise unless it reaches 'Created'."""
    # This SDK's FeatureGroup has no creation waiter, so poll the status explicitly.
    deadline = time.monotonic() + timeout_seconds
    description = feature_group.describe()
    while description.get('FeatureGroupStatus') == 'Creating':
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Feature group {feature_group.name} still creating after {timeout_seconds}s"
            )
        time.sleep(poll_seconds)
        description = feature_group.describe()

    status = description.get('FeatureGroupStatus')
    if status != 'Created':
        reason = description.get('FailureReason', 'no failure reason reported')
        raise RuntimeError(f"Feature group {feature_group.name} is in status {status!r}: {reason}")

In [7]:
def ingest_data_to_feature_store(feature_group, df_events):
    """Push every row of ``df_events`` into ``feature_group`` and block until finished.

    Delegates to the SDK's ``FeatureGroup.ingest`` with ``CONFIG['max_workers']``
    worker threads and ``wait=True``. Any failure is logged, then re-raised.
    """
    try:
        ingest_options = {'max_workers': CONFIG['max_workers'], 'wait': True}
        feature_group.ingest(data_frame=df_events, **ingest_options)
        logger.info("Events ingested successfully into Feature Store")
    except Exception as err:
        logger.error(f"Failed to ingest data: {err}")
        raise

In [8]:
def prepare_training_data(df_events, sagemaker_session, local_dir=None):
    """Aggregate events per user, write them to CSV and upload the file to S3.

    Args:
        df_events: Events with ``user_id``, ``event`` and ``event_time`` columns.
        sagemaker_session: SageMaker ``Session`` whose ``upload_data`` receives the file.
        local_dir: Where to write the intermediate CSV; defaults to the OS temp directory.

    Returns:
        ``(s3_train_path, df_agg)``: the S3 URI returned by ``upload_data`` and a
        per-user frame with columns ``user_id``, ``view_count``, ``transaction_count``,
        ``total_events`` and ``last_event_time``.

    Raises:
        Any pandas/upload error is logged and re-raised.
    """
    try:
        # Named aggregation ties each output column to its definition instead of
        # relying on positional renaming of lambda-generated column names.
        df_agg = (
            df_events
            .assign(
                _is_view=df_events['event'].eq('view'),
                _is_transaction=df_events['event'].eq('transaction'),
            )
            .groupby('user_id', as_index=False)
            .agg(
                view_count=('_is_view', 'sum'),
                transaction_count=('_is_transaction', 'sum'),
                total_events=('event', 'count'),
                last_event_time=('event_time', 'max'),
            )
        )

        # tempfile.gettempdir() is portable; a hard-coded /tmp usually does not exist on Windows.
        csv_path = os.path.join(local_dir or tempfile.gettempdir(), 'rr_user_agg.csv')
        df_agg.to_csv(csv_path, index=False)
        s3_train_path = sagemaker_session.upload_data(
            path=csv_path,
            key_prefix='retailrocket/training',
        )

        logger.info(f"Training data uploaded to: {s3_train_path}")
        return s3_train_path, df_agg
    except Exception as e:
        logger.error(f"Failed to prepare training data: {e}")
        raise

In [9]:
def create_training_script(path='train.py'):
    """Write the SageMaker scikit-learn training entry point to ``path``.

    The generated script trains a RandomForestRegressor that predicts
    ``transaction_count`` from ``view_count`` and ``total_events``, prints a hold-out
    MSE, and saves ``model.joblib`` plus ``feature_names.json`` into the model directory.

    Args:
        path: Destination file; defaults to ``train.py`` in the working directory.
    """
    script = '''
import json
import os

import joblib
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

FEATURE_COLS = ['view_count', 'total_events']
TARGET_COL = 'transaction_count'


def _find_training_csv(input_path):
    """Return the path of the first .csv file (sorted by name) in input_path."""
    csv_files = sorted(f for f in os.listdir(input_path) if f.endswith('.csv'))
    if not csv_files:
        raise FileNotFoundError(f"No .csv file found in {input_path}")
    return os.path.join(input_path, csv_files[0])


def main():
    # SageMaker exposes the channel and model directories as env vars; fall back to
    # the standard container paths when they are not set.
    input_path = os.environ.get('SM_CHANNEL_TRAINING', '/opt/ml/input/data/training')
    model_dir = os.environ.get('SM_MODEL_DIR', '/opt/ml/model')

    df = pd.read_csv(_find_training_csv(input_path))
    if df.empty:
        raise ValueError("Training data is empty")

    X = df[FEATURE_COLS]
    y = df[TARGET_COL]
    model = RandomForestRegressor(n_estimators=50, random_state=42)

    # A hold-out split needs at least two rows; with fewer, train on everything.
    if len(df) >= 2:
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        model.fit(X_train, y_train)
        mse = mean_squared_error(y_test, model.predict(X_test))
        print(f"Validation MSE: {mse}")
    else:
        model.fit(X, y)
        print("Skipping validation: fewer than 2 training rows")

    joblib.dump(model, os.path.join(model_dir, 'model.joblib'))
    # Persist the feature order so inference builds its input frame identically.
    with open(os.path.join(model_dir, 'feature_names.json'), 'w') as f:
        json.dump(FEATURE_COLS, f)

    print("Model training completed successfully")


if __name__ == '__main__':
    main()
'''
    with open(path, 'w', encoding='utf-8') as f:
        f.write(script)
    logger.info("Training script created")

def create_inference_script(path='inference.py'):
    """Write the SageMaker scikit-learn serving script to ``path``.

    The generated module implements the container hooks ``model_fn``, ``input_fn``,
    ``predict_fn`` and ``output_fn``. Callers must send the feature values in the JSON
    body as ``{"features": {...}}``; the script does not read from Feature Store.

    Args:
        path: Destination file; defaults to ``inference.py`` in the working directory.
    """
    script = '''
import json
import os

import joblib
import pandas as pd

JSON_CONTENT_TYPE = 'application/json'


def _is_json(content_type):
    """True for application/json, including variants with parameters such as a charset."""
    return (content_type or '').split(';')[0].strip().lower() == JSON_CONTENT_TYPE


def model_fn(model_dir):
    """Load the model and the ordered feature names saved by the training script."""
    model = joblib.load(os.path.join(model_dir, 'model.joblib'))
    with open(os.path.join(model_dir, 'feature_names.json'), 'r') as f:
        feature_names = json.load(f)
    return {'model': model, 'feature_names': feature_names}


def input_fn(request_body, request_content_type):
    """Parse a JSON request body."""
    if not _is_json(request_content_type):
        raise ValueError(f"Unsupported content type: {request_content_type}")
    return json.loads(request_body)


def predict_fn(input_data, model_dict):
    """Predict from input_data['features']: a dict keyed by feature name, or a list in training order."""
    if not isinstance(input_data, dict) or 'features' not in input_data:
        raise ValueError("Expected 'features' in input data")
    features = input_data['features']
    feature_names = model_dict['feature_names']
    # Fail loudly instead of letting a missing feature become NaN.
    if isinstance(features, dict):
        missing = [name for name in feature_names if name not in features]
        if missing:
            raise ValueError(f"Missing features: {missing}")
    features_df = pd.DataFrame([features], columns=feature_names)
    return model_dict['model'].predict(features_df).tolist()


def output_fn(prediction, response_content_type):
    """Serialize predictions as JSON; a missing or wildcard Accept value also gets JSON."""
    if response_content_type in (None, '', '*/*') or _is_json(response_content_type):
        return json.dumps({'predictions': prediction})
    raise ValueError(f"Unsupported response content type: {response_content_type}")
'''
    with open(path, 'w', encoding='utf-8') as f:
        f.write(script)
    logger.info("Inference script created")

In [10]:
def train_and_deploy_model(s3_train_path, role, sagemaker_session):
    """Generate the entry-point scripts, train on ``s3_train_path`` and deploy an endpoint.

    Args:
        s3_train_path: S3 URI of the training CSV, passed as the ``training`` channel.
        role: IAM role ARN used by the training job and the model.
        sagemaker_session: SageMaker ``Session`` used for both training and hosting.

    Returns:
        The predictor returned by ``deploy`` for ``CONFIG['endpoint_name']``
        (JSON serializer/deserializer).

    Raises:
        Any SDK error is logged and re-raised.
    """
    try:
        create_training_script()
        create_inference_script()

        # No source_dir: only the entry-point file is packaged and uploaded.
        # source_dir='.' would tar the entire working directory, including the
        # downloaded dataset under data/.
        sklearn_estimator = SKLearn(
            entry_point='train.py',
            role=role,
            instance_type=CONFIG['instance_type_training'],
            framework_version=CONFIG['framework_version'],
            py_version='py3',
            sagemaker_session=sagemaker_session,
        )
        sklearn_estimator.fit({'training': s3_train_path})
        logger.info("Model training completed")

        sklearn_model = SKLearnModel(
            model_data=sklearn_estimator.model_data,
            role=role,
            entry_point='inference.py',
            framework_version=CONFIG['framework_version'],
            py_version='py3',
            # Same session as training, so hosting uses the same region, bucket and credentials.
            sagemaker_session=sagemaker_session,
        )

        predictor = sklearn_model.deploy(
            initial_instance_count=1,
            instance_type=CONFIG['instance_type_endpoint'],
            endpoint_name=CONFIG['endpoint_name'],
            serializer=JSONSerializer(),
            deserializer=JSONDeserializer(),
        )

        logger.info(f"Model deployed to endpoint: {CONFIG['endpoint_name']}")
        return predictor
    except Exception as e:
        logger.error(f"Failed to train and deploy model: {e}")
        raise

In [11]:
def perform_inference(df_agg, region, user_index=0):
    """Invoke the deployed endpoint with one user's aggregated features.

    Args:
        df_agg: Per-user frame with ``view_count`` and ``total_events`` columns.
        region: AWS region of the endpoint (None lets boto3 use its default).
        user_index: Positional row of ``df_agg`` to score (default: the first row).

    Returns:
        The parsed JSON response body from the endpoint.

    Raises:
        ValueError: if ``df_agg`` is empty.
        Any boto3 error is logged and re-raised.
    """
    try:
        if df_agg.empty:
            raise ValueError("df_agg is empty; there is no user to send to the endpoint")

        client = boto3.client('sagemaker-runtime', region_name=region)

        sample_user = df_agg.iloc[user_index]
        payload = {
            'features': {
                'view_count': int(sample_user['view_count']),
                'total_events': int(sample_user['total_events']),
            }
        }

        response = client.invoke_endpoint(
            EndpointName=CONFIG['endpoint_name'],
            ContentType='application/json',
            # Request JSON explicitly instead of relying on the container's default Accept.
            Accept='application/json',
            Body=json.dumps(payload),
        )

        result = json.loads(response['Body'].read())
        logger.info(f"Inference result: {result}")
        return result
    except Exception as e:
        logger.error(f"Failed to perform inference: {e}")
        raise

In [12]:
def cleanup_resources(sagemaker_session, feature_group):
    """Best-effort deletion of the endpoint, its endpoint config and the feature group.

    Each deletion is attempted independently, so one failure (e.g. the endpoint was
    already deleted) does not skip the others. Failures are logged as warnings,
    never raised.

    Note: the SageMaker Model resource and the offline-store data in S3 are not removed.
    """
    endpoint_name = CONFIG['endpoint_name']
    cleanup_steps = (
        (f"endpoint: {endpoint_name}",
         lambda: sagemaker_session.delete_endpoint(endpoint_name)),
        # Model.deploy(endpoint_name=...) names the endpoint config after the endpoint.
        # Leaving it behind makes the next deploy with the same name fail.
        (f"endpoint config: {endpoint_name}",
         lambda: sagemaker_session.delete_endpoint_config(endpoint_name)),
        (f"feature group: {CONFIG['feature_group_name']}",
         lambda: feature_group.delete()),
    )

    for description, delete in cleanup_steps:
        try:
            delete()
            logger.info(f"Deleted {description}")
        except Exception as e:
            logger.warning(f"Cleanup warning ({description}): {e}")

In [13]:
def main():
    """Run the end-to-end example: data -> Feature Store -> training -> endpoint -> inference.

    Returns:
        The parsed endpoint response from ``perform_inference``.

    Raises:
        Re-raises (after logging) any failure from the individual steps.
    """
    try:
        logger.info("Starting SageMaker Feature Store example...")
        sagemaker_session = Session()
        # Take the region from the SageMaker session so every client agrees on it.
        region = sagemaker_session.boto_region_name
        role = _resolve_execution_role()
        bucket = sagemaker_session.default_bucket()

        df_events = setup_environment()
        feature_group = create_feature_group(sagemaker_session, role, bucket)
        ingest_data_to_feature_store(feature_group, df_events)

        s3_train_path, df_agg = prepare_training_data(df_events, sagemaker_session)
        train_and_deploy_model(s3_train_path, role, sagemaker_session)

        result = perform_inference(df_agg, region)
        print(f"Final inference result: {result}")

        # A deployed endpoint keeps incurring charges until it is deleted; opt in via CONFIG.
        if CONFIG.get('cleanup_after_run', False):
            cleanup_resources(sagemaker_session, feature_group)

        logger.info("Example completed successfully!")
        return result
    except Exception as e:
        logger.error(f"Example failed: {e}")
        raise


def _resolve_execution_role():
    """Return the IAM role ARN for SageMaker jobs and endpoints.

    Order: ``CONFIG['role_arn']``, then the ``SAGEMAKER_ROLE_ARN`` environment
    variable, then ``get_execution_role()``. The last only works when the caller's
    identity is itself a role (e.g. inside SageMaker). With IAM *user* credentials it
    raises ValueError, which is re-raised with guidance.
    """
    role_arn = CONFIG.get('role_arn') or os.environ.get('SAGEMAKER_ROLE_ARN')
    if role_arn:
        return role_arn
    try:
        return get_execution_role()
    except ValueError as err:
        raise ValueError(
            f"{err}. When running outside SageMaker, set CONFIG['role_arn'] or the "
            "SAGEMAKER_ROLE_ARN environment variable to an IAM role ARN that SageMaker can assume."
        ) from err


# Run the main function
if __name__ == "__main__":
    main()

INFO:__main__:Starting SageMaker Feature Store example...
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
INFO:botocore.credentials:Found credentials in shared credentials file: ~/.aws/credentials
ERROR:__main__:Example failed: The current AWS identity is not a role: arn:aws:iam::750952118292:user/windows-pc, therefore it cannot be used as a SageMaker execution role
