Skip to content

Commit

Permalink
Add experimental.spiller-spill-path property
Browse files Browse the repository at this point in the history
  • Loading branch information
ArturGajowy authored and cberner committed Oct 5, 2016
1 parent e1c3f40 commit bf82007
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 17 deletions.
Expand Up @@ -41,7 +41,6 @@
import java.util.stream.Stream; import java.util.stream.Stream;


import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spiller.BinarySpillerFactory.SPILL_PATH;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList; import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toList;
Expand All @@ -56,12 +55,12 @@ public class BinaryFileSpiller
private int spillsCount; private int spillsCount;
private final ListeningExecutorService executor; private final ListeningExecutorService executor;


public BinaryFileSpiller(BlockEncodingSerde blockEncodingSerde, ListeningExecutorService executor) public BinaryFileSpiller(BlockEncodingSerde blockEncodingSerde, ListeningExecutorService executor, Path spillPath)
{ {
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.executor = requireNonNull(executor, "executor is null"); this.executor = requireNonNull(executor, "executor is null");
try { try {
this.targetDirectory = Files.createTempDirectory(SPILL_PATH, "presto-spill"); this.targetDirectory = Files.createTempDirectory(spillPath, "presto-spill");
} }
catch (IOException e) { catch (IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to create spill directory", e); throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to create spill directory", e);
Expand Down
Expand Up @@ -16,12 +16,12 @@


import com.facebook.presto.spi.block.BlockEncodingSerde; import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject; import com.google.inject.Inject;


import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List; import java.util.List;


import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.concurrent.Threads.daemonThreadsNamed;
Expand All @@ -31,28 +31,27 @@
public class BinarySpillerFactory public class BinarySpillerFactory
implements SpillerFactory implements SpillerFactory
{ {
public static final Path SPILL_PATH = Paths.get("/tmp/spills");

private final ListeningExecutorService executor; private final ListeningExecutorService executor;
private final BlockEncodingSerde blockEncodingSerde; private final BlockEncodingSerde blockEncodingSerde;
private final Path spillPath;


@Inject @Inject
public BinarySpillerFactory(BlockEncodingSerde blockEncodingSerde) public BinarySpillerFactory(BlockEncodingSerde blockEncodingSerde, FeaturesConfig featuresConfig)
{ {
this(blockEncodingSerde, MoreExecutors.listeningDecorator(newFixedThreadPool(4, daemonThreadsNamed("binary-spiller-%s")))); this(blockEncodingSerde, MoreExecutors.listeningDecorator(newFixedThreadPool(4, daemonThreadsNamed("binary-spiller-%s"))), featuresConfig);
} }


public BinarySpillerFactory(BlockEncodingSerde blockEncodingSerde, ListeningExecutorService executor) public BinarySpillerFactory(BlockEncodingSerde blockEncodingSerde, ListeningExecutorService executor, FeaturesConfig featuresConfig)
{ {
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.executor = requireNonNull(executor, "executor is null"); this.executor = requireNonNull(executor, "executor is null");

this.spillPath = featuresConfig.getSpillerSpillPath();
SPILL_PATH.toFile().mkdirs(); this.spillPath.toFile().mkdirs();
} }


@Override @Override
public Spiller create(List<Type> types) public Spiller create(List<Type> types)
{ {
return new BinaryFileSpiller(blockEncodingSerde, executor); return new BinaryFileSpiller(blockEncodingSerde, executor, spillPath);
} }
} }
Expand Up @@ -22,6 +22,8 @@


import javax.validation.constraints.Min; import javax.validation.constraints.Min;


import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List; import java.util.List;


import static com.facebook.presto.sql.analyzer.RegexLibrary.JONI; import static com.facebook.presto.sql.analyzer.RegexLibrary.JONI;
Expand Down Expand Up @@ -58,6 +60,7 @@ public static class ProcessingOptimization
private RegexLibrary regexLibrary = JONI; private RegexLibrary regexLibrary = JONI;
private boolean spillEnabled; private boolean spillEnabled;
private DataSize operatorMemoryLimitBeforeSpill = new DataSize(4, DataSize.Unit.MEGABYTE); private DataSize operatorMemoryLimitBeforeSpill = new DataSize(4, DataSize.Unit.MEGABYTE);
private Path spillerSpillPath = Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills");


public boolean isResourceGroupsEnabled() public boolean isResourceGroupsEnabled()
{ {
Expand Down Expand Up @@ -281,4 +284,16 @@ public FeaturesConfig setOperatorMemoryLimitBeforeSpill(DataSize operatorMemoryL
this.operatorMemoryLimitBeforeSpill = operatorMemoryLimitBeforeSpill; this.operatorMemoryLimitBeforeSpill = operatorMemoryLimitBeforeSpill;
return this; return this;
} }

public Path getSpillerSpillPath()
{
return spillerSpillPath;
}

@Config("experimental.spiller-spill-path")
public FeaturesConfig setSpillerSpillPath(String spillPath)
{
this.spillerSpillPath = Paths.get(spillPath);
return this;
}
} }
Expand Up @@ -338,7 +338,7 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
.put(Rollback.class, new RollbackTask()) .put(Rollback.class, new RollbackTask())
.build(); .build();


this.spillerFactory = new BinarySpillerFactory(blockEncodingSerde); this.spillerFactory = new BinarySpillerFactory(blockEncodingSerde, featuresConfig);
} }


