Skip to content

Commit

Permalink
Provide a way to implement a HealthChecker with periodic polling (#3406)
Browse files Browse the repository at this point in the history
Motivation
#3392
Let a server implement a `HealthChecker` that updates its health status by periodically polling an external source (e.g. a database).

Modifications
* Add factory methods to the `HealthChecker` interface so that users can easily provide their own asynchronous health check.
* Implement `ScheduledHealthChecker`, which can be shared by multiple servers or `HealthCheckService`s.

Result
* Closes #3392
  • Loading branch information
kojilin committed May 12, 2021
1 parent b405ba4 commit 96c6523
Show file tree
Hide file tree
Showing 6 changed files with 530 additions and 0 deletions.
Expand Up @@ -284,6 +284,10 @@ public void serverStarting(Server server) throws Exception {
c.addListener(healthCheckerListener);
});
}
healthCheckers.stream()
.filter(ScheduledHealthChecker.class::isInstance)
.map(ScheduledHealthChecker.class::cast)
.forEach(ScheduledHealthChecker::startHealthChecker);
}

@Override
Expand All @@ -306,6 +310,10 @@ public void serverStopped(Server server) throws Exception {
c.removeListener(healthCheckerListener);
});
}
healthCheckers.stream()
.filter(ScheduledHealthChecker.class::isInstance)
.map(ScheduledHealthChecker.class::cast)
.forEach(ScheduledHealthChecker::stopHealthChecker);
}
});
}
Expand Down
@@ -0,0 +1,66 @@
/*
* Copyright 2021 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.healthcheck;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.common.annotation.UnstableApi;

/**
* The result of health check with interval for next check.
*/
@UnstableApi
public final class HealthCheckStatus {
private final boolean isHealthy;
private final long ttlMillis;

/**
* Create the result of the health check.
*
* @param isHealthy health check result
* @param ttlMillis interval for scheduling the next check
*/
public HealthCheckStatus(boolean isHealthy, long ttlMillis) {
checkArgument(ttlMillis > 0, "ttlMillis: %s (expected: > 0)", ttlMillis);
this.isHealthy = isHealthy;
this.ttlMillis = ttlMillis;
}

/**
* Return the result of health check.
*/
public boolean isHealthy() {
return isHealthy;
}

/**
* Return the interval for scheduling the next check.
*/
public long ttlMillis() {
return ttlMillis;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("isHealthy", isHealthy)
.add("ttlMillis", ttlMillis)
.toString();
}
}
Expand Up @@ -16,14 +16,78 @@

package com.linecorp.armeria.server.healthcheck;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.common.annotation.UnstableApi;
import com.linecorp.armeria.server.Server;

import io.netty.util.concurrent.EventExecutor;

