Skip to content
This repository

UPDATED: Channel(Sink) implementation for connecting to InputStream and OutputStream objects #22

Merged
merged 1 commit into from over 2 years ago

2 participants

Dennis Pfisterer Norman Maurer
Dennis Pfisterer
Collaborator

Hi Trustin,
Here is the updated pull request (please ignore #16).
Best,
Dennis


Hi Trustin,

In the spirit of our RXTX implementation we've created a Channel(Sink) implementation for connecting to InputStream and OutputStream objects. The general purpose of this is to be able to connect to legacy streams. E.g. in our case, we connect to streams that are exposed by device drivers. However, it can be used to connect to arbitrary Input- and OutputStreams.

There's also an example class that connects to System.out and System.in for demonstration purposes.

We hope it finds its way into the 4.0 release.

Best regards,
Daniel & Dennis

Norman Maurer normanmaurer merged commit a017961 into from October 29, 2011
Norman Maurer normanmaurer closed this October 29, 2011
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Showing 1 unique commit by 1 author.

Aug 01, 2011
Dennis Pfisterer Updated version of netty-iostream channels 07803b3
This page is out of date. Refresh to see the latest.
48  src/main/java/org/jboss/netty/channel/iostream/IOStreamAddress.java
... ...
@@ -0,0 +1,48 @@
  1
+/*
  2
+ * Copyright 2011 Red Hat, Inc.
  3
+ *
  4
+ * Red Hat licenses this file to you under the Apache License, version 2.0
  5
+ * (the "License"); you may not use this file except in compliance with the
  6
+ * License.  You may obtain a copy of the License at:
  7
+ *
  8
+ *    http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
  13
+ * License for the specific language governing permissions and limitations
  14
+ * under the License.
  15
+ */
  16
+package org.jboss.netty.channel.iostream;
  17
+
  18
+import java.io.InputStream;
  19
+import java.io.OutputStream;
  20
+import java.net.SocketAddress;
  21
+
  22
+/**
  23
+ * A {@link java.net.SocketAddress} implementation holding an {@link java.io.InputStream} and an {@link java.io.OutputStream} instance used as
  24
+ * "remote" address to connect to with a {@link IOStreamChannel}.
  25
+ *
  26
+ * @author Daniel Bimschas
  27
+ * @author Dennis Pfisterer
  28
+ */
  29
+public class IOStreamAddress extends SocketAddress {
  30
+
  31
+	private final InputStream inputStream;
  32
+
  33
+	private final OutputStream outputStream;
  34
+
  35
+	public IOStreamAddress(final InputStream inputStream, final OutputStream outputStream) {
  36
+
  37
+		this.inputStream = inputStream;
  38
+		this.outputStream = outputStream;
  39
+	}
  40
+
  41
+	public InputStream getInputStream() {
  42
+		return inputStream;
  43
+	}
  44
+
  45
+	public OutputStream getOutputStream() {
  46
+		return outputStream;
  47
+	}
  48
+}
73  src/main/java/org/jboss/netty/channel/iostream/IOStreamChannel.java
... ...
@@ -0,0 +1,73 @@
  1
+/*
  2
+ * Copyright 2011 Red Hat, Inc.
  3
+ *
  4
+ * Red Hat licenses this file to you under the Apache License, version 2.0
  5
+ * (the "License"); you may not use this file except in compliance with the
  6
+ * License.  You may obtain a copy of the License at:
  7
+ *
  8
+ *    http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
  13
+ * License for the specific language governing permissions and limitations
  14
+ * under the License.
  15
+ */
  16
+package org.jboss.netty.channel.iostream;
  17
+
  18
+
  19
+import org.jboss.netty.channel.*;
  20
+
  21
+import java.net.SocketAddress;
  22
+
  23
+/**
  24
+ * A channel to an {@link java.io.InputStream} and an {@link java.io.OutputStream}.
  25
+ *
  26
+ * @author Daniel Bimschas
  27
+ * @author Dennis Pfisterer
  28
+ */
  29
+public class IOStreamChannel extends AbstractChannel {
  30
+
  31
+	IOStreamChannel(final ChannelFactory factory, final ChannelPipeline pipeline, final ChannelSink sink) {
  32
+		super(null, factory, pipeline, sink);
  33
+	}
  34
+
  35
+	@Override
  36
+	public ChannelConfig getConfig() {
  37
+		return ((IOStreamChannelSink) getPipeline().getSink()).getConfig();
  38
+	}
  39
+
  40
+	@Override
  41
+	public boolean isBound() {
  42
+		return ((IOStreamChannelSink) getPipeline().getSink()).isBound();
  43
+	}
  44
+
  45
+	@Override
  46
+	public boolean isConnected() {
  47
+		return ((IOStreamChannelSink) getPipeline().getSink()).isConnected();
  48
+	}
  49
+
  50
+	@Override
  51
+	public SocketAddress getLocalAddress() {
  52
+		return null;
  53
+	}
  54
+
  55
+	@Override
  56
+	public SocketAddress getRemoteAddress() {
  57
+		return ((IOStreamChannelSink) getPipeline().getSink()).getRemoteAddress();
  58
+	}
  59
+
  60
+	@Override
  61
+	public ChannelFuture bind(final SocketAddress localAddress) {
  62
+		throw new UnsupportedOperationException();
  63
+	}
  64
+
  65
+	@Override
  66
+	public ChannelFuture unbind() {
  67
+		throw new UnsupportedOperationException();
  68
+	}
  69
+
  70
+	void doSetClosed() {
  71
+		setClosed();
  72
+	}
  73
+}
60  src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelFactory.java
... ...
@@ -0,0 +1,60 @@
  1
+/*
  2
+ * Copyright 2011 Red Hat, Inc.
  3
+ *
  4
+ * Red Hat licenses this file to you under the Apache License, version 2.0
  5
+ * (the "License"); you may not use this file except in compliance with the
  6
+ * License.  You may obtain a copy of the License at:
  7
+ *
  8
+ *    http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
  13
+ * License for the specific language governing permissions and limitations
  14
+ * under the License.
  15
+ */
  16
+package org.jboss.netty.channel.iostream;
  17
+
  18
+
  19
+import org.jboss.netty.channel.Channel;
  20
+import org.jboss.netty.channel.ChannelFactory;
  21
+import org.jboss.netty.channel.ChannelPipeline;
  22
+import org.jboss.netty.channel.group.ChannelGroup;
  23
+import org.jboss.netty.channel.group.ChannelGroupFuture;
  24
+import org.jboss.netty.channel.group.DefaultChannelGroup;
  25
+import org.jboss.netty.util.internal.ExecutorUtil;
  26
+
  27
+import java.util.concurrent.ExecutorService;
  28
+
  29
+/**
  30
+ * A {@link org.jboss.netty.channel.ChannelFactory} for creating {@link IOStreamChannel} instances.
  31
+ *
  32
+ * @author Daniel Bimschas
  33
+ * @author Dennis Pfisterer
  34
+ */
  35
+public class IOStreamChannelFactory implements ChannelFactory {
  36
+
  37
+    private final ChannelGroup channels = new DefaultChannelGroup("IOStreamChannelFactory-ChannelGroup");
  38
+
  39
+    private final ExecutorService executorService;
  40
+
  41
+    public IOStreamChannelFactory(ExecutorService executorService) {
  42
+        this.executorService = executorService;
  43
+    }
  44
+
  45
+    @Override
  46
+    public Channel newChannel(final ChannelPipeline pipeline) {
  47
+        IOStreamChannelSink sink = new IOStreamChannelSink(executorService);
  48
+        IOStreamChannel channel = new IOStreamChannel(this, pipeline, sink);
  49
+        sink.setChannel(channel);
  50
+        channels.add(channel);
  51
+        return channel;
  52
+    }
  53
+
  54
+    @Override
  55
+    public void releaseExternalResources() {
  56
+        ChannelGroupFuture close = channels.close();
  57
+        close.awaitUninterruptibly();
  58
+        ExecutorUtil.terminate(executorService);
  59
+    }
  60
+}
178  src/main/java/org/jboss/netty/channel/iostream/IOStreamChannelSink.java
... ...
@@ -0,0 +1,178 @@
  1
+/*
  2
+ * Copyright 2011 Red Hat, Inc.
  3
+ *
  4
+ * Red Hat licenses this file to you under the Apache License, version 2.0
  5
+ * (the "License"); you may not use this file except in compliance with the
  6
+ * License.  You may obtain a copy of the License at:
  7
+ *
  8
+ *    http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
  13
+ * License for the specific language governing permissions and limitations
  14
+ * under the License.
  15
+ */
  16
+package org.jboss.netty.channel.iostream;
  17
+
  18
+import org.jboss.netty.buffer.ChannelBuffer;
  19
+import org.jboss.netty.buffer.ChannelBuffers;
  20
+import org.jboss.netty.channel.*;
  21
+
  22
+import java.io.OutputStream;
  23
+import java.io.PushbackInputStream;
  24
+import java.util.concurrent.ExecutorService;
  25
+
  26
+import static org.jboss.netty.channel.Channels.*;
  27
+
  28
+/**
  29
+ * A {@link org.jboss.netty.channel.ChannelSink} implementation which reads from an {@link java.io.InputStream} and
  30
+ * writes to an {@link java.io.OutputStream}.
  31
+ *
  32
+ * @author Daniel Bimschas
  33
+ * @author Dennis Pfisterer
  34
+ */
  35
+public class IOStreamChannelSink extends AbstractChannelSink {
  36
+
  37
+	private static class ReadRunnable implements Runnable {
  38
+
  39
+		private final IOStreamChannelSink channelSink;
  40
+
  41
+		public ReadRunnable(final IOStreamChannelSink channelSink) {
  42
+			this.channelSink = channelSink;
  43
+		}
  44
+
  45
+		@Override
  46
+		public void run() {
  47
+
  48
+			PushbackInputStream in = channelSink.inputStream;
  49
+
  50
+			while (channelSink.channel.isOpen()) {
  51
+
  52
+				byte[] buf;
  53
+				int readBytes;
  54
+				try {
  55
+					int bytesToRead = in.available();
  56
+					if (bytesToRead > 0) {
  57
+						buf = new byte[bytesToRead];
  58
+						readBytes = in.read(buf);
  59
+					} else {
  60
+						// peek into the stream if it was closed (value=-1)
  61
+						int b = in.read();
  62
+						if (b < 0) {
  63
+							break;
  64
+						}
  65
+						// push back the byte which was read too much
  66
+						in.unread(b);
  67
+						continue;
  68
+					}
  69
+				} catch (Throwable t) {
  70
+					if (!channelSink.channel.getCloseFuture().isDone()) {
  71
+						fireExceptionCaught(channelSink.channel, t);
  72
+					}
  73
+					break;
  74
+				}
  75
+
  76
+				fireMessageReceived(channelSink.channel, ChannelBuffers.wrappedBuffer(buf, 0, readBytes));
  77
+			}
  78
+
  79
+			// Clean up.
  80
+			close(channelSink.channel);
  81
+		}
  82
+	}
  83
+
  84
+	private final ExecutorService executorService;
  85
+
  86
+	private IOStreamChannel channel;
  87
+
  88
+	public IOStreamChannelSink(final ExecutorService executorService) {
  89
+		this.executorService = executorService;
  90
+	}
  91
+
  92
+	public boolean isConnected() {
  93
+		return inputStream != null && outputStream != null;
  94
+	}
  95
+
  96
+	public IOStreamAddress getRemoteAddress() {
  97
+		return remoteAddress;
  98
+	}
  99
+
  100
+	public boolean isBound() {
  101
+		return false;
  102
+	}
  103
+
  104
+	public ChannelConfig getConfig() {
  105
+		return config;
  106
+	}
  107
+
  108
+	public void setChannel(final IOStreamChannel channel) {
  109
+		this.channel = channel;
  110
+	}
  111
+
  112
+	private IOStreamAddress remoteAddress;
  113
+
  114
+	private OutputStream outputStream;
  115
+
  116
+	private PushbackInputStream inputStream;
  117
+
  118
+	private ChannelConfig config = new DefaultChannelConfig();
  119
+
  120
+	@Override
  121
+	public void eventSunk(final ChannelPipeline pipeline, final ChannelEvent e) throws Exception {
  122
+
  123
+		final ChannelFuture future = e.getFuture();
  124
+
  125
+		if (e instanceof ChannelStateEvent) {
  126
+
  127
+			final ChannelStateEvent stateEvent = (ChannelStateEvent) e;
  128
+			final ChannelState state = stateEvent.getState();
  129
+			final Object value = stateEvent.getValue();
  130
+
  131
+			switch (state) {
  132
+
  133
+				case OPEN:
  134
+					if (Boolean.FALSE.equals(value)) {
  135
+						outputStream = null;
  136
+						inputStream = null;
  137
+						((IOStreamChannel) e.getChannel()).doSetClosed();
  138
+					}
  139
+					break;
  140
+
  141
+				case BOUND:
  142
+					throw new UnsupportedOperationException();
  143
+
  144
+				case CONNECTED:
  145
+					if (value != null) {
  146
+						remoteAddress = (IOStreamAddress) value;
  147
+						outputStream = remoteAddress.getOutputStream();
  148
+						inputStream = new PushbackInputStream(remoteAddress.getInputStream());
  149
+						executorService.execute(new ReadRunnable(this));
  150
+						future.setSuccess();
  151
+					}
  152
+					break;
  153
+
  154
+				case INTEREST_OPS:
  155
+					// TODO implement
  156
+					throw new UnsupportedOperationException();
  157
+
  158
+			}
  159
+
  160
+		} else if (e instanceof MessageEvent) {
  161
+
  162
+			final MessageEvent event = (MessageEvent) e;
  163
+			if (event.getMessage() instanceof ChannelBuffer) {
  164
+
  165
+				final ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
  166
+				buffer.readBytes(outputStream, buffer.readableBytes());
  167
+				outputStream.flush();
  168
+				future.setSuccess();
  169
+
  170
+			} else {
  171
+				throw new IllegalArgumentException(
  172
+						"Only ChannelBuffer objects are supported to be written onto the IOStreamChannelSink! "
  173
+								+ "Please check if the encoder pipeline is configured correctly."
  174
+				);
  175
+			}
  176
+		}
  177
+	}
  178
+}
94  src/main/java/org/jboss/netty/example/iostream/IOStream.java
... ...
@@ -0,0 +1,94 @@
  1
+/*
  2
+ * Copyright 2011 Red Hat, Inc.
  3
+ *
  4
+ * Red Hat licenses this file to you under the Apache License, version 2.0
  5
+ * (the "License"); you may not use this file except in compliance with the
  6
+ * License.  You may obtain a copy of the License at:
  7
+ *
  8
+ *    http://www.apache.org/licenses/LICENSE-2.0
  9
+ *
  10
+ * Unless required by applicable law or agreed to in writing, software
  11
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  See the
  13
+ * License for the specific language governing permissions and limitations
  14
+ * under the License.
  15
+ */
  16
+package org.jboss.netty.example.iostream;
  17
+
  18
+import org.jboss.netty.bootstrap.ClientBootstrap;
  19
+import org.jboss.netty.channel.*;
  20
+import org.jboss.netty.channel.iostream.IOStreamAddress;
  21
+import org.jboss.netty.channel.iostream.IOStreamChannelFactory;
  22
+import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
  23
+import org.jboss.netty.handler.codec.frame.Delimiters;
  24
+import org.jboss.netty.handler.codec.string.StringDecoder;
  25
+import org.jboss.netty.handler.codec.string.StringEncoder;
  26
+
  27
+import java.util.concurrent.ExecutorService;
  28
+import java.util.concurrent.Executors;
  29
+
  30
+/**
  31
+ * An example demonstrating the use of the {@link org.jboss.netty.channel.iostream.IOStreamChannel}.
  32
+ *
  33
+ * @author Daniel Bimschas
  34
+ * @author Dennis Pfisterer
  35
+ */
  36
+public class IOStream {
  37
+
  38
+	private static volatile boolean running = true;
  39
+
  40
+	public static void main(String[] args) {
  41
+
  42
+		final ExecutorService executorService = Executors.newCachedThreadPool();
  43
+		final ClientBootstrap bootstrap = new ClientBootstrap(new IOStreamChannelFactory(executorService));
  44
+
  45
+		// Configure the event pipeline factory.
  46
+		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
  47
+			public ChannelPipeline getPipeline() throws Exception {
  48
+				DefaultChannelPipeline pipeline = new DefaultChannelPipeline();
  49
+				pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
  50
+				pipeline.addLast("decoder", new StringDecoder());
  51
+				pipeline.addLast("encoder", new StringEncoder());
  52
+				pipeline.addLast("loggingHandler", new SimpleChannelHandler() {
  53
+					@Override
  54
+					public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
  55
+							throws Exception {
  56
+
  57
+						final String message = (String) e.getMessage();
  58
+						synchronized (System.out) {
  59
+							e.getChannel().write("Message received: " + message);
  60
+						}
  61
+						if ("exit".equals(message)) {
  62
+							IOStream.running = false;
  63
+						}
  64
+						super.messageReceived(ctx, e);
  65
+					}
  66
+				}
  67
+				);
  68
+				return pipeline;
  69
+			}
  70
+		});
  71
+
  72
+		// Make a new connection.
  73
+		ChannelFuture connectFuture = bootstrap.connect(new IOStreamAddress(System.in, System.out));
  74
+
  75
+		// Wait until the connection is made successfully.
  76
+		Channel channel = connectFuture.awaitUninterruptibly().getChannel();
  77
+
  78
+		while (running) {
  79
+			try {
  80
+				Thread.sleep(100);
  81
+			} catch (InterruptedException e) {
  82
+				e.printStackTrace();
  83
+			}
  84
+		}
  85
+
  86
+		// Close the connection.
  87
+		channel.close().awaitUninterruptibly();
  88
+
  89
+		// Shut down all thread pools to exit.
  90
+		bootstrap.releaseExternalResources();
  91
+
  92
+	}
  93
+
  94
+}
Commit_comment_tip

Tip: You can add notes to lines in a file. Hover to the left of a line to make a note

Something went wrong with that request. Please try again.