#### predictAndAlertLambda

In [None]:
import json
import boto3
import logging

# Identifiers of the AWS resources this Lambda talks to.
FEATURE_GROUP_NAME = 'xgb-pca-feature-group'
ENDPOINT_NAME = 'fraud-xgb-endpoint-flat'
SNS_TOPIC_ARN = 'arn:aws:sns:us-east-1:869935087425:fraud-alerts'

# Root logger, emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# boto3 clients are built once at import time and shared by every invocation
# that reuses this module.
fs = boto3.client('sagemaker-featurestore-runtime')
runtime = boto3.client('sagemaker-runtime')
sns = boto3.client('sns')

# Column order of the CSV row sent to the model endpoint:
# V1 .. V28 followed by Amount (29 columns in total).
feature_order = [f'V{i}' for i in range(1, 29)] + ['Amount']

def lambda_handler(event, context):
    """Score queued transactions and publish an SNS alert for likely fraud.

    For every item in ``event['Records']`` the item's ``body`` is taken as a
    transaction ID. Its features are read from the Feature Store group
    ``FEATURE_GROUP_NAME``, sent as a single CSV row (columns ordered as in
    ``feature_order``) to the SageMaker endpoint ``ENDPOINT_NAME``, and an
    alert is published to ``SNS_TOPIC_ARN`` when the returned score is at
    least 0.5.

    Failures are handled per record: they are logged and the loop moves on to
    the next record, so one bad transaction does not abort the batch and the
    handler still returns normally.

    Args:
        event: Invocation payload; must contain a ``Records`` list whose items
            each have a ``body`` holding the transaction ID.
        context: Lambda context object (unused).

    Returns:
        dict: ``{'statusCode': 200, 'body': 'Processed transactions.'}``,
        regardless of per-record failures.

    Raises:
        KeyError: If ``event`` has no ``Records`` or a record has no ``body``.
    """
    alert_threshold = 0.5

    for record in event['Records']:
        txn_id = record['body']
        logger.info(f"Processing transaction ID: {txn_id}")

        try:
            fs_response = fs.get_record(
                FeatureGroupName=FEATURE_GROUP_NAME,
                RecordIdentifierValueAsString=txn_id
            )
        except Exception as e:
            # logger.exception keeps the traceback alongside the message.
            logger.exception(f"FG fetch failed: {e}")
            continue

        # A successful call can still come back without a 'Record' entry
        # (presumably when the ID is unknown to the feature group — confirm
        # against the GetRecord docs). Report that explicitly instead of
        # letting it surface later as an opaque KeyError.
        fs_record = fs_response.get('Record')
        if not fs_record:
            logger.error(f"FG record not found for transaction ID: {txn_id}")
            continue

        try:
            # Keep only the model features, converted to float.
            record_dict = {
                item['FeatureName']: float(item['ValueAsString'])
                for item in fs_record
                if item['FeatureName'] in feature_order
            }

            # Name the absent features rather than failing on the first one.
            missing = [name for name in feature_order if name not in record_dict]
            if missing:
                logger.error(
                    f"Inference or alert failed: missing features {missing} "
                    f"for transaction ID: {txn_id}"
                )
                continue

            # One CSV row, columns in feature_order.
            body = ','.join(str(record_dict[name]) for name in feature_order)

            # Call model endpoint
            response = runtime.invoke_endpoint(
                EndpointName=ENDPOINT_NAME,
                ContentType='text/csv',
                Body=body
            )

            # The response body is parsed as JSON and compared directly with
            # the threshold, so it is expected to decode to a single number.
            score = json.loads(response['Body'].read().decode())
            logger.info(f"Prediction score: {score}")

            if score >= alert_threshold:
                # NOTE(review): the subject contains a non-ASCII character;
                # confirm the SNS Publish subject constraints allow it.
                sns.publish(
                    TopicArn=SNS_TOPIC_ARN,
                    Subject="🚨 Fraud Alert",
                    Message=f"Fraudulent transaction detected!\n\nTransaction ID: {txn_id}\nScore: {score}"
                )
                logger.info("Alert sent to SNS.")
            else:
                logger.info("No alert triggered.")
        except Exception as e:
            # Deliberately broad: any failure for this record is logged (with
            # traceback) and the remaining records are still processed.
            logger.exception(f"Inference or alert failed: {e}")

    return {
        'statusCode': 200,
        'body': 'Processed transactions.'
    }


#### kinesisToFeatureStore

In [None]:
import json
import base64
import boto3
import uuid
from datetime import datetime
import logging
# Identifiers of the AWS resources this Lambda writes to.
FEATURE_GROUP_NAME = 'xgb-pca-feature-group'
QUEUE_URL = 'https://queue.amazonaws.com/869935087425/fraud-predict-queue'

# Root logger, emitting INFO and above.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# boto3 clients are built once at import time. Note that ``sagemaker`` is the
# Feature Store *runtime* client, not the SageMaker control-plane client.
sqs = boto3.client('sqs')
sagemaker = boto3.client('sagemaker-featurestore-runtime')

def lambda_handler(event, context):
    """Store each Kinesis record in the Feature Store and queue its ID.

    For every item in ``event['Records']`` the base64 ``['kinesis']['data']``
    is decoded as UTF-8 JSON (it must decode to a JSON object). The payload
    then receives a new ``transaction_id`` (random UUID4, replacing any value
    already present) and an ``ingestion_time`` (UTC, formatted as
    ``YYYY-MM-DDTHH:MM:SS.mmmZ``). Every key/value pair is written to
    ``FEATURE_GROUP_NAME`` with the value stringified, and the transaction ID
    is then sent as the message body to ``QUEUE_URL``.

    Args:
        event: Invocation payload with a ``Records`` list; each item carries
            base64-encoded JSON under ``['kinesis']['data']``.
        context: Lambda context object (unused).

    Returns:
        dict: ``statusCode`` 200 and a JSON-encoded confirmation string.

    Raises:
        Exception: Decoding, JSON parsing, PutRecord and SendMessage errors
            are not caught; the first one aborts the rest of the batch.
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    for record in event['Records']:
        # Decode and parse Kinesis data
        decoded_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
        payload = json.loads(decoded_data)

        # Add required fields.
        # NOTE(review): the ID is freshly random on every run, so if the event
        # source re-delivers a batch after a mid-batch failure, records that
        # were already written would be stored and queued again under new IDs
        # — confirm this is acceptable.
        txn_id = str(uuid.uuid4())
        payload['transaction_id'] = txn_id
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the slice trims microseconds to milliseconds, output is unchanged.
        now_utc = datetime.now(timezone.utc)
        payload['ingestion_time'] = now_utc.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'

        # Format for feature store: one entry per key, values as strings.
        feature_record = [
            {'FeatureName': key, 'ValueAsString': str(value)}
            for key, value in payload.items()
        ]
        logger.info(f"Decoded payload: {payload}")
        logger.info("Attempting to write to Feature Store...")

        # Put record into feature store
        sagemaker.put_record(
            FeatureGroupName=FEATURE_GROUP_NAME,
            Record=feature_record
        )
        logger.info("PutRecord success!")

        # Push txn_id to SQS only after the Feature Store write succeeded.
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=txn_id
        )
        logger.info("Txn ID sent to SQS")

    return {
        'statusCode': 200,
        'body': json.dumps('Records processed and stored')
    }
