Skip to content

Commit

Permalink
Minor refactoring for RaptorSplitManager and StorageManager
Browse files Browse the repository at this point in the history
  • Loading branch information
nileema committed Aug 26, 2015
1 parent beddad1 commit e933878
Show file tree
Hide file tree
Showing 10 changed files with 90 additions and 62 deletions.
Expand Up @@ -28,6 +28,7 @@
import io.airlift.json.JsonCodec;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;

import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -60,7 +61,8 @@ public RaptorPageSink(
List<Type> columnTypes,
Optional<Long> sampleWeightColumnId,
List<Long> sortColumnIds,
List<SortOrder> sortOrders)
List<SortOrder> sortOrders,
DataSize maxBufferSize)
{
this.pageSorter = checkNotNull(pageSorter, "pageSorter is null");
this.columnTypes = ImmutableList.copyOf(checkNotNull(columnTypes, "columnTypes is null"));
Expand All @@ -75,7 +77,8 @@ public RaptorPageSink(
this.sortFields = ImmutableList.copyOf(sortColumnIds.stream().map(columnIds::indexOf).collect(toList()));
this.sortOrders = ImmutableList.copyOf(checkNotNull(sortOrders, "sortOrders is null"));

this.pageBuffer = storageManager.createPageBuffer();
// allow only Integer.MAX_VALUE rows to be buffered as that is the max rows we can sort
this.pageBuffer = new PageBuffer(maxBufferSize.toBytes(), Integer.MAX_VALUE);
}

@Override
Expand Down
Expand Up @@ -15,13 +15,15 @@

import com.facebook.presto.raptor.metadata.ShardInfo;
import com.facebook.presto.raptor.storage.StorageManager;
import com.facebook.presto.raptor.storage.StorageManagerConfig;
import com.facebook.presto.spi.ConnectorInsertTableHandle;
import com.facebook.presto.spi.ConnectorOutputTableHandle;
import com.facebook.presto.spi.ConnectorPageSink;
import com.facebook.presto.spi.ConnectorPageSinkProvider;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PageSorter;
import io.airlift.json.JsonCodec;
import io.airlift.units.DataSize;

import javax.inject.Inject;

Expand All @@ -38,13 +40,15 @@ public class RaptorPageSinkProvider
private final StorageManager storageManager;
private final PageSorter pageSorter;
private final JsonCodec<ShardInfo> shardInfoCodec;
private final DataSize maxBufferSize;

@Inject
public RaptorPageSinkProvider(StorageManager storageManager, PageSorter pageSorter, JsonCodec<ShardInfo> shardInfoCodec)
public RaptorPageSinkProvider(StorageManager storageManager, PageSorter pageSorter, JsonCodec<ShardInfo> shardInfoCodec, StorageManagerConfig config)
{
this.storageManager = checkNotNull(storageManager, "storageManager is null");
this.pageSorter = checkNotNull(pageSorter, "pageSorter is null");
this.shardInfoCodec = checkNotNull(shardInfoCodec, "shardInfoCodec is null");
this.maxBufferSize = config.getMaxBufferSize();
}

@Override
Expand All @@ -59,7 +63,8 @@ public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorOutpu
handle.getColumnTypes(),
optionalColumnId(handle.getSampleWeightColumnHandle()),
toColumnIds(handle.getSortColumnHandles()),
handle.getSortOrders());
handle.getSortOrders(),
maxBufferSize);
}

@Override
Expand All @@ -74,7 +79,8 @@ public ConnectorPageSink createPageSink(ConnectorSession session, ConnectorInser
handle.getColumnTypes(),
Optional.empty(),
toColumnIds(handle.getSortColumnHandles()),
handle.getSortOrders());
handle.getSortOrders(),
maxBufferSize);
}

private static List<Long> toColumnIds(List<RaptorColumnHandle> columnHandles)
Expand Down
Expand Up @@ -13,9 +13,9 @@
*/
package com.facebook.presto.raptor;

