Skip to content

Commit

Permalink
fix: migrate to new core StorageContext interface
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois authored and loicmathieu committed Jan 30, 2024
1 parent 9e8f089 commit 8ab0200
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/kestra/plugin/git/Sync.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.kestra.core.repositories.FlowRepositoryInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.services.FlowService;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.KestraIgnore;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -215,7 +216,7 @@ public VoidOutput run(RunContext runContext) throws Exception {
}

StorageInterface storage = runContext.getApplicationContext().getBean(StorageInterface.class);
URI namespaceFilePrefix = URI.create("kestra://" + storage.namespaceFilePrefix(namespace) + "/");
URI namespaceFilePrefix = URI.create("kestra://" + StorageContext.namespaceFilePrefix(namespace) + "/");
if (this.namespaceFilesDirectory != null) {
String renderedNamespaceFilesDirectory = runContext.render(this.namespaceFilesDirectory);
renderedNamespaceFilesDirectory = renderedNamespaceFilesDirectory.startsWith("/") ? renderedNamespaceFilesDirectory.substring(1) : renderedNamespaceFilesDirectory;
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/io/kestra/plugin/git/PushTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.IdUtils;
import io.micronaut.context.annotation.Value;
Expand Down Expand Up @@ -175,7 +176,7 @@ void oneTaskPush_ExistingBranch() throws Exception {
try(ByteArrayInputStream is = new ByteArrayInputStream(expectedNamespaceFileContent.getBytes())) {
storageInterface.put(
tenantId,
URI.create(Path.of(storageInterface.namespaceFilePrefix(namespace), namespaceFileName).toString()),
URI.create(Path.of(StorageContext.namespaceFilePrefix(namespace), namespaceFileName).toString()),
is
);
}
Expand Down
53 changes: 27 additions & 26 deletions src/test/java/io/kestra/plugin/git/SyncTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.kestra.core.runners.RunContext;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.serializers.YamlFlowParser;
import io.kestra.core.storages.StorageContext;
import io.kestra.core.storages.StorageInterface;
import io.kestra.core.utils.KestraIgnore;
import io.kestra.core.utils.TestsUtils;
Expand Down Expand Up @@ -66,8 +67,8 @@ class SyncTest {
@BeforeEach
void init() throws IOException {
flowRepositoryInterface.findAllForAllTenants().forEach(flow -> flowRepositoryInterface.delete(flow));
storageInterface.deleteByPrefix(null, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE)));
storageInterface.deleteByPrefix(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE)));
storageInterface.deleteByPrefix(null, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
storageInterface.deleteByPrefix(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE)));
}

@Test
Expand Down Expand Up @@ -117,47 +118,47 @@ void reconcileNsFilesAndFlows() throws Exception {
String readmeContent = "README content";
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + "/README.md"),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/README.md"),
new ByteArrayInputStream(readmeContent.getBytes())
);
// will be deleted as it's not on git
String deletedFilePath = "/sync_directory/file_to_delete.txt";
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + deletedFilePath),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + deletedFilePath),
new ByteArrayInputStream(new byte[0])
);
String deletedDirPath = "/sync_directory/dir_to_delete";
storageInterface.createDirectory(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + deletedDirPath)
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + deletedDirPath)
);
String deletedDirSubFilePath = "/sync_directory/dir_to_delete/file_to_delete.txt";
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + deletedDirSubFilePath),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + deletedDirSubFilePath),
new ByteArrayInputStream(new byte[0])
);
// will get updated
String clonedFilePath = "/sync_directory/cloned.json";
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + clonedFilePath),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + clonedFilePath),
new ByteArrayInputStream("{\"old-field\": \"old-value\"}".getBytes())
);

// check behaviour in case of converting a file to dir or dir to file
String fileToDirPath = "/sync_directory/file_to_dir";
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + fileToDirPath),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + fileToDirPath),
new ByteArrayInputStream("some content".getBytes())
);

