Ensure putSnapshot path honors the case-insensitive contract (#85)
## Summary

This is a bug fix for the put-snapshot code path, where the UUID-extraction
process was using `tableId` and `databaseId` taken from the request itself.
Those ids, when obtained directly from the top-level request body, can lose
their casing if the request comes from a platform like Spark SQL. Since the
underlying storage (e.g. HDFS) is case sensitive in its path URLs, we need to
ensure the original casing recorded when the first commit of a CTAS is issued
is preserved during the UUID extraction of the second commit.
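
For illustration, here is a minimal standalone sketch of the intended behavior (not code from this PR; the helper and literal values are hypothetical). It uses the same `openhouse.databaseId` / `openhouse.tableId` property keys that the changed `TableUUIDGenerator` reads:

```java
import java.util.Map;

// Sketch: prefer the ids recorded in table properties, which keep the casing from the
// original CREATE TABLE, over the ids in the request body, which an engine such as
// Spark SQL may have lower-cased.
public class CasingSketch {
  private static final String OPENHOUSE_NAMESPACE = "openhouse.";

  // Hypothetical helper: fall back to the (possibly lower-cased) request-body id only
  // when the property is absent.
  static String preservedId(Map<String, String> tblProps, String rawKey, String requestBodyId) {
    String preserved = (tblProps == null) ? null : tblProps.get(OPENHOUSE_NAMESPACE + rawKey);
    return preserved != null ? preserved : requestBodyId;
  }

  public static void main(String[] args) {
    Map<String, String> props = Map.of("openhouse.databaseId", "d1", "openhouse.tableId", "tT1");
    // Request body carries "tt1", but the table was created as "tT1"; the property wins.
    System.out.println(preservedId(props, "tableId", "tt1")); // prints tT1
  }
}
```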

The other part of this PR ensures that, during a `put`, the `tableDto` is not
rebuilt from scratch when an existing object has already been discovered by the
preceding `findById` call. This is done by switching from `orElse` to
`orElseGet`; the latter invokes its supplier lazily, only when the value is
absent. The previous eager `orElse` call was wasteful and made the code path
confusing.
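
A minimal standalone sketch of the `orElse` vs. `orElseGet` difference (illustrative only, not code from this PR):

```java
import java.util.Optional;

// orElse evaluates its argument eagerly even when the Optional is present, so the
// expensive "build from scratch" work runs and is then discarded; orElseGet only
// invokes the supplier when the value is absent.
public class OrElseVsOrElseGet {
  static String buildFromScratch() {
    System.out.println("building from scratch"); // expensive work, e.g. generating a UUID
    return "freshly built";
  }

  public static void main(String[] args) {
    Optional<String> existing = Optional.of("found by findById");

    String eager = existing.orElse(buildFromScratch());                     // prints "building from scratch"
    String lazy = existing.orElseGet(OrElseVsOrElseGet::buildFromScratch);  // supplier never invoked

    System.out.println(eager); // found by findById
    System.out.println(lazy);  // found by findById
  }
}
```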

This PR also includes:
- Some refactoring to share logic with existing code.
- Ensuring the `tables-test-fixtures` module's repository honors the case-insensitive contract so that this behavior can be tested through embedded instances.
- Testing the casing contract through both SQL and the Catalog API.
autumnust committed May 1, 2024
1 parent 041cdd3 commit d824024
Showing 8 changed files with 225 additions and 52 deletions.
8 changes: 8 additions & 0 deletions integrations/spark/openhouse-spark-runtime/build.gradle
@@ -45,6 +45,14 @@ dependencies {
exclude group: 'org.roaringbitmap'
}

testImplementation("org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:" + icebergVersion)
testImplementation(project(':tables-test-fixtures_2.12'))
testImplementation('org.apache.spark:spark-sql_2.12:' + spark_version){
// These classes are available from `client-codegen-convention.gradle`
exclude group: "io.netty"
}
testImplementation(project(path: ':integrations:java:openhouse-java-runtime', configuration: 'shadow'))

fatJarPackagedDependencies(project(path: ':integrations:java:openhouse-java-runtime'))
fatJarRuntimeDependencies("org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:" + icebergVersion)

@@ -0,0 +1,91 @@
import com.linkedin.openhouse.tablestest.OpenHouseSparkITest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import scala.collection.JavaConverters;

public class CatalogOperationTest extends OpenHouseSparkITest {
@Test
public void testCasingWithCTAS() throws Exception {
try (SparkSession spark = getSparkSession()) {
// creating a casing preserving table using backtick
spark.sql("CREATE TABLE openhouse.d1.`tT1` (name string)");
// testing writing behavior, note the casing of tt1 is intentionally changed.
spark.sql("INSERT INTO openhouse.d1.Tt1 VALUES ('foo')");

// Verifying by querying with all lower-cased name
Assertions.assertEquals(
1, spark.sql("SELECT * from openhouse.d1.tt1").collectAsList().size());
// ctas but referring with lower-cased name
spark.sql("CREATE TABLE openhouse.d1.t2 AS SELECT * from openhouse.d1.tt1");
Assertions.assertEquals(1, spark.sql("SELECT * FROM openhouse.d1.t2").collectAsList().size());
}
}

@Test
public void testCatalogWriteAPI() throws Exception {
try (SparkSession spark = getSparkSession()) {
Catalog icebergCatalog = getOpenHouseCatalog(spark);
// Create a table
Schema schema = new Schema(Types.NestedField.required(1, "name", Types.StringType.get()));
TableIdentifier tableIdentifier = TableIdentifier.of("db", "aaa");
icebergCatalog.createTable(tableIdentifier, schema);

// Write into data with intentionally changed casing in name
TableIdentifier tableIdentifierUpperTblName = TableIdentifier.of("db", "AAA");

DataFile fooDataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withPath("/path/to/data-a.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();
AtomicReference<Table> tableRef = new AtomicReference<>();
Assertions.assertDoesNotThrow(
() -> {
Table loadedTable = icebergCatalog.loadTable(tableIdentifierUpperTblName);
tableRef.set(loadedTable);
});
Table table = tableRef.get();
Assertions.assertDoesNotThrow(
() -> {
table.newAppend().appendFile(fooDataFile).commit();
});
}
}

/**
* This is a copy of com.linkedin.openhouse.jobs.spark.Operations#getCatalog() temporarily.
* Refactoring these pieces require deployment coordination, thus we shall create an artifact
* module that can be pulled by :apps module.
*/
private Catalog getOpenHouseCatalog(SparkSession spark) {
final Map<String, String> catalogProperties = new HashMap<>();
final String catalogPropertyPrefix = String.format("spark.sql.catalog.openhouse.");
final Map<String, String> sparkProperties = JavaConverters.mapAsJavaMap(spark.conf().getAll());
for (Map.Entry<String, String> entry : sparkProperties.entrySet()) {
if (entry.getKey().startsWith(catalogPropertyPrefix)) {
catalogProperties.put(
entry.getKey().substring(catalogPropertyPrefix.length()), entry.getValue());
}
}
// this initializes the catalog based on runtime Catalog class passed in catalog-impl conf.
return CatalogUtil.loadCatalog(
sparkProperties.get("spark.sql.catalog.openhouse.catalog-impl"),
"openhouse",
catalogProperties,
spark.sparkContext().hadoopConfiguration());
}
}
@@ -45,22 +45,23 @@ public Pair<TableDto, Boolean> putIcebergSnapshots(

TableDto tableDtoToSave =
tablesMapper.toTableDto(
tableDto.orElse(
TableDto.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.build()
.toString())
.tableUUID(
tableUUIDGenerator.generateUUID(icebergSnapshotRequestBody).toString())
.tableCreator(tableCreatorUpdater)
.build()),
tableDto.orElseGet(
() ->
TableDto.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(clusterId)
.build()
.toString())
.tableUUID(
tableUUIDGenerator.generateUUID(icebergSnapshotRequestBody).toString())
.tableCreator(tableCreatorUpdater)
.build()),
icebergSnapshotRequestBody);

if (tableDto.isPresent()) {
@@ -106,19 +106,22 @@ public Pair<TableDto, Boolean> putTable(
// FIXME: save method redundantly issue existence check after findById is called above
TableDto tableDtoToSave =
tablesMapper.toTableDto(
tableDto.orElse(
TableDto.builder()
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(createUpdateTableRequestBody.getClusterId())
.build()
.toString())
.tableUUID(
tableUUIDGenerator.generateUUID(createUpdateTableRequestBody).toString())
.tableCreator(tableCreatorUpdater)
.build()),
tableDto.orElseGet(
() ->
TableDto.builder()
.tableUri(
TableUri.builder()
.tableId(tableId)
.databaseId(databaseId)
.clusterId(createUpdateTableRequestBody.getClusterId())
.build()
.toString())
.tableUUID(
tableUUIDGenerator
.generateUUID(createUpdateTableRequestBody)
.toString())
.tableCreator(tableCreatorUpdater)
.build()),
createUpdateTableRequestBody);
try {
return Pair.of(openHouseInternalRepository.save(tableDtoToSave), !tableDto.isPresent());
@@ -31,6 +31,8 @@
public class TableUUIDGenerator {
// TODO: r/w of tableProperties being managed in single place.
private static final String OPENHOUSE_NAMESPACE = "openhouse.";
private static final String DB_RAW_KEY = "databaseId";
private static final String TBL_RAW_KEY = "tableId";

@Autowired FsStorageProvider fsStorageProvider;

@@ -62,14 +64,37 @@ public UUID generateUUID(CreateUpdateTableRequestBo
* @return UUID
*/
public UUID generateUUID(IcebergSnapshotsRequestBody icebergSnapshotsRequestBody) {
return extractUUIDFromSnapshotJson(
icebergSnapshotsRequestBody.getJsonSnapshots(),
icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getDatabaseId(),
icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getTableId())
return extractUUIDFromRequestBody(icebergSnapshotsRequestBody)
.orElseGet(
() -> generateUUID(icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody()));
}

/** Simple helper method to obtain tableURI from requestBody. */
private String getTableURI(IcebergSnapshotsRequestBody icebergSnapshotsRequestBody) {
return icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getDatabaseId()
+ "."
+ icebergSnapshotsRequestBody.getCreateUpdateTableRequestBody().getTableId();
}

/**
* Extracting the value of given key from the table properties map. The main use cases are for
* tableId and databaseId where the value captured in tblproperties preserved the casing from
* creation. This casing is critical if r/w for this table occurs in a platform with different
* casing-preservation contract.
*/
private String extractFromTblPropsIfExists(
String tableURI, Map<String, String> tblProps, String rawKey) {
if (tblProps == null
|| !tblProps.containsKey(OPENHOUSE_NAMESPACE + rawKey)
|| tblProps.get(OPENHOUSE_NAMESPACE + rawKey) == null) {
throw new RequestValidationFailureException(
String.format(
"Provided snapshot is invalid for %s since databaseId or tableId is missing in properties",
tableURI));
}
return tblProps.get(OPENHOUSE_NAMESPACE + rawKey);
}

/**
* Helper method to extract UUID from tableProperties. A CTAS command's commit() call provides
* "openhouse.tableUUID", if snapshot was not provided, this property is used and its path is
@@ -111,18 +136,10 @@ private void validatePathOfProvidedRequest(
String tableUUIDProperty,
TableType tableType) {

// Using Ids from tableProperties is to ensure casing of these Ids are properly presented as
// they were when
// initially created. Ids carried in the requestBody, if sourced from query engine, may lose
// proper casing.
String dbIdFromProps = tableProperties.get(OPENHOUSE_NAMESPACE + "databaseId");
String tblIdFromProps = tableProperties.get(OPENHOUSE_NAMESPACE + "tableId");
if (dbIdFromProps == null || tblIdFromProps == null) {
throw new RequestValidationFailureException(
String.format(
"Provided snapshot is invalid for %s.%s since databaseId or tableId is missing in properties",
databaseId, tableId));
}
String dbIdFromProps =
extractFromTblPropsIfExists(databaseId + "." + tableId, tableProperties, DB_RAW_KEY);
String tblIdFromProps =
extractFromTblPropsIfExists(databaseId + "." + tableId, tableProperties, TBL_RAW_KEY);

java.nio.file.Path previousPath =
InternalRepositoryUtils.constructTablePath(
@@ -135,18 +152,32 @@
}

/**
* Helper method to extract UUID from List.of(jsonSnapshots)
* Helper method to extract UUID from Iceberg-Snapshots' RequestBody
*
* <p>If List is null or empty returns empty Optional. If List contains a snapshot, Snapshot is
* validated by evaluating its "manifest-list" key.
*
* @param jsonSnapshots
* @param databaseId
* @param tableId
* @param snapshotsRequestBody a complete snapshot request-body
* @return Optional.of(UUID)
*/
private Optional<UUID> extractUUIDFromSnapshotJson(
List<String> jsonSnapshots, String databaseId, String tableId) {
private Optional<UUID> extractUUIDFromRequestBody(
IcebergSnapshotsRequestBody snapshotsRequestBody) {
List<String> jsonSnapshots = snapshotsRequestBody.getJsonSnapshots();
String tableURI =
snapshotsRequestBody.getCreateUpdateTableRequestBody().getDatabaseId()
+ "."
+ snapshotsRequestBody.getCreateUpdateTableRequestBody().getTableId();
String databaseId =
extractFromTblPropsIfExists(
tableURI,
snapshotsRequestBody.getCreateUpdateTableRequestBody().getTableProperties(),
DB_RAW_KEY);
String tableId =
extractFromTblPropsIfExists(
tableURI,
snapshotsRequestBody.getCreateUpdateTableRequestBody().getTableProperties(),
TBL_RAW_KEY);

String snapshotStr =
Optional.ofNullable(jsonSnapshots)
.filter(l -> !l.isEmpty())
@@ -115,6 +115,7 @@ public void testPutSnapshotsAppendWithStagedTable(GetTableResponseBody getTableR
getTableResponseBody
.toBuilder()
.tableVersion(INITIAL_TABLE_VERSION)
.tableProperties(tablePropsHelperForResponseBody(getTableResponseBody))
.tableUUID(beforeUUID)
.build()));
Map<String, String> snapshotRefs =
@@ -127,6 +128,7 @@
buildCreateUpdateTableRequestBody(getTableResponseBody)
.toBuilder()
.baseTableVersion(INITIAL_TABLE_VERSION)
.tableProperties(tablePropsHelperForResponseBody(getTableResponseBody))
.build())
.jsonSnapshots(jsonSnapshots)
.snapshotRefs(snapshotRefs)
@@ -327,4 +329,15 @@ private List<Snapshot> getSnapshotsWithMultipleAppendRequests(
}
return snapshots;
}

/**
* For mock responseBody, ensure they are equipped with correct properties that are critical for
* casing contract.
*/
private Map<String, String> tablePropsHelperForResponseBody(GetTableResponseBody responseBody) {
Map<String, String> originalProps = responseBody.getTableProperties();
originalProps.put("openhouse.databaseId", responseBody.getDatabaseId());
originalProps.put("openhouse.tableId", responseBody.getTableId());
return originalProps;
}
}
@@ -15,6 +15,8 @@
import com.linkedin.openhouse.tables.repository.impl.InternalRepositoryUtils;
import com.linkedin.openhouse.tables.utils.TableUUIDGenerator;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import lombok.SneakyThrows;
import org.apache.hadoop.fs.Path;
@@ -146,6 +148,7 @@ public void testUUIDFailsForInvalidSnapshot() {
CreateUpdateTableRequestBody.builder()
.tableId("t")
.databaseId("db")
.tableProperties(generateMinimalTestProps("db", "t"))
.clusterId(CLUSTER_NAME)
.build())
.jsonSnapshots(
@@ -189,6 +192,7 @@ public void testUUIDFailsForInvalidSnapshotShortManifestList() {
CreateUpdateTableRequestBody.builder()
.tableId("t")
.databaseId("db")
.tableProperties(generateMinimalTestProps("db", "t"))
.clusterId(CLUSTER_NAME)
.build())
.jsonSnapshots(
@@ -210,6 +214,7 @@ public void testUUIDFailsForInvalidSnapshotWithoutUUID() {
CreateUpdateTableRequestBody.builder()
.tableId("t")
.databaseId("db")
.tableProperties(generateMinimalTestProps("db", "t"))
.clusterId(CLUSTER_NAME)
.build())
.jsonSnapshots(
@@ -298,4 +303,13 @@ private String getIcebergSnapshot(String manifestListValue) {
jsonObject.addProperty(key, manifestListValue);
return jsonObject.toString();
}

private Map<String, String> generateMinimalTestProps(String databaseId, String tableId) {
return new HashMap<String, String>() {
{
put("openhouse.databaseId", databaseId);
put("openhouse.tableId", tableId);
}
};
}
}
@@ -1,6 +1,9 @@
package com.linkedin.openhouse.tablestest;

import com.linkedin.openhouse.internal.catalog.model.HouseTable;
import com.linkedin.openhouse.internal.catalog.model.HouseTablePrimaryKey;
import com.linkedin.openhouse.internal.catalog.repository.HouseTableRepository;
import java.util.Optional;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Repository;

@@ -11,4 +14,13 @@
*/
@Repository
@Primary
public interface HouseTablesH2Repository extends HouseTableRepository {}
public interface HouseTablesH2Repository extends HouseTableRepository {
Optional<HouseTable> findByDatabaseIdIgnoreCaseAndTableIdIgnoreCase(
String databaseId, String tableId);

@Override
default Optional<HouseTable> findById(HouseTablePrimaryKey houseTablePrimaryKey) {
return this.findByDatabaseIdIgnoreCaseAndTableIdIgnoreCase(
houseTablePrimaryKey.getDatabaseId(), houseTablePrimaryKey.getTableId());
}
}
