Skip to content

Commit

Permalink
Use environment from context in DB resource group manager
Browse files Browse the repository at this point in the history
  • Loading branch information
nezihyigitbasi committed Nov 7, 2017
1 parent edef5e5 commit d03d945
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 17 deletions.
5 changes: 0 additions & 5 deletions presto-resource-group-managers/pom.xml
Expand Up @@ -52,11 +52,6 @@
<artifactId>log</artifactId> <artifactId>log</artifactId>
</dependency> </dependency>


<dependency>
<groupId>io.airlift</groupId>
<artifactId>node</artifactId>
</dependency>

<dependency> <dependency>
<groupId>com.google.inject</groupId> <groupId>com.google.inject</groupId>
<artifactId>guice</artifactId> <artifactId>guice</artifactId>
Expand Down
Expand Up @@ -28,7 +28,6 @@
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import io.airlift.log.Logger; import io.airlift.log.Logger;
import io.airlift.node.NodeInfo;
import io.airlift.units.Duration; import io.airlift.units.Duration;


import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -71,15 +70,15 @@ public class DbResourceGroupConfigurationManager
private final AtomicReference<Optional<Duration>> cpuQuotaPeriod = new AtomicReference<>(Optional.empty()); private final AtomicReference<Optional<Duration>> cpuQuotaPeriod = new AtomicReference<>(Optional.empty());
private final ScheduledExecutorService configExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("DbResourceGroupConfigurationManager")); private final ScheduledExecutorService configExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("DbResourceGroupConfigurationManager"));
private final AtomicBoolean started = new AtomicBoolean(); private final AtomicBoolean started = new AtomicBoolean();
private final NodeInfo nodeInfo; private final String environment;


