Skip to content

Commit

Permalink
Scheduler improvements.
Browse files Browse the repository at this point in the history
 - Added additional configuration parameters for controlling scheduler
   behaviour:
   - mesos.offer.filter.seconds
   - mesos.offer.expiry.multiplier
   - mesos.prefer.reserved.resources
 - Implemented combining of reserved & unreserved resources
 - Improved Docker support, added support for running Storm inside
   containers. This also introduces the mesos.container.docker.image
   config param
 - Added unit tests
 - Improved port handling, especially with regard to logviewer
 - Code cleanup/style fixes
 - Implemented filters & reviveOffers()
  • Loading branch information
brndnmtthws committed Nov 17, 2015
1 parent 98a831a commit af8c49b
Show file tree
Hide file tree
Showing 19 changed files with 1,457 additions and 699 deletions.
12 changes: 6 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
#
# Dockerfile for Storm Mesos framework
#
FROM mesosphere/mesos:0.22.0-1.0.ubuntu1404
FROM mesosphere/mesos:0.25.0-0.2.70.ubuntu1404
MAINTAINER Timothy Chen <tnachen@gmail.com>

# build packages
ENV DEBIAN_FRONTEND noninteractive
RUN apt-get update
RUN apt-get install -yq openjdk-7-jdk
RUN apt-get install -yq openjdk-7-jdk maven wget

# export environment
ENV JAVA_HOME /usr/lib/jvm/java-7-openjdk-amd64

ENV MESOS_NATIVE_JAVA_LIBRARY /usr/lib/libmesos.so

# copy local checkout into /opt/storm
ADD . /opt/storm
ADD . /work

WORKDIR /work
RUN ./bin/build-release.sh main
RUN mkdir -p /opt/storm && tar xf storm-mesos-*.tgz -C /opt/storm --strip=1 && rm -rf /work
WORKDIR /opt/storm

ENTRYPOINT ["/opt/storm/bin/storm-mesos"]
16 changes: 11 additions & 5 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ For example the following command will download the Storm release zip into the c
```bash
bin/build-release.sh downloadStormRelease
```
* `main`

Build a Storm package with the Mesos scheduler. The output of this command can be used as the package for `mesos.executor.uri`.

* `clean`

Expand Down Expand Up @@ -88,7 +91,7 @@ Storm/Mesos provides resource isolation between topologies. So you don't need to

## Mandatory configuration

1. `mesos.executor.uri`: Once you fill in the configs and repack the distribution, you need to place the distribution somewhere where Mesos executors can find it. Typically this is on HDFS, and this config is the location of where you put the distibution.
1. `mesos.executor.uri` or `mesos.container.docker.image`: Once you fill in the configs and repack the distribution, you need to place the distribution somewhere where Mesos executors can find it. Typically this is on HDFS, and this config is the location of where you put the distribution. Alternatively, you may use a Docker image in place of the executor URI. Take a look at the Dockerfile in the top-level of this repository for an example of how to use it.

2. `mesos.master.url`: URL for the Mesos master.

Expand All @@ -99,25 +102,28 @@ Storm/Mesos provides resource isolation between topologies. So you don't need to
## Optional configuration

* `mesos.supervisor.suicide.inactive.timeout.secs`: Seconds to wait before the supervisor suicides if it has no task to run. Defaults to "120".
* `mesos.master.failover.timeout.secs`: Framework failover timeout in second. Defaults to "3600".
* `mesos.master.failover.timeout.secs`: Framework failover timeout in seconds. Defaults to "24*7*3600".
* `mesos.allowed.hosts`: Hosts allowed to run topologies, given as a whitelist of hostnames.
* `mesos.disallowed.hosts`: Hosts not allowed to run topologies, given as a blacklist of hostnames.
* `mesos.framework.role`: Framework role to use. Defaults to "*".
* `mesos.framework.checkpoint`: Whether to enable framework checkpointing. Defaults to false.
* `mesos.offer.lru.cache.size`: LRU cache size. Defaults to "1000".
* `mesos.offer.filter.seconds`: Number of seconds to filter unused Mesos offers. Defaults to "120".
* `mesos.offer.expiry.multiplier`: Offer expiry multiplier for `nimbus.monitor.freq.secs`. Defaults to "2000".
* `mesos.local.file.server.port`: Port for the local file server to bind to. Defaults to a random port.
* `mesos.framework.name`: Framework name. Defaults to "Storm!!!".
* `mesos.framework.principal`: Framework principal to use to register with Mesos
* `mesos.framework.secret.file`: Location of file that contains the principal's secret. Secret cannot end with a NL.
* `supervisor.autostart.logviewer`: Default is true, if not false please add 128M to topology.mesos.executor.mem.mb
* `mesos.prefer.reserved.resources`: Prefer reserved resources over unreserved (i.e., `"*"` role). Defaults to "true".
* `supervisor.autostart.logviewer`: Defaults to true. Unless this is set to false, add 128M to `topology.mesos.executor.mem.mb`.

