Skip to content

Commit 3a07bce

Browse files
authored
Improved the I/O architecture documentation. (#13558)
Small cosmetic fixes. Expanded the Sink.write() section with one more example. Added SelectorManager diagram. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
1 parent c724f58 commit 3a07bce

File tree

2 files changed

+122
-14
lines changed
  • documentation/jetty/modules

2 files changed

+122
-14
lines changed

documentation/jetty/modules/code/examples/src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ThreadLocalRandom;
2223

2324
import org.eclipse.jetty.io.Content;
2425
import org.eclipse.jetty.util.Callback;
@@ -284,6 +285,7 @@ static class SinkMany
284285
public void manyWrites(Content.Sink sink, ByteBuffer content1, ByteBuffer content2)
285286
{
286287
// Initiate a first write.
288+
// Callback.Completable is-a CompletableFuture.
287289
Callback.Completable resultOfWrites = Callback.Completable.with(callback1 -> sink.write(false, content1, callback1))
288290
// Chain a second write only when the first is complete.
289291
.compose(callback2 -> sink.write(true, content2, callback2));
@@ -301,8 +303,74 @@ public void manyWrites(Content.Sink sink, ByteBuffer content1, ByteBuffer conten
301303
// end::sinkMany[]
302304
}
303305

304-
// tag::copy[]
305306
@SuppressWarnings("InnerClassMayBeStatic")
307+
// tag::sinkChunks[]
308+
class LargeDownload extends IteratingCallback
309+
{
310+
public void download(Content.Sink sink, Callback callback)
311+
{
312+
// Create the IteratingCallback and start the iteration.
313+
IteratingCallback writer = new LargeDownload(sink, callback, 1024 * 1024);
314+
writer.iterate();
315+
}
316+
317+
private static final int CHUNK_SIZE = 1024;
318+
private final Content.Sink sink;
319+
private final Callback callback;
320+
private int length;
321+
322+
public LargeDownload(Content.Sink sink, Callback callback, int downloadLength)
323+
{
324+
this.sink = sink;
325+
// The callback to notify when the download is completed.
326+
this.callback = callback;
327+
this.length = downloadLength;
328+
}
329+
330+
@Override
331+
protected Action process() throws Throwable
332+
{
333+
// Return when the whole download is completed.
334+
if (length == 0)
335+
return Action.SUCCEEDED;
336+
337+
// Prepare the chunk to write.
338+
int size = Math.min(CHUNK_SIZE, length);
339+
byte[] bytes = new byte[size];
340+
ThreadLocalRandom.current().nextBytes(bytes);
341+
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
342+
length -= size;
343+
boolean last = length == 0;
344+
345+
// Start the non-blocking write, passing "this" as the callback.
346+
sink.write(last, byteBuffer, this);
347+
return Action.SCHEDULED;
348+
}
349+
350+
@Override
351+
protected void onCompleteSuccess()
352+
{
353+
// Download completed, notify the download callback.
354+
callback.succeeded();
355+
}
356+
357+
@Override
358+
protected void onCompleteFailure(Throwable failure)
359+
{
360+
// Download failed, notify the download callback.
361+
callback.failed(failure);
362+
}
363+
364+
@Override
365+
public InvocationType getInvocationType()
366+
{
367+
return InvocationType.NON_BLOCKING;
368+
}
369+
}
370+
// end::sinkChunks[]
371+
372+
@SuppressWarnings("InnerClassMayBeStatic")
373+
// tag::copy[]
306374
class Copy extends IteratingCallback
307375
{
308376
private final Content.Source source;
@@ -349,22 +417,22 @@ protected Action process() throws Throwable
349417
@Override
350418
protected void onSuccess()
351419
{
352-
// After every successful write, release the chunk
353-
// and reset to the next chunk
420+
// After every successful write, release
421+
// the chunk and reset to the next chunk.
354422
chunk = Content.Chunk.releaseAndNext(chunk);
355423
}
356424

357425
@Override
358426
protected void onCompleteSuccess()
359427
{
360-
// The copy is succeeded, succeed the callback.
428+
// The copy is complete, succeed the copy callback.
361429
callback.succeeded();
362430
}
363431

364432
@Override
365433
protected void onFailure(Throwable cause)
366434
{
367-
// The copy is failed, fail the callback.
435+
// The copy has failed, fail the copy callback.
368436
// This method is invoked before a write() has completed, so
369437
// the chunk is not released here, but in onCompleteFailure().
370438
callback.failed(cause);

documentation/jetty/modules/programming-guide/pages/arch/io.adoc

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,41 @@ The Jetty libraries (both client and server) use Java NIO to handle I/O, so that
2121
The main class of The Jetty I/O library is link:{javadoc-url}/org/eclipse/jetty/io/SelectorManager.html[`SelectorManager`].
2222

2323
`SelectorManager` manages internally a configurable number of link:{javadoc-url}/org/eclipse/jetty/io/ManagedSelector.html[`ManagedSelector`]s.
24-
Each `ManagedSelector` wraps an instance of `java.nio.channels.Selector` that in turn manages a number of `java.nio.channels.SocketChannel` instances.
24+
Each `ManagedSelector` wraps an instance of `java.nio.channels.Selector` that in turn manages a number of `java.nio.channels.SelectionKey` instances, that each wrap a `java.nio.channels.SelectableChannel` instance such as `java.nio.channels.SocketChannel`.
2525

26-
NOTE: TODO: add image
26+
[plantuml]
27+
----
28+
object SM as "SelectorManager"
29+
object MS1 as "ManagedSelector" {
30+
java.nio.channels.Selector
31+
}
32+
object MS2 as "ManagedSelector" {
33+
java.nio.channels.Selector
34+
}
35+
object SC1A as "SelectionKey" {
36+
SelectableChannel
37+
}
38+
object SC1B as "SelectionKey" {
39+
SelectableChannel
40+
}
41+
object SC2A as "SelectionKey" {
42+
SelectableChannel
43+
}
44+
object SC2B as "SelectionKey" {
45+
SelectableChannel
46+
}
47+
object SC2C as "SelectionKey" {
48+
SelectableChannel
49+
}
50+
51+
SM o-- MS1
52+
SM o-- MS2
53+
MS1 o-- SC1A
54+
MS1 o-- SC1B
55+
MS2 o-- SC2A
56+
MS2 o-- SC2B
57+
MS2 o-- SC2C
58+
----
2759

2860
`SocketChannel` instances are typically created by the Jetty implementation, on client-side when connecting to a server and on server-side when accepting connections from clients.
2961
In both cases the `SocketChannel` instance is passed to `SelectorManager` (which passes it to `ManagedSelector` and eventually to `java.nio.channels.Selector`) to be registered for use within Jetty.
@@ -90,7 +122,7 @@ In the Jetty I/O library, you can call `EndPoint.fillInterested(Callback)` to de
90122
At the Java NIO level, a `SocketChannel` or `DatagramChannel` is always writable, unless it becomes congested.
91123
In order to be notified when a channel uncongests and it is therefore writable again, the `SelectionKey.OP_WRITE` flag must be set.
92124

93-
In the Jetty I/O library, you can call `EndPoint.write(Callback, ByteBuffer...)` to write the ``ByteBuffer``s and the `Callback` parameter is the object that is notified when the whole write is finished (i.e. _all_ ``ByteBuffer``s have been fully written, even if they are delayed by congestion/uncongestion).
125+
In the Jetty I/O library, you can call `EndPoint.write(Callback, ByteBuffer\...)` to write the ``ByteBuffer``s and the `Callback` parameter is the object that is notified when the whole write is finished (i.e. _all_ ``ByteBuffer``s have been fully written, even if they are delayed by congestion/uncongestion).
94126

95127
The `EndPoint` APIs abstract out the Java NIO details by providing non-blocking APIs based on `Callback` objects for I/O operations.
96128
The `EndPoint` APIs are typically called by `Connection` implementations, see <<connection,this section>>.
@@ -176,7 +208,7 @@ Submitting the invocation of the callback to an `Executor` to be invoked in a di
176208

177209
This side effect of asynchronous programming leading to `StackOverflowError` is so common that the Jetty libraries have a generic solution for it: a specialized `Callback` implementation named `org.eclipse.jetty.util.IteratingCallback` that turns recursion into iteration, therefore avoiding the `StackOverflowError`.
178210

179-
`IteratingCallback` is a `Callback` implementation that should be passed to non-blocking APIs such as `EndPoint.write(Callback, ByteBuffer...)` when they are performed in a loop.
211+
`IteratingCallback` is a `Callback` implementation that should be passed to non-blocking APIs such as `EndPoint.write(Callback, ByteBuffer\...)` when they are performed in a loop.
180212

181213
`IteratingCallback` works by starting the loop with `IteratingCallback.iterate()`.
182214
In turn, this calls `IteratingCallback.process()`, an abstract method that must be implemented with the code that should be executed for each loop.
@@ -197,11 +229,11 @@ include::code:example$src/main/java/org/eclipse/jetty/docs/programming/SelectorM
197229
----
198230

199231
When `onFillable()` is called, for example the first time that bytes are available from the network, the iteration is started.
200-
Starting the iteration calls `process()`, where a buffer is allocated and filled with bytes read from the network via `EndPoint.fill(ByteBuffer)`; the buffer is subsequently written back via `EndPoint.write(Callback, ByteBuffer...)` -- note that the callback passed to `EndPoint.write()` is `this`, i.e. the `IteratingCallback` itself; finally `Action.SCHEDULED` is returned, returning from the `process()` method.
232+
Starting the iteration calls `process()`, where a buffer is allocated and filled with bytes read from the network via `EndPoint.fill(ByteBuffer)`; the buffer is subsequently written back via `EndPoint.write(Callback, ByteBuffer\...)` -- note that the callback passed to `EndPoint.write()` is `this`, i.e. the `IteratingCallback` itself; finally `Action.SCHEDULED` is returned, returning from the `process()` method.
201233

202-
At this point, the call to `EndPoint.write(Callback, ByteBuffer...)` may have completed synchronously; `IteratingCallback` would know that and call `process()` again; within `process()`, the buffer has already been allocated so it will be reused, saving further allocations; the buffer will be filled and possibly written again; `Action.SCHEDULED` is returned again, returning again from the `process()` method.
234+
At this point, the call to `EndPoint.write(Callback, ByteBuffer\...)` may have completed synchronously; `IteratingCallback` would know that and call `process()` again; within `process()`, the buffer has already been allocated so it will be reused, saving further allocations; the buffer will be filled and possibly written again; `Action.SCHEDULED` is returned again, returning again from the `process()` method.
203235

204-
At this point, the call to `EndPoint.write(Callback, ByteBuffer...)` may have not completed synchronously, so `IteratingCallback` will not call `process()` again; the processing thread is free to return to the Jetty I/O system where it may be put back into the thread pool.
236+
At this point, the call to `EndPoint.write(Callback, ByteBuffer\...)` may have not completed synchronously, so `IteratingCallback` will not call `process()` again; the processing thread is free to return to the Jetty I/O system where it may be put back into the thread pool.
205237
If this was the only active network connection, the system would now be idle, with no threads blocked, waiting that the `write()` completes. This thread-less wait is one of the most important features that make non-blocking asynchronous servers more scalable: they use less resources.
206238

207239
Eventually, the Jetty I/O system will notify that the `write()` completed; this notifies the `IteratingCallback` that can now resume the loop and call `process()` again.
@@ -360,8 +392,16 @@ include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ContentDo
360392

361393
When you need to perform an unknown number of writes, you may use an `IteratingCallback`, explained in <<echo,this section>>, to avoid ``StackOverFlowError``s.
362394

363-
For example, to copy from a `Content.Source` to a `Content.Sink` you could use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`.
364-
For illustrative purposes, below you can find the implementation of `copy(Content.Source, Content.Sink, Callback)` that uses an `IteratingCallback`:
395+
For example, to download a large content in smaller chunks, you can do this:
396+
397+
[,java,indent=0]
398+
----
399+
include::code:example$src/main/java/org/eclipse/jetty/docs/programming/ContentDocs.java[tags=sinkChunks]
400+
----
401+
402+
If you want to copy from a `Content.Source` to a `Content.Sink` you could use the convenience method `Content.copy(Content.Source, Content.Sink, Callback)`.
403+
404+
For illustrative purposes, below you can find a more elaborate example with an implementation of `copy(Content.Source, Content.Sink, Callback)` that uses `IteratingCallback`:
365405

366406
[,java,indent=0]
367407
----

0 commit comments

Comments
 (0)