Skip to content

Commit

Permalink
Merge pull request mrniko#1 from YoonsooChang/yoonsoo
Browse files Browse the repository at this point in the history
Refactoring Scheduler Package => Changed CancelableScheduler From Int…
  • Loading branch information
YoonsooChang committed May 27, 2020
2 parents bed2a3b + 82e505b commit 2cbfece
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,57 @@
package com.corundumstudio.socketio.scheduler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public interface CancelableScheduler {
//Change Interface To Abstract Class
public abstract class CancelableScheduler {

void update(ChannelHandlerContext ctx);
protected final ConcurrentMap<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap();
protected final HashedWheelTimer executorService;

protected volatile ChannelHandlerContext ctx;

void cancel(SchedulerKey key);
public CancelableScheduler() {
executorService = new HashedWheelTimer();
}

public CancelableScheduler(ThreadFactory threadFactory) {
executorService = new HashedWheelTimer(threadFactory);
}

public void update(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

public void cancel(SchedulerKey key) {
final Timeout timeout = scheduledFutures.remove(key);
if (timeout != null) {
timeout.cancel();
}
}

public void schedule(final Runnable runnable, long delay, TimeUnit unit) {
executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
runnable.run();
}
}, delay, unit);
}

abstract void schedule(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);

void scheduleCallback(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);

void schedule(Runnable runnable, long delay, TimeUnit unit);

void schedule(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);

void shutdown();
abstract void scheduleCallback(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);

public void shutdown() {
executorService.stop();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,52 +15,23 @@
*/
package com.corundumstudio.socketio.scheduler;

import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;

public class HashedWheelScheduler implements CancelableScheduler {
public class HashedWheelScheduler extends CancelableScheduler {

private volatile ChannelHandlerContext ctx;

private final Map<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap();
private final HashedWheelTimer executorService;

public HashedWheelScheduler() {
executorService = new HashedWheelTimer();
super();
}

public HashedWheelScheduler(ThreadFactory threadFactory) {
executorService = new HashedWheelTimer(threadFactory);
}

private volatile ChannelHandlerContext ctx;

@Override
public void update(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void cancel(SchedulerKey key) {
Timeout timeout = scheduledFutures.remove(key);
if (timeout != null) {
timeout.cancel();
}
}

@Override
public void schedule(final Runnable runnable, long delay, TimeUnit unit) {
executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
runnable.run();
}
}, delay, unit);
super(threadFactory);
}

@Override
Expand All @@ -81,9 +52,7 @@ public void run() {
}
}, delay, unit);

if (!timeout.isExpired()) {
scheduledFutures.put(key, timeout);
}
addScheduledFuture(key, timeout);
}

@Override
Expand All @@ -99,14 +68,18 @@ public void run(Timeout timeout) throws Exception {
}
}, delay, unit);

if (!timeout.isExpired()) {
scheduledFutures.put(key, timeout);
}
addScheduledFuture(key, timeout);
}

@Override
public void shutdown() {
executorService.stop();
}

private void addScheduledFuture(final SchedulerKey key, Timeout timeout){
if (!timeout.isExpired()) {
scheduledFutures.put(key, timeout);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,51 +25,22 @@
package com.corundumstudio.socketio.scheduler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class HashedWheelTimeoutScheduler implements CancelableScheduler {

private final ConcurrentMap<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap();
private final HashedWheelTimer executorService;
public class HashedWheelTimeoutScheduler extends CancelableScheduler {

private volatile ChannelHandlerContext ctx;

public HashedWheelTimeoutScheduler() {
executorService = new HashedWheelTimer();
super();
}

public HashedWheelTimeoutScheduler(ThreadFactory threadFactory) {
executorService = new HashedWheelTimer(threadFactory);
}

@Override
public void update(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void cancel(SchedulerKey key) {
Timeout timeout = scheduledFutures.remove(key);
if (timeout != null) {
timeout.cancel();
}
}

@Override
public void schedule(final Runnable runnable, long delay, TimeUnit unit) {
executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
runnable.run();
}
}, delay, unit);
super(threadFactory);
}

@Override
Expand Down Expand Up @@ -109,10 +80,6 @@ public void run(Timeout timeout) throws Exception {
replaceScheduledFuture(key, timeout);
}

@Override
public void shutdown() {
executorService.stop();
}

private void replaceScheduledFuture(final SchedulerKey key, final Timeout newTimeout) {
final Timeout oldTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,29 +23,31 @@ public enum Type {PING_TIMEOUT, ACK_TIMEOUT, UPGRADE_TIMEOUT};
private final Type type;
private final Object sessionId;

public SchedulerKey(Type type, Object sessionId) {
public SchedulerKey(final Type type, final Object sessionId) {
this.type = type;
this.sessionId = sessionId;
}

@Override
public int hashCode() {
final int prime = 31;
final int PRIME = 31;
int result = 1;
result = prime * result
result = PRIME * result
+ ((sessionId == null) ? 0 : sessionId.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
result = PRIME * result + ((type == null) ? 0 : type.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)

if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;

SchedulerKey other = (SchedulerKey) obj;
if (sessionId == null) {
if (other.sessionId != null)
Expand Down

0 comments on commit 2cbfece

Please sign in to comment.