Skip to content

Commit

Permalink
重置或初始化输出流时预先占4个字节用于存放协议包长度
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jul 12, 2017
1 parent d568bfd commit 97e50ac
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 15 deletions.
Expand Up @@ -155,7 +155,7 @@ public void close() {
} }
session.traceOperation("COMMAND_CLOSE", id); session.traceOperation("COMMAND_CLOSE", id);
try { try {
transfer.writeRequestHeader(id, Session.COMMAND_CLOSE).flush(); transfer.writeRequestHeader(id, Session.COMMAND_CLOSE).writeInt(session.getSessionId()).flush();
} catch (IOException e) { } catch (IOException e) {
trace.error(e, "close"); trace.error(e, "close");
} }
Expand Down
Expand Up @@ -398,7 +398,7 @@ public void close() {
} }
session.traceOperation("COMMAND_CLOSE", id); session.traceOperation("COMMAND_CLOSE", id);
try { try {
transfer.writeRequestHeader(id, Session.COMMAND_CLOSE).flush(); transfer.writeRequestHeader(id, Session.COMMAND_CLOSE).writeInt(session.getSessionId()).flush();
} catch (IOException e) { } catch (IOException e) {
trace.error(e, "close"); trace.error(e, "close");
} }
Expand Down
24 changes: 11 additions & 13 deletions lealone-net/src/main/java/org/lealone/net/Transfer.java
Expand Up @@ -85,20 +85,15 @@ public Transfer(AsyncConnection conn, NetSocket socket, Session session) {
this.session = session; this.session = session;
} }


public Transfer(AsyncConnection conn, NetSocket socket, Buffer buffer) { public Transfer(AsyncConnection conn, NetSocket socket, Buffer inBuffer) {
this.conn = conn; this.conn = conn;
this.socket = socket; this.socket = socket;


resettableOutputStream = new ResettableBufferOutputStream(BUFFER_SIZE); resettableOutputStream = new ResettableBufferOutputStream(BUFFER_SIZE);
out = new DataOutputStream(resettableOutputStream); out = new DataOutputStream(resettableOutputStream);


try { if (inBuffer != null) {
out.writeInt(0); BufferInputStream bufferInputStream = new BufferInputStream(inBuffer);
} catch (IOException e) {
throw new AssertionError();
}
if (buffer != null) {
BufferInputStream bufferInputStream = new BufferInputStream(buffer);
in = new DataInputStream(bufferInputStream); in = new DataInputStream(bufferInputStream);
} }
} }
Expand Down Expand Up @@ -146,7 +141,6 @@ public DataOutputStream getDataOutputStream() {
*/ */
public void reset() throws IOException { public void reset() throws IOException {
resettableOutputStream.reset(); resettableOutputStream.reset();
out.writeInt(0);
} }


public void setSession(Session session) { public void setSession(Session session) {
Expand All @@ -169,10 +163,8 @@ public void setVersion(int version) { // TODO 以后协议修改了再使用版
*/ */
public void flush() throws IOException { public void flush() throws IOException {
resettableOutputStream.writePacketLength(); resettableOutputStream.writePacketLength();
out.flush();
socket.write(resettableOutputStream.buffer); socket.write(resettableOutputStream.buffer);
resettableOutputStream.reset(); resettableOutputStream.reset();
out.writeInt(0); // write packet header for next
} }


/** /**
Expand Down Expand Up @@ -813,7 +805,7 @@ public synchronized Throwable fillInStackTrace() {


private static class BufferInputStream extends InputStream { private static class BufferInputStream extends InputStream {
final Buffer buffer; final Buffer buffer;
int size; final int size;
int pos; int pos;


BufferInputStream(Buffer buffer) { BufferInputStream(Buffer buffer) {
Expand Down Expand Up @@ -842,12 +834,18 @@ private static class ResettableBufferOutputStream extends OutputStream {
} }


@Override @Override
public void write(int b) throws IOException { public void write(int b) {
buffer.appendByte((byte) b); buffer.appendByte((byte) b);
} }


@Override
public void write(byte b[], int off, int len) {
buffer.appendBytes(b, off, len);
}

void reset() { void reset() {
buffer = Buffer.buffer(initialSizeHint); buffer = Buffer.buffer(initialSizeHint);
buffer.appendInt(0); // write packet header for next
} }


void writePacketLength() { void writePacketLength() {
Expand Down

0 comments on commit 97e50ac

Please sign in to comment.