Caravan is a Go library that provides powerful primitives for building in-process streaming applications with integrated state management. It combines reactive stream processing with efficient table operations to enable real-time data transformations, aggregations, and stateful workflows.
This is a work in progress. The basics are there, but it is not yet ready for production use. Use at your own risk.
- Topics - FIFO message logs with independent producer/consumer tracking
- Streams - Composable stream processing pipelines with rich operators
- Tables - In-memory key-value tables with column-based storage
- Type Safety - Fully generic APIs leveraging Go generics
- Backpressure - Natural flow control through channel semantics
- Batch Operations - Efficient batch updates for high-throughput scenarios
- Aggregations - Stateful stream aggregations with table integration
go get github.com/kode4food/caravan
Topics provide in-process FIFO message logs with independent consumer tracking:
import "github.com/kode4food/caravan"
// Create a topic
topic := caravan.NewTopic[string]()
// Produce messages
producer := topic.NewProducer()
defer producer.Close()
producer.Send() <- "hello world"
// Consume messages
consumer := topic.NewConsumer()
defer consumer.Close()
msg := <-consumer.Receive()
Build composable stream processing pipelines:
import (
"github.com/kode4food/caravan"
"github.com/kode4food/caravan/stream/node"
)
type Order struct {
ID string
Amount int
}
// Create input/output topics
ordersIn := caravan.NewTopic[*Order]()
totalsOut := caravan.NewTopic[int]()
// Build a stream pipeline
stream := caravan.NewStream(
node.Bind(
node.Bind(
node.TopicConsumer(ordersIn),
node.Filter(func(o *Order) bool {
return o.Amount > 100
}),
),
node.Map(func(o *Order) int {
return o.Amount
}),
),
node.TopicProducer(totalsOut),
)
// Start processing
running := stream.Start()
defer running.Stop()
Efficient key-value storage with column-based operations:
import (
"github.com/kode4food/caravan"
"github.com/kode4food/caravan/table/column"
)
type User struct {
ID string
Name string
Email string
}
// Create a table
usersTable, _ := caravan.NewTable[string, string](
"id", "name", "email",
)
// Create an updater
updater, _ := caravan.NewTableUpdater(
usersTable,
func(u *User) string { return u.ID },
column.MakeColumn("id", func(u *User) string { return u.ID }),
column.MakeColumn("name", func(u *User) string { return u.Name }),
column.MakeColumn("email", func(u *User) string { return u.Email }),
)
// Update the table from a stream
usersIn := caravan.NewTopic[*User]()
usersOut := caravan.NewTopic[*User]()
stream := caravan.NewStream(
node.Bind(
node.TopicConsumer(usersIn),
node.TableUpdater(updater),
),
node.TopicProducer(usersOut),
)
running := stream.Start()
defer running.Stop()
// Produce users
producer := usersIn.NewProducer()
defer producer.Close()
producer.Send() <- &User{ID: "1", Name: "Alice", Email: "alice@example.com"}
// Consume to allow processing
consumer := usersOut.NewConsumer()
defer consumer.Close()
<-consumer.Receive()
// Query the table
getter, _ := usersTable.Getter("name", "email")
values, _ := getter("1")
fmt.Printf("User: %s <%s>\n", values[0], values[1])
Caravan provides a rich set of stream operators for building complex pipelines:
- Map - Transform messages one-to-one
- FlatMap - Transform messages one-to-many
- Filter - Filter messages by predicate
- Transform - Generic transformation operator
- Scan - Stateful accumulation
- Aggregate - Windowed aggregations
- TableAggregate - Aggregate to table storage
- Reduce - Reduce to single value
- GroupBy - Partition by key
- Window - Time-based windowing
- Buffer - Batch messages by count or time
- TableUpdate - Update table from stream
- TableBatchUpdate - Batch update table
- TableScan - Multi-key table lookups
- TableAggregate - Aggregate into table
- Limit - Limit message count
- DistinctBy - Remove duplicates
- Retry - Retry failed operations
- Debounce - Rate limiting
- Throttle - Throttle message rate
- ForEach - Side effects
- Split - Split stream by predicate
- Combine - Merge multiple streams
- Generate - Generate messages
// Aggregate order totals by user in real-time
statsTable, _ := caravan.NewTable[string, string](
"user_id", "order_count", "total_amount",
)
setter, _ := statsTable.Setter("user_id", "order_count", "total_amount")
ordersIn := caravan.NewTopic[*Order]()
statsOut := caravan.NewTopic[*UserStats]()
stream := caravan.NewStream(
node.Bind(
node.Bind(
node.TopicConsumer(ordersIn),
node.GroupBy(func(o *Order) string {
return o.UserID
}),
),
node.TableAggregate(
&UserStats{OrderCount: 0, TotalAmount: 0},
func(stats *UserStats, order *Order) *UserStats {
return &UserStats{
UserID: order.UserID,
OrderCount: stats.OrderCount + 1,
TotalAmount: stats.TotalAmount + order.Amount,
}
},
func(stats *UserStats) (string, []string) {
return stats.UserID, []string{
stats.UserID,
fmt.Sprintf("%d", stats.OrderCount),
fmt.Sprintf("%d", stats.TotalAmount),
}
},
setter,
),
),
node.TopicProducer(statsOut),
)
// Buffer messages and update table in batches
ordersIn := caravan.NewTopic[*Order]()
batchesOut := caravan.NewTopic[[]*Order]()
stream := caravan.NewStream(
node.Bind(
node.Bind(
node.TopicConsumer(ordersIn),
node.Buffer[*Order](100, 1*time.Second), // 100 messages or 1 second
),
node.TableBatchUpdate(orderUpdater),
),
node.TopicProducer(batchesOut),
)
// Deduplicate -> Batch -> Update -> Aggregate
stream := caravan.NewStream(
node.Bind(
node.Bind(
node.Bind(
node.Bind(
node.TopicConsumer(ordersIn),
node.DistinctBy(func(o *Order) string {
return o.ID
}),
),
node.Buffer[*Order](10, 500*time.Millisecond),
),
node.TableBatchUpdate(orderUpdater),
),
node.FlatMap(func(batch []*Order) []*Order {
return batch
}),
),
node.Scan(
func(totals map[string]int, o *Order) map[string]int {
totals[o.UserID] += o.Amount
return totals
},
),
node.TopicProducer(statsOut),
)
Streams provide flexible error handling through the advice system:
stream := caravan.NewStream(/* ... */)
running := stream.StartWith(func(advice context.Advice, next func()) {
switch a := advice.(type) {
case *context.Error:
log.Printf("Recoverable error: %v", a)
// Error is logged but processing continues
case *context.Fatal:
log.Printf("Fatal error: %v", a)
_ = running.Stop() // Fatal stops the processor
case *context.Debug:
log.Printf("Debug: %v", a)
case context.Stop:
_ = running.Stop() // Stop advises processor to stop
}
})