/
VertxBlockingInputStream.java
145 lines (106 loc) · 3.38 KB
/
VertxBlockingInputStream.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package net.trajano.ms.engine.internal;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
/**
 * An {@link InputStream} whose content is supplied as {@link Buffer} chunks
 * through {@link #populate(Buffer)} and terminated by {@link #end()} or
 * {@link #error(Throwable)}. Chunks are handed over on an internal
 * {@link BlockingQueue}; {@link #read()} blocks until the next chunk or a
 * terminating marker is available.
 * <p>
 * NOTE(review): the producer methods are presumably invoked from a different
 * thread than the one calling {@link #read()} (hence the blocking queue);
 * confirm against callers.
 */
public class VertxBlockingInputStream extends InputStream {

    /**
     * Marker queued by {@link #end()}. It is recognised by identity in
     * {@link #read()} and never read from.
     */
    private static final Buffer END_BUFFER = Symbol.newSymbol(Buffer.class);

    /**
     * Marker queued by {@link #error(Throwable)}. It is recognised by identity
     * in {@link #read()} and never read from.
     */
    private static final Buffer END_BUFFER_WITH_ERROR = Symbol.newSymbol(Buffer.class);

    private static final Logger LOG = LoggerFactory.getLogger(VertxBlockingInputStream.class);

    /**
     * Number of bytes that have been populated but not yet read. Atomic
     * because it is incremented by {@link #populate(Buffer)} and decremented
     * by {@link #read()}, which may run concurrently.
     */
    private final AtomicInteger availableBytes = new AtomicInteger();

    /**
     * Running count of bytes returned by {@link #read()}.
     */
    private long bytesRead = 0;

    /**
     * Flag to indicate that the stream is closed.
     */
    private boolean closed = false;

    /**
     * Chunk currently being consumed, a terminating marker once one has been
     * taken from the queue, or {@code null} when the next chunk still has to
     * be taken.
     */
    private Buffer currentBuffer;

    /**
     * Exception thrown by {@link #read()} once the error marker is reached.
     * Assigned in {@link #error(Throwable)} before the marker is queued.
     */
    private IOException exceptionToThrow = null;

    /**
     * Index of the next byte to read within {@link #currentBuffer}.
     */
    private int pos;

    /**
     * Hand-off queue between the producer methods and {@link #read()}.
     */
    private final BlockingQueue<Buffer> queue = new LinkedBlockingQueue<>();

    /**
     * Constructs VertxBlockingInputStream without any associated handlers
     * configured on a ReadStream. The caller is responsible for invoking
     * {@link #populate(Buffer)}, {@link #end()} and {@link #error(Throwable)}.
     */
    public VertxBlockingInputStream() {
    }

    /**
     * Constructs VertxBlockingInputStream and registers it as the data handler
     * and end handler of the given stream. No exception handler is registered
     * here; {@link #error(Throwable)} has to be wired up by the caller if
     * needed.
     *
     * @param readStream
     *            stream whose buffers feed this input stream
     */
    public VertxBlockingInputStream(final ReadStream<Buffer> readStream) {
        readStream
            .handler(this::populate)
            .endHandler(aVoid -> end());
    }

    /**
     * Returns the number of bytes populated but not yet read.
     *
     * @return bytes that can be read without waiting for further chunks
     */
    @Override
    public int available() throws IOException {
        return availableBytes.get();
    }

    /**
     * Marks the stream as closed. Subsequent calls to {@link #read()} throw an
     * {@link IOException}.
     */
    @Override
    public void close() throws IOException {
        closed = true;
    }

    /**
     * Signals normal end of data. Once every previously populated chunk has
     * been consumed, {@link #read()} returns {@code -1}.
     */
    public void end() {
        queue.add(END_BUFFER);
    }

    /**
     * End the buffer because of an error. Once every previously populated
     * chunk has been consumed, {@link #read()} throws an {@link IOException}
     * wrapping the given cause.
     *
     * @param e
     *            cause of the failure
     */
    public void error(final Throwable e) {
        // Assign before queueing so the reader that takes the marker sees it.
        exceptionToThrow = new IOException(e);
        queue.add(END_BUFFER_WITH_ERROR);
    }

    /**
     * Mark/reset is not supported.
     *
     * @return always {@code false}
     */
    @Override
    public boolean markSupported() {
        return false;
    }

    /**
     * Appends a chunk of data to be returned by later {@link #read()} calls.
     * Zero-length chunks are ignored because they carry no bytes to read.
     *
     * @param buffer
     *            chunk to append
     */
    public void populate(final Buffer buffer) {
        final int length = buffer.length();
        if (length == 0) {
            // An empty chunk would make read() index past the end of it.
            return;
        }
        // Count first, then publish, so a concurrent reader can never drive
        // the counter below zero.
        availableBytes.addAndGet(length);
        queue.add(buffer);
    }

    /**
     * Reads the next byte, blocking until a chunk or terminating marker is
     * available.
     *
     * @return the next byte as a value in the range 0 to 255, or {@code -1}
     *         once {@link #end()} has been reached
     * @throws InterruptedIOException
     *             if the thread is interrupted while waiting for data; the
     *             interrupt status is restored
     * @throws IOException
     *             if the stream is closed or {@link #error(Throwable)} has
     *             been reached
     */
    @Override
    public int read() throws IOException {
        if (closed) {
            throw new IOException("Stream is closed");
        }
        if (currentBuffer == null) {
            currentBuffer = takeNextBuffer();
            pos = 0;
        }
        // Terminating markers stay in currentBuffer so that every later call
        // reports the same outcome.
        if (currentBuffer == END_BUFFER_WITH_ERROR) {
            throw exceptionToThrow;
        }
        if (currentBuffer == END_BUFFER) {
            return -1;
        }
        // Convert to unsigned byte
        final int b = currentBuffer.getByte(pos++) & 0xFF;
        availableBytes.decrementAndGet();
        ++bytesRead;
        if (pos == currentBuffer.length()) {
            // Chunk exhausted; the next call takes a new one from the queue.
            currentBuffer = null;
        }
        return b;
    }

    /**
     * Not supported.
     *
     * @throws IOException
     *             always
     */
    @Override
    public synchronized void reset() throws IOException {
        throw new IOException("reset not supported");
    }

    /**
     * Gets a count of how many bytes have been read from this input stream.
     *
     * @return total bytes read
     */
    public long totalBytesRead() {
        return bytesRead;
    }

    /**
     * Blocks until the next chunk or terminating marker is queued.
     *
     * @return the next queue entry
     * @throws InterruptedIOException
     *             if interrupted while waiting
     */
    private Buffer takeNextBuffer() throws InterruptedIOException {
        try {
            return queue.take();
        } catch (final InterruptedException e) {
            LOG.error("Interrupted while waiting for next buffer", e);
            Thread.currentThread().interrupt();
            final InterruptedIOException interrupted = new InterruptedIOException(
                "Interrupted while waiting for next buffer");
            interrupted.initCause(e);
            throw interrupted;
        }
    }
}