-
Notifications
You must be signed in to change notification settings - Fork 427
/
ReaderImpl.java
147 lines (138 loc) · 6.84 KB
/
ReaderImpl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package com.xuhao.didi.core.iocore;
import com.xuhao.didi.core.exceptions.ReadException;
import com.xuhao.didi.core.iocore.interfaces.IOAction;
import com.xuhao.didi.core.pojo.OriginalData;
import com.xuhao.didi.core.protocol.IReaderProtocol;
import com.xuhao.didi.core.utils.BytesUtils;
import com.xuhao.didi.core.utils.SLog;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Created by xuhao on 2017/5/31.
*/
public class ReaderImpl extends AbsReader {
private ByteBuffer mRemainingBuf;
@Override
public void read() throws RuntimeException {
OriginalData originalData = new OriginalData();
IReaderProtocol headerProtocol = mOkOptions.getReaderProtocol();
int headerLength = headerProtocol.getHeaderLength();
ByteBuffer headBuf = ByteBuffer.allocate(headerLength);
headBuf.order(mOkOptions.getReadByteOrder());
try {
if (mRemainingBuf != null) {
mRemainingBuf.flip();
int length = Math.min(mRemainingBuf.remaining(), headerLength);
headBuf.put(mRemainingBuf.array(), 0, length);
if (length < headerLength) {
//there are no data left
mRemainingBuf = null;
readHeaderFromChannel(headBuf, headerLength - length);
} else {
mRemainingBuf.position(headerLength);
}
} else {
readHeaderFromChannel(headBuf, headBuf.capacity());
}
originalData.setHeadBytes(headBuf.array());
if (SLog.isDebug()) {
SLog.i("read head: " + BytesUtils.toHexStringForLog(headBuf.array()));
}
int bodyLength = headerProtocol.getBodyLength(originalData.getHeadBytes(), mOkOptions.getReadByteOrder());
if (SLog.isDebug()) {
SLog.i("need read body length: " + bodyLength);
}
if (bodyLength > 0) {
if (bodyLength > mOkOptions.getMaxReadDataMB() * 1024 * 1024) {
throw new ReadException("Need to follow the transmission protocol.\r\n" +
"Please check the client/server code.\r\n" +
"According to the packet header data in the transport protocol, the package length is " + bodyLength + " Bytes.\r\n" +
"You need check your <ReaderProtocol> definition");
}
ByteBuffer byteBuffer = ByteBuffer.allocate(bodyLength);
byteBuffer.order(mOkOptions.getReadByteOrder());
if (mRemainingBuf != null) {
int bodyStartPosition = mRemainingBuf.position();
int length = Math.min(mRemainingBuf.remaining(), bodyLength);
byteBuffer.put(mRemainingBuf.array(), bodyStartPosition, length);
mRemainingBuf.position(bodyStartPosition + length);
if (length == bodyLength) {
if (mRemainingBuf.remaining() > 0) {//there are data left
ByteBuffer temp = ByteBuffer.allocate(mRemainingBuf.remaining());
temp.order(mOkOptions.getReadByteOrder());
temp.put(mRemainingBuf.array(), mRemainingBuf.position(), mRemainingBuf.remaining());
mRemainingBuf = temp;
} else {//there are no data left
mRemainingBuf = null;
}
//cause this time data from remaining buffer not from channel.
originalData.setBodyBytes(byteBuffer.array());
mStateSender.sendBroadcast(IOAction.ACTION_READ_COMPLETE, originalData);
return;
} else {//there are no data left in buffer and some data pieces in channel
mRemainingBuf = null;
}
}
readBodyFromChannel(byteBuffer);
originalData.setBodyBytes(byteBuffer.array());
} else if (bodyLength == 0) {
originalData.setBodyBytes(new byte[0]);
if (mRemainingBuf != null) {
//the body is empty so header remaining buf need set null
if (mRemainingBuf.hasRemaining()) {
ByteBuffer temp = ByteBuffer.allocate(mRemainingBuf.remaining());
temp.order(mOkOptions.getReadByteOrder());
temp.put(mRemainingBuf.array(), mRemainingBuf.position(), mRemainingBuf.remaining());
mRemainingBuf = temp;
} else {
mRemainingBuf = null;
}
}
} else if (bodyLength < 0) {
throw new ReadException(
"read body is wrong,this socket input stream is end of file read " + bodyLength + " ,that mean this socket is disconnected by server");
}
mStateSender.sendBroadcast(IOAction.ACTION_READ_COMPLETE, originalData);
} catch (Exception e) {
ReadException readException = new ReadException(e);
throw readException;
}
}
private void readHeaderFromChannel(ByteBuffer headBuf, int readLength) throws IOException {
for (int i = 0; i < readLength; i++) {
byte[] bytes = new byte[1];
int value = mInputStream.read(bytes);
if (value == -1) {
throw new ReadException(
"read head is wrong, this socket input stream is end of file read " + value + " ,that mean this socket is disconnected by server");
}
headBuf.put(bytes);
}
}
private void readBodyFromChannel(ByteBuffer byteBuffer) throws IOException {
while (byteBuffer.hasRemaining()) {
try {
byte[] bufArray = new byte[mOkOptions.getReadPackageBytes()];
int len = mInputStream.read(bufArray);
if (len == -1) {
break;
}
int remaining = byteBuffer.remaining();
if (len > remaining) {
byteBuffer.put(bufArray, 0, remaining);
mRemainingBuf = ByteBuffer.allocate(len - remaining);
mRemainingBuf.order(mOkOptions.getReadByteOrder());
mRemainingBuf.put(bufArray, remaining, len - remaining);
} else {
byteBuffer.put(bufArray, 0, len);
}
} catch (Exception e) {
throw e;
}
}
if (SLog.isDebug()) {
SLog.i("read total bytes: " + BytesUtils.toHexStringForLog(byteBuffer.array()));
SLog.i("read total length:" + (byteBuffer.capacity() - byteBuffer.remaining()));
}
}
}