import com.facebook.presto.raptor.backup.BackupService;
import com.facebook.presto.raptor.metadata.ShardManager;
import com.facebook.presto.raptor.metadata.ShardNodes;
import com.facebook.presto.raptor.storage.StorageManager;
import com.facebook.presto.raptor.util.CloseableIterator;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
Expand Down Expand Up @@ -64,16 +64,21 @@ public class RaptorSplitManager
private final String connectorId;
private final NodeManager nodeManager;
private final ShardManager shardManager;
private final StorageManager storageManager;
private final boolean backupAvailable;
private final ExecutorService executor;

@Inject
public RaptorSplitManager(RaptorConnectorId connectorId, NodeManager nodeManager, ShardManager shardManager, StorageManager storageManager)
public RaptorSplitManager(RaptorConnectorId connectorId, NodeManager nodeManager, ShardManager shardManager, BackupService backupService)
{
this(connectorId, nodeManager, shardManager, checkNotNull(backupService, "backupService is null").isBackupAvailable());
}

public RaptorSplitManager(RaptorConnectorId connectorId, NodeManager nodeManager, ShardManager shardManager, boolean backupAvailable)
{
this.connectorId = checkNotNull(connectorId, "connectorId is null").toString();
this.nodeManager = checkNotNull(nodeManager, "nodeManager is null");
this.shardManager = checkNotNull(shardManager, "shardManager is null");
this.storageManager = checkNotNull(storageManager, "storageManager is null");
this.backupAvailable = backupAvailable;
this.executor = newCachedThreadPool(daemonThreadsNamed("raptor-split-" + connectorId + "-%s"));
}

Expand Down Expand Up @@ -174,7 +179,7 @@ private ConnectorSplit createSplit(ShardNodes shard)
List<HostAddress> addresses = getAddressesForNodes(nodesById, nodeIds);

if (addresses.isEmpty()) {
if (!storageManager.isBackupAvailable()) {
if (!backupAvailable) {
throw new PrestoException(RAPTOR_NO_HOST_FOR_SHARD, format("No host for shard %s found: %s", shardId, nodeIds));
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.util.Providers;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.configuration.AbstractConfigurationAwareModule;
Expand Down Expand Up @@ -61,6 +62,7 @@ protected void setup(Binder binder)
binder.install(module);
}
}
binder.bind(BackupService.class).to(BackupServiceManager.class).in(Scopes.SINGLETON);
}

@Provides
Expand Down
@@ -0,0 +1,19 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.backup;

public interface BackupService
{
boolean isBackupAvailable();
}
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.raptor.backup;

import com.google.inject.Inject;

import java.util.Optional;

import static java.util.Objects.requireNonNull;

public class BackupServiceManager
implements BackupService
{
private final Optional<BackupStore> backupStore;

@Inject
public BackupServiceManager(Optional<BackupStore> backupStore)
{
this.backupStore = requireNonNull(backupStore, "backupStore is null");
}

@Override
public boolean isBackupAvailable()
{
return backupStore.isPresent();
}
}
Expand Up @@ -31,7 +31,6 @@
import com.facebook.presto.raptor.metadata.ShardInfo;
import com.facebook.presto.raptor.storage.OrcFileRewriter.OrcFileInfo;
import com.facebook.presto.raptor.util.CurrentNodeId;
import com.facebook.presto.raptor.util.PageBuffer;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PrestoException;
Expand Down Expand Up @@ -110,7 +109,6 @@ public class OrcStorageManager
private final Duration recoveryTimeout;
private final long maxShardRows;
private final DataSize maxShardSize;
private final DataSize maxBufferSize;
private final StorageManagerStats stats;
private final TypeManager typeManager;
private final ExecutorService deletionExecutor;
Expand Down Expand Up @@ -139,8 +137,7 @@ public OrcStorageManager(
config.getDeletionThreads(),
config.getShardRecoveryTimeout(),
config.getMaxShardRows(),
config.getMaxShardSize(),
config.getMaxBufferSize());
config.getMaxShardSize());
}

