Skip to content

Commit

Permalink
Merge pull request #321 from maxekman/feature/lock-command-handler-mi…
Browse files Browse the repository at this point in the history
…ddleware

Feature / Lock command handler middleware
  • Loading branch information
maxekman committed Jun 7, 2021
2 parents 706754b + 5430dfa commit 760d8c9
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 13 deletions.
2 changes: 1 addition & 1 deletion middleware/commandhandler/async/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/looplab/eventhorizon/uuid"
)

func TestCommandHandler(t *testing.T) {
func TestMiddleware(t *testing.T) {
cmd := mocks.Command{
ID: uuid.New(),
Content: "content",
Expand Down
59 changes: 59 additions & 0 deletions middleware/commandhandler/lock/local_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lock

import "sync"

// LocalLock is a Lock implemention using local locking only. Not suitable for
// use in distributed environments.
type LocalLock struct {
locks map[string]struct{}
mu sync.Mutex
}

// NewLocalLock creates a new LocalLock.
func NewLocalLock() *LocalLock {
return &LocalLock{
locks: map[string]struct{}{},
}
}

// Lock implements the Lock method of the Lock interface.
func (l *LocalLock) Lock(id string) error {
l.mu.Lock()
defer l.mu.Unlock()

if _, ok := l.locks[id]; ok {
return ErrLockExists
}

l.locks[id] = struct{}{}

return nil
}

// Unlock implements the Unlock method of the Lock interface.
func (l *LocalLock) Unlock(id string) error {
l.mu.Lock()
defer l.mu.Unlock()

if _, ok := l.locks[id]; !ok {
return ErrNoLockExists
}

delete(l.locks, id)

return nil
}
34 changes: 34 additions & 0 deletions middleware/commandhandler/lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lock

import "fmt"

var (
// ErrLockExists is returned from Lock() when the lock is already taken.
ErrLockExists = fmt.Errorf("lock exists")
// ErrNoLockExists is returned from Unlock() when the lock does not exist.
ErrNoLockExists = fmt.Errorf("no lock exists")
)

// Lock is a locker of IDs.
type Lock interface {
// Lock sets a lock for the ID. Returns ErrLockExists if the lock is already
// taken or another error if it was not possible to get the lock.
Lock(id string) error
// Unlock releases the lock for the ID. Returns ErrNoLockExists if there is
// no lock for the ID or another error if it was not possible to unlock.
Unlock(id string) error
}
41 changes: 41 additions & 0 deletions middleware/commandhandler/lock/middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lock

import (
"context"
"log"

eh "github.com/looplab/eventhorizon"
)

// NewMiddleware returns a new lock middle ware using a provided lock implementation.
// Useful for handling only one command per aggregate ID at a time.
func NewMiddleware(l Lock) eh.CommandHandlerMiddleware {
return eh.CommandHandlerMiddleware(func(h eh.CommandHandler) eh.CommandHandler {
return eh.CommandHandlerFunc(func(ctx context.Context, cmd eh.Command) error {
if err := l.Lock(cmd.AggregateID().String()); err != nil {
return err
}
defer func() {
if err := l.Unlock(cmd.AggregateID().String()); err != nil {
log.Printf("eventhorizon: could not unlock command '%s': %s", cmd.AggregateID(), err)
}
}()

return h.HandleCommand(ctx, cmd)
})
})
}
67 changes: 67 additions & 0 deletions middleware/commandhandler/lock/middleware_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) 2021 - The Event Horizon authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lock

import (
"context"
"errors"
"testing"
"time"

eh "github.com/looplab/eventhorizon"
"github.com/looplab/eventhorizon/mocks"
"github.com/looplab/eventhorizon/uuid"
)

func TestMiddleware(t *testing.T) {
cmd := mocks.Command{
ID: uuid.New(),
Content: "content",
}

inner := &LongCommandHandler{}
lock := NewLocalLock()
m := NewMiddleware(lock)
h := eh.UseCommandHandlerMiddleware(inner, m)

// Start a "long running" command.
go func() {
if err := h.HandleCommand(context.Background(), cmd); err != nil {
t.Error("there should not be an error:", err)
}
}()

// Let the goroutine start its work.
time.Sleep(10 * time.Millisecond)

// Try another command with the same ID.
if err := h.HandleCommand(context.Background(), cmd); !errors.Is(err, ErrLockExists) {
t.Error("there should be a lock exists error:", err)
}

time.Sleep(100 * time.Millisecond)

// After the initial command is done, it should be possible to issue another.
if err := h.HandleCommand(context.Background(), cmd); err != nil {
t.Error("there should not be an error:", err)
}
}

type LongCommandHandler struct{}

func (h *LongCommandHandler) HandleCommand(ctx context.Context, cmd eh.Command) error {
time.Sleep(100 * time.Millisecond)
return nil
}
12 changes: 6 additions & 6 deletions middleware/commandhandler/scheduler/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/looplab/eventhorizon/uuid"
)

func TestCommandHandler_Immediate(t *testing.T) {
func TestMiddleware_Immediate(t *testing.T) {
inner := &mocks.CommandHandler{}
m, _ := NewMiddleware()
h := eh.UseCommandHandlerMiddleware(inner, m)
Expand All @@ -42,7 +42,7 @@ func TestCommandHandler_Immediate(t *testing.T) {
}
}

func TestCommandHandler_Delayed(t *testing.T) {
func TestMiddleware_Delayed(t *testing.T) {
inner := &mocks.CommandHandler{}
m, _ := NewMiddleware()
h := eh.UseCommandHandlerMiddleware(inner, m)
Expand All @@ -68,7 +68,7 @@ func TestCommandHandler_Delayed(t *testing.T) {
inner.RUnlock()
}

func TestCommandHandler_ZeroTime(t *testing.T) {
func TestMiddleware_ZeroTime(t *testing.T) {
inner := &mocks.CommandHandler{}
m, _ := NewMiddleware()
h := eh.UseCommandHandlerMiddleware(inner, m)
Expand All @@ -85,7 +85,7 @@ func TestCommandHandler_ZeroTime(t *testing.T) {
}
}

func TestCommandHandler_Errors(t *testing.T) {
func TestMiddleware_Errors(t *testing.T) {
handlerErr := errors.New("handler error")
inner := &mocks.CommandHandler{
Err: handlerErr,
Expand Down Expand Up @@ -113,7 +113,7 @@ func TestCommandHandler_Errors(t *testing.T) {
}
}

func TestCommandHandler_ContextCanceled(t *testing.T) {
func TestMiddleware_ContextCanceled(t *testing.T) {
handlerErr := errors.New("handler error")
inner := &mocks.CommandHandler{
Err: handlerErr,
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestCommandHandler_ContextCanceled(t *testing.T) {
}
}

func TestCommandHandler_ContextDeadline(t *testing.T) {
func TestMiddleware_ContextDeadline(t *testing.T) {
handlerErr := errors.New("handler error")
inner := &mocks.CommandHandler{
Err: handlerErr,
Expand Down
6 changes: 3 additions & 3 deletions middleware/commandhandler/validate/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/looplab/eventhorizon/uuid"
)

func TestCommandHandler_Immediate(t *testing.T) {
func TestMiddleware_Immediate(t *testing.T) {
inner := &mocks.CommandHandler{}
m := NewMiddleware()
h := eh.UseCommandHandlerMiddleware(inner, m)
Expand All @@ -41,7 +41,7 @@ func TestCommandHandler_Immediate(t *testing.T) {
}
}

func TestCommandHandler_WithValidationError(t *testing.T) {
func TestMiddleware_WithValidationError(t *testing.T) {
inner := &mocks.CommandHandler{}
m := NewMiddleware()
h := eh.UseCommandHandlerMiddleware(inner, m)
Expand All @@ -64,7 +64,7 @@ func TestCommandHandler_WithValidationError(t *testing.T) {
}
}

func TestCommandHandler_WithValidationNoError(t *testing.T) {
func TestMiddleware_WithValidationNoError(t *testing.T) {
inner := &mocks.CommandHandler{}
m := NewMiddleware()
h := eh.UseCommandHandlerMiddleware(inner, m)
Expand Down
2 changes: 1 addition & 1 deletion middleware/eventhandler/async/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/looplab/eventhorizon/uuid"
)

func TestEventHandler(t *testing.T) {
func TestMiddleware(t *testing.T) {
id := uuid.New()
eventData := &mocks.EventData{Content: "event1"}
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
Expand Down
2 changes: 1 addition & 1 deletion middleware/eventhandler/observer/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/looplab/eventhorizon/uuid"
)

func TestEventHandler(t *testing.T) {
func TestMiddleware(t *testing.T) {
id := uuid.New()
eventData := &mocks.EventData{Content: "event1"}
timestamp := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
Expand Down
2 changes: 1 addition & 1 deletion middleware/eventhandler/scheduler/middleware_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/looplab/eventhorizon/uuid"
)

func TestCommandHandler(t *testing.T) {
func TestMiddleware(t *testing.T) {
inner := mocks.NewEventHandler("test")

schedulerCtx, cancelScheduler := context.WithCancel(context.Background())
Expand Down

0 comments on commit 760d8c9

Please sign in to comment.