Skip to content

Commit

Permalink
Only delete empty directories when dropping Hive schemas
Browse files Browse the repository at this point in the history
In HiveMetadata, delete an empty schema location after dropping it
from the metastore.

In ThriftHiveMetastore and FileHiveMetastore, do not delete data.
  • Loading branch information
jirassimok authored and losipiuk committed Nov 20, 2021
1 parent e3cf288 commit a89d7c1
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 10 deletions.
Expand Up @@ -41,9 +41,11 @@
import io.trino.plugin.hive.security.SqlStandardAccessControlMetadataMetastore;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaNotFoundException;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.TableNotFoundException;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.ConnectorIdentity;
import io.trino.spi.security.PrincipalType;
import io.trino.spi.security.RoleGrant;
import io.trino.spi.statistics.ColumnStatisticType;
Expand Down Expand Up @@ -366,7 +368,38 @@ public synchronized void createDatabase(HiveIdentity identity, Database database

public synchronized void dropDatabase(HiveIdentity identity, String schemaName)
{
    // Resolve the HDFS context eagerly so a missing username fails fast,
    // before the exclusive (commit-time) operation is scheduled.
    HdfsContext context = new HdfsContext(
            identity.getUsername()
                    .map(ConnectorIdentity::ofUser)
                    .orElseThrow(() -> new IllegalStateException("username is null")));

    // Capture the schema location now: once the database is dropped from the
    // metastore it can no longer be looked up.
    Optional<Path> location = delegate.getDatabase(schemaName)
            .orElseThrow(() -> new SchemaNotFoundException(schemaName))
            .getLocation()
            .map(Path::new);

    setExclusive((delegate, hdfsEnvironment) -> {
        delegate.dropDatabase(identity, schemaName);

        location.ifPresent(path -> {
            try {
                FileSystem fs = hdfsEnvironment.getFileSystem(context, path);
                // If no files in schema directory, delete it; a non-empty
                // directory may hold data not managed by this schema, so it
                // is left untouched.
                if (!fs.listFiles(path, false).hasNext()) {
                    log.debug("Deleting location of dropped schema (%s)", path);
                    fs.delete(path, true);
                }
                else {
                    log.info("Skipped deleting schema location with external files (%s)", path);
                }
            }
            catch (IOException e) {
                throw new TrinoException(
                        HIVE_FILESYSTEM_ERROR,
                        format("Error checking or deleting schema directory '%s'", path), e);
            }
        });
    });
}

public synchronized void renameDatabase(HiveIdentity identity, String source, String target)
Expand Down
Expand Up @@ -214,7 +214,8 @@ public synchronized void dropDatabase(HiveIdentity identity, String databaseName
throw new TrinoException(HIVE_METASTORE_ERROR, "Database " + databaseName + " is not empty");
}

deleteMetadataDirectory(getDatabaseMetadataDirectory(databaseName));
// Only delete the metadata of the database, not any other files
deleteSchemaFile(DATABASE, getDatabaseMetadataDirectory(databaseName));
}

@Override
Expand Down
Expand Up @@ -1017,7 +1017,7 @@ public void dropDatabase(HiveIdentity identity, String databaseName)
.stopOnIllegalExceptions()
.run("dropDatabase", stats.getDropDatabase().wrap(() -> {
try (ThriftMetastoreClient client = createMetastoreClient(identity)) {
client.dropDatabase(databaseName, true, false);
client.dropDatabase(databaseName, false, false);
}
return null;
}));
Expand Down
Expand Up @@ -17,14 +17,17 @@
import com.google.inject.name.Named;
import io.trino.tempto.ProductTest;
import io.trino.tempto.hadoop.hdfs.HdfsClient;
import io.trino.tempto.query.QueryExecutionException;
import org.testng.annotations.Test;

import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure;
import static io.trino.tempto.query.QueryExecutor.query;
import static io.trino.tests.product.hive.util.TemporaryHiveTable.randomTableSuffix;
import static io.trino.tests.product.utils.QueryExecutors.onHive;
import static io.trino.tests.product.utils.QueryExecutors.onTrino;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

public class TestCreateDropSchema
extends ProductTest
Expand All @@ -39,18 +42,159 @@ public class TestCreateDropSchema
// Exercises the full create/drop lifecycle of a schema at the default
// warehouse location, including the guard that a schema holding tables
// cannot be dropped. (Removed pre-commit diff-residue lines that duplicated
// the CREATE SCHEMA / existence assertions.)
@Test
public void testCreateDropSchema()
{
    String schemaName = "test_drop_schema";
    String schemaDir = warehouseDirectory + "/test_drop_schema.db";

    ensureSchemaDoesNotExist(schemaName);

    assertQuerySucceeds("CREATE SCHEMA test_drop_schema");
    assertThat(hdfsClient.exist(schemaDir))
            .as("Check if expected schema directory exists after creating schema")
            .isTrue();

    onTrino().executeQuery("CREATE TABLE test_drop_schema.test_drop (col1 int)");

    // A schema that still contains tables must not be droppable
    assertQueryFailure(() -> query("DROP SCHEMA test_drop_schema"))
            .hasMessageContaining("line 1:1: Cannot drop non-empty schema 'test_drop_schema'");

    onTrino().executeQuery("DROP TABLE test_drop_schema.test_drop");

    assertQuerySucceeds("DROP SCHEMA test_drop_schema");
    assertThat(hdfsClient.exist(schemaDir))
            .as("Check if schema directory exists after dropping schema")
            .isFalse();
}

