diff --git a/pkg/ensign/duplicates.go b/pkg/ensign/duplicates.go index 646b514c4..9bcb98ca1 100644 --- a/pkg/ensign/duplicates.go +++ b/pkg/ensign/duplicates.go @@ -3,6 +3,7 @@ package ensign import ( "bytes" "context" + "fmt" "github.com/bits-and-blooms/bloom/v3" "github.com/oklog/ulid/v2" @@ -61,14 +62,14 @@ func (s *Server) TopicFilter(topicID ulid.ULID) (_ *bloom.BloomFilter, err error func (s *Server) Rehash(ctx context.Context, topicID ulid.ULID, policy *api.Deduplication) (err error) { // Clear old hashes from the database. if err = s.data.ClearIndash(topicID); err != nil { - return err + return fmt.Errorf("could not clear old hashes from the database: %w", err) } // Load topic info to build bloom filter // Will return not found if there is no associated topic. var info *api.TopicInfo if info, err = s.meta.TopicInfo(topicID); err != nil { - return err + return fmt.Errorf("could not fetch topic info to build bloom filter: %w", err) } // Build the bloom filter for deduplication @@ -101,7 +102,7 @@ deduplication: event, err := iter.Event() if err != nil { - return err + return fmt.Errorf("could not fetch next event in topic: %w", err) } // If we've reached the end of the events specified by the topic info snapshot @@ -115,7 +116,7 @@ deduplication: // Compute the hash of the event given the deduplication policy hash, err := event.Hash(policy) if err != nil { - return err + return fmt.Errorf("could not compute hash of event: %w", err) } // Check if the event is a duplicate of another event already @@ -123,24 +124,24 @@ deduplication: // Load the identified duplicate, verify that it is a duplicate var target *api.EventWrapper if target, err = s.data.Unhash(topicID, hash); err != nil { - return err + return fmt.Errorf("could not unhash event: %w", err) } var isDuplicate bool if isDuplicate, err = event.Duplicates(target, policy); err != nil { - return err + return fmt.Errorf("could not identify duplicate: %w", err) } if isDuplicate { // Mark the event as a duplicate and save back to database // TODO: handle offset -- e.g. is the target the duplicate or the event? if err = event.DuplicateOf(target, policy); err != nil { - return err + return fmt.Errorf("could not mark duplicate: %w", err) } // Save the duplicate back to the database if err = s.data.Insert(event); err != nil { - return err + return fmt.Errorf("could not save duplicate: %w", err) } // Update the duplicate counts on the topic info @@ -158,7 +159,7 @@ deduplication: // If the topic is not a duplicate store the hash in the database. if err := s.data.Indash(topicID, hash, rlid.RLID(event.Id)); err != nil { - return err + return fmt.Errorf("could not store hash in database: %w", err) } } @@ -167,17 +168,17 @@ deduplication: if event.IsDuplicate { var orig *api.EventWrapper if orig, err = s.data.Retrieve(topicID, rlid.RLID(event.DuplicateId)); err != nil { - return err + return fmt.Errorf("could not retrieve event: %w", err) } // TODO: do we need to handle the offset here? if err = event.DuplicateFrom(orig); err != nil { - return err + return fmt.Errorf("could not fetch duplicate from original: %w", err) } // Save the duplicate back to the database if err = s.data.Insert(event); err != nil { - return err + return fmt.Errorf("could not save duplicate to database: %w", err) } } } @@ -188,7 +189,7 @@ deduplication: // Save the topic info back to disk so that it can be carried on later. if err = s.meta.UpdateTopicInfo(info); err != nil { - return err + return fmt.Errorf("could not update topic info: %w", err) } return nil } diff --git a/pkg/ensign/store/meta/topics.go b/pkg/ensign/store/meta/topics.go index 6a169873d..6ac7244d5 100644 --- a/pkg/ensign/store/meta/topics.go +++ b/pkg/ensign/store/meta/topics.go @@ -246,6 +246,7 @@ func (s *Store) DeleteTopic(topicID ulid.ULID) (err error) { var topic *api.Topic if topic, err = s.RetrieveTopic(topicID); err != nil { if errors.Is(err, errors.ErrNotFound) { + sentry.Warn(nil).Err(err).ULID("topic_id", topicID).Msg("topic not found") return nil } return err diff --git a/pkg/ensign/topics.go b/pkg/ensign/topics.go index 95a344021..77fd006fb 100644 --- a/pkg/ensign/topics.go +++ b/pkg/ensign/topics.go @@ -3,6 +3,7 @@ package ensign import ( "context" goerrs "errors" + "fmt" "strings" "time" @@ -165,10 +166,11 @@ func (s *Server) RetrieveTopic(ctx context.Context, in *api.Topic) (out *api.Top // Retrieve the topic from the store if out, err = s.meta.RetrieveTopic(topicID); err != nil { if errors.Is(err, errors.ErrNotFound) { + sentry.Warn(ctx).Err(err).ULID("topic_id", topicID).Msg("topic not found") return nil, status.Error(codes.NotFound, "topic not found") } - sentry.Error(ctx).Err(err).Msg("could not retrieve topic") + sentry.Error(ctx).Err(err).ULID("topic_id", topicID).Msg("could not retrieve topic") return nil, status.Error(codes.Internal, "could not complete retrieve topic request") } @@ -239,10 +241,11 @@ func (s *Server) DeleteTopic(ctx context.Context, in *api.TopicMod) (out *api.To var topic *api.Topic if topic, err = s.meta.RetrieveTopic(topicID); err != nil { if errors.Is(err, errors.ErrNotFound) { + sentry.Warn(ctx).Err(err).ULID("topic_id", topicID).Msg("topic not found") return nil, status.Error(codes.NotFound, "topic not found") } - sentry.Error(ctx).Err(err).Msg("could not retrieve topic for deletion") + sentry.Error(ctx).Err(err).ULID("topic_id", topicID).Msg("could not retrieve topic for deletion") return nil, status.Error(codes.Internal, "could not process delete topic request") } @@ -350,10 +353,11 @@ func (s *Server) SetTopicPolicy(ctx context.Context, in *api.TopicPolicy) (out * var topic *api.Topic if topic, err = s.meta.RetrieveTopic(topicID); err != nil { if errors.Is(err, errors.ErrNotFound) { + sentry.Warn(ctx).Err(err).ULID("topic_id", topicID).Msg("topic not found") return nil, status.Error(codes.NotFound, "topic not found") } - sentry.Error(ctx).Err(err).Msg("could not retrieve topic for policy change") + sentry.Error(ctx).Err(err).ULID("topic_id", topicID).Msg("could not retrieve topic for policy change") return nil, status.Error(codes.Internal, "could not process set topic policy request") } @@ -419,7 +423,7 @@ func (s *Server) SetTopicPolicy(ctx context.Context, in *api.TopicPolicy) (out * // Mark the topic as ready again topic, err := s.meta.RetrieveTopic(topicID) if err != nil { - return err + return fmt.Errorf("error retrieving topic for topic_id %v: %v", topicID, err) } topic.Status = api.TopicState_READY