Skip to content

Commit

Permalink
Merge pull request #189 from Khushbu27/0.4.0.1
Browse files Browse the repository at this point in the history
cleanup and dispose part will be supported in the fixing for issue #190.
  • Loading branch information
Yaguang Wang committed Jul 10, 2014
2 parents 9f3eb00 + 14f2809 commit 9a21b2c
Showing 1 changed file with 120 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,14 @@ public Workload constructWorkload(HttpServletRequest req,
objectSizesLength = objectSizes.length;

// Adding multiple init stages
addInitStages(workflow, objectSizesLength, containers);
String init_workers = num_of_drivers;
addInitStages(workflow, objectSizesLength, containers, objectSizes,
objects, isRange, unit, init_workers);

// Adding multiple prepare stages

addPrepareStages(workflow, objectSizesLength, objectSizes, objects,
containers, isRange, unit);
containers, isRange, unit, workers);

// Adding multiple normal stages
addNormalStages(workflow, objectSizesLength, objectSizes, objects,
Expand All @@ -211,29 +214,43 @@ public Workload constructWorkload(HttpServletRequest req,
containers, isRange, unit);

// multiple dispose stages
addDisposeStages(workflow, objectSizesLength, containers);
addDisposeStages(workflow, objectSizesLength, objectSizes, containers, isRange, unit);

workload.setWorkflow(workflow);

return workload;
}

private void addDisposeStages(Workflow workflow, int objectSizesLength,
String[] containers) {

String[] objectSizes, String[] containers,
boolean isRange, String unit) {

int previousContainerValue = 0;
for (int i = 0; i < objectSizesLength; i++) {
for (int j = 0; j < containers.length; j++) {
String containerString = containers[j];
String from_container = previousContainerValue
+ Integer.valueOf(containerString) + "";
String to_container = Integer.valueOf(from_container)
+ Integer.valueOf(containerString) - 1 + "";
previousContainerValue = Integer.valueOf(to_container);
Stage stage = createWorkstage(workflow, "dispose", "dispose", "1",
"containers=r(" + from_container + "," + to_container
+ ")");
workflow.addStage(stage);

String containerString = containers[j];
String from_container = previousContainerValue + Integer.valueOf(containerString) + "";
String to_container = Integer.valueOf(from_container) + Integer.valueOf(containerString) - 1 + "";
previousContainerValue = Integer.valueOf(to_container);

String sizeString;
if (isRange)
sizeString = "(" + objectSizes[0] + "," + objectSizes[1] + ")" + unit;
else
sizeString = "(" + objectSizes[i] + ")" + unit;

String numworkers = num_of_drivers;
int numcontainers = (Integer.valueOf(to_container) - Integer.valueOf(from_container) + 1);

String stageName = "w" + sizeString + "_c" + numcontainers + "_dispose_" + numworkers;

String configLine = "containers=r(" + from_container + "," + to_container + ")";

Stage stage = createAWorkstage(workflow, stageName, "dispose", numworkers + "", configLine);

workflow.addStage(stage);

}
}
}
Expand All @@ -248,32 +265,43 @@ private void addCleanupStages(Workflow workflow, int objectSizesLength,
for (int j = 0; j < containers.length; j++) {
for (int k = 0; k < objects.length; k++) {
String containerString = containers[j];
String from_container = previousContainerValue
+ Integer.valueOf(containerString) + "";
String to_container = Integer.valueOf(from_container)
+ Integer.valueOf(containerString) - 1 + "";
String from_container = previousContainerValue + Integer.valueOf(containerString) + "";
String to_container = Integer.valueOf(from_container) + Integer.valueOf(containerString) - 1 + "";
previousContainerValue = Integer.valueOf(to_container);

String from_object = previousObjectValue + 1 + "";
String to_object = Integer.valueOf(from_object)
+ Integer.valueOf(objects[k]) - 1 + "";
String to_object = Integer.valueOf(from_object) + Integer.valueOf(objects[k]) - 1 + "";
previousObjectValue = Integer.valueOf(to_object);

Stage stage = createWorkstage(workflow, "cleanup",
"cleanup", "1", "containers=r(" + from_container
+ "," + to_container + ");objects=r("
+ from_object + "," + to_object + ")");

String sizeString;
if (isRange)
sizeString = "(" + objectSizes[0] + "," + objectSizes[1] + ")" + unit;
else
sizeString = "(" + objectSizes[i] + ")" + unit;

String numworkers = num_of_drivers;
int numcontainers = (Integer.valueOf(to_container) - Integer.valueOf(from_container) + 1);
int numobjects = (Integer.valueOf(to_object) - Integer.valueOf(from_object) + 1);

String stageName = "w" + sizeString + "_c" + numcontainers + "_o" + numobjects + "_cleanup_" + numworkers;

String configLine = "containers=r(" + from_container + ","
+ to_container + ");objects=r(" + from_object + ","
+ to_object + ")";

Stage stage = createAWorkstage(workflow, stageName, "cleanup", numworkers + "", configLine);

workflow.addStage(stage);

}
}
}

}

