Skip to content

Commit

Permalink
ISPN-11731 Remove blocking writes check from Hot Rod
Browse files Browse the repository at this point in the history
  • Loading branch information
Gustavo Fernandes authored and wburns committed Aug 17, 2020
1 parent c68c255 commit 610e4e7
Showing 1 changed file with 8 additions and 45 deletions.
Expand Up @@ -37,11 +37,6 @@ class CacheRequestProcessor extends BaseRequestProcessor {
listenerRegistry = server.getClientListenerRegistry();
}

private boolean isBlockingWrite(CacheInfo cacheInfo, HotRodHeader header) {
// Note: cache store cannot be skipped (yet)
return cacheInfo.indexing && !header.isSkipIndexing();
}

void ping(HotRodHeader header, Subject subject) {
// we need to throw an exception when the cache is inaccessible
// but ignore the default cache, because the client always pings the default cache first
Expand Down Expand Up @@ -167,11 +162,7 @@ void put(HotRodHeader header, Subject subject, byte[] key, byte[] value, Metadat
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
metadata.version(cacheInfo.versionGenerator.generateNew());
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> putInternal(header, cache, key, value, metadata.build()));
} else {
putInternal(header, cache, key, value, metadata.build());
}
putInternal(header, cache, key, value, metadata.build());
}

private void putInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, byte[] value, Metadata metadata) {
Expand All @@ -191,11 +182,7 @@ void replaceIfUnmodified(HotRodHeader header, Subject subject, byte[] key, long
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
metadata.version(cacheInfo.versionGenerator.generateNew());
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> replaceIfUnmodifiedInternal(header, cache, key, version, value, metadata.build()));
} else {
replaceIfUnmodifiedInternal(header, cache, key, version, value, metadata.build());
}
replaceIfUnmodifiedInternal(header, cache, key, version, value, metadata.build());
}

private void replaceIfUnmodifiedInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, long version, byte[] value, Metadata metadata) {
Expand Down Expand Up @@ -232,11 +219,7 @@ void replace(HotRodHeader header, Subject subject, byte[] key, byte[] value, Met
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
metadata.version(cacheInfo.versionGenerator.generateNew());
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> replaceInternal(header, cache, key, value, metadata.build()));
} else {
replaceInternal(header, cache, key, value, metadata.build());
}
replaceInternal(header, cache, key, value, metadata.build());
}

private void replaceInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, byte[] value, Metadata metadata) {
Expand Down Expand Up @@ -272,11 +255,7 @@ void putIfAbsent(HotRodHeader header, Subject subject, byte[] key, byte[] value,
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
metadata.version(cacheInfo.versionGenerator.generateNew());
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> putIfAbsentInternal(header, cache, key, value, metadata.build()));
} else {
putIfAbsentInternal(header, cache, key, value, metadata.build());
}
putIfAbsentInternal(header, cache, key, value, metadata.build());
}

private void putIfAbsentInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, byte[] value, Metadata metadata) {
Expand Down Expand Up @@ -308,11 +287,7 @@ private void handlePutIfAbsent(HotRodHeader header, byte[] result, Throwable thr
void remove(HotRodHeader header, Subject subject, byte[] key) {
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> removeInternal(header, cache, key));
} else {
removeInternal(header, cache, key);
}
removeInternal(header, cache, key);
}

private void removeInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key) {
Expand All @@ -332,11 +307,7 @@ private void handleRemove(HotRodHeader header, byte[] prev, Throwable throwable)
void removeIfUnmodified(HotRodHeader header, Subject subject, byte[] key, long version) {
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> removeIfUnmodifiedInternal(header, cache, key, version));
} else {
removeIfUnmodifiedInternal(header, cache, key, version);
}
removeIfUnmodifiedInternal(header, cache, key, version);
}

private void removeIfUnmodifiedInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, byte[] key, long version) {
Expand Down Expand Up @@ -371,11 +342,7 @@ private void handleGetForRemoveIfUnmodified(HotRodHeader header, AdvancedCache<b
void clear(HotRodHeader header, Subject subject) {
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> clearInternal(header, cache));
} else {
clearInternal(header, cache);
}
clearInternal(header, cache);
}

private void clearInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache) {
Expand All @@ -391,11 +358,7 @@ private void clearInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> ca
void putAll(HotRodHeader header, Subject subject, Map<byte[], byte[]> entries, Metadata.Builder metadata) {
CacheInfo cacheInfo = server.getCacheInfo(header);
AdvancedCache<byte[], byte[]> cache = server.cache(cacheInfo, header, subject);
if (isBlockingWrite(cacheInfo, header)) {
executor.execute(() -> putAllInternal(header, cache, entries, metadata.build()));
} else {
putAllInternal(header, cache, entries, metadata.build());
}
putAllInternal(header, cache, entries, metadata.build());
}

private void putAllInternal(HotRodHeader header, AdvancedCache<byte[], byte[]> cache, Map<byte[], byte[]> entries, Metadata metadata) {
Expand Down

0 comments on commit 610e4e7

Please sign in to comment.