Skip to content

Commit

Permalink
Fix region manager race (#1987)
Browse files Browse the repository at this point in the history
* fix region manager race

Signed-off-by: birdstorm <samuelwyf@hotmail.com>
  • Loading branch information
birdstorm committed Aug 2, 2021
1 parent ddbf7d3 commit 4aad320
Show file tree
Hide file tree
Showing 25 changed files with 1,209 additions and 471 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ class CacheInvalidateEventHandler(regionManager: RegionManager) {
// Used for updating region/store cache in the given regionManager
if (event.shouldUpdateRegion()) {
logger.info(s"Invalidating region ${event.getRegionId} cache at driver.")
regionManager.invalidateRegion(event.getRegionId)
val region = regionManager.getRegionById(event.getRegionId);
if (region != null) {
regionManager.invalidateRegion(region)
}
}

if (event.shouldUpdateStore()) {
Expand All @@ -55,10 +58,18 @@ class CacheInvalidateEventHandler(regionManager: RegionManager) {
// Used for updating leader information cached in the given regionManager
logger.info(
s"Invalidating leader of region:${event.getRegionId} store:${event.getStoreId} cache at driver.")
regionManager.updateLeader(event.getRegionId, event.getStoreId)
val region = regionManager.getRegionById(event.getRegionId);
if (region != null) {
regionManager.updateLeader(region, event.getStoreId)
regionManager.invalidateRegion(region)
}

case CacheType.REQ_FAILED =>
logger.info(s"Request failed cache invalidation for region ${event.getRegionId}")
regionManager.onRequestFail(event.getRegionId, event.getStoreId)
val region = regionManager.getRegionById(event.getRegionId);
if (region != null) {
regionManager.onRequestFail(region)
}
case _ => throw new IllegalArgumentException("Unsupported cache invalidate type.")
}
} catch {
Expand Down
2 changes: 1 addition & 1 deletion tikv-client/scripts/proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ CURRENT_DIR=`pwd`
TISPARK_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
cd $TISPARK_HOME/tikv-client

kvproto_hash=e6d6090277c921c3291c48c5bc8fb38907c852d0
kvproto_hash=6ed99a08e262d8a32d6355dcba91cf99cb92074a

raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926

Expand Down
136 changes: 31 additions & 105 deletions tikv-client/src/main/java/com/pingcap/tikv/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,57 +17,51 @@

package com.pingcap.tikv;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import static com.pingcap.tikv.util.ClientUtils.getBatches;
import static com.pingcap.tikv.util.ClientUtils.getKvPairs;

import com.google.protobuf.ByteString;
import com.pingcap.tikv.exception.GrpcException;
import com.pingcap.tikv.exception.TiClientInternalException;
import com.pingcap.tikv.exception.TiKVException;
import com.pingcap.tikv.operation.iterator.ConcreteScanIterator;
import com.pingcap.tikv.region.RegionStoreClient;
import com.pingcap.tikv.region.RegionStoreClient.RegionStoreClientBuilder;
import com.pingcap.tikv.region.TiRegion;
import com.pingcap.tikv.util.BackOffFunction;
import com.pingcap.tikv.util.BackOffer;
import com.pingcap.tikv.util.Batch;
import com.pingcap.tikv.util.ConcreteBackOffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tikv.kvproto.Kvrpcpb;
import org.tikv.kvproto.Kvrpcpb.KvPair;

public class KVClient implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(KVClient.class);
private static final int MAX_BATCH_LIMIT = 1024;
private static final int BATCH_GET_SIZE = 16 * 1024;
private final RegionStoreClientBuilder clientBuilder;
private final TiConfiguration conf;
private final ExecutorService executorService;
private final ExecutorService batchGetThreadPool;

public KVClient(TiConfiguration conf, RegionStoreClientBuilder clientBuilder) {
public KVClient(
TiConfiguration conf,
RegionStoreClientBuilder clientBuilder,
ExecutorService batchGetThreadPool) {
Objects.requireNonNull(conf, "conf is null");
Objects.requireNonNull(clientBuilder, "clientBuilder is null");
this.conf = conf;
this.clientBuilder = clientBuilder;
executorService =
Executors.newFixedThreadPool(
conf.getKvClientConcurrency(),
new ThreadFactoryBuilder().setNameFormat("kvclient-pool-%d").setDaemon(true).build());
this.batchGetThreadPool = batchGetThreadPool;
}

@Override
public void close() {
if (executorService != null) {
executorService.shutdownNow();
}
}
public void close() {}

/**
* Get a key-value pair from TiKV if key exists
Expand All @@ -81,7 +75,7 @@ public ByteString get(ByteString key, long version) throws GrpcException {
RegionStoreClient client = clientBuilder.build(key);
try {
return client.get(backOffer, key, version);
} catch (final TiKVException | TiClientInternalException e) {
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
}
}
Expand All @@ -108,59 +102,42 @@ public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long ve
* @param endKey end key, exclusive
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long version)
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator =
scanIterator(conf, clientBuilder, startKey, endKey, version);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, endKey, version);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
}

private List<KvPair> doSendBatchGet(BackOffer backOffer, List<ByteString> keys, long version) {
ExecutorCompletionService<List<KvPair>> completionService =
new ExecutorCompletionService<>(executorService);
new ExecutorCompletionService<>(batchGetThreadPool);

Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(keys);
List<Batch> batches = new ArrayList<>();

for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(batches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
}
List<Batch> batches =
getBatches(backOffer, keys, BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);

for (Batch batch : batches) {
BackOffer singleBatchBackOffer = ConcreteBackOffer.create(backOffer);
completionService.submit(
() -> doSendBatchGetInBatchesWithRetry(singleBatchBackOffer, batch, version));
() -> doSendBatchGetInBatchesWithRetry(batch.getBackOffer(), batch, version));
}

try {
List<KvPair> result = new ArrayList<>();
for (int i = 0; i < batches.size(); i++) {
result.addAll(completionService.take().get());
}
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (ExecutionException e) {
throw new TiKVException("Execution exception met.", e);
}
return getKvPairs(completionService, batches, BackOffer.BATCH_GET_MAX_BACKOFF);
}

private List<KvPair> doSendBatchGetInBatchesWithRetry(
BackOffer backOffer, Batch batch, long version) {
TiRegion oldRegion = batch.region;
TiRegion oldRegion = batch.getRegion();
TiRegion currentRegion =
clientBuilder.getRegionManager().getRegionByKey(oldRegion.getStartKey());

if (oldRegion.equals(currentRegion)) {
RegionStoreClient client = clientBuilder.build(batch.region);
RegionStoreClient client = clientBuilder.build(batch.getRegion());
try {
return client.batchGet(backOffer, batch.keys, version);
} catch (final TiKVException | TiClientInternalException e) {
return client.batchGet(backOffer, batch.getKeys(), version);
} catch (final TiKVException e) {
backOffer.doBackOff(BackOffFunction.BackOffFuncType.BoRegionMiss, e);
clientBuilder.getRegionManager().invalidateRegion(batch.region.getId());
clientBuilder.getRegionManager().invalidateRegion(batch.getRegion());
logger.warn("ReSplitting ranges for BatchGetRequest", e);

// retry
Expand All @@ -173,76 +150,25 @@ private List<KvPair> doSendBatchGetInBatchesWithRetry(

private List<KvPair> doSendBatchGetWithRefetchRegion(
BackOffer backOffer, Batch batch, long version) {
Map<TiRegion, List<ByteString>> groupKeys = groupKeysByRegion(batch.keys);
List<Batch> retryBatches = new ArrayList<>();

for (Map.Entry<TiRegion, List<ByteString>> entry : groupKeys.entrySet()) {
appendBatches(retryBatches, entry.getKey(), entry.getValue(), BATCH_GET_SIZE);
}
List<Batch> retryBatches =
getBatches(backOffer, batch.getKeys(), BATCH_GET_SIZE, MAX_BATCH_LIMIT, this.clientBuilder);

ArrayList<KvPair> results = new ArrayList<>();
for (Batch retryBatch : retryBatches) {
// recursive calls
List<KvPair> batchResult = doSendBatchGetInBatchesWithRetry(backOffer, retryBatch, version);
List<KvPair> batchResult =
doSendBatchGetInBatchesWithRetry(retryBatch.getBackOffer(), retryBatch, version);
results.addAll(batchResult);
}
return results;
}

/**
* Append batch to list and split them according to batch limit
*
* @param batches a grouped batch
* @param region region
* @param keys keys
* @param batchGetMaxSizeInByte batch max limit
*/
private void appendBatches(
List<Batch> batches, TiRegion region, List<ByteString> keys, int batchGetMaxSizeInByte) {
int start;
int end;
if (keys == null) {
return;
}
int len = keys.size();
for (start = 0; start < len; start = end) {
int size = 0;
for (end = start; end < len && size < batchGetMaxSizeInByte; end++) {
size += keys.get(end).size();
}
Batch batch = new Batch(region, keys.subList(start, end));
batches.add(batch);
}
}

/**
* Group by list of keys according to its region
*
* @param keys keys
* @return a mapping of keys and their region
*/
private Map<TiRegion, List<ByteString>> groupKeysByRegion(List<ByteString> keys) {
return keys.stream()
.collect(Collectors.groupingBy(clientBuilder.getRegionManager()::getRegionByKey));
}

private Iterator<Kvrpcpb.KvPair> scanIterator(
private Iterator<KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
}

/** A Batch containing the region and a list of keys to send */
private static final class Batch {
private final TiRegion region;
private final List<ByteString> keys;

Batch(TiRegion region, List<ByteString> keys) {
this.region = region;
this.keys = keys;
}
}
}
12 changes: 10 additions & 2 deletions tikv-client/src/main/java/com/pingcap/tikv/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ public byte[] get(byte[] key) {
}

public ByteString get(ByteString key) {
try (KVClient client = new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) {
try (KVClient client =
new KVClient(
session.getConf(),
session.getRegionStoreClientBuilder(),
session.getThreadPoolForBatchGet())) {
return client.get(key, timestamp.getVersion());
}
}
Expand All @@ -78,7 +82,11 @@ public List<BytePairWrapper> batchGet(int backOffer, List<byte[]> keys) {
for (byte[] key : keys) {
list.add(ByteString.copyFrom(key));
}
try (KVClient client = new KVClient(session.getConf(), session.getRegionStoreClientBuilder())) {
try (KVClient client =
new KVClient(
session.getConf(),
session.getRegionStoreClientBuilder(),
session.getThreadPoolForBatchGet())) {
List<KvPair> kvPairList =
client.batchGet(
ConcreteBackOffer.newCustomBackOff(backOffer), list, timestamp.getVersion());
Expand Down
2 changes: 1 addition & 1 deletion tikv-client/src/main/java/com/pingcap/tikv/TTLManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ private void sendTxnHeartBeat(BackOffer bo, long ttl) {
String.format("sendTxnHeartBeat failed, regionId=%s", tiRegion.getId()),
result.getException()));
this.regionManager.invalidateStore(store.getId());
this.regionManager.invalidateRegion(tiRegion.getId());
this.regionManager.invalidateRegion(tiRegion);
// re-split keys and commit again.
sendTxnHeartBeat(bo, ttl);
} catch (GrpcException e) {
Expand Down
55 changes: 55 additions & 0 deletions tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ public class TiConfiguration implements Serializable {
private static final int MAX_REQUEST_KEY_RANGE_SIZE = 20000;
private static final int DEF_INDEX_SCAN_CONCURRENCY = 5;
private static final int DEF_TABLE_SCAN_CONCURRENCY = 512;
private static final int DEF_BATCH_GET_CONCURRENCY = 20;
private static final int DEF_BATCH_PUT_CONCURRENCY = 20;
private static final int DEF_BATCH_DELETE_CONCURRENCY = 20;
private static final int DEF_BATCH_SCAN_CONCURRENCY = 5;
private static final int DEF_DELETE_RANGE_CONCURRENCY = 20;
private static final CommandPri DEF_COMMAND_PRIORITY = CommandPri.Low;
private static final IsolationLevel DEF_ISOLATION_LEVEL = IsolationLevel.SI;
private static final boolean DEF_SHOW_ROWID = false;
Expand All @@ -68,6 +73,11 @@ public class TiConfiguration implements Serializable {
private int downgradeThreshold = DEF_REGION_SCAN_DOWNGRADE_THRESHOLD;
private int indexScanConcurrency = DEF_INDEX_SCAN_CONCURRENCY;
private int tableScanConcurrency = DEF_TABLE_SCAN_CONCURRENCY;
private int batchGetConcurrency = DEF_BATCH_GET_CONCURRENCY;
private int batchPutConcurrency = DEF_BATCH_PUT_CONCURRENCY;
private int batchDeleteConcurrency = DEF_BATCH_DELETE_CONCURRENCY;
private int batchScanConcurrency = DEF_BATCH_SCAN_CONCURRENCY;
private int deleteRangeConcurrency = DEF_DELETE_RANGE_CONCURRENCY;
private CommandPri commandPriority = DEF_COMMAND_PRIORITY;
private IsolationLevel isolationLevel = DEF_ISOLATION_LEVEL;
private int maxRequestKeyRangeSize = MAX_REQUEST_KEY_RANGE_SIZE;
Expand Down Expand Up @@ -196,6 +206,51 @@ public void setTableScanConcurrency(int tableScanConcurrency) {
this.tableScanConcurrency = tableScanConcurrency;
}

/**
 * Returns the batch-get concurrency held by this configuration.
 *
 * <p>The backing field starts at {@code DEF_BATCH_GET_CONCURRENCY} (20) and changes only through
 * {@link #setBatchGetConcurrency(int)}.
 * NOTE(review): presumably sizes the batch-get thread pool; confirm against the caller.
 *
 * @return the current batch-get concurrency
 */
public int getBatchGetConcurrency() {
return batchGetConcurrency;
}

/**
 * Sets the batch-get concurrency for this configuration.
 *
 * <p>The value is stored exactly as given; this setter performs no range validation.
 *
 * @param batchGetConcurrency the new batch-get concurrency
 * @return this configuration, so that setter calls can be chained
 */
public TiConfiguration setBatchGetConcurrency(int batchGetConcurrency) {
this.batchGetConcurrency = batchGetConcurrency;
return this;
}

/**
 * Returns the batch-put concurrency held by this configuration.
 *
 * <p>The backing field starts at {@code DEF_BATCH_PUT_CONCURRENCY} (20) and changes only through
 * {@link #setBatchPutConcurrency(int)}.
 *
 * @return the current batch-put concurrency
 */
public int getBatchPutConcurrency() {
return batchPutConcurrency;
}

/**
 * Sets the batch-put concurrency for this configuration.
 *
 * <p>The value is stored exactly as given; this setter performs no range validation.
 *
 * @param batchPutConcurrency the new batch-put concurrency
 * @return this configuration, so that setter calls can be chained
 */
public TiConfiguration setBatchPutConcurrency(int batchPutConcurrency) {
this.batchPutConcurrency = batchPutConcurrency;
return this;
}

/**
 * Returns the batch-delete concurrency held by this configuration.
 *
 * <p>The backing field starts at {@code DEF_BATCH_DELETE_CONCURRENCY} (20) and changes only
 * through {@link #setBatchDeleteConcurrency(int)}.
 *
 * @return the current batch-delete concurrency
 */
public int getBatchDeleteConcurrency() {
return batchDeleteConcurrency;
}

/**
 * Sets the batch-delete concurrency for this configuration.
 *
 * <p>The value is stored exactly as given; this setter performs no range validation.
 *
 * @param batchDeleteConcurrency the new batch-delete concurrency
 * @return this configuration, so that setter calls can be chained
 */
public TiConfiguration setBatchDeleteConcurrency(int batchDeleteConcurrency) {
this.batchDeleteConcurrency = batchDeleteConcurrency;
return this;
}

/**
 * Returns the batch-scan concurrency held by this configuration.
 *
 * <p>The backing field starts at {@code DEF_BATCH_SCAN_CONCURRENCY} (5) and changes only through
 * {@link #setBatchScanConcurrency(int)}.
 *
 * @return the current batch-scan concurrency
 */
public int getBatchScanConcurrency() {
return batchScanConcurrency;
}

/**
 * Sets the batch-scan concurrency for this configuration.
 *
 * <p>The value is stored exactly as given; this setter performs no range validation.
 *
 * @param batchScanConcurrency the new batch-scan concurrency
 * @return this configuration, so that setter calls can be chained
 */
public TiConfiguration setBatchScanConcurrency(int batchScanConcurrency) {
this.batchScanConcurrency = batchScanConcurrency;
return this;
}

/**
 * Returns the delete-range concurrency held by this configuration.
 *
 * <p>The backing field starts at {@code DEF_DELETE_RANGE_CONCURRENCY} (20) and changes only
 * through {@link #setDeleteRangeConcurrency(int)}.
 *
 * @return the current delete-range concurrency
 */
public int getDeleteRangeConcurrency() {
return deleteRangeConcurrency;
}

/**
 * Sets the delete-range concurrency for this configuration.
 *
 * <p>The value is stored exactly as given; this setter performs no range validation.
 *
 * @param deleteRangeConcurrency the new delete-range concurrency
 * @return this configuration, so that setter calls can be chained
 */
public TiConfiguration setDeleteRangeConcurrency(int deleteRangeConcurrency) {
this.deleteRangeConcurrency = deleteRangeConcurrency;
return this;
}

/**
 * Returns the command priority held by this configuration.
 *
 * <p>The backing field is initialized to {@code DEF_COMMAND_PRIORITY} ({@code CommandPri.Low}).
 *
 * @return the current command priority
 */
public CommandPri getCommandPriority() {
return commandPriority;
}
Expand Down
Loading

0 comments on commit 4aad320

Please sign in to comment.