diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index f8ffa132b98f..6f920cedbde8 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 3.3) project(questdb) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -m64 -Wimplicit-function-declaration") +#set(CMAKE_VERBOSE_MAKEFILE on) # deal with windows slashes in JAVA_HOME if ($ENV{JAVA_HOME}) diff --git a/core/src/main/c/share/net.c b/core/src/main/c/share/net.c index e5a2414fe315..626debfa832c 100644 --- a/core/src/main/c/share/net.c +++ b/core/src/main/c/share/net.c @@ -106,6 +106,12 @@ JNIEXPORT jint JNICALL Java_com_questdb_misc_Net_recv return convert_error(recv((int) fd, (void *) ptr, (size_t) len, 0)); } +JNIEXPORT jboolean JNICALL Java_com_questdb_misc_Net_isDead + (JNIEnv *e, jclass cl, jlong fd) { + int c; + return (jboolean) (recv((int) fd, &c, 1, 0) == 0); +} + JNIEXPORT jint JNICALL Java_com_questdb_misc_Net_configureNonBlocking (JNIEnv *e, jclass cl, jlong fd) { int flags; diff --git a/core/src/main/c/share/net.h b/core/src/main/c/share/net.h index 9bb057061f5e..e0bbcc0bb3b1 100644 --- a/core/src/main/c/share/net.h +++ b/core/src/main/c/share/net.h @@ -77,6 +77,14 @@ JNIEXPORT void JNICALL Java_com_questdb_misc_Net_listen JNIEXPORT jint JNICALL Java_com_questdb_misc_Net_recv (JNIEnv *, jclass, jlong, jlong, jint); +/* + * Class: com_questdb_misc_Net + * Method: recv + * Signature: (JJI)I + */ +JNIEXPORT jboolean JNICALL Java_com_questdb_misc_Net_isDead + (JNIEnv *, jclass, jlong); + /* * Class: com_questdb_misc_Net * Method: send diff --git a/core/src/main/java/com/questdb/BootstrapMain.java b/core/src/main/java/com/questdb/BootstrapMain.java index a3f52940276d..4c01b1074490 100644 --- a/core/src/main/java/com/questdb/BootstrapMain.java +++ b/core/src/main/java/com/questdb/BootstrapMain.java @@ -93,7 +93,7 @@ public static void main(String[] args) throws Exception { JournalFactoryPool pool = new JournalFactoryPool(factory.getConfiguration(), configuration.getJournalPoolSize()); matcher.put("/imp", new ImportHandler(factory)); matcher.put("/js", new QueryHandler(pool, configuration)); - matcher.put("/csv", new CsvHandler(pool)); + matcher.put("/csv", new CsvHandler(pool, configuration)); matcher.put("/chk", new ExistenceCheckHandler(factory)); matcher.setDefaultHandler(new StaticContentHandler(configuration.getHttpPublic(), new MimeTypes(configuration.getMimeTypes()))); diff --git a/core/src/main/java/com/questdb/ex/DisconnectedChannelRuntimeException.java b/core/src/main/java/com/questdb/ex/DisconnectedChannelRuntimeException.java new file mode 100644 index 000000000000..adcb5af52579 --- /dev/null +++ b/core/src/main/java/com/questdb/ex/DisconnectedChannelRuntimeException.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2016 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + * + ******************************************************************************/ + +package com.questdb.ex; + +@SuppressWarnings("ThrowableInstanceNeverThrown") +public final class DisconnectedChannelRuntimeException extends RuntimeException { + public final static DisconnectedChannelRuntimeException INSTANCE = new DisconnectedChannelRuntimeException(); + + private DisconnectedChannelRuntimeException() { + } +} diff --git a/core/src/main/java/com/questdb/misc/Net.java b/core/src/main/java/com/questdb/misc/Net.java index fbbd9f1e7e96..36c00443aaba 100644 --- a/core/src/main/java/com/questdb/misc/Net.java +++ b/core/src/main/java/com/questdb/misc/Net.java @@ -72,6 +72,8 @@ public static boolean bind(long fd, CharSequence address, int port) { public native static int getPeerPort(long fd); + public static native boolean isDead(long fd); + public native static void listen(long fd, int backlog); public static native int recv(long fd, long ptr, int len); diff --git a/core/src/main/java/com/questdb/net/http/EpollDispatcher.java b/core/src/main/java/com/questdb/net/http/EpollDispatcher.java index d0e28b1b9fba..8d47d5a5eefe 100644 --- a/core/src/main/java/com/questdb/net/http/EpollDispatcher.java +++ b/core/src/main/java/com/questdb/net/http/EpollDispatcher.java @@ -46,7 +46,6 @@ import com.questdb.mp.*; import com.questdb.net.Epoll; import com.questdb.net.NetworkChannelImpl; -import com.questdb.net.NonBlockingSecureSocketChannel; import com.questdb.std.LongMatrix; import java.io.IOException; @@ -169,21 +168,7 @@ private void addPending(long _fd, long timestamp) { pending.set(r, M_ID, fdid++); NetworkChannelImpl channel = new NetworkChannelImpl(_fd); - pending.set(r, new IOContext( - configuration.getSslConfig().isSecure() ? - new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) : - channel, - clock, - configuration.getHttpBufReqHeader(), - configuration.getHttpBufReqContent(), - configuration.getHttpBufReqMultipart(), - configuration.getHttpBufRespHeader(), - configuration.getHttpBufRespContent(), - configuration.getHttpSoRcvSmall(), - configuration.getHttpSoRcvLarge(), - configuration.getHttpSoRetries() - ) - ); + pending.set(r, new IOContext(channel, configuration, clock)); } private void disconnect(IOContext context, DisconnectReason reason) { diff --git a/core/src/main/java/com/questdb/net/http/IOContext.java b/core/src/main/java/com/questdb/net/http/IOContext.java index 58763e09a3fd..c79c399e5f9f 100644 --- a/core/src/main/java/com/questdb/net/http/IOContext.java +++ b/core/src/main/java/com/questdb/net/http/IOContext.java @@ -40,7 +40,7 @@ import com.questdb.iter.clock.Clock; import com.questdb.misc.Misc; import com.questdb.net.NetworkChannel; -import com.questdb.std.FlyweightCharSequence; +import com.questdb.net.NonBlockingSecureSocketChannel; import com.questdb.std.LocalValueMap; import com.questdb.std.Locality; import com.questdb.std.Mutable; @@ -51,24 +51,18 @@ public class IOContext implements Closeable, Mutable, Locality { public final NetworkChannel channel; public final Request request; - public final FlyweightCharSequence ext = new FlyweightCharSequence(); + private final ServerConfiguration serverConfiguration; private final LocalValueMap map = new LocalValueMap(); private final Response response; private final AtomicBoolean open = new AtomicBoolean(true); - public IOContext(NetworkChannel channel, - Clock clock, - int reqHeaderSize, - int reqContentSize, - int reqMultipartHeaderSize, - int respHeaderSize, - int respContentSize, - int soRcvSmall, - int soRcvLarge, - int soRetries) { - this.channel = channel; - this.request = new Request(channel, reqHeaderSize, reqContentSize, reqMultipartHeaderSize, soRcvSmall, soRcvLarge, soRetries); - this.response = new Response(channel, respHeaderSize, respContentSize, clock); + public IOContext(NetworkChannel channel, ServerConfiguration configuration, Clock clock) { + this.channel = configuration.getSslConfig().isSecure() ? + new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) : + channel; + this.serverConfiguration = configuration; + this.request = new Request(this.channel, configuration); + this.response = new Response(this.channel, configuration, clock); } public ChunkedResponse chunkedResponse() { @@ -106,6 +100,10 @@ public LocalValueMap getMap() { return map; } + public ServerConfiguration getServerConfiguration() { + return serverConfiguration; + } + public ResponseSink responseSink() { return response.asSink(); } diff --git a/core/src/main/java/com/questdb/net/http/IOHttpJob.java b/core/src/main/java/com/questdb/net/http/IOHttpJob.java index 67b1d881a10c..7592472bc596 100644 --- a/core/src/main/java/com/questdb/net/http/IOHttpJob.java +++ b/core/src/main/java/com/questdb/net/http/IOHttpJob.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.net.http; @@ -158,7 +159,7 @@ private void process(IOContext context, final ChannelStatus status) { LOG.info().$("Headers too large").$(); logAccess(context); result = ChannelStatus.READ; - } catch (MalformedHeaderException | DisconnectedChannelException e) { + } catch (MalformedHeaderException | DisconnectedChannelException | DisconnectedChannelRuntimeException e) { result = ChannelStatus.DISCONNECTED; } catch (EndOfChannelException e) { result = ChannelStatus.EOF; diff --git a/core/src/main/java/com/questdb/net/http/KQueueDispatcher.java b/core/src/main/java/com/questdb/net/http/KQueueDispatcher.java index 11924b28ccf3..b4662c43a842 100644 --- a/core/src/main/java/com/questdb/net/http/KQueueDispatcher.java +++ b/core/src/main/java/com/questdb/net/http/KQueueDispatcher.java @@ -45,7 +45,6 @@ import com.questdb.mp.*; import com.questdb.net.Kqueue; import com.questdb.net.NetworkChannelImpl; -import com.questdb.net.NonBlockingSecureSocketChannel; import com.questdb.std.LongMatrix; import java.io.IOException; @@ -163,21 +162,7 @@ private void addPending(long _fd, long timestamp) { pending.set(r, 0, timestamp); pending.set(r, 1, _fd); NetworkChannelImpl channel = new NetworkChannelImpl(_fd); - pending.set(r, new IOContext( - configuration.getSslConfig().isSecure() ? - new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) : - channel, - clock, - configuration.getHttpBufReqHeader(), - configuration.getHttpBufReqContent(), - configuration.getHttpBufReqMultipart(), - configuration.getHttpBufRespHeader(), - configuration.getHttpBufRespContent(), - configuration.getHttpSoRcvSmall(), - configuration.getHttpSoRcvLarge(), - configuration.getHttpSoRetries() - ) - ); + pending.set(r, new IOContext(channel, configuration, clock)); } private void disconnect(IOContext context, DisconnectReason reason) { diff --git a/core/src/main/java/com/questdb/net/http/Request.java b/core/src/main/java/com/questdb/net/http/Request.java index 9bf79f950bbc..22d175e280a2 100644 --- a/core/src/main/java/com/questdb/net/http/Request.java +++ b/core/src/main/java/com/questdb/net/http/Request.java @@ -64,15 +64,15 @@ public class Request implements Closeable, Mutable { private final int soRcvLarge; private final int soRetries; - public Request(NetworkChannel channel, int headerBufferSize, int contentBufferSize, int multipartHeaderBufferSize, int soRcvSmall, int soRcvLarge, int soRetries) { + public Request(NetworkChannel channel, ServerConfiguration configuration) { this.channel = channel; - this.hb = new RequestHeaderBuffer(headerBufferSize, pool); - this.in = ByteBuffer.allocateDirect(Numbers.ceilPow2(contentBufferSize)); + this.hb = new RequestHeaderBuffer(configuration.getHttpBufReqHeader(), pool); + this.in = ByteBuffer.allocateDirect(Numbers.ceilPow2(configuration.getHttpBufReqContent())); this.inAddr = ByteBuffers.getAddress(in); - this.multipartParser = new MultipartParser(multipartHeaderBufferSize, pool); - this.soRcvSmall = soRcvSmall; - this.soRcvLarge = soRcvLarge; - this.soRetries = soRetries; + this.multipartParser = new MultipartParser(configuration.getHttpBufReqMultipart(), pool); + this.soRcvSmall = configuration.getHttpSoRcvSmall(); + this.soRcvLarge = configuration.getHttpSoRcvLarge(); + this.soRetries = configuration.getHttpSoRetries(); } @Override diff --git a/core/src/main/java/com/questdb/net/http/Response.java b/core/src/main/java/com/questdb/net/http/Response.java index 4ab63c341fb5..41b38a195bf0 100644 --- a/core/src/main/java/com/questdb/net/http/Response.java +++ b/core/src/main/java/com/questdb/net/http/Response.java @@ -80,19 +80,19 @@ public class Response implements Closeable, Mutable { private long total = 0; private boolean header = true; - public Response(WritableByteChannel channel, int headerBufferSize, int contentBufferSize, Clock clock) { - if (headerBufferSize <= 0) { + public Response(WritableByteChannel channel, ServerConfiguration configuration, Clock clock) { + if (configuration.getHttpBufRespHeader() <= 0) { throw new IllegalArgumentException("headerBufferSize"); } - if (contentBufferSize <= 0) { + if (configuration.getHttpBufRespContent() <= 0) { throw new IllegalArgumentException("contentBufferSize"); } this.channel = channel; - this.sz = Numbers.ceilPow2(contentBufferSize); + this.sz = Numbers.ceilPow2(configuration.getHttpBufRespContent()); this.out = ByteBuffer.allocateDirect(sz); - this.hb = new ResponseHeaderBuffer(headerBufferSize, clock); + this.hb = new ResponseHeaderBuffer(configuration.getHttpBufRespHeader(), clock); // size is 32bit int, as hex string max 8 bytes this.chunkHeader = ByteBuffer.allocateDirect(8 + 2 * Misc.EOL.length()); this.chunkSink = new DirectUnboundedAnsiSink(ByteBuffers.getAddress(chunkHeader)); diff --git a/core/src/main/java/com/questdb/net/http/ServerConfiguration.java b/core/src/main/java/com/questdb/net/http/ServerConfiguration.java index b0ee01a3d7bb..bb20825a05d3 100644 --- a/core/src/main/java/com/questdb/net/http/ServerConfiguration.java +++ b/core/src/main/java/com/questdb/net/http/ServerConfiguration.java @@ -74,7 +74,7 @@ public class ServerConfiguration { private int dbHashKeyPage = 4 * 1024 * 1024; private int dbHashDataPage = 8 * 1024 * 1024; private int dbHashRowPage = 1024 * 1024; - + private int dbCyclesBeforeCancel = 1024 * 1024; private File dbPath = new File("db"); private File mimeTypes = new File("conf/mime.types"); private File httpPublic = new File("public"); @@ -201,6 +201,10 @@ public ServerConfiguration(File conf) throws Exception { this.dbHashRowPage = n; } + if ((n = parseInt(props, "db.cycles.before.cancel")) > -1) { + this.dbCyclesBeforeCancel = Numbers.ceilPow2(n); + } + if ((s = props.getProperty("mime.types")) != null) { this.mimeTypes = normalize(root, new File(s)); } else { @@ -276,6 +280,10 @@ public int getDbAsOfRowPage() { return dbAsOfRowPage; } + public int getDbCyclesBeforeCancel() { + return dbCyclesBeforeCancel; + } + public int getDbHashDataPage() { return dbHashDataPage; } diff --git a/core/src/main/java/com/questdb/net/http/Win32SelectDispatcher.java b/core/src/main/java/com/questdb/net/http/Win32SelectDispatcher.java index 57db0e33f1bd..ecf33be05e4c 100644 --- a/core/src/main/java/com/questdb/net/http/Win32SelectDispatcher.java +++ b/core/src/main/java/com/questdb/net/http/Win32SelectDispatcher.java @@ -42,7 +42,6 @@ import com.questdb.misc.*; import com.questdb.mp.*; import com.questdb.net.NetworkChannelImpl; -import com.questdb.net.NonBlockingSecureSocketChannel; import com.questdb.std.LongIntHashMap; import com.questdb.std.LongMatrix; @@ -185,21 +184,7 @@ private void addPending(long _fd, long timestamp) { pending.set(r, M_FD, _fd); pending.set(r, M_OPERATION, ChannelStatus.READ.ordinal()); NetworkChannelImpl channel = new NetworkChannelImpl(_fd); - pending.set(r, new IOContext( - configuration.getSslConfig().isSecure() ? - new NonBlockingSecureSocketChannel(channel, configuration.getSslConfig()) : - channel, - clock, - configuration.getHttpBufReqHeader(), - configuration.getHttpBufReqContent(), - configuration.getHttpBufReqMultipart(), - configuration.getHttpBufRespHeader(), - configuration.getHttpBufRespContent(), - configuration.getHttpSoRcvSmall(), - configuration.getHttpSoRcvLarge(), - configuration.getHttpSoRetries() - ) - ); + pending.set(r, new IOContext(channel, configuration, clock)); } private void disconnect(IOContext context, DisconnectReason reason) { diff --git a/core/src/main/java/com/questdb/net/http/handlers/AbstractQueryContext.java b/core/src/main/java/com/questdb/net/http/handlers/AbstractQueryContext.java new file mode 100644 index 000000000000..37b12b681ad3 --- /dev/null +++ b/core/src/main/java/com/questdb/net/http/handlers/AbstractQueryContext.java @@ -0,0 +1,215 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2016 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + * + ******************************************************************************/ + +package com.questdb.net.http.handlers; + +import com.questdb.ex.*; +import com.questdb.factory.JournalFactoryPool; +import com.questdb.factory.JournalReaderFactory; +import com.questdb.factory.configuration.RecordMetadata; +import com.questdb.log.Log; +import com.questdb.log.LogFactory; +import com.questdb.log.LogRecord; +import com.questdb.misc.Chars; +import com.questdb.misc.Misc; +import com.questdb.misc.Numbers; +import com.questdb.net.http.ChunkedResponse; +import com.questdb.net.http.Request; +import com.questdb.net.http.ServerConfiguration; +import com.questdb.ql.Record; +import com.questdb.ql.RecordCursor; +import com.questdb.ql.RecordSource; +import com.questdb.ql.impl.ChannelCheckCancellationHandler; +import com.questdb.ql.parser.QueryCompiler; +import com.questdb.ql.parser.QueryError; +import com.questdb.std.AssociativeCache; +import com.questdb.std.Mutable; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; + + +public abstract class AbstractQueryContext implements Mutable, Closeable { + static final ThreadLocal COMPILER = new ThreadLocal<>(); + static final ThreadLocal> CACHE = new ThreadLocal<>(); + static final Log LOG = LogFactory.getLog(AbstractQueryContext.class); + final ChannelCheckCancellationHandler cancellationHandler; + final long fd; + RecordSource recordSource; + CharSequence query; + RecordMetadata metadata; + RecordCursor cursor; + long count; + long skip; + long stop; + Record record; + JournalReaderFactory factory; + QueryState state = QueryState.PREFIX; + int columnIndex; + + public AbstractQueryContext(long fd, int cyclesBeforeCancel) { + this.cancellationHandler = new ChannelCheckCancellationHandler(fd, cyclesBeforeCancel); + this.fd = fd; + } + + @Override + public void clear() { + debug().$("Cleaning context").$(); + metadata = null; + cursor = null; + record = null; + debug().$("Closing journal factory").$(); + factory = Misc.free(factory); + if (recordSource != null) { + CACHE.get().put(query.toString(), recordSource); + recordSource = null; + } + query = null; + state = QueryState.PREFIX; + columnIndex = 0; + } + + @Override + public void close() throws IOException { + debug().$("Closing context").$(); + clear(); + } + + public void compileQuery(ChunkedResponse r, JournalFactoryPool pool, AtomicLong misses, AtomicLong hits) throws IOException { + try { + // Prepare Context. + this.factory = pool.get(); + recordSource = CACHE.get().poll(query); + if (recordSource == null) { + recordSource = COMPILER.get().compileSource(factory, query); + misses.incrementAndGet(); + } else { + recordSource.reset(); + hits.incrementAndGet(); + } + cursor = recordSource.prepareCursor(factory, cancellationHandler); + metadata = cursor.getMetadata(); + header(r, 200); + } catch (ParserException e) { + info().$("Parser error executing query ").$(query).$(": at (").$(QueryError.getPosition()).$(") ").$(QueryError.getMessage()).$(); + sendException(r, QueryError.getPosition(), QueryError.getMessage(), 400); + } catch (JournalException e) { + error().$("Server error executing query ").$(query).$(e).$(); + sendException(r, 0, e.getMessage(), 500); + } catch (InterruptedException e) { + error().$("Error executing query. Server is shutting down. Query: ").$(query).$(e).$(); + sendException(r, 0, "Server is shutting down.", 500); + } + } + + public boolean parseUrl(ChunkedResponse r, Request request) throws DisconnectedChannelException, SlowWritableChannelException { + // Query text. + CharSequence query = request.getUrlParam("query"); + if (query == null || query.length() == 0) { + info().$("Empty query request received. Sending empty reply.").$(); + sendException(r, 0, "", 200); + return false; + } + + // Url Params. + long skip = 0; + long stop = Long.MAX_VALUE; + + CharSequence limit = request.getUrlParam("limit"); + if (limit != null) { + int sepPos = Chars.indexOf(limit, ','); + try { + if (sepPos > 0) { + skip = Numbers.parseLong(limit, 0, sepPos); + if (sepPos + 1 < limit.length()) { + stop = Numbers.parseLong(limit, sepPos + 1, limit.length()); + } + } else { + stop = Numbers.parseLong(limit); + } + } catch (NumericException ex) { + // Skip or stop will have default value. + } + } + if (stop < 0) { + stop = 0; + } + + if (skip < 0) { + skip = 0; + } + + this.query = query; + this.skip = skip; + this.count = 0L; + this.stop = stop; + + info().$("Query: ").$(query). + $(", skip: ").$(skip). + $(", stop: ").$(stop).$(); + + return true; + } + + static void setupThread(ServerConfiguration configuration) { + if (COMPILER.get() == null) { + COMPILER.set(new QueryCompiler(configuration)); + } + if (CACHE.get() == null) { + CACHE.set(new AssociativeCache(8, 128)); + } + } + + LogRecord debug() { + return LOG.debug().$('[').$(fd).$("] "); + } + + LogRecord error() { + return LOG.error().$('[').$(fd).$("] "); + } + + protected abstract void header(ChunkedResponse r, int code) throws DisconnectedChannelException, SlowWritableChannelException; + + LogRecord info() { + return LOG.info().$('[').$(fd).$("] "); + } + + protected abstract void sendException(ChunkedResponse r, int position, CharSequence message, int code) throws DisconnectedChannelException, SlowWritableChannelException; + + enum QueryState { + PREFIX, METADATA, META_SUFFIX, RECORD_START, RECORD_COLUMNS, RECORD_SUFFIX, DATA_SUFFIX + } +} diff --git a/core/src/main/java/com/questdb/net/http/handlers/CsvHandler.java b/core/src/main/java/com/questdb/net/http/handlers/CsvHandler.java index d82335a235be..0ab1d4f68660 100644 --- a/core/src/main/java/com/questdb/net/http/handlers/CsvHandler.java +++ b/core/src/main/java/com/questdb/net/http/handlers/CsvHandler.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,28 +30,23 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.net.http.handlers; -import com.questdb.ex.*; -import com.questdb.factory.JournalCachingFactory; +import com.questdb.ex.DisconnectedChannelException; +import com.questdb.ex.ResponseContentBufferTooSmallException; +import com.questdb.ex.SlowWritableChannelException; import com.questdb.factory.JournalFactoryPool; import com.questdb.factory.configuration.RecordColumnMetadata; -import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.log.Log; -import com.questdb.log.LogFactory; -import com.questdb.log.LogRecord; -import com.questdb.misc.Chars; import com.questdb.misc.Misc; import com.questdb.misc.Numbers; import com.questdb.net.http.ChunkedResponse; import com.questdb.net.http.ContextHandler; import com.questdb.net.http.IOContext; +import com.questdb.net.http.ServerConfiguration; import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.parser.QueryError; import com.questdb.std.CharSink; import com.questdb.std.LocalValue; import com.questdb.std.Mutable; @@ -61,76 +56,30 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; -import static com.questdb.net.http.handlers.QueryHandler.CACHE; -import static com.questdb.net.http.handlers.QueryHandler.COMPILER; - public class CsvHandler implements ContextHandler { private final JournalFactoryPool factoryPool; private final LocalValue localContext = new LocalValue<>(); private final AtomicLong cacheHits = new AtomicLong(); private final AtomicLong cacheMisses = new AtomicLong(); + private final ServerConfiguration configuration; + - public CsvHandler(JournalFactoryPool factoryPool) { + public CsvHandler(JournalFactoryPool factoryPool, ServerConfiguration configuration) { this.factoryPool = factoryPool; + this.configuration = configuration; } @Override public void handle(IOContext context) throws IOException { ExportHandlerContext ctx = localContext.get(context); if (ctx == null) { - localContext.set(context, ctx = new ExportHandlerContext()); + localContext.set(context, ctx = new ExportHandlerContext(context.channel.getFd(), context.getServerConfiguration().getDbCyclesBeforeCancel())); } - ctx.fd = context.channel.getFd(); - - // Query text. ChunkedResponse r = context.chunkedResponse(); - CharSequence query = context.request.getUrlParam("query"); - if (query == null || query.length() == 0) { - ctx.info().$("Empty query request received. Sending empty reply.").$(); - header(r, 200); - r.done(); - return; - } - - // Url Params. - long skip = 0; - long stop = Long.MAX_VALUE; - - CharSequence limit = context.request.getUrlParam("limit"); - if (limit != null) { - int sepPos = Chars.indexOf(limit, ','); - try { - if (sepPos > 0) { - skip = Numbers.parseLong(limit, 0, sepPos); - if (sepPos + 1 < limit.length()) { - stop = Numbers.parseLong(limit, sepPos + 1, limit.length()); - } - } else { - stop = Numbers.parseLong(limit); - } - } catch (NumericException ex) { - // Skip or stop will have default value. - } - } - if (stop < 0) { - stop = 0; - } - - if (skip < 0) { - skip = 0; + if (ctx.parseUrl(r, context.request)) { + ctx.compileQuery(r, factoryPool, cacheMisses, cacheHits); + resume(context); } - - ctx.query = query; - ctx.skip = skip; - ctx.count = 0L; - ctx.stop = stop; - - ctx.info().$("Query: ").$(query). - $(", skip: ").$(skip). - $(", stop: ").$(stop).$(); - - executeQuery(r, ctx); - resume(context); } @SuppressWarnings("ConstantConditions") @@ -160,7 +109,7 @@ public void resume(IOContext context) throws IOException { r.putQuoted(column.getName()); } r.put(Misc.EOL); - ctx.state = QueryState.RECORD_START; + ctx.state = AbstractQueryContext.QueryState.RECORD_START; // fall through case RECORD_START: if (ctx.record == null) { @@ -174,18 +123,18 @@ public void resume(IOContext context) throws IOException { break; } } else { - ctx.state = QueryState.DATA_SUFFIX; + ctx.state = AbstractQueryContext.QueryState.DATA_SUFFIX; break SWITCH; } } } if (ctx.count > ctx.stop) { - ctx.state = QueryState.DATA_SUFFIX; + ctx.state = AbstractQueryContext.QueryState.DATA_SUFFIX; break; } - ctx.state = QueryState.RECORD_COLUMNS; + ctx.state = AbstractQueryContext.QueryState.RECORD_COLUMNS; ctx.columnIndex = 0; // fall through case RECORD_COLUMNS: @@ -202,7 +151,7 @@ public void resume(IOContext context) throws IOException { r.bookmark(); r.put(Misc.EOL); ctx.record = null; - ctx.state = QueryState.RECORD_START; + ctx.state = AbstractQueryContext.QueryState.RECORD_START; break; case DATA_SUFFIX: sendDone(r, ctx); @@ -227,14 +176,9 @@ public void resume(IOContext context) throws IOException { @Override public void setupThread() { + AbstractQueryContext.setupThread(configuration); } - private static void sendException(ChunkedResponse r, int position, CharSequence message, int status) throws DisconnectedChannelException, SlowWritableChannelException { - header(r, status); - r.put("Error at(").put(position).put("): ").put(message).put(Misc.EOL); - r.sendChunk(); - r.done(); - } private static void putValue(CharSink sink, ColumnType type, Record rec, int col) { switch (type) { @@ -297,43 +241,6 @@ private static void putValue(CharSink sink, ColumnType type, Record rec, int col } } - private static void header(ChunkedResponse r, int code) throws DisconnectedChannelException, SlowWritableChannelException { - r.status(code, "text/csv; charset=utf-8"); - r.headers().put("Content-Disposition: attachment; filename=\"questdb-query-").put(System.currentTimeMillis()).put(".csv\"").put(Misc.EOL); - r.sendHeader(); - } - - private void executeQuery(ChunkedResponse r, ExportHandlerContext ctx) throws IOException { - try { - // Prepare Context. - JournalCachingFactory factory = factoryPool.get(); - ctx.factory = factory; - ctx.recordSource = CACHE.get().poll(ctx.query); - if (ctx.recordSource == null) { - ctx.recordSource = COMPILER.get().compileSource(factory, ctx.query); - cacheMisses.incrementAndGet(); - } else { - ctx.recordSource.reset(); - cacheHits.incrementAndGet(); - } - ctx.cursor = ctx.recordSource.prepareCursor(factory); - ctx.metadata = ctx.cursor.getMetadata(); - ctx.state = QueryState.METADATA; - ctx.columnIndex = 0; - - header(r, 200); - } catch (ParserException e) { - ctx.info().$("Parser error executing query ").$(ctx.query).$(": at (").$(QueryError.getPosition()).$(") ").$(QueryError.getMessage()).$(); - sendException(r, QueryError.getPosition(), QueryError.getMessage(), 400); - } catch (JournalException e) { - ctx.error().$("Server error executing query ").$(ctx.query).$(e).$(); - sendException(r, 0, e.getMessage(), 500); - } catch (InterruptedException e) { - ctx.error().$("Error executing query. Server is shutting down. Query: ").$(ctx.query).$(e).$(); - sendException(r, 0, "Server is shutting down.", 500); - } - } - private void sendDone(ChunkedResponse r, ExportHandlerContext ctx) throws DisconnectedChannelException, SlowWritableChannelException { if (ctx.count > -1) { ctx.count = -1; @@ -342,38 +249,15 @@ private void sendDone(ChunkedResponse r, ExportHandlerContext ctx) throws Discon r.done(); } - private enum QueryState { - METADATA, RECORD_START, RECORD_COLUMNS, DATA_SUFFIX - } - - private static class ExportHandlerContext implements Mutable, Closeable { - private static final Log LOG = LogFactory.getLog(ExportHandlerContext.class); - private RecordSource recordSource; - private CharSequence query; - private RecordMetadata metadata; - private RecordCursor cursor; - private long count; - private long skip; - private long stop; - private Record record; - private JournalCachingFactory factory; - private long fd; - private QueryState state = QueryState.METADATA; - private int columnIndex; + private static class ExportHandlerContext extends AbstractQueryContext implements Mutable, Closeable { + public ExportHandlerContext(long fd, int cyclesBeforeCancel) { + super(fd, cyclesBeforeCancel); + state = QueryState.METADATA; + } @Override public void clear() { - debug().$("Cleaning context").$(); - metadata = null; - cursor = null; - record = null; - debug().$("Closing journal factory").$(); - factory = Misc.free(factory); - if (recordSource != null) { - CACHE.get().put(query.toString(), recordSource); - recordSource = null; - } - query = null; + super.clear(); state = QueryState.METADATA; } @@ -383,16 +267,20 @@ public void close() throws IOException { clear(); } - private LogRecord debug() { - return LOG.debug().$('[').$(fd).$("] "); - } - - private LogRecord error() { - return LOG.error().$('[').$(fd).$("] "); + @Override + protected void header(ChunkedResponse r, int code) throws DisconnectedChannelException, SlowWritableChannelException { + state = QueryState.METADATA; + r.status(code, "text/csv; charset=utf-8"); + r.headers().put("Content-Disposition: attachment; filename=\"questdb-query-").put(System.currentTimeMillis()).put(".csv\"").put(Misc.EOL); + r.sendHeader(); } - private LogRecord info() { - return LOG.info().$('[').$(fd).$("] "); + @Override + protected void sendException(ChunkedResponse r, int position, CharSequence message, int status) throws DisconnectedChannelException, SlowWritableChannelException { + header(r, status); + r.put("Error at(").put(position).put("): ").put(message).put(Misc.EOL); + r.sendChunk(); + r.done(); } } } diff --git a/core/src/main/java/com/questdb/net/http/handlers/QueryHandler.java b/core/src/main/java/com/questdb/net/http/handlers/QueryHandler.java index b55c14bc8d50..d0f7640aaa59 100644 --- a/core/src/main/java/com/questdb/net/http/handlers/QueryHandler.java +++ b/core/src/main/java/com/questdb/net/http/handlers/QueryHandler.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,43 +30,30 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.net.http.handlers; -import com.questdb.ex.*; -import com.questdb.factory.JournalCachingFactory; +import com.questdb.ex.DisconnectedChannelException; +import com.questdb.ex.ResponseContentBufferTooSmallException; +import com.questdb.ex.SlowWritableChannelException; import com.questdb.factory.JournalFactoryPool; import com.questdb.factory.configuration.RecordColumnMetadata; -import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.log.Log; -import com.questdb.log.LogFactory; -import com.questdb.log.LogRecord; -import com.questdb.misc.Chars; -import com.questdb.misc.Misc; import com.questdb.misc.Numbers; import com.questdb.net.http.ChunkedResponse; import com.questdb.net.http.ContextHandler; import com.questdb.net.http.IOContext; import com.questdb.net.http.ServerConfiguration; import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.parser.QueryCompiler; -import com.questdb.ql.parser.QueryError; -import com.questdb.std.AssociativeCache; import com.questdb.std.CharSink; import com.questdb.std.LocalValue; -import com.questdb.std.Mutable; import com.questdb.store.ColumnType; -import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; public class QueryHandler implements ContextHandler { - public static final ThreadLocal COMPILER = new ThreadLocal<>(); - public static final ThreadLocal> CACHE = new ThreadLocal<>(); private final JournalFactoryPool factoryPool; private final LocalValue localContext = new LocalValue<>(); @@ -83,64 +70,14 @@ public QueryHandler(JournalFactoryPool factoryPool, ServerConfiguration configur public void handle(IOContext context) throws IOException { QueryHandlerContext ctx = localContext.get(context); if (ctx == null) { - localContext.set(context, ctx = new QueryHandlerContext()); + localContext.set(context, + ctx = new QueryHandlerContext(context.channel.getFd(), context.getServerConfiguration().getDbCyclesBeforeCancel())); } - ctx.fd = context.channel.getFd(); - - // Query text. ChunkedResponse r = context.chunkedResponse(); - CharSequence query = context.request.getUrlParam("query"); - if (query == null || query.length() == 0) { - ctx.info().$("Empty query request received. Sending empty reply.").$(); - r.status(200, "application/json; charset=utf-8"); - r.sendHeader(); - r.put('{').putQuoted("query").put(':').putQuoted("").put('}'); - r.sendChunk(); - r.done(); - return; - } - - // Url Params. - long skip = 0; - long stop = Long.MAX_VALUE; - - CharSequence limit = context.request.getUrlParam("limit"); - if (limit != null) { - int sepPos = Chars.indexOf(limit, ','); - try { - if (sepPos > 0) { - skip = Numbers.parseLong(limit, 0, sepPos); - if (sepPos + 1 < limit.length()) { - stop = Numbers.parseLong(limit, sepPos + 1, limit.length()); - } - } else { - stop = Numbers.parseLong(limit); - } - } catch (NumericException ex) { - // Skip or stop will have default value. - } + if (ctx.parseUrl(r, context.request)) { + ctx.compileQuery(r, factoryPool, cacheMisses, cacheHits); + resume(context); } - if (stop < 0) { - stop = 0; - } - - if (skip < 0) { - skip = 0; - } - - ctx.noMeta = Chars.equalsNc("true", context.request.getUrlParam("nm")); - ctx.query = query; - ctx.skip = skip; - ctx.count = 0L; - ctx.stop = stop; - ctx.fetchAll = Chars.equalsNc("true", context.request.getUrlParam("count")); - - ctx.info().$("Query: ").$(query). - $(", skip: ").$(skip). - $(", stop: ").$(stop).$(); - - executeQuery(r, ctx); - resume(context); } @SuppressWarnings("ConstantConditions") @@ -162,13 +99,13 @@ public void resume(IOContext context) throws IOException { case PREFIX: if (ctx.noMeta) { r.put("{\"result\":["); - ctx.state = QueryState.RECORD_START; + ctx.state = QueryHandlerContext.QueryState.RECORD_START; break; } r.bookmark(); r.put('{').putQuoted("query").put(':').putUtf8EscapedAndQuoted(ctx.query); r.put(',').putQuoted("columns").put(':').put('['); - ctx.state = QueryState.METADATA; + ctx.state = QueryHandlerContext.QueryState.METADATA; ctx.columnIndex = 0; // fall through case METADATA: @@ -186,12 +123,12 @@ public void resume(IOContext context) throws IOException { putQuoted("type").put(':').putQuoted(column.getType().name()); r.put('}'); } - ctx.state = QueryState.META_SUFFIX; + ctx.state = QueryHandlerContext.QueryState.META_SUFFIX; // fall through case META_SUFFIX: r.bookmark(); r.put("],\"result\":["); - ctx.state = QueryState.RECORD_START; + ctx.state = QueryHandlerContext.QueryState.RECORD_START; // fall through case RECORD_START: @@ -210,14 +147,14 @@ public void resume(IOContext context) throws IOException { break; } } else { - ctx.state = QueryState.DATA_SUFFIX; + ctx.state = QueryHandlerContext.QueryState.DATA_SUFFIX; break SWITCH; } } } if (ctx.count > ctx.stop) { - ctx.state = QueryState.DATA_SUFFIX; + ctx.state = QueryHandlerContext.QueryState.DATA_SUFFIX; break; } @@ -227,7 +164,7 @@ public void resume(IOContext context) throws IOException { } r.put('['); - ctx.state = QueryState.RECORD_COLUMNS; + ctx.state = QueryHandlerContext.QueryState.RECORD_COLUMNS; ctx.columnIndex = 0; // fall through case RECORD_COLUMNS: @@ -241,14 +178,14 @@ public void resume(IOContext context) throws IOException { putValue(r, m.getType(), ctx.record, ctx.columnIndex); } - ctx.state = QueryState.RECORD_SUFFIX; + ctx.state = QueryHandlerContext.QueryState.RECORD_SUFFIX; // fall through case RECORD_SUFFIX: r.bookmark(); r.put(']'); ctx.record = null; - ctx.state = QueryState.RECORD_START; + ctx.state = QueryHandlerContext.QueryState.RECORD_START; break; case DATA_SUFFIX: sendDone(r, ctx); @@ -273,20 +210,7 @@ public void resume(IOContext context) throws IOException { @Override public void setupThread() { - COMPILER.set(new QueryCompiler(configuration)); - CACHE.set(new AssociativeCache(8, 128)); - } - - private static void sendException(ChunkedResponse r, CharSequence query, int position, CharSequence message, int status) throws DisconnectedChannelException, SlowWritableChannelException { - r.status(status, "application/json; charset=utf-8"); - r.sendHeader(); - r.put('{'). - putQuoted("query").put(':').putUtf8EscapedAndQuoted(query).put(','). - putQuoted("error").put(':').putQuoted(message).put(','). - putQuoted("position").put(':').put(position); - r.put('}'); - r.sendChunk(); - r.done(); + AbstractQueryContext.setupThread(configuration); } private static void putValue(CharSink sink, ColumnType type, Record rec, int col) { @@ -353,37 +277,6 @@ private static void putStringOrNull(CharSink r, CharSequence str) { } } - private void executeQuery(ChunkedResponse r, QueryHandlerContext ctx) throws IOException { - try { - // Prepare Context. - JournalCachingFactory factory = factoryPool.get(); - ctx.factory = factory; - ctx.recordSource = CACHE.get().poll(ctx.query); - if (ctx.recordSource == null) { - ctx.recordSource = COMPILER.get().compileSource(factory, ctx.query); - cacheMisses.incrementAndGet(); - } else { - ctx.recordSource.reset(); - cacheHits.incrementAndGet(); - } - ctx.cursor = ctx.recordSource.prepareCursor(factory); - ctx.metadata = ctx.cursor.getMetadata(); - ctx.state = QueryState.PREFIX; - - r.status(200, "application/json; charset=utf-8"); - r.sendHeader(); - } catch (ParserException e) { - ctx.info().$("Parser error executing query ").$(ctx.query).$(": at (").$(QueryError.getPosition()).$(") ").$(QueryError.getMessage()).$(); - sendException(r, ctx.query, QueryError.getPosition(), QueryError.getMessage(), 400); - } catch (JournalException e) { - ctx.error().$("Server error executing query ").$(ctx.query).$(e).$(); - sendException(r, ctx.query, 0, e.getMessage(), 500); - } catch (InterruptedException e) { - ctx.error().$("Error executing query. Server is shutting down. Query: ").$(ctx.query).$(e).$(); - sendException(r, ctx.query, 0, "Server is shutting down.", 500); - } - } - long getCacheHits() { return cacheHits.longValue(); } @@ -404,60 +297,4 @@ private void sendDone(ChunkedResponse r, QueryHandlerContext ctx) throws Disconn r.done(); } - private enum QueryState { - PREFIX, METADATA, META_SUFFIX, RECORD_START, RECORD_COLUMNS, RECORD_SUFFIX, DATA_SUFFIX - } - - private static class QueryHandlerContext implements Mutable, Closeable { - private static final Log LOG = LogFactory.getLog(QueryHandlerContext.class); - private RecordSource recordSource; - private CharSequence query; - private RecordMetadata metadata; - private RecordCursor cursor; - private long count; - private long skip; - private long stop; - private Record record; - private JournalCachingFactory factory; - private long fd; - private QueryState state = QueryState.PREFIX; - private int columnIndex; - private boolean noMeta = false; - private boolean fetchAll = false; - - @Override - public void clear() { - debug().$("Cleaning context").$(); - metadata = null; - cursor = null; - record = null; - debug().$("Closing journal factory").$(); - factory = Misc.free(factory); - if (recordSource != null) { - CACHE.get().put(query.toString(), recordSource); - recordSource = null; - } - query = null; - state = QueryState.PREFIX; - fetchAll = false; - } - - @Override - public void close() throws IOException { - debug().$("Closing context").$(); - clear(); - } - - private LogRecord debug() { - return LOG.debug().$('[').$(fd).$("] "); - } - - private LogRecord error() { - return LOG.error().$('[').$(fd).$("] "); - } - - private LogRecord info() { - return LOG.info().$('[').$(fd).$("] "); - } - } } diff --git a/core/src/main/java/com/questdb/net/http/handlers/QueryHandlerContext.java b/core/src/main/java/com/questdb/net/http/handlers/QueryHandlerContext.java new file mode 100644 index 000000000000..d6feb7ccbde2 --- /dev/null +++ b/core/src/main/java/com/questdb/net/http/handlers/QueryHandlerContext.java @@ -0,0 +1,86 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2016 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + * + ******************************************************************************/ + +package com.questdb.net.http.handlers; + +import com.questdb.ex.DisconnectedChannelException; +import com.questdb.ex.SlowWritableChannelException; +import com.questdb.misc.Chars; +import com.questdb.net.http.ChunkedResponse; +import com.questdb.net.http.Request; + +public class QueryHandlerContext extends AbstractQueryContext { + boolean fetchAll = false; + boolean noMeta = false; + + public QueryHandlerContext(long fd, int cyclesBeforeCancel) { + super(fd, cyclesBeforeCancel); + } + + @Override + public void clear() { + super.clear(); + state = QueryState.PREFIX; + fetchAll = false; + } + + @Override + public boolean parseUrl(ChunkedResponse r, Request request) throws DisconnectedChannelException, SlowWritableChannelException { + if (super.parseUrl(r, request)) { + noMeta = Chars.equalsNc("true", request.getUrlParam("nm")); + fetchAll = Chars.equalsNc("true", request.getUrlParam("count")); + return true; + } + return false; + } + + @Override + protected void header(ChunkedResponse r, int status) throws DisconnectedChannelException, SlowWritableChannelException { + r.status(status, "application/json; charset=utf-8"); + r.sendHeader(); + } + + @Override + protected void sendException(ChunkedResponse r, int position, CharSequence message, int status) throws DisconnectedChannelException, SlowWritableChannelException { + header(r, status); + r.put('{'). + putQuoted("query").put(':').putUtf8EscapedAndQuoted(query == null ? "" : query).put(','). + putQuoted("error").put(':').putQuoted(message).put(','). + putQuoted("position").put(':').put(position); + r.put('}'); + r.sendChunk(); + r.done(); + } +} diff --git a/core/src/main/java/com/questdb/net/http/handlers/StaticContentHandler.java b/core/src/main/java/com/questdb/net/http/handlers/StaticContentHandler.java index 3ef5d28e48fa..41be8dfc4789 100644 --- a/core/src/main/java/com/questdb/net/http/handlers/StaticContentHandler.java +++ b/core/src/main/java/com/questdb/net/http/handlers/StaticContentHandler.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.net.http.handlers; @@ -50,6 +51,8 @@ public class StaticContentHandler implements ContextHandler { private final MimeTypes mimeTypes; private final ThreadLocal tlPrefixedPath = new ThreadLocal<>(); private final ThreadLocal tlRangeParser = new ThreadLocal<>(); + private final ThreadLocal tlExt = new ThreadLocal<>(); + private final LocalValue lvFd = new LocalValue<>(); private final File publicDir; @@ -103,6 +106,7 @@ public void resume(IOContext context) throws IOException { public void setupThread() { tlRangeParser.set(RangeParser.FACTORY.newInstance()); tlPrefixedPath.set(new PrefixedPath(publicDir.getAbsolutePath())); + tlExt.set(new FlyweightCharSequence()); } private void send(IOContext context, LPSZ path, boolean asAttachment) throws IOException { @@ -112,8 +116,7 @@ private void send(IOContext context, LPSZ path, boolean asAttachment) throws IOE return; } - CharSequence contentType = mimeTypes.get(context.ext.of(path, n + 1, path.length() - n - 1)); - + CharSequence contentType = mimeTypes.get(tlExt.get().of(path, n + 1, path.length() - n - 1)); CharSequence val; if ((val = context.request.getHeader("Range")) != null) { sendRange(context, val, path, contentType, asAttachment); diff --git a/core/src/main/java/com/questdb/ql/CancellationHandler.java b/core/src/main/java/com/questdb/ql/CancellationHandler.java new file mode 100644 index 000000000000..9332ac9ad699 --- /dev/null +++ b/core/src/main/java/com/questdb/ql/CancellationHandler.java @@ -0,0 +1,40 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2016 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + * + ******************************************************************************/ + +package com.questdb.ql; + +public interface CancellationHandler { + void check(); +} diff --git a/core/src/main/java/com/questdb/ql/RecordSource.java b/core/src/main/java/com/questdb/ql/RecordSource.java index 081daa380546..51be8f1b6b3f 100644 --- a/core/src/main/java/com/questdb/ql/RecordSource.java +++ b/core/src/main/java/com/questdb/ql/RecordSource.java @@ -48,7 +48,7 @@ public interface RecordSource extends Sinkable { Parameter getParam(CharSequence name); - RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException; + RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException; void reset(); diff --git a/core/src/main/java/com/questdb/ql/RowSource.java b/core/src/main/java/com/questdb/ql/RowSource.java index c5fbcf3ec450..045bc22d13f5 100644 --- a/core/src/main/java/com/questdb/ql/RowSource.java +++ b/core/src/main/java/com/questdb/ql/RowSource.java @@ -42,7 +42,7 @@ public interface RowSource extends Sinkable { void configure(JournalMetadata metadata); - void prepare(StorageFacade storageFacade); + void prepare(StorageFacade storageFacade, CancellationHandler cancellationHandler); RowCursor prepareCursor(PartitionSlice slice); diff --git a/core/src/main/java/com/questdb/ql/impl/AbstractRowSource.java b/core/src/main/java/com/questdb/ql/impl/AbstractRowSource.java index 2c7c43117fb7..ba49237e19ca 100644 --- a/core/src/main/java/com/questdb/ql/impl/AbstractRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/AbstractRowSource.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,10 +30,12 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql.impl; +import com.questdb.ql.CancellationHandler; import com.questdb.ql.RowCursor; import com.questdb.ql.RowSource; import com.questdb.ql.StorageFacade; @@ -42,6 +44,6 @@ public abstract class AbstractRowSource implements RowSource, RowCursor { @SuppressFBWarnings({"ACEM_ABSTRACT_CLASS_EMPTY_METHODS"}) @Override - public void prepare(StorageFacade storageFacade) { + public void prepare(StorageFacade storageFacade, CancellationHandler cancellationHandler) { } } diff --git a/core/src/main/java/com/questdb/ql/impl/ChannelCheckCancellationHandler.java b/core/src/main/java/com/questdb/ql/impl/ChannelCheckCancellationHandler.java new file mode 100644 index 000000000000..84259b953c69 --- /dev/null +++ b/core/src/main/java/com/questdb/ql/impl/ChannelCheckCancellationHandler.java @@ -0,0 +1,70 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2016 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + * + ******************************************************************************/ + +package com.questdb.ql.impl; + +import com.questdb.ex.DisconnectedChannelRuntimeException; +import com.questdb.misc.Net; +import com.questdb.misc.Numbers; +import com.questdb.ql.CancellationHandler; + +public class ChannelCheckCancellationHandler implements CancellationHandler { + private final long fd; + private long loop = 0; + private long mask; + + public ChannelCheckCancellationHandler(long fd, int cyclesBeforeCheck) { + this.fd = fd; + this.mask = Numbers.ceilPow2(cyclesBeforeCheck) - 1; + } + + @Override + public void check() { + if (loop > 0 && (loop & mask) == 0) { + checkChannel(); + } + loop++; + } + + public void reset() { + this.loop = 0; + } + + private void checkChannel() { + if (Net.isDead(fd)) { + throw DisconnectedChannelRuntimeException.INSTANCE; + } + } +} diff --git a/core/src/main/java/com/questdb/ql/impl/FilteredJournalRecordSource.java b/core/src/main/java/com/questdb/ql/impl/FilteredJournalRecordSource.java index 039acc2b47a6..92d1c83ff3e7 100644 --- a/core/src/main/java/com/questdb/ql/impl/FilteredJournalRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/FilteredJournalRecordSource.java @@ -38,10 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.model.ExprNode; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.ql.ops.VirtualColumn; @@ -83,8 +80,8 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.cursor = delegate.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.cursor = delegate.prepareCursor(factory, cancellationHandler); filter.prepare(cursor.getStorageFacade()); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/FilteredRowSource.java b/core/src/main/java/com/questdb/ql/impl/FilteredRowSource.java index fb005893d71a..48319c2f6d77 100644 --- a/core/src/main/java/com/questdb/ql/impl/FilteredRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/FilteredRowSource.java @@ -36,10 +36,7 @@ package com.questdb.ql.impl; import com.questdb.factory.configuration.JournalMetadata; -import com.questdb.ql.PartitionSlice; -import com.questdb.ql.RowCursor; -import com.questdb.ql.RowSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.ops.VirtualColumn; import com.questdb.std.CharSink; @@ -90,8 +87,8 @@ public long next() { } @Override - public void prepare(StorageFacade facade) { - delegate.prepare(facade); + public void prepare(StorageFacade facade, CancellationHandler cancellationHandler) { + delegate.prepare(facade, cancellationHandler); filter.prepare(facade); } diff --git a/core/src/main/java/com/questdb/ql/impl/JournalSource.java b/core/src/main/java/com/questdb/ql/impl/JournalSource.java index 8fff7e0afbdc..ec2d033617fb 100644 --- a/core/src/main/java/com/questdb/ql/impl/JournalSource.java +++ b/core/src/main/java/com/questdb/ql/impl/JournalSource.java @@ -78,9 +78,9 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { this.partitionCursor = partitionSource.prepareCursor(factory); - this.rowSource.prepare(partitionCursor.getStorageFacade()); + this.rowSource.prepare(partitionCursor.getStorageFacade(), cancellationHandler); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/NoOpCancellationHandler.java b/core/src/main/java/com/questdb/ql/impl/NoOpCancellationHandler.java new file mode 100644 index 000000000000..433058f66fed --- /dev/null +++ b/core/src/main/java/com/questdb/ql/impl/NoOpCancellationHandler.java @@ -0,0 +1,49 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2016 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + * + ******************************************************************************/ + +package com.questdb.ql.impl; + +import com.questdb.ql.CancellationHandler; + +public class NoOpCancellationHandler implements CancellationHandler { + public static final NoOpCancellationHandler INSTANCE = new NoOpCancellationHandler(); + + private NoOpCancellationHandler() { + } + + @Override + public void check() { + } +} diff --git a/core/src/main/java/com/questdb/ql/impl/NoOpJournalRecordSource.java b/core/src/main/java/com/questdb/ql/impl/NoOpJournalRecordSource.java index 5a5fdd1522f5..74efbbdfc45a 100644 --- a/core/src/main/java/com/questdb/ql/impl/NoOpJournalRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/NoOpJournalRecordSource.java @@ -38,10 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.std.CharSink; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -71,8 +68,8 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.cursor = delegate.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.cursor = delegate.prepareCursor(factory, cancellationHandler); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/TopRecordSource.java b/core/src/main/java/com/questdb/ql/impl/TopRecordSource.java index 36b76d89cfbc..9a745b5bd30a 100644 --- a/core/src/main/java/com/questdb/ql/impl/TopRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/TopRecordSource.java @@ -38,10 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.ql.ops.VirtualColumn; import com.questdb.std.CharSink; @@ -77,10 +74,10 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { this._top = lo.getLong(null); this._count = hi.getLong(null) - this._top; - this.recordCursor = recordSource.prepareCursor(factory); + this.recordCursor = recordSource.prepareCursor(factory, cancellationHandler); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/aggregation/AggregatedRecordSource.java b/core/src/main/java/com/questdb/ql/impl/aggregation/AggregatedRecordSource.java index b48737e3aef7..ec5808ecca19 100644 --- a/core/src/main/java/com/questdb/ql/impl/aggregation/AggregatedRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/aggregation/AggregatedRecordSource.java @@ -131,9 +131,9 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.recordCursor = recordSource.prepareCursor(factory); - buildMap(); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.recordCursor = recordSource.prepareCursor(factory, cancellationHandler); + buildMap(cancellationHandler); return this; } @@ -166,10 +166,12 @@ public void toSink(CharSink sink) { sink.put('}'); } - private void buildMap() { + private void buildMap(CancellationHandler cancellationHandler) { while (recordCursor.hasNext()) { + cancellationHandler.check(); + Record rec = recordCursor.next(); // we are inside of time window, compute aggregates diff --git a/core/src/main/java/com/questdb/ql/impl/aggregation/CountRecordSource.java b/core/src/main/java/com/questdb/ql/impl/aggregation/CountRecordSource.java index 6c16476ec682..f29caae4b853 100644 --- a/core/src/main/java/com/questdb/ql/impl/aggregation/CountRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/aggregation/CountRecordSource.java @@ -80,7 +80,7 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { partitionCursor = partitionSource.prepareCursor(factory); computeCount(); return this; diff --git a/core/src/main/java/com/questdb/ql/impl/aggregation/ResampledRecordSource.java b/core/src/main/java/com/questdb/ql/impl/aggregation/ResampledRecordSource.java index 2566b3864885..de051a75d13e 100644 --- a/core/src/main/java/com/questdb/ql/impl/aggregation/ResampledRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/aggregation/ResampledRecordSource.java @@ -134,8 +134,8 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.recordCursor = recordSource.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.recordCursor = recordSource.prepareCursor(factory, cancellationHandler); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/interval/IntervalRecordSource.java b/core/src/main/java/com/questdb/ql/impl/interval/IntervalRecordSource.java index 72d0667eaeb3..c5fd42c6fba6 100644 --- a/core/src/main/java/com/questdb/ql/impl/interval/IntervalRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/interval/IntervalRecordSource.java @@ -39,10 +39,7 @@ import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; import com.questdb.misc.Interval; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.std.CharSink; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -81,8 +78,8 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.cursor = delegate.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.cursor = delegate.prepareCursor(factory, cancellationHandler); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/join/AsOfJoinRecordSource.java b/core/src/main/java/com/questdb/ql/impl/join/AsOfJoinRecordSource.java index 55004551fcc0..17d4fe939cfc 100644 --- a/core/src/main/java/com/questdb/ql/impl/join/AsOfJoinRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/join/AsOfJoinRecordSource.java @@ -40,10 +40,7 @@ import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; import com.questdb.misc.Misc; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.impl.join.asof.FixRecordHolder; import com.questdb.ql.impl.join.asof.RecordHolder; import com.questdb.ql.impl.join.asof.RowidRecordHolder; @@ -133,9 +130,9 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.masterCursor = master.prepareCursor(factory); - this.slaveCursor = slave.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.masterCursor = master.prepareCursor(factory, cancellationHandler); + this.slaveCursor = slave.prepareCursor(factory, cancellationHandler); this.recordHolder.setCursor(slaveCursor); this.delayedHolder.setCursor(slaveCursor); this.storageFacade.prepare(factory, masterCursor.getStorageFacade(), slaveCursor.getStorageFacade()); diff --git a/core/src/main/java/com/questdb/ql/impl/join/AsOfPartitionedJoinRecordSource.java b/core/src/main/java/com/questdb/ql/impl/join/AsOfPartitionedJoinRecordSource.java index c92aed2353e8..1628164ae47f 100644 --- a/core/src/main/java/com/questdb/ql/impl/join/AsOfPartitionedJoinRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/join/AsOfPartitionedJoinRecordSource.java @@ -40,10 +40,7 @@ import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; import com.questdb.misc.Misc; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.impl.join.asof.*; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.std.CharSequenceHashSet; @@ -141,9 +138,9 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.masterCursor = master.prepareCursor(factory); - this.slaveCursor = slave.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.masterCursor = master.prepareCursor(factory, cancellationHandler); + this.slaveCursor = slave.prepareCursor(factory, cancellationHandler); map.setSlaveCursor(slaveCursor); holder.setCursor(slaveCursor); storageFacade.prepare(factory, masterCursor.getStorageFacade(), map.getStorageFacade()); diff --git a/core/src/main/java/com/questdb/ql/impl/join/CrossJoinRecordSource.java b/core/src/main/java/com/questdb/ql/impl/join/CrossJoinRecordSource.java index 5e20c61b8b52..743e630df9e1 100644 --- a/core/src/main/java/com/questdb/ql/impl/join/CrossJoinRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/join/CrossJoinRecordSource.java @@ -38,10 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.std.CharSink; @@ -77,9 +74,9 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - masterCursor = masterSource.prepareCursor(factory); - slaveCursor = slaveSource.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + masterCursor = masterSource.prepareCursor(factory, cancellationHandler); + slaveCursor = slaveSource.prepareCursor(factory, cancellationHandler); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/join/HashJoinRecordSource.java b/core/src/main/java/com/questdb/ql/impl/join/HashJoinRecordSource.java index 79984cc4963d..ecab9ae39e9a 100644 --- a/core/src/main/java/com/questdb/ql/impl/join/HashJoinRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/join/HashJoinRecordSource.java @@ -40,10 +40,7 @@ import com.questdb.factory.configuration.RecordColumnMetadata; import com.questdb.factory.configuration.RecordMetadata; import com.questdb.misc.Misc; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.impl.join.hash.FakeRecord; import com.questdb.ql.impl.join.hash.MultiRecordMap; import com.questdb.ql.impl.join.hash.NullRecord; @@ -127,10 +124,10 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.slaveCursor = slave.prepareCursor(factory); - this.masterCursor = master.prepareCursor(factory); - buildHashTable(); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.slaveCursor = slave.prepareCursor(factory, cancellationHandler); + this.masterCursor = master.prepareCursor(factory, cancellationHandler); + buildHashTable(cancellationHandler); recordMap.setStorageFacade(slaveCursor.getStorageFacade()); storageFacade.prepare(factory, masterCursor.getStorageFacade(), slaveCursor.getStorageFacade()); return this; @@ -189,8 +186,9 @@ public void toSink(CharSink sink) { sink.put("]]}"); } - private void buildHashTable() { + private void buildHashTable(CancellationHandler cancellationHandler) { for (Record r : slaveCursor) { + cancellationHandler.check(); MultiMap.KeyWriter key = recordMap.claimKey(); for (int i = 0, k = slaveColumns.size(); i < k; i++) { setKey(key, r, slaveColIndex.getQuick(i), slaveColumns.getQuick(i).getType()); diff --git a/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexIntLambdaHeadRowSource.java b/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexIntLambdaHeadRowSource.java index c89e6fcabd52..554db8e441d7 100644 --- a/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexIntLambdaHeadRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexIntLambdaHeadRowSource.java @@ -125,14 +125,14 @@ public long next() { @SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED") @Override - public void prepare(StorageFacade fa) { + public void prepare(StorageFacade fa, CancellationHandler cancellationHandler) { if (filter != null) { filter.prepare(fa); } keys.clear(); try { - for (Record r : recordSource.prepareCursor(fa.getFactory())) { + for (Record r : recordSource.prepareCursor(fa.getFactory(), cancellationHandler)) { keys.add(r.getInt(recordSourceColumn)); } } catch (JournalException e) { diff --git a/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexStrLambdaHeadRowSource.java b/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexStrLambdaHeadRowSource.java index d34116241bda..779e39461b3b 100644 --- a/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexStrLambdaHeadRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexStrLambdaHeadRowSource.java @@ -127,7 +127,7 @@ public long next() { @SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED") @Override - public void prepare(StorageFacade fa) { + public void prepare(StorageFacade fa, CancellationHandler cancellationHandler) { if (filter != null) { filter.prepare(fa); } @@ -135,7 +135,7 @@ public void prepare(StorageFacade fa) { keys.clear(); hashes.clear(); try { - for (Record r : recordSource.prepareCursor(fa.getFactory())) { + for (Record r : recordSource.prepareCursor(fa.getFactory(), cancellationHandler)) { CharSequence s = getKey(r, recordSourceColumn); if (keys.add(s)) { hashes.add(Hash.boundedHash(s, buckets)); diff --git a/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexSymLambdaHeadRowSource.java b/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexSymLambdaHeadRowSource.java index d34d712db03a..7e31de1b1646 100644 --- a/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexSymLambdaHeadRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/lambda/KvIndexSymLambdaHeadRowSource.java @@ -117,7 +117,7 @@ public long next() { @SuppressFBWarnings("EXS_EXCEPTION_SOFTENING_NO_CHECKED") @Override - public void prepare(StorageFacade fa) { + public void prepare(StorageFacade fa, CancellationHandler cancellationHandler) { if (filter != null) { filter.prepare(fa); @@ -126,7 +126,7 @@ public void prepare(StorageFacade fa) { SymbolTable tab = fa.getSymbolTable(column); keys.clear(); try { - for (Record r : recordSource.prepareCursor(fa.getFactory())) { + for (Record r : recordSource.prepareCursor(fa.getFactory(), cancellationHandler)) { int k = tab.getQuick(getKey(r, recordSourceColumn)); if (k > -1) { keys.add(k); diff --git a/core/src/main/java/com/questdb/ql/impl/latest/HeapMergingRowSource.java b/core/src/main/java/com/questdb/ql/impl/latest/HeapMergingRowSource.java index df6a9ecfeade..2e272fdd89b5 100644 --- a/core/src/main/java/com/questdb/ql/impl/latest/HeapMergingRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/latest/HeapMergingRowSource.java @@ -37,10 +37,7 @@ import com.questdb.factory.configuration.JournalMetadata; import com.questdb.misc.Unsafe; -import com.questdb.ql.PartitionSlice; -import com.questdb.ql.RowCursor; -import com.questdb.ql.RowSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.std.CharSink; import com.questdb.std.IntLongPriorityQueue; @@ -57,15 +54,15 @@ public HeapMergingRowSource(RowSource... sources) { @Override public void configure(JournalMetadata metadata) { - for (RowSource src : sources) { - src.configure(metadata); + for (int i = 0, n = sources.length; i < n; i++) { + Unsafe.arrayGet(sources, i).configure(metadata); } } @Override - public void prepare(StorageFacade facade) { - for (RowSource src : sources) { - src.prepare(facade); + public void prepare(StorageFacade facade, CancellationHandler cancellationHandler) { + for (int i = 0, n = sources.length; i < n; i++) { + Unsafe.arrayGet(sources, i).prepare(facade, cancellationHandler); } } diff --git a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexIntListHeadRowSource.java b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexIntListHeadRowSource.java index b18ac61630fd..5a2cd62d31b1 100644 --- a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexIntListHeadRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexIntListHeadRowSource.java @@ -39,6 +39,7 @@ import com.questdb.ex.JournalException; import com.questdb.ex.JournalRuntimeException; import com.questdb.factory.configuration.JournalMetadata; +import com.questdb.ql.CancellationHandler; import com.questdb.ql.PartitionSlice; import com.questdb.ql.RowCursor; import com.questdb.ql.StorageFacade; @@ -127,7 +128,7 @@ public long next() { } @Override - public void prepare(StorageFacade storageFacade) { + public void prepare(StorageFacade storageFacade, CancellationHandler cancellationHandler) { if (filter != null) { filter.prepare(storageFacade); } diff --git a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexStrListHeadRowSource.java b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexStrListHeadRowSource.java index ed420c83afb9..e2d6df792ff8 100644 --- a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexStrListHeadRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexStrListHeadRowSource.java @@ -40,6 +40,7 @@ import com.questdb.ex.JournalRuntimeException; import com.questdb.factory.configuration.JournalMetadata; import com.questdb.misc.Hash; +import com.questdb.ql.CancellationHandler; import com.questdb.ql.PartitionSlice; import com.questdb.ql.RowCursor; import com.questdb.ql.StorageFacade; @@ -123,7 +124,7 @@ public long next() { } @Override - public void prepare(StorageFacade storageFacade) { + public void prepare(StorageFacade storageFacade, CancellationHandler cancellationHandler) { if (filter != null) { filter.prepare(storageFacade); } diff --git a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymAllHeadRowSource.java b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymAllHeadRowSource.java index c06124fdeeb3..16b99ef867eb 100644 --- a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymAllHeadRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymAllHeadRowSource.java @@ -39,6 +39,7 @@ import com.questdb.ex.JournalException; import com.questdb.ex.JournalRuntimeException; import com.questdb.factory.configuration.JournalMetadata; +import com.questdb.ql.CancellationHandler; import com.questdb.ql.PartitionSlice; import com.questdb.ql.RowCursor; import com.questdb.ql.StorageFacade; @@ -115,7 +116,7 @@ public long next() { } @Override - public void prepare(StorageFacade facade) { + public void prepare(StorageFacade facade, CancellationHandler cancellationHandler) { if (filter != null) { filter.prepare(facade); } diff --git a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymListHeadRowSource.java b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymListHeadRowSource.java index 05556a3b432e..8340e5ff6eb5 100644 --- a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymListHeadRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymListHeadRowSource.java @@ -39,6 +39,7 @@ import com.questdb.ex.JournalException; import com.questdb.ex.JournalRuntimeException; import com.questdb.factory.configuration.JournalMetadata; +import com.questdb.ql.CancellationHandler; import com.questdb.ql.PartitionSlice; import com.questdb.ql.RowCursor; import com.questdb.ql.StorageFacade; @@ -123,7 +124,7 @@ public long next() { } @Override - public void prepare(StorageFacade fa) { + public void prepare(StorageFacade fa, CancellationHandler cancellationHandler) { if (filter != null) { filter.prepare(fa); diff --git a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymLookupRowSource.java b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymLookupRowSource.java index 0df0fc8cb559..d5925473c933 100644 --- a/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymLookupRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/latest/KvIndexSymLookupRowSource.java @@ -38,6 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.ex.JournalRuntimeException; import com.questdb.factory.configuration.JournalMetadata; +import com.questdb.ql.CancellationHandler; import com.questdb.ql.PartitionSlice; import com.questdb.ql.RowCursor; import com.questdb.ql.StorageFacade; @@ -127,7 +128,7 @@ public long next() { } @Override - public void prepare(StorageFacade facade) { + public void prepare(StorageFacade facade, CancellationHandler cancellationHandler) { symbolKey = facade.getSymbolTable(symbol).getQuick(value); } diff --git a/core/src/main/java/com/questdb/ql/impl/latest/MergingRowSource.java b/core/src/main/java/com/questdb/ql/impl/latest/MergingRowSource.java index dbf84c0ae3ea..6a6858270080 100644 --- a/core/src/main/java/com/questdb/ql/impl/latest/MergingRowSource.java +++ b/core/src/main/java/com/questdb/ql/impl/latest/MergingRowSource.java @@ -36,10 +36,7 @@ package com.questdb.ql.impl.latest; import com.questdb.factory.configuration.JournalMetadata; -import com.questdb.ql.PartitionSlice; -import com.questdb.ql.RowCursor; -import com.questdb.ql.RowSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.std.CharSink; public class MergingRowSource implements RowSource, RowCursor { @@ -62,9 +59,9 @@ public void configure(JournalMetadata metadata) { } @Override - public void prepare(StorageFacade facade) { - lhs.prepare(facade); - rhs.prepare(facade); + public void prepare(StorageFacade facade, CancellationHandler cancellationHandler) { + lhs.prepare(facade, cancellationHandler); + rhs.prepare(facade, cancellationHandler); } @Override diff --git a/core/src/main/java/com/questdb/ql/impl/select/SelectedColumnsRecordSource.java b/core/src/main/java/com/questdb/ql/impl/select/SelectedColumnsRecordSource.java index 5d33d71a44f1..523bc473d17e 100644 --- a/core/src/main/java/com/questdb/ql/impl/select/SelectedColumnsRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/select/SelectedColumnsRecordSource.java @@ -38,10 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.std.CharSequenceHashSet; import com.questdb.std.CharSink; @@ -87,8 +84,8 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.recordCursor = recordSource.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.recordCursor = recordSource.prepareCursor(factory, cancellationHandler); this.storageFacade.of(recordCursor.getStorageFacade()); return this; } diff --git a/core/src/main/java/com/questdb/ql/impl/sort/RBTreeSortedRecordSource.java b/core/src/main/java/com/questdb/ql/impl/sort/RBTreeSortedRecordSource.java index 387171d2e82b..688be1e7e3ad 100644 --- a/core/src/main/java/com/questdb/ql/impl/sort/RBTreeSortedRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/sort/RBTreeSortedRecordSource.java @@ -39,10 +39,7 @@ import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; import com.questdb.misc.Unsafe; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.impl.RecordList; import com.questdb.ql.impl.join.hash.FakeRecord; import com.questdb.ql.ops.AbstractRecordSource; @@ -101,13 +98,13 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - sourceCursor = recordSource.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + sourceCursor = recordSource.prepareCursor(factory, cancellationHandler); records.setStorageFacade(sourceCursor.getStorageFacade()); if (byRowId) { - buildMapByRowId(sourceCursor); + buildMapByRowId(sourceCursor, cancellationHandler); } else { - buildMap(sourceCursor); + buildMap(sourceCursor, cancellationHandler); } cursor.setup(); return cursor; @@ -218,14 +215,16 @@ private long allocateBlock() { return p; } - private void buildMap(RecordCursor cursor) { + private void buildMap(RecordCursor cursor, CancellationHandler cancellationHandler) { while (cursor.hasNext()) { + cancellationHandler.check(); put(cursor.next()); } } - private void buildMapByRowId(RecordCursor cursor) { + private void buildMapByRowId(RecordCursor cursor, CancellationHandler cancellationHandler) { while (cursor.hasNext()) { + cancellationHandler.check(); put(cursor.next().getRowId()); } } diff --git a/core/src/main/java/com/questdb/ql/impl/virtual/VirtualColumnRecordSource.java b/core/src/main/java/com/questdb/ql/impl/virtual/VirtualColumnRecordSource.java index 7968c9d33c8b..e54cefa21dc6 100644 --- a/core/src/main/java/com/questdb/ql/impl/virtual/VirtualColumnRecordSource.java +++ b/core/src/main/java/com/questdb/ql/impl/virtual/VirtualColumnRecordSource.java @@ -38,10 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; -import com.questdb.ql.Record; -import com.questdb.ql.RecordCursor; -import com.questdb.ql.RecordSource; -import com.questdb.ql.StorageFacade; +import com.questdb.ql.*; import com.questdb.ql.ops.AbstractCombinedRecordSource; import com.questdb.ql.ops.VirtualColumn; import com.questdb.std.CharSink; @@ -80,8 +77,8 @@ public RecordMetadata getMetadata() { } @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - this.recordCursor = recordSource.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + this.recordCursor = recordSource.prepareCursor(factory, cancellationHandler); current.prepare(recordCursor.getStorageFacade()); return this; } diff --git a/core/src/main/java/com/questdb/ql/parser/QueryCompiler.java b/core/src/main/java/com/questdb/ql/parser/QueryCompiler.java index 44040bd50451..4ef6cc32551a 100644 --- a/core/src/main/java/com/questdb/ql/parser/QueryCompiler.java +++ b/core/src/main/java/com/questdb/ql/parser/QueryCompiler.java @@ -145,7 +145,7 @@ public RecordCursor compile(JournalReaderFactory factory, Class clazz) th } public RecordCursor compile(JournalReaderFactory factory, CharSequence query) throws ParserException, JournalException { - return compileSource(factory, query).prepareCursor(factory); + return compileSource(factory, query).prepareCursor(factory, NoOpCancellationHandler.INSTANCE); } public RecordSource compileSource(JournalReaderFactory factory, CharSequence query) throws ParserException, JournalException { diff --git a/core/src/main/resources/binaries/osx/libquestdb.dylib b/core/src/main/resources/binaries/osx/libquestdb.dylib index 3b44b028c6cf..f4dbca6aa076 100755 Binary files a/core/src/main/resources/binaries/osx/libquestdb.dylib and b/core/src/main/resources/binaries/osx/libquestdb.dylib differ diff --git a/core/src/main/resources/site/conf/questdb.conf b/core/src/main/resources/site/conf/questdb.conf index ae4c7dae0a3c..01e6f82f44bd 100644 --- a/core/src/main/resources/site/conf/questdb.conf +++ b/core/src/main/resources/site/conf/questdb.conf @@ -137,4 +137,10 @@ db.hash.datapage = 8M # Size of memory allocation page for storing row ids in hash join algorithm. # Default value is 4Mb -db.hash.rowpage = 1M \ No newline at end of file +db.hash.rowpage = 1M + +# Number of rows processed by internal algorithms before they check if receiving socket is +# still open. This is applicable to non-streaming algorithms, such as hashing or sorting. +# Making this value too large increases time interval between socket closed and +# thread releasing. Making it too small adds overhead of socket check making queries slower. +db.cycles.before.cancel = 1048576 \ No newline at end of file diff --git a/core/src/test/java/com/questdb/net/http/handlers/QueryHandlerConsistencyTest.java b/core/src/test/java/com/questdb/net/http/handlers/QueryHandlerConsistencyTest.java index e6f68c5df537..7cfc0ecdf10f 100644 --- a/core/src/test/java/com/questdb/net/http/handlers/QueryHandlerConsistencyTest.java +++ b/core/src/test/java/com/questdb/net/http/handlers/QueryHandlerConsistencyTest.java @@ -1,3 +1,38 @@ +/******************************************************************************* + * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * + * Copyright (C) 2014-2016 Appsicle + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + * + ******************************************************************************/ + package com.questdb.net.http.handlers; import com.questdb.factory.JournalFactoryPool; @@ -31,12 +66,12 @@ public static void setUp() throws Exception { @Test public void testCsvHandlerConsistency() throws Exception { - testHandler(new CsvHandler(new JournalFactoryPool(factory.getConfiguration(), 1))); + testHandler(new CsvHandler(new JournalFactoryPool(factory.getConfiguration(), 1), new ServerConfiguration())); } @Test public void testCsvOutput() throws Exception { - ContextHandler handler = new CsvHandler(new JournalFactoryPool(factory.getConfiguration(), 1)); + ContextHandler handler = new CsvHandler(new JournalFactoryPool(factory.getConfiguration(), 1), new ServerConfiguration()); handler.setupThread(); TestChannel channel = new TestChannel(QUERY1); String expected = "\"id\",\"x\",\"y\",\"z\",\"w\",\"timestamp\"\r\n" + @@ -142,7 +177,7 @@ public void testCsvOutput() throws Exception { "id99,-384.0000000000,0.0000000980,-264,355,\"2015-03-12T00:00:00.990Z\"\r\n"; try { channel.reset(); - try (IOContext context = new IOContext(channel, MilliClock.INSTANCE, 1024, 1024, 1024, 512, 1024, 128 * 1024, 4 * 1024 * 1024, 1024)) { + try (IOContext context = new IOContext(channel, new ServerConfiguration(), MilliClock.INSTANCE)) { context.request.read(); handler.handle(context); TestUtils.assertEquals(expected, channel.getOutput()); @@ -162,9 +197,11 @@ private void testHandler(ContextHandler handler) throws Exception { String expected = null; handler.setupThread(); try { + ServerConfiguration configuration = new ServerConfiguration(); for (int i = 128; i < 7500; i++) { channel.reset(); - try (IOContext context = new IOContext(channel, MilliClock.INSTANCE, 1024, 1024, 1024, 512, i, 128 * 1024, 4 * 1024 * 1024, 1024)) { + configuration.setHttpBufRespContent(i); + try (IOContext context = new IOContext(channel, configuration, MilliClock.INSTANCE)) { context.request.read(); handler.handle(context); if (expected != null) { @@ -177,5 +214,6 @@ private void testHandler(ContextHandler handler) throws Exception { } finally { channel.free(); } + System.out.println(expected); } } diff --git a/core/src/test/java/com/questdb/ql/AsOfPartitionedJoinRecordSourceTest.java b/core/src/test/java/com/questdb/ql/AsOfPartitionedJoinRecordSourceTest.java index b05269d69afe..08caaae734f0 100644 --- a/core/src/test/java/com/questdb/ql/AsOfPartitionedJoinRecordSourceTest.java +++ b/core/src/test/java/com/questdb/ql/AsOfPartitionedJoinRecordSourceTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql; @@ -43,6 +44,7 @@ import com.questdb.misc.Chars; import com.questdb.misc.Dates; import com.questdb.misc.Rnd; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.NoRowidSource; import com.questdb.ql.impl.join.AsOfJoinRecordSource; import com.questdb.ql.impl.join.AsOfPartitionedJoinRecordSource; @@ -291,11 +293,11 @@ public void testFixJoin() throws Exception { , 128 , 128 )) { - printer.printCursor(source.prepareCursor(factory)); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); source.reset(); sink.clear(); - printer.printCursor(source.prepareCursor(factory)); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); } } @@ -319,11 +321,11 @@ public void testFixNonPartitionedJoin() throws Exception { , new NoRowidSource().of(compiler.compileSource(factory, "select timestamp, ccy, rate, amount, contra, ln, fl, sh, b from x")) , 0 )) { - printer.printCursor(source.prepareCursor(factory)); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); source.reset(); sink.clear(); - printer.printCursor(source.prepareCursor(factory)); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); } } @@ -383,11 +385,11 @@ public void testRowidJoin() throws Exception { , 512 , 512 )) { - printer.printCursor(source.prepareCursor(factory), true); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), true); TestUtils.assertEquals(expected, sink); sink.clear(); source.reset(); - printer.printCursor(source.prepareCursor(factory), true); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), true); TestUtils.assertEquals(expected, sink); } } @@ -412,11 +414,11 @@ public void testRowidNonPartitioned() throws Exception { "2015-03-10T00:08:00.000Z\tSWHYRX\t-810.375000000000\tPULKHMJLLKQZJIONCLBYNYYWYBEPKPNZXNYWIGPCMLCBMUPYMRIGQWSZMUMXMSYXCEEDCL\t2015-03-10T00:07:50.000Z\tPEHNRX\t-969.125000000000\t0.207036912441\tSUZHUEVVELXBCOGQQGZZNTEZNOOZGQPKNLKUWCXHYPNZEBESMTXULVCTMKCZJGHRIMUNWUUQHXCRSLYJFTDNSEPESIUROKI\tVTJWCP\t0.3852\t27447\t3768436831039810156\ttrue\n" + "2015-03-10T00:09:00.000Z\tSWHYRX\t-384.000000000000\tZGUJBKNTPYXUBYXGDDULXVVSCNJINCQSDOQILSLXZEMDBLNXHYUUTVSXURFLRJLIUC\t2015-03-10T00:08:50.000Z\tVTJWCP\t-1024.000000000000\t0.000000084048\tJOZWRXKMTFXRYPHFPUYWNLBVVHNSJLVKRTLXHBHDHIMFYOJREFU\tSWHYRX\t0.4008\t-25237\t-2694211234414702926\ttrue\n" + "2015-03-10T00:10:00.000Z\tVTJWCP\t384.000000000000\tPGKJRQGKHQHXYUVDUZQTICMPWFZEINPQOGHUGZGDCFLNGCEFBTDNSYQTIGUTKIESOSYYLIBUFGPWTQJQWTGERXRSYZCKPFWECEH\t2015-03-10T00:09:50.000Z\tVTJWCP\t0.062803771347\t896.000000000000\tYVJISIQFNSEUHOSVSIKJFJLNEKTSLZFPGDVCLMZTXOYEPKECCJZJOSDCIWCZECJGNWQNKCYVZJRRZYDBL\tPEHNRX\t0.9202\t-15664\t-5743731661904518905\ttrue\n"; - printer.printCursor(source.prepareCursor(factory)); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); source.reset(); sink.clear(); - printer.printCursor(source.prepareCursor(factory)); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); } @@ -435,7 +437,7 @@ public void testStrings() throws Exception { )) { StringSink testSink = new StringSink(); int idx = source.getMetadata().getColumnIndex("trader"); - for (Record r : source.prepareCursor(factory)) { + for (Record r : source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)) { testSink.clear(); r.getStr(idx, testSink); @@ -474,11 +476,11 @@ public void testVarJoin() throws Exception { , 512 , 512 )) { - printer.printCursor(source.prepareCursor(factory), true); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), true); TestUtils.assertEquals(expected, sink); source.reset(); sink.clear(); - printer.printCursor(source.prepareCursor(factory), true); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), true); TestUtils.assertEquals(expected, sink); } } @@ -503,7 +505,7 @@ public void testVarNonPartitioned() throws Exception { "2015-03-10T00:08:00.000Z\tSWHYRX\t-810.375000000000\tPULKHMJLLKQZJIONCLBYNYYWYBEPKPNZXNYWIGPCMLCBMUPYMRIGQWSZMUMXMSYXCEEDCL\t2015-03-10T00:07:50.000Z\tPEHNRX\t-969.125000000000\t0.207036912441\tSUZHUEVVELXBCOGQQGZZNTEZNOOZGQPKNLKUWCXHYPNZEBESMTXULVCTMKCZJGHRIMUNWUUQHXCRSLYJFTDNSEPESIUROKI\tVTJWCP\t0.3852\t27447\t3768436831039810156\ttrue\n" + "2015-03-10T00:09:00.000Z\tSWHYRX\t-384.000000000000\tZGUJBKNTPYXUBYXGDDULXVVSCNJINCQSDOQILSLXZEMDBLNXHYUUTVSXURFLRJLIUC\t2015-03-10T00:08:50.000Z\tVTJWCP\t-1024.000000000000\t0.000000084048\tJOZWRXKMTFXRYPHFPUYWNLBVVHNSJLVKRTLXHBHDHIMFYOJREFU\tSWHYRX\t0.4008\t-25237\t-2694211234414702926\ttrue\n" + "2015-03-10T00:10:00.000Z\tVTJWCP\t384.000000000000\tPGKJRQGKHQHXYUVDUZQTICMPWFZEINPQOGHUGZGDCFLNGCEFBTDNSYQTIGUTKIESOSYYLIBUFGPWTQJQWTGERXRSYZCKPFWECEH\t2015-03-10T00:09:50.000Z\tVTJWCP\t0.062803771347\t896.000000000000\tYVJISIQFNSEUHOSVSIKJFJLNEKTSLZFPGDVCLMZTXOYEPKECCJZJOSDCIWCZECJGNWQNKCYVZJRRZYDBL\tPEHNRX\t0.9202\t-15664\t-5743731661904518905\ttrue\n"; - printer.printCursor(source.prepareCursor(factory)); + printer.printCursor(source.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); } diff --git a/core/src/test/java/com/questdb/ql/HashJoinRecordSourceTest.java b/core/src/test/java/com/questdb/ql/HashJoinRecordSourceTest.java index a7580b10f5f7..2b13f9cbfc1f 100644 --- a/core/src/test/java/com/questdb/ql/HashJoinRecordSourceTest.java +++ b/core/src/test/java/com/questdb/ql/HashJoinRecordSourceTest.java @@ -48,6 +48,7 @@ import com.questdb.ql.impl.AllRowSource; import com.questdb.ql.impl.JournalPartitionSource; import com.questdb.ql.impl.JournalSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.join.HashJoinRecordSource; import com.questdb.ql.impl.select.SelectedColumnsRecordSource; import com.questdb.std.IntList; @@ -118,7 +119,7 @@ public void testHashJoinJournalRecordSource() throws Exception { add("genre"); }} ); - p.printCursor(joinResult.prepareCursor(factory)); + p.printCursor(joinResult.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); Assert.assertEquals("pop\n" + "rock\n" + "metal\n" + @@ -153,7 +154,7 @@ public void testHashJoinPerformance() throws Exception { long t = System.currentTimeMillis(); int count = 0; // ExportManager.export(j, new File("c:/temp/join.csv"), TextFileFormat.TAB); - RecordCursor c = j.prepareCursor(factory); + RecordCursor c = j.prepareCursor(factory, NoOpCancellationHandler.INSTANCE); while (c.hasNext()) { c.next(); count++; @@ -203,7 +204,7 @@ public void testHashJoinRecordSource() throws Exception { add("genre"); }} ); - p.printCursor(joinResult.prepareCursor(factory)); + p.printCursor(joinResult.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); Assert.assertEquals("pop\n" + "rock\n" + "metal\n" + @@ -249,7 +250,7 @@ public void testOuterHashJoin() throws Exception { add("url"); }} ); - p.printCursor(joinResult.prepareCursor(factory)); + p.printCursor(joinResult.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); Assert.assertEquals("pop\thttp://band1.com\n" + "rock\thttp://band1.com\n" + "\thttp://band2.com\n" + diff --git a/core/src/test/java/com/questdb/ql/JoinStringToSymbolTest.java b/core/src/test/java/com/questdb/ql/JoinStringToSymbolTest.java index 15465f0f2622..8f19eae73be8 100644 --- a/core/src/test/java/com/questdb/ql/JoinStringToSymbolTest.java +++ b/core/src/test/java/com/questdb/ql/JoinStringToSymbolTest.java @@ -47,6 +47,7 @@ import com.questdb.ql.impl.AllRowSource; import com.questdb.ql.impl.JournalPartitionSource; import com.questdb.ql.impl.JournalSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.join.CrossJoinRecordSource; import com.questdb.test.tools.JournalTestFactory; import com.questdb.test.tools.TestUtils; @@ -116,7 +117,7 @@ public void testCrossJoin() throws Exception { new JournalSource( new JournalPartitionSource(bw.getMetadata(), false), new AllRowSource() ) - ).prepareCursor(factory) + ).prepareCursor(factory, NoOpCancellationHandler.INSTANCE) ); final String expected = "band1\talbum X\tpop\t1970-01-01T00:00:00.000Z\t1970-01-01T00:00:00.000Z\tband1\thttp://band1.com\trock\t\n" + diff --git a/core/src/test/java/com/questdb/ql/MergingRowSourceTest.java b/core/src/test/java/com/questdb/ql/MergingRowSourceTest.java index 89c7d24be968..e9cf6e4d2889 100644 --- a/core/src/test/java/com/questdb/ql/MergingRowSourceTest.java +++ b/core/src/test/java/com/questdb/ql/MergingRowSourceTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql; @@ -41,6 +42,7 @@ import com.questdb.model.Quote; import com.questdb.ql.impl.JournalPartitionSource; import com.questdb.ql.impl.JournalSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.latest.HeapMergingRowSource; import com.questdb.ql.impl.latest.KvIndexSymLookupRowSource; import com.questdb.ql.impl.latest.MergingRowSource; @@ -61,7 +63,7 @@ public void testHeapMerge() throws JournalException, NumericException { RecordSource rs = new JournalSource(new JournalPartitionSource(w.getMetadata(), true), new HeapMergingRowSource(srcA, srcB)); long last = 0; - RecordCursor c = rs.prepareCursor(factory); + RecordCursor c = rs.prepareCursor(factory, NoOpCancellationHandler.INSTANCE); int ts = rs.getMetadata().getColumnIndex("timestamp"); while (c.hasNext()) { long r = c.next().getDate(ts); @@ -81,7 +83,7 @@ public void testMerge() throws JournalException, NumericException { RecordSource rs = new JournalSource(new JournalPartitionSource(w.getMetadata(), true), new MergingRowSource(srcA, srcB)); long last = 0; - RecordCursor c = rs.prepareCursor(factory); + RecordCursor c = rs.prepareCursor(factory, NoOpCancellationHandler.INSTANCE); int ts = rs.getMetadata().getColumnIndex("timestamp"); while (c.hasNext()) { long r = c.next().getDate(ts); diff --git a/core/src/test/java/com/questdb/ql/SingleJournalSearchTest.java b/core/src/test/java/com/questdb/ql/SingleJournalSearchTest.java index 7a44e6b07b3b..2239914de952 100644 --- a/core/src/test/java/com/questdb/ql/SingleJournalSearchTest.java +++ b/core/src/test/java/com/questdb/ql/SingleJournalSearchTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql; @@ -46,6 +47,7 @@ import com.questdb.model.Quote; import com.questdb.ql.impl.JournalPartitionSource; import com.questdb.ql.impl.JournalSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.interval.MultiIntervalPartitionSource; import com.questdb.ql.impl.interval.SingleIntervalSource; import com.questdb.ql.impl.latest.KvIndexSymAllHeadRowSource; @@ -158,7 +160,7 @@ public void testHeadAfterFilter() throws Exception { } private void assertEquals(CharSequence expected, RecordSource src) throws JournalException, IOException { - new RecordSourcePrinter(sink).printCursor(src.prepareCursor(factory)); + new RecordSourcePrinter(sink).printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); Assert.assertEquals(expected, sink.toString()); sink.flush(); } diff --git a/core/src/test/java/com/questdb/ql/TailPartitionSourceTest.java b/core/src/test/java/com/questdb/ql/TailPartitionSourceTest.java index 884138ef6a62..ae26221ae627 100644 --- a/core/src/test/java/com/questdb/ql/TailPartitionSourceTest.java +++ b/core/src/test/java/com/questdb/ql/TailPartitionSourceTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql; @@ -43,6 +44,7 @@ import com.questdb.model.Quote; import com.questdb.ql.impl.AllRowSource; import com.questdb.ql.impl.JournalSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.unused.JournalTailPartitionSource; import com.questdb.test.tools.AbstractTest; import com.questdb.test.tools.TestUtils; @@ -110,7 +112,7 @@ public void testTail() throws Exception { new JournalSource( new JournalTailPartitionSource(w.getMetadata(), false, Rows.toRowID(1, 30)) , new AllRowSource() - ).prepareCursor(factory) + ).prepareCursor(factory, NoOpCancellationHandler.INSTANCE) ); Assert.assertEquals(expected, sink.toString()); diff --git a/core/src/test/java/com/questdb/ql/TopRecordSourceTest.java b/core/src/test/java/com/questdb/ql/TopRecordSourceTest.java index 4fce0599f309..c3f0210670a3 100644 --- a/core/src/test/java/com/questdb/ql/TopRecordSourceTest.java +++ b/core/src/test/java/com/questdb/ql/TopRecordSourceTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql; @@ -38,6 +39,7 @@ import com.questdb.io.RecordSourcePrinter; import com.questdb.io.sink.StringSink; import com.questdb.model.Quote; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.TopRecordSource; import com.questdb.ql.ops.constant.LongConstant; import com.questdb.test.tools.AbstractTest; @@ -54,7 +56,7 @@ public void testBottomSource() throws Exception { StringSink sink = new StringSink(); RecordSourcePrinter p = new RecordSourcePrinter(sink); - p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(99997), new LongConstant(100000)).prepareCursor(factory)); + p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(99997), new LongConstant(100000)).prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); final String expected = "2013-11-04T10:00:00.000Z\tBT-A.L\t168.000000000000\t0.001307277009\t319936098\t1456039311\tFast trading\tLXE\n" + "2013-11-04T10:00:00.000Z\tAGK.L\t0.000031983279\t878.000000000000\t819380635\t1732419403\tFast trading\tLXE\n" + @@ -69,7 +71,7 @@ public void testMiddleSource() throws Exception { StringSink sink = new StringSink(); RecordSourcePrinter p = new RecordSourcePrinter(sink); - p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(102), new LongConstant(112)).prepareCursor(factory)); + p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(102), new LongConstant(112)).prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); final String expected = "2013-09-04T10:00:00.000Z\tTLW.L\t0.003675992833\t0.000000006044\t233699709\t984001343\tFast trading\tLXE\n" + "2013-09-04T10:00:00.000Z\tGKN.L\t0.000001392326\t0.000000010696\t1921077830\t83098719\tFast trading\tLXE\n" + @@ -91,7 +93,7 @@ public void testNoRows() throws Exception { StringSink sink = new StringSink(); RecordSourcePrinter p = new RecordSourcePrinter(sink); - p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(99997), new LongConstant(10)).prepareCursor(factory)); + p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(99997), new LongConstant(10)).prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); Assert.assertEquals("", sink.toString()); } @@ -103,7 +105,7 @@ public void testTopSource() throws Exception { StringSink sink = new StringSink(); RecordSourcePrinter p = new RecordSourcePrinter(sink); - p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(0), new LongConstant(10)).prepareCursor(factory)); + p.printCursor(new TopRecordSource(compiler.compileSource(factory, "quote"), new LongConstant(0), new LongConstant(10)).prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); final String expected = "2013-09-04T10:00:00.000Z\tBT-A.L\t0.000001189157\t1.050231933594\t1326447242\t948263339\tFast trading\tLXE\n" + "2013-09-04T10:00:00.000Z\tADM.L\t104.021850585938\t0.006688738358\t1575378703\t1436881714\tFast trading\tLXE\n" + diff --git a/core/src/test/java/com/questdb/ql/VirtualColumnTest.java b/core/src/test/java/com/questdb/ql/VirtualColumnTest.java index d3eaeef9b588..27ee11cc8a37 100644 --- a/core/src/test/java/com/questdb/ql/VirtualColumnTest.java +++ b/core/src/test/java/com/questdb/ql/VirtualColumnTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql; @@ -40,6 +41,7 @@ import com.questdb.io.RecordSourcePrinter; import com.questdb.io.sink.StringSink; import com.questdb.misc.Rnd; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.select.SelectedColumnsRecordSource; import com.questdb.ql.impl.virtual.VirtualColumnRecordSource; import com.questdb.ql.ops.VirtualColumn; @@ -83,7 +85,7 @@ public void testPlusDouble() throws Exception { add(plus); }}); - p.printCursor(src.prepareCursor(factory)); + p.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); final String expected = "VTJWCPSWHY\t-104.021850585938\t-91.521850585938\n" + "PEHNRXGZSX\t0.000020634160\t12.500020634160\n" + @@ -227,7 +229,7 @@ public void testSelectedColumns() throws Exception { add("plus"); }}); - p.printCursor(src.prepareCursor(factory)); + p.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); final String expected = "VTJWCPSWHY\t-91.521850585938\n" + "PEHNRXGZSX\t12.500020634160\n" + diff --git a/core/src/test/java/com/questdb/ql/impl/MultiIntervalPartitionSourceTest.java b/core/src/test/java/com/questdb/ql/impl/MultiIntervalPartitionSourceTest.java index 1f2b101b0653..ed703ef02bc7 100644 --- a/core/src/test/java/com/questdb/ql/impl/MultiIntervalPartitionSourceTest.java +++ b/core/src/test/java/com/questdb/ql/impl/MultiIntervalPartitionSourceTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql.impl; @@ -71,7 +72,7 @@ public void testIntervalMerge() throws Exception { ) ), new AllRowSource() - ).prepareCursor(factory) + ).prepareCursor(factory, NoOpCancellationHandler.INSTANCE) ); final String expected = "2014-03-10T07:00:00.000Z\tGKN.L\t290.000000000000\t320.000000000000\t1070060020\t627764827\tFast trading\tLXE\n" + diff --git a/core/src/test/java/com/questdb/ql/impl/NoRowidSource.java b/core/src/test/java/com/questdb/ql/impl/NoRowidSource.java index 427aedd08955..5bcecac17b60 100644 --- a/core/src/test/java/com/questdb/ql/impl/NoRowidSource.java +++ b/core/src/test/java/com/questdb/ql/impl/NoRowidSource.java @@ -38,6 +38,7 @@ import com.questdb.ex.JournalException; import com.questdb.factory.JournalReaderFactory; import com.questdb.factory.configuration.RecordMetadata; +import com.questdb.ql.CancellationHandler; import com.questdb.ql.RecordCursor; import com.questdb.ql.RecordSource; import com.questdb.ql.ops.Parameter; @@ -59,8 +60,8 @@ public Parameter getParam(CharSequence name) { @SuppressWarnings("unchecked") @Override - public RecordCursor prepareCursor(JournalReaderFactory factory) throws JournalException { - return delegate.prepareCursor(factory); + public RecordCursor prepareCursor(JournalReaderFactory factory, CancellationHandler cancellationHandler) throws JournalException { + return delegate.prepareCursor(factory, cancellationHandler); } @Override diff --git a/core/src/test/java/com/questdb/ql/impl/sort/ComparatorCompilerTest.java b/core/src/test/java/com/questdb/ql/impl/sort/ComparatorCompilerTest.java index a6bca3e4054a..613d1c3dfe44 100644 --- a/core/src/test/java/com/questdb/ql/impl/sort/ComparatorCompilerTest.java +++ b/core/src/test/java/com/questdb/ql/impl/sort/ComparatorCompilerTest.java @@ -1,24 +1,24 @@ /******************************************************************************* - * ___ _ ____ ____ - * / _ \ _ _ ___ ___| |_| _ \| __ ) - * | | | | | | |/ _ \/ __| __| | | | _ \ - * | |_| | |_| | __/\__ \ |_| |_| | |_) | - * \__\_\\__,_|\___||___/\__|____/|____/ - *

+ * ___ _ ____ ____ + * / _ \ _ _ ___ ___| |_| _ \| __ ) + * | | | | | | |/ _ \/ __| __| | | | _ \ + * | |_| | |_| | __/\__ \ |_| |_| | |_) | + * \__\_\\__,_|\___||___/\__|____/|____/ + * * Copyright (C) 2014-2016 Appsicle - *

+ * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. - *

+ * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. - *

+ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - *

+ * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute @@ -30,6 +30,7 @@ * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. + * ******************************************************************************/ package com.questdb.ql.impl.sort; @@ -41,6 +42,7 @@ import com.questdb.factory.configuration.RecordColumnMetadata; import com.questdb.ql.RecordSource; import com.questdb.ql.StorageFacade; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.ops.AbstractVirtualColumn; import com.questdb.ql.parser.AbstractOptimiserTest; import com.questdb.std.IntList; @@ -107,7 +109,7 @@ public void testAllGetters() throws Exception { RBTreeSortedRecordSource map = new RBTreeSortedRecordSource(rs, rc, 1024 * 1024, 4 * 1024 * 1024); sink.clear(); - printer.printCursor(map.prepareCursor(factory)); + printer.printCursor(map.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals( "false\t13\t20.120000000000\t10.1500\t4\t9988908080988890\t1970-01-02T00:42:59.879Z\t902\tcomplexity made simple\tappsicle\n" + "true\t13\t20.120000000000\t10.1500\t4\t9988908080988890\t1970-01-02T00:42:59.879Z\t902\tcomplexity made simple\tquestdb\n", diff --git a/core/src/test/java/com/questdb/ql/parser/AbstractOptimiserTest.java b/core/src/test/java/com/questdb/ql/parser/AbstractOptimiserTest.java index 66e5606180cb..a32515531b5e 100644 --- a/core/src/test/java/com/questdb/ql/parser/AbstractOptimiserTest.java +++ b/core/src/test/java/com/questdb/ql/parser/AbstractOptimiserTest.java @@ -47,6 +47,7 @@ import com.questdb.model.configuration.ModelConfiguration; import com.questdb.net.http.ServerConfiguration; import com.questdb.ql.RecordSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.std.AssociativeCache; import com.questdb.test.tools.JournalTestFactory; import com.questdb.test.tools.TestUtils; @@ -84,7 +85,7 @@ protected void assertThat(String expected, String query, boolean header) throws } else { rs.reset(); } - printer.printCursor(rs.prepareCursor(factory), header); + printer.printCursor(rs.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), header); TestUtils.assertEquals(expected, sink); } diff --git a/core/src/test/java/com/questdb/ql/parser/JoinQueryTest.java b/core/src/test/java/com/questdb/ql/parser/JoinQueryTest.java index 8d00a58aecf3..ce349840d560 100644 --- a/core/src/test/java/com/questdb/ql/parser/JoinQueryTest.java +++ b/core/src/test/java/com/questdb/ql/parser/JoinQueryTest.java @@ -45,6 +45,7 @@ import com.questdb.misc.Dates; import com.questdb.misc.Rnd; import com.questdb.ql.RecordSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.ql.impl.NoRowidSource; import com.questdb.ql.impl.join.HashJoinRecordSource; import com.questdb.std.IntHashSet; @@ -646,7 +647,7 @@ public void testJoinNoRowid() throws Exception { 1024 * 1024 ); sink.clear(); - printer.printCursor(r.prepareCursor(factory)); + printer.printCursor(r.prepareCursor(factory, NoOpCancellationHandler.INSTANCE)); TestUtils.assertEquals(expected, sink); assertThat(expected, "customers c join orders o on c.customerId = o.customerId where customerName ~ 'PJFSREKEUNMKWOF'"); } diff --git a/core/src/test/java/com/questdb/ql/parser/SingleJournalQueryTest.java b/core/src/test/java/com/questdb/ql/parser/SingleJournalQueryTest.java index 58c66d3b6042..547eb46633b6 100644 --- a/core/src/test/java/com/questdb/ql/parser/SingleJournalQueryTest.java +++ b/core/src/test/java/com/questdb/ql/parser/SingleJournalQueryTest.java @@ -51,6 +51,7 @@ import com.questdb.model.Quote; import com.questdb.net.http.ServerConfiguration; import com.questdb.ql.RecordSource; +import com.questdb.ql.impl.NoOpCancellationHandler; import com.questdb.std.ObjHashSet; import com.questdb.test.tools.AbstractTest; import com.questdb.test.tools.TestUtils; @@ -2084,21 +2085,21 @@ public void testParamInLimit() throws Exception { sink.clear(); RecordSource src = compiler.compileSource(factory, "select id, z from tab limit :xyz"); src.getParam(":xyz").set(10L); - printer.printCursor(src.prepareCursor(factory), false); + printer.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), false); TestUtils.assertEquals(expected, sink); // and one more time sink.clear(); src = compiler.compileSource(factory, "select id, z from tab limit :xyz"); src.getParam(":xyz").set(10L); - printer.printCursor(src.prepareCursor(factory), false); + printer.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), false); TestUtils.assertEquals(expected, sink); // and now change parameter sink.clear(); src = compiler.compileSource(factory, "select id, z from tab limit :xyz"); src.getParam(":xyz").set(5L); - printer.printCursor(src.prepareCursor(factory), false); + printer.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), false); final String expected2 = "YDVRVNGSTEQODRZ\t-99\n" + "RIIYMHOWKCDNZNL\t-397\n" + @@ -2127,14 +2128,14 @@ public void testParamInQuery() throws Exception { RecordSource src = compiler.compileSource(factory, "select id, z from tab where z > :min limit :lim"); src.getParam(":min").set(450); src.getParam(":lim").set(10L); - printer.printCursor(src.prepareCursor(factory), false); + printer.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), false); sink.clear(); src = compiler.compileSource(factory, "select id, z from tab where :min < z limit :lim"); src.getParam(":min").set(450); src.getParam(":lim").set(10L); - printer.printCursor(src.prepareCursor(factory), false); + printer.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), false); TestUtils.assertEquals(expected, sink); } @@ -2151,7 +2152,7 @@ public void testParamNotSet() throws Exception { createTabWithNaNs2(); sink.clear(); RecordSource src = compiler.compileSource(factory, "select id, z from tab where z > :min limit :lim"); - printer.printCursor(src.prepareCursor(factory), false); + printer.printCursor(src.prepareCursor(factory, NoOpCancellationHandler.INSTANCE), false); } @Test