Skip to content
This repository has been archived by the owner on Jan 6, 2018. It is now read-only.

Commit

Permalink
CLOSES OOZIE:7 Log retrieval for large number of actions particular c…
Browse files Browse the repository at this point in the history
…oorinator actions
  • Loading branch information
kirann authored and Mohammad Kamrul Islam committed Aug 19, 2011
1 parent 575d71c commit 9591abb
Show file tree
Hide file tree
Showing 11 changed files with 345 additions and 73 deletions.
34 changes: 31 additions & 3 deletions client/src/main/java/org/apache/oozie/cli/OozieCLI.java
Expand Up @@ -19,6 +19,7 @@
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class OozieCLI {
public static final String RERUN_OPTION = "rerun";
public static final String INFO_OPTION = "info";
public static final String LOG_OPTION = "log";
public static final String LOG_ACTION_OPTION = "action";
public static final String DEFINITION_OPTION = "definition";
public static final String CONFIG_CONTENT_OPTION = "configcontent";

Expand Down Expand Up @@ -199,6 +201,8 @@ protected Options createJobOptions() {
Option len = new Option(LEN_OPTION, true, "number of actions (default TOTAL ACTIONS, requires -info)");
Option localtime = new Option(LOCAL_TIME_OPTION, false, "use local time (default GMT)");
Option log = new Option(LOG_OPTION, true, "job log");
Option log_action = new Option(LOG_ACTION_OPTION, true,
"coordinator log retrieval on action ids (requires -log)");
Option definition = new Option(DEFINITION_OPTION, true, "job definition");
Option config_content = new Option(CONFIG_CONTENT_OPTION, true, "job configuration");
Option verbose = new Option(VERBOSE_OPTION, false, "verbose mode");
Expand Down Expand Up @@ -242,6 +246,7 @@ protected Options createJobOptions() {
jobOptions.addOption(rerun_coord);
jobOptions.addOption(rerun_refresh);
jobOptions.addOption(rerun_nocleanup);
jobOptions.addOption(log_action);
jobOptions.addOptionGroup(actions);
return jobOptions;
}
Expand Down Expand Up @@ -695,7 +700,30 @@ else if (commandLine.getOptionValue(INFO_OPTION).contains("-W@")) {
}
}
else if (options.contains(LOG_OPTION)) {
System.out.println(wc.getJobLog(commandLine.getOptionValue(LOG_OPTION)));
PrintStream ps = System.out;
if (commandLine.getOptionValue(LOG_OPTION).contains("-C")) {
String logRetrievalScope = null;
String logRetrievalType = null;
if (options.contains(LOG_ACTION_OPTION)) {
logRetrievalType = RestConstants.JOB_LOG_ACTION;
logRetrievalScope = commandLine.getOptionValue(LOG_ACTION_OPTION);
}
try {
wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), logRetrievalType, logRetrievalScope, ps);
}
finally {
ps.close();
}
}
else {
if (!options.contains(LOG_ACTION_OPTION)) {
wc.getJobLog(commandLine.getOptionValue(LOG_OPTION), null, null, ps);
}
else {
throw new OozieCLIException("Invalid options provided for log retrieval. " + LOG_ACTION_OPTION
+ " is valid only for coordinator job log retrieval");
}
}
}
else if (options.contains(DEFINITION_OPTION)) {
System.out.println(wc.getJobDefinition(commandLine.getOptionValue(DEFINITION_OPTION)));
Expand Down Expand Up @@ -1035,8 +1063,8 @@ private void printBundleJobs(List<BundleJob> jobs, boolean localtime, boolean ve

for (BundleJob job : jobs) {
System.out.println(String.format(BUNDLE_JOBS_FORMATTER, maskIfNull(job.getId()), maskIfNull(job
.getAppName()), job.getStatus(), maskDate(job.getKickoffTime(), localtime),
maskDate(job.getCreatedTime(), localtime), maskIfNull(job.getUser()), maskIfNull(job.getGroup())));
.getAppName()), job.getStatus(), maskDate(job.getKickoffTime(), localtime), maskDate(job
.getCreatedTime(), localtime), maskIfNull(job.getUser()), maskIfNull(job.getGroup())));
System.out.println(RULER);
}
}
Expand Down
86 changes: 77 additions & 9 deletions client/src/main/java/org/apache/oozie/client/OozieClient.java
Expand Up @@ -16,8 +16,10 @@

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Reader;
import java.net.HttpURLConnection;
import java.net.URL;
Expand Down Expand Up @@ -125,7 +127,7 @@ public class OozieClient {
public static enum SYSTEM_MODE {
NORMAL, NOWEBSERVICE, SAFEMODE
};

/**
* debugMode =0 means no debugging. > 0 means debugging on.
*/
Expand Down Expand Up @@ -372,8 +374,8 @@ public T call() throws OozieClientException {
return call(conn);
}
else {
System.out
.println("Option not supported in target server. Supported only on Oozie-2.0 or greater. Use 'oozie help' for details");
System.out.println("Option not supported in target server. Supported only on Oozie-2.0 or greater."
+ " Use 'oozie help' for details");
throw new OozieClientException(OozieClientException.UNSUPPORTED_VERSION, new Exception());
}
}
Expand Down Expand Up @@ -687,11 +689,28 @@ public String getJobLog(String jobId) throws OozieClientException {
return new JobLog(jobId).call();
}

private class JobLog extends JobMetadata {
/**
* Get the log of a job.
*
* @param jobId job Id.
* @param logRetrievalType Based on which filter criteria the log is retrieved
* @param logRetrievalScope Value for the retrieval type
* @param ps Printstream of command line interface
* @throws OozieClientException thrown if the job info could not be retrieved.
*/
public void getJobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps)
throws OozieClientException {
new JobLog(jobId, logRetrievalType, logRetrievalScope, ps).call();
}

private class JobLog extends JobMetadata {
JobLog(String jobId) {
super(jobId, RestConstants.JOB_SHOW_LOG);
}

JobLog(String jobId, String logRetrievalType, String logRetrievalScope, PrintStream ps) {
super(jobId, logRetrievalType, logRetrievalScope, RestConstants.JOB_SHOW_LOG, ps);
}
}

/**
Expand All @@ -713,23 +732,73 @@ private class JobDefinition extends JobMetadata {
}

private class JobMetadata extends ClientCallable<String> {
PrintStream printStream;

JobMetadata(String jobId, String metaType) {
super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
metaType));
}

JobMetadata(String jobId, String logRetrievalType, String logRetrievalScope, String metaType, PrintStream ps) {
super("GET", RestConstants.JOB, notEmpty(jobId, "jobId"), prepareParams(RestConstants.JOB_SHOW_PARAM,
metaType, RestConstants.JOB_LOG_TYPE_PARAM, logRetrievalType, RestConstants.JOB_LOG_SCOPE_PARAM,
logRetrievalScope));
printStream = ps;
}

@Override
protected String call(HttpURLConnection conn) throws IOException, OozieClientException {
String returnVal = null;
if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) {

String output = getReaderAsString(new InputStreamReader(conn.getInputStream()), -1);
return output;
InputStream is = conn.getInputStream();
InputStreamReader isr = new InputStreamReader(is);
try {
if (printStream != null) {
sendToOutputStream(isr, -1);
}
else {
returnVal = getReaderAsString(isr, -1);
}
}
finally {
isr.close();
}
}
else {
handleError(conn);
}
return null;
return returnVal;
}

/**
* Output the log to command line interface
*
* @param reader reader to read into a string.
* @param maxLen max content length allowed, if -1 there is no limit.
* @param ps Printstream of command line interface
* @throws IOException
*/
private void sendToOutputStream(Reader reader, int maxLen) throws IOException {
if (reader == null) {
throw new IllegalArgumentException("reader cannot be null");
}
StringBuilder sb = new StringBuilder();
char[] buffer = new char[2048];
int read;
int count = 0;
int noOfCharstoFlush = 1024;
while ((read = reader.read(buffer)) > -1) {
count += read;
if ((maxLen > -1) && (count > maxLen)) {
break;
}
sb.append(buffer, 0, read);
if (sb.length() > noOfCharstoFlush) {
printStream.print(sb.toString());
sb = new StringBuilder("");
}
}
printStream.print(sb.toString());
}

