In [11]:
import os
import boto3
import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session
from dotenv import load_dotenv

# Load environment variables
ENV_PATH = ".env"
load_dotenv(ENV_PATH)

# Get S3 bucket and region
bucket = os.getenv("BUCKET_NAME")
session = boto3.session.Session()
region = session.region_name

print(f"Bucket: {bucket}")
print(f"Region: {region}")

# Initialize SageMaker Feature Store session
sagemaker_session = sagemaker.Session()
role = os.getenv("SAGEMAKER_ROLE")
feature_store_session = Session(
    boto_session=session,
    sagemaker_client=boto3.client('sagemaker', region_name=region),
    sagemaker_featurestore_runtime_client=boto3.client('sagemaker-featurestore-runtime', region_name=region)
)

print(f"\nSageMaker Role: {role}")
print(f"Feature Store Session initialized")


Bucket: mlops-backblaze-d7b30cb5-us-east-1
Region: us-east-1

SageMaker Role: arn:aws:iam::656208180522:role/LabRole
Feature Store Session initialized


In [12]:
# Load Feature Group names from .env
feature_group_train = os.getenv("FEATURE_GROUP_TRAIN")
feature_group_val = os.getenv("FEATURE_GROUP_VAL")
feature_group_test = os.getenv("FEATURE_GROUP_TEST")
feature_group_prod = os.getenv("FEATURE_GROUP_PROD")

print("="*80)
print("FEATURE GROUP NAMES FROM .ENV")
print("="*80)
print(f"Training:     {feature_group_train}")
print(f"Validation:   {feature_group_val}")
print(f"Test:         {feature_group_test}")
print(f"Production:   {feature_group_prod}")
print("="*80)


FEATURE GROUP NAMES FROM .ENV
Training:     backblaze-hdd-failure-20260131-155325-train
Validation:   backblaze-hdd-failure-20260131-155325-validation
Test:         backblaze-hdd-failure-20260131-155325-test
Production:   backblaze-hdd-failure-20260131-155325-production


