From 98eb938448aad4b53cfaf69fdc6bef39512dc02d Mon Sep 17 00:00:00 2001 From: raviprakashgiri29 <56813441+raviprakashgiri29@users.noreply.github.com> Date: Wed, 8 Apr 2020 10:25:22 +1000 Subject: [PATCH] WT-5675 Prepare support workgen changes (#5453) * WT-5675 Prepare support workgen changes * Encapsulated timestamps to use epoch time * Added oldest, stable timestamps to move forward periodically * Moved irrelevant functions from Workload to WorkloadRunner class * Renamed _transaction to transaction in all related files --- bench/workgen/runner/example_prepare.py | 78 +++++++++++ bench/workgen/runner/runner/core.py | 4 +- bench/workgen/workgen.cxx | 172 ++++++++++++++++++++---- bench/workgen/workgen.h | 19 ++- bench/workgen/workgen_int.h | 31 ++++- 5 files changed, 267 insertions(+), 37 deletions(-) create mode 100644 bench/workgen/runner/example_prepare.py diff --git a/bench/workgen/runner/example_prepare.py b/bench/workgen/runner/example_prepare.py new file mode 100644 index 00000000000..4520f2cb787 --- /dev/null +++ b/bench/workgen/runner/example_prepare.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# +# Public Domain 2014-2020 MongoDB, Inc. +# Public Domain 2008-2014 WiredTiger, Inc. +# +# This is free and unencumbered software released into the public domain. +# +# Anyone is free to copy, modify, publish, use, compile, sell, or +# distribute this software, either in source code form or as a compiled +# binary, for any purpose, commercial or non-commercial, and by any +# means. +# +# In jurisdictions that recognize copyright laws, the author or authors +# of this software dedicate any and all copyright interest in the +# software to the public domain. We make this dedication for the benefit +# of the public at large and to the detriment of our heirs and +# successors. We intend this dedication to be an overt act of +# relinquishment in perpetuity of all present and future rights to this +# software under copyright law. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +# IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR +# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, +# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +# OTHER DEALINGS IN THE SOFTWARE. +# + +from runner import * +from wiredtiger import * +from workgen import * + +conn = wiredtiger_open("WT_TEST", "create,cache_size=500MB") +s = conn.open_session() +tname = "table:test" +config = "key_format=S,value_format=S," +s.create(tname, config) +table = Table(tname) +table.options.key_size = 20 +table.options.value_size = 10 + +context = Context() +op = Operation(Operation.OP_INSERT, table) +thread = Thread(op * 5000) +pop_workload = Workload(context, thread) +print('populate:') +pop_workload.run(conn) + +opread = Operation(Operation.OP_SEARCH, table) +read_txn = txn(opread * 10, 'read_timestamp') +# read_timestamp_lag is the lag to the read_timestamp from current time +read_txn.transaction.read_timestamp_lag = 5 +treader = Thread(read_txn) + +opwrite = Operation(Operation.OP_INSERT, table) +write_txn = txn(opwrite * 10, 'isolation=snapshot') +# use_prepare_timestamp - Commit the transaction with stable_timestamp. +write_txn.transaction.use_prepare_timestamp = True +twriter = Thread(write_txn) + +opupdate = Operation(Operation.OP_UPDATE, table) +update_txn = txn(opupdate * 10, 'isolation=snapshot') +# use_commit_timestamp - Commit the transaction with commit_timestamp. +update_txn.transaction.use_commit_timestamp = True +tupdate = Thread(update_txn) + +workload = Workload(context, 10 * twriter + 10 * tupdate + 10 * treader) +workload.options.run_time = 50 +workload.options.report_interval=500 +# read_timestamp_lag - Number of seconds lag to the oldest_timestamp from current time. +workload.options.oldest_timestamp_lag=30 +# read_timestamp_lag - Number of seconds lag to the stable_timestamp from current time. +workload.options.stable_timestamp_lag=10 +# timestamp_advance is the number of seconds to wait before moving oldest and stable timestamp. +workload.options.timestamp_advance=1 +print('transactional prepare workload:') +workload.run(conn) diff --git a/bench/workgen/runner/runner/core.py b/bench/workgen/runner/runner/core.py index ae3de8efa64..158a65d1fbd 100755 --- a/bench/workgen/runner/runner/core.py +++ b/bench/workgen/runner/runner/core.py @@ -35,7 +35,7 @@ # Put the operation (and any suboperations) within a transaction. def txn(op, config=None): t = Transaction(config) - op._transaction = t + op.transaction = t return op # sleep -- @@ -301,7 +301,7 @@ def _op_transaction_list(oplist, txn_config): def op_group_transaction(ops_arg, ops_per_txn, txn_config): if ops_arg != Operation.OP_NONE: return txn(ops_arg, txn_config) - if ops_arg._transaction != None: + if ops_arg.transaction != None: raise Exception('nested transactions not supported') if ops_arg._repeatgroup != None: raise Exception('grouping transactions with multipliers not supported') diff --git a/bench/workgen/workgen.cxx b/bench/workgen/workgen.cxx index 2347cf9d6b5..ca0cd4b308d 100644 --- a/bench/workgen/workgen.cxx +++ b/bench/workgen/workgen.cxx @@ -35,7 +35,6 @@ #include "wiredtiger.h" #include "workgen.h" #include "workgen_int.h" -#include "workgen_time.h" extern "C" { // Include some specific WT files, as some files included by wt_internal.h // have some C-ism's that don't work in C++. @@ -43,12 +42,11 @@ extern "C" { #include #include #include -#include #include -#include #include "error.h" #include "misc.h" } +#define BUF_SIZE 100 #define LATENCY_US_BUCKETS 1000 #define LATENCY_MS_BUCKETS 1000 @@ -102,6 +100,12 @@ extern "C" { namespace workgen { + +struct WorkloadRunnerConnection { + WorkloadRunner *runner; + WT_CONNECTION *connection; +}; + // The number of contexts. Normally there is one context created, but it will // be possible to use several eventually. More than one is not yet // implemented, but we must at least guard against the caller creating more @@ -118,6 +122,48 @@ static void *thread_runner_main(void *arg) { return (NULL); } +static void *thread_workload(void *arg) { + + WorkloadRunnerConnection *runnerConnection = (WorkloadRunnerConnection *) arg; + WorkloadRunner *runner = runnerConnection->runner; + WT_CONNECTION *connection = runnerConnection->connection; + + try { + runner->increment_timestamp(connection); + } catch (WorkgenException &wge) { + std::cerr << "Exception while incrementing timestamp." << std::endl; + } + + return (NULL); +} + +/* + * This function will sleep for "timestamp_advance" seconds, increment and set oldest_timestamp, + * stable_timestamp with the specified lag until stopping is set to true + */ +int WorkloadRunner::increment_timestamp(WT_CONNECTION *conn) { + char buf[BUF_SIZE]; + uint64_t time_us; + + while (!stopping) + { + if (_workload->options.oldest_timestamp_lag > 0) { + time_us = WorkgenTimeStamp::get_timestamp_lag(_workload->options.oldest_timestamp_lag); + sprintf(buf, "oldest_timestamp=%" PRIu64, time_us); + conn->set_timestamp(conn, buf); + } + + if (_workload->options.stable_timestamp_lag > 0) { + time_us = WorkgenTimeStamp::get_timestamp_lag(_workload->options.stable_timestamp_lag); + sprintf(buf, "stable_timestamp=%" PRIu64, time_us); + conn->set_timestamp(conn, buf); + } + + WorkgenTimeStamp::sleep(_workload->options.timestamp_advance); + } + return 0; +} + static void *monitor_main(void *arg) { Monitor *monitor = (Monitor *)arg; try { @@ -715,6 +761,9 @@ int ThreadRunner::op_run(Operation *op) { uint64_t recno; uint64_t range; bool measure_latency, own_cursor, retry_op; + timespec start_time; + uint64_t time_us; + char buf[BUF_SIZE]; track = NULL; cursor = NULL; @@ -795,6 +844,7 @@ int ThreadRunner::op_run(Operation *op) { timespec start; if (measure_latency) workgen_epoch(&start); + // Whether or not we are measuring latency, we track how many operations // are in progress, or that complete. if (track != NULL) @@ -814,11 +864,22 @@ int ThreadRunner::op_run(Operation *op) { } // Retry on rollback until success. while (retry_op) { - if (op->_transaction != NULL) { + if (op->transaction != NULL) { if (_in_transaction) THROW("nested transactions not supported"); - WT_ERR(_session->begin_transaction(_session, - op->_transaction->_begin_config.c_str())); + if (op->transaction->use_commit_timestamp && op->transaction->use_prepare_timestamp) + { + THROW("Either use_prepare_timestamp or use_commit_timestamp must be set."); + } + if (op->transaction->read_timestamp_lag > 0) { + uint64_t read = WorkgenTimeStamp::get_timestamp_lag(op->transaction->read_timestamp_lag); + sprintf(buf, "%s=%" PRIu64, op->transaction->_begin_config.c_str(), read); + } + else { + sprintf(buf, "%s", op->transaction->_begin_config.c_str()); + } + WT_ERR(_session->begin_transaction(_session, buf)); + _in_transaction = true; } if (op->is_table_op()) { @@ -899,12 +960,28 @@ int ThreadRunner::op_run(Operation *op) { err: if (own_cursor) WT_TRET(cursor->close(cursor)); - if (op->_transaction != NULL) { - if (ret != 0 || op->_transaction->_rollback) + if (op->transaction != NULL) { + if (ret != 0 || op->transaction->_rollback) WT_TRET(_session->rollback_transaction(_session, NULL)); - else if (_in_transaction) - ret = _session->commit_transaction(_session, - op->_transaction->_commit_config.c_str()); + else if (_in_transaction) { + // Set prepare, commit and durable timestamp if prepare is set. + if (op->transaction->use_prepare_timestamp) { + time_us = WorkgenTimeStamp::get_timestamp(); + sprintf(buf, "prepare_timestamp=%" PRIu64, time_us); + ret = _session->prepare_transaction(_session, buf); + sprintf(buf, "commit_timestamp=%" PRIu64 ",durable_timestamp=%" PRIu64, time_us, time_us); + ret = _session->commit_transaction(_session, buf); + } + else if (op->transaction->use_commit_timestamp) { + uint64_t commit_time_us = WorkgenTimeStamp::get_timestamp(); + sprintf(buf, "commit_timestamp=%" PRIu64, commit_time_us); + ret = _session->commit_transaction(_session, buf); + } + else { + ret = _session->commit_transaction(_session, + op->transaction->_commit_config.c_str()); + } + } _in_transaction = false; } return (ret); @@ -1077,27 +1154,27 @@ void Thread::describe(std::ostream &os) const { Operation::Operation() : _optype(OP_NONE), _internal(NULL), _table(), _key(), _value(), _config(), - _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { + transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { init_internal(NULL); } Operation::Operation(OpType optype, Table table, Key key, Value value) : _optype(optype), _internal(NULL), _table(table), _key(key), _value(value), - _config(), _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { + _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { init_internal(NULL); size_check(); } Operation::Operation(OpType optype, Table table, Key key) : _optype(optype), _internal(NULL), _table(table), _key(key), _value(), - _config(), _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { + _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { init_internal(NULL); size_check(); } Operation::Operation(OpType optype, Table table) : _optype(optype), _internal(NULL), _table(table), _key(), _value(), - _config(), _transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { + _config(), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { init_internal(NULL); size_check(); } @@ -1105,22 +1182,22 @@ Operation::Operation(OpType optype, Table table) : Operation::Operation(const Operation &other) : _optype(other._optype), _internal(NULL), _table(other._table), _key(other._key), _value(other._value), _config(other._config), - _transaction(other._transaction), _group(other._group), + transaction(other.transaction), _group(other._group), _repeatgroup(other._repeatgroup), _timed(other._timed) { - // Creation and destruction of _group and _transaction is managed + // Creation and destruction of _group and transaction is managed // by Python. init_internal(other._internal); } Operation::Operation(OpType optype, const char *config) : _optype(optype), _internal(NULL), _table(), _key(), _value(), - _config(config), _transaction(NULL), _group(NULL), _repeatgroup(0), + _config(config), transaction(NULL), _group(NULL), _repeatgroup(0), _timed(0.0) { init_internal(NULL); } Operation::~Operation() { - // Creation and destruction of _group, _transaction is managed by Python. + // Creation and destruction of _group, transaction is managed by Python. delete _internal; } @@ -1129,7 +1206,7 @@ Operation& Operation::operator=(const Operation &other) { _table = other._table; _key = other._key; _value = other._value; - _transaction = other._transaction; + transaction = other.transaction; _group = other._group; _repeatgroup = other._repeatgroup; _timed = other._timed; @@ -1184,7 +1261,7 @@ void Operation::init_internal(OperationInternal *other) { bool Operation::combinable() const { return (_group != NULL && _repeatgroup == 1 && _timed == 0.0 && - _transaction == NULL && _config == ""); + transaction == NULL && _config == ""); } void Operation::create_all() { @@ -1203,9 +1280,9 @@ void Operation::describe(std::ostream &os) const { } if (!_config.empty()) os << ", '" << _config << "'"; - if (_transaction != NULL) { + if (transaction != NULL) { os << ", ["; - _transaction->describe(os); + transaction->describe(os); os << "]"; } if (_timed != 0.0) @@ -1439,7 +1516,7 @@ int SleepOperationInternal::run(ThreadRunner *runner, WT_SESSION *session) uint64_t SleepOperationInternal::sync_time_us() const { - return (secs_us(_sleepvalue)); + return (secs_us(_sleepvalue)); } void TableOperationInternal::parse_config(const std::string &config) @@ -1857,7 +1934,8 @@ TableInternal::~TableInternal() {} WorkloadOptions::WorkloadOptions() : max_latency(0), report_file("workload.stat"), report_interval(0), run_time(0), sample_file("monitor.json"), sample_interval_ms(0), sample_rate(1), - warmup(0), _options() { + warmup(0), oldest_timestamp_lag(0.0), stable_timestamp_lag(0.0), + timestamp_advance(0.0), _options() { _options.add_int("max_latency", max_latency, "prints warning if any latency measured exceeds this number of " "milliseconds. Requires sample_interval to be configured."); @@ -1881,6 +1959,13 @@ WorkloadOptions::WorkloadOptions() : max_latency(0), "2 for every second operation, 3 for every third operation etc."); _options.add_int("warmup", warmup, "how long to run the workload phase before starting measurements"); + _options.add_double("oldest_timestamp_lag", oldest_timestamp_lag, + "how much lag to the oldest timestamp from epoch time"); + _options.add_double("stable_timestamp_lag", stable_timestamp_lag, + "how much lag to the oldest timestamp from epoch time"); + _options.add_double("timestamp_advance", timestamp_advance, + "how many seconds to wait before moving oldest and stable" + "timestamp forward"); } WorkloadOptions::WorkloadOptions(const WorkloadOptions &other) : @@ -1917,13 +2002,12 @@ Workload& Workload::operator=(const Workload &other) { int Workload::run(WT_CONNECTION *conn) { WorkloadRunner runner(this); - return (runner.run(conn)); } WorkloadRunner::WorkloadRunner(Workload *workload) : _workload(workload), _trunners(workload->_threads.size()), - _report_out(&std::cout), _start() { + _report_out(&std::cout), _start(), stopping(false) { ts_clear(_start); } WorkloadRunner::~WorkloadRunner() {} @@ -1934,6 +2018,9 @@ int WorkloadRunner::run(WT_CONNECTION *conn) { std::ofstream report_out; _wt_home = conn->get_home(conn); + + if ( (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) && options->timestamp_advance < 0 ) + THROW("Workload.options.timestamp_advance must be positive if either Workload.options.oldest_timestamp_lag or Workload.options.stable_timestamp_lag is set"); if (options->sample_interval_ms > 0 && options->sample_rate <= 0) THROW("Workload.options.sample_rate must be positive"); if (!options->report_file.empty()) { @@ -1944,7 +2031,7 @@ int WorkloadRunner::run(WT_CONNECTION *conn) { WT_ERR(create_all(conn, _workload->_context)); WT_ERR(open_all()); WT_ERR(ThreadRunner::cross_check(_trunners)); - WT_ERR(run_all()); + WT_ERR(run_all(conn)); err: //TODO: (void)close_all(); _report_out = &std::cout; @@ -2031,16 +2118,18 @@ void WorkloadRunner::final_report(timespec &totalsecs) { out << "Run completed: " << totalsecs << " seconds" << std::endl; } -int WorkloadRunner::run_all() { +int WorkloadRunner::run_all(WT_CONNECTION *conn) { void *status; std::vector thread_handles; Stats counts(false); WorkgenException *exception; WorkloadOptions *options = &_workload->options; + WorkloadRunnerConnection *runnerConnection; Monitor monitor(*this); std::ofstream monitor_out; std::ofstream monitor_json; std::ostream &out = *_report_out; + pthread_t time_thandle; WT_DECL_RET; for (size_t i = 0; i < _trunners.size(); i++) @@ -2086,6 +2175,22 @@ int WorkloadRunner::run_all() { thread_handles.push_back(thandle); } + // Start Timestamp increment thread + if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) { + + runnerConnection = new WorkloadRunnerConnection(); + runnerConnection->runner = this; + runnerConnection->connection = conn; + + if ((ret = pthread_create(&time_thandle, NULL, thread_workload, + runnerConnection)) != 0) { + std::cerr << "pthread_create failed err=" << ret << std::endl; + std::cerr << "Stopping Time threads." << std::endl; + (void)pthread_join(time_thandle, &status); + delete runnerConnection; + } + } + // Treat warmup separately from report interval so that if we have a // warmup period we clear and ignore stats after it ends. if (options->warmup != 0) @@ -2132,6 +2237,9 @@ int WorkloadRunner::run_all() { _trunners[i]._stop = true; if (options->sample_interval_ms > 0) monitor._stop = true; + if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) { + stopping = true; + } // wait for all threads exception = NULL; @@ -2146,6 +2254,12 @@ int WorkloadRunner::run_all() { exception = &_trunners[i]._exception; } + // Wait for the time increment thread + if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) { + WT_TRET(pthread_join(time_thandle, &status)); + delete runnerConnection; + } + workgen_epoch(&now); if (options->sample_interval_ms > 0) { WT_TRET(pthread_join(monitor._handle, &status)); diff --git a/bench/workgen/workgen.h b/bench/workgen/workgen.h index 382ca65dcfc..b963cf3d47e 100644 --- a/bench/workgen/workgen.h +++ b/bench/workgen/workgen.h @@ -292,7 +292,7 @@ struct Operation { Key _key; Value _value; std::string _config; - Transaction *_transaction; + Transaction *transaction; std::vector *_group; int _repeatgroup; double _timed; @@ -386,11 +386,15 @@ struct Thread { struct Transaction { bool _rollback; + bool use_commit_timestamp; + bool use_prepare_timestamp; std::string _begin_config; std::string _commit_config; + double read_timestamp_lag; - Transaction(const char *_config = NULL) : _rollback(false), - _begin_config(_config == NULL ? "" : _config), _commit_config() {} + Transaction(const char *_config = NULL) : _rollback(false), use_commit_timestamp(false), use_prepare_timestamp(false), _begin_config(_config == NULL ? "" : _config), _commit_config(), + read_timestamp_lag(0.0) + {} void describe(std::ostream &os) const { os << "Transaction: "; @@ -399,6 +403,12 @@ struct Transaction { os << "begin_config: " << _begin_config; if (!_commit_config.empty()) os << ", commit_config: " << _commit_config; + if (use_commit_timestamp) + os << "(use_commit_timestamp) "; + if (use_prepare_timestamp) + os << "(use_prepare_timestamp) "; + if (read_timestamp_lag) + os << "(read_timestamp_lag)"; } }; @@ -414,6 +424,9 @@ struct WorkloadOptions { int sample_rate; std::string sample_file; int warmup; + double oldest_timestamp_lag; + double stable_timestamp_lag; + double timestamp_advance; WorkloadOptions(); WorkloadOptions(const WorkloadOptions &other); diff --git a/bench/workgen/workgen_int.h b/bench/workgen/workgen_int.h index ca93e5c2733..d5ed99c8c53 100644 --- a/bench/workgen/workgen_int.h +++ b/bench/workgen/workgen_int.h @@ -30,11 +30,12 @@ #include #include #include -#ifndef SWIG extern "C" { +#include #include "workgen_func.h" +#include } -#endif +#include "workgen_time.h" namespace workgen { @@ -46,6 +47,28 @@ typedef uint32_t tint_t; struct ThreadRunner; struct WorkloadRunner; +struct WorkgenTimeStamp { + WorkgenTimeStamp() {} + + static uint64_t get_timestamp_lag(double seconds) { + timespec start_time; + workgen_epoch(&start_time); + + return (ts_us(start_time) - secs_us(seconds)); + } + + static void sleep(double seconds) { + usleep(ceil(secs_us(seconds))); + } + + static uint64_t get_timestamp() { + timespec start_time; + workgen_epoch(&start_time); + + return (ts_us(start_time)); + } +}; + // A exception generated by the workgen classes. Methods generally return an // int errno, so this is useful primarily for notifying the caller about // failures in constructors. @@ -250,10 +273,12 @@ struct WorkloadRunner { std::ostream *_report_out; std::string _wt_home; timespec _start; + bool stopping; WorkloadRunner(Workload *); ~WorkloadRunner(); int run(WT_CONNECTION *conn); + int increment_timestamp(WT_CONNECTION *conn); private: int close_all(); @@ -263,7 +288,7 @@ struct WorkloadRunner { int open_all(); void open_report_file(std::ofstream &, const char *, const char *); void report(time_t, time_t, Stats *stats); - int run_all(); + int run_all(WT_CONNECTION *conn); WorkloadRunner(const WorkloadRunner &); // disallowed WorkloadRunner& operator=(const WorkloadRunner &other); // disallowed