Should not commit offsets upon crash #75

Closed
vineetgoel opened this issue Mar 4, 2018 · 2 comments

Comments

@vineetgoel
Contributor

I injected a random KafkaTimeoutError in App._commit_attached to simulate a timeout while producing. I caught the error and triggered a crash with self.crash(exc), which should crash the application. However, the log trace below shows that we still commit offsets while shutting down after the crash. This is bad: a crash means something went wrong, so we should not commit offsets.

[2018-03-03 17:51:07,840: INFO]: [^--TableManager]: Restore complete!
[2018-03-03 17:51:07,841: INFO]: [^--Consumer]: Waiting for lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Acquired lock to resume partitions
[2018-03-03 17:51:07,842: INFO]: [^--Consumer]: Released resume partitions lock
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: Attempting to start standbys
[2018-03-03 17:51:07,842: INFO]: [^--TableManager]: New assignments handled
[2018-03-03 17:51:07,871: INFO]: [^--Table: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: user_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^--Table: country_to_total]: Starting...
[2018-03-03 17:51:07,871: INFO]: [^---Store: country_to_total]: Starting...
[2018-03-03 17:51:07,913: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 744: k=b'"country_4"' v=b'{"user": "user_434", "country": "country_4", "amount": 1832.8454878058342, "date": "2018-03-04T01:41:26.137348Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 1428: k=b'"country_1"' v=b'{"user": "user_246", "country": "country_1", "amount": 16163.629349934505, "date": "2018-03-04T01:39:52.217088Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 747: k=b'"country_2"' v=b'{"user": "user_114", "country": "country_2", "amount": 7532.398390989598, "date": "2018-03-04T01:41:42.050016Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 32392: k=b'"country_0"' v=b'{"user": "user_239", "country": "country_0", "amount": 24807.034890744173, "date": "2018-03-04T01:40:31.821356Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8506: k=b'"user_220"' v=b'{"user": "user_220", "country": "country_1", "amount": 11502.542493932206, "date": "2018-03-04T01:41:49.087564Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8327: k=b'"user_243"' v=b'{"user": "user_243", "country": "country_0", "amount": 8941.192697487695, "date": "2018-03-04T01:41:51.035743Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8600: k=b'"user_410"' v=b'{"user": "user_410", "country": "country_0", "amount": 22052.862798686932, "date": "2018-03-04T01:40:21.664204Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,914: INFO]: [^--Consumer]: DROPPED MESSAGE ROFF 8973: k=b'"user_396"' v=b'{"user": "user_396", "country": "country_2", "amount": 18885.378500303657, "date": "2018-03-04T01:40:22.124953Z", "__faust": {"ns": "examples.withdrawals.Withdrawal"}}'
[2018-03-03 17:51:07,919: INFO]: [^Worker]: Ready
[2018-03-03 17:51:08,770: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
│ TP                                                                                         │ Offset │ Metadata │
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) │ 751    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) │ 1466   │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) │ 754    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) │ 32546  │          │
│ TopicPartition(topic='withdrawals2', partition=3)                                          │ 8712   │          │
│ TopicPartition(topic='withdrawals2', partition=1)                                          │ 8532   │          │
│ TopicPartition(topic='withdrawals2', partition=2)                                          │ 8805   │          │
│ TopicPartition(topic='withdrawals2', partition=0)                                          │ 9178   │          │
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:11,787: ERROR]: [^-App]: Crashed reason=KafkaTimeoutError()
Traceback (most recent call last):
  File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
    raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
[2018-03-03 17:51:11,806: INFO]: [^Worker]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^-App]: Stopping...
[2018-03-03 17:51:11,806: INFO]: [^--Fetcher]: Stopping...
[2018-03-03 17:51:11,807: INFO]: [^--Consumer]: Consumer shutting down for user cancel.
[2018-03-03 17:51:11,817: INFO]: [^--Fetcher]: -Stopped!
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Stopping...
[2018-03-03 17:51:11,817: INFO]: [^--TableManager]: Aborting ongoing recovery to start over
[2018-03-03 17:51:11,817: INFO]: [^--Table: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: Stopping...
[2018-03-03 17:51:11,818: INFO]: [^---Store: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: user_to_total]: -Stopped!
[2018-03-03 17:51:11,819: INFO]: [^--Table: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: Stopping...
[2018-03-03 17:51:11,819: INFO]: [^---Store: country_to_total]: -Stopped!
[2018-03-03 17:51:11,828: INFO]: [^--Table: country_to_total]: -Stopped!
[2018-03-03 17:51:11,831: INFO]: [^--TableManager]: -Stopped!
[2018-03-03 17:51:11,832: INFO]: [^--TopicConductor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^--TopicConductor]: -Stopped!
[2018-03-03 17:51:11,837: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,837: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: Stopping...
[2018-03-03 17:51:11,840: INFO]: [^---Agent*: examp[.]track_country_withdrawal]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,841: INFO]: [^--Agent]: Stopping...
[2018-03-03 17:51:11,841: INFO]: [^---OneForOneSupervisor]: Stopping...
[2018-03-03 17:51:11,842: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: Stopping...
[2018-03-03 17:51:11,864: INFO]: [^---Agent*: examples[.]track_user_withdrawal]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^---OneForOneSupervisor]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--Agent]: -Stopped!
[2018-03-03 17:51:11,867: INFO]: [^--ReplyConsumer]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--ReplyConsumer]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: Stopping...
[2018-03-03 17:51:11,868: INFO]: [^--LeaderAssignor]: -Stopped!
[2018-03-03 17:51:11,868: INFO]: [^--Consumer]: Stopping...
[2018-03-03 17:51:12,082: INFO]: [^--Consumer]: COMMITTING OFFSETS:
┌Commit Offsets──────────────────────────────────────────────────────────────────────────────┬────────┬──────────┐
│ TP                                                                                         │ Offset │ Metadata │
├────────────────────────────────────────────────────────────────────────────────────────────┼────────┼──────────┤
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=3) │ 767    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=2) │ 1506   │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=1) │ 776    │          │
│ TopicPartition(topic='withdrawals2-f-simple3-Withdrawal.country-repartition', partition=0) │ 33289  │          │
│ TopicPartition(topic='withdrawals2', partition=3)                                          │ 9071   │          │
│ TopicPartition(topic='withdrawals2', partition=1)                                          │ 8833   │          │
│ TopicPartition(topic='withdrawals2', partition=2)                                          │ 9274   │          │
│ TopicPartition(topic='withdrawals2', partition=0)                                          │ 9652   │          │
└────────────────────────────────────────────────────────────────────────────────────────────┴────────┴──────────┘
[2018-03-03 17:51:12,084: INFO]: LeaveGroup request succeeded
[2018-03-03 17:51:12,085: INFO]: [^--Consumer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: Stopping...
[2018-03-03 17:51:12,085: INFO]: [^--Producer]: -Stopped!
[2018-03-03 17:51:12,085: INFO]: [^--MonitorService]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--MonitorService]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-App]: -Stopped!
[2018-03-03 17:51:12,086: INFO]: [^-Website]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^--Web]: Stopping...
[2018-03-03 17:51:12,086: INFO]: [^---ServerThread]: Stopping...
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Closing server
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for server to close handle
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Shutting down web application
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Waiting for handler to shut down
[2018-03-03 17:51:12,087: INFO]: [^--Web]: Cleanup
[2018-03-03 17:51:12,087: INFO]: [^---ServerThread]: -Stopped!
[2018-03-03 17:51:12,087: INFO]: [^--Web]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^-Website]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: -Stopped!
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering service tasks...
[2018-03-03 17:51:12,088: INFO]: [^Worker]: Gathering all futures...
[2018-03-03 17:51:17,227: INFO]: [^Worker]: Closing event loop
[2018-03-03 17:51:17,228: CRITICAL]: [^Worker]: We experienced a crash! Reraising original exception...
Traceback (most recent call last):
  File "/Users/vineet/.virtualenvs/faust/bin/faust", line 11, in <module>
    load_entry_point('faust', 'console_scripts', 'faust')()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/click-6.7-py3.6.egg/click/decorators.py", line 17, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/Users/vineet/faust/faust/cli/base.py", line 317, in _inner
    return cmd()
  File "/Users/vineet/faust/faust/cli/worker.py", line 106, in __call__
    **{**self.kwargs, **kwargs})
  File "/Users/vineet/faust/faust/cli/worker.py", line 128, in start_worker
    return worker.execute_from_commandline()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 155, in execute_from_commandline
    self.stop_and_shutdown()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 160, in stop_and_shutdown
    self._shutdown_loop()
  File "/Users/vineet/.virtualenvs/faust/lib/python3.6/site-packages/mode/worker.py", line 188, in _shutdown_loop
    raise self._crash_reason from self._crash_reason
  File "/Users/vineet/faust/faust/app.py", line 1047, in _commit_attached
    raise KafkaTimeoutError
kafka.errors.KafkaTimeoutError: KafkaTimeoutError
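
For anyone who wants to reproduce the injection outside Faust, here is a minimal sketch of the idea. Everything in it is hypothetical and local to the example: `ToyApp`, `InjectedTimeoutError` (a stand-in for `kafka.errors.KafkaTimeoutError`), `failure_rate`, and `run` are not Faust APIs. It only shows the pattern of raising an error inside the commit path, catching it, and routing it to a crash hook.

```python
import asyncio
import random
from typing import Optional


class InjectedTimeoutError(Exception):
    """Local stand-in for kafka.errors.KafkaTimeoutError (assumption)."""


class ToyApp:
    """Hypothetical miniature app with a crash hook; not Faust's App."""

    def __init__(self, failure_rate: float, seed: int = 0) -> None:
        self.failure_rate = failure_rate
        self.crash_reason: Optional[BaseException] = None
        self._rng = random.Random(seed)

    def crash(self, exc: BaseException) -> None:
        # Remember only the first failure as the crash reason.
        if self.crash_reason is None:
            self.crash_reason = exc

    async def commit_attached(self, offset: int) -> bool:
        try:
            # Fault injection: fail with probability `failure_rate`.
            if self._rng.random() < self.failure_rate:
                raise InjectedTimeoutError(f"injected at offset {offset}")
            await asyncio.sleep(0)  # placeholder for the real produce call
            return True
        except InjectedTimeoutError as exc:
            self.crash(exc)
            return False


async def run(app: ToyApp, n_offsets: int) -> Optional[BaseException]:
    # Process offsets in order and stop at the first injected failure.
    for offset in range(n_offsets):
        if not await app.commit_attached(offset):
            break
    return app.crash_reason
```

Setting `failure_rate=1.0` forces the crash on the first call, which makes the shutdown path easy to inspect; `failure_rate=0.0` disables the injection.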
@ask
Contributor

ask commented Mar 5, 2018

We have to make a best effort to commit the offsets that we have already processed, though.

@vineetgoel
Contributor Author

The crash might have been caused by errors while producing messages, among other things. We should just die hard instead of making assumptions.

This has been fixed: if we are unable to produce messages, we no longer commit offsets.
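
As a rough illustration of that behavior (not the actual Faust change), the sketch below guards the commit with a crash check, so the final commit that normally runs during shutdown is skipped once a crash has been recorded. `ToyConsumer` and all of its methods are hypothetical names invented for this example.

```python
from typing import Dict, List, Optional


class ToyConsumer:
    """Hypothetical consumer that refuses to commit after a crash."""

    def __init__(self) -> None:
        self.acked: Dict[str, int] = {}            # highest acked offset per partition
        self.committed: List[Dict[str, int]] = []  # history of commits made
        self.crash_reason: Optional[BaseException] = None

    def ack(self, tp: str, offset: int) -> None:
        # Track the highest offset seen for this topic partition.
        self.acked[tp] = max(offset, self.acked.get(tp, -1))

    def crash(self, exc: BaseException) -> None:
        self.crash_reason = exc

    def commit(self) -> bool:
        # After a crash we cannot trust that produced messages went out,
        # so do not advance the committed offsets.
        if self.crash_reason is not None:
            return False
        if self.acked:
            self.committed.append(dict(self.acked))
        return True

    def stop(self) -> bool:
        # Shutdown normally attempts one final commit; it is skipped on crash.
        return self.commit()
```

With this guard, offsets acked after the last successful commit are reprocessed on restart rather than silently skipped, which trades duplicate processing for not losing messages.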

Adir-Shemesh pushed a commit to Adir-Shemesh/faust that referenced this issue Jan 10, 2022