Skip to content

Commit

Permalink
storage: report optional StoreID with RangeNotFoundError
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
tbg committed Oct 12, 2018
1 parent 4d0fb44 commit 9998c7a
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 184 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@ func (ds *DistSender) sendToReplicas(
// cached RangeDescriptor and re-send.
if replicas.FindReplica(lh.StoreID) == -1 {
// Replace NotLeaseHolderError with RangeNotFoundError.
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(rangeID))
br.Error = roachpb.NewError(roachpb.NewRangeNotFoundError(rangeID, curReplica.StoreID))
propagateError = true
} else {
// Move the new lease holder to the head of the queue for the next retry.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ func TestEvictCacheOnUnknownLeaseHolder(t *testing.T) {
case 0, 1:
err = &roachpb.NotLeaseHolderError{LeaseHolder: &roachpb.ReplicaDescriptor{NodeID: 99, StoreID: 999}}
case 2:
err = roachpb.NewRangeNotFoundError(0)
err = roachpb.NewRangeNotFoundError(0, 0)
default:
return args.CreateReply(), nil
}
Expand Down
12 changes: 9 additions & 3 deletions pkg/roachpb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,12 @@ func (s *SendError) message(_ *Error) string {

var _ ErrorDetailInterface = &SendError{}

// NewRangeNotFoundError initializes a new RangeNotFoundError.
func NewRangeNotFoundError(rangeID RangeID) *RangeNotFoundError {
// NewRangeNotFoundError initializes a new RangeNotFoundError for the given RangeID and, optionally,
// a StoreID.
func NewRangeNotFoundError(rangeID RangeID, storeID StoreID) *RangeNotFoundError {
return &RangeNotFoundError{
RangeID: rangeID,
StoreID: storeID,
}
}

Expand All @@ -253,7 +255,11 @@ func (e *RangeNotFoundError) Error() string {
}

func (e *RangeNotFoundError) message(_ *Error) string {
return fmt.Sprintf("r%d was not found", e.RangeID)
msg := fmt.Sprintf("r%d was not found", e.RangeID)
if e.StoreID != 0 {
msg += fmt.Sprintf(" on s%d", e.StoreID)
}
return msg
}

var _ ErrorDetailInterface = &RangeNotFoundError{}
Expand Down
368 changes: 198 additions & 170 deletions pkg/roachpb/errors.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/roachpb/errors.proto
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ message RangeNotFoundError {

optional int64 range_id = 1 [(gogoproto.nullable) = false,
(gogoproto.customname) = "RangeID", (gogoproto.casttype) = "RangeID"];
// store_id is nonzero only if the error originated on a Store.
optional int64 store_id = 2 [(gogoproto.nullable) = false,
(gogoproto.customname) = "StoreID", (gogoproto.casttype) = "StoreID"];
}

// A RangeKeyMismatchError indicates that a command was sent to a
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,7 +1339,7 @@ func (s *statusServer) CommandQueue(
}

if replica == nil {
return nil, roachpb.NewRangeNotFoundError(rangeID)
return nil, roachpb.NewRangeNotFoundError(rangeID, 0)
}

return &serverpb.CommandQueueResponse{
Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,7 @@ func (r *Replica) getReplicaDescriptorRLocked() (roachpb.ReplicaDescriptor, erro
if ok {
return repDesc, nil
}
return roachpb.ReplicaDescriptor{}, roachpb.NewRangeNotFoundError(r.RangeID)
return roachpb.ReplicaDescriptor{}, roachpb.NewRangeNotFoundError(r.RangeID, r.store.StoreID())
}

func (r *Replica) getMergeCompleteCh() chan struct{} {
Expand Down Expand Up @@ -2035,7 +2035,7 @@ func (r *Replica) sendWithRangeID(
if _, ok := pErr.GetDetail().(*roachpb.RaftGroupDeletedError); ok {
// This error needs to be converted appropriately so that
// clients will retry.
pErr = roachpb.NewError(roachpb.NewRangeNotFoundError(r.RangeID))
pErr = roachpb.NewError(roachpb.NewRangeNotFoundError(r.RangeID, r.store.StoreID()))
}
log.Eventf(ctx, "replica.Send got error: %s", pErr)
} else {
Expand Down Expand Up @@ -2927,7 +2927,7 @@ func (r *Replica) maybeWatchForMerge(ctx context.Context) error {
// The merge committed but the left-hand replica on this store hasn't
// subsumed this replica yet. Mark this replica as destroyed so it
// doesn't serve requests when we close the mergeCompleteCh below.
r.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(r.RangeID), destroyReasonMergePending)
r.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(r.RangeID, r.store.StoreID()), destroyReasonMergePending)
}
// Unblock pending requests. If the merge committed, the requests will
// notice that the replica has been destroyed and return an appropriate
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2122,7 +2122,7 @@ func (s *Store) GetReplica(rangeID roachpb.RangeID) (*Replica, error) {
if value, ok := s.mu.replicas.Load(int64(rangeID)); ok {
return (*Replica)(value), nil
}
return nil, roachpb.NewRangeNotFoundError(rangeID)
return nil, roachpb.NewRangeNotFoundError(rangeID, s.StoreID())
}

// LookupReplica looks up the replica that contains the specified key. It
Expand Down Expand Up @@ -2768,7 +2768,7 @@ func (s *Store) removeReplicaImpl(
rep.mu.Lock()
rep.cancelPendingCommandsLocked()
rep.mu.internalRaftGroup = nil
rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID), destroyReasonRemoved)
rep.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(rep.RangeID, rep.store.StoreID()), destroyReasonRemoved)
rep.mu.Unlock()
rep.readOnlyCmdMu.Unlock()

Expand Down Expand Up @@ -3789,7 +3789,8 @@ func (s *Store) HandleRaftResponse(ctx context.Context, resp *RaftMessageRespons
// could be re-added with a higher replicaID, in which this error is
// cleared in setReplicaIDRaftMuLockedMuLocked.
if repl.mu.destroyStatus.IsAlive() {
repl.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(repl.RangeID), destroyReasonRemovalPending)
storeID := repl.store.StoreID()
repl.mu.destroyStatus.Set(roachpb.NewRangeNotFoundError(repl.RangeID, storeID), destroyReasonRemovalPending)
}
repl.mu.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (ls *Stores) GetReplicaForRangeID(rangeID roachpb.RangeID) (*Replica, error
return nil, err
}
if replica == nil {
return nil, roachpb.NewRangeNotFoundError(rangeID)
return nil, roachpb.NewRangeNotFoundError(rangeID, 0)
}
return replica, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestStoresGetReplicaForRangeID(t *testing.T) {
if replica2 != nil {
t.Fatalf("expected replica to be nil; was %v", replica2)
}
expectedError := roachpb.NewRangeNotFoundError(rangeID2)
expectedError := roachpb.NewRangeNotFoundError(rangeID2, 0)
if err2.Error() != expectedError.Error() {
t.Fatalf("expected err to be %v; was %v", expectedError, err2)
}
Expand Down

0 comments on commit 9998c7a

Please sign in to comment.