pp: broker hang with gate_closed_exception on fail to remove consumer #9310

Closed
NyaliaLui opened this issue Mar 7, 2023 · 8 comments · Fixed by #9105
Labels
area/pandaproxy: REST interface for Kafka API
kind/bug: Something isn't working
sev/high: loss of availability, pathological performance degradation, recoverable corruption

Comments

@NyaliaLui
Contributor

DEBUG 2023-03-07 20:26:44,376 [shard 0] kafka/client - client.cc:95 - exception during stop: seastar::gate_closed_exception (gate closed)
INFO  2023-03-07 20:26:44,376 [shard 0] kafka/client - consumer.cc:129 - Consumer: type=leader, member_id={test_client-2d314d57-7944-4cbc-a3fb-1f9c8ab24c65}, name={}: stop

RP will hang and fail to shut down with many gate_closed_exceptions when it fails to remove a consumer. I'll create a ticket for this, do some testing, and investigate solutions further. I will also check with @BenPope about this next week because I may be doing something wrong as well.

Originally posted by @NyaliaLui in #9105 (comment)

@NyaliaLui added the kind/bug and area/pandaproxy labels Mar 7, 2023
@dotnwat
Member

dotnwat commented Mar 7, 2023

@NyaliaLui how can this be reproduced?

@NyaliaLui
Contributor Author

To reproduce this, add the following Python code to the ducktape tests in pandaproxy_test.py. You can also find this repro in my branch at NyaliaLui@990b465.

class SampleTest(PandaProxyTestMethods):
    def __init__(self, context):
        super(SampleTest, self).__init__(context)

    @cluster(num_nodes=3)
    def test_restart_http_proxy(self):
        self.topics = [TopicSpec(partition_count=3)]
        self._create_initial_topics()
        topic_name = self.topic

        data = '''
        {
            "records": [
                {"value": "dmVjdG9yaXplZA==", "partition": 0},
                {"value": "cGFuZGFwcm94eQ==", "partition": 1},
                {"value": "bXVsdGlicm9rZXI=", "partition": 2}
            ]
        }'''

        self.logger.info(f"Producing to topic: {topic_name}")
        p_res = self._produce_topic(topic_name, data)
        assert p_res.status_code == requests.codes.ok

        self.logger.info("Check consumer offsets")
        group_id = f"pandaproxy-group-{uuid.uuid4()}"
        self.logger.debug(f"Create a consumer and subscribe to topic: {topic_name}")
        cc_res = self._create_consumer(group_id)
        assert cc_res.status_code == requests.codes.ok
        c0 = Consumer(cc_res.json(), self.logger)
        sc_res = c0.subscribe([topic_name])
        assert sc_res.status_code == requests.codes.no_content

        parts = [0, 1, 2]
        sco_req = dict(partitions=[
            dict(topic=topic_name, partition=p, offset=0)
            for p in parts
        ])
        co_res_raw = c0.set_offsets(data=json.dumps(sco_req))
        assert co_res_raw.status_code == requests.codes.no_content

        self.logger.debug(f"Check consumer offsets")
        co_req = dict(partitions=[
            dict(topic=topic_name, partition=p) for p in parts
        ])
        offset_result_raw = c0.get_offsets(data=json.dumps(co_req))
        assert offset_result_raw.status_code == requests.codes.ok
        res = offset_result_raw.json()
        # Should be one offset for each partition
        assert len(res["offsets"]) == len(parts)
        for r in res["offsets"]:
            assert r["topic"] == topic_name
            assert r["partition"] in parts
            assert r["offset"] == 0
            assert r["metadata"] == ""

        self.logger.debug("Restart the http proxy")
        admin = Admin(self.redpanda)
        result_raw = admin.redpanda_services_restart(rp_service='http-proxy')
        check_service_restart(self.redpanda, "Restarting the http proxy")
        self.logger.debug(result_raw)
        assert result_raw.status_code == requests.codes.ok

        print('check offset begin')
        self.logger.debug("Check consumer offsets after restart")
        offset_result_raw = c0.get_offsets(data=json.dumps(co_req))
        assert offset_result_raw.status_code == requests.codes.ok
        res = offset_result_raw.json()
        # Should be one offset for each partition
        assert len(res["offsets"]) == len(parts)
        for r in res["offsets"]:
            assert r["topic"] == topic_name
            assert r["partition"] in parts
            assert r["offset"] == 0
            assert r["metadata"] == ""
        print('check offset end')

CC: @dotnwat

@NyaliaLui
Contributor Author

NyaliaLui commented Mar 8, 2023

After spending some hours investigating this, I determined that the broker hang happens after we restart the http proxy and then issue a REST request to get consumer offsets.

        self.logger.debug("Restart the http proxy")
        admin = Admin(self.redpanda)
        result_raw = admin.redpanda_services_restart(rp_service='http-proxy')
        check_service_restart(self.redpanda, "Restarting the http proxy")
        self.logger.debug(result_raw)
        assert result_raw.status_code == requests.codes.ok

        print('check offset begin')
        self.logger.debug("Check consumer offsets after restart")
        offset_result_raw = c0.get_offsets(data=json.dumps(co_req)) # <-- problem happens here
        assert offset_result_raw.status_code == requests.codes.ok
        res = offset_result_raw.json()
        # Should be one offset for each partition
        assert len(res["offsets"]) == len(parts)
        for r in res["offsets"]:
            assert r["topic"] == topic_name
            assert r["partition"] in parts
            assert r["offset"] == 0
            assert r["metadata"] == ""
        print('check offset end')

Furthermore, I tried running this test manually on my local machine. The brokers shut down gracefully (I used SIGTERM just like we do in RedpandaService.stop_node), they did not hang, and no gate_closed_exceptions were found.

Therefore, I suspect this issue has more to do with my ducktape test in #9105 and may not be a real bug. I'll mark this issue as sev/low for now until I figure out why the cluster is having issues in ducktape.

@NyaliaLui added the sev/low label Mar 8, 2023
@NyaliaLui
Contributor Author

So, it turns out the root cause and solution are simpler than previously thought.

When a user creates a consumer with the REST request POST /consumers/<group name>, the request is forwarded to the internal kafka client, which creates a consumer and inserts it into a map. From client.cc::create_consumer():

      .then([this, group_id, name](shared_broker_t coordinator) mutable {
          // Called when the consumer is stopped
          auto on_stopped = [this, group_id](const member_id& name) {
              _consumers[group_id].erase(name);
          };

         // Creates a consumer as a lw_shared_ptr, alias is shared_consumer_t
          return make_consumer(
            _config,
            _topic_cache,
            _brokers,
            std::move(coordinator),
            std::move(group_id),
            std::move(name),
            std::move(on_stopped));
      })
      .then([this, group_id](shared_consumer_t c) {
          auto name = c->name();
          // Add the consumer to the _consumers map
          _consumers[group_id].insert(std::move(c));
          return name;
      });

When a user restarts the PP, stop() is called on each consumer, and the consumers are set up to invoke a callback, on_stopped, when stopped. That callback is defined at consumer construction and erases the consumer from the map by name:

          // Called when the consumer is stopped
          auto on_stopped = [this, group_id](const member_id& name) {
              _consumers[group_id].erase(name); // <-- finds the consumer to erase by name
          };

Now let's look at the definition of consumer::stop():

ss::future<> consumer::stop() {
    // .. timers and stuff are canceled ..

    _on_stopped(_name); // <-- Problem is here
    if (_as.abort_requested()) {
        return ss::now();
    }
    _as.request_abort();
    return _coordinator->stop()
      .then([this]() { return _gate.close(); }) // <-- gate is closed
      .finally([me{shared_from_this()}] {});
}

We see that _on_stopped is called with the consumer's _name; however, for an unnamed consumer, the _name variable is simply an empty string. Furthermore, the consumer::name() method is defined to fall back to _member_id when _name is empty:

    const kafka::member_id& name() const {
        // kafka::no_member is an alias for empty string
        return _name != kafka::no_member ? _name : _member_id;
    }

This is interesting: when we insert a consumer into the _consumers map, the map is defined to hash based on the result of the consumer::name() method.
From consumer.h:

struct consumer_hash {
    // Hash based on consumer::name() or the member_id
    using is_transparent = void;
    size_t operator()(const member_id& id) const {
        return absl::Hash<member_id>{}(id);
    }
    size_t operator()(const consumer& c) const { return (*this)(c.name()); }
    size_t operator()(const shared_consumer_t& c) const {
        return (*this)(c->name()); // <-- this is called when the consumer is inserted into the map
    }
};

So, we inserted the consumer based on a hash of consumer::name(), but we erase it based on a hash of the _name variable. Therefore, the solution is to change consumer::stop() to use the result of consumer::name() instead of _name:

ss::future<> consumer::stop() {
    // .. timers and stuff are canceled ..

    _on_stopped(name()); // <-- Solution here
    if (_as.abort_requested()) {
        return ss::now();
    }
    _as.request_abort();
    return _coordinator->stop()
      .then([this]() { return _gate.close(); })
      .finally([me{shared_from_this()}] {});
}
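
To see the mismatch in isolation, here is a minimal, standalone sketch of a set keyed the same way. The fake_consumer type, the member id, and the simplified equality functor are hypothetical, and it assumes Abseil is available; it is not Redpanda code. Inserting under name() and then erasing with the raw _name removes nothing.

```cpp
// Hypothetical sketch: insert keyed by name() (which falls back to the
// member id), then try to erase with the raw _name, which is empty for an
// unnamed consumer.
#include <absl/container/flat_hash_set.h>
#include <absl/hash/hash.h>
#include <iostream>
#include <memory>
#include <string>

struct fake_consumer {
    std::string _name;      // empty for an unnamed consumer
    std::string _member_id; // assigned by the group coordinator
    const std::string& name() const {
        return _name.empty() ? _member_id : _name;
    }
};
using shared_fake_consumer = std::shared_ptr<fake_consumer>;

struct fake_consumer_hash {
    using is_transparent = void;
    size_t operator()(const std::string& id) const {
        return absl::Hash<std::string>{}(id);
    }
    size_t operator()(const shared_fake_consumer& c) const {
        return (*this)(c->name());
    }
};

struct fake_consumer_eq {
    using is_transparent = void;
    bool operator()(
      const shared_fake_consumer& a, const shared_fake_consumer& b) const {
        return a->name() == b->name();
    }
    bool operator()(const shared_fake_consumer& a, const std::string& id) const {
        return a->name() == id;
    }
    bool operator()(const std::string& id, const shared_fake_consumer& a) const {
        return id == a->name();
    }
};

int main() {
    absl::flat_hash_set<shared_fake_consumer, fake_consumer_hash, fake_consumer_eq>
      consumers;
    auto c = std::make_shared<fake_consumer>(fake_consumer{"", "member-42"});
    consumers.insert(c); // hashed under name() == "member-42"

    // Erasing with _name (an empty string) finds nothing and returns 0.
    std::cout << "erase(_name):  " << consumers.erase(c->_name) << '\n';
    // Erasing with name() removes the consumer as intended and returns 1.
    std::cout << "erase(name()): " << consumers.erase(c->name()) << '\n';
}
```

Compiled against Abseil, this prints 0 for the erase by _name and 1 for the erase by name(), which mirrors the silent failure on_stopped runs into.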

@NyaliaLui
Contributor Author

NyaliaLui commented Mar 13, 2023

Therefore, the solution is to change consumer::stop() to use the result of consumer::name() instead of _name:

ss::future<> consumer::stop() {
    // .. timers and stuff are canceled ..

    _on_stopped(name()); // <-- Solution here
    if (_as.abort_requested()) {
        return ss::now();
    }
    _as.request_abort();
    return _coordinator->stop()
      .then([this]() { return _gate.close(); })
      .finally([me{shared_from_this()}] {});
}

The reason this manifested as a gate_closed_exception is as follows:

  1. When a client is stopped, it tells all of its consumers to leave their consumer groups:
ss::future<> client::stop() noexcept {
    co_await _gate.close();
    co_await catch_and_log([this]() { return _producer.stop(); });
    for (auto& [id, group] : _consumers) {
        while (!group.empty()) {
            auto c = *group.begin();
            co_await catch_and_log([c]() {
                // The consumer is constructed with an on_stopped which erases
                // itself from the map after leave() completes.
                return c->leave(); // <-- issues leave_group_req and eventually calls consumer::stop()
            });
        }
    }
    co_await catch_and_log([this]() { return _brokers.stop(); });
}
  2. consumer::leave() first issues a leave_group_request before calling consumer::stop():
ss::future<leave_group_response> consumer::leave() {
    auto req_builder = [this] {
        return leave_group_request{
          .data{.group_id = _group_id, .member_id = _member_id}};
    };
    // Leave the group before calling stop()
    return req_res(std::move(req_builder)).finally([me{shared_from_this()}]() {
        return me->stop();
    });
}
  3. The consumer enters its gate to handle the leave group request:
    template<typename RequestFactory>
    ss::future<
      typename std::invoke_result_t<RequestFactory>::api_type::response_type>
    req_res(RequestFactory req) {
        using api_t = typename std::invoke_result_t<RequestFactory>::api_type;
        using response_t = typename api_t::response_type;
        return ss::try_with_gate(_gate, [this, req{std::move(req)}]() { // <-- attempt to enter gate
            auto r = req();
            kclog.debug("Consumer: {}: {} req: {}", *this, api_t::name, r);
            return _coordinator->dispatch(std::move(r))
              .then([this](response_t res) {
                  kclog.debug(
                    "Consumer: {}: {} res: {}", *this, api_t::name, res);
                  return res;
              });
        });
    }
  4. consumer::stop() invokes the on_stopped callback with the consumer's _name and then closes the gate:
ss::future<> consumer::stop() {
    // .. timers and stuff are canceled ..

    _on_stopped(_name); // <-- Empty string passed in
    if (_as.abort_requested()) {
        return ss::now();
    }
    _as.request_abort();
    return _coordinator->stop()
      .then([this]() { return _gate.close(); }) // <-- gate closed here 
      .finally([me{shared_from_this()}] {});
}
  5. on_stopped silently fails to remove the consumer from the map:
          auto on_stopped = [this, group_id](const member_id& name) {
              _consumers[group_id].erase(name); // <-- silently fails because name is empty
          };

Since on_stopped failed to remove the consumer in step (5), the client thinks the consumer still exists (because group is not empty in step (1)) and re-executes this process starting from step (2). The consumer attempts to handle the leave group request in step (3), but the gate was already closed, so seastar returns a gate_closed_exception.
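
As a rough illustration, here is a hypothetical, self-contained simulation of that shutdown loop (not Redpanda code): once the erase uses the wrong key, the group never empties, and every retry after the first hits the already-closed gate. The iteration cap exists only so the demo terminates.

```cpp
// Hypothetical simulation of the retry loop in client::stop() when
// on_stopped erases with the wrong (empty) key.
#include <iostream>
#include <set>
#include <string>

int main() {
    // The group is keyed by consumer::name(), i.e. the member_id here.
    std::set<std::string> group{"member-42"};
    bool gate_closed = false;

    // Cap the loop for the demo; the real while (!group.empty()) has no cap.
    for (int attempt = 1; !group.empty() && attempt <= 3; ++attempt) {
        if (gate_closed) {
            // In Redpanda this surfaces as seastar::gate_closed_exception,
            // which catch_and_log() logs on every iteration.
            std::cout << "attempt " << attempt << ": gate_closed_exception\n";
        }
        group.erase("");     // on_stopped(_name): empty key, nothing erased
        gate_closed = true;  // consumer::stop() closes the gate
    }
    std::cout << "group still holds " << group.size() << " consumer(s)\n";
}
```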

@NyaliaLui
Contributor Author

NyaliaLui commented Mar 13, 2023

Because this is a real bug that resulted in a broker hang, I'll change the severity tag to sev/high.
I have a solution in d6756f3

@NyaliaLui added the sev/high label and removed the sev/low label Mar 13, 2023
NyaliaLui added a commit to NyaliaLui/redpanda that referenced this issue Mar 13, 2023
Previously, the value of `_name` was passed into on_stopped. This is a
problem in the case of an unnamed consumer because `_name` will be an
empty string. Instead, use the result of `consumer::name()`.

This fixes an issue where a kafka client fails to remove a consumer from
its internal map of consumers, which can lead to an infinite loop of
gate_closed_exceptions.

Fixes redpanda-data#9310
@dotnwat
Member

dotnwat commented Mar 13, 2023

awesome analysis @NyaliaLui

NyaliaLui added a commit to NyaliaLui/redpanda that referenced this issue Mar 14, 2023
Previously, the value of `_name` was passed into `on_stopped`. This is a
problem in the case of an unnamed consumer because `_name` will be an
empty string. Instead, use the result of `consumer::name()`.

Here is the definition of `on_stopped`:
```cpp
auto on_stopped = [this, group_id](const member_id& name) {
    _consumers[group_id].erase(name); // silently fails if name is empty
};
```
The result of passing an empty string to `on_stopped` is that the
consumer is not erased from the map. This is an issue because the client
expects the map to eventually become empty.

From `client::stop()`:
```cpp
for (auto& [id, group] : _consumers) {
    while (!group.empty()) { // group is never empty, so infinite loop
        auto c = *group.begin();
        co_await catch_and_log([c]() {
            return c->leave();
        });
    }
}
```

As a result, the while loop becomes an infinite loop since the consumer is
never removed.

Fixes redpanda-data#9310
BenPope pushed a commit to BenPope/redpanda that referenced this issue Mar 15, 2023
(cherry picked from commit 791cf6e)
BenPope pushed a commit to BenPope/redpanda that referenced this issue Mar 15, 2023
(cherry picked from commit 791cf6e)
BenPope pushed a commit to BenPope/redpanda that referenced this issue Mar 15, 2023
(cherry picked from commit 791cf6e)
BenPope pushed a commit to BenPope/redpanda that referenced this issue Mar 15, 2023
(cherry picked from commit 791cf6e)
BenPope pushed a commit to BenPope/redpanda that referenced this issue Mar 15, 2023
(cherry picked from commit 791cf6e)
@piyushredpanda
Contributor

This was awesome work, @NyaliaLui!

ballard26 pushed a commit to ballard26/redpanda that referenced this issue Mar 20, 2023