
Some joins never complete with dynamic filtering enabled #9917

Closed
lhofhansl opened this issue Nov 9, 2021 · 17 comments · Fixed by #9952
Labels
bug Something isn't working

Comments

@lhofhansl (Member) commented Nov 9, 2021

I have noticed that a join query sometimes never finishes when the pushed-down predicate on one side of the join matches no rows, but only when dynamic filtering is enabled.

This is hard to reproduce, as it seems to be related to data size, among other things.
The best reproduction I have achieved is loading the sf200 lineitem table (1.2bn rows) into Hive (via Trino) in a single-node setup and then issuing this contrived query:

SELECT COUNT(*) FROM lineitem l1, lineitem l2 WHERE l1.orderkey = l2.orderkey AND l1.partkey = l2.partkey AND l1.orderkey < 1;
(Note that l1.orderkey < 1 matches no rows.)

The query starts and then simply stalls, with Trino consuming no noticeable CPU.
The same query finishes when dynamic filtering is turned off via the session.
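For anyone trying to reproduce: dynamic filtering can be toggled per session roughly like this. The property name `enable_dynamic_filtering` is assumed here (check `SHOW SESSION` for the exact name in your Trino version):

```sql
-- Disable dynamic filtering for the current session only
SET SESSION enable_dynamic_filtering = false;

-- Re-run the query; with dynamic filtering disabled it completes
SELECT COUNT(*) FROM lineitem l1, lineitem l2
WHERE l1.orderkey = l2.orderkey AND l1.partkey = l2.partkey AND l1.orderkey < 1;
```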

Also, curiously, this query does finish:
SELECT COUNT(*) FROM lineitem l1, lineitem l2 WHERE l1.orderkey = l2.orderkey AND l1.partkey = l2.partkey AND l1.orderkey < 2;
(and the count is 0)

Update: this second query runs in about half the time (7s) with dynamic filtering off, versus 14s with it on.

So it has something to do with the predicate on one side of the join matching no rows.
It also did not happen with smaller data sets :(

(I'll follow up with the query plans.)
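The plans below can be captured with Trino's EXPLAIN; a distributed plan (matching the fragment-per-stage output shown) is produced roughly like this:

```sql
-- Capture the distributed plan for the stuck query
EXPLAIN (TYPE DISTRIBUTED)
SELECT COUNT(*) FROM lineitem l1, lineitem l2
WHERE l1.orderkey = l2.orderkey AND l1.partkey = l2.partkey AND l1.orderkey < 1;
```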

@lhofhansl (Member, Author)

Failing query's plan (dynamic filtering on):

                                                                                                Query Plan                                                                                  >
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->

Fragment 0 [SINGLE]                                                                                                                                                                        >
     Output layout: [count]                                                                                                                                                                 >
     Output partitioning: SINGLE []                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     Output[_col0]                                                                                                                                                                          >
     │   Layout: [count:bigint]                                                                                                                                                             >
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                            >
     │   _col0 := count                                                                                                                                                                     >
     └─ Aggregate(FINAL)                                                                                                                                                                    >
        │   Layout: [count:bigint]                                                                                                                                                          >
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                         >
        │   count := count("count_19")                                                                                                                                                      >
        └─ LocalExchange[SINGLE] ()                                                                                                                                                         >
           │   Layout: [count_19:bigint]                                                                                                                                                    >
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                      >
           └─ RemoteSource[1]                                                                                                                                                               >
                  Layout: [count_19:bigint]                                                                                                                                                 >
                                                                                                                                                                                            >
 Fragment 1 [SOURCE]                                                                                                                                                                        >
     Output layout: [count_19]                                                                                                                                                              >
     Output partitioning: SINGLE []                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     Aggregate(PARTIAL)                                                                                                                                                                     >
     │   Layout: [count_19:bigint]                                                                                                                                                          >
     │   count_19 := count(*)                                                                                                                                                               >
     └─ InnerJoin[("orderkey" = "orderkey_0") AND ("partkey" = "partkey_1")][$hashvalue, $hashvalue_20]                                                                                     >
        │   Layout: []                                                                                                                                                                      >
        │   Estimates: {rows: 4 (0B), cpu: 80.47G, memory: 106B, network: 106B}                                                                                                             >
        │   Distribution: REPLICATED                                                                                                                                                        >
        │   dynamicFilterAssignments = {orderkey_0 -> #df_408, partkey_1 -> #df_409}                                                                                                        >
        ├─ ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey" < BIGINT '1'), dynamicFilters = {"orderkey" = #df_408, "partkey" = #df_409}]     >
        │      Layout: [orderkey:bigint, partkey:bigint, $hashvalue:bigint]                                                                                                                 >
        │      Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B>
        │      $hashvalue := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey"), 0)), COALESCE("$operator$hash_code"("partkey"), 0))                          >
        │      partkey := partkey:bigint:REGULAR                                                                                                                                            >
        │      orderkey := orderkey:bigint:REGULAR                                                                                                                                          >
        └─ LocalExchange[HASH][$hashvalue_20] ("orderkey_0", "partkey_1")                                                                                                                   >
           │   Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_20:bigint]                                                                                                          >
           │   Estimates: {rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 106B}                                                                                                          >
           └─ RemoteSource[2]                                                                                                                                                               >
                  Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_21:bigint]                                                                                                       >
                                                                                                                                                                                            >
 Fragment 2 [SOURCE]                                                                                                                                                                        >
     Output layout: [orderkey_0, partkey_1, $hashvalue_22]                                                                                                                                  >
     Output partitioning: BROADCAST []                                                                                                                                                      >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey_0" < BIGINT '1')]                                                                       >
         Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_22:bigint]                                                                                                                >
         Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, netw>
         $hashvalue_22 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey_0"), 0)), COALESCE("$operator$hash_code"("partkey_1"), 0))                         >
         partkey_1 := partkey:bigint:REGULAR                                                                                                                                                >
         orderkey_0 := orderkey:bigint:REGULAR                                                                                                                                              >

@lhofhansl (Member, Author)

Same query with dynamic filtering turned off:

                                                                                                Query Plan                                                                                  >
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 0 [SINGLE]                                                                                                                                                                        >
     Output layout: [count]                                                                                                                                                                 >
     Output partitioning: SINGLE []                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     Output[_col0]                                                                                                                                                                          >
     │   Layout: [count:bigint]                                                                                                                                                             >
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                            >
     │   _col0 := count                                                                                                                                                                     >
     └─ Aggregate(FINAL)                                                                                                                                                                    >
        │   Layout: [count:bigint]                                                                                                                                                          >
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                         >
        │   count := count("count_19")                                                                                                                                                      >
        └─ LocalExchange[SINGLE] ()                                                                                                                                                         >
           │   Layout: [count_19:bigint]                                                                                                                                                    >
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                      >
           └─ RemoteSource[1]                                                                                                                                                               >
                  Layout: [count_19:bigint]                                                                                                                                                 >
                                                                                                                                                                                            >
 Fragment 1 [SOURCE]                                                                                                                                                                        >
     Output layout: [count_19]                                                                                                                                                              >
     Output partitioning: SINGLE []                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     Aggregate(PARTIAL)                                                                                                                                                                     >
     │   Layout: [count_19:bigint]                                                                                                                                                          >
     │   count_19 := count(*)                                                                                                                                                               >
     └─ InnerJoin[("orderkey" = "orderkey_0") AND ("partkey" = "partkey_1")][$hashvalue, $hashvalue_20]                                                                                     >
        │   Layout: []                                                                                                                                                                      >
        │   Estimates: {rows: 4 (0B), cpu: 80.47G, memory: 106B, network: 106B}                                                                                                             >
        │   Distribution: REPLICATED                                                                                                                                                        >
        ├─ ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey" < BIGINT '1')]                                                                   >
        │      Layout: [orderkey:bigint, partkey:bigint, $hashvalue:bigint]                                                                                                                 >
        │      Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B>
        │      $hashvalue := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey"), 0)), COALESCE("$operator$hash_code"("partkey"), 0))                          >
        │      partkey := partkey:bigint:REGULAR                                                                                                                                            >
        │      orderkey := orderkey:bigint:REGULAR                                                                                                                                          >
        └─ LocalExchange[HASH][$hashvalue_20] ("orderkey_0", "partkey_1")                                                                                                                   >
           │   Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_20:bigint]                                                                                                          >
           │   Estimates: {rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 106B}                                                                                                          >
           └─ RemoteSource[2]                                                                                                                                                               >
                  Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_21:bigint]                                                                                                       >
                                                                                                                                                                                            >
 Fragment 2 [SOURCE]                                                                                                                                                                        >
     Output layout: [orderkey_0, partkey_1, $hashvalue_22]                                                                                                                                  >
     Output partitioning: BROADCAST []                                                                                                                                                      >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey_0" < BIGINT '1')]                                                                       >
         Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_22:bigint]                                                                                                                >
         Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, netw>
         $hashvalue_22 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey_0"), 0)), COALESCE("$operator$hash_code"("partkey_1"), 0))                         >
         partkey_1 := partkey:bigint:REGULAR                                                                                                                                                >
         orderkey_0 := orderkey:bigint:REGULAR                                                                                                                                              >

@lhofhansl (Member, Author)

And this is the query where the predicate matches some rows:

                                                                                               Query Plan                                                                                   >
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 0 [SINGLE]                                                                                                                                                                        >
     Output layout: [count]                                                                                                                                                                 >
     Output partitioning: SINGLE []                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     Output[_col0]                                                                                                                                                                          >
     │   Layout: [count:bigint]                                                                                                                                                             >
     │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                            >
     │   _col0 := count                                                                                                                                                                     >
     └─ Aggregate(FINAL)                                                                                                                                                                    >
        │   Layout: [count:bigint]                                                                                                                                                          >
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                         >
        │   count := count("count_19")                                                                                                                                                      >
        └─ LocalExchange[SINGLE] ()                                                                                                                                                         >
           │   Layout: [count_19:bigint]                                                                                                                                                    >
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}                                                                                                                      >
           └─ RemoteSource[1]                                                                                                                                                               >
                  Layout: [count_19:bigint]                                                                                                                                                 >
                                                                                                                                                                                            >
 Fragment 1 [SOURCE]                                                                                                                                                                        >
     Output layout: [count_19]                                                                                                                                                              >
     Output partitioning: SINGLE []                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     Aggregate(PARTIAL)                                                                                                                                                                     >
     │   Layout: [count_19:bigint]                                                                                                                                                          >
     │   count_19 := count(*)                                                                                                                                                               >
     └─ InnerJoin[("orderkey" = "orderkey_0") AND ("partkey" = "partkey_1")][$hashvalue, $hashvalue_20]                                                                                     >
        │   Layout: []                                                                                                                                                                      >
        │   Estimates: {rows: 1 (0B), cpu: 80.47G, memory: 27B, network: 27B}                                                                                                               >
        │   Distribution: REPLICATED                                                                                                                                                        >
        │   dynamicFilterAssignments = {orderkey_0 -> #df_408, partkey_1 -> #df_409}                                                                                                        >
        ├─ ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey" < BIGINT '2'), dynamicFilters = {"orderkey" = #df_408, "partkey" = #df_409}]     >
        │      Layout: [orderkey:bigint, partkey:bigint, $hashvalue:bigint]                                                                                                                 >
        │      Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 1 (27B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 1 (27B), cpu: 40.23G, memory: 0B, >
        │      $hashvalue := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey"), 0)), COALESCE("$operator$hash_code"("partkey"), 0))                          >
        │      partkey := partkey:bigint:REGULAR                                                                                                                                            >
        │      orderkey := orderkey:bigint:REGULAR                                                                                                                                          >
        └─ LocalExchange[HASH][$hashvalue_20] ("orderkey_0", "partkey_1")                                                                                                                   >
           │   Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_20:bigint]                                                                                                          >
           │   Estimates: {rows: 1 (27B), cpu: 40.23G, memory: 0B, network: 27B}                                                                                                            >
           └─ RemoteSource[2]                                                                                                                                                               >
                  Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_21:bigint]                                                                                                       >
                                                                                                                                                                                            >
 Fragment 2 [SOURCE]                                                                                                                                                                        >
     Output layout: [orderkey_0, partkey_1, $hashvalue_22]                                                                                                                                  >
     Output partitioning: BROADCAST []                                                                                                                                                      >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey_0" < BIGINT '2')]                                                                       >
         Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_22:bigint]                                                                                                                >
         Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 1 (27B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 1 (27B), cpu: 40.23G, memory: 0B, networ>
         $hashvalue_22 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey_0"), 0)), COALESCE("$operator$hash_code"("partkey_1"), 0))                         >
         partkey_1 := partkey:bigint:REGULAR                                                                                                                                                >
         orderkey_0 := orderkey:bigint:REGULAR                                                                                                                                              >

@lhofhansl
Member Author

I might have time to look into this, for now just parking the information here for later.

@lhofhansl
Member Author

@raunaqmorarka FYI (seen your name in many of the dynamic filtering PRs.)

@lhofhansl lhofhansl added the bug Something isn't working label Nov 10, 2021
@rzeyde-varada
Contributor

Many thanks for reporting this issue!

IIUC, the problematic query:

SELECT COUNT(*) 
FROM lineitem l1, lineitem l2 
WHERE l1.orderkey = l2.orderkey AND l1.partkey = l2.partkey AND l1.orderkey < 1;

results in a broadcast join:

 Fragment 1 [SOURCE]                                                                                                                                                                        >
     Output layout: [count_19]                                                                                                                                                              >
     Output partitioning: SINGLE []                                                                                                                                                         >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     Aggregate(PARTIAL)                                                                                                                                                                     >
     │   Layout: [count_19:bigint]                                                                                                                                                          >
     │   count_19 := count(*)                                                                                                                                                               >
     └─ InnerJoin[("orderkey" = "orderkey_0") AND ("partkey" = "partkey_1")][$hashvalue, $hashvalue_20]                                                                                     >
        │   Layout: []                                                                                                                                                                      >
        │   Estimates: {rows: 4 (0B), cpu: 80.47G, memory: 106B, network: 106B}                                                                                                             >
        │   Distribution: REPLICATED                                                                                                                                                        >
        │   dynamicFilterAssignments = {orderkey_0 -> #df_408, partkey_1 -> #df_409}                                                                                                        >
        ├─ ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey" < BIGINT '1'), dynamicFilters = {"orderkey" = #df_408, "partkey" = #df_409}]     >
        │      Layout: [orderkey:bigint, partkey:bigint, $hashvalue:bigint]                                                                                                                 >
        │      Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B>
        │      $hashvalue := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey"), 0)), COALESCE("$operator$hash_code"("partkey"), 0))                          >
        │      partkey := partkey:bigint:REGULAR                                                                                                                                            >
        │      orderkey := orderkey:bigint:REGULAR                                                                                                                                          >
        └─ LocalExchange[HASH][$hashvalue_20] ("orderkey_0", "partkey_1")                                                                                                                   >
           │   Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_20:bigint]                                                                                                          >
           │   Estimates: {rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 106B}                                                                                                          >
           └─ RemoteSource[2]                                                                                                                                                               >
                  Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_21:bigint]                                                                                                       >
                                                                                                                                                                                            >
 Fragment 2 [SOURCE]                                                                                                                                                                        >
     Output layout: [orderkey_0, partkey_1, $hashvalue_22]                                                                                                                                  >
     Output partitioning: BROADCAST []                                                                                                                                                      >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                          >
     ScanFilterProject[table = hive:default:lineitem, grouped = false, filterPredicate = ("orderkey_0" < BIGINT '1')]                                                                       >
         Layout: [orderkey_0:bigint, partkey_1:bigint, $hashvalue_22:bigint]                                                                                                                >
         Estimates: {rows: 1200018434 (30.18GB), cpu: 20.12G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, network: 0B}/{rows: 4 (106B), cpu: 40.23G, memory: 0B, netw>
         $hashvalue_22 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey_0"), 0)), COALESCE("$operator$hash_code"("partkey_1"), 0))                         >
         partkey_1 := partkey:bigint:REGULAR                                                                                                                                                >
         orderkey_0 := orderkey:bigint:REGULAR

so the engine should use LocalDynamicFiltersCollector, which is handled locally on each worker.
The collection takes place in DynamicFilterSourceOperator, which calls LocalDynamicFilterConsumer#addPartition on each hash-build partition.
After all partitions are collected, the LocalDynamicFilterConsumer#getDynamicFilterDomains future is completed, passing the dynamic filter to the scan operators.

Following #3414 and #4685, we also allow the page source to block until the relevant dynamic filters are ready.
See https://youtu.be/ZwaVZplVmVA?t=2430 for more context about this feature.
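As a minimal sketch (hypothetical names, not Trino's actual classes), this handoff can be modeled as a consumer that completes a future once every build-side partition has reported its values, with the scan side blocking on that future:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a local dynamic-filter handoff: each hash-build
// partition reports its key values, and once all partitions have reported,
// a future completes with the merged domain for the scan side to consume.
class LocalFilterConsumer {
    private final int expectedPartitions;
    private final List<Set<Long>> partitions = new ArrayList<>();
    private final CompletableFuture<Set<Long>> domainFuture = new CompletableFuture<>();

    LocalFilterConsumer(int expectedPartitions) {
        this.expectedPartitions = expectedPartitions;
    }

    // Called by each hash-build partition when it finishes.
    synchronized void addPartition(Set<Long> values) {
        partitions.add(values);
        if (partitions.size() == expectedPartitions) {
            Set<Long> merged = new TreeSet<>();
            partitions.forEach(merged::addAll);
            domainFuture.complete(merged); // an empty set plays the role of a "NONE" domain
        }
    }

    // The scan side blocks on this future until the filter is ready.
    CompletableFuture<Set<Long>> getDomain() {
        return domainFuture;
    }
}

public class Demo {
    public static void main(String[] args) throws Exception {
        LocalFilterConsumer consumer = new LocalFilterConsumer(2);
        consumer.addPartition(Set.of());
        consumer.addPartition(Set.of()); // build side matched no rows
        Set<Long> domain = consumer.getDomain().get();
        // An empty domain should let the probe side skip all splits.
        System.out.println(domain.isEmpty() ? "NONE" : domain.toString());
    }
}
```

When the build side matches no rows (as with `l1.orderkey < 1`), the merged domain is empty, which corresponds to the `simplifiedDomain: "NONE"` reported in the query JSON.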

It is possible that we have a bug somewhere in the above implementation, causing the query to get stuck :(

  • Could you please share jstack output from the server? It may help us to narrow down the problem...
  • Also, could you please check whether the probe-side stage is starting (i.e. any split of lineitem l1 has started/finished scanning)?
  • Does this issue happen on the latest Trino version?
    If possible, could you git bisect over the latest releases?

@lhofhansl
Member Author

Thanks for the response @rzeyde-varada

Here are the stages while this is running.
Interestingly, it always stops when 16 tasks are running in stage 1.

trino:default> select count(*) from lineitem l1, lineitem l2 where l1.orderkey = l2.orderkey and l1.partkey = l2.partkey and l1.orderkey < 1;

Query 20211113_203124_00017_bwhrg, RUNNING, 1 node, 772 splits
0:27 [    0 rows,     0B] [    0 rows/s,     0B/s] [                           <=>            ]

     STAGES   ROWS  ROWS/s  BYTES  BYTES/s  QUEUED    RUN   DONE
0.........R    108       4  3.27K     124B       0     17      0
  1.......S      0       0     0B       0B       0     16    124
    2.....F      0       0     0B       0B       0      0    615

@lhofhansl
Member Author

trino-jstack.txt

@rzeyde-varada
Contributor

Could you please try to see if disabling enable_coordinator_dynamic_filters_distribution (by setting it to false) has an effect on this issue?

@rzeyde-varada
Contributor

Would it be possible to share the QueryInfo JSON of this query?

It should be possible to retrieve it via the Web UI (using the "JSON" tab, in the upper-right corner):

@lhofhansl
Member Author

enable_coordinator_dynamic_filters_distribution does not make a difference.

@lhofhansl
Member Author

@rzeyde-varada
Contributor

rzeyde-varada commented Nov 15, 2021

From the JSON above, it seems that DF collection is finished:

    "dynamicFiltersStats": {
      "dynamicFilterDomainStats": [
        {
          "dynamicFilterId": "df_408",
          "simplifiedDomain": "NONE",
          "collectionDuration": "13.14s"
        },
        {
          "dynamicFilterId": "df_409",
          "simplifiedDomain": "NONE",
          "collectionDuration": "13.14s"
        }
      ],
      "lazyDynamicFilters": 2,
      "replicatedDynamicFilters": 2,
      "totalDynamicFilters": 2,
      "dynamicFiltersCompleted": 2
    },

However, it's a bit strange that the join-containing stage seems to be stuck in the SCHEDULING state:

    "subStages": [
      {
        "stageId": "20211114_161209_00011_bwhrg.1",
        "state": "SCHEDULING",
        "plan": {
          "id": "1",
          "root": {
            "@type": "aggregation",
            "id": "479",
            "source": {
              "@type": "join",
              "id": "339",
              "type": "INNER",
              "left": {
                "@type": "project",
                "id": "522",
                "source": {
                  "@type": "filter",
                  "id": "410",
                  "source": {
                    "@type": "tablescan",
                    "id": "0",

From the stack trace above, it seems that the workers are waiting for splits from the coordinator:

"SplitRunner-0-110" #110 prio=5 os_prio=0 cpu=211.77ms elapsed=853.48s tid=0x00007fc89d190000 nid=0x1da40e waiting on condition  [0x00007fbaa5bdc000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.11/Native Method)
	- parking to wait for  <0x00007fbc8415eaa8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.11/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(java.base@11.0.11/AbstractQueuedSynchronizer.java:2081)
	at io.trino.execution.executor.MultilevelSplitQueue.take(MultilevelSplitQueue.java:133)
	at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:469)
	at io.trino.$gen.Trino_364_102_g8708cd7_dirty____20211113_201945_2.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@11.0.11/ThreadPoolExecutor.java:1128)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@11.0.11/ThreadPoolExecutor.java:628)
	at java.lang.Thread.run(java.base@11.0.11/Thread.java:829)

So IIUC, the coordinator is no longer producing splits, or is no longer notifying the workers that there are no more splits...
Maybe it is somehow stuck?
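A hypothetical sketch (not Trino's actual scheduler code) of this hang mode: a worker loop that only exits when it sees an end-of-splits marker. If the scheduler never enqueues the marker, `take()` parks forever, matching the parked `MultilevelSplitQueue.take` frames above:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the hang mode suggested by the stack trace:
// a worker thread draining a split queue, exiting only on an explicit
// "no more splits" marker. If the producer never enqueues the marker,
// take() parks the thread indefinitely and the query never completes.
public class SplitLoopDemo {
    private static final String NO_MORE_SPLITS = "<no-more-splits>";

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> splits = new LinkedBlockingQueue<>();
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    String split = splits.take(); // parks when the queue is empty
                    if (NO_MORE_SPLITS.equals(split)) {
                        return; // clean shutdown
                    }
                    // ... process the split ...
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        splits.put("split-1");
        splits.put(NO_MORE_SPLITS); // comment this out and the worker parks forever
        worker.join(TimeUnit.SECONDS.toMillis(5));
        System.out.println(worker.isAlive() ? "STUCK" : "DONE");
    }
}
```

The dynamic-filter interaction matters here because a "NONE" domain can legitimately eliminate all remaining splits; the coordinator must still deliver the end-of-splits signal in that case.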

BTW, are there any errors/warnings in the server.log files?

@rzeyde-varada
Contributor

cc: @sopel39 @findepi

@rzeyde-varada
Contributor

Also, does this issue reproduce on previous Trino releases?

@sopel39
Member

sopel39 commented Nov 15, 2021

This should be fixed by #9952. The bug exists only in an unreleased version.

@lhofhansl
Member Author

lhofhansl commented Nov 15, 2021

Thanks @sopel39!
This was indeed a version built against current master.
I'll retest this now.

FWIW, I retested with #9952 applied, and the problem is gone. Thanks!
