Always send TAP_VBUCKET_SET commands for takeover reconnections.

If a cluster rebalance stops because of an internal error or a user
action, ns-server marks the pending vbucket on the destination
node as dead. The ns-server janitor periodically (every
10 seconds) collects the list of dead vbuckets from the cluster and
sends vbucket deletion commands to the nodes that hold them.
Consequently, if the rebalance resumes, the destination node
might receive items from the source node that belong to a dead
vbucket that has already been deleted. This causes the source
node to close the TAP connection repeatedly.

To prevent this race condition, when the source node receives a
vbucket takeover reconnection from eBucketMigrator, it should
always send a TAP_VBUCKET_SET "pending" command to the destination
node so that the destination can instantiate the pending vbucket
if it does not exist.
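
The race described above can be sketched with a toy model of the
destination node. This is only an illustration: Destination, VBState
and the three methods are hypothetical names, not ep-engine or
ns-server APIs, and the model assumes a state-set command creates a
missing vbucket.

```cpp
#include <cassert>
#include <cstdint>
#include <map>

// Hypothetical stand-ins for illustration; not the ep-engine types.
enum class VBState { Active, Pending, Dead };

class Destination {
public:
    // Models a TAP_VBUCKET_SET-style command: creates the vbucket if it
    // is missing, otherwise just updates its state.
    void setVBucketState(uint16_t vb, VBState s) { vbuckets[vb] = s; }

    // Models one janitor pass: delete every vbucket marked dead.
    void deleteDeadVBuckets() {
        for (auto it = vbuckets.begin(); it != vbuckets.end();) {
            if (it->second == VBState::Dead) {
                it = vbuckets.erase(it);
            } else {
                ++it;
            }
        }
    }

    // An incoming item is accepted only if its vbucket still exists.
    bool receiveItem(uint16_t vb) const { return vbuckets.count(vb) != 0; }

private:
    std::map<uint16_t, VBState> vbuckets;
};
```

In this model, an item that arrives after the janitor pass is rejected
unless a "pending" state-set command has recreated the vbucket first,
which is why the command is sent on every takeover reconnection.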

Change-Id: If04e047b82bb65745ce2973c8f1caa6efa85d61f
Reviewed-on: http://review.couchbase.org/9364
Tested-by: Chiyoung Seo <chiyoung.seo@gmail.com>
Reviewed-by: Chiyoung Seo <chiyoung.seo@gmail.com>
1 parent c006f65, commit 4af628082a4407c37b33dd16196c5a47a51332aa, committed by chiyoung on Sep 2, 2011
Showing with 42 additions and 7 deletions.
  1. +42 −7 tapconnection.cc
@@ -192,15 +192,50 @@ void TapProducer::setVBucketFilter(const std::vector<uint16_t> &vbuckets)
     // Note that we do re-evaluete all entries when we suck them out of the
     // queue to send them..
     if (flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS) {
-        const std::vector<uint16_t> &vec = diff.getVector();
+        std::list<TapVBucketEvent> nonVBucketOpaqueMessages;
+        std::list<TapVBucketEvent> vBucketOpaqueMessages;
+        // Clear vbucket state change messages with a higher priority.
+        while (!vBucketHighPriority.empty()) {
+            TapVBucketEvent msg = vBucketHighPriority.front();
+            vBucketHighPriority.pop();
+            if (msg.event == TAP_OPAQUE) {
+                uint32_t opaqueCode = (uint32_t) msg.state;
+                if (opaqueCode == htonl(TAP_OPAQUE_ENABLE_AUTO_NACK) ||
+                    opaqueCode == htonl(TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC)) {
+                    nonVBucketOpaqueMessages.push_back(msg);
+                } else {
+                    vBucketOpaqueMessages.push_back(msg);
+                }
+            }
+        }
+
+        // Add non-vbucket opaque messages back to the high priority queue.
+        std::list<TapVBucketEvent>::iterator iter = nonVBucketOpaqueMessages.begin();
+        while (iter != nonVBucketOpaqueMessages.end()) {
+            addVBucketHighPriority_UNLOCKED(*iter);
+            ++iter;
+        }
+
+        // Clear vbucket state changes messages with a lower priority.
+        while (!vBucketLowPriority.empty()) {
+            vBucketLowPriority.pop();
+        }
+
+        // Add new vbucket state change messages with a higher or lower priority.
+        const std::vector<uint16_t> &vec = vbucketFilter.getVector();
         for (std::vector<uint16_t>::const_iterator it = vec.begin();
              it != vec.end(); ++it) {
-            if (vbucketFilter(*it)) {
-                TapVBucketEvent hi(TAP_VBUCKET_SET, *it, vbucket_state_pending);
-                TapVBucketEvent lo(TAP_VBUCKET_SET, *it, vbucket_state_active);
-                addVBucketHighPriority_UNLOCKED(hi);
-                addVBucketLowPriority_UNLOCKED(lo);
-            }
+            TapVBucketEvent hi(TAP_VBUCKET_SET, *it, vbucket_state_pending);
+            TapVBucketEvent lo(TAP_VBUCKET_SET, *it, vbucket_state_active);
+            addVBucketHighPriority_UNLOCKED(hi);
+            addVBucketLowPriority_UNLOCKED(lo);
+        }
+
+        // Add vbucket opaque messages back to the high priority queue.
+        iter = vBucketOpaqueMessages.begin();
+        while (iter != vBucketOpaqueMessages.end()) {
+            addVBucketHighPriority_UNLOCKED(*iter);
+            ++iter;
         }
         doTakeOver = true;
     }
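
The queue rebuild in the hunk above can be read as a standalone
routine. The following is a minimal sketch under simplifying
assumptions, not the ep-engine implementation: Event, Queues,
rebuildForTakeover and the enum values are hypothetical names, locking
is omitted, and opaque codes are reduced to two kinds (connection-level
and vbucket-level).

```cpp
#include <cassert>
#include <cstdint>
#include <list>
#include <queue>
#include <vector>

// Hypothetical simplified event; the real TapVBucketEvent differs.
enum EventType { VBUCKET_SET, OPAQUE };
enum Payload { STATE_PENDING, STATE_ACTIVE, OPAQUE_CONNECTION, OPAQUE_VBUCKET };

struct Event {
    EventType type;
    uint16_t vbucket;
    Payload payload;
};

struct Queues {
    std::queue<Event> high;
    std::queue<Event> low;
};

void rebuildForTakeover(Queues &q, const std::vector<uint16_t> &filter) {
    std::list<Event> connectionOpaque;
    std::list<Event> vbucketOpaque;

    // Drain the high priority queue: keep opaque messages, sorted by
    // kind, and drop stale vbucket state changes.
    while (!q.high.empty()) {
        Event e = q.high.front();
        q.high.pop();
        if (e.type == OPAQUE) {
            if (e.payload == OPAQUE_CONNECTION) {
                connectionOpaque.push_back(e);
            } else {
                vbucketOpaque.push_back(e);
            }
        }
    }

    // Connection-level opaque messages go back first.
    for (const Event &e : connectionOpaque) {
        q.high.push(e);
    }

    // Drop every stale low priority state change.
    while (!q.low.empty()) {
        q.low.pop();
    }

    // Queue a fresh pending/active pair for every vbucket in the filter,
    // unconditionally.
    for (uint16_t vb : filter) {
        q.high.push(Event{VBUCKET_SET, vb, STATE_PENDING});
        q.low.push(Event{VBUCKET_SET, vb, STATE_ACTIVE});
    }

    // Vbucket-level opaque messages follow the new state changes.
    for (const Event &e : vbucketOpaque) {
        q.high.push(e);
    }
}
```

The ordering is the point of the sketch: connection-level opaque
messages first, then one "pending" message per filtered vbucket, then
the vbucket-level opaque messages, with the "active" messages held in
the low priority queue.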