Skip to content

Commit

Permalink
fix(http): fix HTTP protocol errors following non-successful requests (
Browse files Browse the repository at this point in the history
  • Loading branch information
ideoma committed Jan 19, 2024
1 parent 29b1b01 commit 6119117
Show file tree
Hide file tree
Showing 30 changed files with 555 additions and 374 deletions.
56 changes: 32 additions & 24 deletions benchmarks/src/main/java/org/questdb/PongMain.java
Expand Up @@ -87,7 +87,7 @@ public void close() {
LOG.info().$("closed").$();
}

public void receivePing() {
public void receivePing() throws PeerIsSlowToWriteException, PeerIsSlowToReadException, ServerDisconnectException {
// expect "PING"
int n = Net.recv(getFd(), buf, (int) (bufSize - (buf - bufStart)));
if (n > 0) {
Expand All @@ -97,65 +97,73 @@ public void receivePing() {
// accrue protocol artefacts while they still make sense
buf += n;
// fair resource use
getDispatcher().registerChannel(this, IOOperation.READ);
throw registerDispatcherRead();
} else {
// reset buffer
this.buf = bufStart;
// send PONG by preparing the buffer and asking client to receive
LOG.info().$(flyweight).$();
Utf8s.strCpy(PONG, PONG.size(), bufStart);
writtenLen = PONG.size();
getDispatcher().registerChannel(this, IOOperation.WRITE);
throw registerDispatcherWrite();
}
} else {
getDispatcher().disconnect(this, DISCONNECT_REASON_PROTOCOL_VIOLATION);
throw registerDispatcherDisconnect(DISCONNECT_REASON_PROTOCOL_VIOLATION);
}
} else {
// handle peer disconnect
getDispatcher().disconnect(this, DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV);
throw registerDispatcherDisconnect(DISCONNECT_REASON_PEER_DISCONNECT_AT_RECV);
}
}

public void sendPong() {
public void sendPong() throws PeerIsSlowToReadException, PeerIsSlowToWriteException, ServerDisconnectException {
int n = Net.send(getFd(), buf, (int) (writtenLen - (buf - bufStart)));
if (n > -1) {
if (n > 0) {
buf += n;
if (buf - bufStart < writtenLen) {
getDispatcher().registerChannel(this, IOOperation.WRITE);
throw registerDispatcherWrite();
} else {
flyweight.of(bufStart, bufStart + writtenLen);
LOG.info().$(flyweight).$();
buf = bufStart;
writtenLen = 0;
getDispatcher().registerChannel(this, IOOperation.READ);
throw registerDispatcherRead();
}
} else {
getDispatcher().registerChannel(this, IOOperation.WRITE);
throw registerDispatcherWrite();
}
} else {
// handle peer disconnect
getDispatcher().disconnect(this, DISCONNECT_REASON_PEER_DISCONNECT_AT_SEND);
throw registerDispatcherDisconnect(DISCONNECT_REASON_PEER_DISCONNECT_AT_SEND);
}
}
}

private static class PongRequestProcessor implements IORequestProcessor<PongConnectionContext> {
@Override
public boolean onRequest(int operation, PongConnectionContext context) {
switch (operation) {
case IOOperation.READ:
context.receivePing();
break;
case IOOperation.WRITE:
context.sendPong();
break;
case IOOperation.HEARTBEAT:
context.getDispatcher().registerChannel(context, IOOperation.HEARTBEAT);
return false;
default:
context.getDispatcher().disconnect(context, DISCONNECT_REASON_UNKNOWN_OPERATION);
break;
public boolean onRequest(int operation, PongConnectionContext context, IODispatcher<PongConnectionContext> dispatcher) {
try {
switch (operation) {
case IOOperation.READ:
context.receivePing();
break;
case IOOperation.WRITE:
context.sendPong();
break;
case IOOperation.HEARTBEAT:
dispatcher.registerChannel(context, IOOperation.HEARTBEAT);
return false;
default:
dispatcher.disconnect(context, DISCONNECT_REASON_UNKNOWN_OPERATION);
break;
}
} catch (PeerIsSlowToWriteException e) {
dispatcher.registerChannel(context, IOOperation.READ);
} catch (PeerIsSlowToReadException e) {
dispatcher.registerChannel(context, IOOperation.WRITE);
} catch (ServerDisconnectException e) {
dispatcher.disconnect(context, context.getDisconnectReason());
}
return true;
}
Expand Down
Expand Up @@ -487,7 +487,7 @@ private void reloadFromTablesFile(
tableNameToTableTokenMap.put(tableName, token);
if (!Chars.startsWith(token.getDirName(), token.getTableName())) {
// This table is renamed, log system to real table name mapping
LOG.info().$("table dir name does not match logical name [table=").utf8(tableName).$(", dirName=").utf8(dirName).I$();
LOG.debug().$("table dir name does not match logical name [table=").utf8(tableName).$(", dirName=").utf8(dirName).I$();
}
dirNameToTableTokenMap.put(token.getDirName(), ReverseTableMapItem.of(token));
}
Expand Down

0 comments on commit 6119117

Please sign in to comment.