Skip to content

Commit

Permalink
Updated logging for topic management methods (#832)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdamodaran authored and bbengfort committed Feb 7, 2024
1 parent 138d9a8 commit 1005d84
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 17 deletions.
27 changes: 14 additions & 13 deletions pkg/ensign/duplicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ensign
import (
"bytes"
"context"
"fmt"

"github.com/bits-and-blooms/bloom/v3"
"github.com/oklog/ulid/v2"
Expand Down Expand Up @@ -61,14 +62,14 @@ func (s *Server) TopicFilter(topicID ulid.ULID) (_ *bloom.BloomFilter, err error
func (s *Server) Rehash(ctx context.Context, topicID ulid.ULID, policy *api.Deduplication) (err error) {
// Clear old hashes from the database.
if err = s.data.ClearIndash(topicID); err != nil {
return err
return fmt.Errorf("could not clear old hashes from the database: %w", err)
}

// Load topic info to build bloom filter
// Will return not found if there is no associated topic.
var info *api.TopicInfo
if info, err = s.meta.TopicInfo(topicID); err != nil {
return err
return fmt.Errorf("could not fetch topic info to build bloom filter: %w", err)
}

// Build the bloom filter for deduplication
Expand Down Expand Up @@ -101,7 +102,7 @@ deduplication:

event, err := iter.Event()
if err != nil {
return err
return fmt.Errorf("could not fetch next event in topic: %w", err)
}

// If we've reached the end of the events specified by the topic info snapshot
Expand All @@ -115,32 +116,32 @@ deduplication:
// Compute the hash of the event given the deduplication policy
hash, err := event.Hash(policy)
if err != nil {
return err
return fmt.Errorf("could not compute hash of event: %w", err)
}

// Check if the event is a duplicate of another event already
if filter.TestOrAdd(hash) {
// Load the identified duplicate, verify that it is a duplicate
var target *api.EventWrapper
if target, err = s.data.Unhash(topicID, hash); err != nil {
return err
return fmt.Errorf("could not unhash event: %w", err)
}

var isDuplicate bool
if isDuplicate, err = event.Duplicates(target, policy); err != nil {
return err
return fmt.Errorf("could not identify duplicate: %w", err)
}

if isDuplicate {
// Mark the event as a duplicate and save back to database
// TODO: handle offset -- e.g. is the target the duplicate or the event?
if err = event.DuplicateOf(target, policy); err != nil {
return err
return fmt.Errorf("could not mark duplicate: %w", err)
}

// Save the duplicate back to the database
if err = s.data.Insert(event); err != nil {
return err
return fmt.Errorf("could not save duplicate: %w", err)
}

// Update the duplicate counts on the topic info
Expand All @@ -158,7 +159,7 @@ deduplication:

// If the topic is not a duplicate store the hash in the database.
if err := s.data.Indash(topicID, hash, rlid.RLID(event.Id)); err != nil {
return err
return fmt.Errorf("could not store hash in database: %w", err)
}
}

Expand All @@ -167,17 +168,17 @@ deduplication:
if event.IsDuplicate {
var orig *api.EventWrapper
if orig, err = s.data.Retrieve(topicID, rlid.RLID(event.DuplicateId)); err != nil {
return err
return fmt.Errorf("could not retrieve event: %w", err)
}

// TODO: do we need to handle the offset here?
if err = event.DuplicateFrom(orig); err != nil {
return err
return fmt.Errorf("could not fetch duplicate from original: %w", err)
}

// Save the duplicate back to the database
if err = s.data.Insert(event); err != nil {
return err
return fmt.Errorf("could not save duplicate to database: %w", err)
}
}
}
Expand All @@ -188,7 +189,7 @@ deduplication:

// Save the topic info back to disk so that it can be carried on later.
if err = s.meta.UpdateTopicInfo(info); err != nil {
return err
return fmt.Errorf("could not update topic info: %w", err)
}
return nil
}
1 change: 1 addition & 0 deletions pkg/ensign/store/meta/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (s *Store) DeleteTopic(topicID ulid.ULID) (err error) {
var topic *api.Topic
if topic, err = s.RetrieveTopic(topicID); err != nil {
if errors.Is(err, errors.ErrNotFound) {
sentry.Warn(nil).Err(err).ULID("topic_id", topicID).Msg("topic not found")
return nil
}
return err
Expand Down
12 changes: 8 additions & 4 deletions pkg/ensign/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ensign
import (
"context"
goerrs "errors"
"fmt"
"strings"
"time"

Expand Down Expand Up @@ -165,10 +166,11 @@ func (s *Server) RetrieveTopic(ctx context.Context, in *api.Topic) (out *api.Top
// Retrieve the topic from the store
if out, err = s.meta.RetrieveTopic(topicID); err != nil {
if errors.Is(err, errors.ErrNotFound) {
sentry.Warn(ctx).Err(err).ULID("topic_id", topicID).Msg("topic not found")
return nil, status.Error(codes.NotFound, "topic not found")
}

sentry.Error(ctx).Err(err).Msg("could not retrieve topic")
sentry.Error(ctx).Err(err).ULID("topic_id", topicID).Msg("could not retrieve topic")
return nil, status.Error(codes.Internal, "could not complete retrieve topic request")
}

Expand Down Expand Up @@ -239,10 +241,11 @@ func (s *Server) DeleteTopic(ctx context.Context, in *api.TopicMod) (out *api.To
var topic *api.Topic
if topic, err = s.meta.RetrieveTopic(topicID); err != nil {
if errors.Is(err, errors.ErrNotFound) {
sentry.Warn(ctx).Err(err).ULID("topic_id", topicID).Msg("topic not found")
return nil, status.Error(codes.NotFound, "topic not found")
}

sentry.Error(ctx).Err(err).Msg("could not retrieve topic for deletion")
sentry.Error(ctx).Err(err).ULID("topic_id", topicID).Msg("could not retrieve topic for deletion")
return nil, status.Error(codes.Internal, "could not process delete topic request")
}

Expand Down Expand Up @@ -350,10 +353,11 @@ func (s *Server) SetTopicPolicy(ctx context.Context, in *api.TopicPolicy) (out *
var topic *api.Topic
if topic, err = s.meta.RetrieveTopic(topicID); err != nil {
if errors.Is(err, errors.ErrNotFound) {
sentry.Warn(ctx).Err(err).ULID("topic_id", topicID).Msg("topic not found")
return nil, status.Error(codes.NotFound, "topic not found")
}

sentry.Error(ctx).Err(err).Msg("could not retrieve topic for policy change")
sentry.Error(ctx).Err(err).ULID("topic_id", topicID).Msg("could not retrieve topic for policy change")
return nil, status.Error(codes.Internal, "could not process set topic policy request")
}

Expand Down Expand Up @@ -419,7 +423,7 @@ func (s *Server) SetTopicPolicy(ctx context.Context, in *api.TopicPolicy) (out *
// Mark the topic as ready again
topic, err := s.meta.RetrieveTopic(topicID)
if err != nil {
return err
return fmt.Errorf("error retrieving topic for topic_id %v: %v", topicID, err)
}

topic.Status = api.TopicState_READY
Expand Down

0 comments on commit 1005d84

Please sign in to comment.