Skip to content

Commit

Permalink
WT-7332 Add ability to create and drop tables in workgen (#6417)
Browse files Browse the repository at this point in the history
Added max_idle_table_cycle and max_idle_table_cycle_fatal options in workgen similar to wtperf.
  • Loading branch information
raviprakashgiri29 committed Mar 29, 2021
1 parent 8b7aaad commit 716858f
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 42 deletions.
3 changes: 3 additions & 0 deletions bench/workgen/runner/many-dhandle-stress.py
Expand Up @@ -90,6 +90,9 @@
workload.options.max_latency=1000
workload.options.sample_rate=1
workload.options.sample_interval_ms = 5000
# Uncomment to fail instead of generating a warning
# workload.options.max_idle_table_cycle_fatal = True
workload.options.max_idle_table_cycle = 2
workload.run(conn)

latency_filename = context.args.home + "/latency.out"
Expand Down
212 changes: 171 additions & 41 deletions bench/workgen/workgen.cxx
Expand Up @@ -137,13 +137,104 @@ static void *thread_workload(void *arg) {
return (NULL);
}

static void *thread_idle_table_cycle_workload(void *arg) {
WorkloadRunnerConnection *runnerConnection = (WorkloadRunnerConnection *) arg;
WT_CONNECTION *connection = runnerConnection->connection;
WorkloadRunner *runner = runnerConnection->runner;

try {
runner->start_table_idle_cycle(connection);
} catch (WorkgenException &wge) {
std::cerr << "Exception while create/drop tables." << std::endl;
}

return (NULL);
}

int WorkloadRunner::check_timing(const char *name, uint64_t last_interval) {
WorkloadOptions *options = &_workload->options;
int msg_err;
const char *str;

msg_err = 0;

if (last_interval > options->max_idle_table_cycle) {
if (options->max_idle_table_cycle_fatal) {
msg_err = ETIMEDOUT;
str = "ERROR";
} else {
str = "WARNING";
}
std::cerr << str << ": Cycling idle table failed because " << name << " took " << last_interval << " seconds which is longer than configured acceptable maximum of " << options->max_idle_table_cycle << std::endl;
}
return (msg_err);
}

int WorkloadRunner::start_table_idle_cycle(WT_CONNECTION *conn) {
WT_SESSION *session;
WT_CURSOR *cursor;
uint64_t start, stop, last_interval;
int ret, cycle_count;
char uri[BUF_SIZE];

cycle_count = 0;
if (ret = conn->open_session(conn, NULL, NULL, &session) != 0) {
THROW("Error Opening a Session.");
}

for (cycle_count = 0 ; !stopping ; ++cycle_count) {
sprintf(uri, "table:test_cycle%04d", cycle_count);

workgen_clock(&start);
/* Create a table. */
if ((ret = session->create(session, uri, "key_format=S,value_format=S")) != 0) {
if (ret == EBUSY)
continue;
THROW("Table create failed in start_table_idle_cycle.");
}
workgen_clock(&stop);
last_interval = ns_to_sec(stop - start);
if ((ret = check_timing("CREATE", last_interval)) != 0)
THROW_ERRNO(ret, "WT_SESSION->create timeout.");
start = stop;

/* Open and close cursor. */
if ((ret = session->open_cursor(session, uri, NULL, NULL, &cursor)) != 0) {
THROW("Cursor open failed.");
}
if ((ret = cursor->close(cursor)) != 0) {
THROW("Cursor close failed.");
}
workgen_clock(&stop);
last_interval = ns_to_sec(stop - start);
if ((ret = check_timing("CURSOR", last_interval)) != 0)
THROW_ERRNO(ret, "WT_SESSION->open_cursor timeout.");
start = stop;

/*
* Drop the table. Keep retrying on EBUSY failure - it is an expected return when
* checkpoints are happening.
*/
while ((ret = session->drop(session, uri, "force,checkpoint_wait=false")) == EBUSY)
sleep(1);

if (ret != 0) {
THROW("Table drop failed in cycle_idle_tables.");
}
workgen_clock(&stop);
last_interval = ns_to_sec(stop - start);
if ((ret = check_timing("DROP", last_interval)) != 0)
THROW_ERRNO(ret, "WT_SESSION->drop timeout.");
}
return 0;
}
/*
* This function will sleep for "timestamp_advance" seconds, increment and set oldest_timestamp,
* stable_timestamp with the specified lag until stopping is set to true
*/
int WorkloadRunner::increment_timestamp(WT_CONNECTION *conn) {
char buf[BUF_SIZE];
uint64_t time_us;
char buf[BUF_SIZE];

while (!stopping)
{
Expand Down Expand Up @@ -1933,9 +2024,9 @@ TableInternal::~TableInternal() {}

WorkloadOptions::WorkloadOptions() : max_latency(0),
report_file("workload.stat"), report_interval(0), run_time(0),
sample_file("monitor.json"), sample_interval_ms(0), sample_rate(1),
warmup(0), oldest_timestamp_lag(0.0), stable_timestamp_lag(0.0),
timestamp_advance(0.0), _options() {
sample_file("monitor.json"), sample_interval_ms(0), max_idle_table_cycle(0),
sample_rate(1), warmup(0), oldest_timestamp_lag(0.0), stable_timestamp_lag(0.0),
timestamp_advance(0.0), max_idle_table_cycle_fatal(false), _options() {
_options.add_int("max_latency", max_latency,
"prints warning if any latency measured exceeds this number of "
"milliseconds. Requires sample_interval to be configured.");
Expand All @@ -1954,6 +2045,9 @@ WorkloadOptions::WorkloadOptions() : max_latency(0),
"When set to the empty string, no JSON is emitted.");
_options.add_int("sample_interval_ms", sample_interval_ms,
"performance logging every interval milliseconds, 0 to disable");
_options.add_int("max_idle_table_cycle", max_idle_table_cycle,
"maximum number of seconds a create or drop is allowed before aborting "
"or printing a warning based on max_idle_table_cycle_fatal setting.");
_options.add_int("sample_rate", sample_rate,
"how often the latency of operations is measured. 1 for every operation, "
"2 for every second operation, 3 for every third operation etc.");
Expand All @@ -1966,6 +2060,8 @@ WorkloadOptions::WorkloadOptions() : max_latency(0),
_options.add_double("timestamp_advance", timestamp_advance,
"how many seconds to wait before moving oldest and stable"
"timestamp forward");
_options.add_bool("max_idle_table_cycle_fatal", max_idle_table_cycle_fatal,
"print warning (false) or abort (true) of max_idle_table_cycle failure");
}

WorkloadOptions::WorkloadOptions(const WorkloadOptions &other) :
Expand Down Expand Up @@ -2125,13 +2221,15 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
WorkgenException *exception;
WorkloadOptions *options = &_workload->options;
WorkloadRunnerConnection *runnerConnection;
WorkloadRunnerConnection *createDropTableCycle;
Monitor monitor(*this);
std::ofstream monitor_out;
std::ofstream monitor_json;
std::ostream &out = *_report_out;
pthread_t time_thandle;
pthread_t time_thandle, idle_table_thandle;
WT_DECL_RET;

runnerConnection = createDropTableCycle = NULL;
for (size_t i = 0; i < _trunners.size(); i++)
_trunners[i].get_static_counts(counts);
out << "Starting workload: " << _trunners.size() << " threads, ";
Expand Down Expand Up @@ -2188,58 +2286,84 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
std::cerr << "Stopping Time threads." << std::endl;
(void)pthread_join(time_thandle, &status);
delete runnerConnection;
runnerConnection = NULL;
stopping = true;
}
}

// Treat warmup separately from report interval so that if we have a
// warmup period we clear and ignore stats after it ends.
if (options->warmup != 0)
sleep((unsigned int)options->warmup);
// Start Idle table cycle thread
if (options->max_idle_table_cycle > 0) {

// Clear stats after any warmup period completes.
for (size_t i = 0; i < _trunners.size(); i++) {
ThreadRunner *runner = &_trunners[i];
runner->_stats.clear();
}
createDropTableCycle = new WorkloadRunnerConnection();
createDropTableCycle->runner = this;
createDropTableCycle->connection = conn;

workgen_epoch(&_start);
timespec end = _start + options->run_time;
timespec next_report = _start + options->report_interval;
if ((ret = pthread_create(&idle_table_thandle, NULL, thread_idle_table_cycle_workload,
createDropTableCycle)) != 0) {
std::cerr << "pthread_create failed err=" << ret << std::endl;
std::cerr << "Stopping Create Drop table idle cycle threads." << std::endl;
(void)pthread_join(idle_table_thandle, &status);
delete createDropTableCycle;
createDropTableCycle = NULL;
stopping = true;
}
}

// Let the test run, reporting as needed.
Stats curstats(false);
timespec now = _start;
while (now < end) {
timespec sleep_amt;
timespec now;

sleep_amt = end - now;
if (next_report != 0) {
timespec next_diff = next_report - now;
if (next_diff < next_report)
sleep_amt = next_diff;
/* Don't run the test if any of the above pthread_create fails. */
if (!stopping && ret == 0)
{
// Treat warmup separately from report interval so that if we have a
// warmup period we clear and ignore stats after it ends.
if (options->warmup != 0)
sleep((unsigned int)options->warmup);

// Clear stats after any warmup period completes.
for (size_t i = 0; i < _trunners.size(); i++) {
ThreadRunner *runner = &_trunners[i];
runner->_stats.clear();
}
if (sleep_amt.tv_sec > 0)
sleep((unsigned int)sleep_amt.tv_sec);
else
usleep((useconds_t)((sleep_amt.tv_nsec + 999)/ 1000));

workgen_epoch(&now);
if (now >= next_report && now < end && options->report_interval != 0) {
report(options->report_interval, (now - _start).tv_sec, &curstats);
while (now >= next_report)
next_report += options->report_interval;
workgen_epoch(&_start);
timespec end = _start + options->run_time;
timespec next_report = _start + options->report_interval;

// Let the test run, reporting as needed.
Stats curstats(false);
now = _start;
while (now < end) {
timespec sleep_amt;

sleep_amt = end - now;
if (next_report != 0) {
timespec next_diff = next_report - now;
if (next_diff < next_report)
sleep_amt = next_diff;
}
if (sleep_amt.tv_sec > 0)
sleep((unsigned int)sleep_amt.tv_sec);
else
usleep((useconds_t)((sleep_amt.tv_nsec + 999)/ 1000));

workgen_epoch(&now);
if (now >= next_report && now < end && options->report_interval != 0) {
report(options->report_interval, (now - _start).tv_sec, &curstats);
while (now >= next_report)
next_report += options->report_interval;
}
}
}

// signal all threads to stop
// signal all threads to stop.
if (options->run_time != 0)
for (size_t i = 0; i < _trunners.size(); i++)
_trunners[i]._stop = true;
if (options->sample_interval_ms > 0)
monitor._stop = true;
if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) {
stopping = true;
}

// Signal timestamp and idle table cycle thread to stop.
stopping = true;

// wait for all threads
exception = NULL;
Expand All @@ -2255,11 +2379,17 @@ int WorkloadRunner::run_all(WT_CONNECTION *conn) {
}

// Wait for the time increment thread
if (options->oldest_timestamp_lag > 0 || options->stable_timestamp_lag > 0) {
if (runnerConnection != NULL) {
WT_TRET(pthread_join(time_thandle, &status));
delete runnerConnection;
}

// Wait for the idle table cycle thread.
if (createDropTableCycle != NULL) {
WT_TRET(pthread_join(idle_table_thandle, &status));
delete createDropTableCycle;
}

workgen_epoch(&now);
if (options->sample_interval_ms > 0) {
WT_TRET(pthread_join(monitor._handle, &status));
Expand Down
2 changes: 2 additions & 0 deletions bench/workgen/workgen.h
Expand Up @@ -436,11 +436,13 @@ struct WorkloadOptions {
int run_time;
int sample_interval_ms;
int sample_rate;
int max_idle_table_cycle;
std::string sample_file;
int warmup;
double oldest_timestamp_lag;
double stable_timestamp_lag;
double timestamp_advance;
bool max_idle_table_cycle_fatal;

WorkloadOptions();
WorkloadOptions(const WorkloadOptions &other);
Expand Down
2 changes: 2 additions & 0 deletions bench/workgen/workgen_int.h
Expand Up @@ -278,6 +278,8 @@ struct WorkloadRunner {
~WorkloadRunner();
int run(WT_CONNECTION *conn);
int increment_timestamp(WT_CONNECTION *conn);
int start_table_idle_cycle(WT_CONNECTION *conn);
int check_timing(const char *name, uint64_t last_interval);

private:
int close_all();
Expand Down
14 changes: 13 additions & 1 deletion bench/workgen/wtperf.py
Expand Up @@ -85,7 +85,9 @@ def fatal_error(self, msg, errtype = 'configuration error'):
'readonly', 'reopen_connection', 'run_ops',
'sample_interval', 'sess_config', 'table_config',
'table_count', 'threads', 'transaction_config',
'value_sz' ]
'value_sz',
'max_idle_table_cycle',
'max_idle_table_cycle_fatal' ]

def set_opt(self, optname, val):
if optname not in self.supported_opt_list:
Expand Down Expand Up @@ -535,6 +537,8 @@ def translate_inner(self):
self.get_string_opt('transaction_config', '')
self.get_boolean_opt('compact', False)
self.get_int_opt('pareto', 0)
self.get_int_opt('max_idle_table_cycle', 0)
self.get_boolean_opt('max_idle_table_cycle_fatal', False)
opts = self.options
if opts.range_partition and opts.random_range == 0:
self.fatal_error('range_partition requires random_range to be set')
Expand All @@ -547,6 +551,14 @@ def translate_inner(self):
workloadopts += 'workload.options.sample_interval_ms = ' + \
str(self.options.sample_interval_ms) + '\n'

if self.options.max_idle_table_cycle > 0:
workloadopts += 'workload.options.max_idle_table_cycle = ' + \
str(self.options.max_idle_table_cycle) + '\n'

if self.options.max_idle_table_cycle_fatal:
workloadopts += 'workload.options.max_idle_table_cycle_fatal = ' + \
str(self.options.max_idle_table_cycle_fatal) + '\n'

s = '#/usr/bin/env python\n'
s += '# generated from ' + self.filename + '\n'
s += self.prefix
Expand Down

0 comments on commit 716858f

Please sign in to comment.