public OrcStorageManager(
Expand All @@ -157,8 +154,7 @@ public OrcStorageManager(
int deletionThreads,
Duration shardRecoveryTimeout,
long maxShardRows,
DataSize maxShardSize,
DataSize maxBufferSize)
DataSize maxShardSize)
{
this.nodeId = checkNotNull(nodeId, "nodeId is null");
this.storageService = checkNotNull(storageService, "storageService is null");
Expand All @@ -174,7 +170,6 @@ public OrcStorageManager(
checkArgument(maxShardRows > 0, "maxShardRows must be > 0");
this.maxShardRows = min(maxShardRows, MAX_ROWS);
this.maxShardSize = checkNotNull(maxShardSize, "maxShardSize is null");
this.maxBufferSize = checkNotNull(maxBufferSize, "maxBufferSize is null");
this.stats = new StorageManagerStats();
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.deletionExecutor = newFixedThreadPool(deletionThreads, daemonThreadsNamed("raptor-delete-" + connectorId + "-%s"));
Expand Down Expand Up @@ -253,25 +248,13 @@ private void writeShard(UUID shardUuid)
throw new PrestoException(RAPTOR_ERROR, "Failed to move shard file", e);
}

if (isBackupAvailable()) {
if (backupStore.isPresent()) {
long start = System.nanoTime();
backupStore.get().backupShard(shardUuid, storageFile);
stats.addCopyShardDataRate(new DataSize(storageFile.length(), BYTE), nanosSince(start));
}
}

@Override
public PageBuffer createPageBuffer()
{
return new PageBuffer(maxBufferSize.toBytes(), Integer.MAX_VALUE);
}

@Override
public boolean isBackupAvailable()
{
return backupStore.isPresent();
}

@Managed
@Flatten
public StorageManagerStats getStats()
Expand Down
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.raptor.storage;

import com.facebook.presto.raptor.RaptorColumnHandle;
import com.facebook.presto.raptor.util.PageBuffer;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.TupleDomain;
import com.facebook.presto.spi.type.Type;
Expand All @@ -27,8 +26,4 @@ public interface StorageManager
ConnectorPageSource getPageSource(UUID shardUuid, List<Long> columnIds, List<Type> columnTypes, TupleDomain<RaptorColumnHandle> effectivePredicate);

StoragePageSink createStoragePageSink(List<Long> columnIds, List<Type> columnTypes);

boolean isBackupAvailable();

PageBuffer createPageBuffer();
}
Expand Up @@ -22,12 +22,6 @@
import com.facebook.presto.raptor.RaptorMetadata;
import com.facebook.presto.raptor.RaptorSplitManager;
import com.facebook.presto.raptor.RaptorTableHandle;
import com.facebook.presto.raptor.backup.BackupStore;
import com.facebook.presto.raptor.backup.FileBackupStore;
import com.facebook.presto.raptor.storage.FileStorageService;
import com.facebook.presto.raptor.storage.ShardRecoveryManager;
import com.facebook.presto.raptor.storage.StorageManager;
import com.facebook.presto.raptor.storage.StorageService;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartition;
import com.facebook.presto.spi.ConnectorPartitionResult;
Expand All @@ -41,7 +35,6 @@
import com.facebook.presto.type.TypeRegistry;
import com.google.common.collect.ImmutableList;
import io.airlift.json.JsonCodec;
import io.airlift.units.Duration;
import org.skife.jdbi.v2.DBI;
import org.skife.jdbi.v2.Handle;
import org.testng.annotations.AfterMethod;
Expand All @@ -57,7 +50,6 @@

import static com.facebook.presto.raptor.metadata.DatabaseShardManager.shardIndexTable;
import static com.facebook.presto.raptor.metadata.TestDatabaseShardManager.shardInfo;
import static com.facebook.presto.raptor.storage.TestOrcStorageManager.createOrcStorageManager;
import static com.facebook.presto.raptor.util.Types.checkType;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
Expand All @@ -67,7 +59,6 @@
import static io.airlift.json.JsonCodec.jsonCodec;
import static io.airlift.testing.FileUtils.deleteRecursively;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.stream.Collectors.toList;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
Expand All @@ -88,7 +79,6 @@ public class TestRaptorSplitManager
private RaptorSplitManager raptorSplitManager;
private ConnectorTableHandle tableHandle;
private ShardManager shardManager;
private StorageManager storageManagerWithBackup;
private long tableId;

