Skip to content

Commit

Permalink
(1) Add some logging about when Azkaban tries to send an email. (2) A…
Browse files Browse the repository at this point in the history
…dd a new job-runner servlet that will run a job based on a curl statement or something. (3) Try to set up tar extraction, but found out it's harder than it really should be in java. Leaving skeleton of code that will allow for it for now.
  • Loading branch information
cheddar committed Oct 15, 2010
1 parent 956de97 commit 9b1f51e
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 26 deletions.
2 changes: 2 additions & 0 deletions azkaban/src/java/azkaban/app/AzkabanApp.java
Expand Up @@ -5,6 +5,7 @@
import azkaban.jobs.AzkabanCommandLine;
import azkaban.web.AzkabanServletContextListener;
import azkaban.web.JobManagerServlet;
import azkaban.web.JobRunnerServlet;
import azkaban.web.LogServlet;
import azkaban.web.pages.ExecutionHistoryServlet;
import azkaban.web.pages.HdfsBrowserServlet;
Expand Down Expand Up @@ -99,6 +100,7 @@ public static void main(String[] arguments) throws Exception {
ExecutionHistoryServlet.class.getName());
servlets.addServlet("Job Manager", "/api/jobs", JobManagerServlet.class.getName());
servlets.addServlet("Job Upload", "/job-upload/*", JobUploadServlet.class.getName());
servlets.addServlet("Job Runner", "/job-runner/*", JobRunnerServlet.class.getName());
servlets.addServlet("HDFS Browser", "/fs/*", HdfsBrowserServlet.class.getName());

