Skip to content

Commit

Permalink
Merge pull request #22 from pfisterer/master. Which adds a
Browse files Browse the repository at this point in the history
IOStreamChannelFactory which can be used to connect to InputStream and
OutputStream
  • Loading branch information
normanmaurer committed Oct 29, 2011
1 parent b1c2706 commit d742a11
Show file tree
Hide file tree
Showing 5 changed files with 453 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketAddress;

/**
* A {@link java.net.SocketAddress} implementation holding an {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used as
* "remote" address to connect to with a {@link IOStreamChannel}.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamAddress extends SocketAddress {

private final InputStream inputStream;

private final OutputStream outputStream;

public IOStreamAddress(final InputStream inputStream, final OutputStream outputStream) {

this.inputStream = inputStream;
this.outputStream = outputStream;
}

public InputStream getInputStream() {
return inputStream;
}

public OutputStream getOutputStream() {
return outputStream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;


import org.jboss.netty.channel.*;

import java.net.SocketAddress;

/**
* A channel to an {@link java.io.InputStream} and an {@link java.io.OutputStream}.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamChannel extends AbstractChannel {

IOStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) {
super(null, factory, pipeline, sink);
}

@Override
public ChannelConfig getConfig() {
return ((IOStreamChannelSink) getPipeline().getSink()).getConfig();
}

@Override
public boolean isBound() {
return ((IOStreamChannelSink) getPipeline().getSink()).isBound();
}

@Override
public boolean isConnected() {
return ((IOStreamChannelSink) getPipeline().getSink()).isConnected();
}

@Override
public SocketAddress getLocalAddress() {
return null;
}

@Override
public SocketAddress getRemoteAddress() {
return ((IOStreamChannelSink) getPipeline().getSink()).getRemoteAddress();
}

@Override
public ChannelFuture bind(final SocketAddress localAddress) {
throw new UnsupportedOperationException();
}

@Override
public ChannelFuture unbind() {
throw new UnsupportedOperationException();
}

void doSetClosed() {
setClosed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;


import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.util.internal.ExecutorUtil;

import java.util.concurrent.ExecutorService;

/**
* A {@link org.jboss.netty.channel.ChannelFactory} for creating {@link IOStreamChannel} instances.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamChannelFactory implements ChannelFactory {

private final ChannelGroup channels = new DefaultChannelGroup("IOStreamChannelFactory-ChannelGroup");

private final ExecutorService executorService;

public IOStreamChannelFactory(ExecutorService executorService) {
this.executorService = executorService;
}

@Override
public Channel newChannel(final ChannelPipeline pipeline) {
IOStreamChannelSink sink = new IOStreamChannelSink(executorService);
IOStreamChannel channel = new IOStreamChannel(this, pipeline, sink);
sink.setChannel(channel);
channels.add(channel);
return channel;
}

@Override
public void releaseExternalResources() {
ChannelGroupFuture close = channels.close();
close.awaitUninterruptibly();
ExecutorUtil.terminate(executorService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;

import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.util.concurrent.ExecutorService;

import static org.jboss.netty.channel.Channels.*;

/**
* A {@link org.jboss.netty.channel.ChannelSink} implementation which reads from an {@link java.io.InputStream} and
* writes to an {@link java.io.OutputStream}.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamChannelSink extends AbstractChannelSink {

private static class ReadRunnable implements Runnable {

private final IOStreamChannelSink channelSink;

public ReadRunnable(final IOStreamChannelSink channelSink) {
this.channelSink = channelSink;
}

@Override
public void run() {

PushbackInputStream in = channelSink.inputStream;

while (channelSink.channel.isOpen()) {

byte[] buf;
int readBytes;
try {
int bytesToRead = in.available();
if (bytesToRead > 0) {
buf = new byte[bytesToRead];
readBytes = in.read(buf);
} else {
// peek into the stream if it was closed (value=-1)
int b = in.read();
if (b < 0) {
break;
}
// push back the byte which was read too much
in.unread(b);
continue;
}
} catch (Throwable t) {
if (!channelSink.channel.getCloseFuture().isDone()) {
fireExceptionCaught(channelSink.channel, t);
}
break;
}

fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes));
}

// Clean up.
close(channelSink.channel);
}
}

private final ExecutorService executorService;

private IOStreamChannel channel;

public IOStreamChannelSink(final ExecutorService executorService) {
this.executorService = executorService;
}

public boolean isConnected() {
return inputStream != null && outputStream != null;
}

public IOStreamAddress getRemoteAddress() {
return remoteAddress;
}

public boolean isBound() {
return false;
}

public ChannelConfig getConfig() {
return config;
}

public void setChannel(final IOStreamChannel channel) {
this.channel = channel;
}

private IOStreamAddress remoteAddress;

private OutputStream outputStream;

private PushbackInputStream inputStream;

private ChannelConfig config = new DefaultChannelConfig();

@Override
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {

final ChannelFuture future = e.getFuture();

if (e instanceof ChannelStateEvent) {

final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
final ChannelState state = stateEvent.getState();
final Object value = stateEvent.getValue();

switch (state) {

case OPEN:
if (Boolean.FALSE.equals(value)) {
outputStream = null;
inputStream = null;
((IOStreamChannel) e.getChannel()).doSetClosed();
}
break;

case BOUND:
throw new UnsupportedOperationException();

case CONNECTED:
if (value != null) {
remoteAddress = (IOStreamAddress) value;
outputStream = remoteAddress.getOutputStream();
inputStream = new PushbackInputStream(remoteAddress.getInputStream());
executorService.execute(new ReadRunnable(this));
future.setSuccess();
}
break;

case INTEREST_OPS:
// TODO implement
throw new UnsupportedOperationException();

}

} else if (e instanceof MessageEvent) {

final MessageEvent event = (MessageEvent) e;
if (event.getMessage() instanceof ChannelBuffer) {

final ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
buffer.readBytes(outputStream, buffer.readableBytes());
outputStream.flush();
future.setSuccess();

} else {
throw new IllegalArgumentException(
"Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! "
+ "Please check if the encoder pipeline is configured correctly."
);
}
}
}
}
Loading

0 comments on commit d742a11

Please sign in to comment.