/
BufferedChannelInput.java
156 lines (137 loc) · 4.09 KB
/
BufferedChannelInput.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
* Copyright (c) "Neo4j"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.packstream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import org.neo4j.io.memory.ByteBuffers;
import static org.neo4j.memory.EmptyMemoryTracker.INSTANCE;
/**
 * A {@link PackInput} implementation that pulls bytes from a {@link ReadableByteChannel} through an
 * internal, fixed-capacity {@link ByteBuffer}.
 * <p>
 * The source channel is assigned only by {@link #reset(ReadableByteChannel)}; every read that needs more
 * data than is currently buffered reads from that channel, so {@code reset} has to be called before the
 * first read.
 */
public class BufferedChannelInput implements PackInput
{
    private final ByteBuffer buffer;
    private ReadableByteChannel channel;

    /**
     * Creates an input whose internal buffer is allocated with the given capacity.
     *
     * @param bufferCapacity capacity, in bytes, requested for the internal buffer.
     */
    public BufferedChannelInput( int bufferCapacity )
    {
        this.buffer = ByteBuffers.allocate( bufferCapacity, INSTANCE );
    }

    /**
     * Points this input at a new channel and drops whatever is currently buffered.
     *
     * @param ch the channel subsequent reads will pull from.
     * @return this instance, for call chaining.
     */
    public BufferedChannelInput reset( ReadableByteChannel ch )
    {
        channel = ch;
        // Limit first, then position: the end state (position == limit == 0, i.e. nothing readable) is the same.
        buffer.limit( 0 );
        buffer.position( 0 );
        return this;
    }

    /**
     * Tries to make at least {@code numBytes} bytes readable from the buffer, reading from the channel
     * only when the buffer does not already hold that many.
     *
     * @param numBytes number of bytes the caller would like to have buffered.
     * @return {@code true} if at least {@code numBytes} bytes are readable afterwards, otherwise {@code false}.
     * @throws IOException if reading from the channel fails.
     */
    private boolean tryFill( int numBytes ) throws IOException
    {
        if ( buffer.remaining() < numBytes )
        {
            // Switch the buffer to "fill" mode, keeping any unread bytes at the front.
            if ( !buffer.hasRemaining() )
            {
                buffer.clear();
            }
            else
            {
                buffer.compact();
            }

            // Keep reading until the channel signals its end, enough bytes have been gathered,
            // or the buffer has no room left. A read that yields zero bytes is simply retried.
            boolean keepReading;
            do
            {
                boolean endOfChannel = channel.read( buffer ) < 0;
                keepReading = !endOfChannel && buffer.position() < numBytes && buffer.hasRemaining();
            }
            while ( keepReading );

            // Back to "drain" mode so the bytes gathered above become readable.
            buffer.flip();
        }
        return buffer.remaining() >= numBytes;
    }

    /**
     * Makes sure {@code numBytes} bytes are readable and hands back the buffer to read them from.
     *
     * @param numBytes number of bytes that must be readable.
     * @return the internal buffer, holding at least {@code numBytes} readable bytes.
     * @throws IOException if reading from the channel fails; a {@link PackStream.EndOfStream} is thrown
     * when the requested number of bytes could not be buffered.
     */
    private ByteBuffer require( int numBytes ) throws IOException
    {
        if ( tryFill( numBytes ) )
        {
            return buffer;
        }
        throw new PackStream.EndOfStream( "Unexpected end of stream while trying to read " + numBytes + " bytes." );
    }

    /** Reads the next byte, consuming it. */
    @Override
    public byte readByte() throws IOException
    {
        return require( 1 ).get();
    }

    /** Reads the next two bytes as a {@code short}, consuming them. */
    @Override
    public short readShort() throws IOException
    {
        return require( 2 ).getShort();
    }

    /** Reads the next four bytes as an {@code int}, consuming them. */
    @Override
    public int readInt() throws IOException
    {
        return require( 4 ).getInt();
    }

    /** Reads the next eight bytes as a {@code long}, consuming them. */
    @Override
    public long readLong() throws IOException
    {
        return require( 8 ).getLong();
    }

    /** Reads the next eight bytes as a {@code double}, consuming them. */
    @Override
    public double readDouble() throws IOException
    {
        return require( 8 ).getDouble();
    }

    /**
     * Copies {@code toRead} bytes into {@code into}, starting at {@code index}, refilling the internal
     * buffer from the channel as often as needed.
     *
     * @param into destination array.
     * @param index offset in {@code into} of the first byte to write.
     * @param toRead number of bytes to copy.
     * @return this instance, for call chaining.
     * @throws IOException if reading from the channel fails; a {@link PackStream.EndOfStream} is thrown
     * when the buffer is drained and a refill produces no further bytes before the request is satisfied.
     */
    @Override
    public PackInput readBytes( byte[] into, int index, int toRead ) throws IOException
    {
        final int end = index + toRead;
        int cursor = index;
        while ( cursor < end )
        {
            // Drain as much as the buffer currently offers, capped at what is still wanted.
            int chunk = Math.min( buffer.remaining(), end - cursor );
            buffer.get( into, cursor, chunk );
            cursor += chunk;

            boolean moreWanted = cursor < end;
            if ( moreWanted && !buffer.hasRemaining() )
            {
                // The outcome of the refill is judged by what is readable afterwards, not by its return value.
                tryFill( end - cursor );
                if ( !buffer.hasRemaining() )
                {
                    throw new PackStream.EndOfStream( "Expected " + (end - cursor) + " bytes available, " +
                            "but no more bytes accessible from underlying stream." );
                }
            }
        }
        return this;
    }

    /** Returns the next byte without consuming it. */
    @Override
    public byte peekByte() throws IOException
    {
        ByteBuffer ready = require( 1 );
        // Absolute get: leaves the buffer position where it is.
        return ready.get( ready.position() );
    }

    /** Returns the number of bytes currently readable from the internal buffer without touching the channel. */
    @Override
    public int readableBytes()
    {
        return buffer.remaining();
    }
}