<a href="https://colab.research.google.com/github/jman4162/aws-for-ml/blob/main/Advanced_Amazon_SNS_Tutorial_for_Machine_Learning_Scientists.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Advanced Amazon SNS Tutorial for Machine Learning Scientists

This tutorial demonstrates how to use Amazon Simple Notification Service (SNS) in machine learning workflows: training and hyperparameter-tuning notifications, distributed training coordination, real-time inference pipelines, and anomaly alerting.

## Prerequisites

- Familiarity with Python and AWS services
- Basic understanding of machine learning concepts
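- An AWS account with credentials and a default region configured, plus IAM permissions for the SNS and SQS actions used below (e.g., `sns:Publish`, `sns:CreateTopic`, `sns:Subscribe`, `sqs:ReceiveMessage`)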
- Python 3.x, Boto3, and relevant ML libraries (e.g., TensorFlow, PyTorch, scikit-learn) installed

## 1. Model Training Notifications

Use SNS to send notifications about the progress and completion of long-running model training jobs.

```python
import boto3
import time
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

sns = boto3.client('sns')
topic_arn = 'your_topic_arn'  # Create this topic in advance

# Generate a sample dataset
X, y = make_classification(n_samples=10000, n_features=20, n_classes=2)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

# Function to send SNS notification
def send_notification(message):
    sns.publish(TopicArn=topic_arn, Message=message)

# Start training
send_notification("Model training started")
start_time = time.time()

# Train the model
model = RandomForestClassifier(n_estimators=100)
model.fit(X_train, y_train)

# Training completed
duration = time.time() - start_time
accuracy = model.score(X_test, y_test)
send_notification(f"Model training completed. Duration: {duration:.2f}s, Accuracy: {accuracy:.4f}")
```
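The example above reports only the start and the end of training. For long jobs you may also want periodic progress updates without flooding subscribers (email and SMS endpoints in particular). The sketch below rate-limits progress messages; `ProgressNotifier` is a hypothetical helper, not part of Boto3. It accepts any object with a Boto3-style `publish` method, such as `boto3.client('sns')`, and the 5-minute default interval is an arbitrary choice.

```python
import json
import time


class ProgressNotifier:
    """Hypothetical helper: publish training progress at most once every `min_interval` seconds.

    `sns_client` is any object with a Boto3-style publish(TopicArn=..., Subject=..., Message=...)
    method, e.g. boto3.client('sns').
    """

    def __init__(self, sns_client, topic_arn, job_name, min_interval=300.0, clock=time.monotonic):
        self.sns = sns_client
        self.topic_arn = topic_arn
        self.job_name = job_name  # keep it short: it is used in the email Subject line
        self.min_interval = min_interval
        self.clock = clock  # injectable so the throttling logic can be tested
        self._last_sent = None

    def update(self, epoch, metrics, force=False):
        """Publish a progress message; return False if it was suppressed by the rate limit."""
        now = self.clock()
        if not force and self._last_sent is not None and now - self._last_sent < self.min_interval:
            return False
        self.sns.publish(
            TopicArn=self.topic_arn,
            Subject=f"[{self.job_name}] epoch {epoch}",  # shown as the subject for email subscribers
            Message=json.dumps({'job': self.job_name, 'epoch': epoch, 'metrics': metrics}),
        )
        self._last_sent = now
        return True
```

In a training loop you would call `notifier.update(epoch, {'loss': loss})` after each epoch (for example, from a framework callback) and `notifier.update(epoch, metrics, force=True)` for the final message, so the completion notice is never throttled.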

## 2. Distributed Training Coordination

Use SNS to coordinate distributed training across multiple instances or containers.

```python
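import boto3
import json
import os
import torch.distributed as dist

sns = boto3.client('sns')
sqs = boto3.client('sqs')

# Rank of this process, as set by the launcher (e.g., torchrun)
rank = int(os.environ.get('RANK', 0))

# create_topic is idempotent, so every node gets the same topic ARN
topic_arn = sns.create_topic(Name='DistributedTrainingCoordination')['TopicArn']

# Each node needs its own queue: SQS hands each message to only one consumer,
# so a single shared queue would deliver the init message to just one node
queue_url = sqs.create_queue(QueueName=f'TrainingCoordinationQueue-rank{rank}')['QueueUrl']
queue_arn = sqs.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['QueueArn'])['Attributes']['QueueArn']

# Allow the topic to deliver into this queue (without this policy SNS cannot send to it)
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={'Policy': json.dumps({
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'Service': 'sns.amazonaws.com'},
            'Action': 'sqs:SendMessage',
            'Resource': queue_arn,
            'Condition': {'ArnEquals': {'aws:SourceArn': topic_arn}}
        }]
    })}
)

# Subscribe this node's queue to the SNS topic
sns.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=queue_arn)

def wait_for_init():
    # Block until an 'init' message arrives on this node's queue
    while True:
        # Long polling (up to 20 s) avoids a busy loop of empty receives
        response = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=20)
        for msg in response.get('Messages', []):
            # The SQS body is an SNS envelope; the published payload is in 'Message'
            message = json.loads(json.loads(msg['Body'])['Message'])
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg['ReceiptHandle'])
            if message['action'] == 'init':
                return message

def coordinate_training(world_size):
    if rank == 0:
        # Master node: broadcast the rendezvous details. SNS does not store
        # messages, so all nodes must already be subscribed at this point.
        sns.publish(
            TopicArn=topic_arn,
            Message=json.dumps({
                'action': 'init',
                'world_size': world_size,
                'master_addr': 'master_ip_address'  # Replace with the master node's address
            })
        )

    # All nodes (including the master): wait for the coordination message
    message = wait_for_init()

    # Initialize distributed training
    dist.init_process_group(
        backend='nccl',
        init_method=f"tcp://{message['master_addr']}:12345",
        world_size=message['world_size'],
        rank=rank
    )

    # Proceed with distributed training...

coordinate_training(world_size=4)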
```
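The code above calls `json.loads` twice because, unless raw message delivery is enabled on the subscription (the `RawMessageDelivery` subscription attribute), SNS wraps each payload in a JSON envelope whose `Type` is `"Notification"` and whose `Message` field holds the published string. A small helper that accepts both forms is sketched below; `unwrap_sns_payload` is a hypothetical name, and it assumes the published payload is itself JSON.

```python
import json


def unwrap_sns_payload(sqs_body):
    """Return the JSON payload from an SQS message body that may carry an SNS envelope.

    Default delivery: {"Type": "Notification", "TopicArn": ..., "Message": "<payload>", ...}
    Raw delivery: the body is the published payload itself.
    Assumes the published payload is JSON.
    """
    body = json.loads(sqs_body)
    if isinstance(body, dict) and body.get('Type') == 'Notification' and 'Message' in body:
        return json.loads(body['Message'])  # unwrap the SNS envelope
    return body  # raw message delivery: nothing to unwrap
```

With it, the receive loop can read `message = unwrap_sns_payload(msg['Body'])` and keep working if raw delivery is later switched on.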

## 3. Real-time Inference Pipeline

Implement a real-time inference pipeline using SNS for event-driven processing.

```python
import boto3
import json
import numpy as np
from tensorflow.keras.models import load_model

sns = boto3.client('sns')
sqs = boto3.client('sqs')

# Assume you have created these resources beforehand,
# with the SQS queue subscribed to the input topic
input_topic_arn = 'input_topic_arn'
output_topic_arn = 'output_topic_arn'
queue_url = 'sqs_queue_url'

# Load your pre-trained model
model = load_model('your_model.h5')

def process_message(message):
    # Parse input data. Messages that arrive through an SNS subscription are
    # wrapped in an envelope unless raw message delivery is enabled.
    body = json.loads(message['Body'])
    data = json.loads(body['Message']) if body.get('Type') == 'Notification' else body
    input_data = np.array(data['input'])  # Assumes 'input' already includes the batch dimension

    # Make prediction
    prediction = model.predict(input_data)

    # Publish result
    sns.publish(
        TopicArn=output_topic_arn,
        Message=json.dumps({
            'id': data['id'],
            'prediction': prediction.tolist()
        })
    )

# Main processing loop
while True:
    # Long polling: wait up to 20 s for messages instead of returning immediately
    response = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20)

    for message in response.get('Messages', []):
        process_message(message)
        # Delete only after successful processing, so failed messages are retried
        sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
```
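In the loop above, an exception in `process_message` stops the worker. A more forgiving variant, sketched here, handles each message independently: successes are deleted, while failures are logged and left in the queue, where they become visible again after the visibility timeout (and go to a dead-letter queue, if one is configured). `process_batch` is a hypothetical helper that takes the SQS client and handler as arguments.

```python
import logging

logger = logging.getLogger(__name__)


def process_batch(sqs_client, queue_url, handler, max_messages=10, wait_seconds=20):
    """Receive one batch, run `handler` on each message, and delete only the successes.

    Returns (num_succeeded, num_failed). Failed messages are not deleted, so SQS
    redelivers them after the queue's visibility timeout.
    """
    response = sqs_client.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_messages,  # SQS returns at most 10 per call
        WaitTimeSeconds=wait_seconds,      # long polling
    )
    succeeded = failed = 0
    for message in response.get('Messages', []):
        try:
            handler(message)
        except Exception:
            logger.exception("Failed to process message %s", message.get('MessageId'))
            failed += 1
            continue  # leave it in the queue for a retry
        sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=message['ReceiptHandle'])
        succeeded += 1
    return succeeded, failed
```

The main loop then becomes `while True: process_batch(sqs, queue_url, process_message)`.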

## 4. Hyperparameter Tuning Notifications

Integrate SNS notifications into a hyperparameter tuning process.

```python
import boto3
import json
from sklearn.model_selection import RandomizedSearchCV
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

sns = boto3.client('sns')
topic_arn = 'your_topic_arn'

# Generate sample data
X, y = make_classification(n_samples=1000, n_features=20, n_classes=2)

# Define parameter space
param_dist = {
    'n_estimators': [10, 50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

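# Custom scorer that publishes an SNS message each time it is called.
# RandomizedSearchCV calls it once per CV fold for every sampled candidate
# (n_iter * cv = 60 messages here), not once per candidate.
def custom_score(estimator, X, y):
    score = estimator.score(X, y)
    sns.publish(
        TopicArn=topic_arn,
        Message=json.dumps({
            'event': 'fold_scored',
            'params': estimator.get_params(),
            'score': float(score)
        })
    )
    return score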

# Perform randomized search with custom scoring
random_search = RandomizedSearchCV(
    RandomForestClassifier(),
    param_distributions=param_dist,
    n_iter=20,
    scoring=custom_score,
    cv=3
)

random_search.fit(X, y)

# Send final results
sns.publish(
    TopicArn=topic_arn,
    Message=json.dumps({
        'event': 'tuning_complete',
        'best_params': random_search.best_params_,
        'best_score': random_search.best_score_
    })
)
```
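Because the scorer runs once per fold, publishing from it can be noisy. An alternative is to publish a single summary after `fit` returns, built from `random_search.cv_results_`. The sketch below relies only on the `params`, `mean_test_score`, `std_test_score`, and `rank_test_score` entries that scikit-learn's search objects provide for single-metric scoring; `summarize_search` is a hypothetical helper.

```python
import json


def summarize_search(cv_results, top_k=3):
    """Return a JSON string describing the `top_k` best candidates of a sklearn search.

    `cv_results` is the `cv_results_` dict of GridSearchCV/RandomizedSearchCV
    fitted with a single scoring metric.
    """
    ranks = list(cv_results['rank_test_score'])
    best = sorted(range(len(ranks)), key=lambda i: ranks[i])[:top_k]
    candidates = [
        {
            'rank': int(cv_results['rank_test_score'][i]),
            'params': cv_results['params'][i],
            'mean_score': float(cv_results['mean_test_score'][i]),
            'std_score': float(cv_results['std_test_score'][i]),
        }
        for i in best
    ]
    # default=str covers parameter values json cannot encode (e.g. numpy integers)
    return json.dumps({'event': 'tuning_summary', 'top_candidates': candidates}, default=str)
```

Usage: `sns.publish(TopicArn=topic_arn, Message=summarize_search(random_search.cv_results_))`.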

## 5. Anomaly Detection and Alerting

Implement an anomaly detection system with real-time alerting using SNS.

```python
import boto3
import numpy as np
from sklearn.ensemble import IsolationForest
import json

sns = boto3.client('sns')
topic_arn = 'your_alert_topic_arn'

# Load or generate your data
data = np.random.randn(1000, 5)  # Replace with your actual data

# Train the anomaly detection model
model = IsolationForest(contamination=0.1)
model.fit(data)

def check_anomaly(new_data):
    prediction = model.predict(new_data.reshape(1, -1))
    if prediction[0] == -1:
        # Anomaly detected, send alert
        sns.publish(
            TopicArn=topic_arn,
            Message=json.dumps({
                'event': 'anomaly_detected',
                'data': new_data.tolist()
            }),
            MessageAttributes={
                'anomaly_type': {
                    'DataType': 'String',
                    'StringValue': 'isolation_forest'
                }
            }
        )

# Simulate real-time data and check for anomalies
for _ in range(100):
    new_point = np.random.randn(5)
    check_anomaly(new_point)
```
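The `anomaly_type` message attribute set above also lets each subscriber receive only the alerts it cares about. SNS supports this with a subscription filter policy, a JSON document passed as the `FilterPolicy` subscription attribute; by default it is matched against message attributes rather than the message body. The sketch below assumes a hypothetical endpoint (for example, an on-call SQS queue) that should receive only Isolation Forest alerts; both helper names are illustrative.

```python
import json


def anomaly_filter_policy(anomaly_types):
    """Filter policy matching messages whose `anomaly_type` attribute is one of `anomaly_types`."""
    return {'anomaly_type': sorted(anomaly_types)}


def subscribe_with_filter(sns_client, topic_arn, protocol, endpoint, policy):
    """Subscribe `endpoint` so that it only receives messages matching `policy`."""
    response = sns_client.subscribe(
        TopicArn=topic_arn,
        Protocol=protocol,
        Endpoint=endpoint,
        Attributes={'FilterPolicy': json.dumps(policy)},  # attribute values are strings
        ReturnSubscriptionArn=True,
    )
    return response['SubscriptionArn']
```

For example, `subscribe_with_filter(sns, topic_arn, 'sqs', 'your_oncall_queue_arn', anomaly_filter_policy({'isolation_forest'}))` (the queue ARN is a placeholder, and the queue also needs an access policy allowing the topic to send to it).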

## Conclusion

This advanced tutorial demonstrated how Amazon SNS can be integrated into various machine learning workflows, including model training notifications, distributed training coordination, real-time inference pipelines, hyperparameter tuning notifications, and anomaly detection systems.

SNS decouples the components that produce events (training jobs, inference workers, monitors) from the people and systems that react to them, which makes ML systems easier to monitor, extend, and scale. It provides a flexible way to implement event-driven architectures and real-time notifications for managing complex ML pipelines and deployments.

Remember to implement proper error handling, logging, and security measures in your production systems. Also, be mindful of the costs associated with high-volume messaging and consider using batch operations or message filtering where appropriate to optimize your usage of SNS.
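As one example of batching, SNS offers a `PublishBatch` action (`publish_batch` in Boto3) that accepts up to 10 messages per request, subject to SNS's message size limits. The sketch below publishes a list of JSON-serializable payloads (for instance, per-record predictions) in chunks of 10 and returns the entry IDs that SNS reports as failed; `publish_in_batches` is a hypothetical helper.

```python
import json

MAX_BATCH_ENTRIES = 10  # PublishBatch accepts at most 10 entries per request


def publish_in_batches(sns_client, topic_arn, payloads):
    """Publish each payload as its own SNS message, up to 10 per PublishBatch call.

    Returns the Ids of entries listed under 'Failed' in the responses.
    """
    failed_ids = []
    for start in range(0, len(payloads), MAX_BATCH_ENTRIES):
        chunk = payloads[start:start + MAX_BATCH_ENTRIES]
        entries = [
            # Ids only need to be unique within one request; global indices keep them traceable
            {'Id': str(start + offset), 'Message': json.dumps(payload)}
            for offset, payload in enumerate(chunk)
        ]
        response = sns_client.publish_batch(TopicArn=topic_arn, PublishBatchRequestEntries=entries)
        failed_ids.extend(entry['Id'] for entry in response.get('Failed', []))
    return failed_ids
```

Returning the failed IDs leaves the retry decision to the caller, since some failures (such as an oversized message) will not succeed on a retry.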

## Appendix: Difference between SNS and SQS

Amazon Simple Notification Service (SNS) and Amazon Simple Queue Service (SQS) are both messaging services provided by AWS, but they serve different purposes and have distinct characteristics. Here's a comparison of SNS and SQS:

Amazon SNS (Simple Notification Service):

1. Publish-Subscribe (Pub/Sub) model
2. Push-based messaging
3. One-to-many communication
4. Supports multiple protocols (HTTP, email, SMS, SQS, Lambda, etc.)
5. Messages are not retained for later retrieval (subscribe an SQS queue if consumers need to read them later)
6. Ideal for broadcast communication and fanout patterns
7. Supports message filtering at the subscriber level

Amazon SQS (Simple Queue Service):

1. Queue model
2. Pull-based messaging
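3. One-to-one communication: each message is processed by a single consumer
4. Consumed by applications that poll the queue (e.g., worker processes, or AWS Lambda through an event source mapping)
5. Messages are persisted in the queue until a consumer deletes them or the retention period expires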
6. Ideal for decoupling application components and handling backpressure
7. Supports message delay and visibility timeout

Key Differences:

1. Message Delivery:
   - SNS: Push-based. SNS actively sends messages to all subscribers.
   - SQS: Pull-based. Consumers must poll the queue to retrieve messages.

2. Message Persistence:
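   - SNS: Messages are not stored for consumers to fetch later. SNS pushes each message to the current subscribers; a delivery that still fails after retries is dropped unless a dead-letter queue is configured for the subscription.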
   - SQS: Messages are stored in the queue until successfully processed and deleted.

3. Scalability Pattern:
   - SNS: Fanout pattern. One message to multiple recipients.
   - SQS: Work queue pattern. Tasks distributed among multiple workers.

4. Use Cases:
   - SNS: Real-time notifications, mobile push notifications, email alerts.
   - SQS: Task queues, workload decoupling, batch processing.

5. Message Retention:
   - SNS: Standard topics do not retain messages; once delivered (or dropped), a message cannot be read again.
   - SQS: Messages are retained for 4 days by default, configurable from 1 minute up to 14 days.

6. Order of Messages:
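   - SNS: Standard topics do not guarantee message ordering; FIFO topics preserve ordering when delivering to SQS FIFO queues.
   - SQS: Standard queues provide best-effort ordering; FIFO (First-In-First-Out) queues provide strict ordering within a message group.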

7. Subscription Model:
   - SNS: Supports various protocols and can send to multiple endpoint types.
   - SQS: Typically consumed by applications directly.

8. Delivery Retry:
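   - SNS: Retries failed deliveries according to a per-protocol delivery policy, within a bounded retry window.
   - SQS: A received message that is not deleted becomes visible again after the visibility timeout, so it can be retried until the retention period expires or, with a redrive policy, until it is moved to a dead-letter queue.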
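The SQS retention and retry behavior described above is controlled by queue attributes. The sketch below builds the string-valued attribute map that Boto3's `create_queue` and `set_queue_attributes` accept: `MessageRetentionPeriod` (seconds, 60 s to 14 days), `VisibilityTimeout` (seconds), and a `RedrivePolicy` that moves a message to a dead-letter queue after `maxReceiveCount` receives. The helper name and its default values (other than the 4-day retention, which matches the SQS default) are illustrative assumptions.

```python
import json

MIN_RETENTION_SECONDS = 60                 # SQS minimum: 1 minute
MAX_RETENTION_SECONDS = 14 * 24 * 60 * 60  # SQS maximum: 14 days (1,209,600 s)


def queue_attributes(retention_days=4, visibility_timeout_s=300, dlq_arn=None, max_receive_count=5):
    """Hypothetical helper returning SQS queue attributes as the string-valued dict Boto3 expects."""
    retention_s = int(retention_days * 24 * 60 * 60)
    if not MIN_RETENTION_SECONDS <= retention_s <= MAX_RETENTION_SECONDS:
        raise ValueError("SQS retention must be between 60 seconds and 14 days")
    attributes = {
        'MessageRetentionPeriod': str(retention_s),
        # How long a received message stays hidden before it can be received (retried) again
        'VisibilityTimeout': str(visibility_timeout_s),
    }
    if dlq_arn is not None:
        # After max_receive_count receives without a delete, SQS moves the message to the DLQ
        attributes['RedrivePolicy'] = json.dumps({
            'deadLetterTargetArn': dlq_arn,
            'maxReceiveCount': str(max_receive_count),
        })
    return attributes
```

For example: `sqs.create_queue(QueueName='training-jobs', Attributes=queue_attributes(dlq_arn='your_dlq_arn'))`, where the queue name and dead-letter queue ARN are placeholders.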

In practice, SNS and SQS are often used together. A common pattern is to use SNS to publish messages to multiple SQS queues, combining the fanout capability of SNS with the persistence and scalability of SQS.

For example, you might use SNS to broadcast a message about a new data file, and have multiple SQS queues subscribed to this topic. Each queue could then feed into a different data processing pipeline, allowing parallel processing of the same event in different ways.
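The fanout setup above takes only a few Boto3 calls. One easily missed detail is that each queue needs an access policy that allows the topic to send to it; without one, SNS cannot deliver to the queue. The sketch below assumes standard (non-FIFO) queues, and `sns_to_sqs_policy` and `fan_out` are hypothetical helper names.

```python
import json


def sns_to_sqs_policy(queue_arn, topic_arn):
    """Queue access policy that lets a single SNS topic send messages to the queue."""
    return {
        'Version': '2012-10-17',
        'Statement': [{
            'Effect': 'Allow',
            'Principal': {'Service': 'sns.amazonaws.com'},
            'Action': 'sqs:SendMessage',
            'Resource': queue_arn,
            'Condition': {'ArnEquals': {'aws:SourceArn': topic_arn}},  # only this topic
        }],
    }


def fan_out(sns_client, sqs_client, topic_name, queue_names):
    """Create a topic plus one subscribed queue per pipeline; return {queue_name: queue_url}."""
    topic_arn = sns_client.create_topic(Name=topic_name)['TopicArn']
    queue_urls = {}
    for name in queue_names:
        url = sqs_client.create_queue(QueueName=name)['QueueUrl']
        arn = sqs_client.get_queue_attributes(
            QueueUrl=url, AttributeNames=['QueueArn'])['Attributes']['QueueArn']
        sqs_client.set_queue_attributes(
            QueueUrl=url, Attributes={'Policy': json.dumps(sns_to_sqs_policy(arn, topic_arn))})
        sns_client.subscribe(TopicArn=topic_arn, Protocol='sqs', Endpoint=arn)
        queue_urls[name] = url
    return queue_urls
```

With real clients, `fan_out(boto3.client('sns'), boto3.client('sqs'), 'new-data-files', ['feature-extraction', 'data-validation'])` would give each pipeline its own copy of every "new data file" message (the topic and queue names here are illustrative).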

Understanding these differences helps in designing robust, scalable, and efficient cloud-native applications, especially in the context of machine learning pipelines and distributed systems.