try {
Expand Down
23 changes: 23 additions & 0 deletions azkaban/src/java/azkaban/app/PropsUtils.java
Expand Up @@ -19,11 +19,14 @@
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import azkaban.common.utils.Props;
import azkaban.common.utils.UndefinedPropertyException;
import azkaban.flow.ExecutableFlow;
import org.joda.time.DateTime;

public class PropsUtils {

Expand Down Expand Up @@ -147,4 +150,24 @@ public static Props resolveProps(Props props) {

return resolvedProps;
}

public static Props produceParentProperties(final ExecutableFlow flow) {
Props parentProps = new Props();

parentProps.put("azkaban.flow.id", flow.getId());
parentProps.put("azkaban.flow.uuid", UUID.randomUUID().toString());

DateTime loadTime = new DateTime();

parentProps.put("azkaban.flow.start.timestamp", loadTime.toString());
parentProps.put("azkaban.flow.start.year", loadTime.toString("yyyy"));
parentProps.put("azkaban.flow.start.month", loadTime.toString("MM"));
parentProps.put("azkaban.flow.start.day", loadTime.toString("dd"));
parentProps.put("azkaban.flow.start.hour", loadTime.toString("HH"));
parentProps.put("azkaban.flow.start.minute", loadTime.toString("mm"));
parentProps.put("azkaban.flow.start.seconds", loadTime.toString("ss"));
parentProps.put("azkaban.flow.start.milliseconds", loadTime.toString("SSS"));
parentProps.put("azkaban.flow.start.timezone", loadTime.toString("ZZZZ"));
return parentProps;
}
}
26 changes: 5 additions & 21 deletions azkaban/src/java/azkaban/app/Scheduler.java
Expand Up @@ -28,7 +28,6 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -481,6 +480,8 @@ private void sendErrorEmail(ScheduledJob job,
if((emailList == null || emailList.isEmpty()) && _jobFailureEmail != null)
emailList = Arrays.asList(_jobFailureEmail);

logger.info(String.format("Sending error email for job[%s] from %s to %s", job.getId(), senderAddress, emailList));

if(emailList != null && _mailman != null) {
try {

Expand Down Expand Up @@ -556,6 +557,8 @@ private void sendSuccessEmail(ScheduledJob job,
emailList = Arrays.asList(_jobSuccessEmail);
}

logger.info(String.format("Sending success email for job[%s] from %s to %s", job.getId(), senderAddress, emailList));

if(emailList != null && _mailman != null) {
try {
_mailman.sendEmailIfPossible(senderAddress,
Expand Down Expand Up @@ -691,7 +694,7 @@ public void run() {
senderAddress = desc.getSenderEmail();
final String senderEmail = senderAddress;

final Props parentProps = produceParentProperties(flowToRun);
final Props parentProps = PropsUtils.produceParentProperties(flowToRun);

// mark the job as executing
_scheduled.remove(_scheduledJob.getId());
Expand Down Expand Up @@ -869,23 +872,4 @@ public void completed(Status status) {
}
}

private Props produceParentProperties(final ExecutableFlow flow) {
Props parentProps = new Props();

parentProps.put("azkaban.flow.id", flow.getId());
parentProps.put("azkaban.flow.uuid", UUID.randomUUID().toString());

DateTime loadTime = new DateTime();

parentProps.put("azkaban.flow.start.timestamp", loadTime.toString());
parentProps.put("azkaban.flow.start.year", loadTime.toString("yyyy"));
parentProps.put("azkaban.flow.start.month", loadTime.toString("MM"));
parentProps.put("azkaban.flow.start.day", loadTime.toString("dd"));
parentProps.put("azkaban.flow.start.hour", loadTime.toString("HH"));
parentProps.put("azkaban.flow.start.minute", loadTime.toString("mm"));
parentProps.put("azkaban.flow.start.seconds", loadTime.toString("ss"));
parentProps.put("azkaban.flow.start.milliseconds", loadTime.toString("SSS"));
parentProps.put("azkaban.flow.start.timezone", loadTime.toString("ZZZZ"));
return parentProps;
}
}
23 changes: 22 additions & 1 deletion azkaban/src/java/azkaban/web/JobManagerServlet.java
Expand Up @@ -90,7 +90,7 @@ protected void doPut(HttpServletRequest request, HttpServletResponse response) t

FileItem item = (FileItem) params.get("file");
String deployPath = (String) params.get("path");
File jobDir = unzipFile(item);
File jobDir = extractFile(item);

jobManager.deployJobDir(jobDir.getAbsolutePath(), deployPath);
} catch (Exception e) {
Expand All @@ -105,6 +105,19 @@ protected void doPut(HttpServletRequest request, HttpServletResponse response) t
setMessagedUrl(response, redirectSuccess, "Installation Succeeded");
}

private File extractFile(FileItem item) throws IOException, ServletException {
final String contentType = item.getContentType();
if (contentType.startsWith("application/zip")) {
return unzipFile(item);
}

if (contentType.startsWith("application/x-tar")) {
return untarFile(item);
}

throw new ServletException(String.format("Unsupported file type[%s].", contentType));
}

private void setMessagedUrl(HttpServletResponse response, String redirectUrl, String message) throws IOException {
String url = redirectUrl + "/" + message;
response.sendRedirect(response.encodeRedirectUrl(url));
Expand All @@ -124,4 +137,12 @@ private File unzipFile(FileItem item) throws ServletException, IOException {
return unzipped;
}

private File untarFile(FileItem item) throws IOException, ServletException {
File extractionPath = Utils.createTempDir(new File(_tempDir));

if (true)
throw new ServletException("Unsupported file type [tar].");

return extractionPath;
}
}
74 changes: 74 additions & 0 deletions azkaban/src/java/azkaban/web/JobRunnerServlet.java
@@ -0,0 +1,74 @@
package azkaban.web;

import azkaban.app.PropsUtils;
import azkaban.common.utils.Props;
import azkaban.flow.ExecutableFlow;
import azkaban.flow.Flow;
import azkaban.flow.FlowExecutionHolder;
import azkaban.flow.FlowManager;
import azkaban.util.JSONToJava;
import org.apache.commons.io.IOUtils;
import org.json.JSONException;
import org.json.JSONObject;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.Map;

/**
*/
public class JobRunnerServlet extends AbstractAzkabanServlet
{
private static final JSONToJava jsonConverter = new JSONToJava();

@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
String jobName = req.getPathInfo();

if (jobName == null || jobName.length() < 2) {
resp.sendError(400, "Path must end with job name to run.");
return;
}

jobName = jobName.substring(1); // Remove the "/" prefix

final FlowManager flowManager = getApplication().getAllFlows();
ExecutableFlow flowToRun = flowManager.createNewExecutableFlow(jobName);

if (flowToRun == null) {
resp.sendError(404, String.format("Unknown job[%s]", jobName));
return;
}

Props parentProps = PropsUtils.produceParentProperties(flowToRun);
final String contentType = req.getContentType();
if (contentType != null) {
if (contentType.startsWith("application/json")) {
JSONObject obj = null;
try {
obj = new JSONObject(IOUtils.toString(req.getInputStream()));
} catch (JSONException e) {
resp.sendError(400, String.format("Bad JSON object."));
}

Map propertyMap = jsonConverter.apply(obj);
parentProps = new Props(parentProps, propertyMap);
}
else {
resp.sendError(400, String.format("Cannot handle content type[%s]", contentType));
return;
}
}

getApplication().getScheduler().scheduleNow(
new FlowExecutionHolder(
flowToRun,
parentProps
)
);

resp.setStatus(200);
}
}
18 changes: 14 additions & 4 deletions azkaban/web/web.xml
Expand Up @@ -40,6 +40,11 @@
<servlet-class>azkaban.web.JobManagerServlet</servlet-class>
</servlet>

<servlet>
<servlet-name>job-runner</servlet-name>
<servlet-class>azkaban.web.JobRunnerServlet</servlet-class>
</servlet>

<servlet>
<servlet-name>job-upload</servlet-name>
<servlet-class>azkaban.web.pages.JobUploadServlet</servlet-class>
Expand All @@ -60,10 +65,15 @@
<url-pattern>/job</url-pattern>
</servlet-mapping>

<servlet-mapping>
<servlet-name>job-upload</servlet-name>
<url-pattern>/job-upload/*</url-pattern>
</servlet-mapping>
<servlet-mapping>
<servlet-name>job-upload</servlet-name>
<url-pattern>/job-upload/*</url-pattern>
</servlet-mapping>

<servlet-mapping>
<servlet-name>job-runner</servlet-name>
<url-pattern>/job-runner/*</url-pattern>
</servlet-mapping>

<servlet-mapping>
<servlet-name>logs</servlet-name>
Expand Down

0 comments on commit 9b1f51e

Please sign in to comment.