Skip to content

Commit

Permalink
Improve generics of AtmosphereResource to AtmosphereResource<?,?>
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarcand committed Jun 4, 2010
1 parent 1f7ebee commit 9311dde
Show file tree
Hide file tree
Showing 12 changed files with 74 additions and 84 deletions.
Expand Up @@ -43,7 +43,6 @@
import org.atmosphere.cpr.AtmosphereServlet.Action;
import org.atmosphere.cpr.AtmosphereServlet.AtmosphereConfig;
import org.atmosphere.cpr.CometSupport;
import org.atmosphere.websocket.WebSocketSupport;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationSupport;

Expand Down
Expand Up @@ -365,7 +365,7 @@ public Action cancelled(HttpServletRequest req, HttpServletResponse res)

void shutdown() {
closedDetector.shutdownNow();
for (AtmosphereResource r : aliveRequests.values()) {
for (AtmosphereResource<HttpServletRequest,HttpServletResponse> r : aliveRequests.values()) {
try {
r.resume();
} catch (Throwable t) {
Expand Down
46 changes: 24 additions & 22 deletions modules/cpr/src/main/java/org/atmosphere/cpr/Broadcaster.java
Expand Up @@ -38,16 +38,18 @@

package org.atmosphere.cpr;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* A Broadcaster is responsible for delivering messages to its subscribed
* {@link AtmosphereResource}, which are representing a suspended response.
* {@link AtmosphereResource} can be added using {@link Broadcaster#addAtmosphereResource},
* {@link AtmosphereResource<?,?>}, which are representing a suspended response.
* {@link AtmosphereResource<?,?>} can be added using {@link Broadcaster#addAtmosphereResource},
* so when {@link #broadcast(java.lang.Object)} execute,
* {@link AtmosphereHandler#onStateChange(org.atmosphere.cpr.AtmosphereResourceEvent)} will
* be invoked and the suspended connection will have a chance to write the
Expand All @@ -60,7 +62,7 @@
* the operation has completed.
* <br>
* One final word on Broadcaster: by default, a Broadcaster will broadcast using
* all {@link AtmosphereResource} on which the response has been suspended, e.g. {AtmosphereResource#suspend()}
* all {@link AtmosphereResource<?,?>} on which the response has been suspended, e.g. {AtmosphereResource<?,?>#suspend()}
* has been invoked. This behavior is configurable and you can configure it by invoking the
* {@link Broadcaster#setScope(org.atmosphere.cpr.Broadcaster.SCOPE)} ):<ul>
* <li>REQUEST: broadcast events only to the AtmosphereResourceEvent associated with the current request.</li>
Expand Down Expand Up @@ -135,43 +137,43 @@ public enum SCOPE {

/**
* Broadcast the {@link Object} to all suspended response, e.g. invoke
* {@link AtmosphereHandler#onStateChange} with an instance of {@link AtmosphereResource}, representing
* {@link AtmosphereHandler#onStateChange} with an instance of {@link AtmosphereResource<?,?>}, representing
* a single suspended response..
*
* @param o and {@link Object} to be broadcasted.
* @param resource an {@link AtmosphereResource}
* @param resource an {@link AtmosphereResource<?,?>}
* @return a {@link Future} that can be used to synchronize using the {@link Future#get()}
*/
public Future<Object> broadcast(Object o, AtmosphereResource resource);
public Future<Object> broadcast(Object o, AtmosphereResource<?,?> resource);

/**
* Broadcast the {@link Object} to all suspended response, e.g. invoke
* {@link AtmosphereHandler#onStateChange} with a {@link Set} of {@link AtmosphereResource},
* {@link AtmosphereHandler#onStateChange} with a {@link Set} of {@link AtmosphereResource<?,?>},
* representing a set of {@link AtmosphereHandler}.
*
* @param o and {@link Object} to be broadcasted.
* @param subset a Set of {@link AtmosphereResource}
* @param subset a Set of {@link AtmosphereResource<?,?>}
* @return a {@link Future} that can be used to synchronize using the {@link Future#get()}
*/
public Future<Object> broadcast(Object o, Set<AtmosphereResource> subset);
public Future<Object> broadcast(Object o, Set<AtmosphereResource<?,?>> subset);

/**
* Add a {@link AtmosphereResource} to the list of item to be notified when
* Add a {@link AtmosphereResource<?,?>} to the list of item to be notified when
* the {@link Broadcaster#broadcast} is invoked.
*
* @param resource an {@link AtmosphereResource}
* @return {@link AtmosphereResource} if added, or null if it was already there.
* @param resource an {@link AtmosphereResource<?,?>}
* @return {@link AtmosphereResource<?,?>} if added, or null if it was already there.
*/
public AtmosphereResource addAtmosphereResource(AtmosphereResource resource);
public AtmosphereResource<?,?> addAtmosphereResource(AtmosphereResource<?,?> resource);

/**
* Remove a {@link AtmosphereResource} from the list of item to be notified when
* Remove a {@link AtmosphereResource<?,?>} from the list of item to be notified when
* the {@link Broadcaster#broadcast} is invoked.
*
* @param resource an {@link AtmosphereResource}
* @return {@link AtmosphereResource} if removed, or null if it was not.
* @param resource an {@link AtmosphereResource<?,?>}
* @return {@link AtmosphereResource<?,?>} if removed, or null if it was not.
*/
public AtmosphereResource removeAtmosphereResource(AtmosphereResource resource);
public AtmosphereResource<?,?> removeAtmosphereResource(AtmosphereResource<?,?> resource);

/**
* Set the {@link BroadcasterConfig} instance.
Expand All @@ -193,12 +195,12 @@ public enum SCOPE {
public void destroy();

/**
* Return an {@link Iterator} of {@link AtmosphereResource}.
* Return an {@link List} of {@link AtmosphereResource<?,?>}.
*
* @return {@link Iterator} of {@link AtmosphereResource} associated with this {@link Broadcaster}.
* @see org.atmosphere.cpr.Broadcaster#addAtmosphereResource(AtmosphereResource)
* @return {@link List} of {@link AtmosphereResource<?,?>} associated with this {@link Broadcaster}.
* @see org.atmosphere.cpr.Broadcaster#addAtmosphereResource(AtmosphereResource<?,?>)
*/
public Iterator<AtmosphereResource> getAtmosphereResources();
public Collection<AtmosphereResource<?,?>> getAtmosphereResources();

/**
* Set the scope.
Expand Down Expand Up @@ -229,7 +231,7 @@ public enum SCOPE {
public String getID();

/**
* Resume all suspended responses ({@link AtmosphereResource}) added via
* Resume all suspended responses ({@link AtmosphereResource<?,?>}) added via
* {@link Broadcaster#addAtmosphereResource}.
*/
public void resumeAll();
Expand Down
Expand Up @@ -43,6 +43,8 @@

import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
Expand All @@ -67,8 +69,8 @@
*/
public class DefaultBroadcaster implements Broadcaster {

protected final ConcurrentLinkedQueue<AtmosphereResource> events =
new ConcurrentLinkedQueue<AtmosphereResource>();
protected final Collection<AtmosphereResource<?,?>> events =
new ConcurrentLinkedQueue<AtmosphereResource<?,?>>();
protected BroadcasterConfig bc = AtmosphereServlet.getBroadcasterConfig();
protected final BlockingQueue<Entry> messages =
new LinkedBlockingQueue<Entry>();
Expand Down Expand Up @@ -111,8 +113,8 @@ public void destroy() {
/**
* {@inheritDoc}
*/
public Iterator<AtmosphereResource> getAtmosphereResources() {
return events.iterator();
public Collection<AtmosphereResource<?,?>> getAtmosphereResources() {
return Collections.unmodifiableCollection(events);
}

/**
Expand Down Expand Up @@ -161,7 +163,7 @@ public String getID() {
* {@inheritDoc}
*/
public void resumeAll() {
for (AtmosphereResource r : events) {
for (AtmosphereResource<?,?> r : events) {
r.resume();
}
}
Expand All @@ -170,12 +172,12 @@ public class Entry {

public Object message;
public Object eventsToPush;
public Future f;
public Future<?> future;

public Entry(Object message, Object eventsToPush, Future f) {
public Entry(Object message, Object eventsToPush, Future future) {
this.message = message;
this.eventsToPush = eventsToPush;
this.f = f;
this.future = future;
}
}

Expand Down Expand Up @@ -205,10 +207,10 @@ public void run() {
LoggerUtils.getLogger().log(Level.SEVERE, null, ex);
} finally {
if (msg != null) {
if (msg.f instanceof BroadcasterFuture) {
((BroadcasterFuture) msg.f).done();
if (msg.future instanceof BroadcasterFuture) {
((BroadcasterFuture) msg.future).done();
} else {
msg.f.cancel(true);
msg.future.cancel(true);
}
}
}
Expand All @@ -223,8 +225,8 @@ protected void push(Entry msg) {
Iterator<Entry> i = delayedBroadcast.iterator();
while (i.hasNext()) {
Entry e = i.next();
if (!(e.f instanceof BroadcasterFuture)) {
e.f.cancel(true);
if (!(e.future instanceof BroadcasterFuture)) {
e.future.cancel(true);
}
try {
// Append so we do a single flush
Expand All @@ -237,8 +239,8 @@ protected void push(Entry msg) {
}
} finally {
i.remove();
if (e.f instanceof BroadcasterFuture) {
((BroadcasterFuture) e.f).done();
if (e.future instanceof BroadcasterFuture) {
((BroadcasterFuture) e.future).done();
}
}
}
Expand All @@ -249,20 +251,20 @@ protected void push(Entry msg) {
}

if (msg.eventsToPush == null) {
for (AtmosphereResource r : events) {
for (AtmosphereResource<?,?> r : events) {
push(r, msg.message);
}
} else if (msg.eventsToPush instanceof AtmosphereResource) {
push((AtmosphereResource) msg.eventsToPush, msg.message);
} else if (msg.eventsToPush instanceof AtmosphereResource<?,?>) {
push((AtmosphereResource<?,?>) msg.eventsToPush, msg.message);
} else if (msg.eventsToPush instanceof Set) {
Set<AtmosphereResource> sub = (Set<AtmosphereResource>) msg.eventsToPush;
for (AtmosphereResource r : sub) {
Set<AtmosphereResource<?,?>> sub = (Set<AtmosphereResource<?,?>>) msg.eventsToPush;
for (AtmosphereResource<?,?> r : sub) {
push(r, msg.message);
}
}
}

protected void push(AtmosphereResource r, Object msg) {
protected void push(AtmosphereResource<?,?> r, Object msg) {
AtmosphereResourceEvent e = null;
synchronized (r) {
if (!r.getAtmosphereResourceEvent().isSuspended())
Expand All @@ -284,14 +286,14 @@ protected void push(AtmosphereResource r, Object msg) {
}
}

protected void checkCachedAndPush(AtmosphereResource r, AtmosphereResourceEvent e) {
protected void checkCachedAndPush(AtmosphereResource<?,?> r, AtmosphereResourceEvent e) {
retrieveTrackedBroadcast(r, e);
if (e.getMessage() instanceof List && !((List) e.getMessage()).isEmpty()) {
broadcast(r, e);
}
}

protected boolean retrieveTrackedBroadcast(final AtmosphereResource r, final AtmosphereResourceEvent e) {
protected boolean retrieveTrackedBroadcast(final AtmosphereResource<?,?> r, final AtmosphereResourceEvent e) {
List<Object> missedMsg = broadcasterCache.retrieveFromCache(r);
if (!missedMsg.isEmpty()) {
e.setMessage(missedMsg);
Expand All @@ -300,11 +302,11 @@ protected boolean retrieveTrackedBroadcast(final AtmosphereResource r, final Atm
return false;
}

protected void trackBroadcastMessage(final AtmosphereResource r, Object msg) {
protected void trackBroadcastMessage(final AtmosphereResource<?,?> r, Object msg) {
broadcasterCache.addToCache(r, msg);
}

protected void broadcast(final AtmosphereResource r, final AtmosphereResourceEvent e) {
protected void broadcast(final AtmosphereResource<?,?> r, final AtmosphereResourceEvent e) {
try {
r.getAtmosphereConfig().getAtmosphereHandler(this).onStateChange(e);
} catch (IOException ex) {
Expand All @@ -314,7 +316,7 @@ protected void broadcast(final AtmosphereResource r, final AtmosphereResourceEve
}
}

protected void onException(Throwable t, AtmosphereResource r) {
protected void onException(Throwable t, AtmosphereResource<?,?> r) {
if (LoggerUtils.getLogger().isLoggable(Level.FINE)) {
LoggerUtils.getLogger().log(Level.FINE, "", t);
}
Expand Down Expand Up @@ -355,7 +357,7 @@ protected Object filter(Object msg) {
/**
* {@inheritDoc}
*/
public Future<Object> broadcast(Object msg, AtmosphereResource r) {
public Future<Object> broadcast(Object msg, AtmosphereResource<?,?> r) {
start();
msg = filter(msg);
if (msg == null) return null;
Expand All @@ -368,7 +370,7 @@ public Future<Object> broadcast(Object msg, AtmosphereResource r) {
/**
* {@inheritDoc}
*/
public Future<Object> broadcast(Object msg, Set<AtmosphereResource> subset) {
public Future<Object> broadcast(Object msg, Set<AtmosphereResource<?,?>> subset) {
start();
msg = filter(msg);
if (msg == null) return null;
Expand All @@ -381,7 +383,7 @@ public Future<Object> broadcast(Object msg, Set<AtmosphereResource> subset) {
/**
* {@inheritDoc}
*/
public AtmosphereResource addAtmosphereResource(AtmosphereResource r) {
public AtmosphereResource<?,?> addAtmosphereResource(AtmosphereResource<?,?> r) {
if (events.contains(r)) {
return r;
}
Expand All @@ -391,15 +393,15 @@ public AtmosphereResource addAtmosphereResource(AtmosphereResource r) {
BroadcasterFactory.getDefault().add(this, name);
}

events.offer(r);
events.add(r);
checkCachedAndPush(r, r.getAtmosphereResourceEvent());
return r;
}

/**
* {@inheritDoc}
*/
public AtmosphereResource removeAtmosphereResource(AtmosphereResource r) {
public AtmosphereResource<?,?> removeAtmosphereResource(AtmosphereResource r) {
if (!events.contains(r)) {
return null;
}
Expand All @@ -412,15 +414,6 @@ public AtmosphereResource removeAtmosphereResource(AtmosphereResource r) {
return r;
}

/**
* Return the list of AtmosphereResource registered with this Broadcaster
*
* @return
*/
protected ConcurrentLinkedQueue<AtmosphereResource> atmosphereResources() {
return events;
}

/**
* Set the {@link BroadcasterConfig} instance.
*
Expand Down Expand Up @@ -464,7 +457,7 @@ public Object call() throws Exception {
return msg;
}
}, delay, t);
e.f = f;
e.future = f;
}
delayedBroadcast.offer(e);
return f;
Expand Down
Expand Up @@ -91,7 +91,7 @@ public BroadcasterFuture<Object> broadcast(Object msg) {
* {@inheritDoc}
*/
@Override
public BroadcasterFuture<Object> broadcast(Object msg, AtmosphereResource r) {
public BroadcasterFuture<Object> broadcast(Object msg, AtmosphereResource<?,?> r) {
msg = filter(msg);
if (msg == null) return null;
BroadcasterFuture<Object> f = new BroadcasterFuture<Object>(msg);
Expand All @@ -104,7 +104,7 @@ public BroadcasterFuture<Object> broadcast(Object msg, AtmosphereResource r) {
* {@inheritDoc}
*/
@Override
public BroadcasterFuture<Object> broadcast(Object msg, Set<AtmosphereResource> subset) {
public BroadcasterFuture<Object> broadcast(Object msg, Set<AtmosphereResource<?,?>> subset) {
msg = filter(msg);
if (msg == null) return null;

Expand Down

0 comments on commit 9311dde

Please sign in to comment.