Skip to content

go-async provides a modern, type-safe, and retryable abstraction over Go’s native synchronization primitives — combining simplicity, determinism, and high throughput.

License

Notifications You must be signed in to change notification settings

rubengp99/go-async

Repository files navigation

go-async 🌀

CI Status Version Coverage Go Report Card GoDoc License

A lightweight, type-safe, and retryable asynchronous worker pool for Go — built on sync.WaitGroup, semaphores, context, and condition variables, not errgroup.

go-async provides deterministic, leak-free concurrency with automatic retries, result draining, and type-safe tasks, suitable for high-throughput Go applications.


✨ Features

  • ✅ Type-safe generic workers (Task[T])
  • 🧩 Graceful error propagation
  • 🔁 Built-in retry with exponential backoff + jitter
  • ⚡ Asynchronous result draining (Drain)
  • 🧵 Deterministic shutdown (no goroutine leaks)
  • 🔒 Mutex + condition variable–protected data aggregation
  • 🧰 Fluent functional composition (WithRetry, DrainTo, WithInput)
  • 🧠 Implemented with sync.WaitGroup, semaphores, context, and sync.Cond

📦 Installation

go get github.com/rubengp99/go-async

🧠 Concept Overview

Type Description
Task[T] Represents a unit of async work
Pool Manages concurrent execution using WaitGroup and semaphores
Drain[T] Collects results asynchronously using mutex + condition variable
Args[T] Provides task input and drainer reference
Worker Interface for executable and retryable tasks

⚙️ How It Works

Pool orchestrates multiple Workers concurrently:

  1. Each worker runs in a separate goroutine managed by a WaitGroup.
  2. Concurrency is controlled with a semaphore.
  3. Shared context handles cancellation.
  4. Drain[T] asynchronously collects results.
  5. On completion, resources and channels close automatically.

🧩 Example Usage

Basic Task

output := async.NewDrainer[User]()
task := async.NewTask(func(t async.Args[User]) error {
    t.Drainer.Send(User{Name: "Alice"})
    return nil
}).DrainTo(output)

pool := async.NewPool()
defer pool.Close()

pool.Go(task).Wait()
results := output.Drain()
fmt.Println(results[0].Name) // Alice

Task With Retry

var numRetries int
task := async.NewTask(func(t async.Args[any]) error {
    numRetries++
    if numRetries < 3 {
        return fmt.Errorf("transient error")
    }
    return nil
}).WithRetry(3, 200*time.Millisecond)

pool := async.NewPool()
pool.Go(task).Wait()

Multiple Task Types

outA := async.NewDrainer[A]()
outB := async.NewDrainer[B]()

// Task A
t1 := async.NewTask(func(t async.Args[A]) error {
    t.Drainer.Send(A{Value: "Hello"})
    return nil
}).DrainTo(outA)

// Task B
t2 := async.NewTask(func(t async.Args[B]) error {
    t.Drainer.Send(B{Value: 42.5})
    return nil
}).DrainTo(outB)

pool := async.NewPool()
pool.Go(t1, t2).Wait()

fmt.Println(outA.Drain())
fmt.Println(outB.Drain())

🧰 Interfaces

type Worker interface { Executable; Retryable }
type Executable interface { Execute() error }
type Retryable interface { WithRetry(attempts uint, sleep time.Duration) Worker }

🧱 Structs and Functions

Task[T]

  • Execute() - run the task
  • WithRetry(attempts, sleep) - add retry logic
  • DrainTo(d *Drain[T]) - send output to drain
  • WithInput(input *T) - provide task input

Pool

  • Go(tasks ...Worker) - run tasks concurrently
  • WithRetry(attempts, sleep) - global retry policy
  • WithLimit(limit) - set concurrency limit
  • Wait() - wait for all tasks
  • Close() - cancel and cleanup

Drain[T]

  • Send(input T) - safely push a value
  • Drain() - collect all values
  • Count() - get collected count

🧪 Benchmarks

goos: linux, goarch: amd64, cpu: 13th Gen Intel i9-13900KS
Name Time per op (ns) Allocs per op Bytes per op
ErrGroup-32 178.7 1 24 B
ChannelsWithOutputAndErrChannel-32 259.9 2 72 B
ChannelsWithWaitGroup-32 272.8 2 80 B
MutexWithErrGroup-32 270.9 2 102 B
AsyncPackageWithDrainer-32 277.5 4 162 B
ChannelsWithErrGroup-32 279.5 2 80 B
AsyncPackage-32 297.4 3 96 B

Benchmark Comparison

Even though go-async adds a small constant overhead compared to errgroup (≈100–130 ns per operation), it provides type safety, retries, automatic draining, and deterministic cleanup — all while staying within ~1.7× of native concurrency performance.

🧩 Benchmark Insights

  • AsyncPackage and AsyncPackageWithDrainer show consistent sub-microsecond operation times.
  • Memory allocations remain extremely low — under 250 B/op even with drainer support.
  • The performance delta vs errgroup reflects controlled synchronization overhead (mutex + condition variable).
  • In practice, go-async scales linearly with worker count and maintains predictable latency under load.

⚡ Design Highlights

  • Structured concurrency with sync.WaitGroup
  • Controlled parallelism via semaphores
  • Mutex + sync.Cond–protected drains
  • Context-based cancellation and cleanup
  • Exponential backoff retries
  • Leak-free, deterministic shutdown

⚠️ Notes and Best Practices

General

  • Graceful Shutdown — always call pool.Close() or defer it for safe cleanup.
  • Thread Safety — never access internal slices or channels directly.
  • Non-blocking design — use Drain() or wait for pool completion instead of manual close() calls.

Drainer (Drain)

  • Create via async.NewDrainer[T]()
  • Use Send() to safely push results
  • Collect values using Drain()
  • Internally guarded by sync.Mutex and sync.Cond

Task and Worker Management

  • Wrap async functions with async.NewTask()
  • Chain configuration fluently using .WithRetry() and .DrainTo()
  • Provide inputs using .WithInput()

Pool

  • Use async.NewPool() for controlled concurrency
  • Limit parallelism with .WithLimit(limit)
  • Apply retry policy globally with .WithRetry(attempts, sleep)
  • Wait for all tasks to complete using .Wait()

Testing

  • Run deterministic tests with:
go test -v ./...
  • Benchmark performance with:
go test -bench . -benchmem -memprofile=mem.prof

💬 Summary

go-async provides a modern, type-safe, and retryable abstraction over Go’s native synchronization primitives — combining simplicity, determinism, and high throughput.

Built for developers who want concurrency that’s:

  • Readable
  • Deterministic
  • Retry-aware
  • Leak-free

📜 License

MIT License © 2025 rubengp99

About

go-async provides a modern, type-safe, and retryable abstraction over Go’s native synchronization primitives — combining simplicity, determinism, and high throughput.

Topics

Resources

License

Contributing

Stars

Watchers

Forks

Packages

No packages published

Languages