Skip to content

Commit

Permalink
feat: Wait Till Cache is ready (#73)
Browse files Browse the repository at this point in the history
* feat: Wait Till Cache is ready

During the Cache client creation, do a gets to determine if the cache is ready and available for use. This will allow us to fail early at Cache Client creation rather than at the time of doing gets/sets.

* update as per review feedback

* fix build

* faster retries

* Update exception handling

* update exception handling
  • Loading branch information
gautamomento committed Oct 5, 2021
1 parent f8e8d1d commit ab43e80
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 64 deletions.
53 changes: 13 additions & 40 deletions momento-sdk/src/intTest/java/momento/sdk/CacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

final class CacheTest {
Expand Down Expand Up @@ -198,54 +197,28 @@ private static void testMissHappyPathInternal(Cache client) {

@Test
void testBadAuthToken() {
Cache badCredClient = getCache("BAD_TOKEN", "dummy", Optional.empty());
testBadAuthToken(badCredClient);
}

@Test
void testBadAuthTokenWithTracing() throws Exception {
startIntegrationTestOtel();
OpenTelemetrySdk openTelemetry = setOtelSDK();
Cache client = getCache("BAD_TOKEN", "dummy", Optional.of(openTelemetry));
testBadAuthToken(client);
// To accommodate for delays in tracing logs to appear in docker
Thread.sleep(1000);
verifySetTrace("1");
verifyGetTrace("1");
}

private static void testBadAuthToken(Cache badCredClient) {
// Bad Auth for Get
assertThrows(PermissionDeniedException.class, () -> badCredClient.get("myCacheKey"));

// Bad Auth for Set
assertThrows(
PermissionDeniedException.class,
() ->
badCredClient.set(
"myCacheKey",
ByteBuffer.wrap("cache me if you can".getBytes(StandardCharsets.UTF_8)),
500));
PermissionDeniedException.class, () -> getCache("BAD_TOKEN", "dummy", Optional.empty()));
}

@Test
public void unreachableEndpoint_ThrowsException() {
Cache cache =
new Cache(
System.getenv("TEST_AUTH_TOKEN"),
System.getenv("TEST_CACHE_NAME"),
"nonexistent.preprod.a.momentohq.com");

assertThrows(ClientSdkException.class, () -> cache.get("key"));
assertThrows(
ClientSdkException.class,
() ->
new Cache(
System.getenv("TEST_AUTH_TOKEN"),
System.getenv("TEST_CACHE_NAME"),
"nonexistent.preprod.a.momentohq.com"));
}

@Disabled("TODO: Update to catch cache not ready and then do a get again to see not found.")
@Test
public void invalidCache_ThrowsNotFoundException() {
Cache cache =
getCache(System.getenv("TEST_AUTH_TOKEN"), UUID.randomUUID().toString(), Optional.empty());

assertThrows(CacheNotFoundException.class, () -> cache.get("key"));
assertThrows(
CacheNotFoundException.class,
() ->
getCache(
System.getenv("TEST_AUTH_TOKEN"), UUID.randomUUID().toString(), Optional.empty()));
}

@Test
Expand Down
48 changes: 31 additions & 17 deletions momento-sdk/src/main/java/momento/sdk/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import grpc.cache_client.SetResponse;
import io.grpc.ClientInterceptor;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
Expand Down Expand Up @@ -47,15 +49,6 @@ public final class Cache implements Closeable {
private final ManagedChannel channel;
private final Optional<Tracer> tracer;

/**
* Builds an instance of {@link Cache} used to interact w/ SCS with a default endpoint.
*
* @param authToken Token to authenticate with Cache Service
*/
public Cache(String authToken, String cacheName) {
this(authToken, cacheName, Optional.empty());
}

/**
* Builds an instance of {@link Cache} that will interact with a specified endpoint
*
Expand All @@ -66,14 +59,6 @@ public Cache(String authToken, String cacheName, String endpoint) {
this(authToken, cacheName, Optional.empty(), endpoint);
}

/**
* @param authToken Token to authenticate with SCS
* @param openTelemetry Open telemetry instance to hook into client traces
*/
public Cache(String authToken, String cacheName, Optional<OpenTelemetry> openTelemetry) {
this(authToken, cacheName, openTelemetry, "alpha.cacheservice.com");
}

/**
* Builds an instance of {@link Cache} used to interact w/ SCS
*
Expand Down Expand Up @@ -124,6 +109,35 @@ public Cache(
this.futureStub = ScsGrpc.newFutureStub(channel);
this.channel = channel;
this.tracer = openTelemetry.map(ot -> ot.getTracer("momento-java-scs-client", "1.0.0"));
waitTillReady();
}

private void waitTillReady() {
long start = System.currentTimeMillis();
long maxRetryDurationMillis = 5000;
long backoffDurationMillis = 5;
StatusRuntimeException lastRetriedException = null;

while (System.currentTimeMillis() - start < maxRetryDurationMillis) {
try {
// The key has no special meaning. Just any key string would work.
this.blockingStub.get(buildGetRequest(convert("000")));
return;
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNKNOWN
|| e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
try {
Thread.sleep(backoffDurationMillis);
} catch (InterruptedException t) {
throw CacheServiceExceptionMapper.convert(t);
}
lastRetriedException = e;
} else {
throw CacheServiceExceptionMapper.convert(e);
}
}
}
throw CacheServiceExceptionMapper.convertUnhandledExceptions(lastRetriedException);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,22 @@ public static SdkException convert(Exception e) {
return new CacheNotFoundException(grpcException.getMessage());

default:
if (isDnsUnreachable(grpcException)) {
return new ClientSdkException(
String.format(
"Unable to reach request endpoint. Request failed with %s",
grpcException.getMessage()));
}
return new InternalServerException(INTERNAL_SERVER_ERROR_MESSAGE);
return convertUnhandledExceptions(grpcException);
}
}

return new ClientSdkException("SDK Failed to process the request", e);
}

public static SdkException convertUnhandledExceptions(StatusRuntimeException e) {
if (isDnsUnreachable(e)) {
return new ClientSdkException(
String.format(
"Unable to reach request endpoint. Request failed with %s", e.getMessage()));
}
return new InternalServerException(INTERNAL_SERVER_ERROR_MESSAGE);
}

private static boolean isDnsUnreachable(StatusRuntimeException e) {
return e.getStatus().getCode() == Status.Code.UNAVAILABLE
&& e.getCause() instanceof RuntimeException
Expand Down

0 comments on commit ab43e80

Please sign in to comment.