In [21]:
def load_feature_group_to_dataframe(feature_group_name, feature_store_session, use_offline_store=False):
    """
    Load a Feature Group from SageMaker Feature Store into a pandas DataFrame.
    
    Parameters:
    -----------
    feature_group_name : str
        Name of the feature group to load
    feature_store_session : sagemaker.Session
        SageMaker session for Feature Store
    use_offline_store : bool
        If True, use Athena to query offline store. If False, read from S3 parquet directly.
    
    Returns:
    --------
    pd.DataFrame : DataFrame containing all records from the feature group
    """
    import time
    from sagemaker.feature_store.feature_group import FeatureGroup
    
    print(f"\nLoading Feature Group: {feature_group_name}")
    
    # Create Feature Group object
    feature_group = FeatureGroup(
        name=feature_group_name,
        sagemaker_session=feature_store_session
    )
    
    # Check status
    try:
        description = feature_group.describe()
        status = description.get('FeatureGroupStatus')
        print(f"  Status: {status}")
        
        if status != 'Created':
            print(f"  ⚠ Warning: Feature Group is not in 'Created' state")
            return None
    except Exception as e:
        print(f"  ✗ Error: Feature Group not found - {str(e)[:100]}")
        return None
    
    if use_offline_store:
        # Use Athena query to retrieve data from offline store
        query_string = f'SELECT * FROM "{feature_group_name}"'
        
        print(f"  Querying offline store via Athena...")
        print(f"  Query: {query_string}")
        
        try:
            # Create Athena query
            athena_query = feature_group.athena_query()
            
            # Set database and table
            athena_query.run(
                query_string=query_string,
                output_location=f"s3://{bucket}/athena-results/"
            )
            
            # Wait for query to complete
            athena_query.wait()
            
            # Get results as DataFrame
            df = athena_query.as_dataframe()
            
            print(f"  ✓ Loaded {len(df):,} records")
            print(f"  Columns: {list(df.columns)}")
            
            return df
            
        except Exception as e:
            print(f"  ✗ Error loading data via Athena: {str(e)[:200]}")
            print(f"\n  Note: Offline store data may take a few minutes to be available.")
            print(f"  Trying direct S3 read instead...")
    
    # Read directly from S3 parquet files (faster and works immediately)
    try:
        print(f"  Reading parquet files from S3...")
        
        # Get S3 URI for offline store - use the resolved URI from description
        offline_store_config = description.get('OfflineStoreConfig', {})
        resolved_output_s3_uri = offline_store_config.get('S3StorageConfig', {}).get('ResolvedOutputS3Uri', '')
        
        if not resolved_output_s3_uri:
            # Fallback: construct from bucket  
            resolved_output_s3_uri = f"s3://{bucket}/feature-store/{feature_group_name}"
        
        # The resolved URI already points to the base path; parquet files are in /data subdirectory
        # Check if it already ends with /data, if not append it
        if resolved_output_s3_uri.endswith('/data'):
            data_s3_uri = resolved_output_s3_uri
        else:
            data_s3_uri = f"{resolved_output_s3_uri}/data"
        
        print(f"  S3 Path: {data_s3_uri}")
        
        # Read parquet files using pandas
        import pyarrow.parquet as pq
        import pyarrow.dataset as ds
        
        # Read the dataset (partitioned by year/month/day/hour)
        dataset = ds.dataset(data_s3_uri, format='parquet', partitioning='hive')
        df = dataset.to_table().to_pandas()
        
        # Drop internal columns and partition columns
        columns_to_drop = [col for col in df.columns if col in ['write_time', 'api_invocation_time', 'is_deleted', 'year', 'month', 'day', 'hour']]
        if columns_to_drop:
            df = df.drop(columns=columns_to_drop)
        
        print(f"  ✓ Loaded {len(df):,} records")
        print(f"  Columns: {list(df.columns)[:10]}..." if len(df.columns) > 10 else f"  Columns: {list(df.columns)}")
        
        return df
        
    except Exception as e:
        print(f"  ✗ Error loading data from S3: {str(e)[:200]}")
        import traceback
        traceback.print_exc()
        return None


In [18]:
# Check the actual S3 structure for one feature group
s3_client = boto3.client('s3', region_name=region)

print("Checking S3 structure for feature store...")
prefix = "feature-store/"
response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix, Delimiter='/', MaxKeys=10)

print(f"\nTop-level directories under {prefix}:")
for item in response.get('CommonPrefixes', []):
    print(f"  {item['Prefix']}")

# Check one feature group in detail
if feature_group_train:
    fg_prefix = f"feature-store/{feature_group_train}/"
    print(f"\nChecking structure under {fg_prefix}:")
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=fg_prefix, MaxKeys=20)
    
    for obj in response.get('Contents', [])[:10]:
        print(f"  {obj['Key']}")


Checking S3 structure for feature store...

Top-level directories under feature-store/:
  feature-store/backblaze-hdd-failure-20260131-145420-production/
  feature-store/backblaze-hdd-failure-20260131-145420-test/
  feature-store/backblaze-hdd-failure-20260131-145420-train/
  feature-store/backblaze-hdd-failure-20260131-145420-validation/
  feature-store/backblaze-hdd-failure-20260131-150433-production/
  feature-store/backblaze-hdd-failure-20260131-150433-test/
  feature-store/backblaze-hdd-failure-20260131-150433-train/
  feature-store/backblaze-hdd-failure-20260131-150433-validation/
  feature-store/backblaze-hdd-failure-20260131-155325-production/
  feature-store/backblaze-hdd-failure-20260131-155325-test/

Checking structure under feature-store/backblaze-hdd-failure-20260131-155325-train/:
  feature-store/backblaze-hdd-failure-20260131-155325-train/656208180522/sagemaker/us-east-1/offline-store/backblaze-hdd-failure-20260131-155325-train-1769874829/backblaze-hdd-failure-20260131-1

In [22]:
# Load all feature groups into dataframes
print("="*80)
print("LOADING FEATURE GROUPS INTO DATAFRAMES")
print("="*80)

