Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce redis recovery listener #6

Merged
merged 6 commits into from
Sep 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 98 additions & 21 deletions src/main/java/net/greghaines/jesque/worker/Watchdog.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisNoScriptException;
import redis.clients.util.Pool;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.concurrent.TimeUnit.SECONDS;
import static net.greghaines.jesque.utils.PoolUtils.doWorkInPool;


/**
* Created by dimav on 17/08/17.
* <p>
Expand All @@ -28,41 +31,56 @@ public class Watchdog {
private static final int POOL_SIZE = 2;
private static final int INITIAL_DELAY = 0;
private static final int LIGHT_KEEPER_PERIOD = 10;
private static final int IS_ALIVE_PERIOD = 2;
private static final int REPORT_ALIVE_PERIOD = 2;
private static final int TIME_TO_REQUEUE_JOBS_ON_INACTIVE_SERVER_SEC = 60;
private static final String WATCHDOG_LUA = "/workerScripts/watchdog.lua";
private static final String REQUEUE_JOBS = "requeueJobs";
private static final String IS_ALIVE = "isAlive";
private static final String UPDATE_RECOVERY_STATUS_JOB = "updateRecoveryStatus";
private static final String FIX_INTERRUPTED_RECOVERY_JOB = "fixInterruptedRecovery";
private static final String REPORT_ALIVE_JOB = "reportAlive";
private static final int WATCHDOG_SCRIPT_KEYS_NUMBER = 0;
private static final String WATCHDOG = "watchdog";
private static final String FAILED = "FAILED";
private static final String FINISHED = "FINISHED";
private static boolean isActivated = false;
private static Watchdog instance = new Watchdog();
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(POOL_SIZE);
private final AtomicReference<String> watchdogScriptHash = new AtomicReference<>(null);
private Pool<Jedis> jedisPool;
private boolean redisRestartRecoveryOn;
private Runnable redisRestartRecoveryListener;
private String serverName;
private String isDebug;

// Private constructor: the only instance is the one created for the static 'instance' field.
private Watchdog() {
}

/**
 * Activates the watchdog without a redis restart recovery listener.
 * <p>
 * Delegates to {@link #activate(Config, Runnable)} with a {@code null} listener.
 *
 * @param config configuration forwarded unchanged to the two-argument overload
 */
public static synchronized void activate(Config config) {
activate(config, null);
}

public static synchronized void activate(Config config, Runnable redisRestartRecoveryListener) {
if (isActivated) {
throw new RuntimeException("Watchdog was already activated with config " + config);
}

instance.init(config);
instance.init(config, redisRestartRecoveryListener);
isActivated = true;
}

