
Faust hangs forever when exiting via ctrl-c or exception #484

Open · 2 tasks done
jbooth-mastery opened this issue Dec 2, 2019 · 8 comments

@jbooth-mastery (Contributor)

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Ctrl-C on a running worker. Alternatively, raise an exception in a running worker. This happens with the provided examples, e.g. faust -A examples.hello_world worker -l debug

This could be (and, given the monkeypatch below, likely is) a bug in mode itself, but I don't have a simple case that reproduces it there.
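
For reference, the bundled example is roughly equivalent to the sketch below (a sketch only; the actual example module may differ slightly, but the app id and topic name match the log further down):

# hello_world.py -- sketch of the bundled example used to reproduce the hang
import faust

app = faust.App('hello-world', broker='kafka://localhost:9092')

greetings_topic = app.topic('greetings', value_type=str)

@app.agent(greetings_topic)
async def print_greetings(greetings):
    # Consume and print each greeting; the agent itself is irrelevant to the
    # hang, which happens during worker shutdown.
    async for greeting in greetings:
        print(greeting)

if __name__ == '__main__':
    app.main()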

Expected behavior

Faust exits properly on the first ctrl-c or unhandled exception.

Actual behavior

Faust hangs after saying [CRITICAL] [^Worker]: We experienced a crash! Reraising original exception...

Full traceback

...
^C-INT- -INT- -INT- -INT- -INT- -INT-
[2019-12-02 14:46:03,270] [11673] [INFO] [^Worker]: Signal received: Signals.SIGINT (2) 
[2019-12-02 14:46:03,271] [11673] [INFO] [^Worker]: Stopping... 
[2019-12-02 14:46:03,272] [11673] [INFO] [^-App]: Stopping... 
[2019-12-02 14:46:03,273] [11673] [INFO] [^--Fetcher]: Stopping... 
[2019-12-02 14:46:03,273] [11673] [INFO] [^--Consumer]: Consumer shutting down for user cancel. 
[2019-12-02 14:46:03,273] [11673] [DEBUG] [^--Fetcher]: Shutting down... 
[2019-12-02 14:46:03,274] [11673] [DEBUG] [^--Fetcher]: -Stopped! 
[2019-12-02 14:46:03,274] [11673] [INFO] [^-App]: Wait for streams... 
[2019-12-02 14:46:03,274] [11673] [INFO] [^-App]: Flush producer buffer... 
[2019-12-02 14:46:03,274] [11673] [INFO] [^--TableManager]: Stopping... 
[2019-12-02 14:46:03,274] [11673] [INFO] [^--Fetcher]: Stopping... 
[2019-12-02 14:46:03,274] [11673] [DEBUG] [^--Fetcher]: Shutting down... 
[2019-12-02 14:46:03,274] [11673] [DEBUG] [^--Fetcher]: -Stopped! 
[2019-12-02 14:46:03,274] [11673] [INFO] [^---Recovery]: Stopping... 
[2019-12-02 14:46:03,274] [11673] [DEBUG] [^---Recovery]: Shutting down... 
[2019-12-02 14:46:03,275] [11673] [DEBUG] [^---Recovery]: -Stopped! 
[2019-12-02 14:46:03,275] [11673] [DEBUG] [^--TableManager]: Shutting down... 
[2019-12-02 14:46:03,275] [11673] [DEBUG] [^--TableManager]: -Stopped! 
[2019-12-02 14:46:03,275] [11673] [INFO] [^--Conductor]: Stopping... 
[2019-12-02 14:46:03,275] [11673] [DEBUG] [^--Conductor]: Shutting down... 
[2019-12-02 14:46:03,275] [11673] [DEBUG] [^--Conductor]: -Stopped! 
[2019-12-02 14:46:03,275] [11673] [INFO] [^--AgentManager]: Stopping... 
[2019-12-02 14:46:03,275] [11673] [INFO] [^--Agent: examples.hello[.]print_greetings]: Stopping... 
[2019-12-02 14:46:03,275] [11673] [INFO] [^---OneForOneSupervisor]: Stopping... 
[2019-12-02 14:46:03,275] [11673] [DEBUG] [^---Agent*: examples.hello[.]print_greetings]: Stopping... 
[2019-12-02 14:46:03,275] [11673] [DEBUG] [^---Agent*: examples.hello[.]print_greetings]: Shutting down... 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^---Agent*: examples.hello[.]print_greetings]: -Stopped! 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^---OneForOneSupervisor]: Shutting down... 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^---OneForOneSupervisor]: -Stopped! 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^--Agent: examples.hello[.]print_greetings]: Shutting down... 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^--Agent: examples.hello[.]print_greetings]: -Stopped! 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^--AgentManager]: Shutting down... 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^--AgentManager]: -Stopped! 
[2019-12-02 14:46:03,276] [11673] [INFO] [^--ReplyConsumer]: Stopping... 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^--ReplyConsumer]: Shutting down... 
[2019-12-02 14:46:03,276] [11673] [DEBUG] [^--ReplyConsumer]: -Stopped! 
[2019-12-02 14:46:03,276] [11673] [INFO] [^--LeaderAssignor]: Stopping... 
[2019-12-02 14:46:03,277] [11673] [DEBUG] [^--LeaderAssignor]: Shutting down... 
[2019-12-02 14:46:03,277] [11673] [DEBUG] [^--LeaderAssignor]: -Stopped! 
[2019-12-02 14:46:03,277] [11673] [INFO] [^--Consumer]: Stopping... 
[2019-12-02 14:46:03,277] [11673] [INFO] [^---AIOKafkaConsumerThread]: Stopping... 
[2019-12-02 14:46:03,277] [11673] [DEBUG] [^---AIOKafkaConsumerThread]: Shutting down... 
[2019-12-02 14:46:03,277] [11673] [DEBUG] [^---AIOKafkaConsumerThread]: Waiting for shutdown 
[2019-12-02 14:46:03,328] [11673] [DEBUG] <AIOKafkaConnection host=localhost port=9092> Response 45: FetchResponse_v4(throttle_time_ms=0, topics=[(topics='hello-world-__assignor-__leader', partitions=[(partition=0, error_code=0, highwater_offset=0, last_stable_offset=0, aborted_transactions=NULL, message_set=b'')]), (topics='greetings', partitions=[(partition=0, error_code=0, highwater_offset=513, last_stable_offset=513, aborted_transactions=NULL, message_set=b'')])]) 
[2019-12-02 14:46:03,329] [11673] [DEBUG] Closing the KafkaConsumer. 
[2019-12-02 14:46:03,330] [11673] [DEBUG] Adding fetch request for partition TopicPartition(topic='hello-world-__assignor-__leader', partition=0) at offset 0 
[2019-12-02 14:46:03,331] [11673] [DEBUG] Adding fetch request for partition TopicPartition(topic='greetings', partition=0) at offset 513 
[2019-12-02 14:46:03,331] [11673] [DEBUG] Stopping heartbeat task 
[2019-12-02 14:46:03,332] [11673] [DEBUG] <AIOKafkaConnection host=localhost port=9092> Request 46: FetchRequest_v4(replica_id=-1, max_wait_time=1500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='greetings', partitions=[(partition=0, offset=513, max_bytes=4194304)]), (topic='hello-world-__assignor-__leader', partitions=[(partition=0, offset=0, max_bytes=4194304)])]) 
[2019-12-02 14:46:03,333] [11673] [DEBUG] <AIOKafkaConnection host=localhost port=9092> Request 13: LeaveGroupRequest_v1(group='hello-world', member_id='faust-1.9.0-14ed0084-879a-4042-a9b3-ed36a21fdc2f') 
[2019-12-02 14:46:03,335] [11673] [DEBUG] <AIOKafkaConnection host=localhost port=9092> Response 13: LeaveGroupResponse_v1(throttle_time_ms=0, error_code=0) 
[2019-12-02 14:46:03,335] [11673] [INFO] LeaveGroup request succeeded 
[2019-12-02 14:46:03,335] [11673] [DEBUG] Closing connection at localhost:9092 
[2019-12-02 14:46:03,335] [11673] [DEBUG] Closing connection at localhost:9092 
[2019-12-02 14:46:03,336] [11673] [DEBUG] The KafkaConsumer has closed. 
[2019-12-02 14:46:04,077] [11673] [DEBUG] [^---AIOKafkaConsumerThread]: Shutting down now 
[2019-12-02 14:46:04,077] [11673] [DEBUG] [^---AIOKafkaConsumerThread]: -Stopped! 
[2019-12-02 14:46:04,078] [11673] [DEBUG] Timer Monitor.sampler woke up - iteration=16 time_spent=0.9956555030003074 drift=0.0043444969996926375 sleep_time=1.0043444969996926 since_epoch=16.024470049998854 
[2019-12-02 14:46:04,079] [11673] [DEBUG] [^---MethodQueue]: Stopping... 
[2019-12-02 14:46:04,079] [11673] [DEBUG] [^----MethodQueueWorker]: Stopping... 
[2019-12-02 14:46:04,080] [11673] [DEBUG] [^----MethodQueueWorker]: Shutting down... 
[2019-12-02 14:46:04,081] [11673] [DEBUG] [^----MethodQueueWorker]: -Stopped! 
[2019-12-02 14:46:04,081] [11673] [DEBUG] [^----MethodQueueWorker]: Stopping... 
[2019-12-02 14:46:04,081] [11673] [DEBUG] [^----MethodQueueWorker]: Shutting down... 
[2019-12-02 14:46:04,082] [11673] [DEBUG] [^----MethodQueueWorker]: -Stopped! 
[2019-12-02 14:46:04,082] [11673] [DEBUG] [^---MethodQueue]: Shutting down... 
[2019-12-02 14:46:04,082] [11673] [DEBUG] [^---MethodQueue]: -Stopped! 
[2019-12-02 14:46:04,082] [11673] [DEBUG] [^--Consumer]: Shutting down... 
[2019-12-02 14:46:04,082] [11673] [DEBUG] [^--Consumer]: -Stopped! 
[2019-12-02 14:46:04,082] [11673] [INFO] [^--Web]: Stopping... 
[2019-12-02 14:46:04,082] [11673] [INFO] [^---Server]: Stopping... 
[2019-12-02 14:46:04,082] [11673] [INFO] [^--Web]: Cleanup 
[2019-12-02 14:46:04,083] [11673] [DEBUG] [^---Server]: Shutting down... 
[2019-12-02 14:46:04,083] [11673] [DEBUG] [^---Server]: -Stopped! 
[2019-12-02 14:46:04,083] [11673] [DEBUG] [^--Web]: Shutting down... 
[2019-12-02 14:46:04,083] [11673] [DEBUG] [^--Web]: -Stopped! 
[2019-12-02 14:46:04,083] [11673] [INFO] [^--CacheBackend]: Stopping... 
[2019-12-02 14:46:04,083] [11673] [DEBUG] [^--CacheBackend]: Shutting down... 
[2019-12-02 14:46:04,083] [11673] [DEBUG] [^--CacheBackend]: -Stopped! 
[2019-12-02 14:46:04,083] [11673] [INFO] [^--Producer]: Stopping... 
[2019-12-02 14:46:04,083] [11673] [DEBUG] Closing connection at localhost:9092 
[2019-12-02 14:46:04,084] [11673] [DEBUG] The Kafka producer has closed. 
[2019-12-02 14:46:04,084] [11673] [INFO] [^---ProducerBuffer]: Stopping... 
[2019-12-02 14:46:04,084] [11673] [DEBUG] [^---ProducerBuffer]: Shutting down... 
[2019-12-02 14:46:04,084] [11673] [DEBUG] [^---ProducerBuffer]: -Stopped! 
[2019-12-02 14:46:04,085] [11673] [DEBUG] [^--Producer]: Shutting down... 
[2019-12-02 14:46:04,085] [11673] [DEBUG] [^--Producer]: -Stopped! 
[2019-12-02 14:46:04,085] [11673] [INFO] [^--Monitor]: Stopping... 
[2019-12-02 14:46:04,085] [11673] [DEBUG] [^--Monitor]: Shutting down... 
[2019-12-02 14:46:04,085] [11673] [DEBUG] [^--Monitor]: -Stopped! 
[2019-12-02 14:46:04,085] [11673] [DEBUG] [^-App]: Shutting down... 
[2019-12-02 14:46:04,085] [11673] [DEBUG] [^-App]: -Stopped! 
[2019-12-02 14:46:04,086] [11673] [DEBUG] [^Worker]: Shutting down... 
[2019-12-02 14:46:04,086] [11673] [DEBUG] [^Worker]: -Stopped! 
[2019-12-02 14:46:04,086] [11673] [INFO] [^Worker]: Gathering service tasks... 
[2019-12-02 14:46:04,086] [11673] [INFO] [^Worker]: Gathering all futures... 
[2019-12-02 14:46:05,088] [11673] [INFO] [^Worker]: Closing event loop 
<hangs until ctrl-c again>
^CException ignored in: <module 'threading' from '/home/jbooth/.asdf/installs/python/3.7.4/lib/python3.7/threading.py'>
Traceback (most recent call last):
  File "/home/jbooth/.asdf/installs/python/3.7.4/lib/python3.7/threading.py", line 1308, in _shutdown
    lock.acquire()
KeyboardInterrupt

Versions

  • Python 3.7.4
  • Faust 1.9.0
  • Ubuntu 19.10
  • Kafka confluentinc/cp-enterprise-kafka:5.3.0 (via docker-compose)
  • RocksDB version N/A

Workaround

The following monkeypatch makes the exit happen successfully. Add it above the import faust line in the hello_world example. It does break re-raising the original exception at exit, but I can live with that compared to leaving a dead service running.

# Patch mode so the process actually dies once the shutdown loop finishes.
import os

import mode

original = mode.Worker._shutdown_loop

def _and_die(self) -> None:
    try:
        original(self)
    finally:
        # Force the interpreter to exit even if something is still blocking
        # shutdown (e.g. a non-daemon thread or a pending task).
        os._exit(1)

mode.Worker._shutdown_loop = _and_die
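
A slightly gentler variant (a sketch, not tested against mode's internals) would log the pending crash before forcing the exit, so the original exception is at least visible instead of being swallowed by os._exit():

# Variant of the patch above: record the crash reason, then die anyway.
import logging
import os

import mode

_original_shutdown = mode.Worker._shutdown_loop

def _shutdown_and_die(self) -> None:
    try:
        _original_shutdown(self)
    except BaseException:
        # The crash appears to be re-raised through _shutdown_loop; log it
        # here, since os._exit() below prevents normal propagation.
        logging.getLogger(__name__).exception('Worker crashed during shutdown')
    finally:
        os._exit(1)

mode.Worker._shutdown_loop = _shutdown_and_die
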
@austinnichols101

Using faust within Docker? If so, try with https://github.com/Yelp/dumb-init

Basically, change your entrypoint to use dumb-init:
ENTRYPOINT ["/usr/bin/dumb-init", "/opt/start-my-awesome-faust-job.sh"]

@jbooth-mastery (Contributor, Author)

This occurs even on the command line, with no Docker involved, just ctrl-c.

@g-rd commented Dec 10, 2019

I see the same issue

@hyzyla commented Mar 9, 2020

I have the same issue

@Vasiliy-Bondarenko

Having the same issue.
Extremely annoying :(

@rpsjr commented Jun 18, 2020

I have the same issue

@Vasiliy-Bondarenko

An incompatible Avro schema hangs the app. Ctrl-C eventually kills the app after a few minutes.

@ask can you please check it out? This is a very annoying issue: it slows development down significantly when the app freezes like this, and it is dangerous in production for obvious reasons :)
For some reason not every exception hangs the app. I'm confused :) (A mitigation sketch follows the traceback below.)

...
[2020-09-25 07:36:48,290] [56] [INFO] Authenticated as AYBFH3KU4JEMWUKD via PLAIN 
2020-09-25 07:36:48,842 ERROR faust.app.base [^-App]: Crashed reason=ClientError(error=Incompatible Avro schema)
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/usr/local/lib/python3.7/site-packages/faust/app/base.py", line 941, in _wrapped
    return await task()
  File "/usr/local/lib/python3.7/site-packages/faust/app/base.py", line 991, in around_timer
    await fun(*args)
  File "/app/kafka_faker/agents.py", line 32, in logs_producer
    value_serializer=log_item_serializer
  File "/usr/local/lib/python3.7/site-packages/faust/topics.py", line 193, in send
    callback=callback,
  File "/usr/local/lib/python3.7/site-packages/faust/channels.py", line 303, in _send_now
    schema, key_serializer, value_serializer, callback))
  File "/usr/local/lib/python3.7/site-packages/faust/channels.py", line 259, in as_future_message
    value, value_serializer, schema, open_headers)
  File "/usr/local/lib/python3.7/site-packages/faust/channels.py", line 661, in prepare_value
    headers=headers)
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/schemas.py", line 102, in dumps_value
    serializer=serializer or self.value_serializer,
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/registry.py", line 175, in dumps_value
    return cast(ModelT, value).dumps(serializer=serializer)
  File "/usr/local/lib/python3.7/site-packages/faust/models/base.py", line 473, in dumps
    self.to_representation())
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/codecs.py", line 363, in dumps
    return get_codec(codec).dumps(obj) if codec else obj
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/codecs.py", line 226, in dumps
    obj = cast(Codec, node)._dumps(obj)
  File "/usr/local/lib/python3.7/site-packages/schema_registry/serializers/faust_serializer.py", line 46, in _dumps
    return self.encode_record_with_schema(self.schema_subject, self.schema, payload)
  File "/usr/local/lib/python3.7/site-packages/schema_registry/serializers/message_serializer.py", line 67, in encode_record_with_schema
    schema_id = self.schemaregistry_client.register(subject, avro_schema)
  File "/usr/local/lib/python3.7/site-packages/schema_registry/client/client.py", line 271, in register
    raise ClientError(message=msg, http_code=code, server_traceback=result)
schema_registry.client.errors.ClientError: Incompatible Avro schema
[2020-09-25 07:36:48,842] [56] [ERROR] [^-App]: Crashed reason=ClientError(error=Incompatible Avro schema) 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/usr/local/lib/python3.7/site-packages/faust/app/base.py", line 941, in _wrapped
    return await task()
  File "/usr/local/lib/python3.7/site-packages/faust/app/base.py", line 991, in around_timer
    await fun(*args)
  File "/app/kafka_faker/agents.py", line 32, in logs_producer
    value_serializer=log_item_serializer
  File "/usr/local/lib/python3.7/site-packages/faust/topics.py", line 193, in send
    callback=callback,
  File "/usr/local/lib/python3.7/site-packages/faust/channels.py", line 303, in _send_now
    schema, key_serializer, value_serializer, callback))
  File "/usr/local/lib/python3.7/site-packages/faust/channels.py", line 259, in as_future_message
    value, value_serializer, schema, open_headers)
  File "/usr/local/lib/python3.7/site-packages/faust/channels.py", line 661, in prepare_value
    headers=headers)
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/schemas.py", line 102, in dumps_value
    serializer=serializer or self.value_serializer,
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/registry.py", line 175, in dumps_value
    return cast(ModelT, value).dumps(serializer=serializer)
  File "/usr/local/lib/python3.7/site-packages/faust/models/base.py", line 473, in dumps
    self.to_representation())
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/codecs.py", line 363, in dumps
    return get_codec(codec).dumps(obj) if codec else obj
  File "/usr/local/lib/python3.7/site-packages/faust/serializers/codecs.py", line 226, in dumps
    obj = cast(Codec, node)._dumps(obj)
  File "/usr/local/lib/python3.7/site-packages/schema_registry/serializers/faust_serializer.py", line 46, in _dumps
    return self.encode_record_with_schema(self.schema_subject, self.schema, payload)
  File "/usr/local/lib/python3.7/site-packages/schema_registry/serializers/message_serializer.py", line 67, in encode_record_with_schema
    schema_id = self.schemaregistry_client.register(subject, avro_schema)
  File "/usr/local/lib/python3.7/site-packages/schema_registry/client/client.py", line 271, in register
    raise ClientError(message=msg, http_code=code, server_traceback=result)
schema_registry.client.errors.ClientError: Incompatible Avro schema
[2020-09-25 07:36:49,119] [56] [INFO] LeaveGroup request succeeded 
[2020-09-25 07:36:49,130] [56] [ERROR] Unclosed AIOKafkaConnection
conn: <AIOKafkaConnection host=b7-pkc-41wq6.eu-west-2.aws.confluent.cloud port=9092> 
^CKilled

faust@a0c030411fbd:/app$ 
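
Until the shutdown bug is fixed, one hedged mitigation for this particular path is to catch the registry error around the send call, so an incompatible schema does not crash (and then hang) the worker. This is only a sketch: the app id, broker, topic name, model, and timer interval below are hypothetical stand-ins for the code in the traceback, and the schema-registry value serializer is omitted to keep the example self-contained.

import logging

import faust
from schema_registry.client.errors import ClientError

app = faust.App('kafka-faker', broker='kafka://localhost:9092')  # hypothetical app id/broker

class LogItem(faust.Record):  # hypothetical stand-in for the real model
    message: str

logs_topic = app.topic('logs', value_type=LogItem)  # hypothetical topic name

@app.timer(interval=1.0)
async def logs_producer() -> None:
    try:
        # In the real code this send also passes the schema-registry value
        # serializer shown in the traceback above.
        await logs_topic.send(value=LogItem(message='hello'))
    except ClientError:
        # An incompatible Avro schema should not take down (and then hang) the
        # whole worker; log it and keep the timer alive instead.
        logging.getLogger(__name__).exception('Schema registry rejected the value schema')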

@pikhovkin

Same issue