@Inject @Inject
public DbResourceGroupConfigurationManager(ClusterMemoryPoolManager memoryPoolManager, ResourceGroupsDao dao, NodeInfo nodeInfo) public DbResourceGroupConfigurationManager(ClusterMemoryPoolManager memoryPoolManager, ResourceGroupsDao dao, @ForEnvironment String environment)
{ {
super(memoryPoolManager); super(memoryPoolManager);
requireNonNull(memoryPoolManager, "memoryPoolManager is null"); requireNonNull(memoryPoolManager, "memoryPoolManager is null");
requireNonNull(dao, "daoProvider is null"); requireNonNull(dao, "daoProvider is null");
this.nodeInfo = requireNonNull(nodeInfo, "nodeInfo is null"); this.environment = requireNonNull(environment, "environment is null");
this.dao = dao; this.dao = dao;
this.dao.createResourceGroupsGlobalPropertiesTable(); this.dao.createResourceGroupsGlobalPropertiesTable();
this.dao.createResourceGroupsTable(); this.dao.createResourceGroupsTable();
Expand Down Expand Up @@ -174,7 +173,7 @@ private synchronized void populateFromDbHelper(Map<Long, ResourceGroupSpecBuilde
Map<Long, ResourceGroupIdTemplate> resourceGroupIdTemplateMap, Map<Long, ResourceGroupIdTemplate> resourceGroupIdTemplateMap,
Map<Long, Set<Long>> subGroupIdsToBuild) Map<Long, Set<Long>> subGroupIdsToBuild)
{ {
List<ResourceGroupSpecBuilder> records = dao.getResourceGroups(nodeInfo.getEnvironment()); List<ResourceGroupSpecBuilder> records = dao.getResourceGroups(environment);
for (ResourceGroupSpecBuilder record : records) { for (ResourceGroupSpecBuilder record : records) {
recordMap.put(record.getId(), record); recordMap.put(record.getId(), record);
if (!record.getParentId().isPresent()) { if (!record.getParentId().isPresent()) {
Expand Down
Expand Up @@ -50,6 +50,7 @@ public ResourceGroupConfigurationManager create(Map<String, String> config, Reso
Bootstrap app = new Bootstrap( Bootstrap app = new Bootstrap(
new JsonModule(), new JsonModule(),
new DbResourceGroupsModule(), new DbResourceGroupsModule(),
binder -> binder.bind(String.class).annotatedWith(ForEnvironment.class).toInstance(context.getEnvironment()),
binder -> binder.bind(ClusterMemoryPoolManager.class).toInstance(context.getMemoryPoolManager())); binder -> binder.bind(ClusterMemoryPoolManager.class).toInstance(context.getMemoryPoolManager()));


Injector injector = app Injector injector = app
Expand Down
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourceGroups.db;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForEnvironment
{
}
Expand Up @@ -20,7 +20,6 @@
import com.facebook.presto.spi.resourceGroups.SchedulingPolicy; import com.facebook.presto.spi.resourceGroups.SchedulingPolicy;
import com.facebook.presto.spi.resourceGroups.SelectionContext; import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import io.airlift.node.NodeInfo;
import io.airlift.units.DataSize; import io.airlift.units.DataSize;
import io.airlift.units.Duration; import io.airlift.units.Duration;
import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException; import org.skife.jdbi.v2.exceptions.UnableToExecuteStatementException;
Expand All @@ -44,7 +43,6 @@
public class TestDbResourceGroupConfigurationManager public class TestDbResourceGroupConfigurationManager
{ {
private static final String ENVIRONMENT = "test"; private static final String ENVIRONMENT = "test";
private static final NodeInfo NODE_INFO = new NodeInfo(ENVIRONMENT);


static H2DaoProvider setup(String prefix) static H2DaoProvider setup(String prefix)
{ {
Expand All @@ -70,7 +68,7 @@ public void testEnvironments()
dao.insertSelector(2, ".*dev_user.*", null, null); dao.insertSelector(2, ".*dev_user.*", null, null);


// check the prod configuration // check the prod configuration
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), new NodeInfo(prodEnvironment)); DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), prodEnvironment);
List<ResourceGroupSpec> groups = manager.getRootGroups(); List<ResourceGroupSpec> groups = manager.getRootGroups();
assertEquals(groups.size(), 1); assertEquals(groups.size(), 1);
InternalResourceGroup prodGlobal = new InternalResourceGroup.RootInternalResourceGroup("prod_global", (group, export) -> {}, directExecutor()); InternalResourceGroup prodGlobal = new InternalResourceGroup.RootInternalResourceGroup("prod_global", (group, export) -> {}, directExecutor());
Expand All @@ -83,7 +81,7 @@ public void testEnvironments()
assertEquals(prodResourceGroupId.get().toString(), "prod_global"); assertEquals(prodResourceGroupId.get().toString(), "prod_global");


// check the dev configuration // check the dev configuration
manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), new NodeInfo(devEnvironment)); manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), devEnvironment);
assertEquals(groups.size(), 1); assertEquals(groups.size(), 1);
InternalResourceGroup devGlobal = new InternalResourceGroup.RootInternalResourceGroup("dev_global", (group, export) -> {}, directExecutor()); InternalResourceGroup devGlobal = new InternalResourceGroup.RootInternalResourceGroup("dev_global", (group, export) -> {}, directExecutor());
manager.configure(devGlobal, new SelectionContext(true, "user", Optional.empty(), ImmutableSet.of(), 1, Optional.empty())); manager.configure(devGlobal, new SelectionContext(true, "user", Optional.empty(), ImmutableSet.of(), 1, Optional.empty()));
Expand All @@ -107,7 +105,7 @@ public void testConfiguration()
dao.insertResourceGroup(1, "global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", "1h", "1h", null, ENVIRONMENT); dao.insertResourceGroup(1, "global", "1MB", 1000, 100, 100, "weighted", null, true, "1h", "1d", "1h", "1h", null, ENVIRONMENT);
dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, "1h", "1h", 1L, ENVIRONMENT); dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, "1h", "1h", 1L, ENVIRONMENT);
dao.insertSelector(2, null, null, null); dao.insertSelector(2, null, null, null);
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), NODE_INFO); DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), ENVIRONMENT);
AtomicBoolean exported = new AtomicBoolean(); AtomicBoolean exported = new AtomicBoolean();
InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor()); InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor());
manager.configure(global, new SelectionContext(true, "user", Optional.empty(), ImmutableSet.of(), 1, Optional.empty())); manager.configure(global, new SelectionContext(true, "user", Optional.empty(), ImmutableSet.of(), 1, Optional.empty()));
Expand Down Expand Up @@ -168,7 +166,7 @@ public void testMissing()
dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, null, null, 1L, ENVIRONMENT); dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, null, null, 1L, ENVIRONMENT);
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h"); dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
dao.insertSelector(2, null, null, null); dao.insertSelector(2, null, null, null);
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), NODE_INFO); DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), ENVIRONMENT);
InternalResourceGroup missing = new InternalResourceGroup.RootInternalResourceGroup("missing", (group, export) -> {}, directExecutor()); InternalResourceGroup missing = new InternalResourceGroup.RootInternalResourceGroup("missing", (group, export) -> {}, directExecutor());
manager.configure(missing, new SelectionContext(true, "user", Optional.empty(), ImmutableSet.of(), 1, Optional.empty())); manager.configure(missing, new SelectionContext(true, "user", Optional.empty(), ImmutableSet.of(), 1, Optional.empty()));
} }
Expand All @@ -186,7 +184,7 @@ public void testReconfig()
dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, null, null, 1L, ENVIRONMENT); dao.insertResourceGroup(2, "sub", "2MB", 4, 3, 3, null, 5, null, null, null, null, null, 1L, ENVIRONMENT);
dao.insertSelector(2, null, null, null); dao.insertSelector(2, null, null, null);
dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h"); dao.insertResourceGroupsGlobalProperties("cpu_quota_period", "1h");
DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), NODE_INFO); DbResourceGroupConfigurationManager manager = new DbResourceGroupConfigurationManager((poolId, listener) -> {}, daoProvider.get(), ENVIRONMENT);
manager.start(); manager.start();
AtomicBoolean exported = new AtomicBoolean(); AtomicBoolean exported = new AtomicBoolean();
InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor()); InternalResourceGroup global = new InternalResourceGroup.RootInternalResourceGroup("global", (group, export) -> exported.set(export), directExecutor());
Expand Down
5 changes: 5 additions & 0 deletions presto-tests/pom.xml
Expand Up @@ -122,6 +122,11 @@
<artifactId>testing</artifactId> <artifactId>testing</artifactId>
</dependency> </dependency>