/**
Expand All @@ -745,7 +814,6 @@ private String getReaderAsString(Reader reader, int maxLen) throws IOException {
if (reader == null) {
throw new IllegalArgumentException("reader cannot be null");
}

StringBuffer sb = new StringBuffer();
char[] buffer = new char[2048];
int read;
Expand Down
Expand Up @@ -59,7 +59,7 @@ public interface RestConstants {
public static final String JOB_ACTION_RERUN = "rerun";

public static final String JOB_COORD_ACTION_RERUN = "coord-rerun";

public static final String JOB_BUNDLE_ACTION_RERUN = "bundle-rerun";

public static final String JOB_SHOW_PARAM = "show";
Expand All @@ -73,9 +73,9 @@ public interface RestConstants {
public static final String JOB_SHOW_DEFINITION = "definition";

public static final String JOB_BUNDLE_RERUN_COORD_SCOPE_PARAM = "coord-scope";

public static final String JOB_BUNDLE_RERUN_DATE_SCOPE_PARAM = "date-scope";

public static final String JOB_COORD_RERUN_TYPE_PARAM = "type";

public static final String JOB_COORD_RERUN_DATE = "date";
Expand All @@ -88,6 +88,12 @@ public interface RestConstants {

public static final String JOB_COORD_RERUN_NOCLEANUP_PARAM = "nocleanup";

public static final String JOB_LOG_ACTION = "action";

public static final String JOB_LOG_SCOPE_PARAM = "scope";

public static final String JOB_LOG_TYPE_PARAM = "type";

public static final String JOBS_FILTER_PARAM = "filter";

public static final String JOBS_EXTERNAL_ID_PARAM = "external-id";
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/java/org/apache/oozie/BaseEngine.java
Expand Up @@ -51,7 +51,7 @@

public abstract class BaseEngine {
public static final String USE_XCOMMAND = "oozie.useXCommand";

protected String user;
protected String authToken;

Expand All @@ -74,7 +74,9 @@ protected String getAuthToken() {
}

/**
* Submit a job. <p/> It validates configuration properties.
* Submit a job.
* <p/>
* It validates configuration properties.
*
* @param conf job configuration.
* @param startJob indicates if the job should be started or not.
Expand Down Expand Up @@ -133,7 +135,6 @@ protected String getAuthToken() {
*/
public abstract void reRun(String jobId, Configuration conf) throws BaseEngineException;


/**
* Return the info about a wf job.
*
Expand Down Expand Up @@ -190,20 +191,21 @@ protected String getAuthToken() {
* @param writer writer to stream the log to.
* @throws IOException thrown if the log cannot be streamed.
* @throws BaseEngineException thrown if there is error in getting the Workflow/Coordinator Job Information for
* jobId.
* jobId.
*/
public abstract void streamLog(String jobId, Writer writer) throws IOException, BaseEngineException;

/**
* Return the workflow Job ID for an external ID. <p/> This is reverse lookup for recovery purposes.
* Return the workflow Job ID for an external ID.
* <p/>
* This is reverse lookup for recovery purposes.
*
* @param externalId external ID provided at job submission time.
* @return the associated workflow job ID if any, <code>null</code> if none.
* @throws BaseEngineException thrown if the lookup could not be done.
*/
public abstract String getJobIdForExternalId(String externalId) throws BaseEngineException;

public abstract String dryrunSubmit(Configuration conf, boolean startJob)
throws BaseEngineException;
public abstract String dryrunSubmit(Configuration conf, boolean startJob) throws BaseEngineException;

}

0 comments on commit 9591abb

Please sign in to comment.