Skip to content

Commit

Permalink
Remove unneeded classes
Browse files Browse the repository at this point in the history
git-svn-id: file:///home/igor/dev/stuff/svnbackup/trunk@5676 ef7698a4-5110-0410-9fc6-c7eb3693863f
  • Loading branch information
Rodolfo Hansen committed Dec 6, 2010
1 parent 25b22c0 commit 1b1f321
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 112 deletions.
3 changes: 0 additions & 3 deletions jdk-1.6-parent/push-parent/push-core/META-INF/MANIFEST.MF

This file was deleted.

This file was deleted.

Expand Up @@ -19,9 +19,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
Expand All @@ -40,14 +38,13 @@
import org.wicketstuff.push.IPushChannelDisconnectedListener;
import org.wicketstuff.push.IPushEventHandler;
import org.wicketstuff.push.IPushService;
import org.wicketstuff.push.PushDetachListener;

/**
* @author <a href="http://sebthom.de/">Sebastian Thomschke</a>
*/
public class TimerPushService implements IPushService
{
private static final class PushChannelState
private static final class PushChannelState
{
protected final TimerPushChannel<?> channel;
protected Time lastPolledAt = Time.now();
Expand All @@ -62,8 +59,8 @@ protected PushChannelState(final TimerPushChannel<?> channel)

private static final Logger LOG = LoggerFactory.getLogger(TimerPushService.class);

private static final Map<Application, TimerPushService> INSTANCES =
new WeakHashMap<Application, TimerPushService>(2);
static final ConcurrentHashMap<Application, TimerPushService> INSTANCES = new ConcurrentHashMap<Application, TimerPushService>(
2);

public static TimerPushService get()
{
Expand All @@ -75,32 +72,23 @@ public static TimerPushService get(final Application application)
TimerPushService service = INSTANCES.get(application);
if (service == null)
{
service = new TimerPushService(application);
INSTANCES.put(application, service);
service = new TimerPushService();
final TimerPushService existingInstance = INSTANCES.putIfAbsent(application, service);

if (existingInstance == null) {
/*
* If this is the first instance of this service for the given application, then
* schedule the cleanup task.
*/
service.setCleanupInterval(Duration.seconds(60));
} else {
// If it is not the first instance, throw it away.
service = existingInstance;
}
}
return service;
}

/**
* Used to
* @author Rodolfo Hansen
*/
private final class TimerPushDetachListener extends PushDetachListener
{
private static final long serialVersionUID = 1L;

private TimerPushDetachListener(Application app)
{
super(app);
}

@Override
protected void destroyService()
{
_cleanupExecutor.shutdown();
}
}

private Duration _defaultPollingInterval = Duration.seconds(2);
private Duration _maxTimeLag = Duration.seconds(10);

Expand All @@ -111,34 +99,35 @@ protected void destroyService()
private ScheduledFuture<?> _cleanupFuture = null;
private final Runnable _cleanupTask = new Runnable()
{
public void run()
@Override
public void run()
{
LOG.debug("Running timer push channel cleanup task...");
final Time now = Time.now();
int count = 0;
for (final PushChannelState state : _channelStates.values())
if (now.subtract(state.lastPolledAt).greaterThan(_maxTimeLag))
for (final PushChannelState state : _channelStates.values()) {
if (now.subtract(state.lastPolledAt).greaterThan(_maxTimeLag))
{
onDisconnect(state.channel);
count++;
}
}
LOG.debug("Cleaned up {} timer push channels.", count);
}
};


private TimerPushService(Application app)
private TimerPushService()
{
app.getFrameworkSettings().setDetachListener(new TimerPushDetachListener(app));
setCleanupInterval(Duration.seconds(60));

super();
}

private TimerPushBehavior _findPushBehaviour(final Component component)
{
for (final Behavior behavior : component.getBehaviors())
if (behavior instanceof TimerPushBehavior)
return (TimerPushBehavior)behavior;
for (final Behavior behavior : component.getBehaviors()) {
if (behavior instanceof TimerPushBehavior) {
return (TimerPushBehavior)behavior;
}
}
return null;
}

Expand All @@ -150,7 +139,8 @@ private void _onConnect(final TimerPushChannel<?> pushChannel)
/**
* {@inheritDoc}
*/
public void addPushChannelDisconnectedListener(final IPushChannelDisconnectedListener listener)
@Override
public void addPushChannelDisconnectedListener(final IPushChannelDisconnectedListener listener)
{
_disconnectListeners.add(listener);
}
Expand Down Expand Up @@ -186,7 +176,8 @@ public <EventType> TimerPushChannel<EventType> installPush(final Component compo
/**
* {@inheritDoc}
*/
public <EventType> TimerPushChannel<EventType> installPushChannel(final Component component,
@Override
public <EventType> TimerPushChannel<EventType> installPushChannel(final Component component,
final IPushEventHandler<EventType> pushEventHandler)
{
return installPush(component, pushEventHandler, _defaultPollingInterval);
Expand All @@ -195,13 +186,15 @@ public <EventType> TimerPushChannel<EventType> installPushChannel(final Componen
/**
* {@inheritDoc}
*/
public boolean isConnected(final IPushChannel<?> pushChannel)
@Override
public boolean isConnected(final IPushChannel<?> pushChannel)
{
if (pushChannel instanceof TimerPushChannel)
{
final PushChannelState state = _channelStates.get(pushChannel);
if (state == null)
return false;
if (state == null) {
return false;
}

if (Time.now().subtract(state.lastPolledAt).greaterThan(_maxTimeLag))
{
Expand All @@ -214,21 +207,31 @@ public boolean isConnected(final IPushChannel<?> pushChannel)
return false;
}

void onApplicationShutdown()
{
LOG.info("Shutting down timer push service...");
_cleanupFuture.cancel(false);
_cleanupFuture = null;
_cleanupExecutor.shutdown();
INSTANCES.remove(this);
}

void onDisconnect(final TimerPushChannel<?> pushChannel)
{
if (_channelStates.remove(pushChannel) != null)
{
LOG.debug("Timer push channel {} disconnected.", pushChannel);

for (final IPushChannelDisconnectedListener listener : _disconnectListeners)
try
for (final IPushChannelDisconnectedListener listener : _disconnectListeners) {
try
{
listener.onDisconnect(pushChannel);
}
catch (final RuntimeException ex)
{
LOG.error("Failed to notify " + listener, ex);
}
}
}
}

Expand All @@ -245,8 +248,9 @@ <EventType> List<EventType> pollEvents(final TimerPushChannel<EventType> pushCha

state.lastPolledAt = Time.now();

if (state.queuedEvents.size() == 0)
return Collections.EMPTY_LIST;
if (state.queuedEvents.size() == 0) {
return Collections.EMPTY_LIST;
}

synchronized (state.queuedEventsLock)
{
Expand All @@ -259,30 +263,33 @@ <EventType> List<EventType> pollEvents(final TimerPushChannel<EventType> pushCha
/**
* {@inheritDoc}
*/
public <EventType> void publish(final IPushChannel<EventType> pushChannel, final EventType event)
@Override
public <EventType> void publish(final IPushChannel<EventType> pushChannel, final EventType event)
{
if (pushChannel instanceof TimerPushChannel)
{
if (isConnected(pushChannel))
{
final PushChannelState state = _channelStates.get(pushChannel);
if (state == null)
return;
if (state == null) {
return;
}

synchronized (state.queuedEventsLock)
{
state.queuedEvents.add(event);
}
}
}
else
LOG.warn("Unsupported push channel type {}", pushChannel);
} else {
LOG.warn("Unsupported push channel type {}", pushChannel);
}
}

/**
* {@inheritDoc}
*/
public void removePushChannelDisconnectedListener(
@Override
public void removePushChannelDisconnectedListener(
final IPushChannelDisconnectedListener listener)
{
_disconnectListeners.remove(listener);
Expand All @@ -296,8 +303,9 @@ public void setCleanupInterval(final Duration interval)
{
synchronized (this)
{
if (_cleanupFuture != null)
_cleanupFuture.cancel(false);
if (_cleanupFuture != null) {
_cleanupFuture.cancel(false);
}
_cleanupFuture = _cleanupExecutor.scheduleAtFixedRate(_cleanupTask,
interval.getMilliseconds(), interval.getMilliseconds(), TimeUnit.MILLISECONDS);
}
Expand All @@ -316,18 +324,21 @@ public void setMaxTimeLag(final Duration maxTimeLag)
/**
* {@inheritDoc}
*/
public void uninstallPushChannel(final Component component, final IPushChannel<?> pushChannel)
@Override
public void uninstallPushChannel(final Component component, final IPushChannel<?> pushChannel)
{
if (pushChannel instanceof TimerPushChannel)
{
final TimerPushBehavior behavior = _findPushBehaviour(component);
if (behavior == null)
return;
if (behavior.removePushChannel(pushChannel) == 0)
component.remove(behavior);
}
else
LOG.warn("Unsupported push channel type {}", pushChannel);
if (behavior == null) {
return;
}
if (behavior.removePushChannel(pushChannel) == 0) {
component.remove(behavior);
}
} else {
LOG.warn("Unsupported push channel type {}", pushChannel);
}
}

}

0 comments on commit 1b1f321

Please sign in to comment.