Skip to content

Commit

Permalink
Add support for Iceberg object store
Browse files Browse the repository at this point in the history
  • Loading branch information
jakelong95 committed Feb 2, 2024
1 parent 1e4e3c4 commit cda8e04
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class IcebergConfig
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;
private boolean objectStoreEnabled;
private Optional<String> dataLocation = Optional.empty();

public CatalogType getCatalogType()
{
Expand Down Expand Up @@ -418,4 +420,30 @@ public boolean isStorageSchemaSetWhenHidingIsEnabled()
{
return hideMaterializedViewStorageTable && materializedViewsStorageSchema.isPresent();
}

/**
 * Sets whether newly created Iceberg tables use the object store file layout by default.
 *
 * @param objectStoreEnabled true to enable the object store file layout
 * @return this config instance, for fluent chaining
 */
@Config("iceberg.object-store.enabled")
@ConfigDescription("Enable the Iceberg object store file layout")
public IcebergConfig setObjectStoreEnabled(boolean objectStoreEnabled)
{
    this.objectStoreEnabled = objectStoreEnabled;
    return this;
}

/**
 * @return whether the Iceberg object store file layout is enabled by default
 */
public boolean isObjectStoreEnabled()
{
    return this.objectStoreEnabled;
}

/**
 * Sets the default file system location for table data files.
 *
 * @param dataLocation path for data files; null clears any configured location
 * @return this config instance, for fluent chaining
 */
@Config("iceberg.data-location")
@ConfigDescription("Path for data files")
public IcebergConfig setDataLocation(String dataLocation)
{
    if (dataLocation == null) {
        this.dataLocation = Optional.empty();
    }
    else {
        this.dataLocation = Optional.of(dataLocation);
    }
    return this;
}

/**
 * @return the configured default location for table data files, if any
 */
public Optional<String> getDataLocation()
{
    return this.dataLocation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public final class IcebergSessionProperties
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String OBJECT_STORE_ENABLED = "object_store_enabled";
private static final String DATA_LOCATION = "data_location";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -348,6 +350,16 @@ public IcebergSessionProperties(
"Require filter on partition column",
icebergConfig.isQueryPartitionFilterRequired(),
false))
.add(booleanProperty(
OBJECT_STORE_ENABLED,
"Enable Iceberg object store file layout",
icebergConfig.isObjectStoreEnabled(),
false))
.add(stringProperty(
DATA_LOCATION,
"Location for data files",
icebergConfig.getDataLocation().orElse(null),
false))
.build();
}

Expand Down Expand Up @@ -568,4 +580,14 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

/**
 * Reads the {@code object_store_enabled} session property.
 *
 * @param session connector session carrying the property values
 * @return whether the Iceberg object store file layout is enabled for this session
 */
public static boolean isObjectStoreEnabled(ConnectorSession session)
{
    Boolean objectStoreEnabled = session.getProperty(OBJECT_STORE_ENABLED, Boolean.class);
    return objectStoreEnabled;
}

/**
 * Reads the {@code data_location} session property.
 *
 * @param session connector session carrying the property values
 * @return the data file location for this session, or empty when not set
 */
public static Optional<String> getDataLocation(ConnectorSession session)
{
    String dataLocation = session.getProperty(DATA_LOCATION, String.class);
    return Optional.ofNullable(dataLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.doubleProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
Expand All @@ -45,6 +46,8 @@ public class IcebergTableProperties
public static final String FORMAT_VERSION_PROPERTY = "format_version";
public static final String ORC_BLOOM_FILTER_COLUMNS = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP = "orc_bloom_filter_fpp";
public static final String OBJECT_STORE_ENABLED_PROPERTY = "object_store_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";

private final List<PropertyMetadata<?>> tableProperties;

Expand Down Expand Up @@ -107,6 +110,16 @@ public IcebergTableProperties(
orcWriterConfig.getDefaultBloomFilterFpp(),
IcebergTableProperties::validateOrcBloomFilterFpp,
false))
.add(booleanProperty(
OBJECT_STORE_ENABLED_PROPERTY,
"Set to true to enable Iceberg object store file layout. Default false.",
icebergConfig.isObjectStoreEnabled(),
false))
.add(stringProperty(
DATA_LOCATION_PROPERTY,
"File system location URI for the table's data files",
icebergConfig.getDataLocation().orElse(null),
false))
.build();
}

