Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

CBQE-679: wait for clean up of ejected nodes in RebalanceTask

Now after ejecting nodes we are also waiting for / make sure they are clean

Change-Id: I799589420a8f13df005957e88da6950713a1f9ef
Reviewed-on: http://review.couchbase.org/21422
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Farshid Ghods <farshid@couchbase.com>
Tested-by: Farshid Ghods <farshid@couchbase.com>
  • Loading branch information...
commit 921fc4be298c65e416346c95e592f2cfb91a096e 1 parent acf7803
@andreibaranouski andreibaranouski authored Farshid Ghods committed
Showing with 22 additions and 8 deletions.
  1. +22 −8 lib/tasks/task.py
View
30 lib/tasks/task.py
@@ -191,6 +191,7 @@ def __init__(self, servers, to_add=[], to_remove=[], do_stop=False, progress=30)
self.to_add = to_add
self.to_remove = to_remove
self.start_time = None
+ self.rest = RestConnection(self.servers[0])
def execute(self, task_manager):
try:
@@ -204,15 +205,13 @@ def execute(self, task_manager):
def add_nodes(self, task_manager):
master = self.servers[0]
- rest = RestConnection(master)
for node in self.to_add:
self.log.info("adding node {0}:{1} to cluster".format(node.ip, node.port))
- rest.add_node(master.rest_username, master.rest_password,
+ self.rest.add_node(master.rest_username, master.rest_password,
node.ip, node.port)
def start_rebalance(self, task_manager):
- rest = RestConnection(self.servers[0])
- nodes = rest.node_statuses()
+ nodes = self.rest.node_statuses()
#Determine whether its a cluster_run/not
cluster_run = True
@@ -235,14 +234,13 @@ def start_rebalance(self, task_manager):
else:
if server.ip == node.ip and int(server.port) == int(node.port):
ejectedNodes.append(node.id)
- rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
+ self.rest.rebalance(otpNodes=[node.id for node in nodes], ejectedNodes=ejectedNodes)
self.start_time = time.time()
def check(self, task_manager):
- rest = RestConnection(self.servers[0])
progress = -100
try:
- progress = rest._rebalance_progress()
+ progress = self.rest._rebalance_progress()
except RebalanceFailedException as ex:
self.state = FINISHED
self.set_exception(ex)
@@ -255,10 +253,26 @@ def check(self, task_manager):
if progress != -1 and progress != 100:
task_manager.schedule(self, 10)
else:
+ success_cleaned = []
+ for removed in self.to_remove:
+ rest = RestConnection(removed)
+ start = time.time()
+ while time.time() - start < 10:
+ if len(rest.get_pools_info()["pools"]) == 0:
+ success_cleaned.append(removed)
+ break
+ else:
+ time.sleep(0.1)
+ result = True
+ for node in set(self.to_remove) - set(success_cleaned):
+ self.log.error("node {0}:{1} was not cleaned after removing from cluster".format(
+ node.ip, node.port))
+ result = False
+
self.log.info("rebalancing was completed with progress: {0}% in {1} sec".
format(progress, time.time() - self.start_time))
self.state = FINISHED
- self.set_result(True)
+ self.set_result(result)
class StatsWaitTask(Task):
EQUAL = '=='
Please sign in to comment.
Something went wrong with that request. Please try again.