Skip to content

Commit

Permalink
如果多个协议包同时出现时有可能后面一个是不完整的
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 19, 2016
1 parent 67d6bc4 commit f770db6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
Expand Up @@ -18,7 +18,7 @@
package org.lealone.mvcc.log; package org.lealone.mvcc.log;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


Expand All @@ -30,7 +30,7 @@ public abstract class LogSyncService extends Thread {
protected final Semaphore haveWork = new Semaphore(1); protected final Semaphore haveWork = new Semaphore(1);
protected final WaitQueue syncComplete = new WaitQueue(); protected final WaitQueue syncComplete = new WaitQueue();


protected final LinkedBlockingQueue<MVCCTransaction> transactions = new LinkedBlockingQueue<>(); protected final LinkedTransferQueue<MVCCTransaction> transactions = new LinkedTransferQueue<>();


protected long syncIntervalMillis; protected long syncIntervalMillis;
protected volatile long lastSyncedAt = System.currentTimeMillis(); protected volatile long lastSyncedAt = System.currentTimeMillis();
Expand Down
18 changes: 12 additions & 6 deletions lealone-net/src/main/java/org/lealone/net/AsyncConnection.java
Expand Up @@ -504,8 +504,10 @@ private void processResponse(Transfer transfer, int id) throws IOException {
} }


AsyncCallback<?> ac = callbackMap.remove(id); AsyncCallback<?> ac = callbackMap.remove(id);
if (ac == null) if (ac == null) {
logger.warn("Async callback is null, may be a bug! id = " + id);
return; return;
}
if (e != null) if (e != null)
ac.setDbException(e); ac.setDbException(e);
else else
Expand Down Expand Up @@ -983,10 +985,6 @@ public void handle(Buffer buffer) {
} }
int pos = 0; int pos = 0;
while (true) { while (true) {
if (length < 4) {
tmpBuffer = buffer;
break;
}
int packetLength = buffer.getInt(pos); int packetLength = buffer.getInt(pos);
if (length - 4 == packetLength) { if (length - 4 == packetLength) {
if (pos == 0) { if (pos == 0) {
Expand All @@ -1004,7 +1002,15 @@ public void handle(Buffer buffer) {


pos = pos + packetLength + 4; pos = pos + packetLength + 4;
length = length - (packetLength + 4); length = length - (packetLength + 4);
continue;
// 有可能剩下的不够4个字节了
if (length < 4) {
tmpBuffer = Buffer.buffer();
tmpBuffer.appendBuffer(buffer, pos, length);
break;
} else {
continue;
}
} else { } else {
tmpBuffer = Buffer.buffer(); tmpBuffer = Buffer.buffer();
tmpBuffer.appendBuffer(buffer, pos, length); tmpBuffer.appendBuffer(buffer, pos, length);
Expand Down
Expand Up @@ -192,13 +192,10 @@ static void benchmark() throws Exception {
Statement stmt = conn.createStatement(); Statement stmt = conn.createStatement();
stmt.executeUpdate("DROP TABLE IF EXISTS test"); stmt.executeUpdate("DROP TABLE IF EXISTS test");
stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)");
// // stmt.executeUpdate("set MULTI_THREADED 1"); stmt.close();
//
// // stmt.close();
// // conn.close();


int threadsCount = 1;// Runtime.getRuntime().availableProcessors() * 4;// 10; int threadsCount = 10;// Runtime.getRuntime().availableProcessors() * 4;// 10;
int loop = 100; int loop = 500;
latch = new CountDownLatch(threadsCount); latch = new CountDownLatch(threadsCount);


MyThread[] threads = new MyThread[threadsCount]; MyThread[] threads = new MyThread[threadsCount];
Expand All @@ -211,6 +208,7 @@ static void benchmark() throws Exception {
} }


latch.await(); latch.await();
conn.close();


long write_sum = 0; long write_sum = 0;
for (int i = 0; i < threadsCount; i++) { for (int i = 0; i < threadsCount; i++) {
Expand Down

0 comments on commit f770db6

Please sign in to comment.