Skip to content

Commit

Permalink
fewChangesAroundDiagnosticsHandler (Azure#39077)
Browse files Browse the repository at this point in the history
* few changes in diagnostics provider

---------

Co-authored-by: annie-mac <xinlian@microsoft.com>

(cherry picked from commit b052394)
  • Loading branch information
xinlian12 authored and jeet1995 committed Mar 12, 2024
1 parent 2f760e8 commit 8e22760
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@

import com.azure.core.util.Context;
import com.azure.cosmos.implementation.ConsoleLoggingRegistryFactory;
import com.azure.cosmos.implementation.DiagnosticsProviderJvmFatalErrorMapper;
import com.azure.cosmos.implementation.Exceptions;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosClientTelemetryConfig;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosMicrometerMetricsOptions;
Expand All @@ -21,11 +25,14 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Factory;
import org.testng.annotations.Test;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
Expand Down Expand Up @@ -232,6 +239,104 @@ public void defaultLoggerWithLegacyOpenTelemetryTraces() {
System.setProperty("COSMOS.USE_LEGACY_TRACING", "false");
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void nullPointerDiagnosticsHandler() {
NullPointerDiagnosticsHandle capturingHandler = new NullPointerDiagnosticsHandle();

CosmosClientBuilder builder = this
.getClientBuilder()
.clientTelemetryConfig(new CosmosClientTelemetryConfig().diagnosticsHandler(capturingHandler));

CosmosContainer container = this.getContainer(builder);

AtomicBoolean systemExited = new AtomicBoolean(false);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
systemExited.set(true);
}
});

executeTestCase(container);
assertThat(systemExited.get()).isFalse();
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void OOMDiagnosticsHandler() throws InterruptedException {
// validate when false, System.exit will not be called
System.setProperty("COSMOS.DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR", "false");

try {
OOMDiagnosticsHandle capturingHandler = new OOMDiagnosticsHandle();

CosmosClientBuilder builder = this
.getClientBuilder()
.clientTelemetryConfig(new CosmosClientTelemetryConfig().diagnosticsHandler(capturingHandler));

CosmosContainer container = this.getContainer(builder);

AtomicBoolean systemExited = new AtomicBoolean(false);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
systemExited.set(true);
}
});

// with OOM exception, if no system.exit is called, the reactor thread will hang
// so put it in different thead
Mono.just(this)
.doOnNext(t -> executeTestCase(container))
.subscribeOn(Schedulers.parallel())
.subscribe();

Thread.sleep(2000);
assertThat(systemExited.get()).isFalse();
} finally {
System.clearProperty("COSMOS.DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR");
}
}

@Test(groups = { "emulator" }, timeOut = TIMEOUT)
public void OOMDiagnosticsHandlerWithErrorMapper() throws JsonProcessingException {
DiagnosticsProviderJvmFatalErrorMapper
.getMapper()
.registerFatalErrorMapper((error) -> new NullPointerException("test"));

// validate when false, System.exit will not be called
OOMDiagnosticsHandle capturingHandler = new OOMDiagnosticsHandle(ResourceType.Document, OperationType.Create);

CosmosClientBuilder builder = this
.getClientBuilder()
.clientTelemetryConfig(new CosmosClientTelemetryConfig().diagnosticsHandler(capturingHandler));

CosmosContainer container = this.getContainer(builder);

AtomicBoolean systemExited = new AtomicBoolean(false);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
systemExited.set(true);
}
});

try {
executeTestCase(container);
fail("should fail with RuntimeException");

} catch (RuntimeException e) {
assertThat(e.getCause() instanceof NullPointerException).isTrue();
assertThat(systemExited.get()).isFalse();
}

// doing an upsert and verify in the diagnostics contain mapper execution count
CosmosDiagnostics cosmosDiagnostics =
container.upsertItem(getDocumentDefinition(UUID.randomUUID().toString())).getDiagnostics();
ObjectNode cosmosDiagnosticsNode = (ObjectNode) Utils.getSimpleObjectMapper().readTree(cosmosDiagnostics.toString());
assertThat(cosmosDiagnosticsNode.get("jvmFatalErrorMapperExecutionCount")).isNotNull();
assertThat(cosmosDiagnosticsNode.get("jvmFatalErrorMapperExecutionCount").asLong()).isGreaterThan(0);
}

