Skip to content

Commit

Permalink
Move lz4 decompress to backend executor (apache#1237)
Browse files Browse the repository at this point in the history
* use whitebox to set LOW_WATER_MARK and HIGH_WATER_MARK

Change-Id: Ic7770f0b8cb9a81dbb714ea9ce8ef99d5b710807
  • Loading branch information
Linary authored and tmljob committed Dec 10, 2020
1 parent b066363 commit f41ae0d
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ protected void addSchema(SchemaElement schema) {
@SuppressWarnings("unchecked")
protected <T extends SchemaElement> T getSchema(HugeType type, Id id) {
// try get from optimized array cache
if (id.number() && id.asLong() > 0) {
if (id.number() && id.asLong() > 0L) {
SchemaElement value = this.arrayCaches.get(type, id);
if (value != null) {
return (T) value;
Expand Down Expand Up @@ -320,14 +320,19 @@ public void updateIfNeeded(V schema) {
return;
}
Id id = schema.id();
if (id.number() && id.asLong() > 0) {
if (id.number() && id.asLong() > 0L) {
this.set(schema.type(), id, schema);
}
}

public V get(HugeType type, Id id) {
assert id.number() && id.asLong() > 0 : id;
int key = (int) id.asLong();
assert id.number();
long longId = id.asLong();
if (longId <= 0L) {
assert false : id;
return null;
}
int key = (int) longId;
if (key >= this.size) {
return null;
}
Expand All @@ -346,8 +351,13 @@ public V get(HugeType type, Id id) {
}

public void set(HugeType type, Id id, V value) {
assert id.number() && id.asLong() > 0 : id;
int key = (int) id.asLong();
assert id.number();
long longId = id.asLong();
if (longId <= 0L) {
assert false : id;
return;
}
int key = (int) longId;
if (key >= this.size) {
return;
}
Expand All @@ -371,8 +381,12 @@ public void set(HugeType type, Id id, V value) {
}

public void remove(HugeType type, Id id) {
assert id.number() && id.asLong() > 0 : id;
int key = (int) id.asLong();
assert id.number();
long longId = id.asLong();
if (longId <= 0L) {
return;
}
int key = (int) longId;
V value = null;
if (key >= this.size) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ public void truncate() {

@Override
public void mutate(BackendMutation mutation) {
    // Empty mutations are dropped up front so no no-op entry is buffered
    if (!mutation.isEmpty()) {
        // Only append to the local batch buffer here; the batch is
        // flushed elsewhere
        this.getOrNewBatch().add(mutation);
    }
}
Expand Down Expand Up @@ -203,7 +206,7 @@ private Object submitAndWait(StoreCommand command) {
}

private Object queryByRaft(Object query, Function<Object, Object> func) {
if (!this.context.isSafeRead()) {
if (this.node().selfIsLeader() || !this.context.isSafeRead()) {
return func.apply(query);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,10 @@ public Status status() {
private RaftResult<T> get() {
try {
return this.future.get(WAIT_RAFT_LOG_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException e) {
} catch (ExecutionException e) {
throw new BackendException("ExecutionException", e);
} catch (InterruptedException e) {
throw new BackendException("InterruptedException", e);
} catch (TimeoutException e) {
throw new BackendException("Wait closure timeout");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private void submitCommand(StoreCommand command, StoreClosure closure) {
// compress returns a BytesBuffer
ByteBuffer buffer = LZ4Util.compress(command.data(),
RaftSharedContext.BLOCK_SIZE)
.forReadWritten()
.asByteBuffer();
LOG.debug("The bytes size of command(compressed) {} is {}",
command.action(), buffer.limit());
Expand Down Expand Up @@ -217,7 +218,6 @@ private void waitIfBusy() {
if (counter <= 0) {
return;
}
// TODO:should sleep or throw exception directly?
// It may cause many threads to sleep, but this is exactly what we want
long time = counter * BUSY_SLEEP_FACTOR;
LOG.info("The node {} will sleep {} ms", this.node, time);
Expand All @@ -230,7 +230,8 @@ private void waitIfBusy() {
if (this.busyCounter.get() > 0) {
synchronized (this) {
if (this.busyCounter.get() > 0) {
this.busyCounter.decrementAndGet();
counter = this.busyCounter.decrementAndGet();
LOG.info("Decrease busy counter: [{}]", counter);
}
}
}
Expand Down Expand Up @@ -267,7 +268,7 @@ public void onError(PeerId peer, Status status) {
if (this.isWriteBufferOverflow(status)) {
// increment busy counter
int count = RaftNode.this.busyCounter.incrementAndGet();
LOG.info("Busy counter: [{}]", count);
LOG.info("Increase busy counter: [{}]", count);
}
}

Expand All @@ -278,6 +279,16 @@ private boolean isWriteBufferOverflow(Status status) {
status.getErrorMsg().contains(expectMsg);
}

/**
 * Returns true if the status represents an internal RPC invoke timeout.
 * Maybe useful in the future (currently unused).
 */
private boolean isRpcTimeout(Status status) {
    if (RaftError.EINTERNAL != status.getRaftError()) {
        return false;
    }
    String errorMsg = status.getErrorMsg();
    return errorMsg != null && errorMsg.contains("Invoke timeout");
}

@Override
public void onDestroyed(PeerId peer) {
LOG.warn("Replicator {} prepare to offline", peer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.rpc.impl.BoltRaftRpcFactory;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.baidu.hugegraph.HugeException;
Expand All @@ -55,6 +56,7 @@
import com.baidu.hugegraph.config.CoreOptions;
import com.baidu.hugegraph.config.HugeConfig;
import com.baidu.hugegraph.event.EventHub;
import com.baidu.hugegraph.testutil.Whitebox;
import com.baidu.hugegraph.type.HugeType;
import com.baidu.hugegraph.type.define.GraphMode;
import com.baidu.hugegraph.util.E;
Expand Down Expand Up @@ -307,6 +309,13 @@ private HugeConfig config() {
}

private RpcServer initAndStartRpcServer() {
Whitebox.setInternalState(
BoltRaftRpcFactory.class, "CHANNEL_WRITE_BUF_LOW_WATER_MARK",
this.config().get(CoreOptions.RAFT_RPC_BUF_LOW_WATER_MARK));
Whitebox.setInternalState(
BoltRaftRpcFactory.class, "CHANNEL_WRITE_BUF_HIGH_WATER_MARK",
this.config().get(CoreOptions.RAFT_RPC_BUF_HIGH_WATER_MARK));

PeerId serverId = new PeerId();
serverId.parse(this.config().get(CoreOptions.RAFT_ENDPOINT));
RpcServer rpcServer = RaftRpcServerFactory.createAndStartRaftRpcServer(
Expand Down Expand Up @@ -344,7 +353,7 @@ private static ExecutorService newPool(int coreThreads, int maxThreads,
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>();
return ThreadPoolUtil.newBuilder()
.poolName(name)
.enableMetric(true)
.enableMetric(false)
.coreThreads(coreThreads)
.maximumThreads(maxThreads)
.keepAliveSeconds(300L)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,31 +134,28 @@ public void onApply(Iterator iter) {
StoreClosure closure = null;
try {
while (iter.hasNext()) {
StoreType type;
StoreAction action;
BytesBuffer buffer;
closure = (StoreClosure) iter.done();
if (closure != null) {
// Leader just take it out from the closure
buffer = BytesBuffer.wrap(closure.command().data());
} else {
// Follower need readMutation data
buffer = LZ4Util.decompress(iter.getData().array(),
RaftSharedContext.BLOCK_SIZE);
}
// The first two bytes are StoreType and StoreAction
type = StoreType.valueOf(buffer.read());
action = StoreAction.valueOf(buffer.read());
if (closure != null) {
// Closure is null on follower node
BytesBuffer buffer = BytesBuffer.wrap(closure.command().data());
// The first two bytes are StoreType and StoreAction
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
// Let the producer thread to handle it
closure.complete(Status.OK(), () -> {
return this.applyCommand(type, action, buffer);
});
} else {
// Follower needs to read the mutation data
byte[] bytes = iter.getData().array();
// Follower seems to have no way to wait on the future
// Let the backend thread do it directly
this.context.backendExecutor().submit(() -> {
BytesBuffer buffer = LZ4Util.decompress(bytes,
RaftSharedContext.BLOCK_SIZE);
buffer.forReadWritten();
StoreType type = StoreType.valueOf(buffer.read());
StoreAction action = StoreAction.valueOf(buffer.read());
try {
this.applyCommand(type, action, buffer);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,30 @@ public static synchronized CoreOptions instance() {
60000
);

// Netty channel write-buffer low water mark (bytes); injected into
// BoltRaftRpcFactory via Whitebox before the raft RPC server starts.
// Default: 10 MB.
public static final ConfigOption<Integer> RAFT_RPC_BUF_LOW_WATER_MARK =
new ConfigOption<>(
"raft.rpc_buf_low_water_mark",
"The ChannelOutboundBuffer's low water mark of netty, " +
"when buffer size less than this size, the method " +
"ChannelOutboundBuffer.isWritable() will return true, " +
"it means that low downstream pressure or good network",
positiveInt(),
10 * 1024 * 1024
);

// Netty channel write-buffer high water mark (bytes); injected into
// BoltRaftRpcFactory via Whitebox before the raft RPC server starts.
// Should be >= the low water mark. Default: 20 MB.
public static final ConfigOption<Integer> RAFT_RPC_BUF_HIGH_WATER_MARK =
new ConfigOption<>(
"raft.rpc_buf_high_water_mark",
"The ChannelOutboundBuffer's high water mark of netty, " +
"only when buffer size exceed this size, the method " +
"ChannelOutboundBuffer.isWritable() will return false, " +
"it means that the downstream pressure is too great to " +
"process the request or network is very congestion, " +
"upstream needs to limit rate at this time",
positiveInt(),
20 * 1024 * 1024
);

public static final ConfigOption<Integer> RATE_LIMIT_WRITE =
new ConfigOption<>(
"rate_limit.write",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static BytesBuffer compress(byte[] bytes, int blockSize,
} catch (IOException e) {
throw new BackendException("Failed to compress", e);
}
/*
* If need to perform reading outside the method,
* remember to call forReadWritten()
*/
return buf;
}

Expand Down Expand Up @@ -82,6 +86,10 @@ public static BytesBuffer decompress(byte[] bytes, int blockSize,
} catch (IOException e) {
throw new BackendException("Failed to decompress", e);
}
/*
* If need to perform reading outside the method,
* remember to call forReadWritten()
*/
return buf;
}
}

0 comments on commit f41ae0d

Please sign in to comment.