Skip to content

Commit

Permalink
Merge c6a9d00 into 56d6fcd
Browse files Browse the repository at this point in the history
  • Loading branch information
maxekman committed Oct 25, 2021
2 parents 56d6fcd + c6a9d00 commit 6a0fa17
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 32 deletions.
16 changes: 16 additions & 0 deletions eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package eventhorizon

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -34,6 +35,21 @@ type EventStore interface {
Close() error
}

var (
// Missing events for save operation.
ErrMissingEvents = errors.New("missing events")
// Events in the same save operation is for different aggregate IDs.
ErrMismatchedEventAggregateIDs = errors.New("mismatched event aggregate IDs")
// Events in the same save operation is for different aggregate types.
ErrMismatchedEventAggregateTypes = errors.New("mismatched event aggregate types")
// Events in the same operation have non-serial versions or is not matching the original version.
ErrIncorrectEventVersion = errors.New("incorrect event version")
// Other events has been saved for this aggregate since the operation started.
ErrEventConflictFromOtherSave = errors.New("event conflict from other save")
// No matching event could be found (for maintenance operations etc).
ErrEventNotFound = errors.New("event not found")
)

// EventStoreOperation is the operation done when an error happened.
type EventStoreOperation string

Expand Down
12 changes: 4 additions & 8 deletions eventstore/acceptanece_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh
eventStoreErr := &eh.EventStoreError{}

