Permalink
Browse files

remove unwanted files

recover two java files
  • Loading branch information...
1 parent 2afe79d commit 0985e70c2f77e3ef35551043ffee9a85f2e46e9b @lguo lguo committed Sep 3, 2010
@@ -0,0 +1,179 @@
+package azkaban.jobs;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.fileupload.util.Streams;
+import org.apache.commons.io.IOUtils;
+import org.json.JSONObject;
+
+import azkaban.app.JobDescriptor;
+import azkaban.app.PropsUtils;
+import azkaban.common.jobs.AbstractJob;
+import azkaban.common.utils.Props;
+import azkaban.util.JSONToJava;
+
+/**
+ * A revised process-based job
+ *
+ * @author jkreps
+ *
+ */
+public abstract class AbstractProcessJob extends AbstractJob {
+
+ public static final String ENV_PREFIX = "env.";
+ public static final String WORKING_DIR = "working.dir";
+ public static final String JOB_PROP_ENV = "JOB_PROP_FILE";
+ public static final String JOB_NAME_ENV = "JOB_NAME";
+ public static final String JOB_OUTPUT_PROP_FILE = "JOB_OUTPUT_PROP_FILE";
+
+ private static final JSONToJava jsonToJava = new JSONToJava();
+
+ protected final String _jobPath;
+
+ protected final JobDescriptor _descriptor;
+ protected volatile Props _props;
+
+ protected Map<String, String> _env;
+ protected String _cwd;
+
+ private volatile Props generatedPropeties;
+
+ protected AbstractProcessJob(JobDescriptor descriptor) {
+ super(descriptor.getId());
+
+ _props = descriptor.getProps(); //starting properties
+ _jobPath = descriptor.getFullPath();
+
+ _descriptor = descriptor;
+ _env = getEnvironmentVariables();
+ _cwd = getWorkingDirectory();
+ }
+
+ public JobDescriptor getJobDescriptor() {
+ return _descriptor;
+ }
+
+ public Props getProps() {
+ return _props;
+ }
+
+ public String getJobPath() {
+ return _jobPath;
+ }
+
+ /**
+ *
+ * @author eric
+ */
+ protected void resolveProps () {
+ _props = PropsUtils.resolveProps(_props);
+ }
+
+ public Props getJobGeneratedProperties() {
+ return generatedPropeties;
+ }
+
+ /**
+ * initialize temporary and final property file
+ *
+ * @return {tmpPropFile, outputPropFile}
+ */
+ public File[] initPropsFiles() {
+ // Create properties file with additionally all input generated properties.
+ File [] files = new File[2];
+ files[0] = createFlattenedPropsFile(_cwd);
+
+ _env.put(JOB_PROP_ENV, files[0].getAbsolutePath());
+ _env.put(JOB_NAME_ENV, getId());
+
+ files[1] = createOutputPropsFile(getId(), _cwd);
+ _env.put(JOB_OUTPUT_PROP_FILE, files[1].getAbsolutePath());
+
+ return files;
+ }
+
+ public Map<String,String> getEnv() {
+ return _env;
+ }
+
+ public String getCwd() {
+ return _cwd;
+ }
+
+
+ public Map<String, String> getEnvironmentVariables( ) {
+ return _props.getMapByPrefix(ENV_PREFIX);
+ }
+
+ public String getWorkingDirectory() {
+ return _props.getString(WORKING_DIR, new File(_jobPath).getParent());
+ }
+
+ public Props loadOutputFileProps( File outputPropertiesFile)
+ {
+ InputStream reader = null;
+ try {
+ System.err.println("output properties file=" + outputPropertiesFile.getAbsolutePath());
+ reader = new BufferedInputStream(new FileInputStream(outputPropertiesFile));
+
+ Props outputProps = new Props();
+ final String content = Streams.asString(reader).trim();
+
+ if (!content.isEmpty()) {
+ Map<String, Object> propMap = jsonToJava.apply(new JSONObject(content));
+
+ for (Map.Entry<String, Object> entry : propMap.entrySet()) {
+ outputProps.put(entry.getKey(), entry.getValue().toString());
+ }
+ }
+ return outputProps;
+ }
+ catch (Exception e) {
+ e.printStackTrace(System.err);
+ throw new RuntimeException("Unable to gather output properties from: " + outputPropertiesFile.getAbsolutePath());
+ }
+ finally {
+ IOUtils.closeQuietly(reader);
+ }
+ }
+
+ public File createFlattenedPropsFile(String workingDir) {
+ File directory = new File(workingDir);
+ File tempFile = null;
+ try {
+ tempFile = File.createTempFile(getId() + "_", "_tmp", directory);
+ _props.storeFlattened(tempFile);
+ } catch(IOException e) {
+ throw new RuntimeException("Failed to create temp property file ", e);
+ }
+
+ return tempFile;
+ }
+
+ public static File createOutputPropsFile(String id, String workingDir) {
+ System.err.println("cwd=" + workingDir);
+
+ File directory = new File(workingDir);
+ File tempFile = null;
+ try {
+ tempFile = File.createTempFile(id + "_output_", "_tmp", directory);
+ } catch (IOException e) {
+ System.err.println("Failed to create temp output property file :\n");
+ e.printStackTrace(System.err);
+ throw new RuntimeException("Failed to create temp output property file ", e);
+ }
+ return tempFile;
+ }
+
+ public void generateProperties(File outputFile) {
+ generatedPropeties = loadOutputFileProps( outputFile);
+ }
+
+}
@@ -0,0 +1,112 @@
+package azkaban.jobs;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import azkaban.app.JobDescriptor;
+import azkaban.common.utils.Props;
+import azkaban.util.process.AzkabanProcess;
+import azkaban.util.process.AzkabanProcessBuilder;
+
+
+/**
+ * A job that passes all the job properties as command line arguments in "long" format,
+ * e.g. --key1 value1 --key2 value2 ...
+ *
+ * @author jkreps
+ *
+ */
+public abstract class LongArgJob extends AbstractProcessJob {
+
+ private static final long KILL_TIME_MS = 5000;
+ private final AzkabanProcessBuilder builder;
+ private volatile AzkabanProcess process;
+
+ public LongArgJob(String[] command, JobDescriptor desc) {
+ this(command, desc, new HashSet<String>(0));
+ }
+
+ public LongArgJob(String[] command, JobDescriptor desc, Set<String> suppressedKeys) {
+ //super(command, desc);
+ super(desc);
+ //String cwd = descriptor.getProps().getString(WORKING_DIR, new File(descriptor.getFullPath()).getParent());
+
+ this.builder = new AzkabanProcessBuilder(command).
+ setEnv(getProps().getMapByPrefix(ENV_PREFIX)).
+ setWorkingDir(getCwd()).
+ setLogger(getLog());
+ appendProps(suppressedKeys);
+ }
+
+ public void run() throws Exception {
+
+ resolveProps();
+
+ long startMs = System.currentTimeMillis();
+ info("Command: " + builder.getCommandString());
+ if(builder.getEnv().size() > 0)
+ info("Environment variables: " + builder.getEnv());
+ info("Working directory: " + builder.getWorkingDir());
+
+ File [] propFiles = initPropsFiles( );
+ //System.err.println("outputfile=" + propFiles[1]);
+
+ boolean success = false;
+ this.process = builder.build();
+ try {
+ this.process.run();
+ success = true;
+ }
+ catch (Exception e) {
+ for (File file: propFiles) if (file != null && file.exists()) file.delete();
+ throw new RuntimeException (e);
+ }
+ finally {
+ this.process = null;
+ info("Process completed " + (success? "successfully" : "unsuccessfully") + " in " + ((System.currentTimeMillis() - startMs) / 1000) + " seconds.");
+ }
+
+ // Get the output properties from this job.
+ generateProperties(propFiles[1]);
+
+ for (File file: propFiles)
+ if (file != null && file.exists()) file.delete();
+ }
+
+
+
+ /**
+ * This gives access to the process builder used to construct the process. An overriding class can use this to
+ * add to the command being executed.
+ */
+ protected AzkabanProcessBuilder getBuilder() {
+ return this.builder;
+ }
+
+ @Override
+ public void cancel() throws InterruptedException {
+ if(process == null)
+ throw new IllegalStateException("Not started.");
+ boolean killed = process.softKill(KILL_TIME_MS, TimeUnit.MILLISECONDS);
+ if(!killed) {
+ warn("Kill with signal TERM failed. Killing with KILL signal.");
+ process.hardKill();
+ }
+ }
+
+ @Override
+ public double getProgress() {
+ return process != null && process.isComplete()? 1.0 : 0.0;
+ }
+
+ private void appendProps(Set<String> suppressed) {
+ AzkabanProcessBuilder builder = this.getBuilder();
+ Props props = getProps();
+ for(String key: props.getKeySet())
+ if(!suppressed.contains(key))
+ builder.addArg("--" + key, props.get(key));
+ }
+
+
+}
View
@@ -1,14 +0,0 @@
-?M ../azkaban-common/src/java/azkaban/common/web/JsonSequenceFileViewer.java
-?M ../azkaban/src/java/azkaban/app/JobManager.java
-xxUU ../azkaban/src/java/azkaban/app/Scheduler.java
- M ../test/jobs/lib/azkaban-common-0.04.jar
- M ../test/jobs/lib/azkaban-tests.jar
- M ../test/jobs/system.properties
- M ../test/src/java/azkaban/test/WordCountGrid.java
- M ../test/src/java/azkaban/test/WordCountLocal.java
-?? ../azkaban/src/bin/
-?? ../merge.result
-?? my.diff
-?? ../test/bin/
-?? ../test/jobs/lib/commons-io-1.4.jar
-?? ../test/logs/
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Oops, something went wrong.

0 comments on commit 0985e70

Please sign in to comment.