Skip to content

Commit

Permalink
Merge pull request apache#12 from piyushnarang/criteo-1.9
Browse files Browse the repository at this point in the history
Cherry-pick fixes to criteo-1.9 fork
  • Loading branch information
JTaky committed Nov 3, 2019
2 parents 0af24d7 + cfad98b commit 2b52e2a
Show file tree
Hide file tree
Showing 10 changed files with 161 additions and 15 deletions.
10 changes: 10 additions & 0 deletions docs/_includes/generated/mesos_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
<td style="word-wrap: break-word;">true</td>
<td>Enables SSL for the Flink artifact server. Note that security.ssl.enabled also needs to be set to true to enable encryption.</td>
</tr>
<tr>
<td><h5>mesos.resourcemanager.declined-offer-refuse-duration</h5></td>
<td style="word-wrap: break-word;">5000</td>
<td>Amount of time to ask the Mesos master to not resend a declined resource offer again. This ensures a declined resource offer isn't resent immediately after being declined</td>
</tr>
<tr>
<td><h5>mesos.resourcemanager.framework.name</h5></td>
<td style="word-wrap: break-word;">"Flink"</td>
Expand Down Expand Up @@ -57,5 +62,10 @@
<td style="word-wrap: break-word;">(none)</td>
<td>Comma-separated list of configuration keys which represent a configurable port. All port keys will dynamically get a port assigned through Mesos.</td>
</tr>
<tr>
<td><h5>mesos.resourcemanager.unused-offer-expiration</h5></td>
<td style="word-wrap: break-word;">120000</td>
<td>Amount of time to wait for unused expired offers before declining them. This ensures your scheduler will not hoard unuseful offers.</td>
</tr>
</tbody>
</table>
10 changes: 10 additions & 0 deletions docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -72,5 +72,15 @@
<td style="word-wrap: break-word;">(none)</td>
<td>A comma-separated list of tags to apply to the Flink YARN application.</td>
</tr>
<tr>
<td><h5>yarn.view.acls</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Users and groups to give VIEW access. The ACL has the form: comma-separated users, then a space, then comma-separated groups.</td>
</tr>
<tr>
<td><h5>yarn.admin.acls</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Users and groups to give MODIFY access. The ACL has the form: comma-separated users, then a space, then comma-separated groups.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,30 @@ public class MesosOptions {
"All port keys will dynamically get a port assigned through Mesos.")
.build());

/**
* Config parameter to configure the amount of time to wait for unused expired Mesos
* offers before they are declined.
*
* <p>The value is given in milliseconds (default: 120000). The launch coordinator
* converts it to seconds and passes it to the task scheduler builder as the lease
* offer expiry.
*/
public static final ConfigOption<Long> UNUSED_OFFER_EXPIRATION =
key("mesos.resourcemanager.unused-offer-expiration")
.defaultValue(120000L)
.withDescription(
Description.builder()
.text("Amount of time to wait for unused expired offers before declining them. " +
"This ensures your scheduler will not hoard unuseful offers.")
.build());

/**
* Config parameter to configure the amount of time for which a declined offer is refused.
* This ensures the same resource offer isn't resent immediately after declining.
*
* <p>The value is given in milliseconds (default: 5000). The launch coordinator
* converts it to seconds and sets it as the refuse duration of the filter that is
* sent along with every declined offer.
*/
public static final ConfigOption<Long> DECLINED_OFFER_REFUSE_DURATION =
key("mesos.resourcemanager.declined-offer-refuse-duration")
.defaultValue(5000L)
.withDescription(
Description.builder()
.text("Amount of time to ask the Mesos master to not resend a " +
"declined resource offer again. This ensures a declined resource offer " +
"isn't resent immediately after being declined")
.build());
}
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,18 @@ public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> a
return this;
}

@Override
public TaskSchedulerBuilder withRejectAllExpiredOffers() {
// Forward the setting to the wrapped builder; return this wrapper so calls can be chained.
builder.withRejectAllExpiredOffers();
return this;
}

@Override
public TaskSchedulerBuilder withLeaseOfferExpirySecs(long leaseOfferExpirySecs) {
// Forward the expiry (in seconds) to the wrapped builder; return this wrapper so calls can be chained.
builder.withLeaseOfferExpirySecs(leaseOfferExpirySecs);
return this;
}

