Skip to content

Commit

Permalink
issue #49: closing listening socket by client cancels query execution…
Browse files Browse the repository at this point in the history
… on server.
  • Loading branch information
bluestreak01 committed May 26, 2016
1 parent 4a6d380 commit cde23fd
Show file tree
Hide file tree
Showing 69 changed files with 953 additions and 707 deletions.
1 change: 1 addition & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 3.3)
project(questdb)

set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -m64 -Wimplicit-function-declaration")
#set(CMAKE_VERBOSE_MAKEFILE on)

# deal with windows slashes in JAVA_HOME
if ($ENV{JAVA_HOME})
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/c/share/net.c
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ JNIEXPORT jint JNICALL Java_com_questdb_misc_Net_recv
return convert_error(recv((int) fd, (void *) ptr, (size_t) len, 0));
}

JNIEXPORT jboolean JNICALL Java_com_questdb_misc_Net_isDead
(JNIEnv *e, jclass cl, jlong fd) {
int c;
return (jboolean) (recv((int) fd, &c, 1, 0) == 0);
}

JNIEXPORT jint JNICALL Java_com_questdb_misc_Net_configureNonBlocking
(JNIEnv *e, jclass cl, jlong fd) {
int flags;
Expand Down
8 changes: 8 additions & 0 deletions core/src/main/c/share/net.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/main/java/com/questdb/BootstrapMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception {
JournalFactoryPool pool = new JournalFactoryPool(factory.getConfiguration(), configuration.getJournalPoolSize());
matcher.put("/imp", new ImportHandler(factory));
matcher.put("/js", new QueryHandler(pool, configuration));
matcher.put("/csv", new CsvHandler(pool));
matcher.put("/csv", new CsvHandler(pool, configuration));
matcher.put("/chk", new ExistenceCheckHandler(factory));
matcher.setDefaultHandler(new StaticContentHandler(configuration.getHttpPublic(), new MimeTypes(configuration.getMimeTypes())));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2016 Appsicle
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*
******************************************************************************/

package com.questdb.ex;

@SuppressWarnings("ThrowableInstanceNeverThrown")
public final class DisconnectedChannelRuntimeException extends RuntimeException {
public final static DisconnectedChannelRuntimeException INSTANCE = new DisconnectedChannelRuntimeException();

private DisconnectedChannelRuntimeException() {
}
}
2 changes: 2 additions & 0 deletions core/src/main/java/com/questdb/misc/Net.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public static boolean bind(long fd, CharSequence address, int port) {

public native static int getPeerPort(long fd);

public static native boolean isDead(long fd);

public native static void listen(long fd, int backlog);

public static native int recv(long fd, long ptr, int len);
Expand Down
17 changes: 1 addition & 16 deletions core/src/main/java/com/questdb/net/http/EpollDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import com.questdb.mp.*;
import com.questdb.net.Epoll;
import com.questdb.net.NetworkChannelImpl;
import com.questdb.net.NonBlockingSecureSocketChannel;
import com.questdb.std.LongMatrix;

import java.io.IOException;
Expand Down Expand Up @@ -169,21 +168,7 @@ private void addPending(long _fd, long timestamp) {
pending.set(r, M_ID, fdid++);

NetworkChannelImpl channel = new NetworkChannelImpl(_fd);
pending.set(r, new IOContext(
configuration.getSslConfig().isSecure() ?
new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) :
channel,
clock,
configuration.getHttpBufReqHeader(),
configuration.getHttpBufReqContent(),
configuration.getHttpBufReqMultipart(),
configuration.getHttpBufRespHeader(),
configuration.getHttpBufRespContent(),
configuration.getHttpSoRcvSmall(),
configuration.getHttpSoRcvLarge(),
configuration.getHttpSoRetries()
)
);
pending.set(r, new IOContext(channel, configuration, clock));
}

private void disconnect(IOContext context, DisconnectReason reason) {
Expand Down
28 changes: 13 additions & 15 deletions core/src/main/java/com/questdb/net/http/IOContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import com.questdb.iter.clock.Clock;
import com.questdb.misc.Misc;
import com.questdb.net.NetworkChannel;
import com.questdb.std.FlyweightCharSequence;
import com.questdb.net.NonBlockingSecureSocketChannel;
import com.questdb.std.LocalValueMap;
import com.questdb.std.Locality;
import com.questdb.std.Mutable;
Expand All @@ -51,24 +51,18 @@
public class IOContext implements Closeable, Mutable, Locality {
public final NetworkChannel channel;
public final Request request;
public final FlyweightCharSequence ext = new FlyweightCharSequence();
private final ServerConfiguration serverConfiguration;
private final LocalValueMap map = new LocalValueMap();
private final Response response;
private final AtomicBoolean open = new AtomicBoolean(true);

public IOContext(NetworkChannel channel,
Clock clock,
int reqHeaderSize,
int reqContentSize,
int reqMultipartHeaderSize,
int respHeaderSize,
int respContentSize,
int soRcvSmall,
int soRcvLarge,
int soRetries) {
this.channel = channel;
this.request = new Request(channel, reqHeaderSize, reqContentSize, reqMultipartHeaderSize, soRcvSmall, soRcvLarge, soRetries);
this.response = new Response(channel, respHeaderSize, respContentSize, clock);
public IOContext(NetworkChannel channel, ServerConfiguration configuration, Clock clock) {
this.channel = configuration.getSslConfig().isSecure() ?
new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) :
channel;
this.serverConfiguration = configuration;
this.request = new Request(this.channel, configuration);
this.response = new Response(this.channel, configuration, clock);
}

public ChunkedResponse chunkedResponse() {
Expand Down Expand Up @@ -106,6 +100,10 @@ public LocalValueMap getMap() {
return map;
}

public ServerConfiguration getServerConfiguration() {
return serverConfiguration;
}

public ResponseSink responseSink() {
return response.asSink();
}
Expand Down
23 changes: 12 additions & 11 deletions core/src/main/java/com/questdb/net/http/IOHttpJob.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
* <p>
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (C) 2014-2016 Appsicle
* <p>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
* <p>
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
* <p>
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* <p>
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
Expand All @@ -30,6 +30,7 @@
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*
******************************************************************************/

package com.questdb.net.http;
Expand Down Expand Up @@ -158,7 +159,7 @@ private void process(IOContext context, final ChannelStatus status) {
LOG.info().$("Headers too large").$();
logAccess(context);
result = ChannelStatus.READ;
} catch (MalformedHeaderException | DisconnectedChannelException e) {
} catch (MalformedHeaderException | DisconnectedChannelException | DisconnectedChannelRuntimeException e) {
result = ChannelStatus.DISCONNECTED;
} catch (EndOfChannelException e) {
result = ChannelStatus.EOF;
Expand Down
17 changes: 1 addition & 16 deletions core/src/main/java/com/questdb/net/http/KQueueDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import com.questdb.mp.*;
import com.questdb.net.Kqueue;
import com.questdb.net.NetworkChannelImpl;
import com.questdb.net.NonBlockingSecureSocketChannel;
import com.questdb.std.LongMatrix;

import java.io.IOException;
Expand Down Expand Up @@ -163,21 +162,7 @@ private void addPending(long _fd, long timestamp) {
pending.set(r, 0, timestamp);
pending.set(r, 1, _fd);
NetworkChannelImpl channel = new NetworkChannelImpl(_fd);
pending.set(r, new IOContext(
configuration.getSslConfig().isSecure() ?
new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) :
channel,
clock,
configuration.getHttpBufReqHeader(),
configuration.getHttpBufReqContent(),
configuration.getHttpBufReqMultipart(),
configuration.getHttpBufRespHeader(),
configuration.getHttpBufRespContent(),
configuration.getHttpSoRcvSmall(),
configuration.getHttpSoRcvLarge(),
configuration.getHttpSoRetries()
)
);
pending.set(r, new IOContext(channel, configuration, clock));
}

private void disconnect(IOContext context, DisconnectReason reason) {
Expand Down
14 changes: 7 additions & 7 deletions core/src/main/java/com/questdb/net/http/Request.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,15 @@ public class Request implements Closeable, Mutable {
private final int soRcvLarge;
private final int soRetries;

public Request(NetworkChannel channel, int headerBufferSize, int contentBufferSize, int multipartHeaderBufferSize, int soRcvSmall, int soRcvLarge, int soRetries) {
public Request(NetworkChannel channel, ServerConfiguration configuration) {
this.channel = channel;
this.hb = new RequestHeaderBuffer(headerBufferSize, pool);
this.in = ByteBuffer.allocateDirect(Numbers.ceilPow2(contentBufferSize));
this.hb = new RequestHeaderBuffer(configuration.getHttpBufReqHeader(), pool);
this.in = ByteBuffer.allocateDirect(Numbers.ceilPow2(configuration.getHttpBufReqContent()));
this.inAddr = ByteBuffers.getAddress(in);
this.multipartParser = new MultipartParser(multipartHeaderBufferSize, pool);
this.soRcvSmall = soRcvSmall;
this.soRcvLarge = soRcvLarge;
this.soRetries = soRetries;
this.multipartParser = new MultipartParser(configuration.getHttpBufReqMultipart(), pool);
this.soRcvSmall = configuration.getHttpSoRcvSmall();
this.soRcvLarge = configuration.getHttpSoRcvLarge();
this.soRetries = configuration.getHttpSoRetries();
}

@Override
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/com/questdb/net/http/Response.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,19 @@ public class Response implements Closeable, Mutable {
private long total = 0;
private boolean header = true;

public Response(WritableByteChannel channel, int headerBufferSize, int contentBufferSize, Clock clock) {
if (headerBufferSize <= 0) {
public Response(WritableByteChannel channel, ServerConfiguration configuration, Clock clock) {
if (configuration.getHttpBufRespHeader() <= 0) {
throw new IllegalArgumentException("headerBufferSize");
}

if (contentBufferSize <= 0) {
if (configuration.getHttpBufRespContent() <= 0) {
throw new IllegalArgumentException("contentBufferSize");
}

this.channel = channel;
this.sz = Numbers.ceilPow2(contentBufferSize);
this.sz = Numbers.ceilPow2(configuration.getHttpBufRespContent());
this.out = ByteBuffer.allocateDirect(sz);
this.hb = new ResponseHeaderBuffer(headerBufferSize, clock);
this.hb = new ResponseHeaderBuffer(configuration.getHttpBufRespHeader(), clock);
// size is 32bit int, as hex string max 8 bytes
this.chunkHeader = ByteBuffer.allocateDirect(8 + 2 * Misc.EOL.length());
this.chunkSink = new DirectUnboundedAnsiSink(ByteBuffers.getAddress(chunkHeader));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class ServerConfiguration {
private int dbHashKeyPage = 4 * 1024 * 1024;
private int dbHashDataPage = 8 * 1024 * 1024;
private int dbHashRowPage = 1024 * 1024;

private int dbCyclesBeforeCancel = 1024 * 1024;
private File dbPath = new File("db");
private File mimeTypes = new File("conf/mime.types");
private File httpPublic = new File("public");
Expand Down Expand Up @@ -201,6 +201,10 @@ public ServerConfiguration(File conf) throws Exception {
this.dbHashRowPage = n;
}

if ((n = parseInt(props, "db.cycles.before.cancel")) > -1) {
this.dbCyclesBeforeCancel = Numbers.ceilPow2(n);
}

if ((s = props.getProperty("mime.types")) != null) {
this.mimeTypes = normalize(root, new File(s));
} else {
Expand Down Expand Up @@ -276,6 +280,10 @@ public int getDbAsOfRowPage() {
return dbAsOfRowPage;
}

public int getDbCyclesBeforeCancel() {
return dbCyclesBeforeCancel;
}

public int getDbHashDataPage() {
return dbHashDataPage;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import com.questdb.misc.*;
import com.questdb.mp.*;
import com.questdb.net.NetworkChannelImpl;
import com.questdb.net.NonBlockingSecureSocketChannel;
import com.questdb.std.LongIntHashMap;
import com.questdb.std.LongMatrix;

Expand Down Expand Up @@ -185,21 +184,7 @@ private void addPending(long _fd, long timestamp) {
pending.set(r, M_FD, _fd);
pending.set(r, M_OPERATION, ChannelStatus.READ.ordinal());
NetworkChannelImpl channel = new NetworkChannelImpl(_fd);
pending.set(r, new IOContext(
configuration.getSslConfig().isSecure() ?
new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) :
channel,
clock,
configuration.getHttpBufReqHeader(),
configuration.getHttpBufReqContent(),
configuration.getHttpBufReqMultipart(),
configuration.getHttpBufRespHeader(),
configuration.getHttpBufRespContent(),
configuration.getHttpSoRcvSmall(),
configuration.getHttpSoRcvLarge(),
configuration.getHttpSoRetries()
)
);
pending.set(r, new IOContext(channel, configuration, clock));
}

private void disconnect(IOContext context, DisconnectReason reason) {
Expand Down
Loading

0 comments on commit cde23fd

Please sign in to comment.