diff --git a/locust/runners.py b/locust/runners.py index efd63e1f00..228fddcd43 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -333,6 +333,7 @@ def start_shape(self) -> None: def shape_worker(self) -> None: logger.info("Shape worker starting") while self.state == STATE_INIT or self.state == STATE_SPAWNING or self.state == STATE_RUNNING: + shape_adjustment_start = time.time() current_tick = self.environment.shape_class.tick() if self.environment.shape_class is not None else None if current_tick is None: logger.info("Shape test stopping") @@ -343,9 +344,7 @@ def shape_worker(self) -> None: self.shape_greenlet = None self.shape_last_tick = None return - elif self.shape_last_tick == current_tick: - gevent.sleep(1) - else: + elif self.shape_last_tick != current_tick: if len(current_tick) == 2: user_count, spawn_rate = current_tick # type: ignore user_classes = None @@ -367,6 +366,8 @@ def shape_worker(self) -> None: # of each load test shape stage. self.start(user_count=user_count, spawn_rate=spawn_rate, user_classes=user_classes) self.shape_last_tick = current_tick + shape_adjustment_time_ms = time.time() - shape_adjustment_start + gevent.sleep(max(1 - shape_adjustment_time_ms, 0)) def stop(self) -> None: """ diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index f687f214ec..7f75ad5f36 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -2707,6 +2707,67 @@ def my_task(self): for i in range(USERS_COUNT): self.assertEqual(indexes[i], i, "Worker index mismatch") + def test_custom_shape_scale_interval(self): + class MyUser(User): + @task + def my_task(self): + pass + + class TestShape(LoadTestShape): + def __init__(self): + super().__init__() + self._users_num = [1, 1, 1, 2, 2, 3, 3, 3, 4] + self._index = 0 + + def tick(self): + if self._index >= len(self._users_num): + return None + users_num = self._users_num[self._index] + self._index += 1 + return users_num, users_num + + self.environment.shape_class = TestShape() + + with mock.patch("locust.rpc.rpc.Server", mocked_rpc()) as server: + master = self.get_runner(user_classes=[MyUser]) + for i in range(5): + server.mocked_send(Message("client_ready", __version__, "fake_client%i" % i)) + + # Start the shape_worker + self.environment.shape_class.reset_time() + master.start_shape() + + # Wait for shape_worker to update user_count + sleep(0.5) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) + self.assertEqual( + 1, num_users, "Total number of users in first stage of shape test is not 1: %i" % num_users + ) + + # Wait for shape_worker to update user_count again + sleep(1.5) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) + self.assertEqual( + 1, num_users, "Total number of users in second stage of shape test is not 1: %i" % num_users + ) + + # Wait for shape_worker to update user_count few times but not reach the end yet + sleep(2.5) + num_users = sum( + sum(msg.data["user_classes_count"].values()) for _, msg in server.outbox if msg.type != "ack" + ) + self.assertEqual( + 3, num_users, "Total number of users in second stage of shape test is not 3: %i" % num_users + ) + + # Wait to ensure shape_worker has stopped the test + sleep(3) + self.assertEqual("stopped", master.state, "The test has not been stopped by the shape class") + def test_custom_shape_scale_up(self): class MyUser(User): @task