private void addNormalStages(Workflow workflow, int objectSizesLength,
String[] objectSizes, String[] objects, String[] containers,
boolean isRange, String unit, String[] workers,
String[] rwdRatios) {
boolean isRange, String unit, String[] workers, String[] rwdRatios) {
int previousContainerValue = 0;
int previousObjectValue = 0;
for (int i = 0; i < objectSizesLength; i++) {
Expand All @@ -285,7 +313,6 @@ private void addNormalStages(Workflow workflow, int objectSizesLength,
+ objectSizes[1] + ")" + unit;
else
sizeString = "(" + objectSizes[i] + ")" + unit;

String containerString = containers[j];
String from_container = previousContainerValue
+ Integer.valueOf(containerString) + "";
Expand All @@ -297,19 +324,19 @@ private void addNormalStages(Workflow workflow, int objectSizesLength,
String to_object = Integer.valueOf(from_object)
+ Integer.valueOf(objects[k]) - 1 + "";
previousObjectValue = Integer.valueOf(to_object);

for (int w = 0; w < workers.length; w++) {
for (int r=0 ; r < rwdRatios.length; r++)
{
String[] readWriteDeleteRatios = rwdRatios[r].split(",");
for (int r = 0; r < rwdRatios.length; r++) {
String[] readWriteDeleteRatios = rwdRatios[r]
.split(",");
String readRatio = readWriteDeleteRatios[0];
String writeRatio = readWriteDeleteRatios[1];
String deleteRatio = readWriteDeleteRatios[2];

Stage stage = createNormalWorkstage(workflow, workers[w],
runtime, readRatio, writeRatio, deleteRatio,
isRange, from_container, to_container,
from_object, to_object, sizeString);

Stage stage = createNormalWorkstage(workflow,
workers[w], runtime, readRatio, writeRatio,
deleteRatio, isRange, from_container,
to_container, from_object, to_object,
sizeString);
workflow.addStage(stage);
}
}
Expand All @@ -319,22 +346,16 @@ private void addNormalStages(Workflow workflow, int objectSizesLength,

}



private void addPrepareStages(Workflow workflow, int objectSizesLength,
String[] objectSizes, String[] objects, String[] containers,
boolean isRange, String unit) {

boolean isRange, String unit, String[] workers) {
int previousContainerValue = 0;
int previousObjectValue = 0;
for (int i = 0; i < objectSizesLength; i++) {
for (int j = 0; j < containers.length; j++) {
for (int k = 0; k < objects.length; k++) {
String sizeString;
if (isRange)
sizeString = "(" + objectSizes[0] + ","
+ objectSizes[1] + ")" + unit;
else
sizeString = "(" + objectSizes[i] + ")" + unit;

String containerString = containers[j];
String from_container = previousContainerValue
+ Integer.valueOf(containerString) + "";
Expand All @@ -347,48 +368,78 @@ private void addPrepareStages(Workflow workflow, int objectSizesLength,
+ Integer.valueOf(objects[k]) - 1 + "";
previousObjectValue = Integer.valueOf(to_object);

int numworkers = Integer.valueOf(objects[k]) / Integer.valueOf(num_of_drivers);
int numcontainers = (Integer.valueOf(to_container) - Integer.valueOf(from_container) + 1);
int numobjects = (Integer.valueOf(to_object) - Integer.valueOf(from_object) + 1);

String sizeString;
String sizeSpec = "";
if (isRange) {
Stage stage = createWorkstage(workflow, "prepare",
"prepare", Integer.valueOf(objects[k])
/ Integer.valueOf(num_of_drivers) + "",
"containers=r(" + from_container + ","
+ to_container + ");objects=r("
+ from_object + "," + to_object
+ ");sizes=u" + sizeString);
workflow.addStage(stage);
sizeString = "(" + objectSizes[0] + "-" + objectSizes[1] + ")" + unit;
sizeSpec = "u";
} else {
Stage stage = createWorkstage(workflow, "prepare",
"prepare", Integer.valueOf(objects[k])
/ Integer.valueOf(num_of_drivers) + "",
"containers=r(" + from_container + ","
+ to_container + ");objects=r("
+ from_object + "," + to_object
+ ");sizes=c" + sizeString);
workflow.addStage(stage);
sizeString = "(" + objectSizes[i] + ")" + unit;
sizeSpec = "c";
}

String stageName = "w" + sizeString
+ "_c" + numcontainers
+ "_o" + numobjects
+ "_prepare_" + numworkers;

String configLine =
"containers=r(" + from_container + "," + to_container
+ ");objects=r(" + from_object + "," + to_object
+ ");sizes=" + sizeSpec + sizeString;

Stage stage = createAWorkstage(workflow, stageName, "prepare", numworkers + "", configLine);

workflow.addStage(stage);
}
}
}
}

private void addInitStages(Workflow workflow, int objectSizesLength,
String[] containers) {
String[] containers, String[] objectSizes, String[] objects,
boolean isRange, String unit, String init_workers) {
int previousContainerValue = 0;
for (int i = 0; i < objectSizesLength; i++) {
for (int j = 0; j < containers.length; j++) {

String containerString = containers[j];
String from_container = previousContainerValue
+ Integer.valueOf(containerString) + "";
String to_container = Integer.valueOf(from_container)
+ Integer.valueOf(containerString) - 1 + "";
previousContainerValue = Integer.valueOf(to_container);
Stage stage = createWorkstage(workflow, "init", "init", "1",
"containers=r(" + from_container + "," + to_container
+ ")");
String sizeString;
if (isRange)
sizeString = "(" + objectSizes[0] + "," + objectSizes[1] + ")" + unit;
else
sizeString = "(" + objectSizes[i] + ")" + unit;

int numcontainers = (Integer.valueOf(to_container) - Integer.valueOf(from_container) + 1);

String stageName = "w" + sizeString + "_c" + numcontainers + "_init_" + init_workers;
String config = "containers=r(" + from_container + "," + to_container + ")";
Stage stage = createAWorkstage(workflow, stageName, "init", init_workers, config);

workflow.addStage(stage);

}
}
}

private Stage createAWorkstage(Workflow workflow, String stageName,
String stageType, String workers, String config) {
Stage stage = new Stage(stageName);
Work work = new Work(stageName, stageType);
work.setType(stageType);
work.setWorkers(Integer.parseInt(workers));
work.setConfig(config);
stage.addWork(work);
return stage;
}

private Stage createNormalWorkstage(Workflow workflow,
Expand Down Expand Up @@ -493,4 +544,4 @@ private Stage createWorkstage(Workflow workflow, String stageName,
return stage;
}

}
}

0 comments on commit 9a21b2c

Please sign in to comment.