/**
* Determines whether the {@link Server} is healthy. All registered {@link HealthChecker}s must return
* {@code true} for the {@link Server} to be considered healthy.
*/
@FunctionalInterface
public interface HealthChecker {

/**
 * Creates a {@link HealthChecker} that runs the given asynchronous health check repeatedly on an
 * {@link EventExecutor} obtained from {@link CommonPools#workerGroup()}.
 *
 * @param healthChecker a {@link Supplier} that starts one asynchronous health check and returns a
 *                      {@link CompletionStage} completing with a {@link HealthCheckStatus}.
 *                      The delay before the next check is taken from
 *                      {@link HealthCheckStatus#ttlMillis()}. The {@link Supplier} should not
 *                      return {@code null} or throw, and the returned {@link CompletionStage}
 *                      should not complete with {@code null} or exceptionally; if any of these
 *                      happens, {@code fallbackTtl} is used as the delay instead.
 * @param fallbackTtl the delay before the next health check when the previous one failed
 *                    unexpectedly and therefore did not yield a TTL of its own.
 *
 * @see #of(Supplier, Duration, EventExecutor)
 */
@UnstableApi
static HealthChecker of(Supplier<? extends CompletionStage<HealthCheckStatus>> healthChecker,
                        Duration fallbackTtl) {
    // Pick one executor up front; every check of the returned checker runs on it.
    final EventExecutor eventExecutor = CommonPools.workerGroup().next();
    return of(healthChecker, fallbackTtl, eventExecutor);
}

/**
 * Creates a {@link HealthChecker} that runs the given asynchronous health check repeatedly on
 * the given {@link EventExecutor}.
 *
 * @param healthChecker a {@link Supplier} that starts one asynchronous health check and returns a
 *                      {@link CompletionStage} completing with a {@link HealthCheckStatus}.
 *                      The delay before the next check is taken from
 *                      {@link HealthCheckStatus#ttlMillis()}. The {@link Supplier} should not
 *                      return {@code null} or throw, and the returned {@link CompletionStage}
 *                      should not complete with {@code null} or exceptionally; if any of these
 *                      happens, {@code fallbackTtl} is used as the delay instead.
 * @param fallbackTtl the delay before the next health check when the previous one failed
 *                    unexpectedly and therefore did not yield a TTL of its own.
 *                    Must be greater than zero.
 * @param eventExecutor the {@link EventExecutor} on which {@code healthChecker} is invoked.
 */
@UnstableApi
static HealthChecker of(Supplier<? extends CompletionStage<HealthCheckStatus>> healthChecker,
                        Duration fallbackTtl, EventExecutor eventExecutor) {
    // Validation order is significant: fallbackTtl first, then healthChecker, then eventExecutor.
    requireNonNull(fallbackTtl, "fallbackTtl");
    final boolean positiveTtl = !(fallbackTtl.isNegative() || fallbackTtl.isZero());
    checkArgument(positiveTtl, "fallbackTtl: %s (expected: > 0)", fallbackTtl);
    requireNonNull(healthChecker, "healthChecker");
    requireNonNull(eventExecutor, "eventExecutor");
    return new ScheduledHealthChecker(healthChecker, fallbackTtl, eventExecutor);
}

/**
* Returns {@code true} if and only if the {@link Server} is healthy.
*/
Expand Down
@@ -0,0 +1,217 @@
/*
* Copyright 2021 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.server.healthcheck;

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.Futures;

import com.linecorp.armeria.common.util.AbstractListenable;

import io.netty.util.concurrent.EventExecutor;

/**
 * A {@link ListenableHealthChecker} whose state is driven by repeatedly invoking a supplied
 * asynchronous health check and applying the result of the returned {@link CompletionStage}.
 * This class can be shared by multiple {@link HealthCheckService}s: polling starts with the first
 * {@link #startHealthChecker()} call and stops when every start has been matched by a
 * {@link #stopHealthChecker()} call.
 */
final class ScheduledHealthChecker extends AbstractListenable<HealthChecker>
        implements ListenableHealthChecker {
    private final Supplier<? extends CompletionStage<HealthCheckStatus>> healthChecker;
    private final Duration fallbackTtl;
    private final EventExecutor eventExecutor;
    // Copies the latest result into `isHealthy` and forwards it to the listeners of this instance.
    private final Consumer<HealthChecker> onHealthCheckerUpdate;
    private final AtomicBoolean isHealthy = new AtomicBoolean();
    // Number of startHealthChecker() calls that have not been matched by stopHealthChecker() yet.
    private final AtomicInteger requestCount = new AtomicInteger();
    // The currently active polling loop, or null when no polling is in progress.
    private final AtomicReference<ScheduledHealthCheckerImpl> impl = new AtomicReference<>();

    ScheduledHealthChecker(Supplier<? extends CompletionStage<HealthCheckStatus>> healthChecker,
                           Duration fallbackTtl, EventExecutor eventExecutor) {
        this.healthChecker = healthChecker;
        this.fallbackTtl = fallbackTtl;
        this.eventExecutor = eventExecutor;

        onHealthCheckerUpdate = latestValue -> {
            isHealthy.set(latestValue.isHealthy());
            notifyListeners(latestValue);
        };
    }

    /**
     * Returns the most recently published health check result; {@code false} until the first
     * result has been published.
     */
    @Override
    public boolean isHealthy() {
        return isHealthy.get();
    }

    /**
     * Registers one more user of this checker and, if it is the first one, starts the polling loop.
     */
    void startHealthChecker() {
        if (requestCount.getAndIncrement() != 0) {
            return;
        }
        final ScheduledHealthCheckerImpl newlyScheduled =
                new ScheduledHealthCheckerImpl(healthChecker, fallbackTtl, eventExecutor);
        // This spin prevents the following race condition, which occurs when this instance is shared
        // by more than one server instance:
        //
        // 1. Server A starts.
        // 2. Server A stops. decrementAndGet() returns 0, but it didn't clear `impl` yet.
        // 3. Server B starts; getAndIncrement() returns 0, but `impl` isn't `null` yet.
        //
        // There should be no unexpectedly long spin here, as long as a caller makes sure to call
        // `stopHealthChecker()` for each `startHealthChecker()`, because this method is guarded by
        // `requestCount.getAndIncrement()` to allow only the first request to start to schedule
        // a task.
        for (;;) {
            if (impl.compareAndSet(null, newlyScheduled)) {
                newlyScheduled.startHealthChecker(onHealthCheckerUpdate);
                break;
            }
        }
    }

    /**
     * Unregisters one user of this checker and, if it was the last one, stops the polling loop.
     * This method must be called after the paired startHealthChecker completes to
     * guarantee the state consistency.
     */
    void stopHealthChecker() {
        final int currentCount = requestCount.decrementAndGet();
        // Must be called after startHealthChecker, so it's always greater than or equal to 0.
        assert currentCount >= 0;
        if (currentCount != 0) {
            return;
        }

        final ScheduledHealthCheckerImpl current = impl.getAndSet(null);
        // Must be called after startHealthChecker, so it's always non null.
        assert current != null;

        current.stopHealthChecker(onHealthCheckerUpdate);
    }

    /**
     * Used for test verification.
     */
    @VisibleForTesting
    int getRequestCount() {
        return requestCount.get();
    }

    /**
     * Used for test verification.
     */
    @VisibleForTesting
    boolean isActive() {
        return impl.get() != null;
    }

    /**
     * Health checker can be scheduled only once. Calling startHealthChecker won't work after stopHealthChecker.
     * This class itself is not thread-safe and guarded by {@link ScheduledHealthChecker} to avoid calling
     * {@link ScheduledHealthChecker#startHealthChecker()} and
     * {@link ScheduledHealthChecker#stopHealthChecker()} concurrently.
     */
    private static final class ScheduledHealthCheckerImpl {
        private static final Logger logger = LoggerFactory.getLogger(ScheduledHealthCheckerImpl.class);

        private final Supplier<? extends CompletionStage<HealthCheckStatus>> healthChecker;
        private final Duration fallbackTtl;
        private final EventExecutor eventExecutor;

        private final SettableHealthChecker settableHealthChecker = new SettableHealthChecker(false);
        // The state will be set by startHealthChecker and stopHealthChecker, then read by another
        // scheduled task in runHealthCheck.
        private volatile State state = State.INIT;
        private volatile Future<?> scheduledFuture = Futures.immediateVoidFuture();

        ScheduledHealthCheckerImpl(Supplier<? extends CompletionStage<HealthCheckStatus>> healthChecker,
                                   Duration fallbackTtl, EventExecutor eventExecutor) {
            this.healthChecker = healthChecker;
            this.fallbackTtl = fallbackTtl;
            this.eventExecutor = eventExecutor;
        }

        /**
         * Moves from {@code INIT} to {@code SCHEDULED}, registers {@code listener} for result
         * updates and submits the first health check. Does nothing in any other state.
         */
        private void startHealthChecker(Consumer<? super HealthChecker> listener) {
            if (state != State.INIT) {
                return;
            }
            state = State.SCHEDULED;
            settableHealthChecker.addListener(listener);
            scheduledFuture = eventExecutor.submit(this::runHealthCheck);
        }

        /**
         * Moves from {@code SCHEDULED} to {@code FINISHED}, cancels the pending task and
         * unregisters {@code listener}. Does nothing in any other state.
         */
        private void stopHealthChecker(Consumer<?> listener) {
            if (state != State.SCHEDULED) {
                return;
            }
            // `state` must be written before `scheduledFuture` is read; see scheduleNextCheck().
            state = State.FINISHED;
            scheduledFuture.cancel(true);
            settableHealthChecker.removeListener(listener);
        }

        /**
         * Runs one health check, publishes its result and schedules the next check.
         * A failed check (exception, {@code null} stage or {@code null} result) is published as
         * unhealthy and retried after {@code fallbackTtl}.
         */
        private void runHealthCheck() {
            if (state != State.SCHEDULED) {
                return;
            }
            try {
                healthChecker.get().handle((result, throwable) -> {
                    final boolean isHealthy;
                    final long intervalMillis;
                    if (throwable != null) {
                        logger.warn("Health checker throws an exception, schedule the next check after {}ms.",
                                    fallbackTtl.toMillis(), throwable);
                        isHealthy = false;
                        intervalMillis = fallbackTtl.toMillis();
                    } else if (result == null) {
                        logger.warn("Health checker returns an unexpected null result, " +
                                    "schedule the next check after {}ms.",
                                    fallbackTtl.toMillis());
                        isHealthy = false;
                        intervalMillis = fallbackTtl.toMillis();
                    } else {
                        isHealthy = result.isHealthy();
                        intervalMillis = result.ttlMillis();
                    }
                    settableHealthChecker.setHealthy(isHealthy);
                    scheduleNextCheck(intervalMillis);
                    return null;
                });
            } catch (Throwable throwable) {
                // Reached when the supplier itself throws or returns null.
                logger.warn("Health checker throws an exception, schedule the next check after {}ms.",
                            fallbackTtl.toMillis(), throwable);
                settableHealthChecker.setHealthy(false);
                scheduleNextCheck(fallbackTtl.toMillis());
            }
        }

        /**
         * Schedules the next {@link #runHealthCheck()} after {@code delayMillis}, unless this
         * checker has been stopped in the meantime.
         *
         * <p>A check may still be in flight when {@link #stopHealthChecker(Consumer)} runs. Without
         * the guards below, its completion would queue another timer task after the stop, and that
         * task would never be cancelled.
         */
        private void scheduleNextCheck(long delayMillis) {
            if (state != State.SCHEDULED) {
                return;
            }
            final Future<?> next = eventExecutor.schedule(this::runHealthCheck, delayMillis,
                                                          TimeUnit.MILLISECONDS);
            scheduledFuture = next;
            // stopHealthChecker() writes `state` and then reads `scheduledFuture`; this method writes
            // `scheduledFuture` and then reads `state`. Both fields are volatile, so at least one side
            // observes the other and the new task is cancelled by whichever side sees it.
            if (state != State.SCHEDULED) {
                next.cancel(true);
            }
        }

        enum State {
            INIT,
            SCHEDULED,
            FINISHED
        }
    }
}

0 comments on commit 96c6523

Please sign in to comment.