private void executeTestCase(CosmosContainer container) {
String id = UUID.randomUUID().toString();
CosmosItemResponse<ObjectNode> response = container.createItem(
Expand Down Expand Up @@ -340,4 +445,40 @@ public List<String> getLoggedMessages() {
return this.loggedMessages;
}
}

private static class NullPointerDiagnosticsHandle implements CosmosDiagnosticsHandler {

@Override
public void handleDiagnostics(CosmosDiagnosticsContext diagnosticsContext, Context traceContext) {
throw new NullPointerException("NullPointerDiagnosticsHandle");
}
}

private static class OOMDiagnosticsHandle implements CosmosDiagnosticsHandler {
private ResourceType resourceType;
private OperationType operationType;
private final boolean oomLimitByResourceAndOperationType;

public OOMDiagnosticsHandle(ResourceType resourceType, OperationType operationType) {
this.resourceType = resourceType;
this.operationType = operationType;
this.oomLimitByResourceAndOperationType = true;
}

public OOMDiagnosticsHandle() {
this.oomLimitByResourceAndOperationType = false;
}

@Override
public void handleDiagnostics(CosmosDiagnosticsContext diagnosticsContext, Context traceContext) {
if (this.oomLimitByResourceAndOperationType) {
if (diagnosticsContext.getOperationType() == this.operationType.toString()
&& diagnosticsContext.getResourceType() == this.resourceType.toString()) {
throw new OutOfMemoryError("OOMDiagnosticsHandle");
}
} else {
throw new OutOfMemoryError("OOMDiagnosticsHandle");
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import org.testng.annotations.Test;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

public class DiagnosticsProviderJvmFatalErrorMapperTest {
@Test(groups = "unit")
public void noFatalErrorMapperRegistered() {
Exception mappedException =
DiagnosticsProviderJvmFatalErrorMapper.getMapper().mapFatalError(new OutOfMemoryError("Test"));
assertThat(mappedException).isNull();
}

@Test(groups = "unit")
public void fatalErrorMapperRegistered() {
DiagnosticsProviderJvmFatalErrorMapper.getMapper().registerFatalErrorMapper(
(error) -> new NullPointerException(error.getMessage()));

Exception mappedException =
DiagnosticsProviderJvmFatalErrorMapper.getMapper().mapFatalError(new OutOfMemoryError("Test"));
assertThat(mappedException).isNotNull();
assertThat(mappedException).isInstanceOf(NullPointerException.class);

// test the exception mapper throw exception
DiagnosticsProviderJvmFatalErrorMapper.getMapper().registerFatalErrorMapper(
(error) -> { throw new RuntimeException("Failed during mapping"); });
mappedException = DiagnosticsProviderJvmFatalErrorMapper.getMapper().mapFatalError(new OutOfMemoryError("Test"));
assertThat(mappedException).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,12 @@ public void serialize(
// Error while evaluating system information, do nothing
}

long diagnosticsProviderFatalErrorExecutionCount =
DiagnosticsProviderJvmFatalErrorMapper.getMapper().getMapperExecutionCount();
if (diagnosticsProviderFatalErrorExecutionCount > 0) {
generator.writeNumberField("jvmFatalErrorMapperExecutionCount", diagnosticsProviderFatalErrorExecutionCount);
}

generator.writeObjectField("clientCfgs", statistics.diagnosticsClientConfig);
generator.writeEndObject();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,19 @@ public class Configs {
private static final String MAX_HTTP_HEADER_SIZE_IN_BYTES = "COSMOS.MAX_HTTP_HEADER_SIZE_IN_BYTES";
private static final String MAX_DIRECT_HTTPS_POOL_SIZE = "COSMOS.MAX_DIRECT_HTTP_CONNECTION_LIMIT";
private static final String HTTP_RESPONSE_TIMEOUT_IN_SECONDS = "COSMOS.HTTP_RESPONSE_TIMEOUT_IN_SECONDS";

public static final int DEFAULT_HTTP_DEFAULT_CONNECTION_POOL_SIZE = 1000;
public static final String HTTP_DEFAULT_CONNECTION_POOL_SIZE = "COSMOS.DEFAULT_HTTP_CONNECTION_POOL_SIZE";
public static final String HTTP_DEFAULT_CONNECTION_POOL_SIZE_VARIABLE = "COSMOS_DEFAULT_HTTP_CONNECTION_POOL_SIZE";

public static final boolean DEFAULT_E2E_FOR_NON_POINT_DISABLED_DEFAULT = false;
public static final String DEFAULT_E2E_FOR_NON_POINT_DISABLED = "COSMOS.E2E_FOR_NON_POINT_DISABLED";
public static final String DEFAULT_E2E_FOR_NON_POINT_DISABLED_VARIABLE = "COSMOS_E2E_FOR_NON_POINT_DISABLED";

public static final int DEFAULT_HTTP_MAX_REQUEST_TIMEOUT = 60;
public static final String HTTP_MAX_REQUEST_TIMEOUT = "COSMOS.HTTP_MAX_REQUEST_TIMEOUT";
public static final String HTTP_MAX_REQUEST_TIMEOUT_VARIABLE = "COSMOS_HTTP_MAX_REQUEST_TIMEOUT";

private static final String QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS = "COSMOS.QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS";
private static final String ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS = "COSMOS.ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS";

Expand Down Expand Up @@ -93,24 +106,29 @@ public class Configs {
// Reactor Netty Constants
private static final Duration MAX_IDLE_CONNECTION_TIMEOUT = Duration.ofSeconds(60);
private static final Duration CONNECTION_ACQUIRE_TIMEOUT = Duration.ofSeconds(45);
private static final int REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE = 1000;
private static final String REACTOR_NETTY_CONNECTION_POOL_NAME = "reactor-netty-connection-pool";
private static final int DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS = 60;
private static final int DEFAULT_QUERY_PLAN_RESPONSE_TIMEOUT_IN_SECONDS = 5;
private static final int DEFAULT_ADDRESS_REFRESH_RESPONSE_TIMEOUT_IN_SECONDS = 5;

// SessionTokenMismatchRetryPolicy Constants
private static final String DEFAULT_SESSION_TOKEN_MISMATCH_WAIT_TIME_IN_MILLISECONDS_NAME =
public static final String DEFAULT_SESSION_TOKEN_MISMATCH_WAIT_TIME_IN_MILLISECONDS_NAME =
"COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_WAIT_TIME_IN_MILLISECONDS";
private static final int DEFAULT_SESSION_TOKEN_MISMATCH_WAIT_TIME_IN_MILLISECONDS = 5000;

private static final String DEFAULT_SESSION_TOKEN_MISMATCH_INITIAL_BACKOFF_TIME_IN_MILLISECONDS_NAME =
public static final String DEFAULT_SESSION_TOKEN_MISMATCH_INITIAL_BACKOFF_TIME_IN_MILLISECONDS_NAME =
"COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_INITIAL_BACKOFF_TIME_IN_MILLISECONDS";
private static final int DEFAULT_SESSION_TOKEN_MISMATCH_INITIAL_BACKOFF_TIME_IN_MILLISECONDS = 5;

private static final String DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS_NAME =
public static final String DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS_NAME =
"COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS";
private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 50;
private static final int DEFAULT_SESSION_TOKEN_MISMATCH_MAXIMUM_BACKOFF_TIME_IN_MILLISECONDS = 500;

public static final int MIN_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS = 100;

private static final String DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS_NAME =
"COSMOS.DEFAULT_SESSION_TOKEN_MISMATCH_IN_REGION-RETRY_TIME_IN_MILLISECONDS";
private static final int DEFAULT_MIN_IN_REGION_RETRY_TIME_FOR_WRITES_MS = 500;

// Whether to process the response on a different thread
private static final String SWITCH_OFF_IO_THREAD_FOR_RESPONSE_NAME = "COSMOS.SWITCH_OFF_IO_THREAD_FOR_RESPONSE";
Expand All @@ -135,11 +153,27 @@ public class Configs {
private static final String MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT = "COSMOS.MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT";
private static final int DEFAULT_MIN_CONNECTION_POOL_SIZE_PER_ENDPOINT = 1;

private static final String MAX_TRACE_MESSAGE_LENGTH = "COSMOS.MAX_TRACE_MESSAGE_LENGTH";
private static final int DEFAULT_MAX_TRACE_MESSAGE_LENGTH = 32 * 1024;

private static final int MIN_MAX_TRACE_MESSAGE_LENGTH = 8 * 1024;

private static final String AGGRESSIVE_WARMUP_CONCURRENCY = "COSMOS.AGGRESSIVE_WARMUP_CONCURRENCY";
private static final int DEFAULT_AGGRESSIVE_WARMUP_CONCURRENCY = Configs.getCPUCnt();

private static final String DEFENSIVE_WARMUP_CONCURRENCY = "COSMOS.DEFENSIVE_WARMUP_CONCURRENCY";
private static final int DEFAULT_DEFENSIVE_WARMUP_CONCURRENCY = 1;
private static final String OPEN_CONNECTIONS_CONCURRENCY = "COSMOS.OPEN_CONNECTIONS_CONCURRENCY";
private static final int DEFAULT_OPEN_CONNECTIONS_CONCURRENCY = 1;

public static final String MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = "COSMOS.MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED";
private static final int DEFAULT_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1;

public static final int MIN_MAX_RETRIES_IN_LOCAL_REGION_WHEN_REMOTE_REGION_PREFERRED = 1;

public static final String TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS = "COSMOS.TCP_CONNECTION_ACQUISITION_TIMEOUT_IN_MS";

// Error handling strategy in diagnostics provider
public static final String DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR = "COSMOS.DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR";
public static final boolean DEFAULT_DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR = true;

public Configs() {
this.sslContext = sslContextInit();
Expand Down Expand Up @@ -251,10 +285,6 @@ public Duration getConnectionAcquireTimeout() {
return CONNECTION_ACQUIRE_TIMEOUT;
}

public int getReactorNettyMaxConnectionPoolSize() {
return REACTOR_NETTY_MAX_CONNECTION_POOL_SIZE;
}

public static int getHttpResponseTimeoutInSeconds() {
return getJVMConfigAsInt(HTTP_RESPONSE_TIMEOUT_IN_SECONDS, DEFAULT_HTTP_RESPONSE_TIMEOUT_IN_SECONDS);
}
Expand All @@ -280,6 +310,48 @@ public static String getNonIdempotentWriteRetryPolicy() {
return System.getenv(NON_IDEMPOTENT_WRITE_RETRY_POLICY_VARIABLE);
}

public static int getDefaultHttpPoolSize() {
String valueFromSystemProperty = System.getProperty(HTTP_DEFAULT_CONNECTION_POOL_SIZE);
if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
return Integer.valueOf(valueFromSystemProperty);
}

String valueFromEnvVariable = System.getenv(HTTP_DEFAULT_CONNECTION_POOL_SIZE_VARIABLE);
if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) {
return Integer.valueOf(valueFromEnvVariable);
}

return DEFAULT_HTTP_DEFAULT_CONNECTION_POOL_SIZE;
}

public static boolean isDefaultE2ETimeoutDisabledForNonPointOperations() {
String valueFromSystemProperty = System.getProperty(DEFAULT_E2E_FOR_NON_POINT_DISABLED);
if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
return Boolean.valueOf(valueFromSystemProperty);
}

String valueFromEnvVariable = System.getenv(DEFAULT_E2E_FOR_NON_POINT_DISABLED_VARIABLE);
if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) {
return Boolean.valueOf(valueFromEnvVariable);
}

return DEFAULT_E2E_FOR_NON_POINT_DISABLED_DEFAULT;
}

public static int getMaxHttpRequestTimeout() {
String valueFromSystemProperty = System.getProperty(HTTP_MAX_REQUEST_TIMEOUT);
if (valueFromSystemProperty != null && !valueFromSystemProperty.isEmpty()) {
return Integer.valueOf(valueFromSystemProperty);
}

String valueFromEnvVariable = System.getenv(HTTP_MAX_REQUEST_TIMEOUT_VARIABLE);
if (valueFromEnvVariable != null && !valueFromEnvVariable.isEmpty()) {
return Integer.valueOf(valueFromEnvVariable);
}

return DEFAULT_HTTP_MAX_REQUEST_TIMEOUT;
}

public static String getEnvironmentName() {
return System.getProperty(ENVIRONMENT_NAME);
}
Expand Down Expand Up @@ -390,4 +462,15 @@ public static int getDefensiveWarmupConcurrency() {
public static int getAggressiveWarmupConcurrency() {
return getIntValue(System.getProperty(AGGRESSIVE_WARMUP_CONCURRENCY), DEFAULT_AGGRESSIVE_WARMUP_CONCURRENCY);
}

public static boolean shouldDiagnosticsProviderSystemExitOnError() {
String shouldSystemExit =
System.getProperty(
DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR,
firstNonNull(
emptyToNull(System.getenv().get(DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR)),
String.valueOf(DEFAULT_DIAGNOSTICS_PROVIDER_SYSTEM_EXIT_ON_ERROR)));

return Boolean.parseBoolean(shouldSystemExit);
}
}
Loading

0 comments on commit 8e22760

Please sign in to comment.