# Load training data
df_train = load_feature_group_to_dataframe(feature_group_train, feature_store_session)

# Load validation data
df_val = load_feature_group_to_dataframe(feature_group_val, feature_store_session)

# Load test data
df_test = load_feature_group_to_dataframe(feature_group_test, feature_store_session)

# Load production data
df_prod = load_feature_group_to_dataframe(feature_group_prod, feature_store_session)

print("\n" + "="*80)
print("LOADING SUMMARY")
print("="*80)

datasets = {
    'Training': df_train,
    'Validation': df_val,
    'Test': df_test,
    'Production': df_prod
}

for name, df in datasets.items():
    if df is not None:
        print(f"\n{name} Dataset:")
        print(f"  Shape: {df.shape}")
        print(f"  Failures: {df['failure'].sum() if 'failure' in df.columns else 'N/A'}")
    else:
        print(f"\n{name} Dataset: Failed to load")


LOADING FEATURE GROUPS INTO DATAFRAMES

Loading Feature Group: backblaze-hdd-failure-20260131-155325-train
  Status: Created
  Reading parquet files from S3...
  S3 Path: s3://mlops-backblaze-d7b30cb5-us-east-1/feature-store/backblaze-hdd-failure-20260131-155325-train/656208180522/sagemaker/us-east-1/offline-store/backblaze-hdd-failure-20260131-155325-train-1769874829/data
  ✓ Loaded 651,891 records
  Columns: ['record_id', 'event_time', 'serial_number', 'date', 'pct_one_star', 'pct_two_star', 'smart_5_raw', 'smart_187_raw', 'smart_188_raw', 'smart_197_raw']...