private void init(Config config) {
private void init(Config config, Runnable redisRestartRecoveryListener) {
jedisPool = PoolUtils.createJedisPool(config, new WatchdogJedisPoolConfig());
final String isDebug = LOG.isDebugEnabled() ? Boolean.TRUE.toString() : Boolean.FALSE.toString();
final String serverName = getServerName();
isDebug = LOG.isDebugEnabled() ? Boolean.TRUE.toString() : Boolean.FALSE.toString();
serverName = getServerName();
this.redisRestartRecoveryListener = redisRestartRecoveryListener;
redisRestartRecoveryOn = (redisRestartRecoveryListener != null);

loadScript();
activateIsAliveService(serverName);
activateWatchdogService(serverName, isDebug);
requeueLostJobs(serverName, isDebug);
fixInterruptedRecovery();
activateReportAliveService();
activateWatchdogService();
requeueLostJobs();
}

private void loadScript() {
Expand All @@ -78,10 +96,22 @@ private void loadScript() {
}


private void requeueLostJobs(final String serverName, final String isDebug) {
/**
 * Runs the {@code fixInterruptedRecovery} job of the watchdog script for this server.
 * <p>
 * Called on watchdog activation so that a recovery process which previously failed on this
 * server can be detected and rerun if needed. Any failure is logged and not propagated.
 */
private void fixInterruptedRecovery() {
    final PoolUtils.PoolWork<Jedis, Void> fixRecoveryWork = connection -> {
        connection.evalsha(watchdogScriptHash.get(), WATCHDOG_SCRIPT_KEYS_NUMBER,
                serverName, FIX_INTERRUPTED_RECOVERY_JOB, getCurrTime(), isDebug);
        return null;
    };

    try {
        workInPool(this.jedisPool, fixRecoveryWork);
    } catch (Exception ex) {
        LOG.error("Failed to run " + FIX_INTERRUPTED_RECOVERY_JOB + " job " + WATCHDOG_LUA + " script", ex);
    }
}

private void requeueLostJobs() {
// On watchdog activation check whether server contains unhandled in-flight jobs and re-queuing them
try {
doWorkInPool(this.jedisPool, (PoolUtils.PoolWork<Jedis, Void>) jedis -> {
workInPool(this.jedisPool, (PoolUtils.PoolWork<Jedis, Void>) jedis -> {
jedis.evalsha(watchdogScriptHash.get(), WATCHDOG_SCRIPT_KEYS_NUMBER, serverName, REQUEUE_JOBS, getCurrTime(), isDebug);
return null;
});
Expand All @@ -90,12 +120,12 @@ private void requeueLostJobs(final String serverName, final String isDebug) {
}
}

private void activateWatchdogService(final String serverName, final String isDebug) {
private void activateWatchdogService() {
// Schedule watchdog job , checking whether one of the servers was inactive too long and re-queuing jobs of a such inactive server
final Runnable lightKeeper = () -> {

try {
doWorkInPool(this.jedisPool, (PoolUtils.PoolWork<Jedis, Void>) jedis -> {
workInPool(this.jedisPool, (PoolUtils.PoolWork<Jedis, Void>) jedis -> {
jedis.evalsha(watchdogScriptHash.get(), WATCHDOG_SCRIPT_KEYS_NUMBER, serverName, WATCHDOG, getCurrTime(), isDebug, String.valueOf(TIME_TO_REQUEUE_JOBS_ON_INACTIVE_SERVER_SEC), String.valueOf(LIGHT_KEEPER_PERIOD));
return null;
});
Expand All @@ -107,21 +137,32 @@ private void activateWatchdogService(final String serverName, final String isDeb
scheduler.scheduleAtFixedRate(lightKeeper, LIGHT_KEEPER_PERIOD, LIGHT_KEEPER_PERIOD, SECONDS);
}

private void activateIsAliveService(final String serverName) throws RuntimeException {
// Schedule isAlive job used to mark in redis server is still alive
final Runnable isAlive = () -> {
private void activateReportAliveService() throws RuntimeException {
// Schedule reportAlive job used to mark in redis server is still alive
final Runnable reportAlive = () -> {

try {
doWorkInPool(this.jedisPool, (PoolUtils.PoolWork<Jedis, Void>) jedis -> {
jedis.evalsha(watchdogScriptHash.get(), WATCHDOG_SCRIPT_KEYS_NUMBER, serverName, getCurrTime(), IS_ALIVE);
workInPool(this.jedisPool, (PoolUtils.PoolWork<Jedis, Void>) jedis -> {
reportAlive(jedis);

return null;
});
} catch (Exception e) {
LOG.error("Failed to run " + IS_ALIVE + " function " + WATCHDOG_LUA + " script", e);
LOG.error("Failed to run " + REPORT_ALIVE_JOB + " job " + WATCHDOG_LUA + " script", e);
}
};

scheduler.scheduleAtFixedRate(isAlive, INITIAL_DELAY, IS_ALIVE_PERIOD, SECONDS);
scheduler.scheduleAtFixedRate(reportAlive, INITIAL_DELAY, REPORT_ALIVE_PERIOD, SECONDS);
}

/**
 * Runs the {@code reportAlive} job of the watchdog script and starts the redis restart
 * recovery when the script's reply parses as {@code true}.
 *
 * @param jedis connection used to evaluate the watchdog script
 */
private void reportAlive(final Jedis jedis) {
    final Object scriptReply = jedis.evalsha(watchdogScriptHash.get(), WATCHDOG_SCRIPT_KEYS_NUMBER,
            serverName, REPORT_ALIVE_JOB, getCurrTime(), isDebug,
            String.valueOf(redisRestartRecoveryOn), String.valueOf(LIGHT_KEEPER_PERIOD));

    // A null or non-"true" reply means no recovery is needed.
    final boolean recoveryRequested = Boolean.parseBoolean((String) scriptReply);
    if (!recoveryRequested) {
        return;
    }
    recoverFromRedisRestart();
}

private String getCurrTime() {
Expand All @@ -136,8 +177,44 @@ private String getServerName() {
}
}

}

/**
 * Runs the redis restart recovery asynchronously on a dedicated single-thread executor.
 * <p>
 * The executor is shut down right after the task is submitted. {@code shutdown()} is an
 * orderly shutdown: the already-submitted recovery task still runs to completion, and the
 * worker thread then terminates instead of lingering forever. Without it, every recovery
 * would leak one idle non-daemon thread.
 */
private void recoverFromRedisRestart() {
    final ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
        executor.submit(this::invokeListener);
    } finally {
        // Does not cancel the submitted task; only lets the thread exit once it is done.
        executor.shutdown();
    }
}

/**
 * Runs the redis restart recovery listener and records the outcome through the
 * {@code updateRecoveryStatus} job of the watchdog script.
 * <p>
 * On success the status {@code FINISHED} is written. If the listener or the status update
 * throws, the failure is logged and the status {@code FAILED} is written instead.
 *
 * @throws RuntimeException if writing the {@code FAILED} status fails as well
 */
private void invokeListener() {
    try {
        redisRestartRecoveryListener.run();
        reportRecoveryStatus(FINISHED);
    } catch (Exception e) {
        LOG.error("Recovery process has failed ", e);
        try {
            reportRecoveryStatus(FAILED);
        } catch (Exception ie) {
            // This method is submitted to an executor whose Future is discarded, so an
            // exception thrown from here would otherwise vanish without a trace.
            LOG.error("Failed to run " + UPDATE_RECOVERY_STATUS_JOB + " job " + WATCHDOG_LUA + " script", ie);
            throw new RuntimeException(ie);
        }
    }
}

/**
 * Writes the given recovery status for this server via the watchdog script.
 *
 * @param status status value passed to the script, e.g. {@code FINISHED} or {@code FAILED}
 * @throws Exception if the script evaluation fails
 */
private void reportRecoveryStatus(final String status) throws Exception {
    workInPool(this.jedisPool, (PoolUtils.PoolWork<Jedis, Void>) jedis -> {
        jedis.evalsha(watchdogScriptHash.get(), WATCHDOG_SCRIPT_KEYS_NUMBER, serverName,
                UPDATE_RECOVERY_STATUS_JOB, getCurrTime(), isDebug, status);
        return null;
    });
}

/**
 * Executes the given work with a connection from the pool via {@code doWorkInPool}.
 * <p>
 * When Redis reports that the script is unknown ({@link JedisNoScriptException}),
 * {@code loadScript()} is called before the exception is rethrown. The current call still
 * fails; presumably the reload lets later calls succeed (TODO confirm against loadScript).
 *
 * @param pool pool to borrow the connection from
 * @param work work to execute with the borrowed connection
 * @param <V>  result type of the work; the result itself is discarded
 * @throws Exception whatever the work or the pool throws
 */
private <V> void workInPool(final Pool<Jedis> pool, final PoolUtils.PoolWork<Jedis, V> work) throws Exception {
    try {
        doWorkInPool(pool, work);
    } catch (JedisNoScriptException scriptMissing) {
        loadScript();
        throw scriptMissing;
    }
}
}

class WatchdogJedisPoolConfig extends JedisPoolConfig {
@Override
Expand Down
50 changes: 42 additions & 8 deletions src/main/java/net/greghaines/jesque/worker/WorkerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import net.greghaines.jesque.utils.VersionUtils;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.exceptions.JedisNoScriptException;

/**
* WorkerImpl is an implementation of the Worker interface. Obeys the contract of a Resque worker in Redis.
Expand Down Expand Up @@ -126,7 +127,7 @@ protected static void checkQueues(final Iterable<String> queues) {
private final String threadNameBase = "Worker-" + this.workerId + " Jesque-" + VersionUtils.getVersion() + ": ";
private final AtomicReference<Thread> threadRef = new AtomicReference<Thread>(null);
private final AtomicReference<ExceptionHandler> exceptionHandlerRef = new AtomicReference<ExceptionHandler>(
new DefaultExceptionHandler());
new WorkerImplExceptionHandler());
private final AtomicReference<FailQueueStrategy> failQueueStrategyRef;
private final JobFactory jobFactory;

Expand Down Expand Up @@ -217,11 +218,7 @@ public void run() {
this.jedis.sadd(key(WORKERS), this.name);
this.jedis.set(key(WORKER, this.name, STARTED), new SimpleDateFormat(DATE_FORMAT).format(new Date()));
this.listenerDelegate.fireEvent(WORKER_START, this, null, null, null, null, null);
this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
this.multiPriorityQueuesScriptHash
.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES)));
this.removeInFlightHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(REMOVE_INFLIGHT_LUA)));;
loadScripts();
poll();
} catch (Exception ex) {
LOG.error("Uncaught exception in worker run-loop!", ex);
Expand All @@ -242,6 +239,14 @@ public void run() {
}
}

/**
 * Loads the worker's Lua scripts into Redis and stores the returned hash of each one.
 * <p>
 * The scripts are registered one after another in a fixed order: pop, lpoplpush,
 * pop-from-multiple-priority-queues, remove-in-flight.
 *
 * @throws IOException if a script resource cannot be read
 */
private void loadScripts() throws IOException {
    final String popSha = this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA));
    this.popScriptHash.set(popSha);

    final String lpoplpushSha = this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA));
    this.lpoplpushScriptHash.set(lpoplpushSha);

    final String multiPrioritySha = this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES));
    this.multiPriorityQueuesScriptHash.set(multiPrioritySha);

    final String removeInFlightSha = this.jedis.scriptLoad(ScriptUtils.readScript(REMOVE_INFLIGHT_LUA));
    this.removeInFlightHash.set(removeInFlightSha);
}

/**
* Shutdown this Worker.<br>
* <b>The worker cannot be started again; create a new worker in this case.</b>
Expand Down Expand Up @@ -531,7 +536,8 @@ protected String pop(final String curQueue) {
* @param ex the exception that was thrown
*/
protected void recoverFromException(final String curQueue, final Exception ex) {
final RecoveryStrategy recoveryStrategy = this.exceptionHandlerRef.get().onException(this, ex, curQueue);
RecoveryStrategy recoveryStrategy = this.exceptionHandlerRef.get().onException(this, ex, curQueue);

switch (recoveryStrategy) {
case RECONNECT:
LOG.info("Reconnecting to Redis in response to exception", ex);
Expand All @@ -541,7 +547,7 @@ protected void recoverFromException(final String curQueue, final Exception ex) {
end(false);
} else {
authenticateAndSelectDB();
LOG.info("Reconnected to Redis");
LOG.info("Reconnected to Redis ");
}
break;
case TERMINATE:
Expand Down Expand Up @@ -794,4 +800,32 @@ protected String lpoplpush(final String from, final String to) {
public String toString() {
return this.namespace + COLON + WORKER + COLON + this.name;
}

class WorkerImplExceptionHandler implements ExceptionHandler {

ExceptionHandler defaultExceptionHandler = new DefaultExceptionHandler();

/**
* {@inheritDoc}
*/
@Override
public RecoveryStrategy onException(final JobExecutor jobExecutor, final Exception ex,
final String curQueue) {
RecoveryStrategy recoveryStrategy;

if(ex instanceof JedisNoScriptException){
try {
loadScripts();
recoveryStrategy = RecoveryStrategy.RECONNECT;
} catch (IOException e) {
recoveryStrategy = defaultExceptionHandler.onException(jobExecutor,ex,curQueue);;
}
} else {
recoveryStrategy = defaultExceptionHandler.onException(jobExecutor,ex,curQueue);;
}

return recoveryStrategy;
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did we touch the WorkerImpl class as part of this PR? Doesn't look related to what we're trying to achieve...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without reloading the scripts after a Redis restart, the workers will not work. We are also trying to reach a situation where the listener is not only called but also has a chance to requeue aggregations :-)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, cool :)

}
Loading