@BeforeMethod
Expand All @@ -103,17 +93,6 @@ public void setup()
shardManager = new DatabaseShardManager(dbi);
InMemoryNodeManager nodeManager = new InMemoryNodeManager();

File dataDir = new File(temporary, "data");
File backupDir = new File(temporary, "backup");
FileBackupStore fileBackupStore = new FileBackupStore(backupDir);
fileBackupStore.start();
Optional<BackupStore> backupStore = Optional.of(fileBackupStore);

StorageService storageService = new FileStorageService(dataDir);
ShardRecoveryManager recoveryManager = new ShardRecoveryManager(storageService, Optional.empty(), new InMemoryNodeManager(), shardManager, new Duration(5, MINUTES), 10);
StorageManager storageManager = createOrcStorageManager(storageService, Optional.empty(), recoveryManager);
storageManagerWithBackup = createOrcStorageManager(storageService, backupStore, recoveryManager);

String nodeName = UUID.randomUUID().toString();
nodeManager.addNode("raptor", new PrestoNode(nodeName, new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN));

Expand All @@ -139,7 +118,7 @@ public void setup()

shardManager.commitShards(tableId, columns, shards, Optional.empty());

raptorSplitManager = new RaptorSplitManager(connectorId, nodeManager, shardManager, storageManager);
raptorSplitManager = new RaptorSplitManager(connectorId, nodeManager, shardManager, false);
}

@AfterMethod
Expand Down Expand Up @@ -189,7 +168,7 @@ public void testAssignRandomNodeWhenBackupAvailable()
InMemoryNodeManager nodeManager = new InMemoryNodeManager();
PrestoNode node = new PrestoNode(UUID.randomUUID().toString(), new URI("http://127.0.0.1/"), NodeVersion.UNKNOWN);
nodeManager.addNode("fbraptor", node);
RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), nodeManager, shardManager, storageManagerWithBackup);
RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), nodeManager, shardManager, true);

deleteShardNodes();

Expand All @@ -205,7 +184,7 @@ public void testNoNodes()
{
deleteShardNodes();

RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), new InMemoryNodeManager(), shardManager, storageManagerWithBackup);
RaptorSplitManager raptorSplitManagerWithBackup = new RaptorSplitManager(new RaptorConnectorId("fbraptor"), new InMemoryNodeManager(), shardManager, true);
ConnectorPartitionResult result = raptorSplitManagerWithBackup.getPartitions(SESSION, tableHandle, TupleDomain.<ColumnHandle>all());
ConnectorSplitSource splitSource = raptorSplitManagerWithBackup.getPartitionSplits(SESSION, tableHandle, result.getPartitions());
getFutureValue(splitSource.getNextBatch(1000), PrestoException.class);
Expand Down
Expand Up @@ -102,7 +102,6 @@ public class TestOrcStorageManager
private static final String CONNECTOR_ID = "test";
private static final int DELETION_THREADS = 2;
private static final Duration SHARD_RECOVERY_TIMEOUT = new Duration(30, TimeUnit.SECONDS);
private static final DataSize MAX_BUFFER_SIZE = new DataSize(256, MEGABYTE);
private static final int MAX_SHARD_ROWS = 100;
private static final DataSize MAX_FILE_SIZE = new DataSize(1, MEGABYTE);
private static final Duration MISSING_SHARD_DISCOVERY = new Duration(5, TimeUnit.MINUTES);
Expand Down Expand Up @@ -496,8 +495,7 @@ public static OrcStorageManager createOrcStorageManager(
DELETION_THREADS,
SHARD_RECOVERY_TIMEOUT,
maxShardRows,
maxFileSize,
MAX_BUFFER_SIZE);
maxFileSize);
}

private static void assertColumnStats(List<ColumnStats> list, long columnId, Object min, Object max)
Expand Down

0 comments on commit e933878

Please sign in to comment.