
Changed deploy method from single jar to Hadoop distributed cache

1 parent ea99816 commit b10c20fca43d64667401fa34f24b1b16c7137726 Gabor Szabo committed Feb 25, 2012
58 add_tgz_to_build.sh
@@ -0,0 +1,58 @@
+#!/usr/bin/env bash
+
+#
+# Copyright 2011 Twitter, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#
+# Adds the contents of one or more tgz archives to the PyCascading build tgz.
+#
+# The archives are gunzipped and their tars are concatenated onto the build's
+# pycascading.tgz, so that any Python libraries they contain are shipped to
+# Hadoop together with the PyCascading master archive.
+#
+
+usage()
+{
+ cat << EOF
+Usage: $0 <tgz1> [<tgz2> ...]
+
+Adds the given tgz files to the main PyCascading tgz. This is useful if our own
+or third-party Python libraries are used by the PyCascading scripts and should
+be distributed to the Hadoop server together with the PyCascading master tgz.
+
+The tgz files can contain Python libraries that will be added to the search path.
+
+Note that this script must be re-run after every new build of PyCascading, for
+all the tgzs that should be added to the PyCascading build.
+
+EOF
+}
+
+if [ $# -eq 0 ]; then
+ usage
+ exit
+fi
+
+home_dir=$(pwd)
+pycascading_dir=$(dirname "$0")
+
+temp=$(mktemp -d -t PyCascading-tmp-XXXXXX)
+gzip -d <"$pycascading_dir/build/pycascading.tgz" >"$temp/pycascading.tar"
+for j in "$@"; do
+ gzip -d <"$j" >"$temp/archive.tar"
+ tar -A -f "$temp/pycascading.tar" "$temp/archive.tar"
+done
+gzip -c <"$temp/pycascading.tar" >"$pycascading_dir/build/pycascading.tgz"
+rm -rf "$temp"
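For illustration, the script might be invoked like this after a build (extra_libs.tgz is a hypothetical archive of Python libraries, not part of the repository):

    # Append a hypothetical extra_libs.tgz to the build archive; this has to be
    # repeated after every ant build, since the build regenerates
    # build/pycascading.tgz.
    ./add_tgz_to_build.sh extra_libs.tgz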
4 examples/joins.py
@@ -44,10 +44,10 @@ def main():
# We need to use declared_fields if the field names
p = (lhs & rhs) | Join(['col1', 'col1'],
declared_fields=['lhs1', 'lhs2', 'rhs1', 'rhs2'])
-
+
# Save the 2nd and 4th columns of p to output1
p | SelectFields(['lhs2', 'rhs2']) | output1
-
+
# Join on the upper-cased first column of p and the 2nd column of rhs,
# and save the output to output2
((p | upper_case) & (rhs | SelectFields(['col2']))) | \
22 java/build.xml
@@ -27,7 +27,6 @@
<property name="build.dir" location="${basedir}/../build" />
<property name="build.classes" location="${build.dir}/classes" />
<property name="build.libs" location="${build.classes}/lib" />
- <property name="python.libs" location="${build.classes}/python" />
<property name="python.dir" value="${basedir}/../python" />
@@ -67,7 +66,7 @@
<classpath refid="java.classpath"/>
</javac>
</target>
-
+
<target name="jar" depends="compile" description="Creates a PyCascading jar with dependencies">
<!-- Copy Cascading & Jython jars -->
<copy todir="${build.libs}">
@@ -77,24 +76,23 @@
<fileset dir="${cascading.home}/lib/xml" includes="*.jar" />
<fileset dir="${jython.home}" includes="jython.jar" />
</copy>
-
- <copy todir="${python.libs}">
- <fileset dir="${python.dir}" />
- </copy>
-
- <copy todir="${python.libs}/Lib">
- <fileset dir="${jython.libs}" excludes="*.class" />
- </copy>
-
+
<jar jarfile="${build.dir}/pycascading.jar">
<fileset dir="${build.classes}" />
<manifest>
<!-- the project Main class, by default assumes Main -->
- <attribute name="Main-Class" value="com.twitter.pycascading.Util" />
+ <attribute name="Main-Class" value="com.twitter.pycascading.Main" />
</manifest>
</jar>
+ <!-- The archive apparently needs the .tgz extension; .tar.gz did not work.
+ This file is placed in the Hadoop distributed cache, and with a .tar.gz
+ extension my installation did not extract it. -->
+ <tar destfile="${build.dir}/pycascading.tgz" compression="gzip">
+ <tarfileset dir="${python.dir}" prefix="python" excludes="**/*.class" />
+ <tarfileset dir="${jython.libs}" prefix="python/Lib" excludes="**/*.class" />
+ </tar>
</target>
<target name="clean">
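As a quick sanity check (a sketch, not part of the build), the layout of the generated archive can be listed with tar; given the tarfileset prefixes above, the entries should appear under python/ and python/Lib/:

    # List the first entries of the tgz produced by the "jar" target; expect
    # python/ (PyCascading sources) and python/Lib/ (Jython standard library,
    # .class files excluded).
    tar -t -z -f build/pycascading.tgz | head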
20 java/src/com/twitter/pycascading/CascadingBaseOperationWrapper.java
@@ -29,7 +29,10 @@
import org.python.core.PyString;
import org.python.core.PyTuple;
+import cascading.flow.FlowProcess;
+import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.operation.BaseOperation;
+import cascading.operation.OperationCall;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntry;
@@ -100,6 +103,11 @@ public CascadingBaseOperationWrapper(int numArgs, Fields fieldDeclaration) {
super(numArgs, fieldDeclaration);
}
+ @Override
+ public void prepare(FlowProcess flowProcess, OperationCall operationCall) {
+ function.prepare(((HadoopFlowProcess) flowProcess).getJobConf());
+ }
+
private void writeObject(ObjectOutputStream stream) throws IOException {
stream.writeObject(function);
stream.writeObject(convertInputTuples);
@@ -122,8 +130,8 @@ private void readObject(ObjectInputStream stream) throws IOException, ClassNotFo
}
/**
- * We assume that the Python functions (map and reduce) are always called
- * with the same number of arguments. Override this to return the number of
+ * We assume that the Python functions (map and reduce) are always called with
+ * the same number of arguments. Override this to return the number of
* arguments we will be passing in all the time.
*
* @return the number of arguments the wrapper is passing in
@@ -148,8 +156,8 @@ private PyObject getOriginalArg(PyObject arg) {
}
/**
- * Sets up the local variables that were not serialized for optimizations
- * and unwraps function arguments wrapped with PythonFunctionWrapper.
+ * Sets up the local variables that were not serialized for optimizations and
+ * unwraps function arguments wrapped with PythonFunctionWrapper.
*/
protected void setupArgs() {
int numArgs = getNumParameters();
@@ -208,13 +216,9 @@ public Object convertInput(TupleEntry tupleEntry) {
}
i = 1;
for (Object value : tupleEntry.getTuple()) {
- // System.out.println("**** value: " + value);
dictElements[i] = Py.java2py(value);
i += 2;
}
- // for (PyObject o : dictElements) {
- // System.out.println("**** dictElems: " + o);
- // }
PyDictionary dict = new PyDictionary(dictElements);
result = dict;
}
18 java/src/com/twitter/pycascading/Main.java
@@ -0,0 +1,18 @@
+package com.twitter.pycascading;
+
+import java.util.Properties;
+
+import org.python.util.PythonInterpreter;
+
+public class Main {
+
+ public static void main(String[] args) throws Exception {
+ Properties sysProps = System.getProperties();
+ Properties props = new Properties();
+ props.put("python.cachedir", sysProps.get("user.home") + "/.jython-cache");
+ props.put("python.cachedir.skip", "0");
+ PythonInterpreter.initialize(System.getProperties(), props, args);
+ PythonInterpreter interpreter = new PythonInterpreter();
+ interpreter.execfile(args[0]);
+ }
+}
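Since the manifest's Main-Class is now com.twitter.pycascading.Main, "hadoop jar pycascading.jar ..." passes the remaining arguments to Main, which simply runs the first one (bootstrap.py) in a Jython interpreter. A sketch of the resulting invocation with illustrative paths (the run.sh generated by remote_deploy.sh below contains the real command):

    # Main.main() execfile()s args[0]; everything after bootstrap.py is parsed
    # by bootstrap.py itself: running mode, PyCascading root, -a archives for
    # the distributed cache (PyCascading tgz first, job sources second), then
    # the job script and its arguments.
    hadoop jar "$HOME/pycascading/master/pycascading.jar" \
        "$HOME/pycascading/master/bootstrap.py" \
        hadoop "$HOME/pycascading/master" \
        -a "$HOME/pycascading/master/pycascading.tgz" -a sources.tgz my_job.py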
39 java/src/com/twitter/pycascading/PythonFunctionWrapper.java
@@ -49,6 +49,10 @@
import java.io.Serializable;
import java.net.URISyntaxException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.python.core.PyDictionary;
import org.python.core.PyFunction;
import org.python.core.PyObject;
@@ -94,18 +98,41 @@ private void writeObject(ObjectOutputStream stream) throws IOException {
stream.writeObject(runningMode);
}
- private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException,
- URISyntaxException {
+ private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
funcName = (PyString) stream.readObject();
sourceFile = (PyString) stream.readObject();
runningMode = (RunningMode) stream.readObject();
+ }
+ public void prepare(JobConf conf) {
+ String pycascadingDir = null;
+ String sourceDir = null;
+ String[] modulePaths = null;
+ if (runningMode == RunningMode.HADOOP) {
+ try {
+ Path[] archives = DistributedCache.getLocalCacheArchives(conf);
+ pycascadingDir = archives[0].toString() + "/";
+ sourceDir = archives[1].toString() + "/";
+ modulePaths = new String[archives.length];
+ int i = 0;
+ for (Path archive : archives) {
+ modulePaths[i++] = archive.toString();
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ pycascadingDir = System.getProperty("pycascading.root") + "/";
+ sourceDir = "";
+ modulePaths = new String[] { pycascadingDir, sourceDir };
+ }
getPythonInterpreter();
- String jarDir = Util.getJarFolder();
- interpreter.execfile(jarDir + "python/pycascading/init_module.py");
+ interpreter.execfile(pycascadingDir + "python/pycascading/init_module.py");
interpreter.set("module_name", "m");
- interpreter.set("file_name", (runningMode == RunningMode.LOCAL ? "" : jarDir) + sourceFile);
- PyObject module = (PyObject) interpreter.eval("load_source(module_name, file_name)");
+ interpreter.set("file_name", sourceDir + sourceFile);
+ interpreter.set("module_paths", modulePaths);
+ PyObject module = (PyObject) interpreter
+ .eval("load_source(module_name, file_name, module_paths)");
pythonFunction = module.__getattr__(funcName);
if (!PyFunction.class.isInstance(pythonFunction)) {
// function is assumed to be decorated, resulting in a DecoratedFunction.
83 java/src/com/twitter/pycascading/TemporaryHdfs.java
@@ -0,0 +1,83 @@
+package com.twitter.pycascading;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import cascading.flow.Flow;
+import cascading.flow.FlowListener;
+
+public class TemporaryHdfs implements FlowListener {
+ private String tmpDir;
+
+ @Override
+ public void onStarting(Flow flow) {
+ }
+
+ @Override
+ public void onStopping(Flow flow) {
+ removeTmpDir();
+ }
+
+ @Override
+ public void onCompleted(Flow flow) {
+ removeTmpDir();
+ }
+
+ @Override
+ public boolean onThrowable(Flow flow, Throwable throwable) {
+ removeTmpDir();
+ throwable.printStackTrace();
+ return false;
+ }
+
+ private String getRandomFileName() {
+ String name = "";
+ Random rnd = new Random();
+ for (int i = 0; i < 6; i++) {
+ name += (char) ((int) 'a' + rnd.nextInt((int) 'z' - (int) 'a'));
+ }
+ return name;
+ }
+
+ public void createTmpFolder(Configuration conf) throws IOException {
+ // Only fs.default.name and hadoop.tmp.dir are defined at the time of job
+ // initialization, so we cannot use mapreduce.job.dir, mapred.working.dir,
+ // or mapred.job.id.
+ // Possibly use Hfs.getTempDir from Cascading later.
+ // We cannot put a '/' between the two variables in tmpDir, otherwise
+ // Hadoop fails to copy the archive to the temporary folder.
+ tmpDir = conf.get("fs.default.name") + conf.get("hadoop.tmp.dir");
+ tmpDir = tmpDir + "/" + "pycascading-" + getRandomFileName();
+ Path path = new Path(tmpDir);
+ FileSystem fs = path.getFileSystem(new Configuration());
+ fs.mkdirs(path);
+ }
+
+ private void removeTmpDir() {
+ Path path = new Path(tmpDir);
+ try {
+ FileSystem fs = path.getFileSystem(new Configuration());
+ fs.delete(path, true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private String getExtension(String path) {
+ int i = path.lastIndexOf('.');
+ return (i >= 0 ? path.substring(i, path.length()) : "");
+ }
+
+ public String copyFromLocalFile(String source) throws IOException {
+ Path src = new Path(source);
+ String destName = tmpDir + "/" + getRandomFileName() + getExtension(source);
+ Path dest = new Path(destName);
+ FileSystem fs = dest.getFileSystem(new Configuration());
+ fs.copyFromLocalFile(src, dest);
+ return destName;
+ }
+}
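The staging directory is created under fs.default.name + hadoop.tmp.dir and removed by the listener when the flow completes, stops, or fails. A hedged way to look at it from the command line while a flow runs, assuming the default hadoop.tmp.dir of /tmp/hadoop-<user> (the exact path depends on the cluster configuration):

    # Show PyCascading staging directories currently sitting in HDFS.
    hadoop fs -ls "/tmp/hadoop-$USER" | grep pycascading-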
72 java/src/com/twitter/pycascading/Util.java
@@ -14,11 +14,12 @@
*/
package com.twitter.pycascading;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;
-import org.python.util.PythonInterpreter;
+import org.apache.hadoop.conf.Configuration;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
@@ -63,8 +64,22 @@ public static String getCascadingJar() {
}
}
- public static void run(int numReducers, Map<String, Object> config, Map<String, Tap> sources,
- Map<String, Tap> sinks, Pipe... tails) {
+ /**
+ * We use the "pycascading.root" Java system property to store the location of
+ * the Python sources for PyCascading. This is only used in local mode. This
+ * is needed so that we know where to set the import path when we spin up the
+ * mappers and reducers.
+ *
+ * @param root
+ * the location of the PyCascading sources on the local file system
+ */
+ public static void setPycascadingRoot(String root) {
+ System.setProperty("pycascading.root", root);
+ }
+
+ public static void run(String runningMode, int numReducers, Map<String, Object> config,
+ Map<String, Tap> sources, Map<String, Tap> sinks, Pipe... tails) throws IOException,
+ URISyntaxException {
// String strClassPath = System.getProperty("java.class.path");
// System.out.println("Classpath is " + strClassPath);
@@ -86,25 +101,42 @@ public static void run(int numReducers, Map<String, Object> config, Map<String,
+ "com.twitter.pycascading.pythonserialization.PythonSerialization");
properties.put("mapred.jobtracker.completeuserjobs.maximum", 50000);
properties.put("mapred.input.dir.recursive", "true");
- FlowConnector.setApplicationJarClass(properties, Util.class);
- FlowConnector flowConnector = new FlowConnector(properties);
- try {
- Flow flow = flowConnector.connect(sources, sinks, tails);
- // execute the flow, block until complete
- flow.complete();
- } catch (Exception e) {
- e.printStackTrace();
+ Configuration conf = new Configuration();
+ TemporaryHdfs tempDir = new TemporaryHdfs();
+ if ("hadoop".equals(runningMode)) {
+ tempDir = new TemporaryHdfs();
+ tempDir.createTmpFolder(conf);
+ Object archives = config.get("distributed_cache.archives");
+ if (archives != null) {
+ String dests = null;
+ for (String archive : (Iterable<String>) archives) {
+ String dest = tempDir.copyFromLocalFile(archive);
+ dests = (dests == null ? dest : dests + "," + dest);
+ }
+ // This is an ugly hack; we should use DistributedCache. However,
+ // DistributedCache operates on a JobConf, and since Cascading
+ // expects a Map of properties, we cannot directly pass in
+ // parameters that were set on a JobConf.
+ // TODO: see if a later version of Cascading can update its properties
+ // using a JobConf
+ properties.setProperty("mapred.cache.archives", dests);
+ // TODO: see the one just above
+ properties.setProperty("mapred.create.symlink", "yes");
+ }
}
- }
- public static void main(String args[]) {
- Properties sysProps = System.getProperties();
- Properties props = new Properties();
- props.put("python.cachedir", sysProps.get("user.home") + "/.jython-cache");
- props.put("python.cachedir.skip", "0");
- PythonInterpreter.initialize(System.getProperties(), props, args);
- PythonInterpreter interpreter = new PythonInterpreter();
- interpreter.execfile(System.getProperty("user.dir") + "/" + args[0]);
+ FlowConnector.setApplicationJarClass(properties, Main.class);
+ FlowConnector flowConnector = new FlowConnector(properties);
+ Flow flow = flowConnector.connect(sources, sinks, tails);
+ if ("hadoop".equals(runningMode)) {
+ try {
+ flow.addListener(tempDir);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ // execute the flow, block until complete
+ flow.complete();
}
}
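Roughly what the archive loop above assembles, as a shell sketch with illustrative HDFS paths (the real values are the destinations returned by TemporaryHdfs.copyFromLocalFile):

    # Build the comma-separated list handed to mapred.cache.archives and turn
    # on symlinking, as Util.run() does via the Cascading properties map.
    dests=""
    for dest in hdfs://namenode/tmp/hadoop-user/pycascading-abcdef/qwerty.tgz \
                hdfs://namenode/tmp/hadoop-user/pycascading-abcdef/asdfgh.tgz; do
      dests="${dests:+$dests,}$dest"
    done
    echo "mapred.cache.archives=$dests"
    echo "mapred.create.symlink=yes"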
3 local_run.sh
@@ -88,4 +88,5 @@ fi
# sys.path will be initialized from JYTHONPATH
JYTHONPATH="$home_dir/python" java -classpath "$classpath" \
-org.python.util.jython "$home_dir/python/pycascading/bootstrap.py" local "$@"
+org.python.util.jython "$home_dir/python/pycascading/bootstrap.py" \
+local "$home_dir" "$@"
28 python/pycascading/bootstrap.py
@@ -31,34 +31,50 @@
if __name__ == "__main__":
# The first command line parameter must be 'hadoop' or 'local'
# to indicate the running mode
+ # The second is the location of the PyCascading Python sources in local
+ # mode, and the PyCascading tarball in Hadoop mode
running_mode = sys.argv[1]
+ python_dir = sys.argv[2]
+ sys.argv = sys.argv[3:]
from com.twitter.pycascading import Util
cascading_jar = Util.getCascadingJar()
# This is the folder where Hadoop extracted the jar file for execution
tmp_dir = Util.getJarFolder()
+ Util.setPycascadingRoot(python_dir)
+
# The initial value of sys.path is JYTHONPATH plus whatever Jython appends
# to it (normally the Python standard libraries that come with Jython)
- sys.path.extend((cascading_jar, '.', tmp_dir, tmp_dir + 'python',
- tmp_dir + 'python/Lib'))
+ sys.path.extend((cascading_jar, '.', tmp_dir, python_dir + '/python',
+ python_dir + '/python/Lib'))
+
+ import os
+ import encodings
+ import pycascading.pipe, getopt
+ pycascading.pipe.config = dict()
+
+ opts, args = getopt.getopt(sys.argv, 'a:')
+ pycascading.pipe.config['distributed_cache.archives'] = []
+ for opt in opts:
+ if opt[0] == '-a':
+ pycascading.pipe.config['distributed_cache.archives'].append(opt[1])
# Haha... it's necessary to put this here, otherwise simplejson won't work.
# Maybe it's automatically imported in the beginning of a Jython program,
# but since at that point the sys.path is not set yet to Lib, it will fail?
# Instead, we can use Java's JSON decoder...
# import encodings
- m = imp.load_source('main', sys.argv[2])
+ m = imp.load_source('main', args[0])
# We need to explicitly inject running_mode into the tap module,
# otherwise we cannot import bootstrap from tap and use the
# bootstrap.running_mode from here.
- import pycascading.pipe
- pycascading.pipe.running_mode = running_mode
+ pycascading.pipe.config['running.mode'] = running_mode
# Remove the running mode argument so that sys.argv will look as if it
# was coming from a simple command line execution
- del sys.argv[0 : 2]
+ sys.argv = args[2:]
m.main()
29 python/pycascading/init_module.py
@@ -27,20 +27,35 @@
import sys, imp
-def load_source(module_name, file_name):
+def load_source(module_name, file_name, module_paths):
"""Loads the given module from a Python source file.
-
+
+ This function is called by PythonFunctionWrapper.prepare(...) after it has
+ started the Python interpreter, to request that the given source file be
+ loaded. The Python function to be called is found in this source file.
+
+ module_paths is an array of path names where the sources or other
+ supporting files are found. In particular, module_paths[0] is the location
+ of the PyCascading Python sources, and module_paths[1] is the location of
+ the source file defining the function.
+
+ In Hadoop mode (with remote_deploy.sh), the first two -a options must
+ specify the archives of the PyCascading sources and the job sources,
+ respectively.
+
Arguments:
module_name -- the name of the variable to read the module into
- file_name -- the file that contains the source for the module
+ file_name -- the file that contains the source for the module
+ module_paths -- the locations of the Python sources
"""
+ # This one should be on the classpath from the job jar or the extracted jar
from com.twitter.pycascading import Util
cascading_jar = Util.getCascadingJar()
- tmp_dir = Util.getJarFolder()
- sys.path.extend((cascading_jar, tmp_dir + '/python',
- tmp_dir + '/python/Lib'))
-
+ sys.path.extend((cascading_jar, module_paths[0] + '/python',
+ module_paths[0] + '/python/Lib'))
+ sys.path.extend(module_paths[1 : ])
+
# Haha... it's necessary to put this here, otherwise simplejson won't work.
# Maybe it's automatically imported in the beginning of a Jython program,
# but since at that point the sys.path is not set yet to Lib, it will fail?
2 python/pycascading/pipe.py
@@ -104,7 +104,7 @@ def random_pipe_name(prefix):
def _python_function_to_java(function):
"""Create the serializable Java object for a Python function."""
wrapped_func = PythonFunctionWrapper(function)
- if running_mode == 'local':
+ if config['running.mode'] == 'local':
wrapped_func.setRunningMode(PythonFunctionWrapper.RunningMode.LOCAL)
else:
wrapped_func.setRunningMode(PythonFunctionWrapper.RunningMode.HADOOP)
6 python/pycascading/tap.py
@@ -48,7 +48,7 @@ def expand_path_with_home(output_folder):
output_folder -- the absolute or relative path of the output HDFS folder
"""
import pycascading.pipe
- if pycascading.pipe.running_mode == 'hadoop':
+ if pycascading.pipe.config['running.mode'] == 'hadoop':
if not any(map(lambda scheme: output_folder.startswith(scheme), \
['hdfs:', 'file:', 's3:', 's3n:', '/'])):
fs = Path('/').getFileSystem(Configuration())
@@ -198,7 +198,9 @@ def run(self, num_reducers=50, config=None):
if source in sources_used:
source_map[source] = self.source_map[source]
tails = [t.get_assembly() for t in self.tails]
- Util.run(num_reducers, config, source_map, self.sink_map, tails)
+ import pycascading.pipe
+ Util.run(pycascading.pipe.config['running.mode'], num_reducers,
+ pycascading.pipe.config, source_map, self.sink_map, tails)
class _Sink(Chainable):
96 remote_deploy.sh
@@ -30,7 +30,8 @@ server=localhost
server_deploys_dir='$HOME/pycascading/deploys'
# The folder on the remote server where the PyCascading master jar will be
-# placed
+# placed. This must be given as an absolute path name so that the master
+# files can be found from any directory.
server_build_dir='$HOME/pycascading/master'
# Additional SSH options (see "man ssh"; private key, etc.)
@@ -48,16 +49,27 @@ The main_script gets executed by PyCascading. All additional_files are also
copied to the remote server and submitted together with the job to Hadoop.
Options:
- -h Show this message
+ -h Show this message.
+
-m Also deploy the PyCascading master jar before submitting
- the job job
+ the job.
+
-f <file> Copy file to the server together with main_script, but
do not bundle it into the Hadoop jar for submission. This
option may be repeated several times for multiple files.
File names cannot start with a dot.
- -s <server> The name of the remote server where Hadoop is installed
- and the PyCascading jar should be deployed to
- -o <ssh_options> Additional options for SSH (such as private key, etc.)
+
+ -s <server> The name of the remote server where Hadoop is installed,
+ and the PyCascading jar should be deployed to.
+
+ -o <ssh_options> Additional options for SSH (such as private key, etc.).
+
+ -r Run the job immediately after submission, over SSH. The
+ recommended way to run a script is with screen or
+ nohup, so that the job is not interrupted if the
+ terminal connection goes down. Note that in this case
+ no additional command line parameters can be passed
+ to the job.
EOF
}
@@ -78,9 +90,12 @@ realpath()
# Copy the master jar over first? The -m option.
master_first=no
+# Run job after submission with SSH?
+run_immediately=no
+
declare -a files_to_copy
-while getopts ":hmf:s:o:" OPTION; do
+while getopts ":hmf:s:o:r" OPTION; do
case $OPTION in
h) usage
exit 1
@@ -93,6 +108,8 @@ while getopts ":hmf:s:o:" OPTION; do
;;
o) ssh_options="$OPTARG"
;;
+ r) run_immediately=yes
+ ;;
esac
done
shift $((OPTIND-1))
@@ -108,25 +125,22 @@ home_dir=$(realpath $(dirname "$0"))
tmp_dir=$(mktemp -d -t PyCascading-tmp-XXXXXX)
if [ $master_first == yes ]; then
- master="$home_dir/build/pycascading.jar"
- if [ -a "$master" ]; then
- ln -s "$master" $home_dir/python/pycascading/bootstrap.py $tmp_dir
+ build_dir="$home_dir/build"
+ if [ -a "$build_dir/pycascading.jar" -a \
+ -a "$build_dir/pycascading.tgz" ]; then
+ ln -s "$build_dir/pycascading.jar" "$build_dir/pycascading.tgz" \
+ "$home_dir/python/pycascading/bootstrap.py" "$tmp_dir"
else
- echo Build the PyCascading master jar first in the \'java\' folder with ant.
+ echo 'Build the PyCascading master package first in the "java" folder with ant.'
exit 2
fi
fi
if [ "$main_file" != "" ]; then
- mkdir $tmp_dir/sources
- mkdir $tmp_dir/other
- for i in "$@"; do
- ln -s $(realpath "$i") "$tmp_dir/sources"
- done
-
- for i in "${files_to_copy[@]}"; do
- ln -s $(realpath "$i") "$tmp_dir/other"
- done
+ tar -c -z -f "$tmp_dir/sources.tgz" "$@"
+ if [ ${#files_to_copy} -gt 0 ]; then
+ tar -c -z -f "$tmp_dir/others.tgz" "${files_to_copy[@]}"
+ fi
fi
# Create a setup file that will be run on the deploy server
@@ -138,40 +152,48 @@ cat >"$tmp_dir/setup.sh" <<EOF
if [ -e pycascading.jar ]; then
# If we packaged the master jar, update it
mkdir -p "$server_build_dir"
- mv pycascading.jar bootstrap.py "$server_build_dir"
+ mv pycascading.jar pycascading.tgz bootstrap.py "$server_build_dir"
+ rm -rf "$server_build_dir/python"
+ tar -x -z -f "$server_build_dir/pycascading.tgz" -C "$server_build_dir"
fi
-if [ -e sources ]; then
+if [ -e sources.tgz ]; then
mkdir -p "$server_deploys_dir"
deploy_dir=\$(mktemp -d "$server_deploys_dir/XXXXXX")
- mv run.sh sources other "\$deploy_dir"
- cd "\$deploy_dir"
- if [ -e "$server_build_dir/pycascading.jar" ]; then
- cp "$server_build_dir/pycascading.jar" deploy.jar
- cp "$server_build_dir/bootstrap.py" .
- jar uf deploy.jar -C sources .
- mv other/* sources 2>/dev/null
- rm -r other
- echo "Run the job on $server with:"
- echo " \$deploy_dir/run.sh [parameters]"
- else
+ mkdir "\$deploy_dir/job"
+ mv run.sh "\$deploy_dir"
+ tar -x -z -f sources.tgz -C "\$deploy_dir/job"
+ mv sources.tgz "\$deploy_dir"
+ if [ -e others.tgz ]; then
+ tar -x -z -f others.tgz -C "\$deploy_dir/job"
+ fi
+ if [ ! -e "$server_build_dir/pycascading.jar" ]; then
+ echo 'WARNING!!!'
echo 'The PyCascading master jar has not yet been deployed, do a "remote_deploy.sh -m" first.'
+ echo
fi
+ echo "Run the job on $server with:"
+ echo " \$deploy_dir/run.sh [parameters]"
+fi
+if [ \$1 == yes ]; then
+ \$deploy_dir/run.sh
fi
EOF
chmod +x "$tmp_dir/setup.sh"
# Create a small script on the remote server that runs the job
-cat >$tmp_dir/run.sh <<EOF
+cat >"$tmp_dir/run.sh" <<EOF
#!/usr/bin/env bash
# Run the PyCascading job
-cd \$(dirname "\$0")/sources
-hadoop jar ../deploy.jar ../bootstrap.py hadoop "$(basename "$main_file")" "\$@"
+cd "\$(dirname "\$0")/job"
+hadoop jar "$server_build_dir/pycascading.jar" "$server_build_dir/bootstrap.py" \\
+hadoop "$server_build_dir" -a "$server_build_dir/pycascading.tgz" -a ../sources.tgz \\
+"$(basename "$main_file")" "\$@"
EOF
chmod +x "$tmp_dir/run.sh"
# Upload the package to the server and run the setup script
cd "$tmp_dir"
tar czhf - . | ssh $server $ssh_options \
"dir=\$(mktemp -d -t PyCascading-tmp-XXXXXX); cd \"\$dir\"; tar xfz -; " \
-"./setup.sh; rm -r \"\$dir\""
+"./setup.sh $run_immediately; rm -r \"\$dir\""
rm -r "$tmp_dir"
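A hedged end-to-end example of the new workflow; the server name and file names are illustrative:

    # Deploy the master jar + tgz (-m) together with a job and an extra data
    # file (-f), and start the job right away over SSH (-r). With -r, no extra
    # command line parameters can be passed to the job.
    ./remote_deploy.sh -m -r -s hadoop-gateway.example.com \
        -f lookup_table.txt my_job.py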
