Skip to content

Commit

Permalink
MB-14412: Rename tap throttle to replication throttle.
Browse files Browse the repository at this point in the history
Since the throttle function is used by modules other than tap like
DCP as well, it is more appropriate to rename it to replication throttle
Change-Id: Iee4f07e2329e8f2f5da1d06971af60a5b01cde5d
Reviewed-on: http://review.couchbase.org/49482
Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
Tested-by: Manu Dhundi <manu@couchbase.com>
  • Loading branch information
manudhundi committed Apr 13, 2015
1 parent 6d90a37 commit b82bd60
Show file tree
Hide file tree
Showing 16 changed files with 236 additions and 236 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Expand Up @@ -136,7 +136,7 @@ ADD_LIBRARY(ep SHARED
src/sizes.cc
${CMAKE_CURRENT_BINARY_DIR}/src/stats-info.c
src/stored-value.cc src/tapconnection.cc src/connmap.cc
src/tapthrottle.cc src/tasks.cc
src/replicationthrottle.cc src/tasks.cc
src/taskqueue.cc src/vbucket.cc
src/vbucketmap.cc src/warmup.cc
${KVSTORE_SOURCE} ${COUCH_KVSTORE_SOURCE}
Expand Down
6 changes: 3 additions & 3 deletions configuration.json
Expand Up @@ -411,7 +411,7 @@
"default": "0.1",
"type": "float"
},
"tap_throttle_cap_pcnt": {
"replication_throttle_cap_pcnt": {
"default": "10",
"descr": "Percentage of total items in write queue at which we throttle tap input",
"type": "size_t",
Expand All @@ -422,7 +422,7 @@
}
}
},
"tap_throttle_queue_cap": {
"replication_throttle_queue_cap": {
"default": "-1",
"descr": "Max size of a write queue to throttle incoming tap input.",
"type": "ssize_t",
Expand All @@ -433,7 +433,7 @@
}
}
},
"tap_throttle_threshold": {
"replication_throttle_threshold": {
"default": "99",
"descr": "Percentage of max mem at which we begin NAKing tap input.",
"type": "size_t",
Expand Down
200 changes: 100 additions & 100 deletions docs/engine-params.org

Large diffs are not rendered by default.

108 changes: 54 additions & 54 deletions docs/stats.org
Expand Up @@ -292,11 +292,11 @@ For introductory information on stats within membase, start with the
| | sent on an idle connection |
| ep_tap_requeue_sleep_time | The amount of time to wait before a |
| | failed tap item is requeued |
| ep_tap_throttle_cap_pcnt | Percentage of total items in write |
| ep_replication_throttle_cap_pcnt | Percentage of total items in write |
| | queue at which we throttle tap input |
| ep_tap_throttle_queue_cap | Max size of a write queue to throttle |
| ep_replication_throttle_queue_cap | Max size of a write queue to throttle |
| | incoming tap input |
| ep_tap_throttle_threshold | Percentage of max mem at which we |
| ep_replication_throttle_threshold | Percentage of max mem at which we |
| | begin NAKing tap input |
| ep_uncommitted_items | The amount of items that have not been |
| | written to disk |
Expand Down Expand Up @@ -473,56 +473,56 @@ The stats below are listed for each vbucket.

** Tap stats

| ep_tap_ack_grace_period | The amount of time to wait for a tap acks |
| | before disconnecting |
| ep_tap_ack_interval | The amount of messages a tap producer |
| | should send before requesting an ack |
| ep_tap_ack_window_size | The maximum amount of ack requests that |
| | can be sent before the consumer sends a |
| | response ack. When the window is full the |
| | tap stream is paused |
| ep_tap_queue_backfillremaining | Number of items needing to be backfilled |
| ep_tap_total_backlog_size | Number of remaining items for replication |
| ep_tap_total_queue | Sum of tap queue sizes on the current |
| | tap queues |
| ep_tap_total_fetched | Sum of all tap messages sent |
| ep_tap_bg_max_pending | The maximum number of bg jobs a tap |
| | connection may have |
| ep_tap_bg_fetched | Number of tap disk fetches |
| ep_tap_bg_fetch_requeued | Number of times a tap bg fetch task is |
| | requeued |
| ep_tap_fg_fetched | Number of tap memory fetches |
| ep_tap_deletes | Number of tap deletion messages sent |
| ep_tap_throttled | Number of tap messages refused due to |
| | throttling |
| ep_tap_count | Number of tap connections |
| ep_tap_bg_num_samples | The number of tap bg fetch samples |
| | included in the avg |
| ep_tap_bg_min_wait | The shortest time (µs) for a tap item |
| | before it is serviced by the dispatcher |
| ep_tap_bg_max_wait | The longest time (µs) for a tap item |
| | before it is serviced by the dispatcher |
| ep_tap_bg_wait_avg | The average wait time (µs) for a tap item |
| | before it is serviced by the dispatcher |
| ep_tap_bg_min_load | The shortest time (µs) for a tap item to |
| | be loaded from the persistence layer |
| ep_tap_bg_max_load | The longest time (µs) for a tap item to |
| | be loaded from the persistence layer |
| ep_tap_bg_load_avg | The average time (µs) for a tap item to |
| | be loaded from the persistence layer |
| ep_tap_noop_interval | The number of secs between a noop is |
| | added to an idle connection |
| ep_tap_backoff_period | The number of seconds the tap connection |
| | should back off after receiving ETMPFAIL |
| ep_tap_queue_fill | Total enqueued items |
| ep_tap_queue_drain | Total drained items |
| ep_tap_queue_backoff | Total back-off items |
| ep_tap_queue_backfill | Number of backfill remaining |
| ep_tap_queue_itemondisk | Number of items remaining on disk |
| ep_tap_throttle_threshold | Percentage of memory in use before we |
| | throttle tap streams |
| ep_tap_throttle_queue_cap | Disk write queue cap to throttle |
| | tap streams |
| ep_tap_ack_grace_period | The amount of time to wait for a tap acks |
| | before disconnecting |
| ep_tap_ack_interval | The amount of messages a tap producer |
| | should send before requesting an ack |
| ep_tap_ack_window_size | The maximum amount of ack requests that |
| | can be sent before the consumer sends a |
| | response ack. When the window is full the |
| | tap stream is paused |
| ep_tap_queue_backfillremaining | Number of items needing to be backfilled |
| ep_tap_total_backlog_size | Number of remaining items for replication |
| ep_tap_total_queue | Sum of tap queue sizes on the current |
| | tap queues |
| ep_tap_total_fetched | Sum of all tap messages sent |
| ep_tap_bg_max_pending | The maximum number of bg jobs a tap |
| | connection may have |
| ep_tap_bg_fetched | Number of tap disk fetches |
| ep_tap_bg_fetch_requeued | Number of times a tap bg fetch task is |
| | requeued |
| ep_tap_fg_fetched | Number of tap memory fetches |
| ep_tap_deletes | Number of tap deletion messages sent |
| ep_replication_throttled | Number of tap messages refused due to |
| | throttling |
| ep_tap_count | Number of tap connections |
| ep_tap_bg_num_samples | The number of tap bg fetch samples |
| | included in the avg |
| ep_tap_bg_min_wait | The shortest time (µs) for a tap item |
| | before it is serviced by the dispatcher |
| ep_tap_bg_max_wait | The longest time (µs) for a tap item |
| | before it is serviced by the dispatcher |
| ep_tap_bg_wait_avg | The average wait time (µs) for a tap item |
| | before it is serviced by the dispatcher |
| ep_tap_bg_min_load | The shortest time (µs) for a tap item to |
| | be loaded from the persistence layer |
| ep_tap_bg_max_load | The longest time (µs) for a tap item to |
| | be loaded from the persistence layer |
| ep_tap_bg_load_avg | The average time (µs) for a tap item to |
| | be loaded from the persistence layer |
| ep_tap_noop_interval | The number of secs between a noop is |
| | added to an idle connection |
| ep_tap_backoff_period | The number of seconds the tap connection |
| | should back off after receiving ETMPFAIL |
| ep_tap_queue_fill | Total enqueued items |
| ep_tap_queue_drain | Total drained items |
| ep_tap_queue_backoff | Total back-off items |
| ep_tap_queue_backfill | Number of backfill remaining |
| ep_tap_queue_itemondisk | Number of items remaining on disk |
| ep_replication_throttle_threshold| Percentage of memory in use before we |
| | throttle tap streams |
| ep_replication_throttle_queue_cap| Disk write queue cap to throttle |
| | tap streams |


*** Per Tap Client Stats
Expand Down Expand Up @@ -1186,7 +1186,7 @@ Reset Stats:
| ep_tap_bg_min_load |
| ep_tap_bg_min_wait |
| ep_tap_bg_wait_avg |
| ep_tap_throttled |
| ep_replication_throttled |
| ep_tap_total_fetched |
| ep_vbucket_del_max_walltime |
| pending_ops |
Expand Down
10 changes: 5 additions & 5 deletions management/cbepctl
Expand Up @@ -52,7 +52,7 @@ def set_param(mc, type, key, val):
else:
print 'Error: Bad parameter %s' % type

if key == 'tap_throttle_queue_cap' and val == 'infinite':
if key == 'replication_throttle_queue_cap' and val == 'infinite':
val = '-1'

if key == "mem_high_wat" or key == "mem_low_wat":
Expand Down Expand Up @@ -215,11 +215,11 @@ Available params for "set":
Available params for "set tap_param":
tap_keepalive - Seconds to hold a named tap connection.
tap_throttle_queue_cap - Max disk write queue size to throttle tap
replication_throttle_queue_cap - Max disk write queue size to throttle replication
streams ('infinite' means no cap).
tap_throttle_cap_pcnt - Percentage of total items in write queue at
which we throttle tap input
tap_throttle_threshold - Percentage of memory in use to throttle tap
replication_throttle_cap_pcnt - Percentage of total items in write queue at
which we throttle replication input
replication_throttle_threshold - Percentage of memory in use to throttle replication
streams.
""")

Expand Down
4 changes: 2 additions & 2 deletions src/dcp/consumer.cc
Expand Up @@ -20,7 +20,7 @@
#include "ep_engine.h"
#include "failover-table.h"
#include "connmap.h"
#include "tapthrottle.h"
#include "replicationthrottle.h"
#include "dcp/consumer.h"
#include "dcp/response.h"
#include "dcp/stream.h"
Expand Down Expand Up @@ -624,7 +624,7 @@ process_items_error_t DcpConsumer::processBufferedItems() {
uint32_t bytes_processed;

do {
if (!engine_.getTapThrottle().shouldProcess()) {
if (!engine_.getReplicationThrottle().shouldProcess()) {
backoffs++;
return cannot_process;
}
Expand Down
4 changes: 2 additions & 2 deletions src/dcp/stream.cc
Expand Up @@ -27,7 +27,7 @@
#include "dcp/producer.h"
#include "dcp/response.h"
#include "dcp/stream.h"
#include "tapthrottle.h"
#include "replicationthrottle.h"

static const char* snapshotTypeToString(snapshot_type_t type) {
static const char * const snapshotTypes[] = { "none", "disk", "memory" };
Expand Down Expand Up @@ -909,7 +909,7 @@ ENGINE_ERROR_CODE PassiveStream::messageReceived(DcpResponse* resp) {
last_seqno = bySeqno;
}

if (engine->getTapThrottle().shouldProcess() && !buffer.items) {
if (engine->getReplicationThrottle().shouldProcess() && !buffer.items) {
/* Process the response here itself rather than buffering it */
ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
switch (resp->getEvent()) {
Expand Down
26 changes: 13 additions & 13 deletions src/ep.cc
Expand Up @@ -44,7 +44,7 @@
#include "mutation_log.h"
#include "warmup.h"
#include "connmap.h"
#include "tapthrottle.h"
#include "replicationthrottle.h"

class StatsValueChangeListener : public ValueChangedListener {
public:
Expand All @@ -68,8 +68,8 @@ class StatsValueChangeListener : public ValueChangedListener {
stats.mem_low_wat.store(value);
} else if (key.compare("mem_high_wat") == 0) {
stats.mem_high_wat.store(value);
} else if (key.compare("tap_throttle_threshold") == 0) {
stats.tapThrottleThreshold.store(
} else if (key.compare("replication_throttle_threshold") == 0) {
stats.replicationThrottleThreshold.store(
static_cast<double>(value) / 100.0);
} else if (key.compare("warmup_min_memory_threshold") == 0) {
stats.warmupMemUsedCap.store(static_cast<double>(value) / 100.0);
Expand Down Expand Up @@ -116,10 +116,10 @@ class EPStoreValueChangeListener : public ValueChangedListener {
store.setBackfillMemoryThreshold(backfill_threshold);
} else if (key.compare("compaction_exp_mem_threshold") == 0) {
store.setCompactionExpMemThreshold(value);
} else if (key.compare("tap_throttle_queue_cap") == 0) {
store.getEPEngine().getTapThrottle().setQueueCap(value);
} else if (key.compare("tap_throttle_cap_pcnt") == 0) {
store.getEPEngine().getTapThrottle().setCapPercent(value);
} else if (key.compare("replication_throttle_queue_cap") == 0) {
store.getEPEngine().getReplicationThrottle().setQueueCap(value);
} else if (key.compare("replication_throttle_cap_pcnt") == 0) {
store.getEPEngine().getReplicationThrottle().setCapPercent(value);
} else {
LOG(EXTENSION_LOG_WARNING,
"Failed to change value for unknown variable, %s\n",
Expand Down Expand Up @@ -311,16 +311,16 @@ EventuallyPersistentStore::EventuallyPersistentStore(
config.addValueChangedListener("mem_high_wat",
new StatsValueChangeListener(stats, *this));

stats.tapThrottleThreshold.store(static_cast<double>
(config.getTapThrottleThreshold())
stats.replicationThrottleThreshold.store(static_cast<double>
(config.getReplicationThrottleThreshold())
/ 100.0);
config.addValueChangedListener("tap_throttle_threshold",
config.addValueChangedListener("replication_throttle_threshold",
new StatsValueChangeListener(stats, *this));

stats.tapThrottleWriteQueueCap.store(config.getTapThrottleQueueCap());
config.addValueChangedListener("tap_throttle_queue_cap",
stats.replicationThrottleWriteQueueCap.store(config.getReplicationThrottleQueueCap());
config.addValueChangedListener("replication_throttle_queue_cap",
new EPStoreValueChangeListener(*this));
config.addValueChangedListener("tap_throttle_cap_pcnt",
config.addValueChangedListener("replication_throttle_cap_pcnt",
new EPStoreValueChangeListener(*this));

setBGFetchDelay(config.getBgFetchDelay());
Expand Down
6 changes: 3 additions & 3 deletions src/ep.h
Expand Up @@ -815,14 +815,14 @@ class EventuallyPersistentStore {
bool compactionCanExpireItems() {
// Process expired items only if memory usage is lesser than
// compaction_exp_mem_threshold and disk queue is small
// enough (marked by tap_throttle_queue_cap)
// enough (marked by replication_throttle_queue_cap)

bool isMemoryUsageOk = (stats.getTotalMemoryUsed() <
(stats.getMaxDataSize() * compactionExpMemThreshold));

size_t queueSize = stats.diskQueueSize.load();
bool isQueueSizeOk = ((stats.tapThrottleWriteQueueCap == -1) ||
(queueSize < static_cast<size_t>(stats.tapThrottleWriteQueueCap)));
bool isQueueSizeOk = ((stats.replicationThrottleWriteQueueCap == -1) ||
(queueSize < static_cast<size_t>(stats.replicationThrottleWriteQueueCap)));

return (isMemoryUsageOk && isQueueSizeOk);
}
Expand Down

0 comments on commit b82bd60

Please sign in to comment.