A Node.js Kafka admin client for Amazon Managed Streaming for Apache Kafka (MSK). It connects to an MSK cluster and provides a simple interface for inspecting and managing Kafka topics and retrieving cluster metadata.
- 🔐 AWS MSK Authentication: Uses IAM credentials for secure connection to MSK clusters
- 📋 Topic Management: List, create, delete, and get detailed information about Kafka topics
- 🏢 Cluster Information: Retrieve cluster metadata and broker information
- 🚀 Easy to Use: Simple API for common Kafka admin operations
- 🔧 Configurable: Support for custom configuration options
- Node.js 14.x or higher
- AWS credentials configured (via AWS CLI, environment variables, or IAM roles)
- Access to an AWS MSK cluster
- Clone or download this project
- Install dependencies:
npm install
- Copy the environment configuration:
cp .env.example .env
- Update the .env file with your AWS and Kafka configuration:
AWS_REGION=us-east-1
KAFKA_BOOTSTRAP_SERVERS=your-msk-cluster-endpoint:9092
Variable | Description | Required | Default |
---|---|---|---|
AWS_REGION | AWS region where your MSK cluster is located | Yes | us-east-1 |
KAFKA_BOOTSTRAP_SERVERS | Comma-separated list of MSK bootstrap servers | Yes | localhost:9092 |
AWS_ACCESS_KEY_ID | AWS access key ID | No* | - |
AWS_SECRET_ACCESS_KEY | AWS secret access key | No* | - |
AWS_SESSION_TOKEN | AWS session token (for temporary credentials) | No* | - |
*If not provided, the SDK will use IAM roles, AWS CLI credentials, or other credential providers.
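The defaults and optional credentials in the table above can be read in one place. The following is a minimal sketch, not the project's actual loader: `loadKafkaConfig` and the fields it returns are hypothetical names chosen for illustration, and only the variable names and default values come from the table.

```javascript
// Hypothetical helper (not part of this project's API): reads the variables
// from the table above and applies the documented defaults.
function loadKafkaConfig(env = process.env) {
  const region = env.AWS_REGION || 'us-east-1';
  const bootstrapServers = env.KAFKA_BOOTSTRAP_SERVERS || 'localhost:9092';

  // Static credentials are optional. Treat them as present only when both
  // halves of the key pair are set; otherwise the SDK's other credential
  // providers (IAM roles, AWS CLI profile, ...) are expected to take over.
  const hasStaticCredentials = Boolean(
    env.AWS_ACCESS_KEY_ID && env.AWS_SECRET_ACCESS_KEY
  );

  return {
    region,
    bootstrapServers,
    credentialSource: hasStaticCredentials ? 'environment' : 'default-provider-chain',
    usesSessionToken: hasStaticCredentials && Boolean(env.AWS_SESSION_TOKEN),
  };
}

// Example: only the region is set, everything else falls back to defaults.
console.log(loadKafkaConfig({ AWS_REGION: 'eu-west-1' }));
```

Passing the environment as a parameter keeps the function easy to exercise without touching the real `process.env`.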
You can configure AWS credentials in several ways:
- Environment Variables (recommended for development):
export AWS_ACCESS_KEY_ID=your_access_key
export AWS_SECRET_ACCESS_KEY=your_secret_key
export AWS_REGION=us-east-1
- AWS CLI:
aws configure
- IAM Roles (recommended for production):
  - Attach an IAM role to your EC2 instance or ECS task
  - The SDK will automatically use the role credentials
const { KafkaAdminClient } = require('./src');

async function main() {
  const kafkaAdmin = new KafkaAdminClient(
    'your-msk-cluster-endpoint:9092',
    'us-east-1'
  );

  try {
    await kafkaAdmin.connect();

    // List all topics
    const topics = await kafkaAdmin.listKafkaTopics();
    console.log('Topics:', Object.keys(topics));

    // Get cluster information
    const clusterInfo = await kafkaAdmin.getClusterInfo();
    console.log('Cluster ID:', clusterInfo.clusterId);
  } finally {
    await kafkaAdmin.disconnect();
  }
}

main().catch(console.error);
const { KafkaAdminClient } = require('./src');

async function advancedExample() {
  const kafkaAdmin = new KafkaAdminClient(
    'your-msk-cluster-endpoint:9092',
    'us-east-1',
    {
      // Additional Kafka configuration
      requestTimeout: 30000,
      retry: {
        retries: 3
      }
    }
  );

  try {
    await kafkaAdmin.connect();

    // Create a new topic
    await kafkaAdmin.createTopic('my-new-topic', {
      numPartitions: 3,
      replicationFactor: 2
    });

    // Get detailed information about a topic
    const topicDetails = await kafkaAdmin.getTopicDetails('my-new-topic');
    console.log('Topic details:', topicDetails);

    // Delete a topic
    await kafkaAdmin.deleteTopic('my-new-topic');
  } finally {
    await kafkaAdmin.disconnect();
  }
}

advancedExample().catch(console.error);
new KafkaAdminClient(bootstrapServers, region, optionalConfigs)
- bootstrapServers (string): Comma-separated list of MSK bootstrap servers
- region (string): AWS region where the MSK cluster is located
- optionalConfigs (object): Additional Kafka configuration options
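Because `bootstrapServers` is a single comma-separated string, a malformed entry is easy to miss until the connection fails. The sketch below shows one way to validate the string before passing it to the constructor. `parseBootstrapServers` is a hypothetical helper, not something this client is known to export, and the host names in the example are placeholders.

```javascript
// Hypothetical pre-flight check for the bootstrapServers argument.
// Splits "host1:port1,host2:port2" into { host, port } pairs and rejects
// entries that lack a host or a valid TCP port.
function parseBootstrapServers(bootstrapServers) {
  if (typeof bootstrapServers !== 'string' || bootstrapServers.trim() === '') {
    throw new TypeError('bootstrapServers must be a non-empty string');
  }

  return bootstrapServers.split(',').map((entry) => {
    const trimmed = entry.trim();
    const separator = trimmed.lastIndexOf(':');
    const host = trimmed.slice(0, separator);
    const port = Number(trimmed.slice(separator + 1));

    // separator <= 0 covers both "no colon at all" and "empty host".
    if (separator <= 0 || !Number.isInteger(port) || port < 1 || port > 65535) {
      throw new Error(`Invalid broker address: "${trimmed}"`);
    }
    return { host, port };
  });
}

// Example with placeholder host names.
console.log(parseBootstrapServers('broker-1.example:9092, broker-2.example:9092'));
```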
Connects to the Kafka cluster.
await kafkaAdmin.connect();
Disconnects from the Kafka cluster.
await kafkaAdmin.disconnect();
Lists all topics in the cluster.
const topics = await kafkaAdmin.listKafkaTopics();
// Returns: { 'topic1': {}, 'topic2': {}, ... }
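Since the result is an object keyed by topic name, a common follow-up is to reduce it to the list of application topics. The sketch below assumes that names starting with a double underscore (such as Kafka's `__consumer_offsets`) are internal; that prefix rule is a convention rather than something this client guarantees, and `userTopicNames` is a hypothetical helper.

```javascript
// Hypothetical helper: takes the object returned by listKafkaTopics()
// and returns the sorted names of non-internal topics.
// Assumption: internal topics are the ones whose name starts with "__".
function userTopicNames(topics) {
  return Object.keys(topics)
    .filter((name) => !name.startsWith('__'))
    .sort();
}

// Example using the documented return shape.
const sampleTopics = { orders: {}, __consumer_offsets: {}, billing: {} };
console.log(userTopicNames(sampleTopics));
```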
Gets detailed information about a specific topic.
const details = await kafkaAdmin.getTopicDetails('my-topic');
// Returns: { name: 'my-topic', partitions: [...], isInternal: false }
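The partition list can be used for a quick health check. The shape of each partition entry is not documented here, so the sketch below assumes KafkaJS-style metadata fields (`partitionId`, `replicas`, `isr`); adjust the field names if the client returns something different. `summarizeTopic` is a hypothetical helper.

```javascript
// Hypothetical helper: summarizes the object returned by getTopicDetails().
// Assumption: each partition looks like
//   { partitionId: number, replicas: number[], isr: number[] }
// which mirrors KafkaJS topic metadata but is not confirmed for this client.
function summarizeTopic(details) {
  const partitions = details.partitions || [];

  // A partition is under-replicated when fewer replicas are in sync
  // than are assigned to it.
  const underReplicated = partitions
    .filter((p) => p.isr.length < p.replicas.length)
    .map((p) => p.partitionId);

  return {
    name: details.name,
    partitionCount: partitions.length,
    underReplicated,
  };
}

// Example with made-up metadata.
console.log(
  summarizeTopic({
    name: 'my-topic',
    isInternal: false,
    partitions: [
      { partitionId: 0, replicas: [1, 2], isr: [1, 2] },
      { partitionId: 1, replicas: [2, 3], isr: [2] },
    ],
  })
);
```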
Gets cluster information including brokers and controller.
const clusterInfo = await kafkaAdmin.getClusterInfo();
// Returns: { clusterId: '...', brokers: [...], controller: {...} }
Creates a new topic.
await kafkaAdmin.createTopic('my-topic', {
  numPartitions: 3,
  replicationFactor: 2,
  configEntries: [
    { name: 'cleanup.policy', value: 'compact' }
  ]
});
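Creating a topic that already exists would normally fail, so provisioning scripts often compare the desired topics with the existing ones first. The sketch below works on the object returned by `listKafkaTopics()`. `planTopicCreation` and the `{ name, options }` spec shape are hypothetical and only illustrate the idea.

```javascript
// Hypothetical helper: given the result of listKafkaTopics() and a list of
// desired topic specs, returns only the specs whose topic does not exist yet.
function planTopicCreation(existingTopics, desiredTopics) {
  return desiredTopics.filter(
    (topic) => !Object.prototype.hasOwnProperty.call(existingTopics, topic.name)
  );
}

// Example with made-up data.
const existing = { orders: {} };
const desired = [
  { name: 'orders', options: { numPartitions: 3, replicationFactor: 2 } },
  { name: 'billing', options: { numPartitions: 3, replicationFactor: 2 } },
];
console.log(planTopicCreation(existing, desired).map((topic) => topic.name));

// Inside an async function, the missing topics could then be created with:
//   for (const topic of planTopicCreation(existing, desired)) {
//     await kafkaAdmin.createTopic(topic.name, topic.options);
//   }
```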
Deletes a topic.
await kafkaAdmin.deleteTopic('my-topic');
npm run dev
npm start
All client methods throw descriptive errors on failure, so wrap calls in try/catch and handle the error where it occurs:
try {
  const topics = await kafkaAdmin.listKafkaTopics();
} catch (error) {
  console.error('Failed to list topics:', error.message);
  // Handle the error appropriately
}
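The usage examples pair every `connect()` with a `disconnect()` in a `finally` block. A small wrapper can enforce that pairing so a thrown error never leaves the connection open. This is a sketch: `withKafkaAdmin` is a hypothetical helper, and it relies only on the `connect()` and `disconnect()` methods documented above.

```javascript
// Hypothetical helper: runs `work` against a connected admin client and
// always disconnects afterwards, whether `work` resolves or throws.
// Errors are not swallowed; they propagate to the caller after cleanup.
async function withKafkaAdmin(admin, work) {
  try {
    await admin.connect();
    return await work(admin);
  } finally {
    await admin.disconnect();
  }
}

// Usage sketch, inside an async function:
//   const topics = await withKafkaAdmin(kafkaAdmin, (admin) =>
//     admin.listKafkaTopics()
//   );
```

As in the earlier examples, `connect()` sits inside the `try`, so `disconnect()` is also attempted when the connection itself fails.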
- Always use IAM roles in production environments
- Ensure your AWS credentials have the minimum required permissions
- Use environment variables or secure credential stores for sensitive information
- Consider using AWS Secrets Manager for credential management
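Your AWS credentials need the following permissions to work with MSK. On clusters that use IAM access control, Kafka-level operations (connecting, listing, creating, and deleting topics) are additionally authorized through kafka-cluster:* actions such as kafka-cluster:Connect and kafka-cluster:DescribeTopic, which this policy does not grant: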
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster"
      ],
      "Resource": "arn:aws:kafka:region:account:cluster/cluster-name/*"
    }
  ]
}
- Connection Timeout: Check your network connectivity and MSK cluster endpoint. Note that MSK serves IAM authentication on port 9098, not on the plaintext port 9092
- Authentication Failed: Verify your AWS credentials and permissions
- Topic Not Found: Ensure the topic exists and you have proper permissions
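Enable debug logging by setting the environment variable below. If the client is built on KafkaJS, that library's own logger is controlled by KAFKAJS_LOG_LEVEL=debug instead: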
DEBUG=kafkajs* npm start
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests if applicable
- Submit a pull request
ISC License - see LICENSE file for details.
For issues and questions, please create an issue in the repository or contact the development team.