Skip to content

Re-add events package #2761

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go-micro.dev/v5/debug/profile/http"
"go-micro.dev/v5/debug/profile/pprof"
"go-micro.dev/v5/debug/trace"
"go-micro.dev/v5/events"
"go-micro.dev/v5/logger"
mprofile "go-micro.dev/v5/profile"
"go-micro.dev/v5/registry"
Expand Down Expand Up @@ -293,6 +294,7 @@ var (
DefaultCaches = map[string]func(...cache.Option) cache.Cache{
"redis": redis.NewRedisCache,
}
DefaultStreams = map[string]func(...events.Option) (events.Stream, error){}
)

func init() {
Expand All @@ -313,6 +315,7 @@ func newCmd(opts ...Option) Cmd {
DebugProfile: &profile.DefaultProfile,
Config: &config.DefaultConfig,
Cache: &cache.DefaultCache,
Stream: &events.DefaultStream,

Brokers: DefaultBrokers,
Clients: DefaultClients,
Expand Down Expand Up @@ -376,7 +379,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
if profileName != "" {
switch profileName {
case "local":
imported := mprofile.LocalProfile()
imported, ierr := mprofile.LocalProfile()
if ierr != nil {
return fmt.Errorf("failed to load local profile: %v", ierr)
}
*c.opts.Registry = imported.Registry
registry.DefaultRegistry = imported.Registry
*c.opts.Broker = imported.Broker
Expand All @@ -386,7 +392,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
*c.opts.Transport = imported.Transport
transport.DefaultTransport = imported.Transport
case "nats":
imported := mprofile.NatsProfile()
imported, ierr := mprofile.NatsProfile()
if ierr != nil {
return fmt.Errorf("failed to load nats profile: %v", ierr)
}
// Set the registry
sopts, clopts := c.setRegistry(imported.Registry)
serverOpts = append(serverOpts, sopts...)
Expand All @@ -407,6 +416,11 @@ func (c *cmd) Before(ctx *cli.Context) error {
serverOpts = append(serverOpts, sopts...)
clientOpts = append(clientOpts, clopts...)

// Set the stream
sopts, clopts = c.setStream(imported.Stream)
serverOpts = append(serverOpts, sopts...)
clientOpts = append(clientOpts, clopts...)

// Add more profiles as needed
default:
return fmt.Errorf("unsupported profile: %s", profileName)
Expand Down Expand Up @@ -701,6 +715,17 @@ func (c *cmd) setRegistry(r registry.Registry) ([]server.Option, []client.Option
registry.DefaultRegistry = *c.opts.Registry
return serverOpts, clientOpts
}
func (c *cmd) setStream(s events.Stream) ([]server.Option, []client.Option) {
var serverOpts []server.Option
var clientOpts []client.Option
*c.opts.Stream = s
// TODO: do server and client need a Stream?
// serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
// clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))

events.DefaultStream = *c.opts.Stream
return serverOpts, clientOpts
}

func (c *cmd) setBroker(b broker.Broker) ([]server.Option, []client.Option) {
var serverOpts []server.Option
Expand Down
17 changes: 17 additions & 0 deletions cmd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"go-micro.dev/v5/config"
"go-micro.dev/v5/debug/profile"
"go-micro.dev/v5/debug/trace"
"go-micro.dev/v5/events"
"go-micro.dev/v5/registry"
"go-micro.dev/v5/selector"
"go-micro.dev/v5/server"
Expand Down Expand Up @@ -42,13 +43,15 @@ type Options struct {
Broker *broker.Broker
Auths map[string]func(...auth.Option) auth.Auth
Store *store.Store
Stream *events.Stream
Configs map[string]func(...config.Option) (config.Config, error)
Clients map[string]func(...client.Option) client.Client
Registries map[string]func(...registry.Option) registry.Registry
Selectors map[string]func(...selector.Option) selector.Selector
Servers map[string]func(...server.Option) server.Server
Transports map[string]func(...transport.Option) transport.Transport
Stores map[string]func(...store.Option) store.Store
Streams map[string]func(...events.Option) events.Stream
Tracers map[string]func(...trace.Option) trace.Tracer
Version string

Expand Down Expand Up @@ -141,6 +144,13 @@ func Store(s *store.Store) Option {
}
}

func Stream(s *events.Stream) Option {
return func(o *Options) {
o.Stream = s
events.DefaultStream = *s
}
}

func Tracer(t *trace.Tracer) Option {
return func(o *Options) {
o.Tracer = t
Expand Down Expand Up @@ -169,6 +179,13 @@ func NewBroker(name string, b func(...broker.Option) broker.Broker) Option {
}
}

// New stream func.
func NewStream(name string, b func(...events.Option) events.Stream) Option {
return func(o *Options) {
o.Streams[name] = b
}
}

// New cache func.
func NewCache(name string, c func(...cache.Option) cache.Cache) Option {
return func(o *Options) {
Expand Down
92 changes: 92 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Package events is for event streaming and storage
package events

import (
"encoding/json"
"errors"
"time"
)

var (
// DefaultStream is the default events stream implementation
DefaultStream Stream
// DefaultStore is the default events store implementation
DefaultStore Store
)

var (
// ErrMissingTopic is returned if a blank topic was provided to publish
ErrMissingTopic = errors.New("missing topic")
// ErrEncodingMessage is returned from publish if there was an error encoding the message option
ErrEncodingMessage = errors.New("error encoding message")
)

// Stream is an event streaming interface
type Stream interface {
Publish(topic string, msg interface{}, opts ...PublishOption) error
Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
}

// Store is an event store interface
type Store interface {
Read(topic string, opts ...ReadOption) ([]*Event, error)
Write(event *Event, opts ...WriteOption) error
}

type AckFunc func() error
type NackFunc func() error

// Event is the object returned by the broker when you subscribe to a topic
type Event struct {
// ID to uniquely identify the event
ID string
// Topic of event, e.g. "registry.service.created"
Topic string
// Timestamp of the event
Timestamp time.Time
// Metadata contains the values the event was indexed by
Metadata map[string]string
// Payload contains the encoded message
Payload []byte

ackFunc AckFunc
nackFunc NackFunc
}

// Unmarshal the events message into an object
func (e *Event) Unmarshal(v interface{}) error {
return json.Unmarshal(e.Payload, v)
}

// Ack acknowledges successful processing of the event in ManualAck mode
func (e *Event) Ack() error {
return e.ackFunc()
}

func (e *Event) SetAckFunc(f AckFunc) {
e.ackFunc = f
}

// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode
func (e *Event) Nack() error {
return e.nackFunc()
}

func (e *Event) SetNackFunc(f NackFunc) {
e.nackFunc = f
}

// Publish an event to a topic
func Publish(topic string, msg interface{}, opts ...PublishOption) error {
return DefaultStream.Publish(topic, msg, opts...)
}

// Consume to events
func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
return DefaultStream.Consume(topic, opts...)
}

// Read events for a topic
func Read(topic string, opts ...ReadOption) ([]*Event, error) {
return DefaultStore.Read(topic, opts...)
}
Loading
Loading