Skip to content

Commit

Permalink
Extract guice bindings for custom Iceberg catalog
Browse files Browse the repository at this point in the history
Allow tests to use a fully customized Iceberg catalog
that is not necessarily backed by a `FileHiveMetastore` instance.
  • Loading branch information
findinpath authored and findepi committed Sep 20, 2022
1 parent b9b0485 commit 5ac5fdb
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 53 deletions.
Expand Up @@ -36,7 +36,6 @@
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.azure.HiveAzureModule;
import io.trino.plugin.hive.gcs.HiveGcsModule;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.s3.HiveS3Module;
import io.trino.plugin.iceberg.catalog.IcebergCatalogModule;
import io.trino.spi.NodeManager;
Expand Down Expand Up @@ -73,7 +72,7 @@ public static Connector createConnector(
Map<String, String> config,
ConnectorContext context,
Module module,
Optional<HiveMetastore> metastore,
Optional<Module> icebergCatalogModule,
Optional<TrinoFileSystemFactory> fileSystemFactory)
{
ClassLoader classLoader = InternalIcebergConnectorFactory.class.getClassLoader();
Expand All @@ -85,7 +84,7 @@ public static Connector createConnector(
new JsonModule(),
new IcebergModule(),
new IcebergSecurityModule(),
new IcebergCatalogModule(metastore),
icebergCatalogModule.orElse(new IcebergCatalogModule()),
new HdfsModule(),
new HiveS3Module(),
new HiveGcsModule(),
Expand Down
Expand Up @@ -15,53 +15,27 @@

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.hive.metastore.DecoratedHiveMetastoreModule;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory;
import io.trino.plugin.iceberg.CatalogType;
import io.trino.plugin.iceberg.IcebergConfig;
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.file.IcebergFileMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.glue.IcebergGlueCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.IcebergHiveMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory;

import java.util.Optional;

import static io.airlift.configuration.ConditionalModule.conditionalModule;
import static io.trino.plugin.iceberg.CatalogType.GLUE;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.CatalogType.TESTING_FILE_METASTORE;
import static java.util.Objects.requireNonNull;

public class IcebergCatalogModule
extends AbstractConfigurationAwareModule
{
private final Optional<HiveMetastore> metastore;

public IcebergCatalogModule(Optional<HiveMetastore> metastore)
{
this.metastore = requireNonNull(metastore, "metastore is null");
}

@Override
protected void setup(Binder binder)
{
if (metastore.isPresent()) {
binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(metastore.get()));
binder.bind(MetastoreValidator.class).asEagerSingleton();
install(new DecoratedHiveMetastoreModule());
binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON);
binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON);
}
else {
bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule());
bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule());
bindCatalogModule(GLUE, new IcebergGlueCatalogModule());
}
bindCatalogModule(HIVE_METASTORE, new IcebergHiveMetastoreCatalogModule());
bindCatalogModule(TESTING_FILE_METASTORE, new IcebergFileMetastoreCatalogModule());
bindCatalogModule(GLUE, new IcebergGlueCatalogModule());
}

private void bindCatalogModule(CatalogType catalogType, Module module)
Expand Down
Expand Up @@ -20,6 +20,7 @@
import io.trino.metadata.MetadataManager;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.security.PrincipalType;
import io.trino.testing.AbstractTestQueryFramework;
Expand Down Expand Up @@ -70,7 +71,7 @@ protected QueryRunner createQueryRunner()
HiveMetastore metastore = createTestingFileHiveMetastore(metastoreDir);
localQueryRunner.createCatalog(
"iceberg",
new TestingIcebergConnectorFactory(Optional.of(metastore), Optional.empty(), EMPTY_MODULE),
new TestingIcebergConnectorFactory(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE),
ImmutableMap.of());
Database database = Database.builder()
.setDatabaseName("tiny")
Expand Down
Expand Up @@ -21,6 +21,7 @@
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.TrackingFileSystemFactory.OperationContext;
import io.trino.plugin.iceberg.TrackingFileSystemFactory.OperationType;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
Expand Down Expand Up @@ -84,7 +85,7 @@ protected DistributedQueryRunner createQueryRunner()
HiveMetastore metastore = createTestingFileHiveMetastore(baseDir);