<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
</dependency>

<dependency> <dependency>
<groupId>com.squareup.okhttp3</groupId> <groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId> <artifactId>okhttp</artifactId>
Expand Down
Expand Up @@ -53,6 +53,7 @@ public ResourceGroupConfigurationManager create(Map<String, String> config, Reso
new JsonModule(), new JsonModule(),
new H2ResourceGroupsModule(), new H2ResourceGroupsModule(),
new NodeModule(), new NodeModule(),
binder -> binder.bind(ResourceGroupConfigurationManagerContext.class).toInstance(context),
binder -> binder.bind(ClusterMemoryPoolManager.class).toInstance(context.getMemoryPoolManager())); binder -> binder.bind(ClusterMemoryPoolManager.class).toInstance(context.getMemoryPoolManager()));


Injector injector = app Injector injector = app
Expand Down
Expand Up @@ -15,13 +15,18 @@


import com.facebook.presto.resourceGroups.db.DbResourceGroupConfig; import com.facebook.presto.resourceGroups.db.DbResourceGroupConfig;
import com.facebook.presto.resourceGroups.db.DbResourceGroupConfigurationManager; import com.facebook.presto.resourceGroups.db.DbResourceGroupConfigurationManager;
import com.facebook.presto.resourceGroups.db.ForEnvironment;
import com.facebook.presto.resourceGroups.db.H2DaoProvider; import com.facebook.presto.resourceGroups.db.H2DaoProvider;
import com.facebook.presto.resourceGroups.db.ResourceGroupsDao; import com.facebook.presto.resourceGroups.db.ResourceGroupsDao;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManager; import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManager;
import com.facebook.presto.spi.resourceGroups.ResourceGroupConfigurationManagerContext;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Module; import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Scopes; import com.google.inject.Scopes;


import javax.inject.Singleton;

import static io.airlift.configuration.ConfigBinder.configBinder; import static io.airlift.configuration.ConfigBinder.configBinder;


public class H2ResourceGroupsModule public class H2ResourceGroupsModule
Expand All @@ -35,4 +40,12 @@ public void configure(Binder binder)
binder.bind(DbResourceGroupConfigurationManager.class).in(Scopes.SINGLETON); binder.bind(DbResourceGroupConfigurationManager.class).in(Scopes.SINGLETON);
binder.bind(ResourceGroupConfigurationManager.class).to(DbResourceGroupConfigurationManager.class).in(Scopes.SINGLETON); binder.bind(ResourceGroupConfigurationManager.class).to(DbResourceGroupConfigurationManager.class).in(Scopes.SINGLETON);
} }

@Provides
@Singleton
@ForEnvironment
public String getEnvironment(ResourceGroupConfigurationManagerContext context)
{
return context.getEnvironment();
}
} }

0 comments on commit d03d945

Please sign in to comment.