Skip to content

Commit

Permalink
feat(sync-flows): revision is now forecast in diffs and we deep-check…
Browse files Browse the repository at this point in the history
… for changes which allow more detailed SyncState

closes #68
  • Loading branch information
brian-mulier-p committed May 23, 2024
1 parent c0e7fb3 commit 2959c1d
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 122 deletions.
124 changes: 71 additions & 53 deletions src/main/java/io/kestra/plugin/git/AbstractSyncTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.eclipse.jgit.api.Git;
import org.slf4j.Logger;

Expand Down Expand Up @@ -65,6 +65,9 @@ protected Map<URI, Supplier<InputStream>> gitResourcesContentByUri(Path baseDire
if (!this.traverseDirectories()) {
filtered = filtered.filter(Files::isRegularFile);
}
KestraIgnore kestraIgnore = new KestraIgnore(baseDirectory);
filtered = filtered.filter(path -> !kestraIgnore.isIgnoredFile(path.toString(), true));

return filtered.collect(Collectors.toMap(
gitPath -> URI.create("/" + baseDirectory.relativize(gitPath) + (gitPath.toFile().isDirectory() ? "/" : "")),
throwFunction(path -> throwSupplier(() -> {
Expand All @@ -80,57 +83,41 @@ protected Map<URI, Supplier<InputStream>> gitResourcesContentByUri(Path baseDire
protected boolean traverseDirectories() {
return true;
}

/**
* Removes any resource from the instance that is not on Git.
*/
private List<T> deleteOutdatedResources(RunContext runContext, S service, String renderedNamespace, Set<URI> gitUris, List<T> instanceFilledResources) throws IOException {
List<T> deleted = new ArrayList<>();
instanceFilledResources.forEach(throwConsumer(instanceResource -> {
if (!gitUris.contains(toUri(gitUris, renderedNamespace, instanceResource)) && !this.mustKeep(runContext, instanceResource)) {
if (!this.dryRun) {
this.deleteResource(service, runContext.tenantId(), renderedNamespace, instanceResource);
}
deleted.add(instanceResource);
}
}));

return deleted;
}

protected boolean mustKeep(RunContext runContext, T instanceResource) {
return false;
}

protected abstract void deleteResource(S service, String tenantId, String renderedNamespace, T instanceResource) throws IOException;

private void writeResources(Logger logger, S service, KestraIgnore kestraIgnore, Path gitDirectory, String tenantId, String renderedNamespace, Map<URI, Supplier<InputStream>> contentByPath) throws IOException {
contentByPath.entrySet().stream().sorted(Comparator.comparingInt(e -> StringUtils.countMatches(e.getKey().toString(), "/")))
.filter(e -> !kestraIgnore.isIgnoredFile(gitDirectory + e.getKey().toString(), true))
.forEach(throwConsumer(e -> this.writeResource(logger, service, tenantId, renderedNamespace, e.getKey(), e.getValue().get())));
}
protected abstract T simulateResourceWrite(S service, String tenantId, String renderedNamespace, URI uri, InputStream inputStream) throws IOException;

protected abstract void writeResource(Logger logger, S service, String tenantId, String renderedNamespace, URI uri, InputStream inputStream) throws IOException;
protected abstract T writeResource(Logger logger, S service, String tenantId, String renderedNamespace, URI uri, InputStream inputStream) throws IOException;

protected abstract SyncResult wrapper(String renderedGitDirectory, String renderedNamespace, URI resourceURI, Supplier<InputStream> gitContent, T resourceBeforeUpdate) throws IOException;
protected abstract SyncResult wrapper(String renderedGitDirectory, String renderedNamespace, URI resourceUri, T resourceBeforeUpdate, T resourceAfterUpdate) throws IOException;

private URI createDiffFile(RunContext runContext, KestraIgnore kestraIgnore, Path gitDirectory, String renderedNamespace, Map<URI, Supplier<InputStream>> gitContentByUri, Map<URI, T> instanceFilledResourcesByUri, List<T> deletedResources) throws IOException, IllegalVariableEvaluationException {
private URI createDiffFile(RunContext runContext, String renderedNamespace, Map<URI, URI> gitUriByResourceUri, Map<URI, T> beforeUpdateResourcesByUri, Map<URI, T> afterUpdateResourcesByUri, List<T> deletedResources) throws IOException, IllegalVariableEvaluationException {
File diffFile = runContext.tempFile(".ion").toFile();

try (BufferedWriter diffWriter = new BufferedWriter(new FileWriter(diffFile))) {
List<SyncResult> syncResults = new ArrayList<>();

String renderedGitDirectory = runContext.render(this.getGitDirectory());
if (deletedResources != null) {
deletedResources.stream().map(throwFunction(deletedResource -> wrapper(renderedGitDirectory, renderedNamespace, toUri(gitContentByUri.keySet(), renderedNamespace, deletedResource), null, deletedResource))).forEach(syncResults::add);
deletedResources.stream()
.map(throwFunction(deletedResource -> wrapper(
renderedGitDirectory,
renderedNamespace,
gitUriByResourceUri.get(this.toUri(renderedNamespace, deletedResource)),
deletedResource,
null
))).forEach(syncResults::add);
}

gitContentByUri.entrySet().stream()
.filter(e -> !kestraIgnore.isIgnoredFile(gitDirectory + e.getKey().toString(), true))
.flatMap(throwFunction(e -> {
SyncResult wrapper = wrapper(renderedGitDirectory, renderedNamespace, e.getKey(), e.getValue(), instanceFilledResourcesByUri.get(e.getKey()));
return wrapper == null ? Stream.empty() : Stream.of(wrapper);
})).forEach(syncResults::add);
afterUpdateResourcesByUri.entrySet().stream().flatMap(throwFunction(e -> {
SyncResult syncResult = wrapper(renderedGitDirectory, renderedNamespace, gitUriByResourceUri.get(e.getKey()), beforeUpdateResourcesByUri.get(e.getKey()), afterUpdateResourcesByUri.get(e.getKey()));
return syncResult == null ? Stream.empty() : Stream.of(syncResult);
})).forEach(syncResults::add);

syncResults.stream().sorted((s1, s2) -> {
if (s1.getGitPath() == null) {
Expand Down Expand Up @@ -163,40 +150,69 @@ public O run(RunContext runContext) throws Exception {

Path localGitDirectory = this.createGitDirectory(runContext);
Map<URI, Supplier<InputStream>> gitContentByUri = this.gitResourcesContentByUri(localGitDirectory);
Set<URI> gitUris = gitContentByUri.keySet();


String renderedNamespace = runContext.render(this.fetchedNamespace());

@SuppressWarnings("unchecked")
S service = runContext.getApplicationContext().getBean((Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0]);
List<T> instanceFilledResources = this.instanceFilledResources(service, runContext.tenantId(), renderedNamespace);
List<T> deletedResources = null;
if (this.isDelete()) {
deletedResources = this.deleteOutdatedResources(
runContext,
service,
renderedNamespace,
gitUris,
instanceFilledResources

Map<URI, T> beforeUpdateResourcesByUri = this.fetchResources(service, runContext.tenantId(), renderedNamespace).stream().collect(Collectors.toMap(
resource -> this.toUri(renderedNamespace, resource),
Function.identity()
));
Map<URI, URI> gitUriByResourceUri = new HashMap<>();
Map<URI, T> updatedResourcesByUri = gitContentByUri.entrySet().stream()
.map(throwFunction(e -> {
InputStream inputStream = e.getValue().get();
T resource;
if (this.dryRun) {
resource = this.simulateResourceWrite(service, runContext.tenantId(), renderedNamespace, e.getKey(), inputStream);
} else {
resource = this.writeResource(runContext.logger(), service, runContext.tenantId(), renderedNamespace, e.getKey(), inputStream);
}

return Pair.of(e.getKey(), resource);
}))
.collect(
HashMap::new,
(map, pair) -> {
URI uri = pair.getLeft();
T resource = pair.getRight();
URI resourceUri = this.toUri(renderedNamespace, resource);
map.put(resourceUri, resource);
gitUriByResourceUri.put(resourceUri, uri);
},
HashMap::putAll
);
}

KestraIgnore kestraIgnore = new KestraIgnore(localGitDirectory);
if (!this.dryRun) {
this.writeResources(runContext.logger(), service, kestraIgnore, localGitDirectory, runContext.tenantId(), renderedNamespace, gitContentByUri);
List<T> deleted;
if (this.isDelete()) {
deleted = new ArrayList<>();
beforeUpdateResourcesByUri.entrySet().stream().filter(e -> !updatedResourcesByUri.containsKey(e.getKey())).forEach(throwConsumer(e -> {
if (this.mustKeep(runContext, e.getValue())) {
return;
}

if (!this.dryRun) {
this.deleteResource(service, runContext.tenantId(), renderedNamespace, e.getValue());
}

deleted.add(e.getValue());
}));
} else {
deleted = null;
}

Map<URI, T> instanceFilledResourcesByUri = instanceFilledResources.stream().collect(Collectors.toMap(resource -> this.toUri(gitUris, renderedNamespace, resource), Function.identity()));
URI diffFileStorageUri = this.createDiffFile(runContext, kestraIgnore, localGitDirectory, renderedNamespace, gitContentByUri, instanceFilledResourcesByUri, deletedResources);
URI diffFileStorageUri = this.createDiffFile(runContext, renderedNamespace, gitUriByResourceUri, beforeUpdateResourcesByUri, updatedResourcesByUri, deleted);

git.close();

return output(diffFileStorageUri);
}

protected abstract List<T> instanceFilledResources(S service, String tenantId, String renderedNamespace) throws IOException;
protected abstract List<T> fetchResources(S service, String tenantId, String renderedNamespace) throws IOException;

protected abstract URI toUri(Set<URI> gitUris, String renderedNamespace, T resource);
protected abstract URI toUri(String renderedNamespace, T resource);

protected abstract O output(URI diffFileStorageUri);

Expand All @@ -216,6 +232,8 @@ public abstract static class SyncResult {
public enum SyncState {
ADDED,
DELETED,
OVERWRITTEN
OVERWRITTEN,
UPDATED,
UNCHANGED
}
}
67 changes: 36 additions & 31 deletions src/main/java/io/kestra/plugin/git/SyncFlows.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.flows.Flow;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.services.FlowService;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
Expand All @@ -20,8 +19,6 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -143,6 +140,15 @@ protected void deleteResource(FlowService flowService, String tenantId, String r
flowService.delete(flow);
}

@Override
protected Flow simulateResourceWrite(FlowService flowService, String tenantId, String renderedNamespace, URI uri, InputStream inputStream) throws IOException {
if (inputStream == null) {
return null;
}

return flowService.importFlow(tenantId, SyncFlows.replaceNamespace(renderedNamespace, uri, inputStream), true);
}

@Override
protected boolean mustKeep(RunContext runContext, Flow instanceResource) {
RunContext.FlowInfo flowInfo = runContext.flowInfo();
Expand All @@ -157,46 +163,48 @@ protected boolean traverseDirectories() {
}

@Override
protected void writeResource(Logger logger, FlowService flowService, String tenantId, String renderedNamespace, URI uri, InputStream inputStream) throws IOException {
protected Flow writeResource(Logger logger, FlowService flowService, String tenantId, String renderedNamespace, URI uri, InputStream inputStream) throws IOException {
if (inputStream == null) {
return;
return null;
}

String flowSource = SyncFlows.replaceNamespace(renderedNamespace, uri, inputStream);

return flowService.importFlow(tenantId, flowSource);
}

private static String replaceNamespace(String renderedNamespace, URI uri, InputStream inputStream) throws IOException {
String flowSource = IOUtils.toString(inputStream, StandardCharsets.UTF_8);
String uriStr = uri.toString();
String newNamespace = renderedNamespace + uriStr.substring(0, uriStr.lastIndexOf("/")).replace("/", ".");
Matcher matcher = NAMESPACE_FINDER_PATTERN.matcher(flowSource);
flowSource = matcher.replaceFirst("namespace: " + newNamespace);

flowService.importFlow(tenantId, flowSource);
return flowSource.stripTrailing();
}

@Override
protected SyncResult wrapper(String renderedGitDirectory, String renderedNamespace, URI resourceUri, Supplier<InputStream> gitContent, Flow flow) throws IOException {
protected SyncResult wrapper(String renderedGitDirectory, String renderedNamespace, URI resourceUri, Flow flowBeforeUpdate, Flow flowAfterUpdate) {
if (resourceUri != null && resourceUri.toString().endsWith("/")) {
return null;
}

SyncState syncState;
Integer revision = null;
Flow flowAfterUpdate = flow;
if (gitContent == null) {
if (resourceUri == null) {
syncState = SyncState.DELETED;
} else if (flow == null) {
} else if (flowBeforeUpdate == null) {
syncState = SyncState.ADDED;
flowAfterUpdate = JacksonMapper.ofYaml().readValue(gitContent.get(), Flow.class).toBuilder()
.namespace(this.computeNamespaceFromUri(renderedNamespace, Objects.requireNonNull(resourceUri)))
.build();
revision = 1;
} else if (flowBeforeUpdate.getRevision().equals(Objects.requireNonNull(flowAfterUpdate).getRevision())){
syncState = SyncState.UNCHANGED;
} else {
syncState = SyncState.OVERWRITTEN;
revision = flow.getRevision() + 1;
syncState = SyncState.UPDATED;
}

Flow infoHolder = flowAfterUpdate == null ? flowBeforeUpdate : flowAfterUpdate;
SyncResult.SyncResultBuilder<?, ?> builder = SyncResult.builder()
.syncState(syncState)
.namespace(flowAfterUpdate.getNamespace())
.flowId(flowAfterUpdate.getId())
.revision(revision);
.namespace(infoHolder.getNamespace())
.flowId(infoHolder.getId())
.revision(infoHolder.getRevision());

if (syncState != SyncState.DELETED) {
builder.gitPath(renderedGitDirectory + resourceUri);
Expand All @@ -205,15 +213,8 @@ protected SyncResult wrapper(String renderedGitDirectory, String renderedNamespa
return builder.build();
}

private String computeNamespaceFromUri(String renderedNamespace, URI resourceUri) {
String withoutLeadingSlash = resourceUri.toString().substring(1);
int namespaceToFlowIdSeparatorIdx = withoutLeadingSlash.lastIndexOf("/");
String subNamespace = namespaceToFlowIdSeparatorIdx == -1 ? "" : "." + withoutLeadingSlash.substring(0, namespaceToFlowIdSeparatorIdx).replace("/", ".");
return renderedNamespace + subNamespace;
}

@Override
protected List<Flow> instanceFilledResources(FlowService flowService, String tenantId, String renderedNamespace) {
protected List<Flow> fetchResources(FlowService flowService, String tenantId, String renderedNamespace) {
if (this.includeChildNamespaces) {
return flowService.findByNamespacePrefix(tenantId, renderedNamespace);
}
Expand All @@ -222,10 +223,14 @@ protected List<Flow> instanceFilledResources(FlowService flowService, String ten
}

@Override
protected URI toUri(Set<URI> gitUris, String renderedNamespace, Flow resource) {
protected URI toUri(String renderedNamespace, Flow resource) {
if (resource == null) {
return null;
}

String gitSimulatedNamespaceUri = resource.getNamespace().equals(renderedNamespace) ? "" : "/" + resource.getNamespace().substring(renderedNamespace.length() + 1);
String uriWithoutExtension = gitSimulatedNamespaceUri.replace(".", "/") + "/" + resource.getId();
return URI.create(uriWithoutExtension + (gitUris.contains(URI.create(uriWithoutExtension + ".yaml")) ? ".yaml" : ".yml"));
return URI.create(uriWithoutExtension + ".yml");
}

@Override
Expand Down
Loading

0 comments on commit 2959c1d

Please sign in to comment.