Skip to content

Commit

Permalink
NIFI-1908:
Browse files Browse the repository at this point in the history
- Fixing issue with potentially uninitialized RemoteGroupPorts in copySnippet.
  • Loading branch information
mcgilman committed May 30, 2016
1 parent 889a5b6 commit 6daea58
Showing 1 changed file with 60 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1353,50 +1353,11 @@ public FlowEntity copySnippet(final String groupId, final String snippetId, fina
// create the new snippet
final FlowSnippetDTO snippet = snippetDAO.copySnippet(groupId, snippetId, originX, originY, idGenerationSeed);

// validate the new snippet
validateSnippetContents(snippet);

// save the flow
controllerFacade.save();

// drop the snippet
snippetDAO.dropSnippet(snippetId);

// identify all components added
final Set<String> identifiers = new HashSet<>();
snippet.getProcessors().stream()
.map(proc -> proc.getId())
.forEach(id -> identifiers.add(id));
snippet.getConnections().stream()
.map(conn -> conn.getId())
.forEach(id -> identifiers.add(id));
snippet.getInputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getOutputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getProcessGroups().stream()
.map(group -> group.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.map(remoteGroup -> remoteGroup.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
.map(remoteInputPort -> remoteInputPort.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
.map(remoteOutputPort -> remoteOutputPort.getId())
.forEach(id -> identifiers.add(id));

final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);
return revisionManager.get(identifiers,
() -> {
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
return dtoFactory.createFlowDto(processGroup, groupStatus, snippet, revisionManager);
});
// post process new flow snippet
return postProcessNewFlowSnippet(groupId, snippet);
});

final FlowEntity flowEntity = new FlowEntity();
Expand Down Expand Up @@ -1519,59 +1480,71 @@ public TemplateDTO importTemplate(TemplateDTO templateDTO, String groupId, Optio
return dtoFactory.createTemplateDTO(template);
}

/**
* Post processes a new flow snippet including validation, flow saving, removing the snippet, and DTO conversion.
*
* @param groupId group id
* @param snippet snippet
* @return flow dto
*/
private FlowDTO postProcessNewFlowSnippet(final String groupId, final FlowSnippetDTO snippet) {
// validate the new snippet
validateSnippetContents(snippet);

// save the flow
controllerFacade.save();

// identify all components added
final Set<String> identifiers = new HashSet<>();
snippet.getProcessors().stream()
.map(proc -> proc.getId())
.forEach(id -> identifiers.add(id));
snippet.getConnections().stream()
.map(conn -> conn.getId())
.forEach(id -> identifiers.add(id));
snippet.getInputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getOutputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getProcessGroups().stream()
.map(group -> group.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.map(remoteGroup -> remoteGroup.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null)
.flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
.map(remoteInputPort -> remoteInputPort.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null)
.flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
.map(remoteOutputPort -> remoteOutputPort.getId())
.forEach(id -> identifiers.add(id));
snippet.getLabels().stream()
.map(label -> label.getId())
.forEach(id -> identifiers.add(id));

return revisionManager.get(identifiers,
() -> {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager);
});
}

@Override
public FlowEntity createTemplateInstance(final String groupId, final Double originX, final Double originY, final String templateId, final String idGenerationSeed) {
final FlowDTO flowDto = revisionManager.get(groupId, rev -> {
// instantiate the template - there is no need to make another copy of the flow snippet since the actual template
// was copied and this dto is only used to instantiate it's components (which as already completed)
final FlowSnippetDTO snippet = templateDAO.instantiateTemplate(groupId, originX, originY, templateId, idGenerationSeed);

// validate the new snippet
validateSnippetContents(snippet);

// save the flow
controllerFacade.save();

// identify all components added
final Set<String> identifiers = new HashSet<>();
snippet.getProcessors().stream()
.map(proc -> proc.getId())
.forEach(id -> identifiers.add(id));
snippet.getConnections().stream()
.map(conn -> conn.getId())
.forEach(id -> identifiers.add(id));
snippet.getInputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getOutputPorts().stream()
.map(port -> port.getId())
.forEach(id -> identifiers.add(id));
snippet.getProcessGroups().stream()
.map(group -> group.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.map(remoteGroup -> remoteGroup.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getInputPorts() != null)
.flatMap(remoteGroup -> remoteGroup.getContents().getInputPorts().stream())
.map(remoteInputPort -> remoteInputPort.getId())
.forEach(id -> identifiers.add(id));
snippet.getRemoteProcessGroups().stream()
.filter(remoteGroup -> remoteGroup.getContents() != null && remoteGroup.getContents().getOutputPorts() != null)
.flatMap(remoteGroup -> remoteGroup.getContents().getOutputPorts().stream())
.map(remoteOutputPort -> remoteOutputPort.getId())
.forEach(id -> identifiers.add(id));
snippet.getLabels().stream()
.map(label -> label.getId())
.forEach(id -> identifiers.add(id));

return revisionManager.get(identifiers,
() -> {
final ProcessGroup group = processGroupDAO.getProcessGroup(groupId);
final ProcessGroupStatus groupStatus = controllerFacade.getProcessGroupStatus(groupId);
return dtoFactory.createFlowDto(group, groupStatus, snippet, revisionManager);
});
// post process the new flow snippet
return postProcessNewFlowSnippet(groupId, snippet);
});

final FlowEntity flowEntity = new FlowEntity();
Expand Down

2 comments on commit 6daea58

@jtstorck
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me, I like that you've consolidated the snippet processing across copySnippet/importTemplate/createTemplateInstance.

IMO, it might be a good idea to pull the saving of the flow out of that method though, and call save from the three methods you refactored, just to decouple the functionality of processing the snippet from having the controller save the flow.

@mcgilman
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noted. I'll move the save out.

Please sign in to comment.