trackingFileSystemFactory = new TrackingFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT));
queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(metastore), Optional.of(trackingFileSystemFactory), EMPTY_MODULE));
queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.of(trackingFileSystemFactory), EMPTY_MODULE));
queryRunner.createCatalog("iceberg", "iceberg");
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
Expand Down
Expand Up @@ -26,9 +26,9 @@
import io.trino.metadata.QualifiedObjectName;
import io.trino.plugin.hive.NodeVersion;
import io.trino.plugin.hive.TestingHivePlugin;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.security.Identity;
import io.trino.spi.security.SelectedRole;
Expand All @@ -52,7 +52,7 @@
public class TestIcebergMetadataListing
extends AbstractTestQueryFramework
{
private HiveMetastore metastore;
private FileHiveMetastore metastore;
private SchemaTableName storageTable;

@Override
Expand Down Expand Up @@ -80,7 +80,7 @@ protected DistributedQueryRunner createQueryRunner()
.setCatalogDirectory(baseDir.toURI().toString())
.setMetastoreUser("test"));

queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(metastore), Optional.empty(), EMPTY_MODULE));
queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE));
queryRunner.createCatalog("iceberg", "iceberg");
queryRunner.installPlugin(new TestingHivePlugin(metastore));
queryRunner.createCatalog("hive", "hive", ImmutableMap.of("hive.security", "sql-standard"));
Expand Down
Expand Up @@ -29,6 +29,7 @@
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastore;
import io.trino.plugin.hive.metastore.file.FileHiveMetastoreConfig;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.DistributedQueryRunner;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -87,7 +88,7 @@ protected DistributedQueryRunner createQueryRunner()
.setCatalogDirectory(baseDir.toURI().toString())
.setMetastoreUser("test"));
metastore = new CountingAccessHiveMetastore(hiveMetastore);
queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(metastore), Optional.empty(), EMPTY_MODULE));
queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE));
queryRunner.createCatalog("iceberg", "iceberg");

queryRunner.execute("CREATE SCHEMA test_schema");
Expand Down
Expand Up @@ -25,6 +25,7 @@
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalog;
import io.trino.plugin.iceberg.catalog.file.FileMetastoreTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalog;
import io.trino.plugin.tpch.TpchPlugin;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -99,7 +100,7 @@ protected QueryRunner createQueryRunner()
false,
false);

queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(metastore), Optional.empty(), EMPTY_MODULE));
queryRunner.installPlugin(new TestingIcebergPlugin(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE));
queryRunner.createCatalog("iceberg", "iceberg");

queryRunner.installPlugin(new TpchPlugin());
Expand Down
Expand Up @@ -21,6 +21,7 @@
import io.trino.metadata.TableHandle;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
Expand Down Expand Up @@ -84,7 +85,7 @@ protected LocalQueryRunner createLocalQueryRunner()

queryRunner.createCatalog(
CATALOG,
new TestingIcebergConnectorFactory(Optional.of(metastore), Optional.empty(), EMPTY_MODULE),
new TestingIcebergConnectorFactory(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE),
ImmutableMap.of());

Database database = Database.builder()
Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.trino.Session;
import io.trino.plugin.hive.metastore.Database;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.spi.security.PrincipalType;
import io.trino.sql.planner.assertions.BasePushdownPlanTest;
import io.trino.sql.tree.LongLiteral;
Expand Down Expand Up @@ -66,7 +67,7 @@ protected LocalQueryRunner createLocalQueryRunner()

queryRunner.createCatalog(
ICEBERG_CATALOG,
new TestingIcebergConnectorFactory(Optional.of(metastore), Optional.empty(), EMPTY_MODULE),
new TestingIcebergConnectorFactory(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE),
ImmutableMap.of());

Database database = Database.builder()
Expand Down
Expand Up @@ -15,7 +15,6 @@

import com.google.inject.Module;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorContext;
import io.trino.spi.connector.ConnectorFactory;
Expand All @@ -29,13 +28,13 @@
public class TestingIcebergConnectorFactory
implements ConnectorFactory
{
private final Optional<HiveMetastore> metastore;
private final Optional<Module> icebergCatalogModule;
private final Optional<TrinoFileSystemFactory> fileSystemFactory;
private final Module module;

public TestingIcebergConnectorFactory(Optional<HiveMetastore> metastore, Optional<TrinoFileSystemFactory> fileSystemFactory, Module module)
public TestingIcebergConnectorFactory(Optional<Module> icebergCatalogModule, Optional<TrinoFileSystemFactory> fileSystemFactory, Module module)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.icebergCatalogModule = requireNonNull(icebergCatalogModule, "icebergCatalogModule is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.module = requireNonNull(module, "module is null");
}
Expand All @@ -49,6 +48,6 @@ public String getName()
@Override
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
{
return createConnector(catalogName, config, context, module, metastore, fileSystemFactory);
return createConnector(catalogName, config, context, module, icebergCatalogModule, fileSystemFactory);
}
}
Expand Up @@ -16,7 +16,6 @@
import com.google.common.collect.ImmutableList;
import com.google.inject.Module;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.spi.connector.ConnectorFactory;

