From e93387816eded61007c55eec9dc0c508b28758a0 Mon Sep 17 00:00:00 2001 From: Nileema Shingte Date: Tue, 25 Aug 2015 11:11:00 -0700 Subject: [PATCH] Minor refactoring for RaptorSplitManager and StorageManager --- .../presto/raptor/RaptorPageSink.java | 7 +++- .../presto/raptor/RaptorPageSinkProvider.java | 12 ++++-- .../presto/raptor/RaptorSplitManager.java | 15 +++++--- .../presto/raptor/backup/BackupModule.java | 2 + .../presto/raptor/backup/BackupService.java | 19 ++++++++++ .../raptor/backup/BackupServiceManager.java | 38 +++++++++++++++++++ .../raptor/storage/OrcStorageManager.java | 23 ++--------- .../presto/raptor/storage/StorageManager.java | 5 --- .../metadata/TestRaptorSplitManager.java | 27 ++----------- .../raptor/storage/TestOrcStorageManager.java | 4 +- 10 files changed, 90 insertions(+), 62 deletions(-) create mode 100644 presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupService.java create mode 100644 presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupServiceManager.java diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java index 3c816ab05a01..f73c4a620f88 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSink.java @@ -28,6 +28,7 @@ import io.airlift.json.JsonCodec; import io.airlift.slice.Slice; import io.airlift.slice.Slices; +import io.airlift.units.DataSize; import java.util.Collection; import java.util.List; @@ -60,7 +61,8 @@ public RaptorPageSink( List columnTypes, Optional sampleWeightColumnId, List sortColumnIds, - List sortOrders) + List sortOrders, + DataSize maxBufferSize) { this.pageSorter = checkNotNull(pageSorter, "pageSorter is null"); this.columnTypes = ImmutableList.copyOf(checkNotNull(columnTypes, "columnTypes is null")); @@ -75,7 +77,8 @@ public RaptorPageSink( this.sortFields = ImmutableList.copyOf(sortColumnIds.stream().map(columnIds::indexOf).collect(toList())); this.sortOrders = ImmutableList.copyOf(checkNotNull(sortOrders, "sortOrders is null")); - this.pageBuffer = storageManager.createPageBuffer(); + // allow only Integer.MAX_VALUE rows to be buffered as that is the max rows we can sort + this.pageBuffer = new PageBuffer(maxBufferSize.toBytes(), Integer.MAX_VALUE); } @Override diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java index 978954a0df93..85f76b17ea96 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorPageSinkProvider.java @@ -15,6 +15,7 @@ import com.facebook.presto.raptor.metadata.ShardInfo; import com.facebook.presto.raptor.storage.StorageManager; +import com.facebook.presto.raptor.storage.StorageManagerConfig; import com.facebook.presto.spi.ConnectorInsertTableHandle; import com.facebook.presto.spi.ConnectorOutputTableHandle; import com.facebook.presto.spi.ConnectorPageSink; @@ -22,6 +23,7 @@ import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.PageSorter; import io.airlift.json.JsonCodec; +import io.airlift.units.DataSize; import javax.inject.Inject; @@ -38,13 +40,15 @@ public class RaptorPageSinkProvider private final StorageManager storageManager; private final PageSorter pageSorter; private final JsonCodec shardInfoCodec; + private final DataSize maxBufferSize; @Inject - public RaptorPageSinkProvider(StorageManager storageManager, PageSorter pageSorter, JsonCodec shardInfoCodec) + public RaptorPageSinkProvider(StorageManager storageManager, PageSorter pageSorter, JsonCodec shardInfoCodec, StorageManagerConfig config) { this.storageManager = checkNotNull(storageManager, "storageManager is null"); this.pageSorter = checkNotNull(pageSorter, "pageSorter is null"); this.shardInfoCodec = checkNotNull(shardInfoCodec, "shardInfoCodec is null"); + this.maxBufferSize = config.getMaxBufferSize(); } @Override @@ -59,7 +63,8 @@ public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorOutpu handle.getColumnTypes(), optionalColumnId(handle.getSampleWeightColumnHandle()), toColumnIds(handle.getSortColumnHandles()), - handle.getSortOrders()); + handle.getSortOrders(), + maxBufferSize); } @Override @@ -74,7 +79,8 @@ public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorInser handle.getColumnTypes(), Optional.empty(), toColumnIds(handle.getSortColumnHandles()), - handle.getSortOrders()); + handle.getSortOrders(), + maxBufferSize); } private static List toColumnIds(List columnHandles) diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java index e4d4e067870e..1f1953d9811d 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/RaptorSplitManager.java @@ -13,9 +13,9 @@ */ package com.facebook.presto.raptor; +import com.facebook.presto.raptor.backup.BackupService; import com.facebook.presto.raptor.metadata.ShardManager; import com.facebook.presto.raptor.metadata.ShardNodes; -import com.facebook.presto.raptor.storage.StorageManager; import com.facebook.presto.raptor.util.CloseableIterator; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPartition; @@ -64,16 +64,21 @@ public class RaptorSplitManager private final String connectorId; private final NodeManager nodeManager; private final ShardManager shardManager; - private final StorageManager storageManager; + private final boolean backupAvailable; private final ExecutorService executor; @Inject - public RaptorSplitManager(RaptorConnectorId connectorId, NodeManager nodeManager, ShardManager shardManager, StorageManager storageManager) + public RaptorSplitManager(RaptorConnectorId connectorId, NodeManager nodeManager, ShardManager shardManager, BackupService backupService) + { + this(connectorId, nodeManager, shardManager, checkNotNull(backupService, "backupService is null").isBackupAvailable()); + } + + public RaptorSplitManager(RaptorConnectorId connectorId, NodeManager nodeManager, ShardManager shardManager, boolean backupAvailable) { this.connectorId = checkNotNull(connectorId, "connectorId is null").toString(); this.nodeManager = checkNotNull(nodeManager, "nodeManager is null"); this.shardManager = checkNotNull(shardManager, "shardManager is null"); - this.storageManager = checkNotNull(storageManager, "storageManager is null"); + this.backupAvailable = backupAvailable; this.executor = newCachedThreadPool(daemonThreadsNamed("raptor-split-" + connectorId + "-%s")); } @@ -174,7 +179,7 @@ private ConnectorSplit createSplit(ShardNodes shard) List addresses = getAddressesForNodes(nodesById, nodeIds); if (addresses.isEmpty()) { - if (!storageManager.isBackupAvailable()) { + if (!backupAvailable) { throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds)); } diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupModule.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupModule.java index a06181744e7e..bf947ec06693 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupModule.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupModule.java @@ -18,6 +18,7 @@ import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Provides; +import com.google.inject.Scopes; import com.google.inject.util.Providers; import io.airlift.bootstrap.LifeCycleManager; import io.airlift.configuration.AbstractConfigurationAwareModule; @@ -61,6 +62,7 @@ protected void setup(Binder binder) binder.install(module); } } + binder.bind(BackupService.class).to(BackupServiceManager.class).in(Scopes.SINGLETON); } @Provides diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupService.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupService.java new file mode 100644 index 000000000000..5eda5319aaf1 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupService.java @@ -0,0 +1,19 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.raptor.backup; + +public interface BackupService +{ + boolean isBackupAvailable(); +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupServiceManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupServiceManager.java new file mode 100644 index 000000000000..197b79f9b1a4 --- /dev/null +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/backup/BackupServiceManager.java @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.raptor.backup; + +import com.google.inject.Inject; + +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +public class BackupServiceManager + implements BackupService +{ + private final Optional backupStore; + + @Inject + public BackupServiceManager(Optional backupStore) + { + this.backupStore = requireNonNull(backupStore, "backupStore is null"); + } + + @Override + public boolean isBackupAvailable() + { + return backupStore.isPresent(); + } +} diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java index c63699d8b057..090c23884cb0 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/OrcStorageManager.java @@ -31,7 +31,6 @@ import com.facebook.presto.raptor.metadata.ShardInfo; import com.facebook.presto.raptor.storage.OrcFileRewriter.OrcFileInfo; import com.facebook.presto.raptor.util.CurrentNodeId; -import com.facebook.presto.raptor.util.PageBuffer; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.Page; import com.facebook.presto.spi.PrestoException; @@ -110,7 +109,6 @@ public class OrcStorageManager private final Duration recoveryTimeout; private final long maxShardRows; private final DataSize maxShardSize; - private final DataSize maxBufferSize; private final StorageManagerStats stats; private final TypeManager typeManager; private final ExecutorService deletionExecutor; @@ -139,8 +137,7 @@ public OrcStorageManager( config.getDeletionThreads(), config.getShardRecoveryTimeout(), config.getMaxShardRows(), - config.getMaxShardSize(), - config.getMaxBufferSize()); + config.getMaxShardSize()); } public OrcStorageManager( @@ -157,8 +154,7 @@ public OrcStorageManager( int deletionThreads, Duration shardRecoveryTimeout, long maxShardRows, - DataSize maxShardSize, - DataSize maxBufferSize) + DataSize maxShardSize) { this.nodeId = checkNotNull(nodeId, "nodeId is null"); this.storageService = checkNotNull(storageService, "storageService is null"); @@ -174,7 +170,6 @@ public OrcStorageManager( checkArgument(maxShardRows > 0, "maxShardRows must be > 0"); this.maxShardRows = min(maxShardRows, MAX_ROWS); this.maxShardSize = checkNotNull(maxShardSize, "maxShardSize is null"); - this.maxBufferSize = checkNotNull(maxBufferSize, "maxBufferSize is null"); this.stats = new StorageManagerStats(); this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.deletionExecutor = newFixedThreadPool(deletionThreads, daemonThreadsNamed("raptor-delete-" + connectorId + "-%s")); @@ -253,25 +248,13 @@ private void writeShard(UUID shardUuid) throw new PrestoException(RAPTOR_ERROR, "Failed to move shard file", e); } - if (isBackupAvailable()) { + if (backupStore.isPresent()) { long start = System.nanoTime(); backupStore.get().backupShard(shardUuid, storageFile); stats.addCopyShardDataRate(new DataSize(storageFile.length(), BYTE), nanosSince(start)); } } - @Override - public PageBuffer createPageBuffer() - { - return new PageBuffer(maxBufferSize.toBytes(), Integer.MAX_VALUE); - } - - @Override - public boolean isBackupAvailable() - { - return backupStore.isPresent(); - } - @Managed @Flatten public StorageManagerStats getStats() diff --git a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java index 65e152d94d3a..fa985e158a58 100644 --- a/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java +++ b/presto-raptor/src/main/java/com/facebook/presto/raptor/storage/StorageManager.java @@ -14,7 +14,6 @@ package com.facebook.presto.raptor.storage; import com.facebook.presto.raptor.RaptorColumnHandle; -import com.facebook.presto.raptor.util.PageBuffer; import com.facebook.presto.spi.ConnectorPageSource; import com.facebook.presto.spi.TupleDomain; import com.facebook.presto.spi.type.Type; @@ -27,8 +26,4 @@ public interface StorageManager ConnectorPageSource getPageSource(UUID shardUuid, List columnIds, List columnTypes, TupleDomain effectivePredicate); StoragePageSink createStoragePageSink(List columnIds, List columnTypes); - - boolean isBackupAvailable(); - - PageBuffer createPageBuffer(); } diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestRaptorSplitManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestRaptorSplitManager.java index 2028e59b240d..fe8630404211 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestRaptorSplitManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/metadata/TestRaptorSplitManager.java @@ -22,12 +22,6 @@ import com.facebook.presto.raptor.RaptorMetadata; import com.facebook.presto.raptor.RaptorSplitManager; import com.facebook.presto.raptor.RaptorTableHandle; -import com.facebook.presto.raptor.backup.BackupStore; -import com.facebook.presto.raptor.backup.FileBackupStore; -import com.facebook.presto.raptor.storage.FileStorageService; -import com.facebook.presto.raptor.storage.ShardRecoveryManager; -import com.facebook.presto.raptor.storage.StorageManager; -import com.facebook.presto.raptor.storage.StorageService; import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorPartition; import com.facebook.presto.spi.ConnectorPartitionResult; @@ -41,7 +35,6 @@ import com.facebook.presto.type.TypeRegistry; import com.google.common.collect.ImmutableList; import io.airlift.json.JsonCodec; -import io.airlift.units.Duration; import org.skife.jdbi.v2.DBI; import org.skife.jdbi.v2.Handle; import org.testng.annotations.AfterMethod; @@ -57,7 +50,6 @@ import static com.facebook.presto.raptor.metadata.DatabaseShardManager.shardIndexTable; import static com.facebook.presto.raptor.metadata.TestDatabaseShardManager.shardInfo; -import static com.facebook.presto.raptor.storage.TestOrcStorageManager.createOrcStorageManager; import static com.facebook.presto.raptor.util.Types.checkType; import static com.facebook.presto.spi.type.VarcharType.VARCHAR; import static com.facebook.presto.testing.TestingConnectorSession.SESSION; @@ -67,7 +59,6 @@ import static io.airlift.json.JsonCodec.jsonCodec; import static io.airlift.testing.FileUtils.deleteRecursively; import static java.lang.String.format; -import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.stream.Collectors.toList; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -88,7 +79,6 @@ public class TestRaptorSplitManager private RaptorSplitManager raptorSplitManager; private ConnectorTableHandle tableHandle; private ShardManager shardManager; - private StorageManager storageManagerWithBackup; private long tableId; @BeforeMethod @@ -103,17 +93,6 @@ public void setup() shardManager = new DatabaseShardManager(dbi); InMemoryNodeManager nodeManager = new InMemoryNodeManager(); - File dataDir = new File(temporary, "data"); - File backupDir = new File(temporary, "backup"); - FileBackupStore fileBackupStore = new FileBackupStore(backupDir); - fileBackupStore.start(); - Optional backupStore = Optional.of(fileBackupStore); - - StorageService storageService = new FileStorageService(dataDir); - ShardRecoveryManager recoveryManager = new ShardRecoveryManager(storageService, Optional.empty(), new InMemoryNodeManager(), shardManager, new Duration(5, MINUTES), 10); - StorageManager storageManager = createOrcStorageManager(storageService, Optional.empty(), recoveryManager); - storageManagerWithBackup = createOrcStorageManager(storageService, backupStore, recoveryManager); - String nodeName = UUID.randomUUID().toString(); nodeManager.addNode("raptor", new PrestoNode(nodeName, new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN)); @@ -139,7 +118,7 @@ public void setup() shardManager.commitShards(tableId, columns, shards, Optional.empty()); - raptorSplitManager = new RaptorSplitManager(connectorId, nodeManager, shardManager, storageManager); + raptorSplitManager = new RaptorSplitManager(connectorId, nodeManager, shardManager, false); } @AfterMethod @@ -189,7 +168,7 @@ public void testAssignRandomNodeWhenBackupAvailable() InMemoryNodeManager nodeManager = new InMemoryNodeManager(); PrestoNode node = new PrestoNode(UUID.randomUUID().toString(), new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN); nodeManager.addNode("fbraptor", node); - RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), nodeManager, shardManager, storageManagerWithBackup); + RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), nodeManager, shardManager, true); deleteShardNodes(); @@ -205,7 +184,7 @@ public void testNoNodes() { deleteShardNodes(); - RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), new InMemoryNodeManager(), shardManager, storageManagerWithBackup); + RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), new InMemoryNodeManager(), shardManager, true); ConnectorPartitionResult result = raptorSplitManagerWithBackup.getPartitions(SESSION, tableHandle, TupleDomain.all()); ConnectorSplitSource splitSource = raptorSplitManagerWithBackup.getPartitionSplits(SESSION, tableHandle, result.getPartitions()); getFutureValue(splitSource.getNextBatch(1000), PrestoException.class); diff --git a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java index b0003209808a..6de686af18ef 100644 --- a/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java +++ b/presto-raptor/src/test/java/com/facebook/presto/raptor/storage/TestOrcStorageManager.java @@ -102,7 +102,6 @@ public class TestOrcStorageManager private static final String CONNECTOR_ID = "test"; private static final int DELETION_THREADS = 2; private static final Duration SHARD_RECOVERY_TIMEOUT = new Duration(30, TimeUnit.SECONDS); - private static final DataSize MAX_BUFFER_SIZE = new DataSize(256, MEGABYTE); private static final int MAX_SHARD_ROWS = 100; private static final DataSize MAX_FILE_SIZE = new DataSize(1, MEGABYTE); private static final Duration MISSING_SHARD_DISCOVERY = new Duration(5, TimeUnit.MINUTES); @@ -496,8 +495,7 @@ public static OrcStorageManager createOrcStorageManager( DELETION_THREADS, SHARD_RECOVERY_TIMEOUT, maxShardRows, - maxFileSize, - MAX_BUFFER_SIZE); + maxFileSize); } private static void assertColumnStats(List list, long columnId, Object min, Object max)