Skip to content

Commit

Permalink
Merge f47d2f7 into b5ee01b
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Nov 22, 2021
2 parents b5ee01b + f47d2f7 commit e6f10b2
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 19 deletions.
54 changes: 43 additions & 11 deletions examples/todomvc/backend/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,13 @@ import (

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/commandhandler/bus"

redisEventBus "github.com/looplab/eventhorizon/eventbus/redis"
tracingEventBus "github.com/looplab/eventhorizon/eventbus/tracing"
mongoEventStore "github.com/looplab/eventhorizon/eventstore/mongodb"
tracingEventStore "github.com/looplab/eventhorizon/eventstore/tracing"
"github.com/looplab/eventhorizon/middleware/commandhandler/tracing"
"github.com/looplab/eventhorizon/middleware/eventhandler/observer"
mongoOutbox "github.com/looplab/eventhorizon/outbox/mongodb"
mongoRepo "github.com/looplab/eventhorizon/repo/mongodb"
tracingRepo "github.com/looplab/eventhorizon/repo/tracing"
"github.com/looplab/eventhorizon/repo/version"
"github.com/looplab/eventhorizon/tracing"
"github.com/looplab/eventhorizon/uuid"

"github.com/looplab/eventhorizon/examples/todomvc/backend/domains/todo"
Expand All @@ -51,6 +47,7 @@ func main() {
if mongodbAddr == "" {
mongodbAddr = "localhost:27017"
}

mongodbURI := "mongodb://" + mongodbAddr

// Connect to localhost if not running inside docker
Expand All @@ -70,7 +67,9 @@ func main() {
if _, err := rand.Read(b); err != nil {
log.Fatal("could not get random DB name:", err)
}

db := "todomvc-" + hex.EncodeToString(b)

log.Println("using DB:", db)

traceCloser, err := NewTracer(db, tracingURL)
Expand All @@ -79,36 +78,47 @@ func main() {
}

// Create the outbox that will project and publish events.
outbox, err := mongoOutbox.NewOutbox(mongodbURI, db)
var outbox eh.Outbox

mongoOutbox, err := mongoOutbox.NewOutbox(mongodbURI, db)
if err != nil {
log.Fatalf("could not create outbox: %s", err)
}

outbox = tracing.NewOutbox(mongoOutbox)

go func() {
for err := range outbox.Errors() {
log.Print("outbox:", err)
}
}()

outbox.Start()

// Create the event store.
var eventStore eh.EventStore

if eventStore, err = mongoEventStore.NewEventStoreWithClient(
outbox.Client(), db,
mongoOutbox.Client(), db,
mongoEventStore.WithEventHandlerInTX(outbox),
); err != nil {
log.Fatal("could not create event store: ", err)
}
eventStore = tracingEventStore.NewEventStore(eventStore)

eventStore = tracing.NewEventStore(eventStore)

// Create an command bus.
commandBus := bus.NewCommandHandler()

// Create the repository and wrap in a version repository.
var todoRepo eh.ReadWriteRepo

if todoRepo, err = mongoRepo.NewRepo(mongodbURI, db, "todos"); err != nil {
log.Fatal("could not create invitation repository: ", err)
}

todoRepo = version.NewRepo(todoRepo)
todoRepo = tracingRepo.NewRepo(todoRepo)
todoRepo = tracing.NewRepo(todoRepo)

// Setup the Todo domain.
if err := todo.SetupDomain(commandBus, eventStore, outbox, todoRepo); err != nil {
Expand All @@ -119,15 +129,18 @@ func main() {

// Create the event bus that distributes events.
var eventBus eh.EventBus

if eventBus, err = redisEventBus.NewEventBus(redisAddr, db, "backend"); err != nil {
log.Fatal("could not create event bus: ", err)
}

go func() {
for err := range eventBus.Errors() {
log.Print("eventbus:", err)
}
}()
eventBus = tracingEventBus.NewEventBus(eventBus)

eventBus = tracing.NewEventBus(eventBus)
if err := outbox.AddHandler(ctx, eh.MatchAll{}, eventBus); err != nil {
log.Fatal("could not add event bus to outbox:", err)
}
Expand All @@ -144,7 +157,7 @@ func main() {

// Add tracing middleware to init tracing spans, and the logging middleware.
commandHandler := eh.UseCommandHandlerMiddleware(commandBus,
tracing.NewMiddleware(),
tracing.NewCommandHandlerMiddleware(),
CommandLogger,
)

Expand All @@ -153,15 +166,18 @@ func main() {
if err != nil {
log.Fatal("could not create handler: ", err)
}

srv := &http.Server{
Addr: ":8080",
Handler: h,
}
srvClosed := make(chan struct{})

go func() {
if err := srv.ListenAndServe(); err != http.ErrServerClosed {
log.Fatal("could not listen HTTP: ", err)
}

close(srvClosed)
}()

Expand All @@ -182,25 +198,32 @@ func main() {
<-sigint

log.Println("waiting for HTTP server to close")

if err := srv.Shutdown(context.Background()); err != nil {
log.Print("could not shutdown HTTP server: ", err)
}

<-srvClosed

// Cancel all handlers and wait.
log.Println("waiting for handlers to finish")

if err := eventBus.Close(); err != nil {
log.Print("could not close event bus: ", err)
}

if err := outbox.Close(); err != nil {
log.Print("could not close outbox: ", err)
}

if err := todoRepo.Close(); err != nil {
log.Print("could not close todo repo: ", err)
}

if err := eventStore.Close(); err != nil {
log.Print("could not close event store: ", err)
}

if err := traceCloser.Close(); err != nil {
log.Print("could not close tracer: ", err)
}
Expand All @@ -212,6 +235,7 @@ func main() {
func CommandLogger(h eh.CommandHandler) eh.CommandHandler {
return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error {
log.Printf("CMD: %#v", cmd)

return h.HandleCommand(ctx, cmd)
})
}
Expand All @@ -227,12 +251,14 @@ func (l *EventLogger) HandlerType() eh.EventHandlerType {
// HandleEvent implements the HandleEvent method of the EventHandler interface.
func (l *EventLogger) HandleEvent(ctx context.Context, event eh.Event) error {
log.Printf("EVENT: %s", event)

return nil
}

func seedExample(h eh.CommandHandler, todoRepo eh.ReadRepo) {
cmdCtx := context.Background()
id := uuid.New()

if err := h.HandleCommand(cmdCtx, &todo.Create{
ID: id,
}); err != nil {
Expand All @@ -246,17 +272,21 @@ func seedExample(h eh.CommandHandler, todoRepo eh.ReadRepo) {
}); err != nil {
log.Fatal("there should be no error: ", err)
}

if err := h.HandleCommand(cmdCtx, &todo.AddItem{
ID: id,
Description: "Run the TodoMVC example",
}); err != nil {
log.Fatal("there should be no error: ", err)
}

findCtx, cancelFind := version.NewContextWithMinVersionWait(cmdCtx, 3)
if _, err := todoRepo.Find(findCtx, id); err != nil {
log.Fatal("could not find created todo list: ", err)
}

cancelFind()

if err := h.HandleCommand(cmdCtx, &todo.CheckAllItems{
ID: id,
}); err != nil {
Expand All @@ -270,12 +300,14 @@ func seedExample(h eh.CommandHandler, todoRepo eh.ReadRepo) {
}); err != nil {
log.Fatal("there should be no error: ", err)
}

if err := h.HandleCommand(cmdCtx, &todo.AddItem{
ID: id,
Description: "Read the Event Horizon source",
}); err != nil {
log.Fatal("there should be no error: ", err)
}

if err := h.HandleCommand(cmdCtx, &todo.AddItem{
ID: id,
Description: "Create a PR",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/opentracing/opentracing-go/ext"
)

// NewMiddleware returns a new command handler middleware that adds tracing spans.
func NewMiddleware() eh.CommandHandlerMiddleware {
// NewCommandHandlerMiddleware returns a new command handler middleware that adds tracing spans.
func NewCommandHandlerMiddleware() eh.CommandHandlerMiddleware {
return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler {
return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error {
opName := fmt.Sprintf("Command(%s)", cmd.CommandType())
Expand Down
File renamed without changes.
5 changes: 2 additions & 3 deletions eventbus/tracing/eventbus.go → tracing/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/middleware/eventhandler/tracing"
)

// EventBus is an event bus wrapper that adds tracing.
Expand All @@ -33,7 +32,7 @@ func NewEventBus(eventBus eh.EventBus) *EventBus {
EventBus: eventBus,
// Wrap the eh.EventHandler part of the bus with tracing middleware,
// set as producer to set the correct tags.
h: eh.UseEventHandlerMiddleware(eventBus, tracing.NewMiddleware()),
h: eh.UseEventHandlerMiddleware(eventBus, NewEventHandlerMiddleware()),
}
}

Expand All @@ -49,7 +48,7 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event
}

// Wrap the handlers in tracing middleware.
h = eh.UseEventHandlerMiddleware(h, tracing.NewMiddleware())
h = eh.UseEventHandlerMiddleware(h, NewEventHandlerMiddleware())

return b.EventBus.AddHandler(ctx, m, h)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
)

// NOTE: Not named "Integration" to enable running with the unit tests.
func TestAddHandler(t *testing.T) {
func TestEventBusAddHandler(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"github.com/opentracing/opentracing-go/ext"
)

// NewMiddleware returns an event handler middleware that adds tracing spans.
func NewMiddleware() eh.EventHandlerMiddleware {
// NewEventHandlerMiddleware returns an event handler middleware that adds tracing spans.
func NewEventHandlerMiddleware() eh.EventHandlerMiddleware {
return eh.EventHandlerMiddleware(func(h eh.EventHandler) eh.EventHandler {
return &eventHandler{h}
})
Expand Down
File renamed without changes.
File renamed without changes.
54 changes: 54 additions & 0 deletions tracing/outbox.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tracing

import (
"context"

eh "github.com/looplab/eventhorizon"
)

// Outbox is an event bus wrapper that adds tracing.
type Outbox struct {
eh.Outbox
h eh.EventHandler
}

// NewOutbox creates a Outbox.
func NewOutbox(outbox eh.Outbox) *Outbox {
return &Outbox{
Outbox: outbox,
// Wrap the eh.EventHandler part of the bus with tracing middleware,
// set as producer to set the correct tags.
h: eh.UseEventHandlerMiddleware(outbox, NewEventHandlerMiddleware()),
}
}

// HandleEvent implements the HandleEvent method of the eventhorizon.EventHandler interface.
func (b *Outbox) HandleEvent(ctx context.Context, event eh.Event) error {
return b.h.HandleEvent(ctx, event)
}

// AddHandler implements the AddHandler method of the eventhorizon.Outbox interface.
func (b *Outbox) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error {
if h == nil {
return eh.ErrMissingHandler
}

// Wrap the handlers in tracing middleware.
h = eh.UseEventHandlerMiddleware(h, NewEventHandlerMiddleware())

return b.Outbox.AddHandler(ctx, m, h)
}

0 comments on commit e6f10b2

Please sign in to comment.