Skip to content

Commit

Permalink
Write hooks with {'primary_key': , 'timestamp':}, also fixes r.now …
Browse files Browse the repository at this point in the history
…determinism problems (#6396)

* Restore deterministic r.now

* encapsulate deterministic_t

* Fix 'class deterministic_t' decl in term.hpp.

* Make deterministic_t interface clearer.

Avoids external bitmask magic.

* Rename deterministic_t static members to traditional names.

* Allow r.now() before .changes().

* Make rcheck visitors in seq.cc not use variadic.

Just to make code more readable.

* Allow r.now() in comparison functions.

* Add test polyglot/changefeeds/now

* Added order_by r.now() regression test.

* add _t suffix to avoid name clash

* Fix simple compilation error.

* Misc cleanup of env_t ctor usage

- Remove an "explicit" from a multi-arg env_t ctor

- Use get_serializable_env in ordered_union_datum_stream_t

Because why not.

* Make query cache maintain deterministic r.now().

Fixes problem where long datum streams would use a fresh time value
every time a new batch is requested.  For example, this query would
see multiple "b" field values:

r.range(150000).map(lambda x: {"a": x, "b": r.now()})

This also makes changefeed requests use the same r.now() value.

* Removed defunct term_walker_t time_now field.

* Make read_all_rows_as_vector abstract.

Everybody overrides it, and nobody overrides read_all_rows_as_stream.
Made the latter not be virtual.

* Factor out read_all_rows_filtered to return as vector.

Make callers use it:
- caching_cfeed_artificial_table_backend_t
- table_estimate_doc_counts

* Make from_optargs take deterministic_time.

So we don't crash when r.now() is used inside a global optarg.

* Pass deterministic_time to limit_manager_t env.

* Tweak env_t construction in range_sub_t.

No functional change, just a cleanup.

* Unimplemented write_timestamp_term_t WIP.

* env_t style cleanup

* Implemented write_timestamp_term_t::eval_impl

* Write hooks ostensibly getting run in their own environment.

* Add WRITE_TIMESTAMP to protobuf definition

* r.writeTimestamp() in the JavaScript driver

* r.write_timestamp() in the Python driver

* js_pprint of write_timestamp is just fine.

* Comment on env_t constructor

* Write hooks disallowing r.now().

* Removed duplicate write hook checking logic.

* Pass {"primary_key":, "timestamp":} object to write hooks

* Remove r.write_timestamp() ReQL term

* Remove extra write_timestamp stuff from Python and JS drivers

* Add polyglot tests for write hook context object
  • Loading branch information
Sam Hughes authored and danielmewes committed Aug 7, 2017
1 parent 8d7e97b commit 1a6f685
Show file tree
Hide file tree
Showing 49 changed files with 300 additions and 274 deletions.
1 change: 0 additions & 1 deletion drivers/python/rethinkdb/ast.py
Expand Up @@ -1526,7 +1526,6 @@ class Grant(RqlMethodQuery):
tt = pTerm.GRANT
st = 'grant'


class GrantTL(RqlTopLevelQuery):
tt = pTerm.GRANT
st = 'grant'
Expand Down
18 changes: 4 additions & 14 deletions src/clustering/administration/artificial_reql_cluster_interface.cc
Expand Up @@ -207,30 +207,20 @@ bool artificial_reql_cluster_interface_t::table_estimate_doc_counts(
if (db->name == artificial_reql_cluster_interface_t::database_name) {
auto it = m_table_backends.find(name);
if (it != m_table_backends.end()) {
counted_t<ql::datum_stream_t> docs;
/* We arbitrarily choose to read from the UUID version of the system table
rather than the name version. */
if (!it->second.second->read_all_rows_as_stream(
std::vector<ql::datum_t> datums;
if (!it->second.second->read_all_rows_filtered(
user_context,
ql::backtrace_id_t::empty(),
ql::datumspec_t(ql::datum_range_t::universe()),
sorting_t::UNORDERED,
env->interruptor,
&docs,
&datums,
error_out)) {
error_out->msg = "When estimating doc count: " + error_out->msg;
return false;
}
try {
scoped_ptr_t<ql::val_t> count =
docs->run_terminal(env, ql::count_wire_func_t());
*doc_counts_out = std::vector<int64_t>({ count->as_int<int64_t>() });
} catch (const ql::base_exc_t &msg) {
*error_out = admin_err_t{
"When estimating doc count: " + std::string(msg.what()),
query_state_t::FAILED};
return false;
}
*doc_counts_out = std::vector<int64_t>({ static_cast<int64_t>(datums.size()) });
return true;
} else {
*error_out = admin_err_t{
Expand Down
1 change: 1 addition & 0 deletions src/pprint/js_pprint.cc
Expand Up @@ -850,6 +850,7 @@ std::string pretty_print_as_js(size_t width, const ql::raw_term_t &t) {
// - `should_continue_string` should add the new Term to the list if
// it should be used in a string-of-dotted-expressions context. So
// like `r.foo(1).bar(2).baz(4)` instead of `r.eq(1, 2)`.
// ^^ What does this mean??
// - `should_use_rdot` should add the new Term to the list if it
// represents some sort of specialized language feature--for example
// literals like 4, or strings, or variable names. These are very
Expand Down
2 changes: 1 addition & 1 deletion src/rdb_protocol/artificial_table/artificial_table.cc
Expand Up @@ -101,7 +101,7 @@ counted_t<ql::datum_stream_t> artificial_table_t::read_all(
}

admin_err_t error;
if (!m_backend->read_all_rows_as_stream(
if (!m_backend->read_all_rows_filtered_as_stream(
env->get_user_context(),
bt,
datumspec,
Expand Down
45 changes: 28 additions & 17 deletions src/rdb_protocol/artificial_table/backend.cc
Expand Up @@ -25,14 +25,14 @@ uuid_u const &artificial_table_backend_t::get_table_id() const {
return m_table_id;
}

bool artificial_table_backend_t::read_all_rows_as_stream(
bool artificial_table_backend_t::read_all_rows_filtered(
auth::user_context_t const &user_context,
ql::backtrace_id_t bt,
const ql::datumspec_t &datumspec,
sorting_t sorting,
signal_t *interruptor,
counted_t<ql::datum_stream_t> *rows_out,
std::vector<ql::datum_t> *rows_out,
admin_err_t *error_out) {

/* Fetch the rows from the backend */
std::vector<ql::datum_t> rows;
if (!read_all_rows_as_vector(user_context, interruptor, &rows, error_out)) {
Expand Down Expand Up @@ -74,9 +74,31 @@ bool artificial_table_backend_t::read_all_rows_as_stream(
});
}

ql::changefeed::keyspec_t::range_t range_keyspec;
range_keyspec.sorting = sorting;
range_keyspec.datumspec = datumspec;
*rows_out = std::move(rows);
return true;
}

bool artificial_table_backend_t::read_all_rows_filtered_as_stream(
auth::user_context_t const &user_context,
ql::backtrace_id_t bt,
const ql::datumspec_t &datumspec,
sorting_t sorting,
signal_t *interruptor,
counted_t<ql::datum_stream_t> *rows_out,
admin_err_t *error_out) {
std::vector<ql::datum_t> rows;
if (!read_all_rows_filtered(user_context, datumspec, sorting, interruptor,
&rows, error_out)) {
return false;
}

ql::changefeed::keyspec_t::range_t range_keyspec = {
std::vector<ql::transform_variant_t>(),
r_nullopt,
sorting,
datumspec,
r_nullopt
};
optional<ql::changefeed::keyspec_t> keyspec(ql::changefeed::keyspec_t(
std::move(range_keyspec),
counted_t<base_table_t>(
Expand All @@ -90,14 +112,3 @@ bool artificial_table_backend_t::read_all_rows_as_stream(
return true;
}

bool artificial_table_backend_t::read_all_rows_as_vector(
UNUSED auth::user_context_t const &user_context,
UNUSED signal_t *interruptor,
UNUSED std::vector<ql::datum_t> *rows_out,
UNUSED admin_err_t *error_out) {
crash("Oops, the default implementation of `artificial_table_backend_t::"
"read_all_rows_as_vector()` was called. The `artificial_table_backend_t` "
"subclass must override at least one of `read_all_rows_as_stream()` or "
"`read_all_rows_as_vector()`. Also, the `artificial_table_backend_t` user "
"shouldn't ever call `read_all_rows_as_vector()` directly.");
}
32 changes: 18 additions & 14 deletions src/rdb_protocol/artificial_table/backend.hpp
Expand Up @@ -36,28 +36,26 @@ class artificial_table_backend_t : public home_thread_mixin_t {
change. This must not block. */
virtual std::string get_primary_key_name() = 0;

/* `read_all_rows_as_*()` returns the full dataset either as a stream or as a vector
depending on the version being called. Subclasses should override one or the
other, but not both. The `artificial_table_t` will only ever call
`read_all_rows_as_stream()`; the default implementation of
`read_all_rows_as_stream()` calls `read_all_rows_as_vector()`, while the default
implementation of `read_all_rows_as_vector()` crashes. So it will work correctly
no matter which one the subclass overrides. The default implementation of
`read_all_rows_as_stream()` will also take care of the filtering and sorting,
which you must handle yourself when overriding it. */
virtual bool read_all_rows_as_stream(
// Returns the full dataset in a vector (in `rows_out`) after applying the filtering
// and sorting specified by `datumspec` and `sorting`.
bool read_all_rows_filtered(
auth::user_context_t const &user_context,
ql::backtrace_id_t bt,
const ql::datumspec_t &datumspec,
sorting_t sorting,
signal_t *interruptor,
counted_t<ql::datum_stream_t> *rows_out,
std::vector<ql::datum_t> *rows_out,
admin_err_t *error_out);

virtual bool read_all_rows_as_vector(
/* `read_all_rows_filtered_as_stream()` returns the full dataset as a stream (using
read_all_rows_as_vector) and applies the applicable filtering and sorting (as
specified in `datumspec` and `sorting`). */
bool read_all_rows_filtered_as_stream(
auth::user_context_t const &user_context,
ql::backtrace_id_t bt,
const ql::datumspec_t &datumspec,
sorting_t sorting,
signal_t *interruptor,
std::vector<ql::datum_t> *rows_out,
counted_t<ql::datum_stream_t> *rows_out,
admin_err_t *error_out);

/* Sets `*row_out` to the current value of the row, or an empty `datum_t` if no such
Expand Down Expand Up @@ -100,6 +98,12 @@ class artificial_table_backend_t : public home_thread_mixin_t {
static const uuid_u base_table_id;

private:
virtual bool read_all_rows_as_vector(
auth::user_context_t const &user_context,
signal_t *interruptor,
std::vector<ql::datum_t> *rows_out,
admin_err_t *error_out) = 0;

name_string_t m_table_name;
uuid_u m_table_id;
rdb_context_t *m_rdb_context;
Expand Down
19 changes: 3 additions & 16 deletions src/rdb_protocol/artificial_table/caching_cfeed_backend.cc
Expand Up @@ -253,29 +253,16 @@ bool caching_cfeed_artificial_table_backend_t::caching_machinery_t::get_values(
signal_t *interruptor, std::map<store_key_t, ql::datum_t> *out) {
out->clear();
admin_err_t error;
counted_t<ql::datum_stream_t> stream;
if (!parent->read_all_rows_as_stream(
std::vector<ql::datum_t> datums;
if (!parent->read_all_rows_filtered(
m_user_context,
ql::backtrace_id_t(),
ql::datumspec_t(ql::datum_range_t::universe()),
sorting_t::UNORDERED,
interruptor,
&stream,
&datums,
&error)) {
return false;
}
guarantee(stream->cfeed_type() == ql::feed_type_t::not_feed);
ql::env_t env(interruptor,
ql::return_empty_normal_batches_t::NO,
reql_version_t::LATEST);
std::vector<ql::datum_t> datums;
try {
datums = stream->next_batch(&env, ql::batchspec_t::all());
} catch (const ql::base_exc_t &) {
return false;
}
guarantee(stream->is_exhausted(), "We expect ql::batchspec_t::all() to read the "
"entire stream");
for (const ql::datum_t &doc : datums) {
ql::datum_t key = doc.get_field(
datum_string_t(parent->get_primary_key_name()),
Expand Down
15 changes: 12 additions & 3 deletions src/rdb_protocol/btree.cc
Expand Up @@ -318,10 +318,10 @@ batched_replace_response_t rdb_replace_and_return_superblock(
}

ql::datum_t btree_batched_replacer_t::apply_write_hook(
ql::env_t *env,
const datum_string_t &pkey,
const ql::datum_t &d,
const ql::datum_t &res_,
const ql::datum_t &write_timestamp,
const counted_t<const ql::func_t> &write_hook) const {
ql::datum_t res = res_;
if (write_hook.has()) {
Expand All @@ -336,9 +336,18 @@ ql::datum_t btree_batched_replacer_t::apply_write_hook(
}
ql::datum_t modified;
try {
modified = write_hook->call(env,
cond_t non_interruptor;
ql::env_t write_hook_env(&non_interruptor,
ql::return_empty_normal_batches_t::NO,
reql_version_t::LATEST);

ql::datum_object_builder_t builder;
builder.overwrite("primary_key", std::move(primary_key));
builder.overwrite("timestamp", write_timestamp);

modified = write_hook->call(&write_hook_env,
std::vector<ql::datum_t>{
primary_key,
std::move(builder).to_datum(),
d,
res})->as_datum();
} catch (ql::exc_t &e) {
Expand Down
2 changes: 1 addition & 1 deletion src/rdb_protocol/btree.hpp
Expand Up @@ -98,10 +98,10 @@ struct btree_batched_replacer_t {
virtual return_changes_t should_return_changes() const = 0;

ql::datum_t apply_write_hook(
ql::env_t *env,
const datum_string_t &pkey,
const ql::datum_t &d,
const ql::datum_t &res_,
const ql::datum_t &write_timestamp,
const counted_t<const ql::func_t> &write_hook) const;
};
struct btree_point_replacer_t {
Expand Down
18 changes: 5 additions & 13 deletions src/rdb_protocol/changefeed.cc
Expand Up @@ -427,8 +427,7 @@ void server_t::add_limit_client(
const std::string &table,
const optional<uuid_u> &sindex_id,
rdb_context_t *ctx,
global_optargs_t optargs,
auth::user_context_t user_context,
const serializable_env_t &s_env,
const uuid_u &client_uuid,
const keyspec_t::limit_t &spec,
limit_order_t lt,
Expand All @@ -450,8 +449,7 @@ void server_t::add_limit_client(
table,
sindex_id,
ctx,
std::move(optargs),
std::move(user_context),
s_env,
client_uuid,
this,
it->first,
Expand Down Expand Up @@ -858,8 +856,7 @@ limit_manager_t::limit_manager_t(
std::string _table,
optional<uuid_u> _sindex_id,
rdb_context_t *ctx,
global_optargs_t optargs,
auth::user_context_t user_context,
const serializable_env_t &s_env,
uuid_u _uuid,
server_t *_parent,
client_t::addr_t _parent_client,
Expand All @@ -883,9 +880,7 @@ limit_manager_t::limit_manager_t(
ctx,
return_empty_normal_batches_t::NO,
drainer.get_drain_signal(),
std::move(optargs),
std::move(user_context),
datum_t(),
s_env,
nullptr);

guarantee(ops.size() == 0);
Expand Down Expand Up @@ -2397,10 +2392,7 @@ class range_sub_t : public flat_sub_t {
outer_env->get_rdb_ctx(),
outer_env->return_empty_normal_batches,
drainer.get_drain_signal(),
serializable_env_t{
outer_env->get_all_optargs(),
outer_env->get_user_context(),
outer_env->get_deterministic_time()},
outer_env->get_serializable_env(),
nullptr/*don't profile*/);
}

Expand Down
7 changes: 3 additions & 4 deletions src/rdb_protocol/changefeed.hpp
Expand Up @@ -43,6 +43,7 @@ class name_resolver_t;
class real_superblock_t;
class sindex_superblock_t;
struct rdb_modification_report_t;
struct serializable_env_t;
struct sindex_disk_info_t;

// The string is the btree index key
Expand Down Expand Up @@ -404,8 +405,7 @@ class limit_manager_t {
std::string _table,
optional<uuid_u> _sindex_id,
rdb_context_t *ctx,
global_optargs_t optargs,
auth::user_context_t user_context,
const serializable_env_t &s_env,
uuid_u _uuid,
server_t *_parent,
client_t::addr_t _parent_client,
Expand Down Expand Up @@ -479,8 +479,7 @@ class server_t {
const std::string &table,
const optional<uuid_u> &sindex_id,
rdb_context_t *ctx,
global_optargs_t optargs,
auth::user_context_t user_context,
const serializable_env_t &s_env,
const uuid_u &client_uuid,
const keyspec_t::limit_t &spec,
limit_order_t lt,
Expand Down
5 changes: 3 additions & 2 deletions src/rdb_protocol/configured_limits.cc
Expand Up @@ -9,7 +9,8 @@
namespace ql {

configured_limits_t from_optargs(
rdb_context_t *ctx, signal_t *interruptor, global_optargs_t *args) {
rdb_context_t *ctx, signal_t *interruptor, global_optargs_t *args,
ql::datum_t deterministic_time) {
size_t changefeed_queue_size = configured_limits_t::default_changefeed_queue_size;
size_t array_size_limit = configured_limits_t::default_array_size_limit;
// Fake an environment with no arguments. We have to fake it
Expand All @@ -30,7 +31,7 @@ configured_limits_t from_optargs(
serializable_env_t{
global_optargs_t(),
auth::user_context_t(auth::permissions_t(tribool::False, tribool::False, tribool::False, tribool::False)),
datum_t()},
deterministic_time},
nullptr);
if (has_changefeed_queue_size) {
int64_t sz = args->get_optarg(&env, "changefeed_queue_size")->as_int();
Expand Down
6 changes: 4 additions & 2 deletions src/rdb_protocol/configured_limits.hpp
Expand Up @@ -10,8 +10,9 @@ class signal_t;

namespace ql {

class wire_func_t;
class datum_t;
class global_optargs_t;
class wire_func_t;

class configured_limits_t {
public:
Expand All @@ -35,7 +36,8 @@ class configured_limits_t {
};

configured_limits_t from_optargs(rdb_context_t *ctx, signal_t *interruptor,
global_optargs_t *optargs);
global_optargs_t *optargs,
ql::datum_t deterministic_time);
size_t check_limit(const char *name, int64_t limit);

} // namespace ql
Expand Down
4 changes: 1 addition & 3 deletions src/rdb_protocol/datum_stream.cc
Expand Up @@ -1892,9 +1892,7 @@ ordered_union_datum_stream_t::ordered_union_datum_stream_t(
env->get_rdb_ctx(),
env->return_empty_normal_batches,
&non_interruptor,
env->get_all_optargs(),
env->get_user_context(),
env->get_deterministic_time(),
env->get_serializable_env(),
nullptr)),
lt(_comparisons),
merge_cache(merge_less_t{merge_env.get(), nullptr, &lt}) {
Expand Down

0 comments on commit 1a6f685

Please sign in to comment.