// Dropping a schema whose explicitly-set location contains no files should
// remove the location directory as well.
@Test
public void testDropSchemaWithEmptyLocation()
{
    String schema = schemaName("schema_with_empty_location");
    String location = warehouseDirectory + "/schema-with-empty-location/";

    createSchema(schema, location);
    assertFileExists(location, true, "schema directory exists after create schema");

    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertFileExists(location, false, "schema directory exists after drop schema");
}

// Dropping a schema created without an explicit location should remove the
// default (<warehouse>/<name>.db/) directory, since nothing external lives there.
@Test
public void testDropSchemaFilesWithoutLocation()
{
    String schema = schemaName("schema_without_location");
    String defaultDir = format("%s/%s.db/", warehouseDirectory, schema);

    createSchema(schema);
    assertFileExists(defaultDir, true, "schema directory exists after create schema");

    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertFileExists(defaultDir, false, "schema directory exists after drop schema");
}

// Dropping a schema must never delete files that were already present in its
// location before the schema was created.
@Test
public void testDropSchemaFilesWithNonemptyLocation()
{
    String schema = schemaName("schema_with_nonempty_location");
    String location = warehouseDirectory + "/schema-with-nonempty-location/";
    String externalFile = location + "external-file";

    // Create file in schema directory before creating schema
    hdfsClient.createDirectory(location);
    hdfsClient.saveFile(externalFile, "");

    createSchema(schema, location);
    assertFileExists(location, true, "schema directory exists after create schema");

    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertFileExists(location, true, "schema directory exists after drop schema");
    assertFileExists(externalFile, true, "external file exists after drop schema");

    hdfsClient.delete(externalFile);
}

// Tests create/drop schema transactions with default schema location:
// a rolled-back DROP SCHEMA must leave the directory intact, a committed
// one must remove it.
@Test
public void testDropSchemaFilesTransactions()
{
    String schema = schemaName("schema_directory_transactions");
    String defaultDir = format("%s/%s.db/", warehouseDirectory, schema);

    createSchema(schema);
    assertFileExists(defaultDir, true, "schema directory exists after create schema");

    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertQuerySucceeds("ROLLBACK");
    assertFileExists(defaultDir, true, "schema directory exists after rollback");

    // Sanity check: schema is still working
    onTrino().executeQuery(format("CREATE TABLE %s.test_table (i integer)", schema));
    onTrino().executeQuery(format("DROP TABLE %s.test_table", schema));

    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schema);
    assertQuerySucceeds("COMMIT");
    assertFileExists(defaultDir, false, "schema directory exists after drop schema");
}

// Verifies that files not managed by the schema survive DROP SCHEMA both
// when the transaction is rolled back and when it is committed.
@Test
public void testDropSchemaFilesTransactionsWithExternalFiles()
{
    String schemaName = schemaName("schema_transactions_with_external_files");
    String schemaDir = warehouseDirectory + "/schema-transactions-with-external-files/";

    // Create file in schema directory before creating schema
    String externalFile = schemaDir + "external-file";
    hdfsClient.createDirectory(schemaDir);
    hdfsClient.saveFile(externalFile, "");

    createSchema(schemaName, schemaDir);

    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schemaName);
    assertQuerySucceeds("ROLLBACK");
    assertFileExists(externalFile, true, "external file exists after rolling back drop schema");

    // Sanity check: schema is still working
    onTrino().executeQuery(format("CREATE TABLE %s.test_table (i integer)", schemaName));
    onTrino().executeQuery(format("DROP TABLE %s.test_table", schemaName));

    onTrino().executeQuery("START TRANSACTION");
    assertQuerySucceeds("DROP SCHEMA " + schemaName);
    assertQuerySucceeds("COMMIT");
    // Description fixed: this assertion checks the external file, not the schema directory
    assertFileExists(externalFile, true, "external file exists after committing drop schema");

    // Clean up the external file, as the non-transactional variant of this test does
    hdfsClient.delete(externalFile);
}

// Asserts that the HDFS path does (or does not) exist, labelling any failure
// with the given description and the path itself.
private void assertFileExists(String path, boolean exists, String description)
{
    boolean pathExists = hdfsClient.exist(path);
    assertThat(pathExists)
            .as("%s (%s)", description, path)
            .isEqualTo(exists);
}

// Runs the query on Trino and converts any execution failure into a test
// failure that names the query and preserves the underlying cause.
private static void assertQuerySucceeds(String query)
{
    try {
        onTrino().executeQuery(query);
    }
    catch (QueryExecutionException e) {
        String message = format("Expected query to succeed: %s", query);
        fail(message, e.getCause());
    }
}

// Creates a schema at its default (warehouse-derived) location.
private void createSchema(String name)
{
    onTrino().executeQuery("CREATE SCHEMA " + name);
}

// Creates a schema with an explicit HDFS location.
private void createSchema(String name, String location)
{
    String sql = format("CREATE SCHEMA %s WITH (location = '%s')", name, location);
    onTrino().executeQuery(sql);
}

// Appends a random suffix so concurrent test runs do not collide on schema names.
private static String schemaName(String name)
{
    return name + "_" + randomTableSuffix();
}

// Drops the schema (and its tables) directly via Hive, so the test starts clean.
private static void ensureSchemaDoesNotExist(String schemaName)
{
    onHive().executeQuery("DROP DATABASE IF EXISTS " + schemaName + " CASCADE");
}
}

0 comments on commit a89d7c1

Please sign in to comment.