Loading Feature Group: backblaze-hdd-failure-20260131-155325-validation
  Status: Created
  Reading parquet files from S3...
  S3 Path: s3://mlops-backblaze-d7b30cb5-us-east-1/feature-store/backblaze-hdd-failure-20260131-155325-validation/656208180522/sagemaker/us-east-1/offline-store/backblaze-hdd-failure-20260131-155325-validation-1769874830/data
  ✓ Loaded 162,918 records
  Columns: ['record_id', 'event_time', 'serial_number

In [23]:
# Display sample data from training set
if df_train is not None:
    print("="*80)
    print("SAMPLE DATA FROM TRAINING SET")
    print("="*80)
    print(f"\nFirst 5 rows:")
    display(df_train.head())
    
    print(f"\nData types:")
    print(df_train.dtypes)
    
    print(f"\nBasic statistics:")
    display(df_train.describe())


SAMPLE DATA FROM TRAINING SET

First 5 rows:


Unnamed: 0,record_id,event_time,serial_number,date,pct_one_star,pct_two_star,smart_5_raw,smart_187_raw,smart_188_raw,smart_197_raw,smart_198_raw,failure
0,ZL23V7T1_2025-09-24,1769875000.0,ZL23V7T1,2025-09-24,25.12,7.27,0.0,0.0,0.0,0.0,0.0,0
1,22E0A21AFV8G_2025-09-25,1769875000.0,22E0A21AFV8G,2025-09-25,17.01,6.1,0.0,0.0,0.0,0.0,0.0,0
2,22G0A08AFV8G_2025-09-26,1769875000.0,22G0A08AFV8G,2025-09-26,17.01,6.1,3.0,0.0,0.0,0.0,0.0,0
3,8160A08HFV8G_2025-09-25,1769875000.0,8160A08HFV8G,2025-09-25,17.01,6.1,0.0,0.0,0.0,0.0,0.0,0
4,8160A096FV8G_2025-09-24,1769875000.0,8160A096FV8G,2025-09-24,17.01,6.1,0.0,0.0,0.0,0.0,0.0,0



Data types:
record_id         object
event_time       float64
serial_number     object
date              object
pct_one_star     float64
pct_two_star     float64
smart_5_raw      float64
smart_187_raw    float64
smart_188_raw    float64
smart_197_raw    float64
smart_198_raw    float64
failure            int64
dtype: object

Basic statistics:


Unnamed: 0,event_time,pct_one_star,pct_two_star,smart_5_raw,smart_187_raw,smart_188_raw,smart_197_raw,smart_198_raw,failure
count,651891.0,651891.0,651891.0,651891.0,651891.0,651891.0,651891.0,651891.0,651891.0
mean,1769875000.0,18.839298,6.368991,45.554063,2.649619,395704100.0,1.813253,1.175213,7.7e-05
std,0.0,4.711874,0.675577,1152.373257,330.320273,13865350000.0,66.307377,50.599231,0.008758
min,1769875000.0,13.55,5.46,0.0,0.0,0.0,0.0,0.0,0.0
25%,1769875000.0,15.45,5.77,0.0,0.0,0.0,0.0,0.0,0.0
50%,1769875000.0,17.01,6.1,0.0,0.0,0.0,0.0,0.0,0.0
75%,1769875000.0,25.12,7.27,0.0,0.0,0.0,0.0,0.0,0.0
max,1769875000.0,25.12,7.27,65528.0,65535.0,2207647000000.0,17573.0,17573.0,1.0


In [24]:
# Prepare X and y datasets for modeling
feature_cols = ['pct_one_star', 'pct_two_star', 'smart_5_raw', 
                'smart_187_raw', 'smart_188_raw', 'smart_197_raw', 'smart_198_raw']

if df_train is not None and df_val is not None and df_test is not None:
    print("="*80)
    print("PREPARING FEATURE MATRICES AND TARGET VECTORS")
    print("="*80)
    
    # Training data
    X_train = df_train[feature_cols].copy()
    y_train = df_train['failure'].copy()
    
    # Validation data
    X_val = df_val[feature_cols].copy()
    y_val = df_val['failure'].copy()
    
    # Test data
    X_test = df_test[feature_cols].copy()
    y_test = df_test['failure'].copy()
    
    # Production data (if needed)
    if df_prod is not None:
        X_prod = df_prod[feature_cols].copy()
        y_prod = df_prod['failure'].copy()
    
    print(f"\nTraining Set:")
    print(f"  X_train: {X_train.shape}")
    print(f"  y_train: {y_train.shape}")
    print(f"  Failure rate: {y_train.sum() / len(y_train) * 100:.4f}%")
    
    print(f"\nValidation Set:")
    print(f"  X_val: {X_val.shape}")
    print(f"  y_val: {y_val.shape}")
    print(f"  Failure rate: {y_val.sum() / len(y_val) * 100:.4f}%")
    
    print(f"\nTest Set:")
    print(f"  X_test: {X_test.shape}")
    print(f"  y_test: {y_test.shape}")
    print(f"  Failure rate: {y_test.sum() / len(y_test) * 100:.4f}%")
    
    if df_prod is not None:
        print(f"\nProduction Set:")
        print(f"  X_prod: {X_prod.shape}")
        print(f"  y_prod: {y_prod.shape}")
        print(f"  Failure rate: {y_prod.sum() / len(y_prod) * 100:.4f}%")
    
    print(f"\n" + "="*80)
    print(f"Feature columns ({len(feature_cols)}):")
    for i, col in enumerate(feature_cols, 1):
        print(f"  {i}. {col}")
    
else:
    print("\n⚠ Warning: Not all datasets loaded successfully. Cannot prepare feature matrices.")


PREPARING FEATURE MATRICES AND TARGET VECTORS

Training Set:
  X_train: (651891, 7)
  y_train: (651891,)
  Failure rate: 0.0077%

Validation Set:
  X_val: (162918, 7)
  y_val: (162918,)
  Failure rate: 0.0117%

Test Set:
  X_test: (162962, 7)
  y_test: (162962,)
  Failure rate: 0.0049%

Production Set:
  X_prod: (651878, 7)
  y_prod: (651878,)
  Failure rate: 0.0081%

Feature columns (7):
  1. pct_one_star
  2. pct_two_star
  3. smart_5_raw
  4. smart_187_raw
  5. smart_188_raw
  6. smart_197_raw
  7. smart_198_raw