## Resource configuration

* `topology.mesos.worker.cpu`: CPUs per worker. Defaults to "1".
* `topology.mesos.worker.mem.mb`: Memory (in MiB) per worker. Defaults to "1000".
* `worker.childopts`: Use this for JVM opts. You should have about 25% memory overhead for each task. For
example, with `-Xmx1000m`, you should set `topology.mesos.worker.mem.mb: 1250`
* `topology.mesos.executor.cpu`: CPUs per executor. Defaults to "1".
* `topology.mesos.executor.mem.mb`: Memory (in MiB) per executor. Defaults to "1000".
* `topology.mesos.executor.cpu`: CPUs per executor. Defaults to "0.1".
* `topology.mesos.executor.mem.mb`: Memory (in MiB) per executor. Defaults to "600".
* `supervisor.childopts`: Use this for executor (aka supervisor) JVM opts. You should have about 25% memory
overhead for each task. For example, with `-Xmx500m`, you should set `topology.mesos.executor.mem.mb: 625`
6 changes: 3 additions & 3 deletions bin/storm-mesos
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ STORM_CMD = STORM_PATH + "/storm"
# Run the `storm nimbus` command with the storm.mesos.MesosNimbus class.
# Changes the working directory to the parent of STORM_PATH first, then
# shells out to STORM_CMD via os.system.
def nimbus():
os.chdir(STORM_PATH + "/..")
os.system(STORM_CMD + " nimbus storm.mesos.MesosNimbus")

COMMANDS = {"nimbus": nimbus}


# Command-line entry point: looks up the handler named by the first argument
# in COMMANDS and calls it with the remaining arguments as positional args.
# NOTE(review): no validation here - a missing argument raises IndexError, and
# an unknown command makes COMMANDS.get() return None, which is then called.
def main():
COMMAND = sys.argv[1]
ARGS = sys.argv[2:]
(COMMANDS.get(COMMAND))(*ARGS)

if __name__ == "__main__":
main()
main()
57 changes: 32 additions & 25 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
</properties>

<build>
<sourceDirectory>src</sourceDirectory>
<testSourceDirectory>test</testSourceDirectory>
<sourceDirectory>src/main</sourceDirectory>
<testSourceDirectory>src/test</testSourceDirectory>
<resources>
<resource>
<directory>resources</directory>
Expand Down Expand Up @@ -73,24 +73,24 @@
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.15</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<configLocation>config/checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.15</version>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<configuration>
<configLocation>config/checkstyle.xml</configLocation>
<encoding>UTF-8</encoding>
<consoleOutput>true</consoleOutput>
<failsOnError>true</failsOnError>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
Expand All @@ -112,7 +112,7 @@
<dependency>
<groupId>org.apache.mesos</groupId>
<artifactId>mesos</artifactId>
<version>0.22.0</version>
<version>0.25.0</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
Expand Down Expand Up @@ -156,9 +156,16 @@
<version>18.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
</project>
38 changes: 38 additions & 0 deletions src/main/storm/mesos/LaunchTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.mesos;

import org.apache.mesos.Protos;

import static storm.mesos.PrettyProtobuf.offerToString;
import static storm.mesos.PrettyProtobuf.taskInfoToString;

