Skip to content

Commit

Permalink
Merge branch 'parallel_online_workflow' into mcla_update
Browse files Browse the repository at this point in the history
  • Loading branch information
massimo-ferraro committed Feb 5, 2018
2 parents 2ccd6a5 + d1ab297 commit c258403
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public int getAvailableCores() {

@Override
public String startProcess(String name, String owner, DateTime date, DateTime creationDate,
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases)
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases, int numThreads)
throws Exception {
Objects.requireNonNull(date);
Objects.requireNonNull(creationDate);
Expand All @@ -171,13 +171,26 @@ public String startProcess(String name, String owner, DateTime date, DateTime cr
String processId = DateTimeFormat.forPattern("yyyyMMddHHmmSSS").print(new DateTime());
LOGGER.info("Starting process: " + processId);
OnlineProcess proc = new OnlineProcess(processId, name, owner, params.getCaseType().toString(), date, creationDate);
for (DateTime bcase : basecases) {
params.setBaseCaseDate(bcase);
String id = startWorkflow(start, params);
org.joda.time.format.DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
String basecaseString = fmt.print(bcase);
proc.addWorkflow(basecaseString, id);

ExecutorService taskExecutor = Executors.newFixedThreadPool(numThreads);
List<Callable<Void>> tasks = new ArrayList<>(numThreads);

for (DateTime basecase : basecases) {
tasks.add(new Callable() {
@Override
public Object call() throws Exception {
OnlineWorkflowParameters pars = new OnlineWorkflowParameters(basecase, params.getStates(), params.getHistoInterval(), params.getOfflineWorkflowId(), params.getTimeHorizon(), params.getFeAnalysisId(), params.getRulesPurityThreshold(), params.storeStates(), params.analyseBasecase(), params.validation(), params.getSecurityIndexes(), params.getCaseType(), params.getCountries(), params.isMergeOptimized(), params.getLimitReduction(), params.isHandleViolationsInN(), params.getConstraintMargin());
String workflowId = startWorkflow(start, pars);
org.joda.time.format.DateTimeFormatter fmt = ISODateTimeFormat.dateTime();
String basecaseString = fmt.print(basecase);
proc.addWorkflow(basecaseString, workflowId);
return workflowId;
} });

}
taskExecutor.invokeAll(tasks);
taskExecutor.shutdown();

onlineDb.storeProcess(proc);
LOGGER.info("End of process: " + processId);
return processId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,6 @@ void runTDSimulations(OnlineWorkflowStartParameters startconfig, String caseFile
String emptyContingencyS, String outputFolderS);

String startProcess(String name, String owner, DateTime date, DateTime creationDate,
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases)
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases, int numThreads)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ public interface OnlineApplication extends AutoCloseable {
void notifyListeners();

String startProcess(String name, String owner, DateTime date, DateTime creationDate,
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases)
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases, int numThreads)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public class OnlineWorkflowImpl implements OnlineWorkflow {
private final MergeOptimizerFactory mergeOptimizerFactory;
private final RulesFacadeFactory rulesFacadeFactory;
private final OnlineWorkflowStartParameters startParameters;
private String id;
private final String id;
private final String logHeader;

public OnlineWorkflowImpl(
ComputationManager computationManager,
Expand Down Expand Up @@ -134,7 +135,8 @@ public OnlineWorkflowImpl(
this.parameters.setCaseType((network.getForecastDistance() == 0) ? CaseType.SN : CaseType.FO);
}
this.id = DateTimeFormat.forPattern("yyyyMMdd_HHmm_").print(OnlineUtils.toCetDate(this.parameters.getBaseCaseDate())) + new SimpleDateFormat("yyyyMMddHHmmssSSS").format(new Date());
logger.info(this.parameters.toString());
this.logHeader = " [WorkflowId=" + id + "] ";
logger.info(this.logHeader + this.parameters.toString());
}

/* (non-Javadoc)
Expand All @@ -150,7 +152,7 @@ public String getId() {
*/
@Override
public void start(OnlineWorkflowContext oCtx) throws Exception {
logger.info("{} Online workflow processing, started.", id);
logger.info(this.logHeader + "Online workflow processing, started.");
for (OnlineApplicationListener l : listeners) {
l.onWorkflowUpdate(new StatusSynthesis(id, WorkflowStatusEnum.RUNNING));
}
Expand All @@ -166,8 +168,8 @@ public void start(OnlineWorkflowContext oCtx) throws Exception {
loadFlowFactory, 0, mergeOptimizerFactory, computationManager, parameters.isMergeOptimized());
}

logger.info("- Network id: " + network.getId());
logger.info("- Network name: " + network.getName());
logger.info(this.logHeader + "- Network id: " + network.getId());
logger.info(this.logHeader + "- Network name: " + network.getName());

// needed in order to correctly handle multithreading access to network
network.getStateManager().allowStateMultiThreadAccess(true);
Expand All @@ -186,7 +188,7 @@ public void start(OnlineWorkflowContext oCtx) throws Exception {
oCtx.setWcaSecurityRulesResults(new SecurityRulesApplicationResults(this.getId(), oCtx.getTimeHorizon()));
}

logger.info(" - WCA processing......");
logger.info(this.logHeader + " - WCA processing......");
for (OnlineApplicationListener l : listeners) {
l.onWcaUpdate(new RunningSynthesis(id, true));
}
Expand All @@ -202,7 +204,7 @@ public void start(OnlineWorkflowContext oCtx) throws Exception {
// ArrayList<String> stables = new ArrayList<String>();

for (WCACluster cluster : result.getClusters()) {
logger.info("WCA: contingency {} in cluster {}", cluster.getContingency().getId(), cluster.getNum().toString());
logger.info(this.logHeader + "WCA: contingency {} in cluster {}", cluster.getContingency().getId(), cluster.getNum().toString());
oCtx.getWcaResults().addContingencyWithCluster(cluster.getContingency().getId(), cluster);
if (parameters.validation()) { // if validation
// do not filter out the contingencies
Expand All @@ -222,7 +224,7 @@ public void start(OnlineWorkflowContext oCtx) throws Exception {
}


logger.info("{} Online workflow - Analysis of states, started.", id);
logger.info(this.logHeader + "{} Online workflow - Analysis of states, started.", id);

// create modules used in the states analysis
MontecarloSampler sampler = montecarloSamplerFactory.create(oCtx.getNetwork(), computationManager, feDataStorage);
Expand Down Expand Up @@ -265,13 +267,13 @@ public void start(OnlineWorkflowContext oCtx) throws Exception {
ExecutorService taskExecutor = Executors.newFixedThreadPool(startParameters.getThreads());
taskExecutor.invokeAll(tasks);
taskExecutor.shutdown();
logger.info("{} Online workflow - Analysis of states, terminated.", id);
logger.info(this.logHeader + "{} Online workflow - Analysis of states, terminated.", id);

logger.info("{} Online workflow processing, terminated.", id);
logger.info(this.logHeader + "{} Online workflow processing, terminated.", id);

logger.info("WCA Results:\n" + oCtx.getWcaResults().toString());
logger.info("Security Rules Application Results:\n" + oCtx.getSecurityRulesResults().toString());
logger.info("Results:\n" + oCtx.getResults().toString());
logger.info(this.logHeader + "WCA Results:\n" + oCtx.getWcaResults().toString());
logger.info(this.logHeader + "Security Rules Application Results:\n" + oCtx.getSecurityRulesResults().toString());
logger.info(this.logHeader + "Results:\n" + oCtx.getResults().toString());

for (OnlineApplicationListener l : listeners) {
l.onWorkflowUpdate(new StatusSynthesis(id, WorkflowStatusEnum.DONE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,10 @@ public void removeListener(OnlineApplicationListener l) {

@Override
public String startProcess(String name, String owner, DateTime date, DateTime creationDate,
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases)
OnlineWorkflowStartParameters start, OnlineWorkflowParameters params, DateTime[] basecases, int numThreads)
throws Exception {
try {
return application.startProcess(name, owner, date, creationDate, start, params, basecases);
return application.startProcess(name, owner, date, creationDate, start, params, basecases, numThreads);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
notifyDisconnection();
Expand Down

0 comments on commit c258403

Please sign in to comment.