String dirToFilePath = "/sync_directory/dir_to_file";
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + dirToFilePath + "/file.txt"),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + dirToFilePath + "/file.txt"),
new ByteArrayInputStream("nested file content".getBytes())
);
// endregion
Expand Down Expand Up @@ -198,19 +199,19 @@ void reconcileNsFilesAndFlows() throws Exception {
// endregion

// region namespace files
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/" + KestraIgnore.KESTRA_IGNORE_FILE_NAME)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/file_to_ignore.txt")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/dir_to_ignore/file.txt")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/dir_to_ignore")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/_flows")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/" + KestraIgnore.KESTRA_IGNORE_FILE_NAME)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/file_to_ignore.txt")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/dir_to_ignore/file.txt")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/dir_to_ignore")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/" + destinationDirectory + "/_flows")), is(false));
assertNamespaceFileContent(TENANT_ID, "/README.md", readmeContent);
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + deletedFilePath)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + deletedDirPath)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + deletedDirSubFilePath)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + deletedFilePath)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + deletedDirPath)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + deletedDirSubFilePath)), is(false));
assertNamespaceFileContent(TENANT_ID, clonedFilePath, "{\"my-field\": \"my-value\"}");
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + fileToDirPath)), is(true));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + fileToDirPath)), is(true));
assertNamespaceFileContent(TENANT_ID, fileToDirPath + "/file.txt", "directory replacing file");
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + dirToFilePath + "/file.txt")), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + dirToFilePath + "/file.txt")), is(false));
assertNamespaceFileContent(TENANT_ID, dirToFilePath, "file replacing a directory");
// endregion
// endregion
Expand Down Expand Up @@ -258,13 +259,13 @@ void reconcile_MinimumSetup() throws Exception {
String toDeleteFilePath = "/to_delete.txt";
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + toDeleteFilePath),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + toDeleteFilePath),
new ByteArrayInputStream("File to delete".getBytes())
);
// in git but with another content, should be updated
storageInterface.put(
TENANT_ID,
URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + "/README.md"),
URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + "/README.md"),
new ByteArrayInputStream("README content".getBytes())
);

Expand Down Expand Up @@ -299,7 +300,7 @@ void reconcile_MinimumSetup() throws Exception {
// endregion

// region namespace files
assertThat(storageInterface.exists(TENANT_ID, URI.create(storageInterface.namespaceFilePrefix(NAMESPACE) + toDeleteFilePath)), is(false));
assertThat(storageInterface.exists(TENANT_ID, URI.create(StorageContext.namespaceFilePrefix(NAMESPACE) + toDeleteFilePath)), is(false));
assertNamespaceFileContent(TENANT_ID, "/README.md", "This repository is used for unit testing Git integration");
assertNamespaceFileContent(TENANT_ID, "/ignored.json", "{\"ignored\": true}");
// endregion
Expand Down Expand Up @@ -349,15 +350,15 @@ void reconcile_DryRun_ShouldDoNothing() throws Exception {
String keptContent = "kept content since dry run";
storageInterface.put(
null,
URI.create(storageInterface.namespaceFilePrefix(namespace) + toUpdateFilePath),
URI.create(StorageContext.namespaceFilePrefix(namespace) + toUpdateFilePath),
new ByteArrayInputStream(keptContent.getBytes())
);

String someFilePath = "/file.txt";
String someFileContent = "hello";
storageInterface.put(
null,
URI.create(storageInterface.namespaceFilePrefix(namespace) + someFilePath),
URI.create(StorageContext.namespaceFilePrefix(namespace) + someFilePath),
new ByteArrayInputStream(someFileContent.getBytes())
);

Expand All @@ -378,7 +379,7 @@ void reconcile_DryRun_ShouldDoNothing() throws Exception {

assertNamespaceFileContent(null, namespace, toUpdateFilePath, keptContent);
assertNamespaceFileContent(null, namespace, someFilePath, someFileContent);
assertThat(storageInterface.exists(null, URI.create(storageInterface.namespaceFilePrefix(namespace) + "/cloned.json")), is(false));
assertThat(storageInterface.exists(null, URI.create(StorageContext.namespaceFilePrefix(namespace) + "/cloned.json")), is(false));

assertHasInfoLog(logs, "Dry run is enabled, not performing following actions (- for deletions, + for creations, ~ for update or no modification):");
assertHasInfoLog(logs, "~ /_flows/first-flow.yml");
Expand Down Expand Up @@ -438,7 +439,7 @@ private void assertNamespaceFileContent(String tenantId, String namespaceFileUri
}

private void assertNamespaceFileContent(String tenantId, String namespace, String namespaceFileUri, String expectedFileContent) throws IOException {
try (InputStream is = storageInterface.get(tenantId, URI.create(storageInterface.namespaceFilePrefix(namespace) + namespaceFileUri))) {
try (InputStream is = storageInterface.get(tenantId, URI.create(StorageContext.namespaceFilePrefix(namespace) + namespaceFileUri))) {
assertThat(new BufferedReader(new InputStreamReader(is)).lines().collect(Collectors.joining("\n")), is(expectedFileContent));
}
}
Expand Down

0 comments on commit 8ab0200

Please sign in to comment.