Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add user interrupt check while waiting for query results to be ready. #193

Merged
merged 6 commits into from Oct 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions DESCRIPTION
@@ -1,7 +1,7 @@
Package: RPostgres
Title: 'Rcpp' Interface to 'PostgreSQL'
Version: 1.1.1.9001
Date: 2018-08-27
Version: 1.1.1.9002
Date: 2018-08-28
Authors@R: c(
person("Hadley", "Wickham", role = "aut"),
person("Jeroen", "Ooms", role = "aut"),
Expand Down
5 changes: 5 additions & 0 deletions NEWS.md
@@ -1,3 +1,8 @@
# RPostgres 1.1.9002

- Add user interrupt check while waiting for query results to be ready. Allows safe interruption of long-running queries (@zozlak).
krlmlr marked this conversation as resolved.
Show resolved Hide resolved


# RPostgres 1.1.1.9001

- Avoid aggressive rounding when passing numeric values to the database (#184).
Expand Down
12 changes: 9 additions & 3 deletions R/PqConnection.R
Expand Up @@ -7,7 +7,7 @@ NULL
#' @export
setClass("PqConnection",
contains = "DBIConnection",
slots = list(ptr = "externalptr", bigint = "character", typnames = "data.frame")
slots = list(ptr = "externalptr", bigint = "character", typnames = "data.frame", checkInterrupts = "logical")
krlmlr marked this conversation as resolved.
Show resolved Hide resolved
)

# format()
Expand Down Expand Up @@ -142,6 +142,9 @@ setMethod("dbGetInfo", "PqConnection", function(dbObj, ...) {
#' @param bigint The R type that 64-bit integer types should be mapped to,
#' default is [bit64::integer64], which allows the full range of 64 bit
#' integers.
#' @param checkInterrupts Should user interrupts be checked during the query execution (before
#' first row of data is available)? Setting to `TRUE` allows interruption of queries
#' running too long.
#' @param conn Connection to disconnect.
#' @export
#' @examples
Expand All @@ -154,22 +157,25 @@ setMethod("dbGetInfo", "PqConnection", function(dbObj, ...) {
setMethod("dbConnect", "PqDriver",
function(drv, dbname = NULL,
host = NULL, port = NULL, password = NULL, user = NULL, service = NULL, ...,
bigint = c("integer64", "integer", "numeric", "character")) {
bigint = c("integer64", "integer", "numeric", "character"),
checkInterrupts = FALSE) {

opts <- unlist(list(dbname = dbname, user = user, password = password,
host = host, port = as.character(port), service = service, client_encoding = "utf8", ...))
if (!is.character(opts)) {
stop("All options should be strings", call. = FALSE)
}
bigint <- match.arg(bigint)
stopifnot(is.logical(checkInterrupts), all(!is.na(checkInterrupts)), length(checkInterrupts) == 1)

if (length(opts) == 0) {
ptr <- connection_create(character(), character())
} else {
ptr <- connection_create(names(opts), as.vector(opts))
}

con <- new("PqConnection", ptr = ptr, bigint = bigint, typnames = data.frame())
con <- new("PqConnection", ptr = ptr, bigint = bigint, typnames = data.frame(),
checkInterrupts = checkInterrupts)
dbExecute(con, "SET TIMEZONE='UTC'")
con@typnames <- dbGetQuery(con, "SELECT oid, typname FROM pg_type")

Expand Down
2 changes: 1 addition & 1 deletion R/PqResult.R
Expand Up @@ -95,7 +95,7 @@ setMethod("dbSendQuery", c("PqConnection", "character"), function(conn, statemen

rs <- new("PqResult",
conn = conn,
ptr = result_create(conn@ptr, statement),
ptr = result_create(conn@ptr, statement, FALSE, conn@checkInterrupts),
sql = statement,
bigint = conn@bigint
)
Expand Down
4 changes: 2 additions & 2 deletions R/RcppExports.R
Expand Up @@ -53,8 +53,8 @@ init_logging <- function(log_level) {
invisible(.Call(`_RPostgres_init_logging`, log_level))
}

result_create <- function(con, sql, is_statement = FALSE) {
.Call(`_RPostgres_result_create`, con, sql, is_statement)
result_create <- function(con, sql, is_statement = FALSE, check_interrupts = FALSE) {
.Call(`_RPostgres_result_create`, con, sql, is_statement, check_interrupts)
}

result_release <- function(res) {
Expand Down
8 changes: 4 additions & 4 deletions src/DbResult.cpp
Expand Up @@ -4,14 +4,14 @@
#include "PqResultImpl.h"


DbResult::DbResult(const DbConnectionPtr& pConn, const std::string& sql) :
DbResult::DbResult(const DbConnectionPtr& pConn, const std::string& sql, const bool check_interrupts) :
pConn_(pConn)
{
pConn->check_connection();
pConn->set_current_result(this);

try {
impl.reset(new PqResultImpl(this, pConn->conn(), sql));
impl.reset(new PqResultImpl(this, pConn->conn(), sql, check_interrupts));
}
catch (...) {
pConn->set_current_result(NULL);
Expand All @@ -27,9 +27,9 @@ DbResult::~DbResult() {
} catch (...) {}
}

DbResult* DbResult::create_and_send_query(const DbConnectionPtr& con, const std::string& sql, bool is_statement) {
DbResult* DbResult::create_and_send_query(const DbConnectionPtr& con, const std::string& sql, bool is_statement, const bool check_interrupts) {
(void)is_statement;
return new DbResult(con, sql);
return new DbResult(con, sql, check_interrupts);
}

void DbResult::bind(const List& params) {
Expand Down
4 changes: 2 additions & 2 deletions src/DbResult.h
Expand Up @@ -21,11 +21,11 @@ class DbResult : boost::noncopyable {
boost::scoped_ptr<PqResultImpl> impl;

public:
DbResult(const DbConnectionPtr& pConn, const std::string& sql);
DbResult(const DbConnectionPtr& pConn, const std::string& sql, const bool check_interrupts);
~DbResult();

public:
static DbResult* create_and_send_query(const DbConnectionPtr& con, const std::string& sql, bool is_statement);
static DbResult* create_and_send_query(const DbConnectionPtr& con, const std::string& sql, bool is_statement, const bool check_interrupts);

public:
bool complete() const;
Expand Down
43 changes: 42 additions & 1 deletion src/PqResultImpl.cpp
Expand Up @@ -5,13 +5,15 @@
#include "DbColumnStorage.h"
#include "PqDataFrame.h"

PqResultImpl::PqResultImpl(DbResult* pRes, PGconn* pConn, const std::string& sql) :
PqResultImpl::PqResultImpl(DbResult* pRes, PGconn* pConn, const std::string& sql, const bool check_interrupts) :
res(pRes),
pConn_(pConn),
pSpec_(prepare(pConn, sql)),
cache(pSpec_),
complete_(false),
ready_(false),
data_ready_(false),
check_interrupts_(check_interrupts),
nrows_(0),
rows_affected_(0),
group_(0),
Expand Down Expand Up @@ -329,6 +331,7 @@ bool PqResultImpl::bind_row() {
PQsendQueryPrepared(pConn_, "", cache.nparams_, &c_params[0],
&lengths[0], &formats[0], 0) :
PQsendQueryPrepared(pConn_, "", 0, NULL, NULL, NULL, 0);
data_ready_ = !check_interrupts_;
krlmlr marked this conversation as resolved.
Show resolved Hide resolved

if (!success)
conn_stop("Failed to send query");
Expand Down Expand Up @@ -379,6 +382,13 @@ bool PqResultImpl::step_run() {
LOG_VERBOSE;

if (pRes_) PQclear(pRes_);

// Check user interrupts while waiting for the data to be ready
krlmlr marked this conversation as resolved.
Show resolved Hide resolved
if (!data_ready_) {
krlmlr marked this conversation as resolved.
Show resolved Hide resolved
wait_for_data();
data_ready_ = true;
}

pRes_ = PQgetResult(pConn_);

// We're done, but we need to call PQgetResult until it returns NULL
Expand Down Expand Up @@ -453,3 +463,34 @@ void PqResultImpl::add_oids(List& data) const {
PGresult* PqResultImpl::get_result() {
return pRes_;
}

// checks user interrupts while waiting for the first row of data to be ready
// see https://www.postgresql.org/docs/current/static/libpq-async.html
void PqResultImpl::wait_for_data() {
int socket, ret;
fd_set input;
timeval timeout = {0, 0};
krlmlr marked this conversation as resolved.
Show resolved Hide resolved

socket = PQsocket(pConn_);
if (socket < 0) {
stop("Failed to get connection socket");
}
FD_ZERO(&input);
FD_SET(socket, &input);

do {
// wait for any traffic on the db connection socket but no longet then 1s
timeout.tv_sec = 1;
ret = select(socket + 1, &input, NULL, NULL, &timeout);
krlmlr marked this conversation as resolved.
Show resolved Hide resolved
if (ret == 0) {
// timeout reached - check user interrupt
checkUserInterrupt();
} else if(ret < 0) {
stop("select() on the connection failed");
}
// update db connection state using data available on the socket
if (!PQconsumeInput(pConn_)) {
stop("Failed to consume input from the server");
}
} while (PQisBusy(pConn_)); // check if PQgetResult will still block
}
7 changes: 6 additions & 1 deletion src/PqResultImpl.h
Expand Up @@ -37,14 +37,16 @@ class PqResultImpl : boost::noncopyable, public PqResultSource {
// State
bool complete_;
bool ready_;
bool data_ready_;
bool check_interrupts_;
int nrows_;
int rows_affected_;
List params_;
int group_, groups_;
PGresult* pRes_;

public:
PqResultImpl(DbResult* pRes, PGconn* pConn, const std::string& sql);
PqResultImpl(DbResult* pRes, PGconn* pConn, const std::string& sql, const bool check_interrupts);
~PqResultImpl();

private:
Expand Down Expand Up @@ -81,6 +83,9 @@ class PqResultImpl : boost::noncopyable, public PqResultSource {
public:
// PqResultSource
PGresult* get_result();

private:
void wait_for_data();
};

#endif //RPOSTGRES_PQRESULTIMPL_H
9 changes: 5 additions & 4 deletions src/RcppExports.cpp
Expand Up @@ -153,15 +153,16 @@ BEGIN_RCPP
END_RCPP
}
// result_create
XPtr<DbResult> result_create(XPtr<DbConnectionPtr> con, std::string sql, bool is_statement);
RcppExport SEXP _RPostgres_result_create(SEXP conSEXP, SEXP sqlSEXP, SEXP is_statementSEXP) {
XPtr<DbResult> result_create(XPtr<DbConnectionPtr> con, std::string sql, bool is_statement, bool check_interrupts);
RcppExport SEXP _RPostgres_result_create(SEXP conSEXP, SEXP sqlSEXP, SEXP is_statementSEXP, SEXP check_interruptsSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< XPtr<DbConnectionPtr> >::type con(conSEXP);
Rcpp::traits::input_parameter< std::string >::type sql(sqlSEXP);
Rcpp::traits::input_parameter< bool >::type is_statement(is_statementSEXP);
rcpp_result_gen = Rcpp::wrap(result_create(con, sql, is_statement));
Rcpp::traits::input_parameter< bool >::type check_interrupts(check_interruptsSEXP);
rcpp_result_gen = Rcpp::wrap(result_create(con, sql, is_statement, check_interrupts));
return rcpp_result_gen;
END_RCPP
}
Expand Down Expand Up @@ -268,7 +269,7 @@ static const R_CallMethodDef CallEntries[] = {
{"_RPostgres_encode_data_frame", (DL_FUNC) &_RPostgres_encode_data_frame, 1},
{"_RPostgres_encrypt_password", (DL_FUNC) &_RPostgres_encrypt_password, 2},
{"_RPostgres_init_logging", (DL_FUNC) &_RPostgres_init_logging, 1},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 3},
{"_RPostgres_result_create", (DL_FUNC) &_RPostgres_result_create, 4},
{"_RPostgres_result_release", (DL_FUNC) &_RPostgres_result_release, 1},
{"_RPostgres_result_valid", (DL_FUNC) &_RPostgres_result_valid, 1},
{"_RPostgres_result_fetch", (DL_FUNC) &_RPostgres_result_fetch, 2},
Expand Down
4 changes: 2 additions & 2 deletions src/result.cpp
Expand Up @@ -4,9 +4,9 @@


// [[Rcpp::export]]
XPtr<DbResult> result_create(XPtr<DbConnectionPtr> con, std::string sql, bool is_statement = false) {
XPtr<DbResult> result_create(XPtr<DbConnectionPtr> con, std::string sql, bool is_statement = false, bool check_interrupts = false) {
(*con)->check_connection();
DbResult* res = DbResult::create_and_send_query(*con, sql, is_statement);
DbResult* res = DbResult::create_and_send_query(*con, sql, is_statement, check_interrupts);
return XPtr<DbResult>(res, true);
}

Expand Down