Rubix integration
shubhamtagra authored and dain committed Mar 18, 2020
1 parent 80fdb38 commit 540e14c
Showing 9 changed files with 449 additions and 1 deletion.
7 changes: 7 additions & 0 deletions pom.xml
@@ -53,6 +53,7 @@
<dep.tempto.version>173</dep.tempto.version>
<dep.gcs.version>2.0.0</dep.gcs.version>
<dep.errorprone.version>2.3.3</dep.errorprone.version>
<rubix.version>0.3.6</rubix.version>

<!--
America/Bahia_Banderas has:
@@ -1215,6 +1216,12 @@
<type>pom</type>
<scope>import</scope>
</dependency>

<dependency>
<groupId>com.qubole.rubix</groupId>
<artifactId>rubix-presto-shaded</artifactId>
<version>${rubix.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

5 changes: 5 additions & 0 deletions presto-hive/pom.xml
@@ -281,6 +281,11 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.qubole.rubix</groupId>
<artifactId>rubix-presto-shaded</artifactId>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -34,6 +34,9 @@
import io.prestosql.plugin.hive.metastore.HiveMetastore;
import io.prestosql.plugin.hive.metastore.HiveMetastoreModule;
import io.prestosql.plugin.hive.procedure.HiveProcedureModule;
import io.prestosql.plugin.hive.rubix.RubixConfig;
import io.prestosql.plugin.hive.rubix.RubixInitializer;
import io.prestosql.plugin.hive.rubix.RubixModule;
import io.prestosql.plugin.hive.s3.HiveS3Module;
import io.prestosql.plugin.hive.security.HiveSecurityModule;
import io.prestosql.plugin.hive.security.SystemTableAwareAccessControl;
@@ -84,6 +87,7 @@ public static Connector createConnector(String catalogName, Map<String, String>
new HiveS3Module(),
new HiveGcsModule(),
new HiveAzureModule(),
new RubixModule(),
new HiveMetastoreModule(metastore),
new HiveSecurityModule(catalogName),
new HiveAuthenticationModule(),
@@ -106,6 +110,12 @@ public static Connector createConnector(String catalogName, Map<String, String>
.setRequiredConfigurationProperties(config)
.initialize();

if (injector.getInstance(RubixConfig.class).isCacheEnabled()) {
// RubixInitializer needs the full set of ConfigurationInitializers, so it is resolved here rather than inside RubixModule
RubixInitializer rubixInitializer = injector.getInstance(RubixInitializer.class);
rubixInitializer.initializeRubix(context.getNodeManager());
}

LifeCycleManager lifeCycleManager = injector.getInstance(LifeCycleManager.class);
HiveMetadataFactory metadataFactory = injector.getInstance(HiveMetadataFactory.class);
HiveTransactionManager transactionManager = injector.getInstance(HiveTransactionManager.class);
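RubixModule is among the nine changed files, but its source is not shown in this excerpt. As a rough sketch only, a module wired in the Airlift/Guice style used by this connector could bind the config class and register the configuration initializer along these lines (the bindings below are an assumption, not the committed source):

package io.prestosql.plugin.hive.rubix;

import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.Multibinder;
import io.prestosql.plugin.hive.ConfigurationInitializer;

import static io.airlift.configuration.ConfigBinder.configBinder;

// Hypothetical sketch of RubixModule; the real class is not part of this excerpt
public class RubixModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // Bind the hive.cache.* properties to RubixConfig
        configBinder(binder).bindConfig(RubixConfig.class);

        // Expose RubixConfigurationInitializer both directly (for RubixInitializer)
        // and as one of the ConfigurationInitializers applied to each Hadoop Configuration
        binder.bind(RubixConfigurationInitializer.class).in(Scopes.SINGLETON);
        Multibinder.newSetBinder(binder, ConfigurationInitializer.class)
                .addBinding().to(RubixConfigurationInitializer.class).in(Scopes.SINGLETON);
    }
}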
86 changes: 86 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/rubix/RubixConfig.java
@@ -0,0 +1,86 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.hive.rubix;

import com.qubole.rubix.spi.CacheConfig;
import io.airlift.configuration.Config;

public class RubixConfig
{
private boolean cacheEnabled;
private boolean parallelWarmupEnabled = true;
private String cacheLocation = "/tmp";
private int bookKeeperServerPort = CacheConfig.DEFAULT_BOOKKEEPER_SERVER_PORT;
private int dataTransferServerPort = CacheConfig.DEFAULT_DATA_TRANSFER_SERVER_PORT;

public boolean isCacheEnabled()
{
return cacheEnabled;
}

@Config("hive.cache.enabled")
public RubixConfig setCacheEnabled(boolean value)
{
this.cacheEnabled = value;
return this;
}

public boolean isParallelWarmupEnabled()
{
return parallelWarmupEnabled;
}

@Config("hive.cache.parallel-warmup-enabled")
public RubixConfig setParallelWarmupEnabled(boolean value)
{
this.parallelWarmupEnabled = value;
return this;
}

public String getCacheLocation()
{
return cacheLocation;
}

@Config("hive.cache.location")
public RubixConfig setCacheLocation(String location)
{
this.cacheLocation = location;
return this;
}

public int getBookKeeperServerPort()
{
return bookKeeperServerPort;
}

@Config("hive.cache.rubix-bookkeeper-port")
public RubixConfig setBookKeeperServerPort(int port)
{
this.bookKeeperServerPort = port;
return this;
}

public int getDataTransferServerPort()
{
return dataTransferServerPort;
}

@Config("hive.cache.rubix-data-transfer-port")
public RubixConfig setDataTransferServerPort(int port)
{
this.dataTransferServerPort = port;
return this;
}
}
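The @Config bindings above map to catalog properties. A sketch of what enabling the cache in a Hive catalog properties file could look like, with illustrative values (the cache path is made up; the two port properties fall back to Rubix's CacheConfig defaults when unset):

hive.cache.enabled=true
hive.cache.location=/var/lib/presto/rubix-cache
hive.cache.parallel-warmup-enabled=true
# hive.cache.rubix-bookkeeper-port and hive.cache.rubix-data-transfer-port
# default to CacheConfig.DEFAULT_BOOKKEEPER_SERVER_PORT and
# CacheConfig.DEFAULT_DATA_TRANSFER_SERVER_PORT respectively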
130 changes: 130 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/rubix/RubixConfigurationInitializer.java
@@ -0,0 +1,130 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.hive.rubix;

import com.qubole.rubix.prestosql.CachingPrestoGoogleHadoopFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoNativeAzureFileSystem;
import com.qubole.rubix.prestosql.CachingPrestoS3FileSystem;
import com.qubole.rubix.prestosql.PrestoClusterManager;
import io.prestosql.plugin.hive.ConfigurationInitializer;
import io.prestosql.spi.HostAddress;
import org.apache.hadoop.conf.Configuration;

import javax.inject.Inject;

import static com.google.common.base.Preconditions.checkState;
import static com.qubole.rubix.spi.CacheConfig.enableHeartbeat;
import static com.qubole.rubix.spi.CacheConfig.setBookKeeperServerPort;
import static com.qubole.rubix.spi.CacheConfig.setCacheDataDirPrefix;
import static com.qubole.rubix.spi.CacheConfig.setCacheDataEnabled;
import static com.qubole.rubix.spi.CacheConfig.setClusterNodeRefreshTime;
import static com.qubole.rubix.spi.CacheConfig.setClusterNodesFetchRetryCount;
import static com.qubole.rubix.spi.CacheConfig.setCoordinatorHostName;
import static com.qubole.rubix.spi.CacheConfig.setCurrentNodeHostName;
import static com.qubole.rubix.spi.CacheConfig.setDataTransferServerPort;
import static com.qubole.rubix.spi.CacheConfig.setEmbeddedMode;
import static com.qubole.rubix.spi.CacheConfig.setIsParallelWarmupEnabled;
import static com.qubole.rubix.spi.CacheConfig.setOnMaster;
import static com.qubole.rubix.spi.CacheConfig.setRubixClusterType;
import static com.qubole.rubix.spi.CacheConfig.setWorkerNodeInfoExpiryPeriod;
import static com.qubole.rubix.spi.ClusterType.PRESTOSQL_CLUSTER_MANAGER;

public class RubixConfigurationInitializer
implements ConfigurationInitializer
{
private static final String RUBIX_S3_FS_CLASS_NAME = CachingPrestoS3FileSystem.class.getName();
private static final String RUBIX_AZURE_FS_CLASS_NAME = CachingPrestoNativeAzureFileSystem.class.getName();
private static final String RUBIX_GS_FS_CLASS_NAME = CachingPrestoGoogleHadoopFileSystem.class.getName();

private final boolean parallelWarmupEnabled;
private final String cacheLocation;
private final int bookKeeperServerPort;
private final int dataTransferServerPort;

// The fields below depend on the node having joined the cluster
private boolean cacheNotReady = true;
private boolean isMaster;
private HostAddress masterAddress;
private String nodeAddress;

@Inject
public RubixConfigurationInitializer(RubixConfig config)
{
this.parallelWarmupEnabled = config.isParallelWarmupEnabled();
this.cacheLocation = config.getCacheLocation();
this.bookKeeperServerPort = config.getBookKeeperServerPort();
this.dataTransferServerPort = config.getDataTransferServerPort();
}

@Override
public void initializeConfiguration(Configuration config)
{
if (cacheNotReady) {
setCacheDataEnabled(config, false);
return;
}

updateConfiguration(config);
}

public Configuration updateConfiguration(Configuration config)
{
checkState(masterAddress != null, "masterAddress is not set");
setCacheDataEnabled(config, true);
setOnMaster(config, isMaster);
setCoordinatorHostName(config, masterAddress.getHostText());
PrestoClusterManager.setPrestoServerPort(config, masterAddress.getPort());
setCurrentNodeHostName(config, nodeAddress);

setIsParallelWarmupEnabled(config, parallelWarmupEnabled);
setCacheDataDirPrefix(config, cacheLocation);
setBookKeeperServerPort(config, bookKeeperServerPort);
setDataTransferServerPort(config, dataTransferServerPort);

setEmbeddedMode(config, true);
setRubixClusterType(config, PRESTOSQL_CLUSTER_MANAGER);
enableHeartbeat(config, false);
setClusterNodeRefreshTime(config, 10);
setClusterNodesFetchRetryCount(config, Integer.MAX_VALUE);
setWorkerNodeInfoExpiryPeriod(config, 1);

config.set("fs.s3.impl", RUBIX_S3_FS_CLASS_NAME);
config.set("fs.s3a.impl", RUBIX_S3_FS_CLASS_NAME);
config.set("fs.s3n.impl", RUBIX_S3_FS_CLASS_NAME);
config.set("fs.wasb.impl", RUBIX_AZURE_FS_CLASS_NAME);
config.set("fs.gs.impl", RUBIX_GS_FS_CLASS_NAME);
return config;
}

public void setMaster(boolean master)
{
isMaster = master;
}

public void setMasterAddress(HostAddress masterAddress)
{
this.masterAddress = masterAddress;
}

public void setCurrentNodeAddress(String nodeAddress)
{
this.nodeAddress = nodeAddress;
}

public void initializationDone()
{
checkState(masterAddress != null, "masterAddress is not set");
cacheNotReady = false;
}
}
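The class is deliberately two-phase: until initializationDone() is called, initializeConfiguration only writes setCacheDataEnabled(config, false), so file system access stays uncached; afterwards the full Rubix settings and the caching fs.*.impl overrides are applied. A minimal sketch of that life cycle, using only the API shown above (the host names, port, and standalone main are made up for illustration):

package io.prestosql.plugin.hive.rubix;

import io.prestosql.spi.HostAddress;
import org.apache.hadoop.conf.Configuration;

// Hypothetical driver, not part of the commit
public final class RubixConfigurationInitializerExample
{
    public static void main(String[] args)
    {
        RubixConfig rubixConfig = new RubixConfig().setCacheEnabled(true);
        RubixConfigurationInitializer initializer = new RubixConfigurationInitializer(rubixConfig);

        // Phase 1: the node has not joined yet, so only "cache disabled" is written
        Configuration before = new Configuration(false);
        initializer.initializeConfiguration(before);

        // Phase 2: RubixInitializer supplies the cluster details and flips the flag
        initializer.setMaster(true);
        initializer.setMasterAddress(HostAddress.fromParts("coordinator.example.com", 8080)); // made-up address
        initializer.setCurrentNodeAddress("10.0.0.5"); // made-up address
        initializer.initializationDone();

        Configuration after = new Configuration(false);
        initializer.initializeConfiguration(after);
        // "after" now carries fs.s3.impl, fs.wasb.impl, and fs.gs.impl
        // pointing at the caching file system implementations
    }
}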
110 changes: 110 additions & 0 deletions presto-hive/src/main/java/io/prestosql/plugin/hive/rubix/RubixInitializer.java
@@ -0,0 +1,110 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.prestosql.plugin.hive.rubix;

import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.qubole.rubix.bookkeeper.BookKeeper;
import com.qubole.rubix.bookkeeper.BookKeeperServer;
import com.qubole.rubix.bookkeeper.LocalDataTransferServer;
import com.qubole.rubix.core.CachingFileSystem;
import io.airlift.log.Logger;
import io.prestosql.plugin.hive.ConfigurationInitializer;
import io.prestosql.plugin.hive.HiveCatalogName;
import io.prestosql.spi.Node;
import io.prestosql.spi.NodeManager;
import org.apache.hadoop.conf.Configuration;

import javax.inject.Inject;

import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/*
 * Responsibilities of this initializer:
 * 1. Lazily set up RubixConfigurationInitializer with information about the master once it is available
 * 2. Start the Rubix servers (BookKeeper and local data transfer)
 * 3. Inject the BookKeeper object into the CachingFileSystem class
 */
public class RubixInitializer
{
private static final Logger log = Logger.get(RubixInitializer.class);

private final HiveCatalogName hiveCatalogName;
private final RubixConfigurationInitializer rubixConfigurationInitializer;
private final Set<ConfigurationInitializer> configurationInitializers;

@Inject
public RubixInitializer(HiveCatalogName hiveCatalogName, RubixConfigurationInitializer rubixConfigurationInitializer, Set<ConfigurationInitializer> configurationInitializers)
{
this.hiveCatalogName = hiveCatalogName;
this.rubixConfigurationInitializer = rubixConfigurationInitializer;
this.configurationInitializers = configurationInitializers;
}

public void initializeRubix(NodeManager nodeManager)
{
ExecutorService initializerService = Executors.newSingleThreadExecutor();
ListenableFuture<Boolean> nodeJoinFuture = MoreExecutors.listeningDecorator(initializerService).submit(() ->
{
while (!(nodeManager.getAllNodes().contains(nodeManager.getCurrentNode()) &&
nodeManager.getAllNodes().stream().anyMatch(Node::isCoordinator))) {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
return false;
}
}
return true;
});

Futures.transform(nodeJoinFuture,
nodeJoined ->
{
if (nodeJoined) {
Node master = nodeManager.getAllNodes().stream().filter(node -> node.isCoordinator()).findFirst().get();
boolean isMaster = nodeManager.getCurrentNode().isCoordinator();

rubixConfigurationInitializer.setMaster(isMaster);
rubixConfigurationInitializer.setMasterAddress(master.getHostAndPort());
rubixConfigurationInitializer.setCurrentNodeAddress(nodeManager.getCurrentNode().getHost());

Configuration configuration = new Configuration(false);
for (ConfigurationInitializer configurationInitializer : configurationInitializers) {
configurationInitializer.initializeConfiguration(configuration);
}

// RubixConfigurationInitializer.initializeConfiguration does not apply the Rubix settings yet,
// because initializationDone() has not been called; apply them directly, bypassing that readiness check
rubixConfigurationInitializer.updateConfiguration(configuration);
MetricRegistry metricRegistry = new MetricRegistry();
BookKeeperServer bookKeeperServer = new BookKeeperServer();
BookKeeper bookKeeper = bookKeeperServer.startServer(configuration, metricRegistry);
LocalDataTransferServer.startServer(configuration, metricRegistry, bookKeeper);

CachingFileSystem.setLocalBookKeeper(bookKeeper, "catalog=" + hiveCatalogName);
log.info("Rubix initialized successfully");
rubixConfigurationInitializer.initializationDone();
}

// If the node join fails, leave the Rubix cache in its default disabled state
return null;
},
initializerService);
}
}
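A note on the flow: the connector can be created before the current node (or the coordinator) is visible through NodeManager, so initializeRubix polls on a single-thread executor until both appear in getAllNodes(). Only then are the BookKeeper and local data transfer servers started and RubixConfigurationInitializer marked ready; until that point every Configuration handed out keeps caching disabled, so queries fall back to uncached reads instead of failing.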
