Skip to content

Commit

Permalink
Few changes.
Browse files Browse the repository at this point in the history
-Reduced the thread #
-made traversing trees more efficient (hopefully should speed up restart on large flows)
-Re-did the UI due to very slow tree performances with custom UI built on JQuery.
  • Loading branch information
rbpark committed Feb 3, 2011
1 parent a822b61 commit c6cbfa2
Show file tree
Hide file tree
Showing 50 changed files with 5,409 additions and 210 deletions.
2 changes: 1 addition & 1 deletion azkaban/src/java/azkaban/app/AzkabanApplication.java
Expand Up @@ -149,7 +149,7 @@ public AzkabanApplication(List<File> jobDirs, File logDir, File tempDir, boolean

String failureEmail = defaultProps.getString("job.failure.email", null);
String successEmail = defaultProps.getString("job.success.email", null);
int schedulerThreads = defaultProps.getInt("scheduler.threads", 50);
int schedulerThreads = defaultProps.getInt("scheduler.threads", 20);
_instanceName = defaultProps.getString(INSTANCE_NAME, "");

final File initialJobDir = _jobDirs.get(0);
Expand Down
40 changes: 40 additions & 0 deletions azkaban/src/java/azkaban/jobs/Executor.java
@@ -0,0 +1,40 @@
package azkaban.jobs;

import azkaban.common.utils.Props;

public interface Executor {

/**
* Returns a unique(should be checked in xml) string name/id for the Job.
*
* @return
*/
public String getId();

/**
* Run the job. In general this method can only be run once. Must either
* succeed or throw an exception.
*/
public void run() throws Exception;

/**
* Best effort attempt to cancel the job.
*
* @throws Exception If cancel fails
*/
public void cancel() throws Exception;

/**
* Returns a progress report between [0 - 1.0] to indicate the percentage
* complete
*
* @throws Exception If getting progress fails
*/
public double getProgress() throws Exception;

/**
* Get the generated properties from this job.
* @return
*/
public Props getJobGeneratedProperties();
}
Expand Up @@ -57,8 +57,6 @@ public class LocalFileScheduleLoader implements ScheduleLoader {
private File scheduleFile;
private File backupScheduleFile;

private JSONUtils jsonUtils = new JSONUtils();

public LocalFileScheduleLoader(File schedule, File backupSchedule) {
this.scheduleFile = schedule;
this.backupScheduleFile = backupSchedule;
Expand Down Expand Up @@ -107,7 +105,7 @@ public void saveSchedule(List<ScheduledJob> schedule) {

try {
FileWriter writer = new FileWriter(scheduleFile);
writer.write(jsonUtils.toJSONString(obj, 4));
writer.write(JSONUtils.toJSONString(obj, 4));
writer.flush();
} catch (Exception e) {
throw new RuntimeException("Error saving flow file", e);
Expand All @@ -129,7 +127,7 @@ private List<ScheduledJob> loadFromFile(File schedulefile)

HashMap<String, Object> schedule;
try {
schedule = (HashMap<String,Object>)jsonUtils.fromJSONStream(reader);
schedule = (HashMap<String,Object>)JSONUtils.fromJSONStream(reader);
} catch (Exception e) {
schedule = loadLegacyFile(schedulefile);
}
Expand Down
8 changes: 3 additions & 5 deletions azkaban/src/java/azkaban/scheduler/ScheduleManager.java
Expand Up @@ -229,16 +229,14 @@ public void run() {
} catch (JobExecutionException e) {
logger.info("Could not run job. " + e.getMessage());
}
schedule.remove(job);

// Immediately reschedule if it's possible. Let the execution manager
// handle any duplicate runs.
if (runningJob.updateTime()) {
schedule.add(runningJob);
saveSchedule();
}
else {
// No need to keep it in the schedule.
removeScheduledJob(runningJob);
}
saveSchedule();
}
else {
// wait until job run
Expand Down
19 changes: 19 additions & 0 deletions azkaban/src/java/azkaban/util/Pair.java
@@ -0,0 +1,19 @@
package azkaban.util;

public class Pair<F, S> {
private final F first;
private final S second;

public Pair(F first, S second) {
this.first = first;
this.second = second;
}

public F getFirst() {
return first;
}

public S getSecond() {
return second;
}
}
47 changes: 39 additions & 8 deletions azkaban/src/java/azkaban/util/json/JSONUtils.java
Expand Up @@ -39,9 +39,9 @@
public class JSONUtils {

/**
* The constructor.
* The constructor. Cannot construct this class.
*/
public JSONUtils() {
private JSONUtils() {
}

/**
Expand All @@ -54,7 +54,7 @@ public JSONUtils() {
* @return
* @throws Exception
*/
public Map<String, Object> fromJSONStream(Reader reader) throws Exception {
public static Map<String, Object> fromJSONStream(Reader reader) throws Exception {
JSONObject jsonObj = new JSONObject(new JSONTokener(reader));
Map<String, Object> results = createObject(jsonObj);

Expand All @@ -70,7 +70,7 @@ public Map<String, Object> fromJSONStream(Reader reader) throws Exception {
* @return
* @throws Exception
*/
public Map<String, Object> fromJSONString(String str) throws Exception {
public static Map<String, Object> fromJSONString(String str) throws Exception {
JSONObject jsonObj = new JSONObject(str);
Map<String, Object> results = createObject(jsonObj);
return results;
Expand All @@ -83,7 +83,7 @@ public Map<String, Object> fromJSONString(String str) throws Exception {
* @return
*/
@SuppressWarnings("unchecked")
private Map<String, Object> createObject(JSONObject obj) {
private static Map<String, Object> createObject(JSONObject obj) {
LinkedHashMap<String, Object> map = new LinkedHashMap<String, Object>();

Iterator<String> iterator = obj.keys();
Expand Down Expand Up @@ -116,7 +116,7 @@ else if (value instanceof JSONObject) {
* @param obj
* @return
*/
private List<Object> createArray(JSONArray array) {
private static List<Object> createArray(JSONArray array) {
ArrayList<Object> list = new ArrayList<Object>();
for (int i = 0; i < array.length(); ++i) {
Object value = null;
Expand Down Expand Up @@ -145,7 +145,38 @@ else if (value instanceof JSONObject) {
* @param obj
* @return
*/
public String toJSONString(Map<String, Object> obj) {
public static String toJSONString(List<?> list) {
JSONArray jsonList = new JSONArray(list);
try {
return jsonList.toString();
} catch (Exception e) {
return "";
}
}

/**
* Creates a json string from Map/List/Primitive object.
*
* @param obj
* @parm indent
* @return
*/
public static String toJSONString(List<?> list, int indent) {
JSONArray jsonList = new JSONArray(list);
try {
return jsonList.toString(indent);
} catch (Exception e) {
return "";
}
}

/**
* Creates a json string from Map/List/Primitive object.
*
* @param obj
* @return
*/
public static String toJSONString(Map<String, Object> obj) {
JSONObject jsonObj = new JSONObject(obj);
try {
return jsonObj.toString();
Expand All @@ -161,7 +192,7 @@ public String toJSONString(Map<String, Object> obj) {
* @param indent
* @return
*/
public String toJSONString(Map<String, Object> obj, int indent) {
public static String toJSONString(Map<String, Object> obj, int indent) {
JSONObject jsonObj = new JSONObject(obj);
try {
return jsonObj.toString(4);
Expand Down
4 changes: 1 addition & 3 deletions azkaban/src/java/azkaban/web/ApiServlet.java
Expand Up @@ -21,8 +21,6 @@ public class ApiServlet extends AbstractAzkabanServlet {
*/
private static final long serialVersionUID = 246975794895355127L;
private static final Logger logger = Logger.getLogger(ApiServlet.class.getName());

private static JSONUtils jsonUtils = new JSONUtils();

@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException,
Expand Down Expand Up @@ -65,7 +63,7 @@ private void handleRunJob(HttpServletRequest req, HttpServletResponse resp) thro
results.put("error", e.getMessage());
}

writer.print(jsonUtils.toJSONString(results));
writer.print(JSONUtils.toJSONString(results));
writer.flush();

writer.close();
Expand Down

0 comments on commit c6cbfa2

Please sign in to comment.