public static LocalQueryRunner queryRunnerWithInitialTransaction(Session defaultSession) public static LocalQueryRunner queryRunnerWithInitialTransaction(Session defaultSession)
Expand Down
Expand Up @@ -34,6 +34,7 @@
import com.facebook.presto.spiller.BinarySpillerFactory; import com.facebook.presto.spiller.BinarySpillerFactory;
import com.facebook.presto.split.PageSinkManager; import com.facebook.presto.split.PageSinkManager;
import com.facebook.presto.split.PageSourceManager; import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.gen.ExpressionCompiler; import com.facebook.presto.sql.gen.ExpressionCompiler;
import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler; import com.facebook.presto.sql.gen.JoinFilterFunctionCompiler;
import com.facebook.presto.sql.parser.SqlParser; import com.facebook.presto.sql.parser.SqlParser;
Expand Down Expand Up @@ -128,7 +129,7 @@ public static LocalExecutionPlanner createTestingPlanner()
new IndexJoinLookupStats(), new IndexJoinLookupStats(),
new CompilerConfig(), new CompilerConfig(),
new TaskManagerConfig(), new TaskManagerConfig(),
new BinarySpillerFactory(new BlockEncodingManager(metadata.getTypeManager()))); new BinarySpillerFactory(new BlockEncodingManager(metadata.getTypeManager()), new FeaturesConfig()));
} }


public static TaskInfo updateTask(SqlTask sqlTask, List<TaskSource> taskSources, OutputBuffers outputBuffers) public static TaskInfo updateTask(SqlTask sqlTask, List<TaskSource> taskSources, OutputBuffers outputBuffers)
Expand Down
Expand Up @@ -20,6 +20,7 @@
import com.facebook.presto.spi.block.BlockBuilderStatus; import com.facebook.presto.spi.block.BlockBuilderStatus;
import com.facebook.presto.spi.block.BlockEncodingSerde; import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.type.TypeRegistry; import com.facebook.presto.type.TypeRegistry;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
Expand All @@ -45,7 +46,7 @@ public class TestBinaryFileSpiller
private static final List<Type> TYPES = ImmutableList.of(BIGINT, VARCHAR, DOUBLE, BIGINT); private static final List<Type> TYPES = ImmutableList.of(BIGINT, VARCHAR, DOUBLE, BIGINT);
private final ListeningExecutorService executor = listeningDecorator(newSingleThreadScheduledExecutor()); private final ListeningExecutorService executor = listeningDecorator(newSingleThreadScheduledExecutor());
private final BlockEncodingSerde blockEncodingSerde = new BlockEncodingManager(new TypeRegistry(ImmutableSet.of(BIGINT, DOUBLE, VARBINARY))); private final BlockEncodingSerde blockEncodingSerde = new BlockEncodingManager(new TypeRegistry(ImmutableSet.of(BIGINT, DOUBLE, VARBINARY)));
private final BinarySpillerFactory factory = new BinarySpillerFactory(blockEncodingSerde, executor); private final BinarySpillerFactory factory = new BinarySpillerFactory(blockEncodingSerde, executor, new FeaturesConfig());


