Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consumer crashes sporadically and stops consuming - Crashed reason=ConsumerStoppedError() #771

Open
1 of 2 tasks
lachuta opened this issue Nov 22, 2022 · 0 comments
Open
1 of 2 tasks

Comments

@lachuta
Copy link

lachuta commented Nov 22, 2022

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Have not been able to reproduce

Expected behavior

Not have the consumers crashed

Actual behavior

Have a consumer that is deployed in 4 hosts, consuming from a topic on 16 partitions (4 per host). 2 to 3 times a week a consumer crashes, stops consuming messages, with the following showing in the logs:

Some consumer configurations:
broker_heartbeat_interval: 6
broker_request_timeout: 180
broker_session_timeout: 120
broker_max_poll_records: 300

Full traceback

2022-11-21T00:01:02.132Z nid_martens_sa_stream_processor ERROR [logger={faust.transport.consumer}] [^---Fetcher]: Crashed reason=ConsumerStoppedError()
Traceback (most recent call last):
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
    records = await self._getmany(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
    return await self.call_thread(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 822, in _fetch_records
    raise ConsumerStoppedError()
aiokafka.errors.ConsumerStoppedError
2022-11-21T00:01:02.137Z nid_martens_sa_stream_processor INFO [logger={faust.worker}] [^Worker]: Stopping...
2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={abc_logging_faust.app}] [^-abcLoggingFaustApp]: Stopping...
2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={faust.transport.consumer}] [^---Fetcher]: Stopping...
2022-11-21T00:01:02.138Z nid_martens_sa_stream_processor INFO [logger={abc_logging_faust.app}] [^-abcLoggingFaustApp]: Wait for streams...
2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor INFO [logger={aiokafka.consumer.group_coordinator}] LeaveGroup request succeeded
2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Exception in thread
2022-11-21T00:01:02.168Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Thread-1
2022-11-21T00:01:02.169Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
2022-11-21T00:01:02.169Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] StopIteration
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] FetchResponse_v4(throttle_time_ms=0, topics=[(topics='addr-pair-gather', partitions=[(partition=8, error_code=0, highwater_offset=466624, last_stable_offset=466624, aborted_transactions=NULL, message_set=b'')])])
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] The above exception was the direct cause of the following exception:
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] Traceback (most recent call last):
2022-11-21T00:01:02.170Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/threading.py", line 932, in _bootstrap_inner
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.run()
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 66, in run
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.service._start_thread()
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 211, in _start_thread
2022-11-21T00:01:02.171Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] self.thread_loop.run_until_complete(self._serve())
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/base_events.py", line 616, in run_until_complete
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] return future.result()
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 264, in _serve
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._shutdown_thread()
2022-11-21T00:01:02.172Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 240, in _shutdown_thread
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self.on_thread_stop()
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 309, in on_thread_stop
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._consumer.stop()
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/consumer.py", line 503, in stop
2022-11-21T00:01:02.173Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._fetcher.close()
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 429, in close
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self._fetch_task
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 557, in _fetch_requests_routine
2022-11-21T00:01:02.174Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await self.close()
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 439, in close
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] await x
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 547, in _fetch_requests_routine
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] has_new_data = any(fut.result() for fut in done_pending)
2022-11-21T00:01:02.175Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 547, in <genexpr>
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] has_new_data = any(fut.result() for fut in done_pending)
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/aiokafka/consumer/fetcher.py", line 681, in _proc_fetch_request
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] time_response = monotonic()
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] SystemError
2022-11-21T00:01:02.176Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] :
2022-11-21T00:01:02.177Z nid_martens_sa_stream_processor WARNING [logger={mode.redirect}] <built-in function monotonic> returned a result with an error set
2022-11-21T00:06:02.165Z nid_martens_sa_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Warning: Task timed out!
2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor WARNING [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Please make sure it's hanging before restart.
2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] (started at Mon Nov 21 00:01:02 2022) Replaying logs...
2022-11-21T00:06:02.166Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] (Mon Nov 21 00:01:02 2022) +consumer.commit()
2022-11-21T00:06:02.167Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] -End of log-
2022-11-21T00:06:02.167Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: [Flight Recorder-2454] Task traceback
2022-11-21T00:06:02.168Z nid_martens_sa_stream_processor INFO [logger={faust.transport.drivers.aiokafka}] [^--Consumer]: Stack for <Task pending name='Task-2717096' coro=<Service.stop() running at /opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py:839> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc9effbdac0>()]> cb=[shield.<locals>._inner_done_callback() at /opt/rh/rh-python38/root/usr/lib64/python3.8/asyncio/tasks.py:885]> (most recent call last):
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 839, in stop
    await self.on_stop()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1467, in on_stop
    await self._stop_consumer()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1493, in _stop_consumer
    await self._consumer_wait_empty(consumer, self.log)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/app/base.py", line 1499, in _consumer_wait_empty
    await consumer.wait_empty()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 757, in wait_empty
    await T(self.commit_and_end_transactions)()
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 768, in commit_and_end_transactions
    await self.commit(start_new_transaction=False)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 822, in commit
    return await self.force_commit(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/services.py", line 459, in _and_transition
    return await fun(self, *args, **kwargs)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 857, in force_commit
    did_commit = await self._commit_tps(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 874, in _commit_tps
    return await self._commit_offsets(
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 943, in _commit_offsets
    did_commit = await self._commit(committable_offsets)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/consumer.py", line 1311, in _commit
    return await self._thread.commit(offsets)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/faust/transport/drivers/aiokafka.py", line 523, in commit
    return await self.call_thread(self._commit, offsets)
  File "/opt/abc/nid-martens-sa-stream-processor/releases/20221116T163245/venv/lib64/python3.8/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise

Versions

  • Python version: 3.8.11
  • Faust version: 1.10.4
  • Operating system: Linux 3.10.0-693.el7.x86_64
  • Kafka version: 2.6.0
  • aiohttp version: 3.8.3
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant