Go Query Bus
A query bus to fetch all the things.
go get github.com/io-da/query
- Queries
- Handlers
- Result
- Iterator Handlers
- Iterator Result
- Error Handlers
- Cache Adapters
- The Bus
- Benchmarks
- Examples
This library is intended for anyone looking to query for data in a decoupled architecture. No reflection, no closures.
Queries are any type that implements the Query interface. Ideally they should contain immutable data.
type Query interface {
ID() []byte
}
Queries can optionally implement the Cacheable interface for builtin caching.
type Cacheable interface {
CacheKey() []byte
CacheDuration() time.Duration
}
Handlers are any type that implements the Handler interface. Handlers must be instantiated and provided to the bus using the bus.Handlers function.
type Handler interface {
Handle(qry Query, res *Result) error
}
Handlers catch the query (stopping propagation) whenever they explicitly call res.Done(). Otherwise the query is passed to every handler that expects it. This strategy allows multiple fallback handlers for the same query, or a Result populated by several handlers.
Whenever a query fails to be handled, the bus returns an error. A query is considered handled as soon as any data is provided to the result or res.Handled() is called explicitly.
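The propagation rules above can be illustrated with a small self-contained sketch. It does not use the library: `result`, `handler`, `dispatch`, `primary` and `fallback` are hypothetical stand-ins written only for this illustration, and the real bus may differ in its details.

```go
package main

import (
	"errors"
	"fmt"
)

// result is a hypothetical stand-in for the library's Result type.
type result struct {
	data    []interface{}
	done    bool
	handled bool
}

// Add stores a value and marks the query as handled.
func (r *result) Add(v interface{}) { r.data = append(r.data, v); r.handled = true }

// Handled marks the query as handled without providing data.
func (r *result) Handled() { r.handled = true }

// Done stops propagation to the remaining handlers.
func (r *result) Done() { r.done = true }

// handler is a hypothetical stand-in for the Handler interface.
type handler func(qry string, res *result) error

var errNotHandled = errors.New("no handler handled the query")

// dispatch offers the query to each handler in the order given and stops as
// soon as one of them calls Done. It fails if nothing handled the query.
func dispatch(qry string, handlers ...handler) (*result, error) {
	res := &result{}
	for _, h := range handlers {
		if err := h(qry, res); err != nil {
			return nil, err
		}
		if res.done {
			break
		}
	}
	if !res.handled {
		return nil, errNotHandled
	}
	return res, nil
}

// primary answers only the "cached" query and catches it.
func primary(qry string, res *result) error {
	if qry == "cached" {
		res.Add("from primary")
		res.Done()
	}
	return nil
}

// fallback answers everything except the "unknown" query.
func fallback(qry string, res *result) error {
	if qry != "unknown" {
		res.Add("from fallback")
	}
	return nil
}

func main() {
	for _, q := range []string{"cached", "other", "unknown"} {
		res, err := dispatch(q, primary, fallback)
		if err != nil {
			fmt.Println(q, "->", err)
			continue
		}
		fmt.Println(q, "->", res.data)
	}
}
```

In this sketch the "cached" query never reaches `fallback` because `primary` calls `Done`, while the "unknown" query produces an error because no handler provided data or called `Handled`.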
Result is the struct returned from bus.Query. This is where the fetched data resides.
The handlers provide data to the result using the functions res.Add or res.Set.
This data can then be retrieved using the functions res.First (to retrieve only the first result) or res.All (to return the whole data slice).
Iterator handlers are any type that implements the IteratorHandler interface. Iterator handlers must be instantiated and provided to the bus using the bus.InitializeIteratorHandlers function.
type IteratorHandler interface {
Handle(qry Query, res *IteratorResult) error
}
These behave nearly identically to normal handlers, with a couple of differences:
- They expect an IteratorResult instead of a Result.
- Their results cannot be cached.
Iterator handlers are intended for large sets of data: they make it possible to iterate over the data without additional preloading.
IteratorResult is the struct returned from bus.IteratorQuery. This struct acts as a proxy between the handlers and the consumer.
The handlers provide data to the result using the function res.Yield.
This data can then be processed while it is still being populated, using the function res.Iterate.
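The proxy behaviour described above can be pictured as a channel shared between a producer and a consumer. The following is a minimal sketch under that assumption: `iteratorResult`, `newIteratorResult`, `produce` and `sum` are hypothetical names, and the library's actual implementation may be structured differently.

```go
package main

import "fmt"

// iteratorResult is a hypothetical stand-in for the library's IteratorResult.
type iteratorResult struct {
	ch chan interface{}
}

// newIteratorResult creates a result whose channel holds up to buffer values.
func newIteratorResult(buffer int) *iteratorResult {
	return &iteratorResult{ch: make(chan interface{}, buffer)}
}

// Yield hands one value to the consumer, blocking while the buffer is full.
func (r *iteratorResult) Yield(v interface{}) { r.ch <- v }

// Iterate exposes the yielded values as a receive-only channel.
func (r *iteratorResult) Iterate() <-chan interface{} { return r.ch }

// produce plays the role of an iterator handler: it yields n values one by
// one and closes the channel when it has nothing left to provide.
func produce(res *iteratorResult, n int) {
	defer close(res.ch)
	for i := 1; i <= n; i++ {
		res.Yield(i)
	}
}

// sum consumes the values while they are still being produced, so the whole
// data set never has to be held in memory at once.
func sum(n int) int {
	res := newIteratorResult(0)
	go produce(res, n)
	total := 0
	for v := range res.Iterate() {
		total += v.(int)
	}
	return total
}

func main() {
	fmt.Println(sum(4)) // 1+2+3+4 = 10
}
```

With an unbuffered channel, as here, each `Yield` waits for the consumer to take the previous value, which keeps memory use flat at the cost of tighter coupling between the two sides.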
Error handlers are any type that implements the ErrorHandler interface. Error handlers are optional (but advised) and provided to the bus using the bus.ErrorHandlers function.
type ErrorHandler interface {
Handle(qry Query, err error)
}
Any time an error occurs within the bus, it will be passed on to the error handlers. This strategy can be used for decoupled error handling.
Below is a list of errors that can occur.
// query.InvalidQueryError
// query.QueryBusNotInitializedError
// query.QueryBusIsShuttingDownError
// query.ErrorNoQueryHandlersFound
// query.ErrorQueryTimedOut
type errorHandler struct{}

func (e errorHandler) Handle(qry query.Query, err error) {
	// decide what to do based on the concrete error type
	switch err.(type) {
	case query.InvalidQueryError:
		// do something
	case query.QueryBusNotInitializedError:
		// do something
	case query.QueryBusIsShuttingDownError:
		// do something
	case query.ErrorQueryTimedOut, query.ErrorNoQueryHandlersFound:
		// do something
	default:
		// do something
	}
}

// provide an instance of the error handler (not the type itself) to the bus
bus.ErrorHandlers(errorHandler{})
Cache adapters are any type that implements the CacheAdapter interface. Cache adapters are optional (but advised) and provided to the bus using the bus.CacheAdapters function.
type CacheAdapter interface {
Set(qry Cacheable, res *Result) bool
Get(qry Cacheable) *Result
Expire(qry Cacheable)
Shutdown()
}
As with query handlers, this approach allows different cache adapters to be used for different query types.
If the cache adapter returns true on Set, the bus assumes the result was successfully cached.
On retrieval the bus will return the results from the first adapter that returns data for the given query. The order of the adapters is always respected.
By default the bus comes with a MemoryCacheAdapter. This adapter will cache the results in memory and supports duration specification on the order of microseconds (accuracy depends on server load). Expired results will be automatically cleared from memory.
Bus is the struct that will be used for all the application's queries.
The Bus should be instantiated (NewBus()) and initialized (bus.InitializeIteratorHandlers) on application startup.
The initialization is only required for iterator queries and is separated from the instantiation for dependency injection purposes.
The application should instantiate the Bus once and then use its reference for all queries.
The order in which the handlers are provided to the Bus is always respected. This is the order used when propagating queries.
The number of workers for iterator queries can be adjusted.
bus.IteratorWorkerPoolSize(10)
If used, this function must be called before bus.InitializeIteratorHandlers. It specifies the number of goroutines used to handle iterator queries.
In some scenarios increasing this value can drastically improve performance.
It defaults to the value returned by runtime.GOMAXPROCS(0).
The buffer size of the iterator query queue can also be adjusted.
Depending on the use case, this value may greatly impact performance.
bus.IteratorQueueBuffer(100)
If used, this function must be called before bus.InitializeIteratorHandlers.
It defaults to 100.
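The two settings above, a pool of worker goroutines and a buffered queue in front of it, follow a common Go pattern. The sketch below shows that pattern in isolation. `poolSize` and `process` are hypothetical helpers, and squaring integers stands in for handling a query; none of this is the library's own code.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// poolSize returns the requested number of workers, falling back to
// runtime.GOMAXPROCS(0) when no positive value is requested.
func poolSize(requested int) int {
	if requested > 0 {
		return requested
	}
	return runtime.GOMAXPROCS(0)
}

// process squares every job on a pool of workers fed from a buffered queue
// and returns the sum of the squares.
func process(jobs []int, workers, queueBuffer int) int {
	queue := make(chan int, queueBuffer)
	results := make(chan int)

	// start the workers; each one drains the queue until it is closed
	var wg sync.WaitGroup
	for i := 0; i < poolSize(workers); i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range queue {
				results <- j * j
			}
		}()
	}

	// feed the queue, then close the results once every worker has finished
	go func() {
		for _, j := range jobs {
			queue <- j
		}
		close(queue)
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(process([]int{1, 2, 3, 4}, 10, 100)) // 1+4+9+16 = 30
}
```

A larger queue buffer lets producers run ahead of the workers before they block, while a larger pool lets more jobs run concurrently. Which of the two matters more depends on how long each job takes.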
The buffer size of the iterator results channel can also be adjusted.
Depending on the use case, this value may greatly impact performance.
bus.IteratorResultBuffer(0)
If used, this function may be called before any iterator query is performed.
It defaults to 0.
The Bus also provides a shutdown function that attempts to gracefully stop the query bus and all its routines.
bus.Shutdown()
This function will block until the bus is fully stopped.
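One common way to obtain a blocking, graceful shutdown in Go is to close the work queue and wait on a sync.WaitGroup. The sketch below assumes that approach. `miniBus` and its methods are hypothetical and only illustrate the idea; the library's own Shutdown may be implemented differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniBus is a hypothetical, stripped-down bus with a worker pool.
type miniBus struct {
	mu      sync.RWMutex
	closed  bool
	queue   chan int
	wg      sync.WaitGroup
	handled int64
}

// newMiniBus starts the given number of workers draining the queue.
func newMiniBus(workers int) *miniBus {
	b := &miniBus{queue: make(chan int, 100)}
	for i := 0; i < workers; i++ {
		b.wg.Add(1)
		go func() {
			defer b.wg.Done()
			for range b.queue {
				atomic.AddInt64(&b.handled, 1)
			}
		}()
	}
	return b
}

// Submit queues a job and reports false once the bus is shutting down.
func (b *miniBus) Submit(job int) bool {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return false
	}
	b.queue <- job
	return true
}

// Shutdown stops accepting jobs, lets the workers drain what is already
// queued, and blocks until all of them have returned.
func (b *miniBus) Shutdown() {
	b.mu.Lock()
	if !b.closed {
		b.closed = true
		close(b.queue)
	}
	b.mu.Unlock()
	b.wg.Wait()
}

// Handled returns the number of jobs processed so far.
func (b *miniBus) Handled() int64 { return atomic.LoadInt64(&b.handled) }

func main() {
	b := newMiniBus(4)
	for i := 0; i < 50; i++ {
		b.Submit(i)
	}
	b.Shutdown()
	fmt.Println(b.Handled())  // 50: every queued job was drained before Shutdown returned
	fmt.Println(b.Submit(51)) // false: the bus no longer accepts work
}
```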
The query handler returns a single value for simulation purposes.
Benchmark Type | Time |
---|---|
Queries | 201 ns/op |
IteratorQueries | 783 ns/op |
Iterator queries add a small overhead and are not worthwhile for small sets of data (partly because their results cannot be cached). They are better suited to iterating over large sets of data while avoiding preloading.
A struct query.
type Foo struct {
bar string
}
func (*Foo) ID() []byte {
return []byte("FOO-UUID")
}
A string query that also implements caching.
type Bar string
func (Bar) ID() []byte {
return []byte("BAR-UUID")
}
func (Bar) CacheKey() []byte {
return []byte("BAR-CACHE-KEY")
}
func (Bar) CacheDuration() time.Duration {
return time.Minute * 5
}
A query handler that listens to multiple query types.
type FooBarHandler struct{}

func (hdl *FooBarHandler) Handle(qry query.Query, res *query.Result) error {
	// check the query type (no variable is bound, since the value itself is not used)
	switch qry.(type) {
	case *Foo:
		// handler logic
		res.Add("Bar")
	case Bar:
		// handler logic
		res.Add("Foo")
	}
	return nil
}
An iterator query handler.
type FooBarIteratorHandler struct{}

func (hdl *FooBarIteratorHandler) Handle(qry query.Query, res *query.IteratorResult) error {
	// check the query type (no variable is bound, since the value itself is not used)
	switch qry.(type) {
	case *Foo:
		// handler logic
		res.Yield("Bar")
	}
	return nil
}
Initialization and usage of the exemplified queries and handlers:
package main

import (
	"fmt"
	"log"

	"github.com/io-da/query"
)

func main() {
	// instantiate the bus (returns *query.Bus)
	bus := query.NewBus()
	// provide the bus with all of the application's query handlers
	bus.Handlers(
		&FooBarHandler{},
	)
	// initialize the bus with all of the application's iterator query handlers
	bus.InitializeIteratorHandlers(
		&FooBarIteratorHandler{},
	)

	// query away!
	res1, err := bus.Query(&Foo{})
	if err != nil {
		log.Fatal(err)
	}
	// get the first result only
	fmt.Println(res1.First()) // "Bar"

	res2, err := bus.Query(Bar("Bar"))
	if err != nil {
		log.Fatal(err)
	}
	// get all the results
	fmt.Println(res2.All()) // ["Foo"]

	res3, err := bus.IteratorQuery(&Foo{})
	if err != nil {
		log.Fatal(err)
	}
	// range over the values, processing them while they are being populated
	for val := range res3.Iterate() {
		// do something with the val
		fmt.Println(val) // "Bar"
	}
}
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.