@Override
public TaskScheduler build() {
return builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ public interface TaskSchedulerBuilder {
*/
TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action);

/**
* Set up TaskScheduler to reject all offers on expiry.
*
* @return the builder, so that further settings can be chained
*/
TaskSchedulerBuilder withRejectAllExpiredOffers();

/**
* Specify the expiration time for unused resource offers.
*
* @param leaseOfferExpirySecs the expiration time of an unused offer, in seconds
* @return the builder, so that further settings can be chained
*/
TaskSchedulerBuilder withLeaseOfferExpirySecs(long leaseOfferExpirySecs);

/**
* Build a Fenzo task scheduler.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,24 @@
package org.apache.flink.mesos.scheduler

import java.util.Collections
import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorRef, FSM, Props}
import com.netflix.fenzo._
import com.netflix.fenzo.functions.Action1
import grizzled.slf4j.Logger
import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
import org.apache.flink.api.java.tuple.{Tuple2 => FlinkTuple2}
import org.apache.flink.configuration.Configuration
import org.apache.flink.mesos.Utils
import org.apache.flink.mesos.scheduler.LaunchCoordinator._
import org.apache.flink.mesos.scheduler.messages._
import org.apache.flink.mesos.util.MesosResourceAllocation
import org.apache.mesos.{SchedulerDriver, Protos}
import org.apache.mesos.{Protos, SchedulerDriver}

import scala.collection.JavaConverters._
import scala.collection.mutable.{Map => MutableMap}
import scala.concurrent.duration._
import org.apache.flink.mesos.configuration.MesosOptions._

/**
* The launch coordinator handles offer processing, including
Expand All @@ -54,6 +56,15 @@ class LaunchCoordinator(

val LOG = Logger(getClass)

/**
* Filters sent along with every offer this coordinator declines.
*
* The refuse duration is read from DECLINED_OFFER_REFUSE_DURATION, interpreted as
* milliseconds and converted to seconds.
* NOTE(review): toSeconds yields whole seconds, so a sub-second setting presumably
* ends up as 0 here - confirm that this is intended.
*/
val declineOfferFilters: Protos.Filters =
Protos.Filters.newBuilder()
.setRefuseSeconds(
Duration(config.getLong(DECLINED_OFFER_REFUSE_DURATION), TimeUnit.MILLISECONDS).toSeconds)
.build()

/**
* Expiry of unused offers in seconds, handed to the task scheduler builder.
*
* Read from UNUSED_OFFER_EXPIRATION, interpreted as milliseconds and converted to seconds.
* NOTE(review): toSeconds yields whole seconds, so a sub-second setting presumably
* ends up as 0 here - confirm that this is intended.
*/
val unusedOfferExpirationDuration: Long =
Duration(config.getLong(UNUSED_OFFER_EXPIRATION), TimeUnit.MILLISECONDS).toSeconds

/**
* The task placement optimizer.
*
Expand All @@ -66,10 +77,15 @@ class LaunchCoordinator(
.withLeaseRejectAction(new Action1[VirtualMachineLease]() {
def call(lease: VirtualMachineLease) {
LOG.info(s"Declined offer ${lease.getId} from ${lease.hostname()} "
+ s"of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.")
schedulerDriver.declineOffer(lease.getOffer.getId)
+ s"of memory ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus, "
+ s"${lease.getScalarValue("gpus")} gpus, "
+ s"of disk: ${lease.diskMB()} MB, network: ${lease.networkMbps()} Mbps "
+ s"for the next ${declineOfferFilters.getRefuseSeconds} seconds")
schedulerDriver.declineOffer(lease.getOffer.getId, declineOfferFilters)
}
}).build
})
.withLeaseOfferExpirySecs(unusedOfferExpirationDuration)
.withRejectAllExpiredOffers().build
}

override def postStop(): Unit = {
Expand Down Expand Up @@ -109,7 +125,9 @@ class LaunchCoordinator(
case Event(offers: ResourceOffers, data: GatherData) =>
// decline any offers that come in
schedulerDriver.suppressOffers()
for(offer <- offers.offers().asScala) { schedulerDriver.declineOffer(offer.getId) }
for(offer <- offers.offers().asScala) {
schedulerDriver.declineOffer(offer.getId, declineOfferFilters)
}
stay()

case Event(msg: Launch, data: GatherData) =>
Expand Down Expand Up @@ -150,15 +168,18 @@ class LaunchCoordinator(
case Event(offers: ResourceOffers, data: GatherData) =>
val leases = offers.offers().asScala.map(new Offer(_))
if(LOG.isInfoEnabled) {
val (cpus, gpus, mem) = leases.foldLeft((0.0,0.0,0.0)) {
(z,o) => (z._1 + o.cpuCores(), z._2 + o.gpus(), z._3 + o.memoryMB())
val (cpus, gpus, mem, disk, network) = leases.foldLeft((0.0,0.0,0.0, 0.0, 0.0)) {
(z,o) => (z._1 + o.cpuCores(), z._2 + o.gpus(), z._3 + o.memoryMB(),
z._4 + o.diskMB(), z._5 + o.networkMbps())
}
LOG.info(s"Received offer(s) of $mem MB, $cpus cpus, $gpus gpus:")
LOG.info(s"Received offer(s) of $mem MB, $cpus cpus, $gpus gpus, " +
s"$disk disk MB, $network Mbps")
for(l <- leases) {
val reservations = l.getResources.asScala.map(_.getRole).toSet
LOG.info(
s" ${l.getId} from ${l.hostname()} of ${l.memoryMB()} MB," +
s" ${l.cpuCores()} cpus, ${l.gpus()} gpus" +
s" ${l.diskMB()} disk MB, ${l.networkMbps()} Mbps" +
s" for ${reservations.mkString("[", ",", "]")}")
}
}
Expand All @@ -180,7 +201,8 @@ class LaunchCoordinator(
for(vm <- optimizer.getVmCurrentStates.asScala) {
val lease = vm.getCurrAvailableResources
LOG.info(s" ${vm.getHostname} has ${lease.memoryMB()} MB," +
s" ${lease.cpuCores()} cpus, ${lease.getScalarValue("gpus")} gpus")
s" ${lease.cpuCores()} cpus, ${lease.getScalarValue("gpus")} gpus" +
s" ${lease.diskMB()} disk MB, ${lease.networkMbps()} Mbps")
}
}
log.debug(result.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.mesos.scheduler

import java.util.concurrent.TimeUnit
import java.util.{Collections, UUID}
import java.util.concurrent.atomic.AtomicReference

Expand All @@ -27,13 +28,13 @@ import akka.testkit._
import com.netflix.fenzo.TaskRequest.{AssignedResources, NamedResourceSetRequest}
import com.netflix.fenzo._
import com.netflix.fenzo.functions.{Action1, Action2}
import org.apache.flink.api.java.tuple.{Tuple2 => FlinkTuple2}
import org.apache.flink.api.java.tuple.{Tuple2=>FlinkTuple2}
import org.apache.flink.configuration.Configuration
import org.apache.flink.mesos.scheduler.LaunchCoordinator._
import org.apache.flink.mesos.scheduler.messages._
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.mesos.Protos.{SlaveID, TaskInfo}
import org.apache.mesos.{Protos, SchedulerDriver}
import org.apache.mesos.{SchedulerDriver, Protos}
import org.junit.runner.RunWith
import org.mockito.Mockito.{verify, _}
import org.mockito.invocation.InvocationOnMock
Expand All @@ -47,8 +48,11 @@ import scala.collection.JavaConverters._
import org.apache.flink.mesos.Utils.range
import org.apache.flink.mesos.Utils.ranges
import org.apache.flink.mesos.Utils.scalar
import org.apache.flink.mesos.configuration.MesosOptions.DECLINED_OFFER_REFUSE_DURATION
import org.apache.flink.mesos.util.MesosResourceAllocation

import scala.concurrent.duration.Duration

@RunWith(classOf[JUnitRunner])
class LaunchCoordinatorTest
extends TestKitBase
Expand Down Expand Up @@ -201,12 +205,27 @@ class LaunchCoordinatorTest
*/
def taskSchedulerBuilder(optimizer: TaskScheduler) = new TaskSchedulerBuilder {
// Settings recorded by the with* methods below.
var leaseRejectAction: Action1[VirtualMachineLease] = null
var rejectAllExpiredOffers: Boolean = false
var leaseOfferExpiry: Long = 0L
// NOTE(review): nothing in this builder writes offersToReject - possibly unused; verify.
var offersToReject: Int = 0

// Remembers the reject action instead of installing it on a real scheduler.
override def withLeaseRejectAction(
action: Action1[VirtualMachineLease]): TaskSchedulerBuilder = {
leaseRejectAction = action
this
}
// Records that rejection of all expired offers was requested.
override def withRejectAllExpiredOffers(): TaskSchedulerBuilder = {
rejectAllExpiredOffers = true
this
}

// Records the requested offer expiry (seconds).
override def withLeaseOfferExpirySecs(leaseOfferExpirySecs: Long): TaskSchedulerBuilder = {
leaseOfferExpiry = leaseOfferExpirySecs
this
}

// Hands back the supplied optimizer rather than constructing a scheduler.
override def build(): TaskScheduler = optimizer

}

/**
Expand All @@ -233,6 +252,11 @@ class LaunchCoordinatorTest
val trace = Mockito.inOrder(schedulerDriver)
val fsm =
TestFSMRef(new LaunchCoordinator(testActor, config, schedulerDriver, optimizerBuilder))
// Expected decline filter, derived from the same config option and conversion
// (milliseconds to seconds); used as the expected argument when verifying declineOffer calls.
val refuseFilter =
Protos.Filters.newBuilder()
.setRefuseSeconds(
Duration(config.getLong(DECLINED_OFFER_REFUSE_DURATION), TimeUnit.MILLISECONDS).toSeconds)
.build()

val framework = randomFramework
val task1 = randomTask
Expand Down Expand Up @@ -315,8 +339,8 @@ class LaunchCoordinatorTest
"stays in Idle with offers declined" in new Context {
fsm.setState(Idle)
fsm ! new ResourceOffers(Seq(slave1._3, slave1._4).asJava)
verify(schedulerDriver).declineOffer(slave1._3.getId)
verify(schedulerDriver).declineOffer(slave1._4.getId)
verify(schedulerDriver).declineOffer(slave1._3.getId, refuseFilter)
verify(schedulerDriver).declineOffer(slave1._4.getId, refuseFilter)
fsm.stateName should be (Idle)
}
}
Expand Down Expand Up @@ -456,7 +480,7 @@ class LaunchCoordinatorTest
fsm.setState(GatheringOffers,
GatherData(tasks = Seq(task1._2), newLeases = Seq(lease(slave1._3))))
fsm ! StateTimeout
verify(schedulerDriver).declineOffer(slave1._3.getId)
verify(schedulerDriver).declineOffer(slave1._3.getId, refuseFilter)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,8 @@ public ApplicationReport startAppMaster(
amContainer.setLocalResources(localResources);
fs.close();

Utils.setAclsFor(amContainer, flinkConfiguration);

// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
// set user specified app master environment variables
Expand Down
11 changes: 11 additions & 0 deletions flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.util.HadoopUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.StringUtils;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
Expand Down Expand Up @@ -263,6 +265,13 @@ private static LocalResource registerLocalResource(FileSystem fs, Path remoteRsr
return localResource;
}

/**
 * Sets the application ACLs on the given container launch context from the Flink configuration.
 *
 * <p>The value of {@link YarnConfigOptions#APPLICATION_VIEW_ACLS} is registered for
 * {@link ApplicationAccessType#VIEW_APP} and the value of
 * {@link YarnConfigOptions#APPLICATION_ADMIN_ACLS} for {@link ApplicationAccessType#MODIFY_APP}.
 * Both entries are always set, using whatever string the configuration returns.
 *
 * @param amContainer the container launch context to set the ACLs on
 * @param flinkConfig the Flink configuration to read the ACL strings from
 */
public static void setAclsFor(ContainerLaunchContext amContainer, org.apache.flink.configuration.Configuration flinkConfig) {
    // Use a plain map rather than double-brace initialization, which would create an
    // anonymous HashMap subclass just to hold two entries.
    final HashMap<ApplicationAccessType, String> applicationAcls = new HashMap<>();
    applicationAcls.put(
        ApplicationAccessType.VIEW_APP,
        flinkConfig.getString(YarnConfigOptions.APPLICATION_VIEW_ACLS));
    applicationAcls.put(
        ApplicationAccessType.MODIFY_APP,
        flinkConfig.getString(YarnConfigOptions.APPLICATION_ADMIN_ACLS));
    amContainer.setApplicationACLs(applicationAcls);
}

public static void setTokensFor(ContainerLaunchContext amContainer, List<Path> paths, Configuration conf) throws IOException {
Credentials credentials = new Credentials();
// for HDFS
Expand Down Expand Up @@ -583,6 +592,8 @@ static ContainerLaunchContext createTaskExecutorContext(

ctx.setEnvironment(containerEnv);

setAclsFor(ctx, flinkConfig);

// For TaskManager YARN container context, read the tokens from the jobmanager yarn container local file.
// NOTE: must read the tokens from the local file, not from the UGI context, because if UGI is login
// using Kerberos keytabs, there is no HDFS delegation token in the UGI context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,25 @@ public class YarnConfigOptions {
.defaultValue("")
.withDescription("A comma-separated list of tags to apply to the Flink YARN application.");

/**
 * Users and groups to give VIEW access to the Flink YARN application.
 *
 * <p>The value is a YARN ACL string, see
 * https://www.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html.
 * Defaults to the empty string.
 */
public static final ConfigOption<String> APPLICATION_VIEW_ACLS =
    key("yarn.view.acls")
        .defaultValue("")
        .withDescription("Users and groups to give VIEW access. The ACL has the form:" +
            " comma-separated users, then a space, then comma-separated groups.");

/**
 * Users and groups to give MODIFY access to the Flink YARN application.
 *
 * <p>The value is a YARN ACL string. Defaults to the empty string.
 */
public static final ConfigOption<String> APPLICATION_ADMIN_ACLS =
    key("yarn.admin.acls")
        .defaultValue("")
        .withDescription("Users and groups to give MODIFY access. The ACL has the form:" +
            " comma-separated users, then a space, then comma-separated groups.");

// ------------------------------------------------------------------------

/** This class is not meant to be instantiated. */
Expand Down

0 comments on commit 2b52e2a

Please sign in to comment.