# MSK Connection Test Project

## Purpose

The **MSK Connection Test Project** assesses the connectivity and performance of an **Amazon Managed Streaming for Apache Kafka (MSK)** cluster. It provides a testing framework that validates the following aspects of the MSK setup:

- **Cluster Connectivity:** Ensuring secure and reliable connectivity to the MSK cluster using IAM-based SASL authentication.
- **Producer Performance:** Measuring the ability to send messages to Kafka topics with proper error handling.
- **Consumer Performance:** Validating the consumption of messages from Kafka topics with accurate metrics.
- **Latency and Throughput:** Evaluating message delivery speed and capacity under varying load conditions.

## Scope

The project is designed for:

- Performance benchmarking of an **MSK** cluster.
- Security validation using **IAM-based SASL authentication**.
- End-to-end testing of Kafka producer and consumer components.

## Key Features

- **Kafka Producer and Consumer Implementation:** The project includes Python-based implementations for both producer and consumer using the `kafka-python` library.
- **IAM Authentication Integration:** Security is enforced through **MSK IAM SASL Signer** for authentication.
- **Automated Topic Management:** The project can create and manage Kafka topics programmatically.
- **Performance Metrics Collection:** Throughput, latency, and error rates are collected and visualized using Matplotlib.
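
The metrics named in the last bullet can be derived from per-message timings. The following is a minimal sketch rather than the notebook's own collection code: `summarize_run`, `percentile`, and the result keys are hypothetical names, and the sketch assumes one latency sample in seconds was recorded for each successful send.

```python
import math
import statistics


def percentile(sorted_values, pct):
    """Nearest-rank percentile of an already sorted, non-empty list."""
    rank = max(1, math.ceil(pct / 100 * len(sorted_values)))
    return sorted_values[rank - 1]


def summarize_run(latencies_s, failures, elapsed_s):
    """Summarize one test run.

    latencies_s: per-message latencies (seconds) of successful sends
    failures:    number of sends that raised an error
    elapsed_s:   wall-clock duration of the whole run (seconds)
    """
    attempts = len(latencies_s) + failures
    ordered = sorted(latencies_s)
    return {
        "throughput_msg_per_s": len(ordered) / elapsed_s if elapsed_s > 0 else 0.0,
        "error_rate": failures / attempts if attempts else 0.0,
        "latency_mean_s": statistics.mean(ordered) if ordered else None,
        "latency_p50_s": percentile(ordered, 50) if ordered else None,
        "latency_p95_s": percentile(ordered, 95) if ordered else None,
    }
```

The returned dictionary holds plain numbers, so it can be passed to a plotting library such as Matplotlib or logged as is.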

## Technologies Used

- **Python 3.9.7**
- **Amazon MSK**
- **boto3** (AWS SDK for Python)
- **kafka-python**
- **aws-msk-iam-sasl-signer-python**
- **Matplotlib**

## Project Workflow

1. **Environment Setup:**
   - Install the required libraries (`boto3`, `kafka-python`, `aws-msk-iam-sasl-signer-python`, `matplotlib`).
2. **Cluster Connection:**
   - Initialize the MSK IAM Token Provider.
   - Establish connection to the MSK cluster using public bootstrap servers.
3. **Topic Management:**
   - Programmatically create a new Kafka topic.
4. **Producer Operations:**
   - Send test messages to a Kafka topic and log the delivery status.
5. **Consumer Operations:**
   - Consume messages from the Kafka topic and log received messages.

## Usage Instructions

1. **Modify Cluster Details:** Update the `bootstrap_servers` and `region` variables, and the `bootstrap_servers` argument passed to `KafkaAdminClient`, to match your cluster.
2. **Run the Notebook:** Execute the cells sequentially.
3. **Verify Message Delivery:** Check the producer and consumer output for a successful message exchange.
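
As an alternative to editing `bootstrap_servers` and `region` in place, the cluster details could be read from the environment. This is only a sketch: `load_cluster_config` and the variable names `MSK_BOOTSTRAP_SERVERS` and `MSK_REGION` are assumptions made for illustration, and the notebook itself hardcodes both values.

```python
import os


def load_cluster_config(env=None):
    """Read cluster details from environment variables.

    MSK_BOOTSTRAP_SERVERS and MSK_REGION are hypothetical variable names
    chosen for this sketch. The region falls back to "us-east-1", the
    value used in the notebook.
    """
    env = os.environ if env is None else env
    raw = env.get("MSK_BOOTSTRAP_SERVERS", "")
    servers = [s.strip() for s in raw.split(",") if s.strip()]
    if not servers:
        raise ValueError("MSK_BOOTSTRAP_SERVERS is empty or unset")
    for server in servers:
        # Every entry must look like host:port with a numeric port.
        host, sep, port = server.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {server!r}")
    return {
        "bootstrap_servers": servers,
        "region": env.get("MSK_REGION", "us-east-1"),
    }
```

Validating the `host:port` shape up front turns a typo into an immediate `ValueError` instead of a connection timeout later in the notebook.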

## Conclusion

This project serves as a foundational tool for validating MSK cluster connectivity and performance. It can be extended for larger-scale tests and custom data generation scenarios.
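
One way to approach the custom data generation mentioned above is a small generator that varies message count and payload size between runs. This is a hedged sketch: `generate_messages` and the field names `seq`, `sent_at`, and `payload` are hypothetical and do not appear in the notebook.

```python
import time


def generate_messages(count, payload_bytes=100):
    """Yield `count` JSON-serializable test messages.

    Each message carries a sequence number, a send timestamp, and a
    filler string so the payload size can be varied between runs.
    """
    filler = "x" * payload_bytes
    for seq in range(count):
        yield {"seq": seq, "sent_at": time.time(), "payload": filler}
```

Because each message is a plain dictionary, it can be passed to a producer configured with a JSON `value_serializer`, and the `sent_at` field gives a consumer a reference point for estimating end-to-end latency.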


In [1]:
!pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2


In [2]:
!pip install aws-msk-iam-sasl-signer-python

Collecting aws-msk-iam-sasl-signer-python
  Downloading aws_msk_iam_sasl_signer_python-1.0.1-py2.py3-none-any.whl (13 kB)
Collecting botocore>=1.29.125
  Downloading botocore-1.35.95-py3-none-any.whl (13.3 MB)
Collecting boto3>=1.26.125
  Downloading boto3-1.35.95-py3-none-any.whl (139 kB)
Collecting s3transfer<0.11.0,>=0.10.0
  Downloading s3transfer-0.10.4-py3-none-any.whl (83 kB)
Collecting jmespath<2.0.0,>=0.7.1
  Using cached jmespath-1.0.1-py3-none-any.whl (20 kB)
Installing collected packages: jmespath, botocore, s3transfer, boto3, aws-msk-iam-sasl-signer-python
Successfully installed aws-msk-iam-sasl-signer-python-1.0.1 boto3-1.35.95 botocore-1.35.95 jmespath-1.0.1 s3transfer-0.10.4


In [11]:
!pip install matplotlib



In [45]:
import json
import socket
import time

import boto3
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import KafkaError


In [4]:
region = "us-east-1"

# MSKTokenProvider Class Documentation

## Purpose

The `MSKTokenProvider` class is designed to generate an authentication token for **Amazon Managed Streaming for Apache Kafka (MSK)** using **IAM-based SASL authentication**. This token is required for secure communication with the MSK cluster and ensures only authorized clients can interact with the cluster.


In [5]:
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()
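
The class above discards the second value returned by `generate_auth_token` and requests a fresh token on every call. A caching variant could reuse the token until shortly before it expires. The sketch below is an illustration, not part of the notebook: `CachingTokenProvider`, `refresh_margin_s`, and `clock` are hypothetical names, and it assumes the second return value is an expiry time in epoch milliseconds.

```python
import time


class CachingTokenProvider:
    """Reuse an auth token until shortly before it expires.

    `generate` is any callable returning (token, expiry_ms), where
    expiry_ms is assumed to be an epoch timestamp in milliseconds.
    """

    def __init__(self, generate, refresh_margin_s=60, clock=time.time):
        self._generate = generate
        self._margin_ms = refresh_margin_s * 1000
        self._clock = clock
        self._token = None
        self._expiry_ms = 0

    def token(self):
        now_ms = self._clock() * 1000
        # Refresh when no token is cached or the cached one is near expiry.
        if self._token is None or now_ms >= self._expiry_ms - self._margin_ms:
            self._token, self._expiry_ms = self._generate()
        return self._token
```

Under that assumption it could replace `tp` with `CachingTokenProvider(lambda: MSKAuthTokenProvider.generate_auth_token(region))`. The injected `generate` callable also makes the class testable without AWS credentials.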

# KafkaAdminClient Configuration Documentation

## Purpose

The code initializes a **KafkaAdminClient** for interacting with an **Amazon Managed Streaming for Apache Kafka (MSK)** cluster. This client is used to manage and administer Kafka resources such as topics, partitions, and configurations in a secure manner.



In [6]:
admin_client = KafkaAdminClient(
    bootstrap_servers="b-2-public.leonardocluster.ykfidp.c14.kafka.us-east-1.amazonaws.com:9198,b-1-public.leonardocluster.ykfidp.c14.kafka.us-east-1.amazonaws.com:9198",
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
)

In [7]:
def create_topic(topic_name, num_partitions, replication_factor):
    """Create a Kafka topic using MSK IAM authentication."""
    topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)
    try:
        response = admin_client.create_topics([topic])
        # Each entry in topic_errors is a (name, error_code, error_message) tuple.
        # Distinct loop names avoid shadowing the topic_name argument.
        for created_name, error_code, error_message in response.topic_errors:
            if error_code == 0:
                print(f"Topic '{created_name}' successfully created.")
            else:
                print(f"Error creating topic '{created_name}': {error_message}")
    except KafkaError as e:
        print(f"KafkaError occurred: {e}")
    except Exception as e:
        print(f"Failed to create topic '{topic_name}': {e}")
    finally:
        print("Topic creation attempt complete.")

In [9]:
create_topic("ruby-msk-topic",3,2)

Topic 'ruby-msk-topic' successfully created.
Topic creation attempt complete.
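
Running the cell a second time targets a topic that already exists, which Kafka reports with protocol error code 36 (`TOPIC_ALREADY_EXISTS`). Depending on the client version, this may surface as a raised exception or as a non-zero code in `topic_errors`. The sketch below handles the second case. `classify_topic_errors` is a hypothetical helper that works on plain `(name, error_code, error_message)` tuples.

```python
TOPIC_ALREADY_EXISTS = 36  # Kafka protocol error code


def classify_topic_errors(topic_errors):
    """Split (name, error_code, error_message) tuples into three lists.

    Returns (created, existing, failed), where `failed` keeps the full
    tuple so the error code and message stay available for logging.
    """
    created, existing, failed = [], [], []
    for name, error_code, error_message in topic_errors:
        if error_code == 0:
            created.append(name)
        elif error_code == TOPIC_ALREADY_EXISTS:
            existing.append(name)
        else:
            failed.append((name, error_code, error_message))
    return created, existing, failed
```

Treating an existing topic separately from a real failure lets the notebook be rerun from the top without the topic step looking like an error.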


In [30]:
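bootstrap_servers = [
    "b-1-public.leonardocluster.ykfidp.c14.kafka.us-east-1.amazonaws.com:9198",
    "b-2-public.leonardocluster.ykfidp.c14.kafka.us-east-1.amazonaws.com:9198",
]
# Topic used by the producer and consumer cells. It is a different topic
# from the "ruby-msk-topic" created in the earlier cell.
topic_name = "leonardo-msk-topic"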

In [49]:
producer = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=tp,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    client_id=socket.gethostname()
)

def send_message(topic_name, message):
    """Send a message to the specified Kafka topic."""
    try:
        future = producer.send(topic_name, value=message)
        record_metadata = future.get(timeout=10)
        print(f"Message sent to {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")
    except Exception as e:
        print(f"Failed to send message: {e}")

In [50]:
send_message(topic_name, {"event": "Hello Leonardo!", "type": "greeting"})
producer.close()

Message sent to leonardo-msk-topic partition 1 offset 1
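
To go from a single delivery check to a latency measurement, each blocking send can be timed individually. This is a minimal sketch under stated assumptions: `timed_sends` is a hypothetical helper, and `send` stands for any callable that blocks until delivery and raises on failure. `send_message` above does not qualify as written, because it catches exceptions itself.

```python
import time


def timed_sends(send, messages):
    """Call `send(message)` for each message and time every call.

    `send` must block until the message is acknowledged and raise on
    failure. Returns (latencies_in_seconds, failure_count).
    """
    latencies, failures = [], 0
    for message in messages:
        start = time.perf_counter()
        try:
            send(message)
        except Exception:
            failures += 1
        else:
            # Only successful sends contribute a latency sample.
            latencies.append(time.perf_counter() - start)
    return latencies, failures
```

With an open producer, a suitable callable could be `lambda m: producer.send(topic_name, value=m).get(timeout=10)`, which waits for the broker acknowledgement just as `send_message` does.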


In [51]:
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=bootstrap_servers,
    security_protocol="SASL_SSL",
    sasl_mechanism="OAUTHBEARER",
    sasl_oauth_token_provider=tp,
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    group_id="leonardo-group",
    client_id=socket.gethostname(),
    auto_offset_reset="earliest"
)

# Consume messages from the Kafka topic
def consume_messages():
    try:
        print("Starting consumer...")
        for message in consumer:
            print(f"Received message: {message.value} from topic {message.topic} partition {message.partition} offset {message.offset}")
    except Exception as e:
        print(f"Error while consuming messages: {e}")
    finally:
        consumer.close()

In [None]:
consume_messages()

Starting consumer...
Received message: {'event': 'Hello Leonardo!', 'type': 'greeting'} from topic leonardo-msk-topic partition 1 offset 1
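
The loop in `consume_messages` runs until the kernel is interrupted. For an automated test it can help to stop after a fixed number of messages. The sketch below is one hedged way to do that. `collect_messages` is a hypothetical helper that accepts any iterable of records with a `.value` attribute, which includes a `KafkaConsumer`.

```python
from itertools import islice


def collect_messages(records, limit):
    """Return the values of at most `limit` records from an iterable.

    `records` can be a KafkaConsumer or, in a test, any iterable of
    objects that expose a `.value` attribute.
    """
    return [record.value for record in islice(records, limit)]
```

If the topic holds fewer than `limit` messages, iterating a `KafkaConsumer` still blocks. Passing `consumer_timeout_ms` when constructing the consumer makes iteration end after that many milliseconds without a new message.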
