
Gopool

A simple, powerful, and type-safe worker pool for Go

Production-ready worker pool implementation using Go generics
Designed for efficient concurrent task processing with advanced features


Features • Installation • Quick Start • Usage • Performance • API


✨ Features

🔧 Generic & Type-Safe

  • Works with any task type (T) and result type (R)
  • Leverages Go 1.18+ generics for compile-time safety
  • Zero interface{} conversions

βš™οΈ Highly Configurable

  • Fine-tune worker count and buffer sizes

  • Flexible processing modes (slice, map, stream)

  • Customizable error handling strategies

🎯 Production-Ready

  • Context-aware cancellation and timeouts
  • Built-in panic recovery in workers
  • Thread-safe lifecycle hooks

🚀 Advanced Features

  • Configurable retry policies with exponential backoff

  • Rate limiting for external API calls

  • Real-time task monitoring and metrics


📦 Installation

go get github.com/utkarsh5026/gopool

Requirements: Go 1.18 or higher


🚀 Quick Start

Get up and running in less than a minute:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/utkarsh5026/gopool/pool"
)

func main() {
    ctx := context.Background()
    tasks := []int{1, 2, 3, 4, 5, 6, 7, 8}

    // Create a pool with 4 workers
    p := pool.NewWorkerPool[int, string](pool.WithWorkerCount(4))

    // Define the processing function
    processFn := func(ctx context.Context, task int) (string, error) {
        time.Sleep(10 * time.Millisecond)
        return fmt.Sprintf("Processed task %d", task), nil
    }

    // Process tasks and get results in order
    results, err := p.Process(ctx, tasks, processFn)
    if err != nil {
        panic(err)
    }

    fmt.Println(results)
    // Output: [Processed task 1 Processed task 2 ... Processed task 8]
}

💡 Why Choose gopool?

| Feature | gopool | Traditional Approaches |
|---|---|---|
| Type Safety | ✅ Full generic support | ❌ interface{} hell |
| Retry Logic | ✅ Built-in with exponential backoff | ⚠️ Manual implementation |
| Rate Limiting | ✅ Native support | ⚠️ External library needed |
| Lifecycle Hooks | ✅ Thread-safe monitoring | ❌ Not available |
| Context Support | ✅ First-class citizen | ⚠️ Varies |
| Panic Recovery | ✅ Automatic per worker | ⚠️ Manual handling |

🎯 Scheduling Strategies

gopool implements 7 distinct scheduling strategies plus a Task Fusion wrapper,
each optimized for specific use cases and workload characteristics.

📋 Strategy Overview

| Strategy | Best For | Ordering | Contention | Max Workers | Latency |
|---|---|---|---|---|---|
| 📌 Channel | General purpose | FIFO | Low | ∞ | Low |
| 🔄 Work-Stealing | CPU-intensive | LIFO (local) | Very Low | ∞ | Low |
| ⚡ MPMC | Many submitters | FIFO | Minimal | ∞ | Low |
| 🏆 Priority Queue | Priority tasks | Priority | Moderate | ∞ | Moderate |
| 📊 Skip List | Concurrent priority | Priority | Low | ∞ | Low |
| 🎛️ Bitmask | Low-latency dispatch | Direct | Minimal | 64 | Ultra-low |
| 🚀 LMAX | Ultra-high throughput | Sequence | None | ∞ | Predictable |

📌 Channel Strategy (Default)


How It Works

The default strategy uses Go channels with round-robin distribution.

  • Creates one buffered channel per worker
  • Uses round-robin distribution with an atomic counter
  • Attempts non-blocking sends with fallback to alternative channels
  • Supports task affinity via FNV-1a hashing for consistent routing

Best Use Cases

✅ General-purpose workloads
✅ Uniform task complexity
✅ Simple, predictable scheduling
✅ CPU-bound with consistent time

// Default - no explicit configuration needed
p := pool.NewWorkerPool[int, string](pool.WithWorkerCount(8))

// With task affinity for consistent routing
p := pool.NewWorkerPool[Job, string](
    pool.WithWorkerCount(8),
    pool.WithAffinity(func(task Job) string {
        return fmt.Sprintf("user-%d", task.UserID)
    }),
)
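For intuition, affinity routing of this kind can be sketched with the standard library's hash/fnv. This is an illustrative stand-in, not gopool's internal code; routeByAffinity is a hypothetical helper:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// routeByAffinity sketches FNV-1a-based routing: the same key always
// hashes to the same worker, so tasks sharing a key are handled by one
// worker in submission order.
func routeByAffinity(key string, workerCount int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() % uint32(workerCount))
}

func main() {
	for _, key := range []string{"user-1", "user-2", "user-1"} {
		fmt.Printf("%s -> worker %d\n", key, routeByAffinity(key, 8))
	}
}
```

Because the hash is deterministic, "user-1" always lands on the same worker, which is what makes per-key ordering possible.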

🔄 Work-Stealing Strategy


How It Works

Based on the Chase-Lev work-stealing deque algorithm for optimal load balancing.

  • Each worker has a local lock-free deque (double-ended queue)
  • Workers process local queue in LIFO order (better cache locality)
  • Idle workers steal from other workers' queues in FIFO order
  • Automatic load balancing across workers

Best Use Cases

✅ CPU-intensive variable tasks
✅ Recursive/divide-and-conquer
✅ Tasks spawning subtasks
✅ Unpredictable task duration

p := pool.NewWorkerPool[Task, float64](
    pool.WithWorkerCount(runtime.NumCPU()),
    pool.WithWorkStealing(),
)

// Process CPU-intensive tasks with variable complexity
results, err := p.Process(ctx, tasks, func(ctx context.Context, task Task) (float64, error) {
    return computeIntensiveWork(task), nil
})
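The LIFO-local / FIFO-steal access pattern can be illustrated with a simplified, mutex-based deque. The real Chase-Lev deque is lock-free; this sketch only shows which end each side takes from:

```go
package main

import (
	"fmt"
	"sync"
)

// deque illustrates the work-stealing access pattern only; a real
// Chase-Lev deque is lock-free, while this sketch uses a mutex.
type deque struct {
	mu    sync.Mutex
	tasks []int
}

// push adds to the bottom (owner's end).
func (d *deque) push(t int) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.tasks = append(d.tasks, t)
}

// pop removes from the bottom: the owner works in LIFO order,
// keeping recently pushed (cache-hot) tasks local.
func (d *deque) pop() (int, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.tasks) == 0 {
		return 0, false
	}
	t := d.tasks[len(d.tasks)-1]
	d.tasks = d.tasks[:len(d.tasks)-1]
	return t, true
}

// steal removes from the top: idle workers take the oldest task (FIFO),
// staying away from the owner's busy end of the queue.
func (d *deque) steal() (int, bool) {
	d.mu.Lock()
	defer d.mu.Unlock()
	if len(d.tasks) == 0 {
		return 0, false
	}
	t := d.tasks[0]
	d.tasks = d.tasks[1:]
	return t, true
}

func main() {
	d := &deque{}
	for i := 1; i <= 3; i++ {
		d.push(i)
	}
	local, _ := d.pop()    // owner gets 3 (LIFO)
	stolen, _ := d.steal() // thief gets 1 (FIFO)
	fmt.Println(local, stolen)
}
```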

⚡ MPMC Queue Strategy


How It Works

Multi-Producer Multi-Consumer lock-free ring buffer using atomic CAS operations.

  • Multiple producers can enqueue concurrently
  • Multiple consumers (workers) can dequeue concurrently
  • Supports both bounded and unbounded modes
  • Uses sequence numbers for slot synchronization

Best Use Cases

✅ High-throughput scenarios
✅ Many concurrent submitters
✅ Predictable low latency
✅ Bursty task patterns

// Unbounded queue (default)
p := pool.NewWorkerPool[Task, Result](
    pool.WithWorkerCount(8),
    pool.WithMPMCQueue(pool.WithUnboundedQueue()),
)

// Bounded queue with backpressure
p := pool.NewWorkerPool[Task, Result](
    pool.WithWorkerCount(8),
    pool.WithMPMCQueue(pool.WithBoundedQueue(10000)),
)

πŸ† Priority Queue Strategy


How It Works

Heap-based priority queue for tasks with varying importance levels.

  • Uses Go's container/heap for a min-heap priority queue
  • Tasks ordered by user-defined comparison function
  • Workers process highest-priority tasks first
  • Thread-safe with mutex protection

Best Use Cases

✅ Varying priority levels
✅ SLA-driven workloads
✅ Deadline-based scheduling
✅ Quality-of-service needs

type Job struct {
    ID       int
    Priority int  // 1=Urgent, 5=Low
    Name     string
}

p := pool.NewWorkerPool[Job, string](
    pool.WithWorkerCount(4),
    pool.WithPriorityQueue(func(a, b Job) bool {
        return a.Priority < b.Priority  // Lower value = higher priority
    }),
)

📊 Skip List Strategy


How It Works

Probabilistic data structure with O(log n) operations and excellent concurrency.

  • Multiple levels of linked lists as "express lanes"
  • Better concurrency than heap (lock-free reads)
  • Same-priority tasks grouped together (FIFO within priority)
  • Scales well with concurrent access

Best Use Cases

✅ High-throughput priority tasks
✅ Many concurrent submitters
✅ Uniform priority distribution
✅ Higher concurrency than a heap

type Task struct {
    Deadline time.Time
}

p := pool.NewWorkerPool[Task, string](
    pool.WithWorkerCount(8),
    pool.WithSkipList(func(a, b Task) bool {
        return a.Deadline.Before(b.Deadline)  // Earlier deadline = higher priority
    }),
)

πŸŽ›οΈ Bitmask Strategy


How It Works

Uses a 64-bit atomic bitmask for ultra-low latency worker dispatch.

  • 64-bit atomic bitmask tracks worker idle/busy state
  • Each bit: 1 = idle, 0 = busy
  • Direct worker assignment using TrailingZeros64
  • Falls back to global queue when all workers are busy

Best Use Cases

✅ Ultra-low latency dispatch
✅ Moderate workers (≤64)
✅ Direct worker assignment
✅ Predictable overhead

p := pool.NewWorkerPool[Task, Result](
    pool.WithWorkerCount(32),  // Max 64 workers
    pool.WithBitmask(),
)

⚠️ Note: Limited to a maximum of 64 workers by the 64-bit bitmask.
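The dispatch trick itself is tiny. Here is a sketch of the TrailingZeros64 lookup; in the real pool the bit would be cleared atomically (e.g. via CAS), and pickIdleWorker is a hypothetical helper, not gopool's API:

```go
package main

import (
	"fmt"
	"math/bits"
)

// pickIdleWorker sketches bitmask dispatch: each bit of mask is 1 if
// that worker is idle. TrailingZeros64 finds the lowest set bit in a
// single instruction, i.e. the first idle worker. It returns -1 when
// every worker is busy (mask == 0), which is when the strategy falls
// back to the global queue.
func pickIdleWorker(mask uint64) int {
	if mask == 0 {
		return -1
	}
	return bits.TrailingZeros64(mask)
}

func main() {
	mask := uint64(0b10110) // workers 1, 2, and 4 idle
	w := pickIdleWorker(mask)
	fmt.Println("dispatch to worker", w)
	mask &^= 1 << w // mark it busy by clearing its bit
	fmt.Println("next idle worker", pickIdleWorker(mask))
}
```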


🚀 LMAX Disruptor Strategy


How It Works

Inspired by the LMAX Disruptor pattern used in high-frequency trading systems.

  • Pre-allocated ring buffer (power-of-2 size)
  • Sequence-based coordination (no locks)
  • Cache-line padding to prevent false sharing
  • Minimal GC pressure from pre-allocation

Best Use Cases

✅ Ultra-high throughput (millions of tasks/sec)
✅ Latency-sensitive apps
✅ Financial trading systems
✅ Minimal GC requirements

p := pool.NewWorkerPool[Event, Result](
    pool.WithWorkerCount(8),
    pool.WithLmax(),
)
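The ring-buffer indexing that motivates the power-of-2 size fits in a few lines. This is a sketch of the general disruptor technique, not gopool's implementation:

```go
package main

import "fmt"

// slotIndex sketches why disruptor-style ring buffers use power-of-2
// sizes: the slot for a monotonically increasing sequence number is
// seq & (size-1), a single AND instead of a modulo operation.
func slotIndex(seq, size uint64) uint64 {
	return seq & (size - 1) // valid only when size is a power of 2
}

func main() {
	const size = 8
	for seq := uint64(5); seq < 12; seq++ {
		// Sequences wrap around the ring without any branching.
		fmt.Printf("sequence %d -> slot %d\n", seq, slotIndex(seq, size))
	}
}
```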

🔗 Task Fusion (Wrapper)


How It Works

Wraps any underlying strategy to batch tasks for reduced overhead.

  • Batches tasks within a configurable time window
  • Flushes when batch size reached or timer expires
  • Reduces per-task submission overhead
  • Compatible with all other strategies

Best Use Cases

✅ High-volume small tasks
✅ I/O-bound batching
✅ Acceptable small delays
✅ Overhead reduction

p := pool.NewWorkerPool[Task, Result](
    pool.WithWorkerCount(8),
    pool.WithWorkStealing(),  // Any underlying strategy
    pool.WithTaskFusion(100*time.Millisecond, 50),  // 100ms window, 50 max batch
)

🧭 Strategy Selection Guide

| 💼 Scenario | 🎯 Recommended Strategy |
|---|---|
| General-purpose workloads | 📌 Channel (default) |
| CPU-intensive with variable complexity | 🔄 Work-Stealing |
| Many concurrent task submitters | ⚡ MPMC |
| Tasks with priority levels | 🏆 Priority Queue or 📊 Skip List |
| Low-latency dispatch (≤64 workers) | 🎛️ Bitmask |
| Ultra-high throughput systems | 🚀 LMAX |
| High-volume small tasks | 🔗 Any strategy + Task Fusion |

📖 Usage

Processing Modes

1️⃣ Slice Processing (Ordered Results)

Process a slice of tasks and get results in the same order:

tasks := []int{1, 2, 3, 4, 5}
p := pool.NewWorkerPool[int, string](pool.WithWorkerCount(2))

results, err := p.Process(ctx, tasks, processFn)

2️⃣ Map Processing (Key-Value Pairs)

Process map entries and get results mapped by keys:

tasks := map[string]int{
    "task1": 1,
    "task2": 2,
    "task3": 3,
}
p := pool.NewWorkerPool[int, string](pool.WithWorkerCount(2))

results, err := p.ProcessMap(ctx, tasks, processFn)
// results: map[string]string

3️⃣ Stream Processing (Channel-Based)

Process tasks from a channel as they arrive:

taskChan := make(chan int, 10)
p := pool.NewWorkerPool[int, string](pool.WithWorkerCount(2))

// Send tasks to channel
go func() {
    for i := 1; i <= 5; i++ {
        taskChan <- i
    }
    close(taskChan)
}()

resultChan := p.ProcessStream(ctx, taskChan, processFn)
for result := range resultChan {
    if result.Err != nil {
        fmt.Printf("Error: %v\n", result.Err)
    } else {
        fmt.Printf("Result: %v\n", result.Value)
    }
}

βš™οΈ Configuration Options

Worker Count Configuration

Control the number of concurrent workers:

p := pool.NewWorkerPool[int, string](
    pool.WithWorkerCount(10), // 10 concurrent workers
)

Task Buffer Management

Set the buffer size for the internal task channel:

p := pool.NewWorkerPool[int, string](
    pool.WithWorkerCount(4),
    pool.WithTaskBuffer(100), // Buffer up to 100 tasks
)

Retry Policy with Exponential Backoff

Configure automatic retries:

p := pool.NewWorkerPool[int, string](
    pool.WithRetryPolicy(
        3,                      // Max 3 attempts per task
        100*time.Millisecond,   // Initial delay of 100ms
    ),
)
// Retry delays: 100ms, 200ms, 400ms (exponential backoff)

Rate Limiting

Control task throughput to prevent overwhelming external services:

p := pool.NewWorkerPool[int, string](
    pool.WithRateLimit(
        10.0,  // 10 tasks per second
        5,     // Burst of up to 5 tasks
    ),
)

🎣 Lifecycle Hooks

Monitor and react to task lifecycle events:

Before Task Start Hook

Called before each task begins processing:

p := pool.NewWorkerPool[int, string](
    pool.WithBeforeTaskStart(func(task int) {
        log.Printf("Starting task: %d", task)
    }),
)

On Task End Hook

Called after each task completes (success or failure):

p := pool.NewWorkerPool[int, string](
    pool.WithOnTaskEnd(func(task int, result string, err error) {
        if err != nil {
            log.Printf("Task %d failed: %v", task, err)
        } else {
            log.Printf("Task %d completed: %s", task, result)
        }
    }),
)

On Each Retry Attempt Hook

Called after each retry attempt (requires retry policy):

p := pool.NewWorkerPool[int, string](
    pool.WithRetryPolicy(3, 100*time.Millisecond),
    pool.WithOnEachAttempt(func(task int, attempt int, err error) {
        log.Printf("Task %d attempt %d failed: %v", task, attempt, err)
    }),
)

⚠️ Important: All hooks must be thread-safe as they may be called concurrently by multiple workers.


🔥 Complete Example

Combining multiple features for a robust task processing system:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/utkarsh5026/gopool/pool"
)

type APIRequest struct {
    ID   int
    URL  string
}

func main() {
    ctx := context.Background()

    // Configure pool with multiple features
    p := pool.NewWorkerPool[APIRequest, string](
        pool.WithWorkerCount(5),
        pool.WithTaskBuffer(50),
        pool.WithRetryPolicy(3, 200*time.Millisecond),
        pool.WithRateLimit(10.0, 5),
        pool.WithBeforeTaskStart(func(task APIRequest) {
            log.Printf("Processing request %d: %s", task.ID, task.URL)
        }),
        pool.WithOnTaskEnd(func(task APIRequest, result string, err error) {
            if err != nil {
                log.Printf("Request %d failed: %v", task.ID, err)
            }
        }),
        pool.WithOnEachAttempt(func(task APIRequest, attempt int, err error) {
            log.Printf("Request %d retry attempt %d: %v", task.ID, attempt, err)
        }),
    )

    // Create tasks
    tasks := []APIRequest{
        {ID: 1, URL: "https://api.example.com/data/1"},
        {ID: 2, URL: "https://api.example.com/data/2"},
        {ID: 3, URL: "https://api.example.com/data/3"},
    }

    // Process with timeout
    ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
    defer cancel()

    results, err := p.Process(ctx, tasks, func(ctx context.Context, req APIRequest) (string, error) {
        // Simulate API call
        time.Sleep(100 * time.Millisecond)
        return fmt.Sprintf("Data from %s", req.URL), nil
    })

    if err != nil {
        log.Fatal(err)
    }

    for i, result := range results {
        fmt.Printf("Result %d: %s\n", i+1, result)
    }
}

📚 API Reference

Core Types

WorkerPool[T, R]

The main worker pool type.

  • T: Task type (input)
  • R: Result type (output)

Constructor

func NewWorkerPool[T, R any](options ...WorkerPoolOption) *WorkerPool[T, R]

Processing Methods

// Process a slice of tasks, returns ordered results
func (wp *WorkerPool[T, R]) Process(
    ctx context.Context,
    tasks []T,
    processFn func(context.Context, T) (R, error),
) ([]R, error)

// Process a map of tasks, returns results mapped by key
// (K denotes the comparable key type of the input map)
func (wp *WorkerPool[T, R]) ProcessMap(
    ctx context.Context,
    tasks map[K]T,
    processFn func(context.Context, T) (R, error),
) (map[K]R, error)

// Process tasks from a channel, returns result channel
func (wp *WorkerPool[T, R]) ProcessStream(
    ctx context.Context,
    taskChan <-chan T,
    processFn func(context.Context, T) (R, error),
) <-chan StreamResult[R]

Configuration Options

| Option | Description |
|---|---|
| WithWorkerCount(count int) | Set number of concurrent workers |
| WithTaskBuffer(size int) | Set task channel buffer size |
| WithRetryPolicy(maxAttempts int, initialDelay time.Duration) | Configure retry behavior with exponential backoff |
| WithRateLimit(tasksPerSecond float64, burst int) | Set rate limiting for task processing |
| WithBeforeTaskStart[T](func(T)) | Hook called before task processing |
| WithOnTaskEnd[T, R](func(T, R, error)) | Hook called after task completion |
| WithOnEachAttempt[T](func(T, int, error)) | Hook called after each retry attempt |

💡 Best Practices

1️⃣ Choose the Right Worker Count

  • CPU-bound tasks: Set to runtime.NumCPU()
  • I/O-bound tasks: Set higher (e.g., 2-4x CPU count)
  • External API calls: Respect rate limits and consider timeouts
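A starting point for the sizing rules above; the 4x multiplier for I/O-bound work is a tunable assumption, not a gopool constant:

```go
package main

import (
	"fmt"
	"runtime"
)

// workerCount sketches the sizing rule: NumCPU for CPU-bound work,
// a multiple of it for I/O-bound work where workers spend most of
// their time blocked waiting rather than computing.
func workerCount(ioBound bool) int {
	n := runtime.NumCPU()
	if ioBound {
		return 4 * n // starting point to tune, not a fixed rule
	}
	return n
}

func main() {
	fmt.Println("CPU-bound workers:", workerCount(false))
	fmt.Println("I/O-bound workers:", workerCount(true))
}
```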

2️⃣ Use Context for Cancellation

Always pass a context with timeout or cancellation:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()

results, err := pool.Process(ctx, tasks, processFn)

3️⃣ Handle Errors Appropriately

  • Use retry policy for transient failures
  • Implement proper error handling in your process function
  • Use hooks to log or monitor errors

4️⃣ Rate Limiting for External Services

When calling external APIs, use rate limiting to avoid overwhelming them:

pool.NewWorkerPool[T, R](
    pool.WithRateLimit(10.0, 5), // 10 req/sec, burst of 5
)

5️⃣ Thread-Safe Hooks

Ensure hook functions are thread-safe as they run concurrently:

var mu sync.Mutex
var completed int

pool.WithOnTaskEnd(func(task int, result string, err error) {
    mu.Lock()
    completed++
    mu.Unlock()
})
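When a hook only increments a counter, sync/atomic is a lock-free alternative to the mutex. Here runTasks is an illustrative driver, not part of gopool:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runTasks simulates n workers firing an OnTaskEnd-style hook
// concurrently. atomic.AddInt64 makes the counter update safe without
// a mutex when the hook does nothing but count events.
func runTasks(n int) int64 {
	var completed int64
	hook := func(task int, result string, err error) {
		if err == nil {
			atomic.AddInt64(&completed, 1) // no mutex needed
		}
	}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			hook(i, "ok", nil)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt64(&completed)
}

func main() {
	fmt.Println("completed:", runTasks(100))
}
```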

⚡ Performance

Benchmark Results

Tested on Intel i7-11800H @ 2.30GHz (8 cores, 16 threads):

| Metric | Result |
|---|---|
| Peak Throughput | ~1M tasks/sec (simple CPU tasks) |
| Worker Efficiency | 400-500K tasks/sec/worker (2-4 workers) |
| Memory per Task | ~65 bytes |
| Parallel Speedup | 19x vs sequential (1000 tasks) |

Key Findings:

  • Buffer size 4-8x worker count provides ~30% throughput boost
  • Optimal worker count: 8-16 for CPU-bound, 24-48 for I/O-bound tasks
  • Minimal overhead: ~5% with hooks, ~1 allocation per task

Run benchmarks:

# Run all comprehensive benchmarks
go test -bench=BenchmarkComprehensive -benchmem ./benchmarks/

# Run specific benchmark categories
go test -bench=BenchmarkComprehensive_Throughput -benchmem ./benchmarks/
go test -bench=BenchmarkComprehensive_Modes -benchmem ./benchmarks/
go test -bench=BenchmarkComprehensive_Features -benchmem ./benchmarks/
go test -bench=BenchmarkComprehensive_Workload -benchmem ./benchmarks/
go test -bench=BenchmarkComprehensive_Memory -benchmem ./benchmarks/
go test -bench=BenchmarkComprehensive_Scenario -benchmem ./benchmarks/

Note: The comprehensive benchmarks are located in the benchmarks directory as a separate package for better organization and modularity.


📊 Benchmark Visualization

gopool includes a powerful benchmark visualization tool that runs all strategy comparison benchmarks and presents the results in an interactive dark-themed dashboard.

Features

  • One-Command Execution: Run benchmarks and start visualization server with Docker
  • Interactive Charts: Apache ECharts with zoom, pan, and filtering capabilities
  • Strategy Comparisons: Compare Channel, WorkStealing, MPMC, and PriorityQueue strategies
  • Historical Analysis: Track performance trends over time
  • Dark Theme UI: Beautiful Tailwind CSS dark mode interface

Quick Start

# Using Docker
docker build -t gopool-benchviz -f tools/benchviz/Dockerfile .
docker run -p 8080:8080 -v $(pwd)/benchmark-data:/data gopool-benchviz

# Using Docker Compose
cd tools/benchviz
docker-compose up --build

# Access the dashboard
open http://localhost:8080

Dashboard Views

Main Dashboard:

  • Summary cards with key metrics
  • Category tabs for filtering (CPUBound, IOBound, Mixed, etc.)
  • Execution time comparison chart (grouped bar chart)
  • Throughput chart (tasks/sec)
  • Memory usage chart (bytes/op and allocs/op)
  • Latency percentiles box plot
  • Sortable detailed results table

Comparison View:

  • Select and compare two benchmark runs
  • Performance delta visualization
  • Regression and improvement tables
  • Historical trend analysis

For detailed documentation, see tools/benchviz/README.md.


🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.


📄 License

This project is licensed under the Apache License 2.0. See the LICENSE file for details.






Made with ❤️ by utkarsh5026


If you find this project helpful, please consider giving it a ⭐


Report Bug • Request Feature • Documentation
