First prototype of Hive data importer
Includes:
- StorageManager with metadata stored in H2. Imports through this class are transactional.
- StorageManager has a two-phase import that uses first-pass stats to determine the optimal encoding
- Hive wrapper around StorageManager to track partitions loaded (HiveImportManager)
- Retry handling for import errors
- HACKY way of providing temporary query support until we can formalize the process with the actual QueryPlanner

There are a ton of very hacky things in this commit, but it should be sufficient for running imports and providing the foundation for running queries on the stored data.
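
As a rough sketch of the intended call path (the injector lookup and the Hive coordinates below are hypothetical, purely for illustration): HiveImportManager checks its H2 registry, fetches partition chunks from Hive with retries, and streams each chunk through StorageManager's import.

// Hypothetical wiring: assumes a Guice injector binding HiveClient, StorageManager, and an H2-backed IDBI.
HiveImportManager importManager = injector.getInstance(HiveImportManager.class);

// Example coordinates; returns the number of rows imported (0 if the partition was already loaded).
long rowsImported = importManager.importPartition("default", "web_events", "ds=2012-10-24");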
erichwang committed Oct 24, 2012
1 parent db5a7de commit e842f5f
Showing 7 changed files with 870 additions and 27 deletions.
25 changes: 12 additions & 13 deletions presto-main/pom.xml
@@ -152,6 +152,18 @@
<artifactId>http-client-experimental</artifactId>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.3.168</version>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<version>2.39</version>
</dependency>

<!-- for packaging -->
<dependency>
<groupId>io.airlift</groupId>
@@ -174,19 +186,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.3.168</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi</artifactId>
<version>2.39</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
44 changes: 44 additions & 0 deletions presto-main/src/main/java/com/facebook/presto/Main.java
@@ -80,6 +80,7 @@ public static void main(String[] args)
.withDefaultCommand(Help.class)
.withCommand(Server.class)
.withCommand(ExampleSumAggregation.class)
.withCommand(Execute.class)
.withCommands(Help.class);

builder.withGroup("example")
@@ -170,6 +171,7 @@ public void run()
QueryDriversTupleStream tupleStream = new QueryDriversTupleStream(new TupleInfo(Type.VARIABLE_BINARY, Type.FIXED_INT_64), 10,
new HttpQueryProvider("sum", asyncHttpClient, server)
);
// TODO: this currently leaks query resources (need to delete)

// TuplePrinter tuplePrinter = new RecordTuplePrinter();
TuplePrinter tuplePrinter = new DelimitedTuplePrinter();
@@ -193,6 +195,48 @@
}
}

@Command(name = "execute", description = "Execute a query")
public static class Execute
extends BaseCommand
{
private static final Logger log = Logger.get(Execute.class);

@Option(name = "-s", title = "server", required = true)
public URI server;

@Option(name = "-q", title = "query", required = true)
public String query;

public void run()
{
initializeLogging(false);

ExecutorService executor = Executors.newCachedThreadPool();
try {
ApacheHttpClient httpClient = new ApacheHttpClient(new HttpClientConfig()
.setConnectTimeout(new Duration(1, TimeUnit.MINUTES))
.setReadTimeout(new Duration(30, TimeUnit.MINUTES)));
AsyncHttpClient asyncHttpClient = new AsyncHttpClient(httpClient, executor);
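// NOTE: hard-codes a single FIXED_INT_64 output column; queries with a different shape need a different TupleInfo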
QueryDriversTupleStream tupleStream = new QueryDriversTupleStream(new TupleInfo(Type.FIXED_INT_64), 10,
new HttpQueryProvider(query, asyncHttpClient, server)
);
// TODO: this currently leaks query resources (need to delete)

TuplePrinter tuplePrinter = new DelimitedTuplePrinter();

Cursor cursor = tupleStream.cursor(new QuerySession());
while (advanceNextPositionNoYield(cursor)) {
Tuple tuple = cursor.getTuple();
tuplePrinter.print(tuple);
}
log.info("Query complete.");
}
finally {
executor.shutdownNow();
}
}
}
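
// Usage sketch (hedged): with a server running, this subcommand would be invoked roughly as
//   execute -s http://localhost:8080 -q sum
// where the server URI and query name are invented examples; result tuples are printed one per line.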

@Command(name = "csv", description = "Convert CSV to columns")
public static class ConvertCsv
extends BaseCommand
68 changes: 68 additions & 0 deletions presto-main/src/main/java/com/facebook/presto/RetryDriver.java
@@ -0,0 +1,68 @@
package com.facebook.presto;

import io.airlift.log.Logger;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

public class RetryDriver
{
private static final Logger log = Logger.get(RetryDriver.class);

private RetryDriver()
{
}

public static <V> V runWithRetry(Callable<V> callable)
throws Exception
{
return runWithRetry(callable, "<default>");
}

public static <V> V runWithRetry(Callable<V> callable, int maxRetryAttempts)
throws Exception
{
return runWithRetry(callable, "<default>", maxRetryAttempts);
}

public static <V> V runWithRetry(Callable<V> callable, String callableName)
throws Exception
{
return runWithRetry(callable, callableName, 10);
}

public static <V> V runWithRetry(Callable<V> callable, String callableName, int maxRetryAttempts)
throws Exception
{
return runWithRetry(callable, callableName, maxRetryAttempts, 1);
}

public static <V> V runWithRetry(Callable<V> callable, String callableName, int maxRetryAttempts, int sleepSecs)
throws Exception
{
checkNotNull(callable, "callable is null");
checkNotNull(callableName, "callableName is null");
checkArgument(maxRetryAttempts > 0, "maxRetryAttempts must be greater than zero");
checkArgument(sleepSecs >= 0, "sleepSecs must be at least zero");

int attempt = 0;
while (true) {
attempt++;
try {
return callable.call();
}
catch (Exception e) {
if (attempt == maxRetryAttempts) {
throw e;
}
else {
log.warn("Failed to execute %s on attempt %d, will retry. Exception: %s", callableName, attempt, e.getMessage());
}
TimeUnit.SECONDS.sleep(sleepSecs);
}
}
}
}
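
A minimal usage sketch of the helper above (the metastore client and table names are invented for illustration); the four-argument overload takes a logging name, a maximum attempt count, and a sleep between attempts in seconds:

// Hypothetical flaky call worth retrying: up to 5 attempts, sleeping 2 seconds between them.
List<String> partitions = RetryDriver.runWithRetry(new Callable<List<String>>()
{
    @Override
    public List<String> call()
            throws Exception
    {
        return metastore.listPartitionNames("default", "web_events"); // assumed client API
    }
}, "listPartitionNames", 5, 2);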
176 changes: 176 additions & 0 deletions presto-main/src/main/java/com/facebook/presto/metadata/HiveImportManager.java
@@ -0,0 +1,176 @@
package com.facebook.presto.metadata;

import com.facebook.presto.Tuple;
import com.facebook.presto.TupleInfo;
import com.facebook.presto.block.StaticTupleAppendingTupleStream;
import com.facebook.presto.block.TupleStream;
import com.facebook.presto.hive.HiveClient;
import com.facebook.presto.hive.PartitionChunk;
import com.facebook.presto.hive.RecordIterator;
import com.facebook.presto.hive.SchemaField;
import com.facebook.presto.ingest.HiveTupleStream;
import com.facebook.presto.slice.Slices;
import com.google.common.base.Charsets;
import com.google.inject.Inject;
import org.skife.jdbi.v2.Handle;
import org.skife.jdbi.v2.IDBI;
import org.skife.jdbi.v2.tweak.HandleCallback;

import java.util.List;
import java.util.concurrent.Callable;

import static com.facebook.presto.RetryDriver.runWithRetry;
import static com.google.common.base.Preconditions.checkNotNull;

public class HiveImportManager
{
private final HiveClient hiveClient;
private final StorageManager storageManager;
private final HiveImportRegistry hiveImportRegistry;

@Inject
public HiveImportManager(HiveClient hiveClient, StorageManager storageManager, IDBI dbi)
{
this.hiveClient = checkNotNull(hiveClient, "hiveClient is null");
this.storageManager = checkNotNull(storageManager, "storageManager is null");
hiveImportRegistry = new HiveImportRegistry(checkNotNull(dbi, "dbi is null"));
}

public long importPartition(final String databaseName, final String tableName, final String partitionName)
throws Exception
{
checkNotNull(databaseName, "databaseName is null");
checkNotNull(tableName, "tableName is null");
checkNotNull(partitionName, "partitionName is null");

// TODO: prevent multiple simultaneous imports on same partition (race condition)
if (hiveImportRegistry.isPartitionImported(databaseName, tableName, partitionName)) {
// Already imported
return 0;
}

final Tuple partitionTuple = TupleInfo.SINGLE_VARBINARY.builder()
.append(Slices.wrappedBuffer(partitionName.getBytes(Charsets.UTF_8)))
.build();

List<PartitionChunk> chunks = runWithRetry(new Callable<List<PartitionChunk>>()
{
@Override
public List<PartitionChunk> call()
throws Exception
{
return hiveClient.getPartitionChunks(databaseName, tableName, partitionName);
}
});

final List<SchemaField> schemaFields = runWithRetry(new Callable<List<SchemaField>>()
{
@Override
public List<SchemaField> call()
throws Exception
{
return hiveClient.getTableSchema(databaseName, tableName);
}
});

long rowCount = 0;
// TODO: right now, failures can result in partial partitions to be loaded (smallest unit needs to be transactional)
for (final PartitionChunk chunk : chunks) {
rowCount += runWithRetry(new Callable<Long>()
{
@Override
public Long call()
throws Exception
{
try (RecordIterator records = hiveClient.getRecords(chunk)) {
TupleStream sourceTupleStream = new StaticTupleAppendingTupleStream(
new HiveTupleStream(records, schemaFields),
partitionTuple
);
// TODO: add layer to break up incoming TupleStream based on size
return storageManager.importTableShard(sourceTupleStream, databaseName, tableName);
}
}
});
}
hiveImportRegistry.markPartitionImported(databaseName, tableName, partitionName);
return rowCount;
}

// TODO: the import registry should use the CHUNK as the smallest unit of import
private static class HiveImportRegistry
{
private final IDBI dbi;

public HiveImportRegistry(IDBI dbi)
{
this.dbi = checkNotNull(dbi, "dbi is null");
initializeDatabaseIfNecessary();
}

private void initializeDatabaseIfNecessary()
{
dbi.withHandle(new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
throws Exception
{
// TODO: use ids for database, table, and partition
handle.createStatement("CREATE TABLE IF NOT EXISTS imported_hive_partitions (database VARCHAR(256), table VARCHAR(256), partition VARCHAR(256), PRIMARY KEY(database, table, partition))")
.execute();
return null;
}
});
}

public boolean isPartitionImported(final String databaseName, final String tableName, final String partitionName)
{
checkNotNull(databaseName, "databaseName is null");
checkNotNull(tableName, "tableName is null");
checkNotNull(partitionName, "partitionName is null");

return dbi.withHandle(new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle)
throws Exception
{
return !handle.createQuery(
"SELECT * " +
"FROM imported_hive_partitions " +
"WHERE database = :database " +
"AND table = :table " +
"AND partition = :partition")
.bind("database", databaseName)
.bind("table", tableName)
.bind("partition", partitionName)
.list()
.isEmpty();
}
});
}

public void markPartitionImported(final String databaseName, final String tableName, final String partitionName)
{
checkNotNull(databaseName, "databaseName is null");
checkNotNull(tableName, "tableName is null");
checkNotNull(partitionName, "partitionName is null");

dbi.withHandle(new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
throws Exception
{
handle.createStatement("INSERT INTO imported_hive_partitions (database, table, partition) values (:database, :table, :partition)")
.bind("database", databaseName)
.bind("table", tableName)
.bind("partition", partitionName)
.execute();
return null;
}
});
}
}
}
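
Worth noting, as a hedged sketch (coordinates invented): importPartition is idempotent at partition granularity, since the registry check short-circuits repeat calls.

// First call performs the import and returns the row count.
long rows = importManager.importPartition("default", "web_events", "ds=2012-10-24");

// Second call finds the row in imported_hive_partitions and returns 0 without touching Hive.
long again = importManager.importPartition("default", "web_events", "ds=2012-10-24");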