import java.util.List;
Expand All @@ -28,13 +27,13 @@
public class TestingIcebergPlugin
extends IcebergPlugin
{
private final Optional<HiveMetastore> metastore;
private final Optional<Module> icebergCatalogModule;
private final Optional<TrinoFileSystemFactory> fileSystemFactory;
private final Module module;

public TestingIcebergPlugin(Optional<HiveMetastore> metastore, Optional<TrinoFileSystemFactory> fileSystemFactory, Module module)
public TestingIcebergPlugin(Optional<Module> icebergCatalogModule, Optional<TrinoFileSystemFactory> fileSystemFactory, Module module)
{
this.metastore = requireNonNull(metastore, "metastore is null");
this.icebergCatalogModule = requireNonNull(icebergCatalogModule, "icebergCatalogModule is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.module = requireNonNull(module, "module is null");
}
Expand All @@ -45,6 +44,6 @@ public Iterable<ConnectorFactory> getConnectorFactories()
List<ConnectorFactory> connectorFactories = ImmutableList.copyOf(super.getConnectorFactories());
verify(connectorFactories.size() == 1, "Unexpected connector factories: %s", connectorFactories);

return ImmutableList.of(new TestingIcebergConnectorFactory(metastore, fileSystemFactory, module));
return ImmutableList.of(new TestingIcebergConnectorFactory(icebergCatalogModule, fileSystemFactory, module));
}
}
Expand Up @@ -87,7 +87,7 @@ public synchronized void replaceTable(String databaseName, String tableName, Tab

queryRunner.createCatalog(
ICEBERG_CATALOG,
new TestingIcebergConnectorFactory(Optional.of(metastore), Optional.empty(), EMPTY_MODULE),
new TestingIcebergConnectorFactory(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE),
ImmutableMap.of());

Database database = Database.builder()
Expand Down
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg.catalog.file;

import com.google.inject.Binder;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.hive.metastore.DecoratedHiveMetastoreModule;
import io.trino.plugin.hive.metastore.HiveMetastore;
import io.trino.plugin.hive.metastore.HiveMetastoreFactory;
import io.trino.plugin.hive.metastore.RawHiveMetastoreFactory;
import io.trino.plugin.iceberg.catalog.IcebergTableOperationsProvider;
import io.trino.plugin.iceberg.catalog.TrinoCatalogFactory;
import io.trino.plugin.iceberg.catalog.hms.TrinoHiveCatalogFactory;

import static java.util.Objects.requireNonNull;

public class TestingIcebergFileMetastoreCatalogModule
extends AbstractConfigurationAwareModule
{
private final HiveMetastore metastore;

public TestingIcebergFileMetastoreCatalogModule(HiveMetastore metastore)
{
this.metastore = requireNonNull(metastore, "metastore is null");
}

@Override
protected void setup(Binder binder)
{
binder.bind(HiveMetastoreFactory.class).annotatedWith(RawHiveMetastoreFactory.class).toInstance(HiveMetastoreFactory.ofInstance(metastore));
install(new DecoratedHiveMetastoreModule());
binder.bind(IcebergTableOperationsProvider.class).to(FileMetastoreTableOperationsProvider.class).in(Scopes.SINGLETON);
binder.bind(TrinoCatalogFactory.class).to(TrinoHiveCatalogFactory.class).in(Scopes.SINGLETON);
}
}
Expand Up @@ -35,6 +35,7 @@
import io.trino.plugin.iceberg.IcebergColumnHandle;
import io.trino.plugin.iceberg.IcebergTableHandle;
import io.trino.plugin.iceberg.TestingIcebergConnectorFactory;
import io.trino.plugin.iceberg.catalog.file.TestingIcebergFileMetastoreCatalogModule;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.security.PrincipalType;
Expand Down Expand Up @@ -136,7 +137,7 @@ protected Optional<LocalQueryRunner> createLocalQueryRunner()

queryRunner.createCatalog(
TEST_CATALOG_NAME,
new TestingIcebergConnectorFactory(Optional.of(metastore), Optional.empty(), EMPTY_MODULE),
new TestingIcebergConnectorFactory(Optional.of(new TestingIcebergFileMetastoreCatalogModule(metastore)), Optional.empty(), EMPTY_MODULE),
ImmutableMap.of());

return Optional.of(queryRunner);
Expand Down

0 comments on commit 5ac5fdb

Please sign in to comment.