Skip to content

Conversation

@yokofly
Copy link
Collaborator

@yokofly yokofly commented Dec 10, 2025

The bug happened because the join could return Finished without signaling downstream, leaving later processors in NeedData and causing “Pipeline stuck.” The new logic closes both input ports and calls output.finish() as soon as either side finishes, which should release the pipeline cleanly

CREATE MATERIALIZED VIEW default.mv3
(
  `str_col` string,
  `_tp_time` datetime64(3, 'UTC'),
  `int_col` int32,
  `nf_col` nullable(float32),
  `b.str_col` string,
  `b._tp_time` datetime64(3, 'UTC'),
  `b.int_col` int32,
  `b.nf_col` nullable(float32),
  `_tp_sn` int64
) AS
SELECT
  *
FROM
  default.`99114_stream` AS a
INNER JOIN default.`99114_stream` AS b ON (a.str_col = b.str_col) AND lag_behind(1s, a._tp_time, b._tp_time)
SETTINGS
  seek_to = 'earliest'
<Fatal> : Logical error: 'Pipeline stuck. Current state:
digraph
{
  rankdir="LR";
  { node [shape = rect]
    n6100735000[label="StreamingStoreSource:(51 jobs, execution time: 0 sec., preparation time: 0.000057335 sec.) (Finished)"];
    n6101782040[label="ExpressionTransform:(51 jobs, execution time: 0 sec., preparation time: 0.000112119 sec.) (Finished)"];
    n6425575448[label="StreamingJoinTransformWithAlignment:(102 jobs, execution time: 0 sec., preparation time: 0.000560041 sec.) (Finished)"];
    n6100733976[label="StreamingStoreSource:(52 jobs, execution time: 0 sec., preparation time: 0.00006962 sec.) (Finished)"];
    n6101782552[label="ExpressionTransform:(51 jobs, execution time: 0 sec., preparation time: 0.000088702 sec.) (Finished)"];
    n6101783064[label="ExpressionTransform:(102 jobs, execution time: 0 sec., preparation time: 0.000131292 sec.) (NeedData)"];
    n6101781528[label="MaterializingTransform:(102 jobs, execution time: 0 sec., preparation time: 0.000127205 sec.) (NeedData)"];
    n5706909976[label="CountingTransform:(102 jobs, execution time: 0 sec., preparation time: 0.00013712 sec.) (NeedData)"];
    n6101781016[label="ConvertingTransform:(102 jobs, execution time: 0 sec., preparation time: 0.000112377 sec.) (NeedData)"];
    n6425577752[label="StreamSink:(103 jobs, execution time: 0 sec., preparation time: 0.000126751 sec.) (NeedData)"];
    n5689829784[label="EmptySink:(103 jobs, execution time: 0 sec., preparation time: 0.000089126 sec.) (NeedData)"];
  }
  n6100735000 -> n6101782040;
  n6101782040 -> n6425575448;
  n6425575448 -> n6101783064;
  n6100733976 -> n6101782552;
  n6101782552 -> n6425575448;
  n6101783064 -> n6101781528;
  n6101781528 -> n5706909976;
  n5706909976 -> n6101781016;
  n6101781016 -> n6425577752;
  n6425577752 -> n5689829784;
}

@chatgpt-codex-connector
Copy link

You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.
To continue using code reviews, you can upgrade your account or add credits to your account and enable them for code reviews in your settings.

@yokofly yokofly merged commit 2589d44 into develop Dec 10, 2025
7 of 11 checks passed
@yokofly yokofly deleted the fix-pipeline-stuck-during-shutdown branch December 10, 2025 08:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants