In [1]:
import pandas as pd
import boto3
import os
from statsmodels.stats.outliers_influence import variance_inflation_factor
 
# S3 Bucket Details
s3 = boto3.client("s3")
bucket_name = "fr-detector"
 
# Download dataset from S3
s3.download_file(bucket_name, "balanced_train/X_train.csv", "X_train.csv")
s3.download_file(bucket_name, "balanced_train/y_train.csv", "y_train.csv")
 
# Load dataset
X_train = pd.read_csv("X_train.csv")
y_train = pd.read_csv("y_train.csv")
 
# Function to calculate VIF and remove multicollinear features
def remove_multicollinear_features(X, threshold=10):
    X_numeric = X.select_dtypes(include=['number'])  # Only consider numerical features
    vif_data = pd.DataFrame()
    vif_data["Feature"] = X_numeric.columns
    vif_data["VIF"] = [variance_inflation_factor(X_numeric.values, i) for i in range(len(X_numeric.columns))]
    
    # Drop features with high VIF
    high_vif_features = vif_data[vif_data["VIF"] > threshold]["Feature"].tolist()
    print(f"Features dropped due to high multicollinearity (VIF > {threshold}): {high_vif_features}")
    X_dropped = X.drop(columns=high_vif_features, errors='ignore')
    
    return X_dropped
 
# Remove multicollinear features
X_train_processed = remove_multicollinear_features(X_train)
 
# Ensure the target column is included before saving
X_train_processed["Class"] = y_train["Class"]
 
# Save the processed dataset
os.makedirs("feature_store", exist_ok=True)
X_train_processed.to_csv("feature_store/X_train_processed.csv", index=False)
 
# Upload processed dataset to S3
s3.upload_file("feature_store/X_train_processed.csv", bucket_name, "feature_store/X_train_processed.csv")
 
print("Feature engineering completed and processed dataset uploaded to S3 successfully!")

Features dropped due to high multicollinearity (VIF > 10): ['V2', 'V3', 'V5', 'V7', 'V10', 'V12', 'V14', 'V16', 'V17']
Feature engineering completed and processed dataset uploaded to S3 successfully!


In [15]:
import pandas as pd
import boto3
import sagemaker
import numpy as np
import time
import pyarrow.parquet as pq  # Optimized storage format
from sagemaker.feature_store.feature_group import FeatureGroup
from statsmodels.stats.outliers_influence import variance_inflation_factor
from botocore.exceptions import ClientError  # Correct exception handling
 
# ✅ AWS Setup
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket_name = "fr-detector"
feature_group_name = "credit_card_fraud_features"
 
# ✅ Load Balanced Data from S3
s3 = boto3.client("s3")
s3.download_file(bucket_name, "balanced_train/X_train.csv", "X_train.csv")
s3.download_file(bucket_name, "balanced_train/y_train.csv", "y_train.csv")
 
X_train = pd.read_csv("X_train.csv")
y_train = pd.read_csv("y_train.csv")
 
# ✅ Merge X & y for correlation analysis
df = X_train.copy()
df["Class"] = y_train  # Ensure Class column is retained
 
# ✅ Step 1: Compute Correlation with Target
corr_threshold = 0.2  # Keep features correlated with target
correlation_matrix = df.corr()
selected_features = correlation_matrix["Class"].abs().sort_values(ascending=False)
selected_features = selected_features[selected_features > corr_threshold].index.tolist()
 
# ✅ Step 2: Check for Multicollinearity (VIF Analysis)
def calculate_vif(df):
    vif_data = pd.DataFrame()
    vif_data["Feature"] = df.columns
    vif_data["VIF"] = [variance_inflation_factor(df.values, i) for i in range(df.shape[1])]
    return vif_data
 
X_selected = df[selected_features].drop(columns=["Class"])  # Keep only selected features
vif_result = calculate_vif(X_selected)
 
# ✅ Drop features with high multicollinearity (VIF > 10)
low_vif_features = vif_result[vif_result["VIF"] < 10]["Feature"].tolist()
X_final = X_selected[low_vif_features]
 
# ✅ Ensure "Class" column is retained
X_final["Class"] = df["Class"].values
 
# ✅ Prepare Data for Feature Store
X_final["record_id"] = range(1, len(X_final) + 1)
X_final["event_time"] = pd.to_datetime("now", utc=True).strftime("%Y-%m-%dT%H:%M:%SZ")
 
# ✅ Save as Parquet for Feature Store Ingestion
parquet_filename = "optimized_features.parquet"
X_final.to_parquet(parquet_filename, index=False)
 
# ✅ Upload to S3 Feature Store Folder
s3.upload_file(parquet_filename, bucket_name, f"feature_store/{parquet_filename}")
 
# ✅ Step 3: Create Feature Group (If Not Exists)
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
 
try:
    feature_group.describe()  # Check if feature group exists
    print(f"✅ Feature Group '{feature_group_name}' already exists.")
except ClientError as e:
    if "ResourceNotFound" in str(e):
        print(f"🚀 Creating Feature Group '{feature_group_name}'...")
        feature_group.load_feature_definitions(data_frame=X_final)  # Define schema
        feature_group.create(
            record_identifier_name="record_id",
            event_time_feature_name="event_time",
            role_arn=role,
            s3_uri=f"s3://{bucket_name}/feature_store/",
            enable_online_store=True,
        )
    else:
        raise  # Re-raise if error is not ResourceNotFound
 
# ✅ Step 4: Ingest Data into Feature Store in Batches
batch_size = 5000  # Reduce batch size for better stability
for i in range(0, len(X_final), batch_size):
    batch = X_final.iloc[i : i + batch_size]
    feature_group.ingest(data_frame=batch, max_workers=3, wait=True)  # Ensure stability
    print(f"✅ Ingested batch {i // batch_size + 1}")
 
print("🚀 Feature Store Ingestion Completed Successfully!")

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_final["Class"] = df["Class"].values
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_final["record_id"] = range(1, len(X_final) + 1)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  X_final["event_time"] = pd.to_datetime("now", utc=True).strftime("%Y-%m-%dT%H:%M:%SZ")


✅ Feature Group 'credit_card_fraud_features' already exists.
✅ Ingested batch 1
✅ Ingested batch 2
✅ Ingested batch 3
✅ Ingested batch 4
✅ Ingested batch 5
✅ Ingested batch 6
✅ Ingested batch 7
✅ Ingested batch 8
✅ Ingested batch 9
✅ Ingested batch 10
✅ Ingested batch 11
✅ Ingested batch 12
✅ Ingested batch 13
✅ Ingested batch 14
✅ Ingested batch 15
✅ Ingested batch 16
✅ Ingested batch 17
✅ Ingested batch 18
✅ Ingested batch 19
✅ Ingested batch 20
✅ Ingested batch 21
✅ Ingested batch 22
✅ Ingested batch 23
✅ Ingested batch 24
✅ Ingested batch 25
✅ Ingested batch 26
✅ Ingested batch 27
✅ Ingested batch 28
✅ Ingested batch 29
✅ Ingested batch 30
✅ Ingested batch 31
✅ Ingested batch 32
✅ Ingested batch 33
✅ Ingested batch 34
✅ Ingested batch 35
✅ Ingested batch 36
✅ Ingested batch 37
✅ Ingested batch 38
✅ Ingested batch 39
✅ Ingested batch 40
✅ Ingested batch 41
✅ Ingested batch 42
✅ Ingested batch 43
✅ Ingested batch 44
✅ Ingested batch 45
✅ Ingested batch 46
✅ Ingested batch 47
✅ Ingest

In [2]:
import boto3
import pandas as pd
from sagemaker.feature_store.feature_group import FeatureGroup
 
# Initialize Feature Group
feature_store_runtime = boto3.client('sagemaker-featurestore-runtime')
feature_group_name = "credit_card_fraud_features"
 
# Fetch sample data from Feature Store
response = feature_store_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString="1"  # Change this to an actual record ID
)
 
# Print record
print(response)



sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


{'ResponseMetadata': {'RequestId': '7137ca40-f993-452d-b590-a6901796657b', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '7137ca40-f993-452d-b590-a6901796657b', 'content-type': 'application/json', 'content-length': '1174', 'date': 'Wed, 12 Mar 2025 06:22:07 GMT'}, 'RetryAttempts': 0}, 'Record': [{'FeatureName': 'V14', 'ValueAsString': '-0.253266461227142'}, {'FeatureName': 'V4', 'ValueAsString': '0.330155451931532'}, {'FeatureName': 'V11', 'ValueAsString': '0.624995774387124'}, {'FeatureName': 'V9', 'ValueAsString': '1.05456029821935'}, {'FeatureName': 'V7', 'ValueAsString': '-0.627977906310739'}, {'FeatureName': 'V2', 'ValueAsString': '-0.380782710937387'}, {'FeatureName': 'V18', 'ValueAsString': '0.651665792250448'}, {'FeatureName': 'V1', 'ValueAsString': '1.95504092199146'}, {'FeatureName': 'V6', 'ValueAsString': '-0.0861974531704895'}, {'FeatureName': 'V5', 'ValueAsString': '-0.509374248165253'}, {'FeatureName': 'V19', 'ValueAsString': '0.167986640374626'}, {'FeatureNa

In [9]:
import boto3
 
sm_client = boto3.client("sagemaker")
 
feature_group_name = "credit_card_fraud_features"  # Your feature group name
 
response = sm_client.describe_feature_group(FeatureGroupName=feature_group_name)
feature_definitions = response["FeatureDefinitions"]
 
# Extract column names
stored_features = [feature["FeatureName"] for feature in feature_definitions]
 
print(f"📝 Features in Feature Store:\n{stored_features}")
print(f"📊 Total Features in Feature Store: {len(stored_features)}")

📝 Features in Feature Store:
['V14', 'V4', 'V11', 'V9', 'V7', 'V2', 'V18', 'V1', 'V6', 'V5', 'V19', 'Class', 'record_id', 'event_time']
📊 Total Features in Feature Store: 14
