Skip to content

Commit

Permalink
Point commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dmlloyd committed Jan 13, 2011
1 parent cfe5093 commit d37ac54
Show file tree
Hide file tree
Showing 11 changed files with 414 additions and 8 deletions.
44 changes: 44 additions & 0 deletions api/src/main/java/org/xnio/AbstractChannelThread.java
Expand Up @@ -22,8 +22,11 @@

package org.xnio;

import java.lang.reflect.UndeclaredThrowableException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.jboss.logging.Logger;

/**
Expand All @@ -43,6 +46,47 @@ public abstract class AbstractChannelThread implements ChannelThread {

private final Set<Listener> listenerSet = new HashSet<Listener>();

private static final Task<Runnable, Void> RUNNABLE_TASK = new Task<Runnable, Void>() {
public Void run(final Runnable parameter) {
parameter.run();
return null;
}
};

public void execute(final Runnable command) {
submit(RUNNABLE_TASK, command);
}

public <P> void execute(final Task<P, ?> task, final P parameter) {
submit(task, parameter);
}

public <P, R> R run(final Task<P, R> task, final P parameter) {
final Future<R> future = submit(task, parameter);
boolean intr = false;
try {
for (;;) try {
return future.get();
} catch (InterruptedException e) {
intr = true;
}
} catch (ExecutionException e) {
try {
throw e.getCause();
} catch (Error er) {
throw er;
} catch (RuntimeException ex) {
throw ex;
} catch (Throwable throwable) {
throw new UndeclaredThrowableException(throwable);
}
} finally {
if (intr) {
Thread.currentThread().interrupt();
}
}
}

/** {@inheritDoc} */
public final void shutdown() {
final Set<Listener> listenerSet = this.listenerSet;
Expand Down
60 changes: 60 additions & 0 deletions api/src/main/java/org/xnio/Buffers.java
Expand Up @@ -692,6 +692,66 @@ public static long[] take(LongBuffer buffer, int cnt) {
return longs;
}

/**
* Take all of the remaining bytes from the buffer and return them in an array.
*
* @param buffer the buffer to read
* @return the bytes
*/
public static byte[] take(ByteBuffer buffer) {
final byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
return bytes;
}

/**
* Take all of the remaining chars from the buffer and return them in an array.
*
* @param buffer the buffer to read
* @return the chars
*/
public static char[] take(CharBuffer buffer) {
final char[] chars = new char[buffer.remaining()];
buffer.get(chars);
return chars;
}

/**
* Take all of the remaining shorts from the buffer and return them in an array.
*
* @param buffer the buffer to read
* @return the shorts
*/
public static short[] take(ShortBuffer buffer) {
final short[] shorts = new short[buffer.remaining()];
buffer.get(shorts);
return shorts;
}

/**
* Take all of the remaining ints from the buffer and return them in an array.
*
* @param buffer the buffer to read
* @return the ints
*/
public static int[] take(IntBuffer buffer) {
final int[] ints = new int[buffer.remaining()];
buffer.get(ints);
return ints;
}

/**
* Take all of the remaining longs from the buffer and return them in an array.
*
* @param buffer the buffer to read
* @return the longs
*/
public static long[] take(LongBuffer buffer) {
final long[] longs = new long[buffer.remaining()];
buffer.get(longs);
return longs;
}

/**
* Create an object that returns the dumped form of the given byte buffer when its {@code toString()} method is called.
* Useful for logging byte buffers; if the {@code toString()} method is never called, the process of dumping the
Expand Down
8 changes: 8 additions & 0 deletions api/src/main/java/org/xnio/ChannelListeners.java
Expand Up @@ -153,6 +153,10 @@ public void handleEvent(final AcceptingChannel<C> channel) {
listenerLog.errorf("Failed to accept a connection on %s: %s", channel, e);
}
}

public String toString() {
return "Accepting listener for " + openListener;
}
};
}

Expand All @@ -179,6 +183,10 @@ public void handleEvent(final AcceptingChannel<C> channel) {
listenerLog.errorf("Failed to accept a connection on %s: %s", channel, e);
}
}

public String toString() {
return "Accepting listener for " + openListener;
}
};
}

Expand Down
60 changes: 59 additions & 1 deletion api/src/main/java/org/xnio/ChannelThread.java
Expand Up @@ -22,12 +22,53 @@

package org.xnio;

import java.util.concurrent.Executor;
import java.util.concurrent.Future;

/**
* A channel thread.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
*/
public interface ChannelThread {
public interface ChannelThread extends Executor {

/**
* Execute a runnable task on this channel thread.
*
* @param command the task to execute
*/
void execute(Runnable command);

/**
* Submit a parameterized task on this channel thread, collecting the result.
*
* @param task the task
* @param parameter the parameter to send to the task
* @param <P> the parameter type
* @param <R> the return type
* @return the future result
*/
<P, R> Future<R> submit(Task<P, R> task, P parameter);

/**
* Execute a task on this channel thread, ignoring the result.
*
* @param task the task to run
* @param parameter the parameter to send to the task
* @param <P> the parameter type
*/
<P> void execute(Task<P, ?> task, P parameter);

/**
* Run a task in the channel thread, waiting for the result.
*
* @param task the task
* @param parameter the parameter to send to the task
* @param <P> the parameter type
* @param <R> the return type
* @return the result
*/
<P, R> R run(Task<P, R> task, P parameter);

/**
* Get the approximate load on this thread, in channels.
Expand Down Expand Up @@ -85,4 +126,21 @@ interface Listener {
*/
void handleTerminationComplete(ChannelThread thread);
}

/**
* A task to run on a channel thread.
*
* @param <P> the task parameter type
* @param <R> the task result type
*/
interface Task<P, R> {

/**
* Run the task.
*
* @param parameter the passed-in parameter
* @return the result
*/
R run(P parameter);
}
}
Expand Up @@ -27,14 +27,14 @@
/**
* A multipoint datagram channel. A multipoint datagram channel is a bound multipoint message channel.
*/
public interface BoundMultipointChannel extends MultipointMessageChannel, BoundChannel {
public interface BoundMultipointMessageChannel extends MultipointMessageChannel, BoundChannel {

/** {@inheritDoc} */
ChannelListener.Setter<? extends BoundMultipointChannel> getReadSetter();
ChannelListener.Setter<? extends BoundMultipointMessageChannel> getReadSetter();

/** {@inheritDoc} */
ChannelListener.Setter<? extends BoundMultipointChannel> getCloseSetter();
ChannelListener.Setter<? extends BoundMultipointMessageChannel> getCloseSetter();

/** {@inheritDoc} */
ChannelListener.Setter<? extends BoundMultipointChannel> getWriteSetter();
ChannelListener.Setter<? extends BoundMultipointMessageChannel> getWriteSetter();
}
Expand Up @@ -33,7 +33,7 @@
*
* @apiviz.landmark
*/
public interface MulticastMessageChannel extends BoundMultipointChannel {
public interface MulticastMessageChannel extends BoundMultipointMessageChannel {

/**
* A registration key for a multicast group.
Expand Down
Expand Up @@ -51,6 +51,7 @@ public interface ReadableMultipointMessageChannel extends SuspendableReadChannel
* and destination addresses (if available) read into the address buffer. If there is no message immediately available,
* this method will return 0.
*
* @param addressBuffer the address buffer into which the source and destination addresses should be written ({@code null} to discard that information)
* @param buffers the buffers that will hold the message
* @return the size of the received message, 0 if no message is available, and -1 if the message channel has reached an end-of-file condition
* @throws IOException if an I/O error occurs
Expand All @@ -64,6 +65,7 @@ public interface ReadableMultipointMessageChannel extends SuspendableReadChannel
* and destination addresses (if available) read into the address buffer. If there is no message immediately available,
* this method will return 0.
*
* @param addressBuffer the address buffer into which the source and destination addresses should be written ({@code null} to discard that information)
* @param buffers the buffers that will hold the message
* @param offs the offset into the array of buffers of the first buffer to read into
* @param len the number of buffers to fill
Expand Down

0 comments on commit d37ac54

Please sign in to comment.