Skip to content

fix: unblock stream listener using control signals#36502

Closed
wylswz wants to merge 5 commits into
langgenius:mainfrom
wylswz:wyl/improve-stream-close
Closed

fix: unblock stream listener using control signals#36502
wylswz wants to merge 5 commits into
langgenius:mainfrom
wylswz:wyl/improve-stream-close

Conversation

@wylswz
Copy link
Copy Markdown
Contributor

@wylswz wylswz commented May 22, 2026

Important

  1. Make sure you have read our contribution guidelines
  2. Ensure there is an associated issue and you have been assigned to it
  3. Use the correct syntax to link this PR: Fixes #<issue number>.

Summary

Emit a control signal on close to unblock listeners.

Screenshots

Before After
... ...

Checklist

  • This change requires a documentation update, included: Dify Document
  • I understand that this PR may be closed in case there was no previous discussion or issues. (This doesn't apply to typos!)
  • I've added a test for each change that was introduced, and I tried as much as possible to make a single atomic change.
  • I've updated the documentation accordingly.
  • I ran make lint && make type-check (backend) and cd web && pnpm exec vp staged (frontend) to appease the lint gods

@dosubot dosubot Bot added the size:L This PR changes 100-499 lines, ignoring generated files. label May 22, 2026
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 22, 2026

Pyrefly Diff

base → PR
--- /tmp/pyrefly_base.txt	2026-05-26 06:29:55.113536091 +0000
+++ /tmp/pyrefly_pr.txt	2026-05-26 06:29:46.160425830 +0000
@@ -5900,51 +5900,45 @@
 ERROR `<=` is not supported between `None` and `datetime` [unsupported-operation]
    --> tests/unit_tests/libs/_human_input/test_models.py:122:16
 ERROR Object of class `Subscription` has no attribute `_start_if_needed` [missing-attribute]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:158:17
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:155:17
 ERROR Object of class `Subscription` has no attribute `_start_if_needed` [missing-attribute]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:226:17
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:223:17
 ERROR Argument `FakeRedisClient` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis._subscription.RedisSubscriptionBase.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:699:20
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:696:20
 ERROR Argument `FakeRedisClient` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis._subscription.RedisSubscriptionBase.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:719:20
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:716:20
 ERROR Argument `TestRedisShardedSubscription.test_get_message_uses_target_node_for_cluster_client.DummyRedisCluster` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis._subscription.RedisSubscriptionBase.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:905:20
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:902:20
 ERROR Argument `FakeRedisClient` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis._subscription.RedisSubscriptionBase.__init__` [bad-argument-type]
-    --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:1035:20
+    --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:1032:20
 ERROR Argument `FakeRedisClient` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis._subscription.RedisSubscriptionBase.__init__` [bad-argument-type]
-    --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:1133:24
+    --> tests/unit_tests/libs/broadcast_channel/redis/test_channel_unit_tests.py:1130:24
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:130:36
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:147:36
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:158:45
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:175:45
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:166:45
-ERROR Argument `FakeStreamsRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:180:43
-ERROR Object of class `Subscription` has no attribute `_join_timeout_ms` [missing-attribute]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:187:20
-ERROR Argument `FakeStreamsRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:192:43
-ERROR Argument `FakeStreamsRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:205:43
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:183:45
 ERROR Argument `FailExpireRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:222:43
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:197:43
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:273:43
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:248:43
 ERROR Argument `TestStreamsSubscription.test_listener_normalizes_supported_payloads_and_ignores_unsupported_shapes.OneShotRedis` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel._StreamsSubscription.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:298:45
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:273:45
 ERROR Argument `object` is not assignable to parameter `o` with type `Buffer | Iterable[SupportsIndex] | SupportsBytes | SupportsIndex` in function `bytes.__new__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:306:35
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:281:35
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel._StreamsSubscription.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:347:45
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:322:45
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel._StreamsSubscription.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:355:45
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:330:45
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel._StreamsSubscription.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:364:45
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:339:45
 ERROR Argument `FakeStreamsRedis` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel._StreamsSubscription.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:372:45
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:347:45
 ERROR Argument `BlockingRedis` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel._StreamsSubscription.__init__` [bad-argument-type]
-   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:399:45
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:367:45
+ERROR Argument `FakeStreamsRedis` is not assignable to parameter `client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel._StreamsSubscription.__init__` [bad-argument-type]
+   --> tests/unit_tests/libs/broadcast_channel/redis/test_streams_channel_unit_tests.py:385:45
 ERROR Argument `VisualConfig | str` is not assignable to parameter `frequency` with type `str` in function `services.trigger.schedule_service.ScheduleService.visual_to_cron` [bad-argument-type]
    --> tests/unit_tests/libs/test_cron_compatibility.py:318:60
 ERROR Argument `VisualConfig | str` is not assignable to parameter `visual_config` with type `VisualConfig` in function `services.trigger.schedule_service.ScheduleService.visual_to_cron` [bad-argument-type]
@@ -6176,7 +6170,7 @@
     --> tests/unit_tests/services/test_annotation_service.py:1253:27
 ERROR Argument `_FakeStreams` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.streams_channel.StreamsBroadcastChannel.__init__` [bad-argument-type]
   --> tests/unit_tests/services/test_app_generate_service_streaming_integration.py:96:36
-ERROR Argument `_FakeRedisClient` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.channel.BroadcastChannel.__init__` [bad-argument-type]
+ERROR Argument `_FakeRedisClient` is not assignable to parameter `redis_client` with type `Redis[Unknown] | RedisCluster[Unknown]` in function `libs.broadcast_channel.redis.pubsub_channel.BroadcastChannel.__init__` [bad-argument-type]
    --> tests/unit_tests/services/test_app_generate_service_streaming_integration.py:116:34
 ERROR Argument `TestAsyncWorkflowService.test_should_dispatch_to_matching_celery_task_when_triggering_workflow.DummyAccount` is not assignable to parameter `user` with type `Account | EndUser` in function `services.async_workflow_service.AsyncWorkflowService.trigger_workflow_async` [bad-argument-type]
    --> tests/unit_tests/services/test_async_workflow_service.py:155:88

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 22, 2026

Pyrefly Type Coverage

Metric Base PR Delta
Type coverage 45.48% 45.47% -0.01%
Strict coverage 45.00% 44.99% -0.01%
Typed symbols 23,980 23,972 -8
Untyped symbols 29,056 29,056 0
Modules 2709 2710 +1

@wylswz wylswz changed the title fix: improve stream close fix: unblock stream listener using control signals May 22, 2026
@wylswz wylswz enabled auto-merge May 22, 2026 10:26
@laipz8200
Copy link
Copy Markdown
Member

laipz8200 commented May 26, 2026

If you modify the environment variables, you should also provide a PR to update the dify-docs.

@wylswz
Copy link
Copy Markdown
Contributor Author

wylswz commented Jun 5, 2026

If you modify the environment variables, you should also provide a PR to update the dify-docs.

the env PUBSUB_LISTENER_JOIN_TIMEOUT_MS was a temporary solution and was never released.

@wylswz
Copy link
Copy Markdown
Contributor Author

wylswz commented Jun 5, 2026

Looks like the commit history is corrupted... I'll reopen one

@wylswz wylswz closed this Jun 5, 2026
auto-merge was automatically disabled June 5, 2026 08:46

Pull request was closed

@wylswz wylswz mentioned this pull request Jun 5, 2026
5 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants