Producer-Consumer Patterns in Go

This repository demonstrates producer-consumer patterns in Go using two different approaches:

  1. In-memory channels - A simple producer-consumer implementation using Go channels
  2. Apache Kafka - A distributed messaging system implementation

Features

  • ✅ In-memory producer-consumer using Go channels and goroutines
  • ✅ Kafka-based producer-consumer with retry logic and error handling
  • ✅ Automatic topic creation and Kafka readiness checks
  • ✅ Consumer group support with load balancing
  • ✅ Exponential backoff retry mechanism

Prerequisites

  • Go 1.25+ - install from go.dev
  • Docker - For running Kafka (optional, only needed for Kafka examples)

Installation

  1. Clone the repository:
git clone <repository-url>
cd producer_consumer
  2. Install dependencies:
go mod download

Running the Programs

1. In-Memory Producer-Consumer

This example uses Go channels for communication between producers and consumers:

go run producer_consumer.go

What it does:

  • Creates 2 producers that each produce 5 jobs (10 total)
  • Creates 2 consumers that consume jobs from a shared channel
  • Uses a buffered channel (capacity 100) to hold jobs
  • Demonstrates concurrent processing with goroutines
  • Uses sync.WaitGroup for synchronization
  • Gracefully closes channel when producers finish

2. Kafka Producer-Consumer

Step 1: Start Kafka

If the Kafka container doesn't exist, create it:

docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:latest

If the container already exists, remove it first:

docker rm -f kafka

Then run the docker run command above.

Verify Kafka is running:

docker ps --filter "name=kafka"

Wait for Kafka to initialize (30 seconds recommended):

echo "Waiting for Kafka to initialize..." && sleep 30

Check Kafka logs (optional):

docker logs kafka --tail 20

Step 2: Run the Consumer

In Terminal 1, start the consumer:

cd kafka
go run consumer.go topic_init.go

Important: You must include both consumer.go and topic_init.go because the consumer uses helper functions from topic_init.go.

The consumer will:

  • Wait for Kafka to be ready
  • Delete and recreate the "jobs" topic (to start fresh and reset offsets)
  • Wait 5 seconds for Kafka to fully initialize
  • Start 2 consumer goroutines in the "worker-group" consumer group; Kafka assigns partitions to them (with 3 partitions and 2 consumers, both consumers receive work from different partitions)
  • Process messages with retry logic and exponential backoff
  • Display each consumed message with its partition and offset

Leave this terminal running - the consumer will wait for messages.

Step 3: Run the Producer

In Terminal 2, start the producer:

cd kafka
go run producer.go topic_init.go

Important: You must include both producer.go and topic_init.go because the producer uses helper functions from topic_init.go.

The producer will:

  • Wait for Kafka to be ready
  • Delete and recreate the "jobs" topic (to start fresh)
  • Start 2 producer goroutines, each sending 10 messages (20 total) to the "jobs" topic
  • Use a Hash balancer with unique keys (p1-m0, p1-m1, etc.) to distribute messages across partitions
  • Format messages as "job-{id}"
  • Exit after all messages are sent (~3 seconds)
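A hash balancer chooses a partition by hashing the message key. A stand-alone sketch of that scheme (FNV-1a here is illustrative; the actual hash function depends on the client library, so real partition assignments may differ):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor hashes a message key onto one of n partitions - the
// general scheme a hash balancer uses. FNV-1a is illustrative; the
// exact hash varies by client.
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(n))
}

func main() {
	// Keys in the p{producer}-m{message} style spread across 3 partitions.
	for p := 1; p <= 2; p++ {
		for m := 0; m < 3; m++ {
			key := fmt.Sprintf("p%d-m%d", p, m)
			fmt.Printf("%s -> partition %d\n", key, partitionFor(key, 3))
		}
	}
}
```

Because the mapping is deterministic, every message with the same key always lands on the same partition, which is what makes key choice the lever for distribution.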

Step 4: Stop and Clean Up Kafka

After you're done testing, stop and remove the Kafka container:

Option 1: Stop first, then remove (recommended):

# Stop the container
docker stop kafka

# Remove the container
docker rm kafka

Option 2: Stop and remove in one command:

docker stop kafka && docker rm kafka

Option 3: Force remove (if container is stuck or won't stop):

docker rm -f kafka

Verify the container is removed:

docker ps -a --filter "name=kafka"

Note: If the command produces no output (beyond the header row), the container has been successfully removed.

Troubleshooting Commands

Check if Kafka container exists:

docker ps -a --filter "name=kafka"

View Kafka logs:

docker logs kafka

View last 50 lines of Kafka logs:

docker logs kafka --tail 50

Restart Kafka container:

docker restart kafka

Check Kafka container status:

docker inspect kafka --format='{{.State.Status}}'

Project Structure

producer_consumer/
├── producer_consumer.go    # In-memory producer-consumer using channels
├── kafka/
│   ├── producer.go         # Kafka producer implementation
│   ├── consumer.go         # Kafka consumer with retry logic
│   └── topic_init.go       # Helper functions for topic creation and Kafka readiness
├── diagram.html            # Interactive visual diagram (open in browser)
├── go.mod                  # Go module definition
├── go.sum                  # Dependency checksums
├── .gitignore             # Git ignore rules
└── README.md              # This file


Key Concepts Demonstrated

In-Memory Pattern

  • Channels: Buffered channels for job queue
  • Goroutines: Concurrent producers and consumers
  • WaitGroup: Synchronization for consumer completion
  • Channel closing: Graceful shutdown pattern

Kafka Pattern

  • Producer: Writing messages to Kafka topics with Hash balancer for partition distribution
  • Consumer Groups: Multiple consumers sharing work via partition assignment
  • Partition Distribution: Hash balancer with unique keys distributes messages across 3 partitions
  • Load Balancing: With 2 consumers and 3 partitions, Kafka assigns partitions evenly
  • Retry Logic: Exponential backoff for error handling
  • Topic Management: Automatic topic deletion and recreation for fresh starts
  • Connection Handling: Timeout-based message reading with coordinator error handling

Error Handling

The Kafka consumer includes robust error handling:

  • Retry mechanism: Automatically retries on errors
  • Exponential backoff: Increases wait time between retries (up to 10 seconds)
  • Timeout handling: Uses context timeouts for message reading (10-second timeout)
  • Connection recovery: Handles Kafka coordinator unavailability
  • Coordinator errors: Gracefully handles "Group Coordinator Not Available" errors during initialization
  • Context cleanup: Explicitly cancels contexts to prevent resource leaks

Complete Command Reference

Quick Start (All Commands)

# 1. Start Kafka (if not already running)
docker run -d --name kafka -p 9092:9092 \
  -e KAFKA_NODE_ID=1 \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  apache/kafka:latest

# 2. Wait for Kafka to initialize
sleep 30

# 3. Terminal 1 - Start Consumer
cd kafka
go run consumer.go topic_init.go

# 4. Terminal 2 - Start Producer
cd kafka
go run producer.go topic_init.go

# 5. Cleanup (when done)
docker stop kafka && docker rm kafka

# Verify cleanup
docker ps -a --filter "name=kafka"

Notes

  • In-Memory Version: Produces 5 jobs per producer (10 total), uses sync.WaitGroup for synchronization
  • Kafka Version: Produces 10 jobs per producer (20 total), uses Hash balancer for partition distribution
  • Topic Management: Both producer and consumer delete and recreate the "jobs" topic to ensure a fresh start
  • Partition Distribution: The producer uses a Hash balancer with unique keys (p1-m0, p1-m1, etc.) to distribute messages across all 3 partitions
  • Consumer Group: The "worker-group" consumer group allows multiple consumers to share the workload via partition assignment
  • Load Balancing: With 2 consumers and 3 partitions, Kafka assigns:
    • Consumer 1: Partitions 0 and 1
    • Consumer 2: Partition 2
  • Initialization: Kafka needs 30-60 seconds to fully initialize the consumer offsets topic
  • Error Handling: The consumer automatically handles coordinator initialization errors and timeouts
  • KRaft Mode: The KAFKA_OFFSETS_TOPIC_* environment variables are important for KRaft mode to work properly
  • Visual Diagram: Open diagram.html in a browser to see an interactive comparison diagram
  • Both programs demonstrate concurrent processing patterns in Go

License

This project is for educational purposes.
