Skip to content

Commit

Permalink
Merge pull request #37 from vemilyus/fix/cache-hang-on-failed-deser
Browse files Browse the repository at this point in the history
Handling exceptions caused by value serializers
  • Loading branch information
graemerocher committed Jun 4, 2020
2 parents abb8adf + 5398ee5 commit 04cf6c6
Showing 1 changed file with 53 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,14 @@ public <T> CompletableFuture<T> get(Object key, Argument<T> requiredType, Suppli
result.completeExceptionally(throwable);
} else {
if (data != null) {
Optional<T> deserialized = valueSerializer.deserialize(data, requiredType.getType());
Optional<T> deserialized;
try {
deserialized = valueSerializer.deserialize(data, requiredType.getType());
} catch (Throwable t) {
result.completeExceptionally(t);
return;
}

boolean hasValue = deserialized.isPresent();
if (expireAfterAccess != null && hasValue) {
async.expire(serializedKey, expireAfterAccess).whenComplete((s, throwable1) -> {
Expand Down Expand Up @@ -395,7 +402,14 @@ public <T> CompletableFuture<Optional<T>> putIfAbsent(Object key, T value) {
if (data != null) {
completeGet(Argument.of((Class<T>) value.getClass()), result, async, serializedKey, data);
} else {
Optional<byte[]> serialized = valueSerializer.serialize(value);
Optional<byte[]> serialized;
try {
serialized = valueSerializer.serialize(value);
} catch (Throwable t) {
result.completeExceptionally(t);
return;
}

if (serialized.isPresent()) {
RedisFuture<Void> putOperation = newPutOperation(async, serializedKey, serialized.get(), value);
putOperation.whenComplete((s, throwable12) -> {
Expand Down Expand Up @@ -423,21 +437,28 @@ public CompletableFuture<Boolean> put(Object key, Object value) {
}
};
byte[] serializedKey = serializeKey(key);
Optional<byte[]> serialized = valueSerializer.serialize(value);
if (serialized.isPresent()) {
getAsync().thenAccept(async -> {
RedisFuture<Void> future = newPutOperation(async, serializedKey, serialized.get(), value);
future.whenComplete(booleanConsumer);
});
} else {
getAsync().thenAccept(async -> async.remove(serializedKey).whenComplete((aLong, throwable) -> {
if (throwable == null) {
result.complete(true);
} else {
result.completeExceptionally(throwable);
}
}));
Optional<byte[]> serialized;
try {
serialized = valueSerializer.serialize(value);

if (serialized.isPresent()) {
getAsync().thenAccept(async -> {
RedisFuture<Void> future = newPutOperation(async, serializedKey, serialized.get(), value);
future.whenComplete(booleanConsumer);
});
} else {
getAsync().thenAccept(async -> async.remove(serializedKey).whenComplete((aLong, throwable) -> {
if (throwable == null) {
result.complete(true);
} else {
result.completeExceptionally(throwable);
}
}));
}
} catch (Throwable t) {
result.completeExceptionally(t);
}

return result;
}

Expand Down Expand Up @@ -484,7 +505,14 @@ public String getName() {
}

private <T> void completeGet(Argument<T> requiredType, CompletableFuture<Optional<T>> result, AsyncCacheCommands async, byte[] serializedKey, byte[] data) {
Optional<T> deserialized = valueSerializer.deserialize(data, requiredType.getType());
Optional<T> deserialized;
try {
deserialized = valueSerializer.deserialize(data, requiredType.getType());
} catch (Throwable t) {
result.completeExceptionally(t);
return;
}

if (expireAfterAccess != null && deserialized.isPresent()) {
async.expire(serializedKey, expireAfterAccess).whenComplete((s, throwable1) -> {
if (throwable1 != null) {
Expand All @@ -509,7 +537,14 @@ private <T> void invokeSupplier(byte[] serializedKey, Supplier<T> supplier, Asyn
}
if (!hasSupplierError) {

Optional<byte[]> serialized = valueSerializer.serialize(value);
Optional<byte[]> serialized;
try {
serialized = valueSerializer.serialize(value);
} catch (Throwable t) {
result.completeExceptionally(t);
return;
}

if (serialized.isPresent()) {
RedisFuture<Void> future = newPutOperation(async, serializedKey, serialized.get(), value);
T finalValue = value;
Expand All @@ -533,6 +568,5 @@ private RedisFuture<Void> newPutOperation(AsyncCacheCommands async, byte[] seria
return async.put(serializedKey, serialized);
}
}

}
}

0 comments on commit 04cf6c6

Please sign in to comment.