err := store.Save(ctx, []eh.Event{}, 0)
if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "no events" {
if !errors.As(err, &eventStoreErr) || !errors.Is(err, eh.ErrMissingEvents) {
t.Error("there should be a event store error:", err)
}

Expand All @@ -68,10 +68,8 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh
// }

// Try to save same event twice.
eventStoreErr = &eh.EventStoreError{}

err = store.Save(ctx, []eh.Event{event1}, 1)
if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "invalid event version" {
if !errors.As(err, &eventStoreErr) || !errors.Is(err, eh.ErrIncorrectEventVersion) {
t.Error("there should be a event store error:", err)
}

Expand Down Expand Up @@ -121,7 +119,7 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh
eh.ForAggregate(mocks.AggregateType, uuid.New(), 8))

err = store.Save(ctx, []eh.Event{eventSameAggID, eventOtherAggID}, 6)
if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "event has different aggregate ID" {
if !errors.As(err, &eventStoreErr) || !errors.Is(err, eh.ErrMismatchedEventAggregateIDs) {
t.Error("there should be a event store error:", err)
}

Expand All @@ -132,7 +130,7 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh
eh.ForAggregate(eh.AggregateType("OtherAggregate"), id, 8))

err = store.Save(ctx, []eh.Event{eventSameAggType, eventOtherAggType}, 6)
if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "event has different aggregate type" {
if !errors.As(err, &eventStoreErr) || !errors.Is(err, eh.ErrMismatchedEventAggregateTypes) {
t.Error("there should be a event store error:", err)
}

Expand All @@ -149,8 +147,6 @@ func AcceptanceTest(t *testing.T, store eh.EventStore, ctx context.Context) []eh
savedEvents = append(savedEvents, event7)

// Load events for non-existing aggregate.
eventStoreErr = &eh.EventStoreError{}

events, err := store.Load(ctx, uuid.New())
if !errors.As(err, &eventStoreErr) || !errors.Is(err, eh.ErrAggregateNotFound) {
t.Error("there should be a not found error:", err)
Expand Down
2 changes: 1 addition & 1 deletion eventstore/maintenance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func MaintenanceAcceptanceTest(t *testing.T, store eh.EventStore, storeMaintenan
eventStoreErr := &eh.EventStoreError{}

err = storeMaintenance.Replace(ctx, eventWithInvalidVersion)
if !errors.As(err, &eventStoreErr) || eventStoreErr.Err.Error() != "could not find original event" {
if !errors.As(err, &eventStoreErr) || !errors.Is(err, eh.ErrEventNotFound) {
t.Error("there should be a event store error:", err)
}

Expand Down
2 changes: 1 addition & 1 deletion eventstore/memory/eventmaintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error {

if idx == -1 {
return &eh.EventStoreError{
Err: fmt.Errorf("could not find original event"),
Err: eh.ErrEventNotFound,
Op: eh.EventStoreOpReplace,
AggregateID: id,
Events: []eh.Event{event},
Expand Down
10 changes: 5 additions & 5 deletions eventstore/memory/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio

if len(events) == 0 {
return &eh.EventStoreError{
Err: fmt.Errorf("no events"),
Err: eh.ErrMissingEvents,
Op: eh.EventStoreOpSave,
}
}
Expand All @@ -105,7 +105,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio
// Only accept events belonging to the same aggregate.
if event.AggregateID() != id {
return &eh.EventStoreError{
Err: fmt.Errorf("event has different aggregate ID"),
Err: eh.ErrMismatchedEventAggregateIDs,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand All @@ -116,7 +116,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio

if event.AggregateType() != at {
return &eh.EventStoreError{
Err: fmt.Errorf("event has different aggregate type"),
Err: eh.ErrMismatchedEventAggregateTypes,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand All @@ -128,7 +128,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio
// Only accept events that apply to the correct aggregate version.
if event.Version() != originalVersion+i+1 {
return &eh.EventStoreError{
Err: fmt.Errorf("invalid event version"),
Err: eh.ErrIncorrectEventVersion,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand Down Expand Up @@ -169,7 +169,7 @@ func (s *EventStore) save(ctx context.Context, events []eh.Event, originalVersio
if aggregate, ok := s.db[id]; ok {
if aggregate.Version != originalVersion {
return &eh.EventStoreError{
Err: fmt.Errorf("invalid original aggregate version, new version: %d", aggregate.Version),
Err: eh.ErrEventConflictFromOtherSave,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand Down
2 changes: 1 addition & 1 deletion eventstore/mongodb/eventmaintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error {
}
} else if r.MatchedCount == 0 {
return &eh.EventStoreError{
Err: fmt.Errorf("could not find original event"),
Err: eh.ErrEventNotFound,
Op: eh.EventStoreOpReplace,
AggregateType: at,
AggregateID: id,
Expand Down
10 changes: 5 additions & 5 deletions eventstore/mongodb/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func WithEventHandler(h eh.EventHandler) Option {
func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error {
if len(events) == 0 {
return &eh.EventStoreError{
Err: fmt.Errorf("no events"),
Err: eh.ErrMissingEvents,
Op: eh.EventStoreOpSave,
}
}
Expand All @@ -113,7 +113,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio
// Only accept events belonging to the same aggregate.
if event.AggregateID() != id {
return &eh.EventStoreError{
Err: fmt.Errorf("event has different aggregate ID"),
Err: eh.ErrMismatchedEventAggregateIDs,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand All @@ -124,7 +124,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio

if event.AggregateType() != at {
return &eh.EventStoreError{
Err: fmt.Errorf("event has different aggregate type"),
Err: eh.ErrMismatchedEventAggregateTypes,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand All @@ -136,7 +136,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio
// Only accept events that apply to the correct aggregate version.
if event.Version() != originalVersion+i+1 {
return &eh.EventStoreError{
Err: fmt.Errorf("invalid event version"),
Err: eh.ErrIncorrectEventVersion,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand Down Expand Up @@ -203,7 +203,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio
}
} else if r.MatchedCount == 0 {
return &eh.EventStoreError{
Err: fmt.Errorf("invalid original aggregate version, new version %d", originalVersion),
Err: eh.ErrEventConflictFromOtherSave,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand Down
2 changes: 1 addition & 1 deletion eventstore/mongodb_v2/eventmaintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s *EventStore) Replace(ctx context.Context, event eh.Event) error {
})
if res.Err() != nil {
if res.Err() == mongo.ErrNoDocuments {
return nil, fmt.Errorf("could not find original event")
return nil, eh.ErrEventNotFound
}

return nil, fmt.Errorf("could not find original event: %w", res.Err())
Expand Down
24 changes: 14 additions & 10 deletions eventstore/mongodb_v2/eventstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func WithEventHandler(h eh.EventHandler) Option {
func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersion int) error {
if len(events) == 0 {
return &eh.EventStoreError{
Err: fmt.Errorf("no events"),
Err: eh.ErrMissingEvents,
Op: eh.EventStoreOpSave,
}
}
Expand All @@ -138,7 +138,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio
// Only accept events belonging to the same aggregate.
if event.AggregateID() != id {
return &eh.EventStoreError{
Err: fmt.Errorf("event has different aggregate ID"),
Err: eh.ErrMismatchedEventAggregateIDs,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand All @@ -149,7 +149,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio

if event.AggregateType() != at {
return &eh.EventStoreError{
Err: fmt.Errorf("event has different aggregate type"),
Err: eh.ErrMismatchedEventAggregateTypes,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand All @@ -161,7 +161,7 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio
// Only accept events that apply to the correct aggregate version.
if event.Version() != originalVersion+i+1 {
return &eh.EventStoreError{
Err: fmt.Errorf("invalid event version"),
Err: eh.ErrIncorrectEventVersion,
Op: eh.EventStoreOpSave,
AggregateType: at,
AggregateID: id,
Expand Down Expand Up @@ -270,18 +270,22 @@ func (s *EventStore) Save(ctx context.Context, events []eh.Event, originalVersio
return nil, fmt.Errorf("could not insert stream: %w", err)
}
} else {
if res := s.streams.FindOneAndUpdate(txCtx,
bson.M{"_id": strm.ID},
if r, err := s.streams.UpdateOne(txCtx,
bson.M{
"_id": strm.ID,
"version": originalVersion,
},
bson.M{
"$set": bson.M{
"position": strm.Position,
"version": strm.Version,
"updated_at": strm.UpdatedAt,
},
"$inc": bson.M{"version": len(dbEvents)},
},
mongoOptions.FindOneAndUpdate().SetUpsert(true),
); res.Err() != nil {
return nil, fmt.Errorf("could not update stream: %w", res.Err())
); err != nil {
return nil, fmt.Errorf("could not update stream: %w", err)
} else if r.MatchedCount == 0 {
return nil, eh.ErrEventConflictFromOtherSave
}
}

Expand Down

0 comments on commit 6a0fa17

Please sign in to comment.