Skip to content

Commit

Permalink
Close split sources in DistributedExecutionPlanner
Browse files Browse the repository at this point in the history
This prevents leaking split sources if a call to getSplits() fails.
  • Loading branch information
electrum committed Jan 19, 2017
1 parent 6c919dd commit cae0388
Showing 1 changed file with 36 additions and 2 deletions.
Expand Up @@ -52,9 +52,11 @@
import com.facebook.presto.sql.planner.plan.WindowNode; import com.facebook.presto.sql.planner.plan.WindowNode;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;


import javax.inject.Inject; import javax.inject.Inject;


import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;


Expand All @@ -63,6 +65,8 @@


public class DistributedExecutionPlanner public class DistributedExecutionPlanner
{ {
private static final Logger log = Logger.get(DistributedExecutionPlanner.class);

private final SplitManager splitManager; private final SplitManager splitManager;


@Inject @Inject
Expand All @@ -72,16 +76,38 @@ public DistributedExecutionPlanner(SplitManager splitManager)
} }


public StageExecutionPlan plan(SubPlan root, Session session) public StageExecutionPlan plan(SubPlan root, Session session)
{
Visitor visitor = new Visitor(session);
try {
return plan(root, visitor);
}
catch (Throwable t) {
visitor.getSplitSources().forEach(DistributedExecutionPlanner::closeSplitSource);
throw t;
}
}

private static void closeSplitSource(SplitSource source)
{
try {
source.close();
}
catch (Throwable t) {
log.warn(t, "Error closing split source");
}
}

private StageExecutionPlan plan(SubPlan root, Visitor visitor)
{ {
PlanFragment currentFragment = root.getFragment(); PlanFragment currentFragment = root.getFragment();


// get splits for this fragment, this is lazy so split assignments aren't actually calculated here // get splits for this fragment, this is lazy so split assignments aren't actually calculated here
Map<PlanNodeId, SplitSource> splitSources = currentFragment.getRoot().accept(new Visitor(session), null); Map<PlanNodeId, SplitSource> splitSources = currentFragment.getRoot().accept(visitor, null);


// create child stages // create child stages
ImmutableList.Builder<StageExecutionPlan> dependencies = ImmutableList.builder(); ImmutableList.Builder<StageExecutionPlan> dependencies = ImmutableList.builder();
for (SubPlan childPlan : root.getChildren()) { for (SubPlan childPlan : root.getChildren()) {
dependencies.add(plan(childPlan, session)); dependencies.add(plan(childPlan, visitor));
} }


return new StageExecutionPlan( return new StageExecutionPlan(
Expand All @@ -94,12 +120,18 @@ private final class Visitor
extends PlanVisitor<Void, Map<PlanNodeId, SplitSource>> extends PlanVisitor<Void, Map<PlanNodeId, SplitSource>>
{ {
private final Session session; private final Session session;
private final List<SplitSource> splitSources = new ArrayList<>();


private Visitor(Session session) private Visitor(Session session)
{ {
this.session = session; this.session = session;
} }


public List<SplitSource> getSplitSources()
{
return splitSources;
}

@Override @Override
public Map<PlanNodeId, SplitSource> visitExplainAnalyze(ExplainAnalyzeNode node, Void context) public Map<PlanNodeId, SplitSource> visitExplainAnalyze(ExplainAnalyzeNode node, Void context)
{ {
Expand All @@ -112,6 +144,8 @@ public Map<PlanNodeId, SplitSource> visitTableScan(TableScanNode node, Void cont
// get dataSource for table // get dataSource for table
SplitSource splitSource = splitManager.getSplits(session, node.getLayout().get()); SplitSource splitSource = splitManager.getSplits(session, node.getLayout().get());


splitSources.add(splitSource);

return ImmutableMap.of(node.getId(), splitSource); return ImmutableMap.of(node.getId(), splitSource);
} }


Expand Down

0 comments on commit cae0388

Please sign in to comment.