New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TableFinishNode distribution properties should be derived from input #11828

Closed
wants to merge 1 commit into
base: master
from

Conversation

Projects
None yet
3 participants
@sopel39
Member

sopel39 commented Oct 31, 2018

TableFinishNode does not enforce any distribution by itself.
It relies on ExchangeNode to stream data to coordinator.

TableFinishNode distribution properties should be derived from input
TableFinishNode does not enforce any distribution by itself.
It relies on ExchangeNode to stream data to coordinator.

@sopel39 sopel39 requested review from dain and findepi Oct 31, 2018

return ActualProperties.builder()
.global(coordinatorSingleStreamPartition())
.build();
return Iterables.getOnlyElement(inputProperties);

This comment has been minimized.

@dain

dain Nov 1, 2018

Contributor

I'm not sure this is correct. The table finish operator instance (there is only one), gets the summary information from the TableWriterOperators and then completes the table in the connector (e.g., in Hive adds the partitions to the pending transaction). IIRC the output is just the number of rows written. @electrum does that sound correct?

This comment has been minimized.

@sopel39

sopel39 Nov 1, 2018

Member

You might be right. The code in AddExchanges for TableFinishNode is:

            ...
            if (!child.getProperties().isCoordinatorOnly()) {
                child = withDerivedProperties(
                        gatheringExchange(idAllocator.getNextId(), REMOTE, child.getNode()),
                        child.getProperties());
            }
            ...

so it creates (just) single node partitioning if child is not running on coordinator.
I think it should create coordinator partitioning instead.

Then we probably could remove context.get().setCoordinatorOnlyDistribution(); from Fragmenter#visitTableFinish
In fact AddExchanges.Rewriter#visitTableFinish can probably be simplified to:

            PlanWithProperties child = planChild(node, PreferredProperties.any());
            if (!child.getProperties().isCoordinatorOnly()) {
                child = withDerivedProperties(
                        gatheringCoordinatorExchange(idAllocator.getNextId(), REMOTE, child.getNode()),
                        child.getProperties());
            }
            return rebaseAndDeriveProperties(node, child);

This way we

  • create gathering coordinator exchange (to offload coordinator) even if child is running on a single (non-coordinator node) and
  • don't create gathering exchange if stream is already running on coordinator.

This should decouple AddExchanges from PlanFragmenter and PropertyDerivations a bit more

This comment has been minimized.

@dain

dain Nov 1, 2018

Contributor

Not sure I followed all of that, but the critical part is that TableFinish must run on the coordinator, so it can have direct access to the transaction state.

@sopel39

This comment has been minimized.

Member

sopel39 commented Nov 1, 2018

Closing as the PR is too simplistic

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment