Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into ZOOKEEPER-3188
Browse files Browse the repository at this point in the history
  • Loading branch information
symat authored and Mate Szalay-Beko committed Nov 19, 2019
2 parents 4b6bcea + 945167c commit 45b6c0f
Show file tree
Hide file tree
Showing 11 changed files with 230 additions and 65 deletions.
10 changes: 10 additions & 0 deletions zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,16 @@ property, when available, is noted below.
by default with a value of 400, set to 0 or a negative
integer to turn the feature off.

* *maxGetChildrenResponseCacheSize* :
(Java system property: **zookeeper.maxGetChildrenResponseCacheSize**)
**New in 3.6.0:**
Similar to **maxResponseCacheSize**, but applies to get children
requests. The metrics **response_packet_get_children_cache_hits**
and **response_packet_get_children_cache_misses** can be used to tune
this value to a given workload. The feature is turned on
by default with a value of 400; set it to 0 or a negative
integer to turn the feature off.

* *autopurge.snapRetainCount* :
(No Java system property)
**New in 3.4.0:**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ public void close(DisconnectReason reason) {
}

@Override
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
public void sendResponse(ReplyHeader h, Record r, String tag,
String cacheKey, Stat stat, int opCode) throws IOException {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,32 @@ public void processRequest(Request request) {
updateStats(request, lastOp, lastZxid);

try {
if (request.type == OpCode.getData && path != null && rsp != null) {
// Serialized read responses could be cached by the connection object.
// Cache entries are identified by their path and last modified zxid,
// so these values are passed along with the response.
GetDataResponse getDataResponse = (GetDataResponse) rsp;
if (path == null || rsp == null) {
cnxn.sendResponse(hdr, rsp, "response");
} else {
int opCode = request.type;
Stat stat = null;
if (getDataResponse.getStat() != null) {
stat = getDataResponse.getStat();
// Serialized read and get children responses could be cached by the connection
// object. Cache entries are identified by their path and last modified zxid,
// so these values are passed along with the response.
switch (opCode) {
case OpCode.getData : {
GetDataResponse getDataResponse = (GetDataResponse) rsp;
stat = getDataResponse.getStat();
cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
break;
}
case OpCode.getChildren2 : {
GetChildren2Response getChildren2Response = (GetChildren2Response) rsp;
stat = getChildren2Response.getStat();
cnxn.sendResponse(hdr, rsp, "response", path, stat, opCode);
break;
}
default:
cnxn.sendResponse(hdr, rsp, "response");
}
cnxn.sendResponse(hdr, rsp, "response", path, stat);
} else {
cnxn.sendResponse(hdr, rsp, "response");
}

if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;
Expand Down Expand Up @@ -663,31 +664,10 @@ public static void closeSock(SocketChannel sock) {

private static final ByteBuffer packetSentinel = ByteBuffer.allocate(0);

/**
* Serializes a ZooKeeper response and enqueues it for sending.
*
* Serializes client response parts and enqueues them into outgoing queue.
*
* If both cache key and last modified zxid are provided, the serialized
* response is cached under the provided key, the last modified zxid is
* stored along with the value. A cache entry is invalidated if the
* provided last modified zxid is more recent than the stored one.
*
* Attention: this function is not thread safe, due to caching not being
* thread safe.
*
* @param h reply header
* @param r reply payload, can be null
* @param tag Jute serialization tag, can be null
* @param cacheKey key for caching the serialized payload. a null value
* prevents caching
* @param stat stat information for the reply payload, used
* for cache invalidation. a null value prevents caching.
*/
@Override
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) {
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat, int opCode) {
try {
sendBuffer(serialize(h, r, tag, cacheKey, stat));
sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
decrOutstandingAndCheckThrottle(h);
} catch (Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
Expand All @@ -712,7 +692,10 @@ public void process(WatchedEvent event) {
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();

sendResponse(h, e, "notification", null, null);
// The last parameter OpCode here is used to select the response cache.
// Passing OpCode.error (with a value of -1) means we don't care, as we don't need
// response cache on delivering watcher events.
sendResponse(h, e, "notification", null, null, ZooDefs.OpCode.error);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,14 @@ public void process(WatchedEvent event) {
}

@Override
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
public void sendResponse(ReplyHeader h, Record r, String tag,
String cacheKey, Stat stat, int opCode) throws IOException {
// cacheKey and stat are used in caching, which is not
// implemented here. Implementation example can be found in NIOServerCnxn.
if (closingChannel || !channel.isOpen()) {
return;
}
sendBuffer(serialize(h, r, tag, cacheKey, stat));
sendBuffer(serialize(h, r, tag, cacheKey, stat, opCode));
decrOutstandingAndCheckThrottle(h);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,31 @@
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class ResponseCache {
private static final Logger LOG = LoggerFactory.getLogger(ResponseCache.class);

// Magic number chosen to be "big enough but not too big"
private static final int DEFAULT_RESPONSE_CACHE_SIZE = 400;

public static final int DEFAULT_RESPONSE_CACHE_SIZE = 400;
private final int cacheSize;
/**
 * Value type stored in the cache map: a {@link Stat} paired with a byte array.
 * NOTE(review): presumably {@code data} is the serialized response and {@code stat}
 * is the stat it was cached with -- confirm against put()/get().
 */
private static class Entry {

public Stat stat;
public byte[] data;

}

private Map<String, Entry> cache = Collections.synchronizedMap(new LRUCache<String, Entry>(getResponseCacheSize()));
private final Map<String, Entry> cache;

public ResponseCache() {
public ResponseCache(int cacheSize) {
this.cacheSize = cacheSize;
cache = Collections.synchronizedMap(new LRUCache<>(cacheSize));
LOG.info("Response cache size is initialized with value {}.", cacheSize);
}

/**
 * Returns the size value this cache was constructed with.
 *
 * @return the configured cache size
 */
public int getCacheSize() {
    return this.cacheSize;
}

public void put(String path, byte[] data, Stat stat) {
Expand All @@ -62,12 +70,8 @@ public byte[] get(String key, Stat stat) {
}
}

private static int getResponseCacheSize() {
return Integer.getInteger("zookeeper.maxResponseCacheSize", DEFAULT_RESPONSE_CACHE_SIZE);
}

public static boolean isEnabled() {
return getResponseCacheSize() > 0;
public boolean isEnabled() {
return cacheSize > 0;
}

private static class LRUCache<K, V> extends LinkedHashMap<K, V> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.Counter;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.slf4j.Logger;
Expand Down Expand Up @@ -161,10 +163,34 @@ public void decrOutstandingAndCheckThrottle(ReplyHeader h) {

public abstract void close(DisconnectReason reason);

public abstract void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException;
/**
* Serializes a ZooKeeper response and enqueues it for sending.
*
* Serializes client response parts and enqueues them into outgoing queue.
*
* If both cache key and last modified zxid are provided, the serialized
* response is cached under the provided key, the last modified zxid is
* stored along with the value. A cache entry is invalidated if the
* provided last modified zxid is more recent than the stored one.
*
* Attention: this function is not thread safe, due to caching not being
* thread safe.
*
* @param h reply header
* @param r reply payload, can be null
* @param tag Jute serialization tag, can be null
* @param cacheKey Key for caching the serialized payload. A null value prevents caching.
* @param stat Stat information for the reply payload, used for cache invalidation.
* A null value prevents caching.
* @param opCode The op code of the request that this response answers,
* used to decide which cache (e.g. read response cache,
* list of children response cache, ...) object to look up, when applicable.
*/
public abstract void sendResponse(ReplyHeader h, Record r, String tag,
String cacheKey, Stat stat, int opCode) throws IOException;

public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
sendResponse(h, r, tag, null, null);
sendResponse(h, r, tag, null, null, -1);
}

protected byte[] serializeRecord(Record record) throws IOException {
Expand All @@ -174,11 +200,30 @@ protected byte[] serializeRecord(Record record) throws IOException {
return baos.toByteArray();
}

protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
String cacheKey, Stat stat, int opCode) throws IOException {
byte[] header = serializeRecord(h);
byte[] data = null;
if (r != null) {
ResponseCache cache = zkServer.getReadResponseCache();
ResponseCache cache = null;
Counter cacheHit = null, cacheMiss = null;
switch (opCode) {
case OpCode.getData : {
cache = zkServer.getReadResponseCache();
cacheHit = ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_HITS;
cacheMiss = ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_MISSING;
break;
}
case OpCode.getChildren2 : {
cache = zkServer.getGetChildrenResponseCache();
cacheHit = ServerMetrics.getMetrics().RESPONSE_PACKET_GET_CHILDREN_CACHE_HITS;
cacheMiss = ServerMetrics.getMetrics().RESPONSE_PACKET_GET_CHILDREN_CACHE_MISSING;
break;
}
default:
// op codes where response cache is not supported.
}

if (cache != null && stat != null && cacheKey != null && !cacheKey.endsWith(Quotas.statNode)) {
// Use cache to get serialized data.
//
Expand All @@ -189,9 +234,9 @@ protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag, String cac
// Cache miss, serialize the response and put it in cache.
data = serializeRecord(r);
cache.put(cacheKey, data, stat);
ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_MISSING.add(1);
cacheMiss.add(1);
} else {
ServerMetrics.getMetrics().RESPONSE_PACKET_CACHE_HITS.add(1);
cacheHit.add(1);
}
} else {
data = serializeRecord(r);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ private ServerMetrics(MetricsProvider metricsProvider) {
NODE_CHANGED_WATCHER = metricsContext.getSummary("node_changed_watch_count", DetailLevel.BASIC);
NODE_CHILDREN_WATCHER = metricsContext.getSummary("node_children_watch_count", DetailLevel.BASIC);


/*
* Number of dead watchers in DeadWatcherListener
*/
Expand All @@ -106,6 +105,8 @@ private ServerMetrics(MetricsProvider metricsProvider) {

RESPONSE_PACKET_CACHE_HITS = metricsContext.getCounter("response_packet_cache_hits");
RESPONSE_PACKET_CACHE_MISSING = metricsContext.getCounter("response_packet_cache_misses");
RESPONSE_PACKET_GET_CHILDREN_CACHE_HITS = metricsContext.getCounter("response_packet_get_children_cache_hits");
RESPONSE_PACKET_GET_CHILDREN_CACHE_MISSING = metricsContext.getCounter("response_packet_get_children_cache_misses");

ENSEMBLE_AUTH_SUCCESS = metricsContext.getCounter("ensemble_auth_success");

Expand Down Expand Up @@ -338,8 +339,14 @@ private ServerMetrics(MetricsProvider metricsProvider) {
public final Counter DEAD_WATCHERS_QUEUED;
public final Counter DEAD_WATCHERS_CLEARED;
public final Summary DEAD_WATCHERS_CLEANER_LATENCY;

/*
* Response cache hit and miss metrics.
*/
public final Counter RESPONSE_PACKET_CACHE_HITS;
public final Counter RESPONSE_PACKET_CACHE_MISSING;
public final Counter RESPONSE_PACKET_GET_CHILDREN_CACHE_HITS;
public final Counter RESPONSE_PACKET_GET_CHILDREN_CACHE_MISSING;

/**
* Learner handler quorum packet metrics.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static void setCloseSessionTxnEnabled(boolean enabled) {
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;
private ResponseCache readResponseCache;
private ResponseCache getChildrenResponseCache;
private final AtomicLong hzxid = new AtomicLong(0);
public static final Exception ok = new Exception("No prob");
protected RequestProcessor firstProcessor;
Expand Down Expand Up @@ -217,6 +218,9 @@ protected enum State {
public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
public static final int intBufferStartingSizeBytes;

public static final String GET_DATA_RESPONSE_CACHE_SIZE = "zookeeper.maxResponseCacheSize";
public static final String GET_CHILDREN_RESPONSE_CACHE_SIZE = "zookeeper.maxGetChildrenResponseCacheSize";

static {
long configuredFlushDelay = Long.getLong(FLUSH_DELAY, 0);
setFlushDelay(configuredFlushDelay);
Expand Down Expand Up @@ -306,7 +310,13 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessio

listener = new ZooKeeperServerListenerImpl(this);

readResponseCache = new ResponseCache();
readResponseCache = new ResponseCache(Integer.getInteger(
GET_DATA_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));

getChildrenResponseCache = new ResponseCache(Integer.getInteger(
GET_CHILDREN_RESPONSE_CACHE_SIZE,
ResponseCache.DEFAULT_RESPONSE_CACHE_SIZE));

this.initialConfig = initialConfig;

Expand Down Expand Up @@ -1764,6 +1774,10 @@ public ResponseCache getReadResponseCache() {
return isResponseCachingEnabled ? readResponseCache : null;
}

/**
 * Returns the response cache used for get-children responses.
 *
 * @return the get-children response cache, or {@code null} when
 *         {@code isResponseCachingEnabled} is false
 */
public ResponseCache getGetChildrenResponseCache() {
    if (!isResponseCachingEnabled) {
        return null;
    }
    return getChildrenResponseCache;
}

protected void registerMetrics() {
MetricsContext rootContext = ServerMetrics.getMetrics().getMetricsProvider().getRootContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public void close(DisconnectReason reason) {
}

@Override
public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException {
public void sendResponse(ReplyHeader h, Record r, String tag,
String cacheKey, Stat stat, int opCode) throws IOException {
}

@Override
Expand Down
Loading

0 comments on commit 45b6c0f

Please sign in to comment.