Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support to be able to query new Delta protocols #22596

Merged
merged 2 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
50 changes: 39 additions & 11 deletions presto-delta/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,61 @@
<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
<enforcer.skip>true</enforcer.skip>
<io.delta.delta-kernel-api.version>3.2.0</io.delta.delta-kernel-api.version>
</properties>

<dependencies>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-standalone_2.12</artifactId>
<version>0.6.0</version>
<artifactId>delta-kernel-api</artifactId>
<version>${io.delta.delta-kernel-api.version}</version>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.12</artifactId>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.delta</groupId>
<artifactId>delta-kernel-defaults</artifactId>
<version>${io.delta.delta-kernel-api.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<groupId>org.roringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-runtime</artifactId>
</exclusion>
</exclusions>
</dependency>
Expand Down
198 changes: 135 additions & 63 deletions presto-delta/src/main/java/com/facebook/presto/delta/DeltaClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,32 @@
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Snapshot;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.data.CloseableIterator;
import com.facebook.presto.spi.StandardErrorCode;
import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.InternalScanFileUtils;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.utils.CloseableIterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import javax.inject.Inject;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.delta.DeltaErrorCode.DELTA_UNSUPPORTED_DATA_FORMAT;
import static com.facebook.presto.delta.DeltaTable.DataFormat.PARQUET;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static java.util.Locale.US;
Expand All @@ -51,6 +55,7 @@
*/
public class DeltaClient
{
private static final String TABLE_NOT_FOUND_ERROR_TEMPLATE = "Delta table (%s.%s) no longer exists.";
private final HdfsEnvironment hdfsEnvironment;

@Inject
Expand All @@ -70,72 +75,105 @@ public DeltaClient(HdfsEnvironment hdfsEnvironment)
* @return If the table is found return {@link DeltaTable}.
*/
public Optional<DeltaTable> getTable(
DeltaConfig config,
ConnectorSession session,
SchemaTableName schemaTableName,
String tableLocation,
Optional<Long> snapshotId,
Optional<Long> snapshotAsOfTimestampMillis)
{
Optional<DeltaLog> deltaLog = loadDeltaTableLog(session, new Path(tableLocation), schemaTableName);
if (!deltaLog.isPresent()) {
Path location = new Path(tableLocation);
Optional<Engine> deltaEngine = loadDeltaEngine(session, location, schemaTableName);
if (!deltaEngine.isPresent()) {
return Optional.empty();
}

Table deltaTable = loadDeltaTable(location.toString(), deltaEngine.get());
Snapshot snapshot = getSnapshot(deltaTable, deltaEngine.get(), schemaTableName, snapshotId,
snapshotAsOfTimestampMillis);
return Optional.of(new DeltaTable(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
tableLocation,
Optional.of(snapshot.getVersion(deltaEngine.get())), // lock the snapshot version
getSchema(config, schemaTableName, deltaEngine.get(), snapshot)));
}

private Snapshot getSnapshot(
Table deltaTable,
Engine deltaEngine,
SchemaTableName schemaTableName,
Optional<Long> snapshotId,
Optional<Long> snapshotAsOfTimestampMillis)
{
// Fetch the snapshot info for given snapshot version. If no snapshot version is given, get the latest snapshot info.
// Lock the snapshot version here and use it later in the rest of the query (such as fetching file list etc.).
// If we don't lock the snapshot version here, the query may end up with schema from one version and data files from another
// version when the underlying delta table is changing while the query is running.
Snapshot snapshot;
if (snapshotId.isPresent()) {
snapshot = getSnapshotById(deltaLog.get(), snapshotId.get(), schemaTableName);
snapshot = getSnapshotById(deltaTable, deltaEngine, snapshotId.get(), schemaTableName);
}
else if (snapshotAsOfTimestampMillis.isPresent()) {
snapshot = getSnapshotAsOfTimestamp(deltaLog.get(), snapshotAsOfTimestampMillis.get(), schemaTableName);
snapshot = getSnapshotAsOfTimestamp(deltaTable, deltaEngine,
snapshotAsOfTimestampMillis.get(), schemaTableName);
}
else {
snapshot = deltaLog.get().snapshot(); // get the latest snapshot
try {
snapshot = deltaTable.getLatestSnapshot(deltaEngine); // get the latest snapshot
}
catch (TableNotFoundException e) {
throw new PrestoException(StandardErrorCode.NOT_FOUND,
format("Could not move to latest snapshot on table '%s.%s'", schemaTableName.getSchemaName(),
schemaTableName.getTableName()), e);
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For better readability, should we move this code block into a method something like getSnapshot instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


Metadata metadata = snapshot.getMetadata();
String format = metadata.getFormat().getProvider();
if (!PARQUET.name().equalsIgnoreCase(format)) {
throw new PrestoException(DELTA_UNSUPPORTED_DATA_FORMAT,
format("Delta table %s has unsupported data format: %s. Currently only Parquet data format is supported", schemaTableName, format));
if (snapshot instanceof SnapshotImpl) {
String format = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider();
if (!PARQUET.name().equalsIgnoreCase(format)) {
throw new PrestoException(DeltaErrorCode.DELTA_UNSUPPORTED_DATA_FORMAT,
format("Delta table %s has unsupported data format: %s. Only the Parquet data format is supported", schemaTableName, format));
}
}

return Optional.of(new DeltaTable(
schemaTableName.getSchemaName(),
schemaTableName.getTableName(),
tableLocation,
Optional.of(snapshot.getVersion()), // lock the snapshot version
getSchema(schemaTableName, metadata)));
return snapshot;
}

/**
* Get the list of files corresponding to the given Delta table.
*
* @return Closeable iterator of files. It is responsibility of the caller to close the iterator.
*/
public CloseableIterator<AddFile> listFiles(ConnectorSession session, DeltaTable deltaTable)
public CloseableIterator<FilteredColumnarBatch> listFiles(ConnectorSession session, DeltaTable deltaTable)
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do null check for deltaTable here -
requireNonNull(deltaTable, "deltaTable is null");

Also I think, we can shift deltaTable.getSnapshotId().isPresent() check in here in the start itself from line 148.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

requireNonNull(deltaTable, "deltaTable is null");
checkArgument(deltaTable.getSnapshotId().isPresent(), "Snapshot id is missing from the Delta table");
Optional<DeltaLog> deltaLog = loadDeltaTableLog(
session,
Optional<Engine> deltaEngine = loadDeltaEngine(session,
new Path(deltaTable.getTableLocation()),
new SchemaTableName(deltaTable.getSchemaName(), deltaTable.getTableName()));
if (!deltaEngine.isPresent()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know under what circumstances this error could happen?

Copy link
Contributor Author

@mblanco-denodo mblanco-denodo Jul 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now it should not happen, as the method loadDeltaTableClient will return a non null Engine or throw an exception. The usage of Optional in that method is inherited from the previous code, and it is a good practice to check if deltaEngine.isPresent() as in future iterations the logic of loadDeltaTableClient could change (note that all the Delta Kernel API is annotated with the @Evolving annotation, and they are making big API changes even on minor versions).

throw new PrestoException(DeltaErrorCode.DELTA_ERROR_LOADING_METADATA,
format("Could not obtain Delta engine in '%s'", deltaTable.getTableLocation()));
}
Table sourceTable = loadDeltaTable(deltaTable.getTableLocation(), deltaEngine.get());

if (!deltaLog.isPresent()) {
throw new PrestoException(NOT_FOUND,
format("Delta table (%s.%s) no longer exists.", deltaTable.getSchemaName(), deltaTable.getTableName()));
if (!deltaTable.getSnapshotId().isPresent()) {
throw new PrestoException(DeltaErrorCode.DELTA_ERROR_LOADING_SNAPSHOT, "Could not obtain snapshot id");
}

return deltaLog.get()
.getSnapshotForVersionAsOf(deltaTable.getSnapshotId().get())
.scan()
.getFiles();
try {
return sourceTable.getSnapshotAsOfVersion(deltaEngine.get(),
deltaTable.getSnapshotId().get()).getScanBuilder(deltaEngine.get()).build()
.getScanFiles(deltaEngine.get());
}
catch (TableNotFoundException e) {
throw new PrestoException(StandardErrorCode.NOT_FOUND,
format("Delta table not found in '%s'", deltaTable.getTableLocation()), e);
}
}

private Optional<DeltaLog> loadDeltaTableLog(ConnectorSession session, Path tableLocation, SchemaTableName schemaTableName)
private Optional<Engine> loadDeltaEngine(ConnectorSession session, Path tableLocation,
SchemaTableName schemaTableName)
{
try {
HdfsContext hdfsContext = new HdfsContext(
Expand All @@ -148,63 +186,97 @@ private Optional<DeltaLog> loadDeltaTableLog(ConnectorSession session, Path tabl
if (!fileSystem.isDirectory(tableLocation)) {
return Optional.empty();
}
return Optional.of(DeltaLog.forTable(
hdfsEnvironment.getConfiguration(hdfsContext, tableLocation),
tableLocation));
return Optional.of(DefaultEngine.create(fileSystem.getConf()));
}
catch (IOException ioException) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to load Delta table: " + ioException.getMessage(), ioException);
throw new PrestoException(DeltaErrorCode.DELTA_ERROR_LOADING_METADATA,
"Failed to load Delta table: " + ioException.getMessage(), ioException);
}
}

private static Snapshot getSnapshotById(DeltaLog deltaLog, long snapshotId, SchemaTableName schemaTableName)
private Table loadDeltaTable(String tableLocation, Engine deltaEngine)
{
return Table.forPath(deltaEngine, tableLocation);
}

private static Snapshot getSnapshotById(Table deltaTable, Engine deltaEngine, long snapshotId, SchemaTableName schemaTableName)
{
try {
return deltaLog.getSnapshotForVersionAsOf(snapshotId);
return deltaTable.getSnapshotAsOfVersion(deltaEngine, snapshotId);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
NOT_FOUND,
StandardErrorCode.NOT_FOUND,
format("Snapshot version %d does not exist in Delta table '%s'.", snapshotId, schemaTableName),
exception);
}
catch (TableNotFoundException e) {
throw new PrestoException(StandardErrorCode.NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

private static Snapshot getSnapshotAsOfTimestamp(DeltaLog deltaLog, long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
private static Snapshot getSnapshotAsOfTimestamp(Table deltaTable, Engine deltaEngine,
long snapshotAsOfTimestampMillis, SchemaTableName schemaTableName)
{
try {
return deltaLog.getSnapshotForTimestampAsOf(snapshotAsOfTimestampMillis);
return deltaTable.getSnapshotAsOfTimestamp(deltaEngine, snapshotAsOfTimestampMillis);
}
catch (IllegalArgumentException exception) {
throw new PrestoException(
NOT_FOUND,
StandardErrorCode.NOT_FOUND,
format(
"There is no snapshot exists in Delta table '%s' that is created on or before '%s'",
schemaTableName,
Instant.ofEpochMilli(snapshotAsOfTimestampMillis)),
exception);
}
catch (TableNotFoundException e) {
throw new PrestoException(StandardErrorCode.NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, schemaTableName.getSchemaName(),
schemaTableName.getTableName()));
}
}

/**
* Utility method that returns the columns in given Delta metadata. Returned columns include regular and partition types.
* Data type from Delta is mapped to appropriate Presto data type.
*/
private static List<DeltaColumn> getSchema(SchemaTableName tableName, Metadata metadata)
private static List<DeltaColumn> getSchema(DeltaConfig config, SchemaTableName tableName, Engine deltaEngine,
Snapshot snapshot)
{
Set<String> partitionColumns = metadata.getPartitionColumns().stream()
.map(String::toLowerCase)
.collect(Collectors.toSet());

return Arrays.stream(metadata.getSchema().getFields())
.map(field -> {
String columnName = field.getName().toLowerCase(US);
TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName, columnName, field.getDataType());
return new DeltaColumn(
columnName,
prestoType,
field.isNullable(),
partitionColumns.contains(columnName));
}).collect(Collectors.toList());
try (CloseableIterator<FilteredColumnarBatch> columnBatches = snapshot.getScanBuilder(deltaEngine).build()
.getScanFiles(deltaEngine)) {
Row row = null;
while (columnBatches.hasNext()) {
CloseableIterator<Row> rows = columnBatches.next().getRows();
if (rows.hasNext()) {
row = rows.next();
break;
}
}
Map<String, String> partitionValues = row != null ?
InternalScanFileUtils.getPartitionValues(row) : new HashMap<>(0);
return snapshot.getSchema(deltaEngine).fields().stream()
.map(field -> {
String columnName = config.isCaseSensitivePartitionsEnabled() ? field.getName() :
field.getName().toLowerCase(US);
TypeSignature prestoType = DeltaTypeUtils.convertDeltaDataTypePrestoDataType(tableName,
columnName, field.getDataType());
return new DeltaColumn(
columnName,
prestoType,
field.isNullable(),
partitionValues.containsKey(columnName));
}).collect(Collectors.toList());
}
catch (TableNotFoundException e) {
throw new PrestoException(StandardErrorCode.NOT_FOUND,
format(TABLE_NOT_FOUND_ERROR_TEMPLATE, tableName.getSchemaName(), tableName.getTableName()));
}
catch (IOException e) {
throw new UncheckedIOException("Could not close columnar batch row", e);
}
}
}
Loading
Loading