Expand Down Expand Up @@ -169,4 +182,14 @@ private static void validateOrcBloomFilterFpp(double fpp)
throw new TrinoException(INVALID_TABLE_PROPERTY, "Bloom filter fpp value must be between 0.0 and 1.0");
}
}

/**
 * Extracts the {@code object_store_enabled} table property.
 *
 * @param tableProperties resolved table properties
 * @return whether the object store file layout is enabled for the table
 */
public static boolean getObjectStoreEnabled(Map<String, Object> tableProperties)
{
    // NOTE(review): unboxing throws NPE when the key is absent — presumably the
    // booleanProperty default has always been applied by this point; confirm with callers.
    Boolean objectStoreEnabled = (Boolean) tableProperties.get(OBJECT_STORE_ENABLED_PROPERTY);
    return objectStoreEnabled;
}

/**
 * Extracts the {@code data_location} table property.
 *
 * @param tableProperties resolved table properties
 * @return the explicit data file location, or empty when none was set
 */
public static Optional<String> getDataLocation(Map<String, Object> tableProperties)
{
    String dataLocation = (String) tableProperties.get(DATA_LOCATION_PROPERTY);
    return Optional.ofNullable(dataLocation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,8 @@
import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_ENABLED_DEFAULT;
import static org.apache.iceberg.TableProperties.OBJECT_STORE_PATH;
import static org.apache.iceberg.TableProperties.WRITE_DATA_LOCATION;
import static org.apache.iceberg.TableProperties.WRITE_LOCATION_PROVIDER_IMPL;
import static org.apache.iceberg.TableProperties.WRITE_METADATA_LOCATION;
import static org.apache.iceberg.types.Type.TypeID.BINARY;
import static org.apache.iceberg.types.Type.TypeID.FIXED;
import static org.apache.iceberg.util.PropertyUtil.propertyAsBoolean;
Expand Down Expand Up @@ -657,6 +655,10 @@ public static Map<String, String> createTableProperties(ConnectorTableMetadata t
IcebergFileFormat fileFormat = IcebergTableProperties.getFileFormat(tableMetadata.getProperties());
propertiesBuilder.put(DEFAULT_FILE_FORMAT, fileFormat.toIceberg().toString());
propertiesBuilder.put(FORMAT_VERSION, Integer.toString(IcebergTableProperties.getFormatVersion(tableMetadata.getProperties())));
propertiesBuilder.put(OBJECT_STORE_ENABLED, Boolean.toString(IcebergTableProperties.getObjectStoreEnabled(tableMetadata.getProperties())));

Optional<String> dataLocation = IcebergTableProperties.getDataLocation(tableMetadata.getProperties());
dataLocation.ifPresent(s -> propertiesBuilder.put(WRITE_DATA_LOCATION, s));

// iceberg ORC format bloom filter properties used by create table
List<String> columns = getOrcBloomFilterColumns(tableMetadata.getProperties());
Expand Down Expand Up @@ -739,17 +741,6 @@ public static long getSnapshotIdAsOfTime(Table table, long epochMillis)
.snapshotId();
}

/**
 * Rejects dropping a table that carries Iceberg path override properties, since
 * files outside the table location would be affected by the drop.
 *
 * @param table the Iceberg table being dropped
 * @throws TrinoException {@code NOT_SUPPORTED} when any path override property is present
 */
public static void validateTableCanBeDropped(Table table)
{
    // TODO: support path override in Iceberg table creation: https://github.com/trinodb/trino/issues/8861
    var properties = table.properties();
    boolean hasPathOverride = properties.containsKey(OBJECT_STORE_PATH)
            // Removed from Iceberg as of 0.14.0, but preserved for backward compatibility
            || properties.containsKey("write.folder-storage.path")
            || properties.containsKey(WRITE_METADATA_LOCATION)
            || properties.containsKey(WRITE_DATA_LOCATION);
    if (hasPathOverride) {
        throw new TrinoException(NOT_SUPPORTED, "Table contains Iceberg path override properties and cannot be dropped from Trino: " + table.name());
    }
}

private static void checkFormatForProperty(FileFormat actualStorageFormat, FileFormat expectedStorageFormat, String propertyName)
{
if (actualStorageFormat != expectedStorageFormat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.getTableComment;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.glue.GlueIcebergUtil.getMaterializedViewTableInput;
Expand Down Expand Up @@ -676,7 +675,6 @@ private Optional<List<ColumnMetadata>> getCachedColumnMetadata(com.amazonaws.ser
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
try {
deleteTable(schemaTableName.getSchemaName(), schemaTableName.getTableName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.TableType.MATERIALIZED_VIEW_STORAGE;
import static io.trino.plugin.iceberg.TrinoMetricsReporter.TRINO_METRICS_REPORTER;
import static io.trino.plugin.iceberg.catalog.AbstractIcebergTableOperations.ICEBERG_METASTORE_STORAGE_FORMAT;
Expand Down Expand Up @@ -411,7 +410,6 @@ public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
TableMetadata metadata = table.operations().current();
validateTableCanBeDropped(table);

io.trino.plugin.hive.metastore.Table metastoreTable = metastore.getTable(schemaTableName.getSchemaName(), schemaTableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(schemaTableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.loadIcebergTable;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -283,7 +282,6 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);

jdbcCatalog.dropTable(toIdentifier(schemaTableName), false);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import static io.trino.plugin.iceberg.IcebergSchemaProperties.LOCATION_PROPERTY;
import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableWithMetadata;
import static io.trino.plugin.iceberg.IcebergUtil.quotedTableName;
import static io.trino.plugin.iceberg.IcebergUtil.validateTableCanBeDropped;
import static io.trino.plugin.iceberg.catalog.nessie.IcebergNessieUtil.toIdentifier;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
Expand Down Expand Up @@ -248,7 +247,6 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
BaseTable table = (BaseTable) loadTable(session, schemaTableName);
validateTableCanBeDropped(table);
nessieClient.dropTable(toIdentifier(schemaTableName), true);
deleteTableDirectory(fileSystemFactory.create(session), schemaTableName, table.location());
invalidateTableCache(schemaTableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4710,7 +4710,8 @@ protected void verifyIcebergTableProperties(MaterializedResult actual)
assertThat(actual).isNotNull();
MaterializedResult expected = resultBuilder(getSession())
.row("write.format.default", format.name())
.row("write.parquet.compression-codec", "zstd").build();
.row("write.parquet.compression-codec", "zstd")
.row("write.object-storage.enabled", "false").build();
assertEqualsIgnoreOrder(actual.getMaterializedRows(), expected.getMaterializedRows());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ public void testDefaults()
.setMaterializedViewsStorageSchema(null)
.setRegisterTableProcedureEnabled(false)
.setSortedWritingEnabled(true)
.setQueryPartitionFilterRequired(false));
.setQueryPartitionFilterRequired(false)
.setObjectStoreEnabled(false)
.setDataLocation(null));
}

@Test
Expand Down Expand Up @@ -97,6 +99,8 @@ public void testExplicitPropertyMappings()
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.sorted-writing-enabled", "false")
.put("iceberg.query-partition-filter-required", "true")
.put("iceberg.object-store.enabled", "true")
.put("iceberg.data-location", "data_location")
.buildOrThrow();

IcebergConfig expected = new IcebergConfig()
Expand All @@ -123,7 +127,9 @@ public void testExplicitPropertyMappings()
.setMaterializedViewsStorageSchema("mv_storage_schema")
.setRegisterTableProcedureEnabled(true)
.setSortedWritingEnabled(false)
.setQueryPartitionFilterRequired(true);
.setQueryPartitionFilterRequired(true)
.setObjectStoreEnabled(true)
.setDataLocation("data_location");

assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit cda8e04

Please sign in to comment.