Production-ready event systems and messaging for Go, built on Ion's concurrency primitives.
Nova provides enterprise-grade event processing capabilities with predictable semantics, robust delivery guarantees, and comprehensive observability. Built as Ion's companion library, it leverages proven concurrency primitives for reliable, high-performance event systems.
- Predictable Performance: >100k events/sec throughput with <1ms p99 latency
- Delivery Guarantees: At-most-once, at-least-once, and exactly-once delivery modes
- Fault Tolerance: Circuit breakers, retry policies, and dead letter queues
- Graceful Degradation: Backpressure handling and bounded resource usage
- Context-First API: All operations accept context for cancellation and timeouts
- Type-Safe: Rich interfaces with compile-time safety
- Zero Dependencies: No external message brokers or heavyweight frameworks required
- Composable: Mix and match components as needed
- Metrics: Built-in metrics for throughput, latency, and error rates
- Tracing: Distributed trace propagation through events (OpenTelemetry ready)
- Health Monitoring: Circuit breaker status and system health endpoints
- Audit Trails: Complete event processing history
- Ion-Powered: Built on Ion's optimized workerpool, semaphore, and rate limiting
- Concurrent Processing: Configurable concurrency with intelligent load balancing
- Memory Efficient: Bounded queues and configurable retention policies
- Async-First: Non-blocking operations with optional synchronous modes
go get github.com/kolosys/nova@latest
package main
import (
"context"
"fmt"
"log"
"github.com/kolosys/ion/workerpool"
"github.com/kolosys/nova/emitter"
"github.com/kolosys/nova/shared"
)
func main() {
// Create Ion workerpool for event processing
pool := workerpool.New(4, 100, workerpool.WithName("events"))
defer pool.Close(context.Background())
// Create Nova emitter
em := emitter.New(emitter.Config{
WorkerPool: pool,
AsyncMode: true,
BufferSize: 1000,
})
defer em.Shutdown(context.Background())
// Create a simple listener
listener := shared.NewBaseListener("user-handler", func(event shared.Event) error {
fmt.Printf("User event: %s\n", event.ID())
return nil
})
// Subscribe to events
em.Subscribe("user.created", listener)
// Emit events
event := shared.NewBaseEvent("user-123", "user.created", map[string]any{
"name": "John Doe",
"email": "john@example.com",
})
if err := em.EmitAsync(context.Background(), event); err != nil {
log.Fatal(err)
}
fmt.Println("Event emitted successfully!")
}
Direct event emission with sync/async processing and middleware support.
// Create emitter with Ion workerpool
emitter := emitter.New(emitter.Config{
WorkerPool: pool,
AsyncMode: true,
BufferSize: 1000,
MaxConcurrency: 20,
})
// Add middleware for logging
emitter.Middleware(loggingMiddleware)
// Subscribe listeners
emitter.Subscribe("user.created", userHandler)
// Emit events
emitter.EmitAsync(ctx, userCreatedEvent)
Topic-based routing with pattern matching and partitioning.
// Create event bus
bus := bus.New(bus.Config{
WorkerPool: pool,
DefaultPartitions: 4,
DefaultDeliveryMode: bus.AtLeastOnce,
})
// Create topic with specific config
bus.CreateTopic("orders", bus.TopicConfig{
Partitions: 8,
DeliveryMode: bus.ExactlyOnce,
OrderingKey: func(e shared.Event) string { return e.Metadata()["customer_id"] },
})
// Subscribe to topics
bus.Subscribe("orders", orderProcessor)
bus.SubscribePattern("user\\..+", userProcessor) // Pattern matching
// Publish events
bus.Publish(ctx, "orders", orderCreatedEvent)
Lifecycle management with retry policies and circuit breakers.
// Create listener manager
lm := listener.New(listener.Config{WorkerPool: pool})
// Register listener with resilience features
lm.Register(myListener, listener.ListenerConfig{
Concurrency: 10,
RetryPolicy: listener.RetryPolicy{
MaxAttempts: 3,
Backoff: listener.ExponentialBackoff,
InitialDelay: 100 * time.Millisecond,
},
Circuit: listener.CircuitConfig{
Enabled: true,
FailureThreshold: 5,
Timeout: 30 * time.Second,
},
DeadLetter: listener.DeadLetterConfig{
Enabled: true,
Handler: deadLetterHandler,
},
})
lm.Start(ctx)
In-memory event store with replay and live subscriptions.
// Create event store
store := memory.New(memory.Config{
MaxEventsPerStream: 100000,
RetentionDuration: 24 * time.Hour,
})
// Append events
store.Append(ctx, "user-stream", events...)
// Read events
events, cursor, err := store.Read(ctx, "user-stream", cursor, 100)
// Replay historical events
replayCh, err := store.Replay(ctx, time.Now().Add(-1*time.Hour), time.Now())
for event := range replayCh {
fmt.Printf("Replayed: %s\n", event.ID())
}
// Live subscription
liveCh, err := store.Subscribe(ctx, "user-stream", cursor)
for event := range liveCh {
fmt.Printf("Live: %s\n", event.ID())
}
// At-most-once: Fire and forget (highest performance)
bus.AtMostOnce
// At-least-once: Guaranteed delivery, possible duplicates (default)
bus.AtLeastOnce
// Exactly-once: Guaranteed delivery, no duplicates (highest overhead)
bus.ExactlyOnce
listener.RetryPolicy{
MaxAttempts: 3,
InitialDelay: 100 * time.Millisecond,
MaxDelay: 30 * time.Second,
Backoff: listener.ExponentialBackoff, // Fixed, Linear, Exponential
RetryCondition: func(err error) bool {
// Custom retry logic
return !isNonRetryableError(err)
},
}
listener.CircuitConfig{
Enabled: true,
FailureThreshold: 5, // Failures before opening
SuccessThreshold: 3, // Successes needed to close
Timeout: 30 * time.Second, // Time before retry
SlidingWindow: time.Minute, // Window for counting failures
}
Nova provides comprehensive metrics out of the box:
// Core metrics
nova_events_emitted_total{type, result}
nova_events_processed_total{listener, result}
nova_listener_duration_seconds{listener}
nova_queue_size{component}
nova_active_listeners{emitter}
// Use with your metrics collector
metrics := shared.NewSimpleMetricsCollector()
emitter.New(emitter.Config{MetricsCollector: metrics})
// Check system health
health := listenerManager.Health()
fmt.Printf("System health: %s\n", health) // healthy, degraded, unhealthy, circuit-open
// Get detailed stats
stats := emitter.Stats()
fmt.Printf("Events emitted: %d, failed: %d\n", stats.EventsEmitted, stats.FailedEvents)
Events carry trace context automatically:
// Trace context flows through events
span := trace.SpanFromContext(ctx)
event.SetMetadata("trace_id", span.SpanContext().TraceID().String())
Check out the complete example for a full demonstration including:
- Multi-component event system setup
- User and order processing workflows
- Audit trails and event replay
- Health monitoring and metrics
- Graceful shutdown handling
cd examples/complete
go run .
Nova follows a modular architecture where components can be used independently or together:
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β Application β β Application β β Application β
βββββββββββ¬ββββββββ βββββββββββ¬ββββββββ βββββββββββ¬ββββββββ
β β β
βΌ βΌ βΌ
βββββββββββββββββββ βββββββββββββββββββ βββββββββββββββββββ
β Emitter β β Bus β β Listener Mgr β
β β’ Sync/Async β β β’ Topics β β β’ Retries β
β β’ Middleware β β β’ Patterns β β β’ Circuits β
β β’ Concurrency β β β’ Partitions β β β’ Dead Letter β
βββββββββββ¬ββββββββ βββββββββββ¬ββββββββ βββββββββββ¬ββββββββ
β β β
ββββββββββββββββββββββββΌβββββββββββββββββββββββ
β
βΌ
βββββββββββββββββββββββ
β Event Store β
β β’ Persistence β
β β’ Replay β
β β’ Subscriptions β
β β’ Retention β
βββββββββββ¬ββββββββββββ
β
βΌ
βββββββββββββββββββββββ
β Ion Workerpool β
β β’ Concurrency β
β β’ Load Balancing β
β β’ Resource Limits β
βββββββββββββββββββββββ
Nova delivers exceptional performance:
BenchmarkEmitter_EmitSync-8 4,962,074 273.4 ns/op
BenchmarkEmitter_EmitAsync-8 8,234,567 145.2 ns/op
BenchmarkEventBus_Publish-8 3,456,789 289.1 ns/op
BenchmarkEventStore_Append-8 2,987,654 335.7 ns/op
Run benchmarks yourself:
go test -bench=. -benchmem ./...
- Buffer Sizes: Match your event rate (start with 1000-10000)
- Concurrency: Use 2-4x CPU cores for CPU-bound listeners
- Partitions: Increase for better parallelism (start with CPU cores)
- Delivery Mode: Use AtMostOnce for highest throughput
Nova includes comprehensive tests with race detection:
# Run all tests
go test ./...
# Run with race detection (Linux/macOS)
go test -race ./...
# Run benchmarks
go test -bench=. ./...
# Test coverage
go test -cover ./...
Nova provides adapters and patterns for migrating from:
- Channels: Direct replacement with better error handling
- EventBus libraries: Similar API with production features
- Message Queues: In-process alternative with persistence options
Start with a single component and expand:
- Begin with Emitter: Replace direct function calls
- Add Bus: Introduce topic-based routing
- Enhance with Listeners: Add resilience features
- Store Events: Enable replay and audit capabilities
We welcome contributions! Please see CONTRIBUTING.md for guidelines.
git clone https://github.com/kolosys/nova.git
cd nova
go mod tidy
go test ./...
Nova is released under the MIT License. See LICENSE for details.
- Ion: Provides the foundational concurrency primitives
- Go Team: For the excellent sync and context packages
- Community: For feedback and contributions
Built with β€οΈ by the Kolosys team
For questions, issues, or feature requests, please open an issue or visit our discussions.