Skip to content

Commit

Permalink
Merge pull request mrniko#2 from YoonsooChang/revert-1-yoonsoo
Browse files Browse the repository at this point in the history
Revert "Refactoring Scheduler Package => Changed CancelableScheduler From Int…"
  • Loading branch information
YoonsooChang committed May 27, 2020
2 parents 2cbfece + a78be65 commit 04f9dba
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,57 +16,21 @@
package com.corundumstudio.socketio.scheduler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

//Change Interface To Abstract Class
public abstract class CancelableScheduler {
public interface CancelableScheduler {

protected final ConcurrentMap<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap();
protected final HashedWheelTimer executorService;

protected volatile ChannelHandlerContext ctx;
void update(ChannelHandlerContext ctx);

public CancelableScheduler() {
executorService = new HashedWheelTimer();
}

public CancelableScheduler(ThreadFactory threadFactory) {
executorService = new HashedWheelTimer(threadFactory);
}

public void update(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

public void cancel(SchedulerKey key) {
final Timeout timeout = scheduledFutures.remove(key);
if (timeout != null) {
timeout.cancel();
}
}

public void schedule(final Runnable runnable, long delay, TimeUnit unit) {
executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
runnable.run();
}
}, delay, unit);
}

abstract void schedule(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);
void cancel(SchedulerKey key);

abstract void scheduleCallback(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);
void scheduleCallback(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);

void schedule(Runnable runnable, long delay, TimeUnit unit);

void schedule(SchedulerKey key, Runnable runnable, long delay, TimeUnit unit);

void shutdown();

public void shutdown() {
executorService.stop();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,52 @@
*/
package com.corundumstudio.socketio.scheduler;

import java.util.Map;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;

public class HashedWheelScheduler extends CancelableScheduler {

private volatile ChannelHandlerContext ctx;
public class HashedWheelScheduler implements CancelableScheduler {

private final Map<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap();
private final HashedWheelTimer executorService;

public HashedWheelScheduler() {
super();
executorService = new HashedWheelTimer();
}

public HashedWheelScheduler(ThreadFactory threadFactory) {
super(threadFactory);
executorService = new HashedWheelTimer(threadFactory);
}

private volatile ChannelHandlerContext ctx;

@Override
public void update(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void cancel(SchedulerKey key) {
Timeout timeout = scheduledFutures.remove(key);
if (timeout != null) {
timeout.cancel();
}
}

@Override
public void schedule(final Runnable runnable, long delay, TimeUnit unit) {
executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
runnable.run();
}
}, delay, unit);
}

@Override
Expand All @@ -52,7 +81,9 @@ public void run() {
}
}, delay, unit);

addScheduledFuture(key, timeout);
if (!timeout.isExpired()) {
scheduledFutures.put(key, timeout);
}
}

@Override
Expand All @@ -68,18 +99,14 @@ public void run(Timeout timeout) throws Exception {
}
}, delay, unit);

addScheduledFuture(key, timeout);
if (!timeout.isExpired()) {
scheduledFutures.put(key, timeout);
}
}

@Override
public void shutdown() {
executorService.stop();
}

private void addScheduledFuture(final SchedulerKey key, Timeout timeout){
if (!timeout.isExpired()) {
scheduledFutures.put(key, timeout);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,51 @@
package com.corundumstudio.socketio.scheduler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.internal.PlatformDependent;

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

public class HashedWheelTimeoutScheduler extends CancelableScheduler {
public class HashedWheelTimeoutScheduler implements CancelableScheduler {

private final ConcurrentMap<SchedulerKey, Timeout> scheduledFutures = PlatformDependent.newConcurrentHashMap();
private final HashedWheelTimer executorService;

private volatile ChannelHandlerContext ctx;

public HashedWheelTimeoutScheduler() {
super();
executorService = new HashedWheelTimer();
}

public HashedWheelTimeoutScheduler(ThreadFactory threadFactory) {
super(threadFactory);
executorService = new HashedWheelTimer(threadFactory);
}

@Override
public void update(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void cancel(SchedulerKey key) {
Timeout timeout = scheduledFutures.remove(key);
if (timeout != null) {
timeout.cancel();
}
}

@Override
public void schedule(final Runnable runnable, long delay, TimeUnit unit) {
executorService.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
runnable.run();
}
}, delay, unit);
}

@Override
Expand Down Expand Up @@ -80,6 +109,10 @@ public void run(Timeout timeout) throws Exception {
replaceScheduledFuture(key, timeout);
}

@Override
public void shutdown() {
executorService.stop();
}

private void replaceScheduledFuture(final SchedulerKey key, final Timeout newTimeout) {
final Timeout oldTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,31 +23,29 @@ public enum Type {PING_TIMEOUT, ACK_TIMEOUT, UPGRADE_TIMEOUT};
private final Type type;
private final Object sessionId;

public SchedulerKey(final Type type, final Object sessionId) {
public SchedulerKey(Type type, Object sessionId) {
this.type = type;
this.sessionId = sessionId;
}

@Override
public int hashCode() {
final int PRIME = 31;
final int prime = 31;
int result = 1;
result = PRIME * result
result = prime * result
+ ((sessionId == null) ? 0 : sessionId.hashCode());
result = PRIME * result + ((type == null) ? 0 : type.hashCode());
result = prime * result + ((type == null) ? 0 : type.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {

if (this == obj)
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;

SchedulerKey other = (SchedulerKey) obj;
if (sessionId == null) {
if (other.sessionId != null)
Expand Down

0 comments on commit 04f9dba

Please sign in to comment.