
Iceberg: use LocationProvider instead of hardcoded path #8573

Merged: 2 commits, Aug 12, 2021

Conversation

jackye1995 (Member)

Use Iceberg's LocationProvider instead of hard-coding file paths. Because data file paths are currently hard-coded in the Hive-style layout, users on cloud storage cannot benefit from Iceberg's ObjectStorageLocationProvider. This PR fixes that.
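For context, a minimal sketch of the difference, assuming an Iceberg Table handle (this is an illustration, not the PR's actual code):

import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.LocationProvider;

public final class DataLocationSketch
{
    private DataLocationSketch() {}

    public static String newDataLocation(Table table, String fileName)
    {
        // Old behavior (roughly): always use the Hive-style layout under the table location,
        // e.g. table.location() + "/data/" + fileName.

        // New behavior: ask Iceberg which layout to use. If the table sets
        // write.object-storage.enabled / write.object-storage.path, the returned provider
        // places data files under the configured object storage path instead.
        LocationProvider locations = LocationProviders.locationsFor(table.location(), table.properties());
        return locations.newDataLocation(fileName);
    }
}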

@findepi (Member) commented Jul 16, 2021

cc @losipiuk @alexjo2144

{
return outputPath;
if (locationProvider == null) {
locationProvider = deserializeFromBytes(serializedLocationProvider);
Member

We do not use Java serialization. In particular, it can lead to security issues.

Member Author

Sure, there are multiple ways we can go with this. We could also get the location provider through dependency injection of the table operations, since there is general agreement that HiveTableOperations will be used across the board.

Member Author

Having thought about it more, it would be inefficient to initialize a table operations instance, read the table metadata, and get the location provider from it. The alternative is to pass in the table properties. Please see if the new version works, thanks!

@alexjo2144 (Member) left a comment

The code changes seem reasonable, just one broader thought. A table could specify any custom LocationProvider implementation in the properties. Is that location guaranteed to be compatible with org.apache.hadoop.fs.Path?

Alternatively, we could scope this down to just supporting DefaultLocationProvider and ObjectStoreLocationProvider by introducing a config or table property option and initializing the LocationProvider directly rather than through LocationProviders#locationsFor.

@jackye1995 (Member Author)

@alexjo2144 thanks for the feedback.

we could scope this down to just supporting DefaultLocationProvider and ObjectStoreLocationProvider

Sure, I can introduce another static util method instead of using the Iceberg one.

Is that location guaranteed to be compatible with org.apache.hadoop.fs.Path?

This is related to the multi-catalog discussion; the conclusion there was that:

  1. multiple catalogs will be supported, but only ones explicitly listed by Trino through an enum; no custom catalog loading is supported.
  2. HdfsFileIO is the only FileIO implementation used across all catalogs.

So yes, it's guaranteed to be compatible.

@@ -296,4 +292,12 @@ public static Object deserializePartitionValue(Type type, String valueString, St

return Collections.unmodifiableMap(partitionKeys);
}

public static LocationProvider getLocationProvider(String tableLocation, Map<String, String> properties)
Member Author

@alexjo2144 I think this is the best we can do on the Trino side. The two location provider classes are not public, so we can only block creation when the write.location-provider.impl table property is set.
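A hedged sketch of what such a guard might look like (not necessarily the PR's exact code; TableProperties.WRITE_LOCATION_PROVIDER_IMPL is Iceberg's constant for write.location-provider.impl):

import java.util.Map;

import org.apache.iceberg.LocationProviders;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.io.LocationProvider;

import static com.google.common.base.Preconditions.checkArgument;

public final class LocationProviderSketch
{
    private LocationProviderSketch() {}

    public static LocationProvider getLocationProvider(String tableLocation, Map<String, String> tableProperties)
    {
        // Reject custom providers up front; only the built-in default and object storage
        // providers can be reached through LocationProviders.locationsFor below.
        checkArgument(!tableProperties.containsKey(TableProperties.WRITE_LOCATION_PROVIDER_IMPL),
                "Table has a custom location provider: %s",
                tableProperties.get(TableProperties.WRITE_LOCATION_PROVIDER_IMPL));
        return LocationProviders.locationsFor(tableLocation, tableProperties);
    }
}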

@findepi (Member) left a comment

(just skimming)

@@ -159,14 +159,6 @@ public static long resolveSnapshotId(Table table, long snapshotId)
return columns.build();
}

public static String getDataPath(String location)
Member

doesn't seem to belong to "pass in table properties to initialize location provider in sink prov…" commit

return new IcebergPageSink(
schema,
partitionSpec,
tableHandle.getOutputPath(),
locationProvider,
Member

doesn't seem to belong to "pass in table properties to initialize location provider in sink prov…" commit

@alexjo2144 (Member)

@findepi did you have an opinion on allowing custom LocationProviders vs restricting?

@jackye1995 this looks good to me. Can you just add a test case to TestSparkCompatibility that does the following (see the sketch after this list):

  • Create table in Spark with ObjectStorageLocationProvider
  • Insert data from Spark
  • Insert data from Trino
  • Read from both and ensure both get the same data
  • Read the "$path" column and ensure data files have the correct path structure
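A rough sketch of such a test, pieced together from the fragments quoted later in this thread (helper names like sparkTableName/trinoTableName, row, and assertThat are assumed from the existing TestSparkCompatibility product test class; imports are elided):

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testTrinoWritingDataWithObjectStorageLocationProvider()
{
    String baseTableName = "test_object_storage_location_provider";
    String sparkTableName = sparkTableName(baseTableName);
    String trinoTableName = trinoTableName(baseTableName);
    String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_object_storage_location_provider/obj-data";

    // Create the table from Spark with the object storage location provider enabled
    onSpark().executeQuery(format(
            "CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG TBLPROPERTIES (" +
                    "'write.object-storage.enabled'=true," +
                    "'write.object-storage.path'='%s')",
            sparkTableName, dataPath));

    // Insert from Trino so the IcebergPageSink changes are exercised
    onTrino().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", trinoTableName));

    // Both engines should read back the same row
    Row expected = row("a_string", 1000000000000000L);
    assertThat(onSpark().executeQuery("SELECT * FROM " + sparkTableName)).containsOnly(expected);
    assertThat(onTrino().executeQuery("SELECT * FROM " + trinoTableName)).containsOnly(expected);

    // Every data file should live under the configured object storage path
    QueryResult files = onTrino().executeQuery(
            format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\"")));
    files.rows().forEach(fileRow -> Assertions.assertThat((String) fileRow.get(0)).contains(dataPath));

    onTrino().executeQuery("DROP TABLE " + trinoTableName);
}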

@jackye1995 force-pushed the location-provider-path branch 6 times, most recently from 9bab127 to 0ec481a on July 20, 2021 at 18:19
@jackye1995 (Member Author)

@findepi thanks for the comments. I am still getting familiar with the standards here, so sorry for the bad commit message; I was used to committing with arbitrary messages and merging with squash. I have force-pushed everything into a single commit, please let me know if this works.

@alexjo2144 test added, should be good for another look

"'write.object-storage.enabled'=true," +
"'write.object-storage.path'='" + dataPath + "')";
onSpark().executeQuery(format(sparkTableDefinition, sparkTableName));
onSpark().executeQuery(format("INSERT INTO %s VALUES ('a_string', 1000000000000000)", sparkTableName));
Member

It's important to do an INSERT from Trino here too. That's what runs the changes you made in the IcebergPageSink

Member Author

Yeah, good point: inserting from Spark does not actually test anything here, so I changed the insert to be executed with onTrino().

If we want to test that Trino can read Spark data written with the object storage location provider, that belongs in the testPrestoReadingSparkData test. But LocationProvider is a write-side feature; on the read side we just use the file paths recorded in the manifests, so I think there is little value in adding that test case.

@jackye1995 force-pushed the location-provider-path branch 2 times, most recently from fe90e49 to 34eafea on July 20, 2021 at 22:37
@alexjo2144 (Member) left a comment

Looks good to me. The option to create tables using ObjectStoreLocationProvider from Trino would be nice, but that can be a separate issue/PR.

@jackye1995 (Member Author)

@alexjo2144 @findepi any updates on this? Please let me know if any changes are needed; otherwise, could you merge the PR? Thanks!

@electrum (Member) left a comment

If I'm understanding this correctly, the location provider should be specific to the catalog implementation, correct? If so, we should add this to the upcoming catalog API. Or is this something the user should configure, for example depending on whether they are using HDFS or S3?

Either way, this shouldn't be, or be passed in as, a table property. We can discuss this on Slack if that's easier.

this.fileWriterFactory = requireNonNull(fileWriterFactory, "fileWriterFactory is null");
this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
this.hdfsContext = requireNonNull(hdfsContext, "hdfsContext is null");
this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(outputPath)));
this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(locationProvider.newDataLocation(""))));
Member

This parameter is for a filename, so an empty filename would seem to be invalid, and thus a location provider might reject it. We could use a default name like data-file instead.
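Applied to the quoted line, the suggestion would look roughly like this (data-file is just a placeholder name; the provider only needs a non-empty file name to produce a valid path for the configuration lookup):

// use a placeholder file name instead of an empty string
this.jobConf = toJobConf(hdfsEnvironment.getConfiguration(hdfsContext, new Path(locationProvider.newDataLocation("data-file"))));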

@jackye1995 (Member Author)

@electrum thanks for the comment. I will also reply here just for visibility.

We discussed the use of table properties in Trino versus Iceberg last time; what I found is that they are not really the same concept after all. The fundamental difference is that:

On the Trino side, table properties are only used at table creation time, and it is up to the connector to decide what to do with each property. In Iceberg's case, we have:

  1. partitioning: used to supply partition transform information for Iceberg tables, stored as a partition spec in the Iceberg table metadata JSON file.
  2. location: used to specify the table location, stored as the location field of the table metadata.
  3. format: used to specify the file format, stored as write.format.default (Iceberg's TableProperties.DEFAULT_FILE_FORMAT) in the table metadata's properties map.

On the Iceberg side, table properties are stored directly as part of the table metadata JSON file, which is what happens with the format case in the Trino Iceberg connector. These properties determine read/write behavior at runtime.

With this understanding, we can now look at the location provider configuration, which is controlled by the following Iceberg table properties:

  1. write.object-storage.enabled: boolean to determine if object storage location provider should be used or not
  2. write.object-storage.path: root path for data files in object storage mode
  3. write.location-provider.impl: any custom location provider implementation, which Trino will refuse to load.

What this PR is trying to support is the case where:

  1. a user writes an Iceberg table with the above configurations set using some other systems like Spark/Hive
  2. then the user tries to use Trino to also perform additional write operations

In the case described above, Trino should respect those configurations and write to the configured location instead of the hard-coded table location. This has nothing to do with the user configuring any Trino table property.

I think what you are concerned about is the case where a user creates a table with object storage mode enabled or not based on a Trino table property, or on some other table feature such as the storage provider. I agree that behavior should be controlled. I am trying to keep this PR small; we can introduce a boolean table property like object_storage_mode later and map it to the Iceberg table properties behind the scenes, similar to the format case.

@findepi (Member) commented Aug 3, 2021

@jackye1995

  • can write.object-storage.path be used without specifying write.location-provider.impl?
  • can write.object-storage.path be used without specifying write.object-storage.enabled? what should be the behavior then?

perhaps we can narrow down the scope of the change to

  • respect write.object-storage.path (and write.object-storage.enabled, if they must be used in tandem) -- instead of writing to the table's directory (determined from location), write to the location pointed to by write.object-storage.path
  • throw when attempting to write when write.location-provider.impl is set

Such changes shouldn't require significant code changes, should they?

Actually, isn't the following implementing the aforementioned behavior already?

return LocationProviders.locationsFor(metadata.location(), metadata.properties());

@jackye1995 (Member Author)

can write.object-storage.path be used without specifying write.location-provider.impl?
can write.object-storage.path be used without specifying write.object-storage.enabled? what should be the behavior then?

write.object-storage.path is only used by ObjectStorageLocationProvider, and only when write.object-storage.enabled is true. There is no other way to enable it, since the class is not public. Other location provider implementations might leverage the same config, but Trino will reject them through the write.location-provider.impl check.
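In other words, the selection behaves roughly as sketched below; this is a paraphrase of the behavior described above, not the actual Iceberg source:

import java.util.Map;

final class LocationProviderChoice
{
    private LocationProviderChoice() {}

    // Returns a description of which provider would be used for a given set of table properties.
    static String describeProviderChoice(String tableLocation, Map<String, String> properties)
    {
        if (properties.containsKey("write.location-provider.impl")) {
            // a custom implementation is configured; Trino rejects this case outright
            return "custom provider (rejected by Trino)";
        }
        if (Boolean.parseBoolean(properties.getOrDefault("write.object-storage.enabled", "false"))) {
            // object storage mode: write.object-storage.path is the root for data files
            return "object storage provider rooted at " + properties.get("write.object-storage.path");
        }
        // default mode: write.object-storage.path is ignored; data goes under the table location
        return "default provider rooted at " + tableLocation;
    }
}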

perhaps we can narrow down the scope of the change to ...

Yes, this PR is trying to achieve exactly the two goals you listed.

@jackye1995 (Member Author)

Actually, isn't the following implementing the aforementioned behavior already?

Yes, but to use it in the page sink provider we would need to construct a new HiveTableOperations, which is quite expensive. That's why IcebergUtil.getLocationProvider is called directly.

@@ -423,6 +424,33 @@ public void testTrinoShowingSparkCreatedTables()
onTrino().executeQuery("DROP TABLE " + trinoTableName(trinoTable));
}

@Test(groups = {ICEBERG, PROFILE_SPECIFIC_TESTS})
public void testPrestoWritingDataWithObjectStorageLocationProvider()
Member

Presto -> Trino

Member

We should run this test for all supported formats, since the page sink can have format-dependent behavior (currently it does not). Requires #8751

String trinoTableName = trinoTableName(baseTableName);
String dataPath = "hdfs://hadoop-master:9000/user/hive/warehouse/test_object_storage_location_provider/obj-data";

String sparkTableDefinition = "CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG OPTIONS (" +
Member

is using OPTIONS different from using TBLPROPERTIES?

Member Author

It's the same. I can change it to TBLPROPERTIES for consistency.

String sparkTableDefinition = "CREATE TABLE %s (_string STRING, _bigint BIGINT) USING ICEBERG OPTIONS (" +
"'write.object-storage.enabled'=true," +
"'write.object-storage.path'='" + dataPath + "')";
onSpark().executeQuery(format(sparkTableDefinition, sparkTableName));
Member

Inline the sparkTableDefinition variable; it's redundant.

(I know it's a copy of existing test code; I am doing this cleanup for existing tests in 566d41a.)

assertThat(prestoSelect).containsOnly(result);

QueryResult filesTableResult = onTrino().executeQuery(format("SELECT file_path FROM %s", trinoTableName("\"" + baseTableName + "$files\"")));
filesTableResult.rows().forEach(row -> assertTrue(((String) row.get(0)).contains(dataPath)));
Member

In case of failure, this assertion will produce a useless error message.

Use Assertions.assertThat.

@jackye1995 (Member Author)

@findepi thanks for the comments.

We should run this test for all supported formats

It looks like we just need to rebase, depending on which PR gets in first?

Apart from that, hopefully I have addressed all your comments except for the table deletion one.

we should have two delete modes

This is actually an interesting topic that I would like to also get fixed, because the current behavior is broken even for Spark Iceberg tables that are not using the object storage location provider.

In Iceberg there is another config, write.folder-storage.path, that allows writing to a different location with the default location provider. Currently Trino just calls Hive to purge data, which does not work with this config.

The correct behavior is to call Iceberg's CatalogUtil.dropTableData, which recursively walks all manifests and data files in the table and deletes them. I think we should have a PR specifically for that fix; it does not need a Spark-Trino compatibility test, since we can test it directly with a normal Trino Iceberg unit test.
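For reference, a minimal sketch of that fix, assuming the table's TableOperations are available (this is not part of this PR):

import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableOperations;

public final class PurgeSketch
{
    private PurgeSketch() {}

    public static void purgeTableData(TableOperations operations)
    {
        // Recursively deletes the metadata files, manifest lists, manifests and data files
        // referenced by the table, regardless of which LocationProvider produced their paths.
        CatalogUtil.dropTableData(operations.io(), operations.current());
    }
}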

@@ -700,6 +705,12 @@ public ColumnHandle getDeleteRowIdColumnHandle(ConnectorSession session, Connect
public void dropTable(ConnectorSession session, ConnectorTableHandle tableHandle)
{
IcebergTableHandle handle = (IcebergTableHandle) tableHandle;
org.apache.iceberg.Table table = getIcebergTable(session, handle.getSchemaTableName());
Member Author

@findepi I think this is the best we can do for now for all of the Iceberg path overrides. We know they come from external systems like Spark, so we refuse to drop the table rather than leave files behind after a DROP TABLE operation. I can work on a proper solution after this PR is merged, when we start enabling the creation of Trino Iceberg tables with those overrides.

@losipiuk (Member) left a comment

LGTM % minor comments.