@Test @Test
public void testFileSpiller() public void testFileSpiller()
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.airlift.units.DataSize; import io.airlift.units.DataSize;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.nio.file.Paths;
import java.util.Map; import java.util.Map;


import static com.facebook.presto.sql.analyzer.FeaturesConfig.ProcessingOptimization.COLUMNAR_DICTIONARY; import static com.facebook.presto.sql.analyzer.FeaturesConfig.ProcessingOptimization.COLUMNAR_DICTIONARY;
Expand Down Expand Up @@ -51,7 +52,8 @@ public void testDefaults()
.setRe2JDfaStatesLimit(Integer.MAX_VALUE) .setRe2JDfaStatesLimit(Integer.MAX_VALUE)
.setRe2JDfaRetries(5) .setRe2JDfaRetries(5)
.setSpillEnabled(false) .setSpillEnabled(false)
.setOperatorMemoryLimitBeforeSpill(DataSize.valueOf("4MB"))); .setOperatorMemoryLimitBeforeSpill(DataSize.valueOf("4MB"))
.setSpillerSpillPath(Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString()));
} }


@Test @Test
Expand All @@ -76,6 +78,7 @@ public void testExplicitPropertyMappings()
.put("re2j.dfa-retries", "42") .put("re2j.dfa-retries", "42")
.put("experimental.spill-enabled", "true") .put("experimental.spill-enabled", "true")
.put("experimental.operator-memory-limit-before-spill", "100MB") .put("experimental.operator-memory-limit-before-spill", "100MB")
.put("experimental.spiller-spill-path", "/tmp/custom/spill/path")
.build(); .build();
Map<String, String> properties = new ImmutableMap.Builder<String, String>() Map<String, String> properties = new ImmutableMap.Builder<String, String>()
.put("experimental-syntax-enabled", "true") .put("experimental-syntax-enabled", "true")
Expand All @@ -96,6 +99,7 @@ public void testExplicitPropertyMappings()
.put("re2j.dfa-retries", "42") .put("re2j.dfa-retries", "42")
.put("experimental.spill-enabled", "true") .put("experimental.spill-enabled", "true")
.put("experimental.operator-memory-limit-before-spill", "100MB") .put("experimental.operator-memory-limit-before-spill", "100MB")
.put("experimental.spiller-spill-path", "/tmp/custom/spill/path")
.build(); .build();


FeaturesConfig expected = new FeaturesConfig() FeaturesConfig expected = new FeaturesConfig()
Expand All @@ -116,7 +120,8 @@ public void testExplicitPropertyMappings()
.setRe2JDfaStatesLimit(42) .setRe2JDfaStatesLimit(42)
.setRe2JDfaRetries(42) .setRe2JDfaRetries(42)
.setSpillEnabled(true) .setSpillEnabled(true)
.setOperatorMemoryLimitBeforeSpill(DataSize.valueOf("100MB")); .setOperatorMemoryLimitBeforeSpill(DataSize.valueOf("100MB"))
.setSpillerSpillPath("/tmp/custom/spill/path");


assertFullMapping(properties, expected); assertFullMapping(properties, expected);
assertDeprecatedEquivalence(FeaturesConfig.class, properties, propertiesLegacy); assertDeprecatedEquivalence(FeaturesConfig.class, properties, propertiesLegacy);
Expand Down

0 comments on commit bf82007

Please sign in to comment.