Skip to content

Commit

Permalink
Add custom collection name for Outbox
Browse files Browse the repository at this point in the history
Add custom collection name for Snapshots
  • Loading branch information
totemcaf committed May 11, 2023
1 parent 599169c commit c1df202
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 53 deletions.
25 changes: 19 additions & 6 deletions eventstore/mongodb_v2/eventstore.go
Expand Up @@ -41,11 +41,10 @@ import (

// EventStore is an eventhorizon.EventStore for MongoDB, using one collection
// for all events and another to keep track of all aggregates/streams. It also
// keep tracks of the global position of events, stored as metadata.
// keeps track of the global position of events, stored as metadata.
type EventStore struct {
client *mongo.Client
clientOwnership clientOwnership
db *mongo.Database
events *mongo.Collection
streams *mongo.Collection
snapshots *mongo.Collection
Expand Down Expand Up @@ -90,7 +89,6 @@ func newEventStoreWithClient(client *mongo.Client, clientOwnership clientOwnersh
s := &EventStore{
client: client,
clientOwnership: clientOwnership,
db: db,
events: db.Collection("events"),
streams: db.Collection("streams"),
snapshots: db.Collection("snapshots"),
Expand Down Expand Up @@ -202,8 +200,23 @@ func WithCollectionNames(eventsColl, streamsColl string) Option {
return fmt.Errorf("missing collection name")
}

s.events = s.db.Collection(eventsColl)
s.streams = s.db.Collection(streamsColl)
db := s.events.Database()
s.events = db.Collection(eventsColl)
s.streams = db.Collection(streamsColl)

return nil
}
}

// WithSnapshotCollectionName uses different collections from the default "snapshots" collections.
func WithSnapshotCollectionName(snapshotColl string) Option {
return func(s *EventStore) error {
if snapshotColl == "" {
return fmt.Errorf("missing collection name")
}

db := s.events.Database()
s.snapshots = db.Collection(snapshotColl)

return nil
}
Expand Down Expand Up @@ -688,7 +701,7 @@ type evt struct {
}

// newEvt returns a new evt for an event.
func newEvt(ctx context.Context, event eh.Event) (*evt, error) {
func newEvt(_ context.Context, event eh.Event) (*evt, error) {
e := &evt{
EventType: event.EventType(),
Timestamp: event.Timestamp(),
Expand Down
73 changes: 57 additions & 16 deletions eventstore/mongodb_v2/eventstore_test.go
Expand Up @@ -74,26 +74,11 @@ func TestWithCollectionNamesIntegration(t *testing.T) {
t.Skip("skipping integration test")
}

// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
}

url = "mongodb://" + url

// Get a random DB name.
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
t.Fatal(err)
}
url, db := makeDB(t)

db := "test-" + hex.EncodeToString(b)
eventsColl := "foo_events"
streamsColl := "bar_streams"

t.Log("using DB:", db)

store, err := NewEventStore(url, db,
WithCollectionNames(eventsColl, streamsColl),
)
Expand Down Expand Up @@ -140,6 +125,62 @@ func TestWithCollectionNamesIntegration(t *testing.T) {
}
}

func TestWithSnapshotCollectionNamesIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

url, db := makeDB(t)

snapshotColl := "foo_snapshots"

store, err := NewEventStore(url, db,
WithSnapshotCollectionName(snapshotColl),
)
if err != nil {
t.Fatal("there should be no error:", err)
}

if store == nil {
t.Fatal("there should be a store")
}

defer store.Close()

if store.snapshots.Name() != snapshotColl {
t.Fatal("snapshots collection should use custom collection name")
}

// providing empty snapshot collection names should result in an error
_, err = NewEventStore(url, db,
WithSnapshotCollectionName(""),
)
if err == nil || err.Error() != "error while applying option: missing collection name" {
t.Fatal("there should be an error")
}
}

func makeDB(t *testing.T) (string, string) {
// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
}

url = "mongodb://" + url

// Get a random DB name.
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
t.Fatal(err)
}

db := "test-" + hex.EncodeToString(b)

t.Log("using DB:", db)
return url, db
}

func TestWithEventHandlerIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
Expand Down
15 changes: 14 additions & 1 deletion outbox/mongodb/outbox.go
Expand Up @@ -127,6 +127,19 @@ func WithWatchToken(token string) Option {
}
}

// WithCollectionName uses different collections from the default "outbox" collection.
func WithCollectionName(outboxColl string) Option {
return func(s *Outbox) error {
if outboxColl == "" {
return fmt.Errorf("missing collection name")
}

s.outbox = s.outbox.Database().Collection(outboxColl)

return nil
}
}

// Client returns the MongoDB client used by the outbox. To use the outbox with
// the EventStore it needs to be created with the same client.
func (o *Outbox) Client() *mongo.Client {
Expand All @@ -139,7 +152,7 @@ func (o *Outbox) HandlerType() eh.EventHandlerType {
}

// AddHandler implements the AddHandler method of the eventhorizon.Outbox interface.
func (o *Outbox) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.EventHandler) error {
func (o *Outbox) AddHandler(_ context.Context, m eh.EventMatcher, h eh.EventHandler) error {
if m == nil {
return eh.ErrMissingMatcher
}
Expand Down
72 changes: 42 additions & 30 deletions outbox/mongodb/outbox_test.go
Expand Up @@ -20,37 +20,65 @@ func TestOutboxAddHandler(t *testing.T) {
t.Skip("skipping integration test")
}

// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGODB_ADDR")
if url == "" {
url = "localhost:27017"
url, db := makeDB(t)

o, err := NewOutbox(url, db)
if err != nil {
t.Fatal(err)
}

url = "mongodb://" + url
outbox.TestAddHandler(t, o, context.Background())
}

// Get a random DB name.
bs := make([]byte, 4)
if _, err := rand.Read(bs); err != nil {
t.Fatal(err)
func TestOutboxIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

db := "test-" + hex.EncodeToString(bs)
url, db := makeDB(t)

t.Log("using DB:", db)
// Shorter sweeps for testing
PeriodicSweepInterval = 2 * time.Second
PeriodicSweepAge = 2 * time.Second

o, err := NewOutbox(url, db)
if err != nil {
t.Fatal(err)
}

outbox.TestAddHandler(t, o, context.Background())
o.Start()

outbox.AcceptanceTest(t, o, context.Background(), "none")

if err := o.Close(); err != nil {
t.Error("there should be no error:", err)
}
}

func TestOutboxIntegration(t *testing.T) {
func TestWithCollectionNameIntegration(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}

url, db := makeDB(t)

o, err := NewOutbox(url, db, WithCollectionName("foo-outbox"))
if err != nil {
t.Fatal(err)
}

defer o.Close()

if o == nil {
t.Fatal("there should be a store")
}

if o.outbox.Name() != "foo-outbox" {
t.Fatal("collection name should use custom collection name")
}
}

func makeDB(t *testing.T) (string, string) {
// Use MongoDB in Docker with fallback to localhost.
url := os.Getenv("MONGODB_ADDR")
if url == "" {
Expand All @@ -68,23 +96,7 @@ func TestOutboxIntegration(t *testing.T) {
db := "test-" + hex.EncodeToString(bs)

t.Log("using DB:", db)

// Shorter sweeps for testing
PeriodicSweepInterval = 2 * time.Second
PeriodicSweepAge = 2 * time.Second

o, err := NewOutbox(url, db)
if err != nil {
t.Fatal(err)
}

o.Start()

outbox.AcceptanceTest(t, o, context.Background(), "none")

if err := o.Close(); err != nil {
t.Error("there should be no error:", err)
}
return url, db
}

func BenchmarkOutbox(b *testing.B) {
Expand Down

0 comments on commit c1df202

Please sign in to comment.