
jaredmtdev/gather


  _____       _______ _    _ ______ _____  
 / ____|   /\|__   __| |  | |  ____|  __ \ 
| |  __   /  \  | |  | |__| | |__  | |__) |
| | |_ | / /\ \ | |  |  __  |  __| |  _  / 
| |__| |/ ____ \| |  | |  | | |____| | \ \ 
 \_____/_/    \_\_|  |_|  |_|______|_|  \_\
 

Gather is a lightweight channel-based concurrency library for Go. It helps you build worker pools, pipelines, and middleware.

Quick Example

opts := []gather.Opt{
    gather.WithWorkerSize(100),
    gather.WithBufferSize(10),
    gather.WithOrderPreserved(),
}

handler := func(ctx context.Context, in Foo, _ *gather.Scope[Foo]) (Bar, error) {
    // do work...
    return Bar{}, nil
}

out := gather.Workers(ctx, in, handler, opts...) 

for v := range out {
    // consume
}

Add to your project

go get github.com/jaredmtdev/gather

API at a glance

  • Workers: starts a worker pool that consumes an input channel and returns an output channel
  • HandlerFunc: handles each job
  • Scope: tools available to a handler (e.g., retries)
  • Middleware: wraps handlers and other middleware
  • Chain: chains multiple middleware

Design Philosophy

Gather provides the glue: workers, pipelines, and middleware.
You design the concurrency patterns that fit your use case.

Simple

  • familiar middleware model (like "net/http")
  • decouples middleware plumbing from business logic
  • uses plain Go primitives: context, channels, and functions

Flexible

  • leaves retry and error handling decisions to you
  • lets you manage input/output channels directly
  • no global state
  • context-aware: honors cancellations, timeouts, and deadlines (see the sketch after this list)
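
As a rough illustration of the context-aware point, here is a sketch that reuses in, handler, and opts from the Quick Example above (so it is not a complete program): putting a deadline on the context shuts the pool down even if input remains.

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

out := gather.Workers(ctx, in, handler, opts...)
for range out {
    // the loop ends once the deadline fires or the input channel is drained
}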

Should I use Gather?

Use Gather if channels are unavoidable, or if you need pipeline semantics that errgroup and sync.WaitGroup don't give you.

When Gather is a good fit

  • you need middleware such as retries, circuit breakers, backpressure, timeouts, logging, etc.
  • multi-stage pipelines with fan-out/fan-in
  • optional ordering with a reorder gate
  • composability of stages

When a simpler tool is better

  • run independent tasks concurrently and stop on the first error -> use errgroup
  • short-lived CPU-bound work -> use sync.WaitGroup
  • background task waiting to close a channel -> use plain goroutine
  • simple generator -> use plain goroutine

Getting Started

Building a worker pool

1: build your handler

This handles each item sent to the worker pool:

handler := func(ctx context.Context, in Foo, scope *gather.Scope[Foo]) (Bar, error) {
    // do work...
    return Bar{}, nil
}

The context can be cancelled at any time to shut down the worker pool. For a prompt shutdown, the handler itself should also honor context cancellation.
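
For example, a handler that does slow work can select on ctx.Done so it notices cancellation mid-job (a minimal sketch; the time.After call stands in for real work):

handler := func(ctx context.Context, in Foo, _ *gather.Scope[Foo]) (Bar, error) {
    select {
    case <-ctx.Done():
        // the pool is shutting down; abandon this job
        return Bar{}, ctx.Err()
    case <-time.After(100 * time.Millisecond): // stand-in for real work
        return Bar{}, nil
    }
}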

The in variable can be any type, and so can the worker pool's output. When the handler returns an error, the result is not sent to the output channel. If needed, the error can also be sent to an error channel (which you create) and processed in a separate goroutine (which you define), as sketched below. The error return also comes in handy when building middleware.
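
One possible shape for that pattern, as a sketch (the errs channel, its buffer size, and the process helper are placeholders you would define yourself):

errs := make(chan error, 100)

handler := func(ctx context.Context, in Foo, _ *gather.Scope[Foo]) (Bar, error) {
    bar, err := process(in) // process is a placeholder for your own logic
    if err != nil {
        select {
        case errs <- err:
        case <-ctx.Done():
        }
        return Bar{}, err // the failed result never reaches the output channel
    }
    return bar, nil
}

// process errors in a separate goroutine that you manage;
// close(errs) once the pool's output channel has been drained
go func() {
    for err := range errs {
        log.Println("job failed:", err)
    }
}()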

Optionally, write custom middleware to wrap the handler:

wrappedHandler := logger(retries(rateLimiter(handler)))

// alternatively, chain the middleware:
mw := gather.Chain(rateLimiter, retries, logger)
wrappedHandler := mw(handler)

See examples/internal/samplemiddleware/ for more detailed examples of building middleware.
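
As a rough sketch of the shape, here is a hypothetical logging middleware written directly against the handler type used in this README (the library's own Middleware type may differ, so treat the samplemiddleware examples as the reference):

// fooHandler matches the handler signature used throughout this README.
type fooHandler = func(ctx context.Context, in Foo, scope *gather.Scope[Foo]) (Bar, error)

// logger takes a handler and returns a new handler that logs how long each job took.
func logger(next fooHandler) fooHandler {
    return func(ctx context.Context, in Foo, scope *gather.Scope[Foo]) (Bar, error) {
        start := time.Now()
        out, err := next(ctx, in, scope)
        log.Printf("handled %v in %s (err: %v)", in, time.Since(start), err)
        return out, err
    }
}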

The scope provides extra capabilities that may come in handy, such as retries or spawning new goroutines with a guarantee that those goroutines finish before the worker pool shuts down.

2: build your generator

You need a channel of any type, <-chan T, that the worker pool will receive its input from.
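
For example, a minimal generator might look like this (a sketch; Foo is the placeholder input type from the Quick Example, and the values sent are stand-ins):

func generate(ctx context.Context, n int) <-chan Foo {
    in := make(chan Foo)
    go func() {
        defer close(in) // always close the channel so downstream stages can finish
        for i := 0; i < n; i++ {
            select {
            case in <- Foo{}: // replace Foo{} with your real values
            case <-ctx.Done():
                return
            }
        }
    }()
    return in
}

in := generate(ctx, 1000)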

3: configure and run the worker pool

out := gather.Workers(ctx, in, handler, opts...) 

This returns an output channel. The channel must be consumed or drained so the workers don't block:

// consume output
for v := range out {
    fmt.Println(v)
}

// alternatively, drain output if it doesn't need to be consumed
for range out {
}

Notice the opts... argument. These options configure the worker pool: look for any function starting with gather.With. Here you can configure things like the number of workers, the channel buffer size, order preservation, and more.

Building a pipeline

Each worker pool above is a single stage of a pipeline. To build a pipeline, just build multiple worker pools and pass the output of one into the next:

out1 := gather.Workers(ctx, in, handler1, opts...) 
out2 := gather.Workers(ctx, out1, handler2, opts...) 
out3 := gather.Workers(ctx, out2, handler3, opts...) 
for range out3 {
    // drain
}

This is quite simple and gives full control! You can tune the configuration of each stage of the pipeline, and cancelling the context at any stage stops the entire pipeline.

Examples

Please see the examples/ folder for some simple examples.

Future Ideas

  • sharding across multiple channels
  • seq package: offer synchronous helpers that utilize iter.Seq and integrate nicely with Gather
  • include WithEventHook(hook func(Event)) Opt which can be used for logging/debugging
