Skip to content

Commit 1205cde

Browse files
asimbketelsen
andcommitted
Re-add events package (#2761)
* Re-add events package * run redis as a dep * remove redis events * fix: data race on event subscriber * fix: data race in tests * fix: store errors * fix: lint issues * feat: default stream * Update file.go --------- Co-authored-by: Brian Ketelsen <bketelsen@gmail.com>
1 parent 8315eee commit 1205cde

File tree

17 files changed

+1621
-32
lines changed

17 files changed

+1621
-32
lines changed

cmd/cmd.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"go-micro.dev/v5/debug/profile/http"
2424
"go-micro.dev/v5/debug/profile/pprof"
2525
"go-micro.dev/v5/debug/trace"
26+
"go-micro.dev/v5/events"
2627
"go-micro.dev/v5/logger"
2728
mprofile "go-micro.dev/v5/profile"
2829
"go-micro.dev/v5/registry"
@@ -293,6 +294,7 @@ var (
293294
DefaultCaches = map[string]func(...cache.Option) cache.Cache{
294295
"redis": redis.NewRedisCache,
295296
}
297+
DefaultStreams = map[string]func(...events.Option) (events.Stream, error){}
296298
)
297299

298300
func init() {
@@ -313,6 +315,7 @@ func newCmd(opts ...Option) Cmd {
313315
DebugProfile: &profile.DefaultProfile,
314316
Config: &config.DefaultConfig,
315317
Cache: &cache.DefaultCache,
318+
Stream: &events.DefaultStream,
316319

317320
Brokers: DefaultBrokers,
318321
Clients: DefaultClients,
@@ -376,7 +379,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
376379
if profileName != "" {
377380
switch profileName {
378381
case "local":
379-
imported := mprofile.LocalProfile()
382+
imported, ierr := mprofile.LocalProfile()
383+
if ierr != nil {
384+
return fmt.Errorf("failed to load local profile: %v", ierr)
385+
}
380386
*c.opts.Registry = imported.Registry
381387
registry.DefaultRegistry = imported.Registry
382388
*c.opts.Broker = imported.Broker
@@ -386,7 +392,10 @@ func (c *cmd) Before(ctx *cli.Context) error {
386392
*c.opts.Transport = imported.Transport
387393
transport.DefaultTransport = imported.Transport
388394
case "nats":
389-
imported := mprofile.NatsProfile()
395+
imported, ierr := mprofile.NatsProfile()
396+
if ierr != nil {
397+
return fmt.Errorf("failed to load nats profile: %v", ierr)
398+
}
390399
// Set the registry
391400
sopts, clopts := c.setRegistry(imported.Registry)
392401
serverOpts = append(serverOpts, sopts...)
@@ -407,6 +416,11 @@ func (c *cmd) Before(ctx *cli.Context) error {
407416
serverOpts = append(serverOpts, sopts...)
408417
clientOpts = append(clientOpts, clopts...)
409418

419+
// Set the stream
420+
sopts, clopts = c.setStream(imported.Stream)
421+
serverOpts = append(serverOpts, sopts...)
422+
clientOpts = append(clientOpts, clopts...)
423+
410424
// Add more profiles as needed
411425
default:
412426
return fmt.Errorf("unsupported profile: %s", profileName)
@@ -701,6 +715,17 @@ func (c *cmd) setRegistry(r registry.Registry) ([]server.Option, []client.Option
701715
registry.DefaultRegistry = *c.opts.Registry
702716
return serverOpts, clientOpts
703717
}
718+
func (c *cmd) setStream(s events.Stream) ([]server.Option, []client.Option) {
719+
var serverOpts []server.Option
720+
var clientOpts []client.Option
721+
*c.opts.Stream = s
722+
// TODO: do server and client need a Stream?
723+
// serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
724+
// clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
725+
726+
events.DefaultStream = *c.opts.Stream
727+
return serverOpts, clientOpts
728+
}
704729

705730
func (c *cmd) setBroker(b broker.Broker) ([]server.Option, []client.Option) {
706731
var serverOpts []server.Option

cmd/options.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"go-micro.dev/v5/config"
1111
"go-micro.dev/v5/debug/profile"
1212
"go-micro.dev/v5/debug/trace"
13+
"go-micro.dev/v5/events"
1314
"go-micro.dev/v5/registry"
1415
"go-micro.dev/v5/selector"
1516
"go-micro.dev/v5/server"
@@ -42,13 +43,15 @@ type Options struct {
4243
Broker *broker.Broker
4344
Auths map[string]func(...auth.Option) auth.Auth
4445
Store *store.Store
46+
Stream *events.Stream
4547
Configs map[string]func(...config.Option) (config.Config, error)
4648
Clients map[string]func(...client.Option) client.Client
4749
Registries map[string]func(...registry.Option) registry.Registry
4850
Selectors map[string]func(...selector.Option) selector.Selector
4951
Servers map[string]func(...server.Option) server.Server
5052
Transports map[string]func(...transport.Option) transport.Transport
5153
Stores map[string]func(...store.Option) store.Store
54+
Streams map[string]func(...events.Option) events.Stream
5255
Tracers map[string]func(...trace.Option) trace.Tracer
5356
Version string
5457

@@ -141,6 +144,13 @@ func Store(s *store.Store) Option {
141144
}
142145
}
143146

147+
func Stream(s *events.Stream) Option {
148+
return func(o *Options) {
149+
o.Stream = s
150+
events.DefaultStream = *s
151+
}
152+
}
153+
144154
func Tracer(t *trace.Tracer) Option {
145155
return func(o *Options) {
146156
o.Tracer = t
@@ -169,6 +179,13 @@ func NewBroker(name string, b func(...broker.Option) broker.Broker) Option {
169179
}
170180
}
171181

182+
// New stream func.
183+
func NewStream(name string, b func(...events.Option) events.Stream) Option {
184+
return func(o *Options) {
185+
o.Streams[name] = b
186+
}
187+
}
188+
172189
// New cache func.
173190
func NewCache(name string, c func(...cache.Option) cache.Cache) Option {
174191
return func(o *Options) {

events/events.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
// Package events is for event streaming and storage
2+
package events
3+
4+
import (
5+
"encoding/json"
6+
"errors"
7+
"time"
8+
)
9+
10+
var (
11+
// DefaultStream is the default events stream implementation
12+
DefaultStream Stream
13+
// DefaultStore is the default events store implementation
14+
DefaultStore Store
15+
)
16+
17+
var (
18+
// ErrMissingTopic is returned if a blank topic was provided to publish
19+
ErrMissingTopic = errors.New("missing topic")
20+
// ErrEncodingMessage is returned from publish if there was an error encoding the message option
21+
ErrEncodingMessage = errors.New("error encoding message")
22+
)
23+
24+
// Stream is an event streaming interface
25+
type Stream interface {
26+
Publish(topic string, msg interface{}, opts ...PublishOption) error
27+
Consume(topic string, opts ...ConsumeOption) (<-chan Event, error)
28+
}
29+
30+
// Store is an event store interface
31+
type Store interface {
32+
Read(topic string, opts ...ReadOption) ([]*Event, error)
33+
Write(event *Event, opts ...WriteOption) error
34+
}
35+
36+
type AckFunc func() error
37+
type NackFunc func() error
38+
39+
// Event is the object returned by the broker when you subscribe to a topic
40+
type Event struct {
41+
// ID to uniquely identify the event
42+
ID string
43+
// Topic of event, e.g. "registry.service.created"
44+
Topic string
45+
// Timestamp of the event
46+
Timestamp time.Time
47+
// Metadata contains the values the event was indexed by
48+
Metadata map[string]string
49+
// Payload contains the encoded message
50+
Payload []byte
51+
52+
ackFunc AckFunc
53+
nackFunc NackFunc
54+
}
55+
56+
// Unmarshal the events message into an object
57+
func (e *Event) Unmarshal(v interface{}) error {
58+
return json.Unmarshal(e.Payload, v)
59+
}
60+
61+
// Ack acknowledges successful processing of the event in ManualAck mode
62+
func (e *Event) Ack() error {
63+
return e.ackFunc()
64+
}
65+
66+
func (e *Event) SetAckFunc(f AckFunc) {
67+
e.ackFunc = f
68+
}
69+
70+
// Nack negatively acknowledges processing of the event (i.e. failure) in ManualAck mode
71+
func (e *Event) Nack() error {
72+
return e.nackFunc()
73+
}
74+
75+
func (e *Event) SetNackFunc(f NackFunc) {
76+
e.nackFunc = f
77+
}
78+
79+
// Publish an event to a topic
80+
func Publish(topic string, msg interface{}, opts ...PublishOption) error {
81+
return DefaultStream.Publish(topic, msg, opts...)
82+
}
83+
84+
// Consume to events
85+
func Consume(topic string, opts ...ConsumeOption) (<-chan Event, error) {
86+
return DefaultStream.Consume(topic, opts...)
87+
}
88+
89+
// Read events for a topic
90+
func Read(topic string, opts ...ReadOption) ([]*Event, error) {
91+
return DefaultStore.Read(topic, opts...)
92+
}

0 commit comments

Comments
 (0)