Skip to content

Commit

Permalink
Improving file server location and executor package handling
Browse files Browse the repository at this point in the history
  • Loading branch information
echinthaka committed Sep 2, 2015
1 parent c13c133 commit 51f1f18
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/storm/mesos/MesosNimbus.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class MesosNimbus implements INimbus {
public static final String CONF_MESOS_CHECKPOINT = "mesos.framework.checkpoint";
public static final String CONF_MESOS_OFFER_LRU_CACHE_SIZE = "mesos.offer.lru.cache.size";
public static final String CONF_MESOS_LOCAL_FILE_SERVER_PORT = "mesos.local.file.server.port";
public static final String CONF_MESOS_LOCAL_FILE_SERVER_LOCATION = "mesos.local.file.server.location";
public static final String CONF_MESOS_FRAMEWORK_NAME = "mesos.framework.name";

public static final Logger LOG = Logger.getLogger(MesosNimbus.class);
Expand Down Expand Up @@ -150,7 +151,8 @@ protected void createLocalServerPort() {

protected void setupHttpServer() throws Exception {
_httpServer = new LocalFileServer();
_configUrl = _httpServer.serveDir("/conf", "conf", _localFileServerPort);
String localConfFolder = Optional.fromNullable((String) _conf.get(CONF_MESOS_LOCAL_FILE_SERVER_LOCATION)).or("conf");
_configUrl = _httpServer.serveDir("/conf", localConfFolder, _localFileServerPort);

LOG.info("Started HTTP server from which config for the MesosSupervisor's may be fetched. URL: " + _configUrl);
}
Expand Down Expand Up @@ -547,6 +549,7 @@ public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot

String executorDataStr = JSONValue.toJSONString(executorData);
LOG.info("Launching task with Mesos Executor data: <" + executorDataStr + ">");
final String executorURI = (String) _conf.get(CONF_EXECUTOR_URI);
TaskInfo task = TaskInfo.newBuilder()
.setName("worker " + slot.getNodeId() + ":" + slot.getPort())
.setTaskId(taskId)
Expand All @@ -555,9 +558,12 @@ public void assignSlots(Topologies topologies, Map<String, Collection<WorkerSlot
.setExecutorId(ExecutorID.newBuilder().setValue(details.getId()))
.setData(ByteString.copyFromUtf8(executorDataStr))
.setCommand(CommandInfo.newBuilder()
.addUris(URI.newBuilder().setValue((String) _conf.get(CONF_EXECUTOR_URI)))
.addUris(URI.newBuilder().setValue(configUri))
.setValue("cp storm.yaml storm-mesos*/conf && cd storm-mesos* && python bin/storm " +
// Instead of downloading the executor package through mesos, we are downloading it here since sometimes the file name can contain
// additional characters at the end. For example, if we are downloading from hdfs the URI will have "?op=OPEN" at the end. This
// can fail the complete command. Hence, instead of relying on mesos to do the download for us, we are explicitly using wget to do
// that and use that to rename the file.
.setValue("wget -O storm-mesos.tgz " + executorURI + " && tar xvf storm-mesos.tgz && cp storm.yaml storm-mesos*/conf && cd storm-mesos* && python bin/storm " +
"supervisor storm.mesos.MesosSupervisor"))
.addResources(Resource.newBuilder()
.setName("cpus")
Expand Down

0 comments on commit 51f1f18

Please sign in to comment.