Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UPDATED: Channel(Sink) implementation for connecting to InputStream and OutputStream objects #22

Merged
merged 1 commit into from
Oct 29, 2011
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketAddress;

/**
* A {@link java.net.SocketAddress} implementation holding an {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used as
* "remote" address to connect to with a {@link IOStreamChannel}.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamAddress extends SocketAddress {

private final InputStream inputStream;

private final OutputStream outputStream;

public IOStreamAddress(final InputStream inputStream, final OutputStream outputStream) {

this.inputStream = inputStream;
this.outputStream = outputStream;
}

public InputStream getInputStream() {
return inputStream;
}

public OutputStream getOutputStream() {
return outputStream;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;


import org.jboss.netty.channel.*;

import java.net.SocketAddress;

/**
* A channel to an {@link java.io.InputStream} and an {@link java.io.OutputStream}.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamChannel extends AbstractChannel {

IOStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) {
super(null, factory, pipeline, sink);
}

@Override
public ChannelConfig getConfig() {
return ((IOStreamChannelSink) getPipeline().getSink()).getConfig();
}

@Override
public boolean isBound() {
return ((IOStreamChannelSink) getPipeline().getSink()).isBound();
}

@Override
public boolean isConnected() {
return ((IOStreamChannelSink) getPipeline().getSink()).isConnected();
}

@Override
public SocketAddress getLocalAddress() {
return null;
}

@Override
public SocketAddress getRemoteAddress() {
return ((IOStreamChannelSink) getPipeline().getSink()).getRemoteAddress();
}

@Override
public ChannelFuture bind(final SocketAddress localAddress) {
throw new UnsupportedOperationException();
}

@Override
public ChannelFuture unbind() {
throw new UnsupportedOperationException();
}

void doSetClosed() {
setClosed();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;


import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.util.internal.ExecutorUtil;

import java.util.concurrent.ExecutorService;

/**
* A {@link org.jboss.netty.channel.ChannelFactory} for creating {@link IOStreamChannel} instances.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamChannelFactory implements ChannelFactory {

private final ChannelGroup channels = new DefaultChannelGroup("IOStreamChannelFactory-ChannelGroup");

private final ExecutorService executorService;

public IOStreamChannelFactory(ExecutorService executorService) {
this.executorService = executorService;
}

@Override
public Channel newChannel(final ChannelPipeline pipeline) {
IOStreamChannelSink sink = new IOStreamChannelSink(executorService);
IOStreamChannel channel = new IOStreamChannel(this, pipeline, sink);
sink.setChannel(channel);
channels.add(channel);
return channel;
}

@Override
public void releaseExternalResources() {
ChannelGroupFuture close = channels.close();
close.awaitUninterruptibly();
ExecutorUtil.terminate(executorService);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2011 Red Hat, Inc.
*
* Red Hat licenses this file to you under the Apache License, version 2.0
* (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.jboss.netty.channel.iostream;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.*;

import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.util.concurrent.ExecutorService;

import static org.jboss.netty.channel.Channels.*;

/**
* A {@link org.jboss.netty.channel.ChannelSink} implementation which reads from an {@link java.io.InputStream} and
* writes to an {@link java.io.OutputStream}.
*
* @author Daniel Bimschas
* @author Dennis Pfisterer
*/
public class IOStreamChannelSink extends AbstractChannelSink {

private static class ReadRunnable implements Runnable {

private final IOStreamChannelSink channelSink;

public ReadRunnable(final IOStreamChannelSink channelSink) {
this.channelSink = channelSink;
}

@Override
public void run() {

PushbackInputStream in = channelSink.inputStream;

while (channelSink.channel.isOpen()) {

byte[] buf;
int readBytes;
try {
int bytesToRead = in.available();
if (bytesToRead > 0) {
buf = new byte[bytesToRead];
readBytes = in.read(buf);
} else {
// peek into the stream if it was closed (value=-1)
int b = in.read();
if (b < 0) {
break;
}
// push back the byte which was read too much
in.unread(b);
continue;
}
} catch (Throwable t) {
if (!channelSink.channel.getCloseFuture().isDone()) {
fireExceptionCaught(channelSink.channel, t);
}
break;
}

fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes));
}

// Clean up.
close(channelSink.channel);
}
}

private final ExecutorService executorService;

private IOStreamChannel channel;

public IOStreamChannelSink(final ExecutorService executorService) {
this.executorService = executorService;
}

public boolean isConnected() {
return inputStream != null && outputStream != null;
}

public IOStreamAddress getRemoteAddress() {
return remoteAddress;
}

public boolean isBound() {
return false;
}

public ChannelConfig getConfig() {
return config;
}

public void setChannel(final IOStreamChannel channel) {
this.channel = channel;
}

private IOStreamAddress remoteAddress;

private OutputStream outputStream;

private PushbackInputStream inputStream;

private ChannelConfig config = new DefaultChannelConfig();

@Override
public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {

final ChannelFuture future = e.getFuture();

if (e instanceof ChannelStateEvent) {

final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
final ChannelState state = stateEvent.getState();
final Object value = stateEvent.getValue();

switch (state) {

case OPEN:
if (Boolean.FALSE.equals(value)) {
outputStream = null;
inputStream = null;
((IOStreamChannel) e.getChannel()).doSetClosed();
}
break;

case BOUND:
throw new UnsupportedOperationException();

case CONNECTED:
if (value != null) {
remoteAddress = (IOStreamAddress) value;
outputStream = remoteAddress.getOutputStream();
inputStream = new PushbackInputStream(remoteAddress.getInputStream());
executorService.execute(new ReadRunnable(this));
future.setSuccess();
}
break;

case INTEREST_OPS:
// TODO implement
throw new UnsupportedOperationException();

}

} else if (e instanceof MessageEvent) {

final MessageEvent event = (MessageEvent) e;
if (event.getMessage() instanceof ChannelBuffer) {

final ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
buffer.readBytes(outputStream, buffer.readableBytes());
outputStream.flush();
future.setSuccess();

} else {
throw new IllegalArgumentException(
"Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! "
+ "Please check if the encoder pipeline is configured correctly."
);
}
}
}
}
Loading