Skip to content

Commit

Permalink
Fix ConcurrentModification exception in Workflow Garbage Collection (a…
Browse files Browse the repository at this point in the history
…pache#741)

In workflow Garbage collection, there is possibility that we encounter
ConcurrentMod exception while looping through the workflow contexts.
This commit fixes this issue by adding a try-catch.
  • Loading branch information
Ali Reza Zamani Zadeh Najari committed Feb 12, 2020
1 parent 2d6415f commit 0fa949d
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
37 changes: 27 additions & 10 deletions helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -1043,23 +1043,40 @@ public static void purgeExpiredJobs(String workflow, WorkflowConfig workflowConf
* @param dataProvider
* @param manager
*/
public static void workflowGarbageCollection(WorkflowControllerDataProvider dataProvider,
public static void workflowGarbageCollection(final WorkflowControllerDataProvider dataProvider,
final HelixManager manager) {
// Garbage collections for conditions where workflow context exists but config is missing.
Map<String, ZNRecord> contexts = dataProvider.getContexts();
HelixDataAccessor accessor = manager.getHelixDataAccessor();
HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();

Set<String> existingContexts;
/*
* Here try-catch is used to avoid concurrent modification exception while doing deep copy.
* Map.keySet() can produce concurrent modification exception.
* Reason: If the map is modified while an iteration over the set is in progress, concurrent
* modification exception will be thrown.
*/
try {
existingContexts = new HashSet<>(dataProvider.getContexts().keySet());
} catch (Exception e) {
LOG.warn(
"Exception occurred while creating a list of all workflow/job context names!",
e);
return;
}

// toBeDeletedWorkflows is a set that contains the name of the workflows that their contexts
// should be deleted.
Set<String> toBeDeletedWorkflows = new HashSet<>();
for (Map.Entry<String, ZNRecord> entry : contexts.entrySet()) {
if (entry.getValue() != null
&& entry.getValue().getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
if (dataProvider.getWorkflowConfig(entry.getKey()) == null) {
toBeDeletedWorkflows.add(entry.getKey());
}
for (String entry : existingContexts) {
WorkflowConfig cfg = dataProvider.getWorkflowConfig(entry);
WorkflowContext ctx = dataProvider.getWorkflowContext(entry);
if (ctx != null && ctx.getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW) && cfg == null) {
toBeDeletedWorkflows.add(entry);
}
}

HelixDataAccessor accessor = manager.getHelixDataAccessor();
HelixPropertyStore<ZNRecord> propertyStore = manager.getHelixPropertyStore();

for (String workflowName : toBeDeletedWorkflows) {
LOG.warn(String.format(
"WorkflowContext exists for workflow %s. However, Workflow Config is missing! Deleting the WorkflowConfig and IdealState!!",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ record = _manager.getHelixDataAccessor().getBaseDataAccessor().get(workflowConte
Assert.assertTrue(contextDeleted);
}

Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
private Workflow.Builder createSimpleWorkflowBuilder(String workflowName) {
final long expiryTime = 5000L;
Workflow.Builder builder = new Workflow.Builder(workflowName);

Expand Down

0 comments on commit 0fa949d

Please sign in to comment.