-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
lua based ack implementation #3
Conversation
try { | ||
jedis.evalsha(this.LightKeeperScriptHash.get(), 1, getServerName()); | ||
} catch (RuntimeException e) { | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please... never swallow an exception unless you have a very good reason to do so (almost never!).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, will fix it. I made quick fix in order to see error message and forgot to re-throw exception
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So please commit a fix, so the PR will be cleaner :)
} | ||
|
||
public void activateLightKeeper() { | ||
jedis.evalsha(this.RequeueJobsScriptHash.get(), 1, getServerName()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. Cool!!
protected final Jedis jedis; | ||
private final ScheduledExecutorService scheduler = | ||
Executors.newScheduledThreadPool(POOL_SIZE); | ||
private final AtomicReference<String> LightKeeperScriptHash = new AtomicReference<>(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need the AtomicReference?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While i can't see real need for AtomicReference in this case, I've followed the way scripts are loaded in WorkerImpl only as precaution and in order to save on consistency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it doesn't really have an affect, it's only confusing the reader.
I suggest to remove it.
@@ -615,7 +618,8 @@ private void removeInFlight(final String curQueue) { | |||
if (SHUTDOWN_IMMEDIATE.equals(this.state.get())) { | |||
lpoplpush(key(INFLIGHT, this.name, curQueue), key(QUEUE, curQueue)); | |||
} else { | |||
this.jedis.lpop(key(INFLIGHT, this.name, curQueue)); | |||
this.jedis.evalsha(this.removeInFlightHash.get(), 2, key(INFLIGHT, this.name, curQueue),key("log")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove "this"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Referencing instance variable thru this is the convention in this project, there are few other occurrences of this.jedis in this class. Why should we remove this exact reference that even was not introduced by us?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, makes sense :)
end | ||
|
||
|
||
local function inspectLiveKeeper(currentTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are you getting the currentTime from outside?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohh !!! I'll give another look on it. Generally, I was not able to use any global variable unless it was defined before function that is going to use it . And I didn't wanted to call redis.time again and again
@@ -219,6 +221,7 @@ public void run() { | |||
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); | |||
this.multiPriorityQueuesScriptHash | |||
.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES))); | |||
this.removeInFlightHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(REMOVE_INFLIGHT_LUA)));; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Load the script only once.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove in flight cache is loaded only once ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I see.
|
||
inspectLiveKeeper(currentTime); | ||
|
||
redis.call('SET', isAliveKey,currentTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updating is alive key every 2 sec on every server
|
||
local currentTime = getCurrentTime() | ||
|
||
inspectLiveKeeper(currentTime); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When are we scheduling this to run every 5 seconds?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be fixed
protected final Jedis jedis; | ||
private final ScheduledExecutorService scheduler = | ||
Executors.newScheduledThreadPool(POOL_SIZE); | ||
private final AtomicReference<String> LightKeeperScriptHash = new AtomicReference<>(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since it doesn't really have an affect, it's only confusing the reader.
I suggest to remove it.
try { | ||
jedis.evalsha(this.LightKeeperScriptHash.get(), 1, getServerName()); | ||
} catch (RuntimeException e) { | ||
e.printStackTrace(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So please commit a fix, so the PR will be cleaner :)
@@ -615,7 +618,8 @@ private void removeInFlight(final String curQueue) { | |||
if (SHUTDOWN_IMMEDIATE.equals(this.state.get())) { | |||
lpoplpush(key(INFLIGHT, this.name, curQueue), key(QUEUE, curQueue)); | |||
} else { | |||
this.jedis.lpop(key(INFLIGHT, this.name, curQueue)); | |||
this.jedis.evalsha(this.removeInFlightHash.get(), 2, key(INFLIGHT, this.name, curQueue),key("log")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, makes sense :)
@@ -219,6 +221,7 @@ public void run() { | |||
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA))); | |||
this.multiPriorityQueuesScriptHash | |||
.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES))); | |||
this.removeInFlightHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(REMOVE_INFLIGHT_LUA)));; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I see.
} | ||
} | ||
|
||
public void activateLightKeeper() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move to constructor...
Refactor lua scripts into one script providing all watchdog related functionality
} catch (RuntimeException e) { | ||
LOG.error("Failed to run " + REQUEUE_JOBS + " job " + WATCHDOG_LUA + " script", e); | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is doing too much and is a bit long... please break it down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
Add recover from exception functionality to watchdog
@nanimrod I feel like Watchdog needs to expose start method . just creating watchdog and expecting it to run automatically looks very unusual in code :
|
I think the best would be to have start method just like Thread has. |
I don't like a solution that forces the client to run 2 methods when none of them has any meaning without the other. |
What 2 methods are you talking about? 1 method start , that's all . And that's the standard behaviour for Java as you can see from the Thread example. |
If I need to choose between 2 bad solutions: having everything executed in constructor or having static method executing watchdog initialization logic, I will go with the constructor. At least in my tests I could extend watchdog with class having empty constructor and set it instead the original one. Just in case, few links bellow are discussing what's wrong with static methods in java https://testing.googleblog.com/2008/12/static-methods-are-death-to-testability.html |
No description provided.