Skip to content

Commit

Permalink
Destroy Topic Objects (#801)
Browse files Browse the repository at this point in the history
  • Loading branch information
bbengfort committed Oct 22, 2023
1 parent 07a5c7a commit d1c1255
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 33 deletions.
2 changes: 2 additions & 0 deletions pkg/ensign/broker/broker.go
Expand Up @@ -73,6 +73,8 @@ func (b *Broker) handleIncoming(inQ <-chan incoming, outQ chan<- *api.EventWrapp
// Create the publish result with the localID for handling
result := PublishResult{LocalID: incoming.event.LocalId}

// TODO: reject event if topic is in read-only mode or deleting.

// TODO: sequence RLIDs over topic offset instead of globally.
incoming.event.Id = seq.Next().Bytes()

Expand Down
2 changes: 1 addition & 1 deletion pkg/ensign/duplicates.go
Expand Up @@ -43,7 +43,7 @@ func (s *Server) TopicFilter(topicID ulid.ULID) (_ *bloom.BloomFilter, err error
var hash []byte
if hash, err = iter.Hash(); err != nil {
// NOTE: we are not skipping bad hashes because this would make it possible
// to miss duplicates -- however, it would be possible to relax this.
// to miss duplicates -- however, it could be possible to relax this.
return nil, err
}
filter.Add(hash)
Expand Down
6 changes: 0 additions & 6 deletions pkg/ensign/events_test.go
Expand Up @@ -34,7 +34,6 @@ func (s *serverTestSuite) TestPublishEvents() {
require := s.Require()
stream := s.setupValidPublisher()
s.store.UseError(store.Insert, nil)
defer s.store.Reset()

events := make([]*api.EventWrapper, 0, 10)
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -73,7 +72,6 @@ func (s *serverTestSuite) TestPublisherStreamInitialization() {
stream := &mock.PublisherServer{}
s.store.OnAllowedTopics = MockAllowedTopics
s.store.OnTopicName = MockTopicName
defer s.store.Reset()

// Must be authenticated and have the publisher permission.
err := s.srv.Publish(stream)
Expand Down Expand Up @@ -169,7 +167,6 @@ func (s *serverTestSuite) TestSecondOpenStreamFail() {
// the stream handlers instead. It also prevents concurrency issues with send and
// recv on a stream and the possibility of EOF errors and intermittent failures.
stream := s.setupValidPublisher()
defer s.store.Reset()

// An OpenStream message must be the first message received by the handler
stream.OnRecv = func() (*api.PublisherRequest, error) {
Expand All @@ -196,7 +193,6 @@ func (s *serverTestSuite) TestBadPublisherRequest() {
// the stream handlers instead. It also prevents concurrency issues with send and
// recv on a stream and the possibility of EOF errors and intermittent failures.
stream := s.setupValidPublisher()
defer s.store.Reset()

// An OpenStream message must be the first message received by the handler
msg := 0
Expand Down Expand Up @@ -227,7 +223,6 @@ func (s *serverTestSuite) TestPublisherStreamTopicFilter() {
// the stream handlers instead. It also prevents concurrency issues with send and
// recv on a stream and the possibility of EOF errors and intermittent failures.
stream := s.setupValidPublisher()
defer s.store.Reset()

// Should receive an error when one of the topics is not in allowed topics
stream.WithEvents(&api.OpenStream{ClientId: "tester", Topics: []string{"foo", "bar"}})
Expand Down Expand Up @@ -266,7 +261,6 @@ func (s *serverTestSuite) TestPublisherNackEvents() {
// the stream handlers instead. It also prevents concurrency issues with send and
// recv on a stream and the possibility of EOF errors and intermittent failures.
stream := s.setupValidPublisher()
defer s.store.Reset()

// Should receive a nack when an event is published without a topic ID
event := &api.EventWrapper{LocalId: []byte("abc")}
Expand Down
4 changes: 0 additions & 4 deletions pkg/ensign/info_test.go
Expand Up @@ -20,8 +20,6 @@ import (

func (s *serverTestSuite) TestInfo() {
require := s.Require()
defer s.store.Reset()

claims := &tokens.Claims{
RegisteredClaims: jwt.RegisteredClaims{
Subject: "DbIxBEtIUgNIClnFMDmvoZeMrLxUTJVa",
Expand Down Expand Up @@ -128,8 +126,6 @@ func (s *serverTestSuite) TestInfo() {
func (s *serverTestSuite) TestInfoSingleTopic() {
// Should be able to get info for a single topic in a project
// This test ensures that a Beacon requirement is fulfilled
defer s.store.Reset()

require := s.Require()
ctx := context.Background()

Expand Down
12 changes: 8 additions & 4 deletions pkg/ensign/server_test.go
Expand Up @@ -44,6 +44,10 @@ func (s *serverTestSuite) SetupSuite() {
var err error
assert := s.Assert()

// Discard logging from the application to focus on test logs
// NOTE: ConsoleLog must be false otherwise this will be overridden
logger.Discard()

// Create a temporary data directory
s.dataDir, err = os.MkdirTemp("", "ensign-data-*")
assert.NoError(err)
Expand Down Expand Up @@ -99,10 +103,6 @@ func (s *serverTestSuite) SetupSuite() {

// Run the broker for handling events
s.srv.RunBroker()

// Discard logging from the application to focus on test logs
// NOTE: ConsoleLog must be false otherwise this will be overridden
logger.Discard()
}

func (s *serverTestSuite) TearDownSuite() {
Expand All @@ -120,6 +120,10 @@ func (s *serverTestSuite) TearDownSuite() {
logger.ResetLogger()
}

// AfterTest resets the mock store after every test in the suite so that call
// counts and callbacks configured by one test cannot leak into the next.
func (s *serverTestSuite) AfterTest(_, _ string) {
	s.store.Reset()
}

// Check an error response from the gRPC Ensign client, ensuring that it is a) a status
// error, b) has the code specified, and c) (if supplied) that the message matches.
func (s *serverTestSuite) GRPCErrorIs(err error, code codes.Code, msg string) {
Expand Down
32 changes: 32 additions & 0 deletions pkg/ensign/store/events/events.go
Expand Up @@ -108,6 +108,38 @@ func (s *Store) Retrieve(topicId ulid.ULID, eventID rlid.RLID) (event *api.Event
return event, nil
}

// Destroy removes all events, meta-events, and index hashes of the specified
// topic. NOTE: this will destroy anything in the database that is prefixed
// with the topicID.
func (s *Store) Destroy(topicID ulid.ULID) (err error) {
	// Guard clauses: destruction is forbidden on a read-only store, and a
	// zero-valued topic ID would select an unintended key range.
	if s.readonly {
		return errors.ErrReadOnly
	}
	if ulids.IsZero(topicID) {
		return errors.ErrKeyNull
	}

	// Collect every key under the topic prefix into a single delete batch.
	// The read bypasses the cache since these pages are about to be removed.
	batch := &leveldb.Batch{}
	iter := s.db.NewIterator(util.BytesPrefix(topicID.Bytes()), &opt.ReadOptions{DontFillCache: true})
	defer iter.Release()

	for iter.Next() {
		batch.Delete(iter.Key())
	}

	if err = iter.Error(); err != nil {
		return err
	}

	// Apply all deletions in one unsynced write for throughput.
	return s.db.Write(batch, &opt.WriteOptions{Sync: false, NoWriteMerge: true})
}

// Count the number of objects that match the specified range by iterating through all
// of the keys and counting them. This is primarily used for testing.
func (s *Store) Count(slice *util.Range) (count uint64, err error) {
Expand Down
52 changes: 52 additions & 0 deletions pkg/ensign/store/events/events_test.go
Expand Up @@ -239,3 +239,55 @@ func (s *readonlyEventsTestSuite) TestRetrieve() {
_, err = s.store.Retrieve(ulid.MustParse("01GTSN1139JMK1PS5A524FXWAZ"), rlid.RLID{})
require.ErrorIs(err, errors.ErrInvalidKey)
}

// TestDestroy verifies that destroying a topic removes all of its events and
// index hashes from the database while leaving objects for other topics intact.
func (s *eventsTestSuite) TestDestroy() {
	require := s.Require()
	require.False(s.store.ReadOnly())

	_, err := s.LoadAllFixtures()
	require.NoError(err, "could not load fixtures")
	defer s.ResetDatabase()

	// The fixtures should have populated the database with objects.
	// NOTE: 0xee is the total number of fixture objects across all topics.
	count, err := s.store.Count(nil)
	require.NoError(err, "could not count database")
	require.Equal(uint64(0xee), count, "expected fixture objects in the database")

	topicID := ulid.MustParse("01GTSN1139JMK1PS5A524FXWAZ")
	err = s.store.Destroy(topicID)
	require.NoError(err, "unable to destroy topic")

	// Only objects belonging to other topics should remain after the destroy.
	count, err = s.store.Count(nil)
	require.NoError(err, "could not count database")
	require.Equal(uint64(0xc2), count, "expected only objects from other topics to remain")

	// There should be no events in the database for the destroyed topic
	nEvents := 0
	events := s.store.List(topicID)
	defer events.Release()
	for events.Next() {
		nEvents++
	}
	require.NoError(events.Error(), "could not iterate over events")
	require.Zero(nEvents, "expected no events in the database")

	// There should be no index hashes in the database for the destroyed topic
	nIndash := 0
	hashes := s.store.LoadIndash(topicID)
	defer hashes.Release()
	for hashes.Next() {
		nIndash++
	}
	require.NoError(hashes.Error(), "could not iterate over hashes")
	require.Zero(nIndash, "expected no index hashes in the database")
}

// TestDestroy asserts that a read-only store refuses to destroy topic objects.
func (s *readonlyEventsTestSuite) TestDestroy() {
	require := s.Require()
	require.True(s.store.ReadOnly())

	// Any destroy attempt against a read-only store must be rejected.
	err := s.store.Destroy(ulid.MustParse("01GTSN1139JMK1PS5A524FXWAZ"))
	require.ErrorIs(err, errors.ErrReadOnly, "expected readonly error on destroy topic")
}
17 changes: 14 additions & 3 deletions pkg/ensign/store/mock/mock.go
Expand Up @@ -21,6 +21,7 @@ const (
Insert = "Insert"
List = "List"
Retrieve = "Retrieve"
Destroy = "Destroy"
Indash = "Indash"
Unhash = "Unhash"
LoadIndash = "LoadIndash"
Expand Down Expand Up @@ -51,6 +52,7 @@ type Store struct {
OnInsert func(*api.EventWrapper) error
OnList func(ulid.ULID) iterator.EventIterator
OnRetrieve func(ulid.ULID, rlid.RLID) (*api.EventWrapper, error)
OnDestroy func(ulid.ULID) error
OnIndash func(ulid.ULID, []byte, rlid.RLID) error
OnUnhash func(ulid.ULID, []byte) (*api.EventWrapper, error)
OnLoadIndash func(ulid.ULID) iterator.IndashIterator
Expand Down Expand Up @@ -91,6 +93,7 @@ func (s *Store) Reset() {
s.OnInsert = nil
s.OnList = nil
s.OnRetrieve = nil
s.OnDestroy = nil
s.OnIndash = nil
s.OnUnhash = nil
s.OnLoadIndash = nil
Expand Down Expand Up @@ -216,10 +219,10 @@ func (s *Store) UseError(call string, err error) error {
s.OnRetrieve = func(ulid.ULID, rlid.RLID) (*api.EventWrapper, error) {
return nil, err
}
case Destroy:
s.OnDestroy = func(ulid.ULID) error { return err }
case Indash:
s.OnIndash = func(u ulid.ULID, b []byte, r rlid.RLID) error {
return err
}
s.OnIndash = func(ulid.ULID, []byte, rlid.RLID) error { return err }
case Unhash:
s.OnUnhash = func(u ulid.ULID, b []byte) (*api.EventWrapper, error) {
return nil, err
Expand Down Expand Up @@ -301,6 +304,14 @@ func (s *Store) Retrieve(topicID ulid.ULID, eventID rlid.RLID) (*api.EventWrappe
return nil, errors.New("mock database cannot retrieve event")
}

// Destroy records the call and delegates to the OnDestroy callback when one is
// configured; otherwise it returns an error to signal the missing mock setup.
func (s *Store) Destroy(topicID ulid.ULID) error {
	s.incrCalls(Destroy)
	if s.OnDestroy == nil {
		return errors.New("mock database cannot destroy events in topic")
	}
	return s.OnDestroy(topicID)
}

func (s *Store) Indash(topicID ulid.ULID, hash []byte, eventID rlid.RLID) error {
s.incrCalls(Indash)
if s.OnIndash != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ensign/store/store.go
Expand Up @@ -57,6 +57,7 @@ type EventStore interface {
Insert(*api.EventWrapper) error
List(topicID ulid.ULID) iterator.EventIterator
Retrieve(topicID ulid.ULID, eventID rlid.RLID) (*api.EventWrapper, error)
Destroy(topicID ulid.ULID) error
}

type EventHashStore interface {
Expand Down
1 change: 1 addition & 0 deletions pkg/ensign/testdata/topic.json
Expand Up @@ -6,6 +6,7 @@
"name": "testing.testapp.test",
"readonly": false,
"offset": 83123,
"status": "READY",
"shards": 1,
"placements": [],
"types": [],
Expand Down
5 changes: 5 additions & 0 deletions pkg/ensign/testdata/topics.json
Expand Up @@ -6,6 +6,7 @@
"project_id_ulid": "01GTSMMC152Q95RD4TNYDFJGHT",
"name": "testing.testapp.alerts",
"readonly": false,
"status": "READY",
"offset": 83123,
"shards": 1,
"placements": [],
Expand All @@ -20,6 +21,7 @@
"project_id_ulid": "01GTSMMC152Q95RD4TNYDFJGHT",
"name": "testing.testapp.orders",
"readonly": false,
"status": "READY",
"offset": 542,
"shards": 1,
"placements": [],
Expand All @@ -34,6 +36,7 @@
"project_id_ulid": "01GTSMMC152Q95RD4TNYDFJGHT",
"name": "testing.testapp.shipments",
"readonly": false,
"status": "READY",
"offset": 498,
"shards": 1,
"placements": [],
Expand All @@ -48,6 +51,7 @@
"project_id_ulid": "01GTSMZNRYXNAZQF5R8NHQ14NM",
"name": "mock.mockapp.feed",
"readonly": true,
"status": "READY",
"offset": 68,
"shards": 1,
"placements": [],
Expand All @@ -62,6 +66,7 @@
"project_id_ulid": "01GTSMZNRYXNAZQF5R8NHQ14NM",
"name": "mock.mockapp.post",
"readonly": true,
"status": "READY",
"offset": 1266,
"shards": 1,
"placements": [],
Expand Down

0 comments on commit d1c1255

Please sign in to comment.