diff --git a/server/cursors.go b/server/cursors.go index 2c11d5ec..32ff032e 100644 --- a/server/cursors.go +++ b/server/cursors.go @@ -79,6 +79,16 @@ func (c *cursorManager) Initialize() error { return status.Err() } +// BecomePartitionLeader should be called when this server becomes the leader +// for any cursor partitions. +func (c *cursorManager) BecomePartitionLeader() { + // Clear the cache when we become leader to avoid serving potentially stale + // cursors. + c.mu.Lock() + defer c.mu.Unlock() + c.cache.Purge() +} + // SetCursor stores a cursor position for a particular stream partition // uniquely identified by an opaque string. This returns an error if persisting // the cursor failed. @@ -91,6 +101,14 @@ func (c *cursorManager) SetCursor(ctx context.Context, streamName, cursorID stri return st } + partition := c.metadata.GetPartition(cursorsStream, cursorsPartitionID) + if partition == nil { + return status.Newf(codes.Internal, "Cursors partition %d does not exist", cursorsPartitionID) + } + if !partition.IsLeader() { + return status.New(codes.FailedPrecondition, "Server not cursor partition leader") + } + var ( cursor = &proto.Cursor{ Stream: streamName, @@ -140,6 +158,14 @@ func (c *cursorManager) GetCursor(ctx context.Context, streamName, cursorID stri return 0, st } + partition := c.metadata.GetPartition(cursorsStream, cursorsPartitionID) + if partition == nil { + return 0, status.Newf(codes.Internal, "Cursors partition %d does not exist", cursorsPartitionID) + } + if !partition.IsLeader() { + return 0, status.New(codes.FailedPrecondition, "Server not cursor partition leader") + } + if !c.disableCache { c.mu.RLock() if offset, ok := c.cache.Get(string(cursorKey)); ok { @@ -150,7 +176,7 @@ func (c *cursorManager) GetCursor(ctx context.Context, streamName, cursorID stri } // Find the latest offset for the cursor in the log. - offset, err := c.getLatestCursorOffset(ctx, cursorKey, cursorsPartitionID) + offset, err := c.getLatestCursorOffset(ctx, cursorKey, partition) if err != nil { return 0, status.New(codes.Internal, err.Error()) } @@ -190,11 +216,9 @@ func (c *cursorManager) getCursorKey(cursorID, streamName string, partitionID in return []byte(fmt.Sprintf("%s,%s,%d", cursorID, streamName, partitionID)) } -func (c *cursorManager) getLatestCursorOffset(ctx context.Context, cursorKey []byte, partitionID int32) (int64, error) { - partition := c.metadata.GetPartition(cursorsStream, partitionID) - if partition == nil { - return 0, fmt.Errorf("Cursors partition %d does not exist", partitionID) - } +func (c *cursorManager) getLatestCursorOffset(ctx context.Context, cursorKey []byte, partition *partition) ( + int64, error) { + hw := partition.log.HighWatermark() // No cursors have been committed or the cursors partition is now empty so @@ -208,7 +232,7 @@ func (c *cursorManager) getLatestCursorOffset(ctx context.Context, cursorKey []b defer cancel() sub, err := c.api.SubscribeInternal(ctx, &client.SubscribeRequest{ Stream: cursorsStream, - Partition: partitionID, + Partition: partition.Id, StartPosition: client.StartPosition_EARLIEST, Resume: true, }) diff --git a/server/partition.go b/server/partition.go index fa0091da..fe7e6e97 100644 --- a/server/partition.go +++ b/server/partition.go @@ -862,6 +862,11 @@ func (p *partition) becomeLeader(epoch uint64) error { p.isLeading = true p.isFollowing = false + // Notify the cursor manager if we've come leader for a cursor partition. + if p.Stream == cursorsStream { + p.srv.cursors.BecomePartitionLeader() + } + return nil }