Skip to content

Commit

Permalink
Fix potential for serving stale cursors
Browse files Browse the repository at this point in the history
Clear the cursor cache when a server becomes the leader for a cursor
partition to avoid the potential for serving stale cursors.

Fixes #319
  • Loading branch information
tylertreat committed Mar 14, 2022
1 parent befe496 commit 8d33538
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 7 deletions.
38 changes: 31 additions & 7 deletions server/cursors.go
Expand Up @@ -79,6 +79,16 @@ func (c *cursorManager) Initialize() error {
return status.Err()
}

// BecomePartitionLeader should be called when this server becomes the leader
// for any cursor partitions.
func (c *cursorManager) BecomePartitionLeader() {
// Clear the cache when we become leader to avoid serving potentially stale
// cursors.
c.mu.Lock()
defer c.mu.Unlock()
c.cache.Purge()
}

// SetCursor stores a cursor position for a particular stream partition
// uniquely identified by an opaque string. This returns an error if persisting
// the cursor failed.
Expand All @@ -91,6 +101,14 @@ func (c *cursorManager) SetCursor(ctx context.Context, streamName, cursorID stri
return st
}

partition := c.metadata.GetPartition(cursorsStream, cursorsPartitionID)
if partition == nil {
return status.Newf(codes.Internal, "Cursors partition %d does not exist", cursorsPartitionID)
}
if !partition.IsLeader() {
return status.New(codes.FailedPrecondition, "Server not cursor partition leader")
}

var (
cursor = &proto.Cursor{
Stream: streamName,
Expand Down Expand Up @@ -140,6 +158,14 @@ func (c *cursorManager) GetCursor(ctx context.Context, streamName, cursorID stri
return 0, st
}

partition := c.metadata.GetPartition(cursorsStream, cursorsPartitionID)
if partition == nil {
return 0, status.Newf(codes.Internal, "Cursors partition %d does not exist", cursorsPartitionID)
}
if !partition.IsLeader() {
return 0, status.New(codes.FailedPrecondition, "Server not cursor partition leader")
}

if !c.disableCache {
c.mu.RLock()
if offset, ok := c.cache.Get(string(cursorKey)); ok {
Expand All @@ -150,7 +176,7 @@ func (c *cursorManager) GetCursor(ctx context.Context, streamName, cursorID stri
}

// Find the latest offset for the cursor in the log.
offset, err := c.getLatestCursorOffset(ctx, cursorKey, cursorsPartitionID)
offset, err := c.getLatestCursorOffset(ctx, cursorKey, partition)
if err != nil {
return 0, status.New(codes.Internal, err.Error())
}
Expand Down Expand Up @@ -190,11 +216,9 @@ func (c *cursorManager) getCursorKey(cursorID, streamName string, partitionID in
return []byte(fmt.Sprintf("%s,%s,%d", cursorID, streamName, partitionID))
}

func (c *cursorManager) getLatestCursorOffset(ctx context.Context, cursorKey []byte, partitionID int32) (int64, error) {
partition := c.metadata.GetPartition(cursorsStream, partitionID)
if partition == nil {
return 0, fmt.Errorf("Cursors partition %d does not exist", partitionID)
}
func (c *cursorManager) getLatestCursorOffset(ctx context.Context, cursorKey []byte, partition *partition) (
int64, error) {

hw := partition.log.HighWatermark()

// No cursors have been committed or the cursors partition is now empty so
Expand All @@ -208,7 +232,7 @@ func (c *cursorManager) getLatestCursorOffset(ctx context.Context, cursorKey []b
defer cancel()
sub, err := c.api.SubscribeInternal(ctx, &client.SubscribeRequest{
Stream: cursorsStream,
Partition: partitionID,
Partition: partition.Id,
StartPosition: client.StartPosition_EARLIEST,
Resume: true,
})
Expand Down
5 changes: 5 additions & 0 deletions server/partition.go
Expand Up @@ -862,6 +862,11 @@ func (p *partition) becomeLeader(epoch uint64) error {
p.isLeading = true
p.isFollowing = false

// Notify the cursor manager if we've come leader for a cursor partition.
if p.Stream == cursorsStream {
p.srv.cursors.BecomePartitionLeader()
}

return nil
}

Expand Down

0 comments on commit 8d33538

Please sign in to comment.