Skip to content

Commit

Permalink
ISPN-14766 Use blocking manager as Insights scheduler executor
Browse files Browse the repository at this point in the history
  • Loading branch information
fax4ever committed May 25, 2023
1 parent 917f850 commit d60a72a
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,22 @@ default ScheduledBlockingCompletableStage<Void> scheduleRunBlocking(Runnable run
*/
<V> ScheduledBlockingCompletableStage<V> scheduleRunBlocking(Supplier<V> supplier, long delay, TimeUnit unit, Object traceId);

/**
* Replacement for {@link java.util.concurrent.ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)} that
* invokes the {@code Runnable} in a blocking thread periodically at fixed rate.
* <p>
* Unlike other methods in this interface, the submitting thread does not impact this method's behavior.
*
* @param runnable blocking operation that runs some code
* @param initialDelay the time to delay first execution
* @param period the period between successive executions
* @param unit the time unit of the delay parameter
* @param traceId an identifier that can be used to tell in a trace when an operation moves between threads
*
* @return a stage that is completed after the runnable is done or throws an exception.
*/
ScheduledFuture<Void> scheduleRunBlockingAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit unit, Object traceId);

/**
* Executor interface that submits task to a blocking pool that returns a stage that is guaranteed
* to run any chained stages on a non-blocking thread if the stage is not yet complete.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,19 @@ public <V> ScheduledBlockingCompletableStage<V> scheduleRunBlocking(Supplier<V>
return scheduledStage;
}

@Override
public ScheduledFuture<Void> scheduleRunBlockingAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit unit, Object traceId) {
Supplier supplier = () -> {
runnable.run();
return null;
};

var scheduledStage = new ScheduledBlockingFuture<>(supplier, traceId);
log.tracef("Scheduling supply operation %s for %s to run in %s %s", supplier, traceId, initialDelay, period, unit);
scheduledStage.scheduledFuture = scheduledExecutorService.scheduleAtFixedRate(runnable, initialDelay, period, unit);
return scheduledStage;
}

private class ScheduledBlockingFuture<V> extends CompletableFuture<V> implements ScheduledBlockingCompletableStage<V>, Runnable {
private volatile ScheduledFuture<?> scheduledFuture;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import javax.net.ssl.SSLContext;
Expand All @@ -15,6 +16,8 @@
import org.infinispan.server.insights.logging.Log;
import org.infinispan.server.insights.report.InfinispanSubreport;
import org.infinispan.server.insights.report.InfinispanTopReport;
import org.infinispan.server.insights.scheduler.InfinispanInsightsScheduler;
import org.infinispan.util.concurrent.BlockingManager;

import com.redhat.insights.InsightsReport;
import com.redhat.insights.InsightsReportController;
Expand All @@ -35,6 +38,8 @@ public class InsightsModule implements ModuleLifecycle {

@Override
public void cacheManagerStarted(GlobalComponentRegistry gcr) {
BlockingManager blockingManager = gcr.getComponent(BlockingManager.class);

InsightsConfiguration insightsConfiguration = new InfinispanInsightsConfiguration();
if (insightsConfiguration.isOptingOut()) {
throw log.insightsConfigurationError();
Expand All @@ -51,6 +56,9 @@ public void cacheManagerStarted(GlobalComponentRegistry gcr) {
}
};

InfinispanInsightsScheduler insightsScheduler =
new InfinispanInsightsScheduler(insightsLogger, insightsConfiguration, blockingManager);

CacheManagerInfo cacheManagerInfo = gcr.getCacheManager().getCacheManagerInfo();
Map<String, InsightsSubreport> subReports = new HashMap<>(2);

Expand All @@ -63,7 +71,8 @@ public void cacheManagerStarted(GlobalComponentRegistry gcr) {
insightsReportController = InsightsReportController.of(insightsLogger, insightsConfiguration, insightsReport,
() -> new InsightsMultiClient(insightsLogger,
new InsightsJdkHttpClient(insightsLogger, insightsConfiguration, sslContextSupplier),
new InsightsFileWritingClient(insightsLogger, insightsConfiguration)));
new InsightsFileWritingClient(insightsLogger, insightsConfiguration)), insightsScheduler,
new LinkedBlockingQueue<>());
insightsReportController.generate();
} catch (Throwable ex) {
throw log.insightsServiceSetupError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ public interface Log extends BasicLogger {
@Message(value = "Error setting up Red Hat Insight report service", id = 32003)
CacheException insightsServiceSetupError(@Cause Throwable t);

@Message(value = "Red Hat Insight client shut down, it is not possible to schedule other tasks on it", id = 32004)
CacheConfigurationException clientSchedulerShutDown();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package org.infinispan.server.insights.scheduler;

import static com.redhat.insights.InsightsErrorCode.ERROR_SCHEDULED_SENT;

import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.infinispan.commons.logging.LogFactory;
import org.infinispan.server.insights.logging.Log;
import org.infinispan.util.concurrent.BlockingManager;

import com.redhat.insights.InsightsException;
import com.redhat.insights.InsightsScheduler;
import com.redhat.insights.config.InsightsConfiguration;
import com.redhat.insights.logging.InsightsLogger;

/**
* Copy and adapted from {@link com.redhat.insights.InsightsCustomScheduledExecutor}.
* Differently from the original scheduler,
* instead of creating a new {@link java.util.concurrent.ScheduledExecutorService},
* it delegates to {@link BlockingManager}.
*/
public class InfinispanInsightsScheduler implements InsightsScheduler {

private static final Log log = LogFactory.getLog(InfinispanInsightsScheduler.class, Log.class);

private final InsightsLogger logger;
private final InsightsConfiguration configuration;
private final BlockingManager blockingManager;

private volatile boolean active = true;

public InfinispanInsightsScheduler(InsightsLogger logger, InsightsConfiguration configuration,
BlockingManager blockingManager) {
this.logger = logger;
this.configuration = configuration;
this.blockingManager = blockingManager;
}

@Override
public ScheduledFuture<?> scheduleConnect(Runnable sendConnect) {
return scheduleAtFixedRate(sendConnect, 0, configuration.getConnectPeriod().getSeconds(), TimeUnit.SECONDS);
}

@Override
public ScheduledFuture<?> scheduleJarUpdate(Runnable sendNewJarsIfAny) {
return scheduleAtFixedRate(sendNewJarsIfAny,
configuration.getUpdatePeriod().getSeconds(),
configuration.getUpdatePeriod().getSeconds(),
TimeUnit.SECONDS);
}

@Override
public boolean isShutdown() {
return false;
}

@Override
public void shutdown() {
active = false;
}

@Override
public List<Runnable> shutdownNow() {
return List.of();
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
if (!active) {
log.clientSchedulerShutDown();
}

Runnable wrapped =
() -> {
try {
command.run();
} catch (InsightsException ix) {
logger.error(ERROR_SCHEDULED_SENT.formatMessage(
"Red Hat Insights client scheduler shutdown, scheduled send failed: " + ix.getMessage()), ix);
shutdown();
throw ix;
} catch (Exception x) {
logger.error(ERROR_SCHEDULED_SENT.formatMessage(
"Red Hat Insights client scheduler shutdown, non-Insights failure: " + x.getMessage()), x);
shutdown();
throw x;
}
};

return blockingManager.scheduleRunBlockingAtFixedRate(wrapped, initialDelay, period, unit, this);
}
}

0 comments on commit d60a72a

Please sign in to comment.