/**
 * Pairs a Mesos {@code TaskInfo} with an {@code Offer}.
 *
 * <p>Both references are stored in final fields and exposed directly.
 */
class LaunchTask {
  public final Protos.TaskInfo task;
  public final Protos.Offer offer;

  /**
   * Stores the given task and offer references as-is (no copying or validation).
   *
   * @param task  the task description to hold
   * @param offer the offer to hold alongside the task
   */
  public LaunchTask(final Protos.TaskInfo task, final Protos.Offer offer) {
    this.task = task;
    this.offer = offer;
  }

  /**
   * Renders the offer first, then the task, using the PrettyProtobuf helpers.
   *
   * @return a string of the form {@code "Offer: <offer> TaskInfo: <task>"}
   */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("Offer: ");
    text.append(offerToString(offer));
    text.append(" TaskInfo: ");
    text.append(taskInfoToString(task));
    return text.toString();
  }
}
19 changes: 13 additions & 6 deletions src/main/storm/mesos/MesosCommon.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package storm.mesos;

import backtype.storm.scheduler.TopologyDetails;
import com.google.common.base.Optional;
import org.apache.log4j.Logger;

import java.util.*;
Expand All @@ -34,8 +35,10 @@ public class MesosCommon {
public static final String WORKER_NAME_PREFIX = "topology.mesos.worker.prefix";
public static final String WORKER_NAME_PREFIX_DELIMITER = "topology.mesos.worker.prefix.delimiter";

public static final double DEFAULT_CPU = 1;
public static final double DEFAULT_MEM_MB = 1000;
public static final double DEFAULT_WORKER_CPU = 1;
public static final double DEFAULT_WORKER_MEM_MB = 1000;
public static final double DEFAULT_EXECUTOR_CPU = 0.1;
public static final double DEFAULT_EXECUTOR_MEM_MB = 200;
public static final int DEFAULT_SUICIDE_TIMEOUT_SECS = 120;

public static final String SUPERVISOR_ID = "supervisorid";
Expand Down Expand Up @@ -75,6 +78,10 @@ public static String supervisorId(String nodeid, String topologyId) {
return nodeid + "-" + topologyId;
}

/**
 * Reads the {@code AUTO_START_LOGVIEWER_CONF} entry from the given config map.
 *
 * @param conf configuration map; the entry, when present, must be a Boolean
 * @return the configured value, or {@code true} when the entry is absent or null
 */
public static boolean startLogViewer(Map conf) {
  final Boolean autoStart = (Boolean) conf.get(AUTO_START_LOGVIEWER_CONF);
  if (autoStart == null) {
    return true;
  }
  return autoStart;
}

public static int portFromTaskId(String taskId) {
int last = taskId.lastIndexOf("-");
String port = taskId.substring(last + 1);
Expand Down Expand Up @@ -105,7 +112,7 @@ public static double topologyWorkerCpu(Map conf, TopologyDetails info) {
}
LOG.info("CPUObj:" + cpuObj);
if (cpuObj == null) {
return DEFAULT_CPU;
return DEFAULT_WORKER_CPU;
} else {
return ((Number) cpuObj).doubleValue();
}
Expand All @@ -120,7 +127,7 @@ public static double topologyWorkerMem(Map conf, TopologyDetails info) {
}
LOG.info("MemObj:" + memObj + " Conf: " + conf.toString());
if (memObj == null) {
return DEFAULT_MEM_MB;
return DEFAULT_WORKER_MEM_MB;
} else {
return ((Number) memObj).doubleValue();
}
Expand All @@ -133,7 +140,7 @@ public static double executorCpu(Map conf) {
cpuObj = null;
}
if (cpuObj == null) {
return DEFAULT_CPU;
return DEFAULT_EXECUTOR_CPU;
} else {
return ((Number) cpuObj).doubleValue();
}
Expand All @@ -146,7 +153,7 @@ public static double executorMem(Map conf) {
memObj = null;
}
if (memObj == null) {
return DEFAULT_MEM_MB;
return DEFAULT_EXECUTOR_MEM_MB;
} else {
return ((Number) memObj).doubleValue();
}
Expand Down
Loading

0 comments on commit af8c49b

Please sign in to comment.