Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retry for getRegion requests when regionId returns 0 #542

Merged
merged 1 commit into from Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 15 additions & 7 deletions tikv-client/src/main/java/com/pingcap/tikv/PDClient.java
Expand Up @@ -16,6 +16,8 @@
package com.pingcap.tikv;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.pingcap.tikv.operation.PDErrorHandler.getRegionResponseErrorExtractor;
import static com.pingcap.tikv.pd.PDError.buildFromPdpbError;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.net.HostAndPort;
Expand Down Expand Up @@ -70,7 +72,9 @@ public TiTimestamp getTimestamp(BackOffer backOffer) {
Supplier<TsoRequest> request = () -> tsoReq;

PDErrorHandler<TsoResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this);

TsoResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_TSO, request, handler);
Timestamp timestamp = resp.getTimestamp();
Expand All @@ -87,7 +91,7 @@ public TiRegion getRegionByKey(BackOffer backOffer, ByteString key) {
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(encodedKey).build();

PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

GetRegionResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION, request, handler);
return new TiRegion(
Expand All @@ -108,7 +112,7 @@ public Future<TiRegion> getRegionByKeyAsync(BackOffer backOffer, ByteString key)
() -> GetRegionRequest.newBuilder().setHeader(header).setRegionKey(key).build();

PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

callAsyncWithRetry(backOffer, PDGrpc.METHOD_GET_REGION, request, responseObserver, handler);
return responseObserver.getFuture();
Expand All @@ -119,7 +123,7 @@ public TiRegion getRegionByID(BackOffer backOffer, long id) {
Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

GetRegionResponse resp =
callWithRetry(backOffer, PDGrpc.METHOD_GET_REGION_BY_ID, request, handler);
Expand All @@ -142,7 +146,7 @@ public Future<TiRegion> getRegionByIDAsync(BackOffer backOffer, long id) {
Supplier<GetRegionByIDRequest> request =
() -> GetRegionByIDRequest.newBuilder().setHeader(header).setRegionId(id).build();
PDErrorHandler<GetRegionResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(getRegionResponseErrorExtractor, this);

callAsyncWithRetry(
backOffer, PDGrpc.METHOD_GET_REGION_BY_ID, request, responseObserver, handler);
Expand All @@ -154,7 +158,9 @@ public Store getStore(BackOffer backOffer, long storeId) {
Supplier<GetStoreRequest> request =
() -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
PDErrorHandler<GetStoreResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this);

GetStoreResponse resp = callWithRetry(backOffer, PDGrpc.METHOD_GET_STORE, request, handler);
return resp.getStore();
Expand All @@ -168,7 +174,9 @@ public Future<Store> getStoreAsync(BackOffer backOffer, long storeId) {
Supplier<GetStoreRequest> request =
() -> GetStoreRequest.newBuilder().setHeader(header).setStoreId(storeId).build();
PDErrorHandler<GetStoreResponse> handler =
new PDErrorHandler<>(r -> r.getHeader().hasError() ? r.getHeader().getError() : null, this);
new PDErrorHandler<>(
r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
this);

callAsyncWithRetry(backOffer, PDGrpc.METHOD_GET_STORE, request, responseObserver, handler);
return responseObserver.getFuture();
Expand Down
Expand Up @@ -17,18 +17,30 @@

package com.pingcap.tikv.operation;

import static com.pingcap.tikv.pd.PDError.buildFromPdpbError;

import com.pingcap.tikv.PDClient;
import com.pingcap.tikv.exception.GrpcException;
import com.pingcap.tikv.exception.TiClientInternalException;
import com.pingcap.tikv.kvproto.Pdpb;
import com.pingcap.tikv.pd.PDError;
import com.pingcap.tikv.util.BackOffFunction;
import com.pingcap.tikv.util.BackOffer;
import java.util.function.Function;
import org.apache.log4j.Logger;

public class PDErrorHandler<RespT> implements ErrorHandler<RespT> {
private final Function<RespT, Pdpb.Error> getError;
private static final Logger logger = Logger.getLogger(PDErrorHandler.class);
private final Function<RespT, PDError> getError;
private final PDClient client;

public PDErrorHandler(Function<RespT, Pdpb.Error> errorExtractor, PDClient client) {
public static final Function<Pdpb.GetRegionResponse, PDError> getRegionResponseErrorExtractor =
r ->
r.getHeader().hasError()
? buildFromPdpbError(r.getHeader().getError())
: r.getRegion().getId() == 0 ? PDError.RegionPeerNotElected.DEFAULT_INSTANCE : null;

public PDErrorHandler(Function<RespT, PDError> errorExtractor, PDClient client) {
this.getError = errorExtractor;
this.client = client;
}
Expand All @@ -38,12 +50,22 @@ public boolean handleResponseError(BackOffer backOffer, RespT resp) {
if (resp == null) {
return false;
}
Pdpb.Error error = getError.apply(resp);
PDError error = getError.apply(resp);
if (error != null) {
client.updateLeader();
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
return true;
switch (error.getErrorType()) {
case PD_ERROR:
client.updateLeader();
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
return true;
case REGION_PEER_NOT_ELECTED:
logger.debug(error.getMessage());
backOffer.doBackOff(
BackOffFunction.BackOffFuncType.BoPDRPC, new GrpcException(error.toString()));
return true;
default:
throw new TiClientInternalException("Unknown error type encountered: " + error);
}
}
return false;
}
Expand Down
104 changes: 104 additions & 0 deletions tikv-client/src/main/java/com/pingcap/tikv/pd/PDError.java
@@ -0,0 +1,104 @@
/*
*
* Copyright 2018 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.pingcap.tikv.pd;

import com.pingcap.tikv.kvproto.Pdpb;

public final class PDError {
private final Pdpb.Error error;

private final ErrorType errorType;

public enum ErrorType {
PD_ERROR,
REGION_PEER_NOT_ELECTED
}

private PDError(Pdpb.Error error) {
this.error = error;
this.errorType = ErrorType.PD_ERROR;
}

private PDError(Pdpb.Error error, ErrorType errorType) {
this.error = error;
this.errorType = errorType;
}

public static PDError buildFromPdpbError(Pdpb.Error error) {
return new PDError(error);
}

public static Builder newBuilder() {
return new Builder();
}

public static Builder newBuilder(Pdpb.Error error) {
return new Builder(error);
}

public Pdpb.Error getError() {
return error;
}

public ErrorType getErrorType() {
return errorType;
}

public String getMessage() {
return getError().getMessage();
}

@Override
public String toString() {
return "\nErrorType: " + errorType + "\nError: " + error;
}

public static final class RegionPeerNotElected {
private static final String ERROR_MESSAGE = "Region Peer not elected. Please try later";
private static final Pdpb.Error DEFAULT_ERROR =
Pdpb.Error.newBuilder().setMessage(ERROR_MESSAGE).build();
private static final ErrorType ERROR_TYPE = ErrorType.REGION_PEER_NOT_ELECTED;
public static final PDError DEFAULT_INSTANCE =
PDError.newBuilder(DEFAULT_ERROR).setErrorType(ERROR_TYPE).build();
}

public static final class Builder {
private Pdpb.Error error_;
private ErrorType errorType_ = ErrorType.PD_ERROR;

public Builder() {}

public Builder(Pdpb.Error error) {
this.error_ = error;
}

public Builder setError(Pdpb.Error error) {
this.error_ = error;
return this;
}

public Builder setErrorType(ErrorType errorType) {
this.errorType_ = errorType;
return this;
}

public PDError build() {
return new PDError(error_, errorType_);
}
}
}