Skip to content

Commit

Permalink
feat: Implement Realtime Trigger (#377)
Browse files Browse the repository at this point in the history
  • Loading branch information
iNikitaGricenko committed May 14, 2024
1 parent 4bb05f1 commit aa48158
Show file tree
Hide file tree
Showing 6 changed files with 299 additions and 2 deletions.
67 changes: 67 additions & 0 deletions src/main/java/io/kestra/plugin/gcp/pubsub/Consume.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package io.kestra.plugin.gcp.pubsub;

import com.google.api.core.ApiService;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
Expand All @@ -21,10 +25,16 @@
import java.net.URI;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import jakarta.validation.constraints.NotNull;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -122,6 +132,63 @@ public Output run(RunContext runContext) throws Exception {
.build();
}

public Publisher<Message> stream(RunContext runContext) throws Exception {
ProjectSubscriptionName subscriptionName = this.createSubscription(runContext, subscription, autoCreateSubscription);
GoogleCredentials credentials = this.credentials(runContext);

return Flux.<Message>create(
sink -> {
AtomicInteger total = new AtomicInteger();
ZonedDateTime started = ZonedDateTime.now();

MessageReceiver receiver = (message, consumer) -> {
try {
sink.next(Message.of(message, serdeType));
total.getAndIncrement();
consumer.ack();
}
catch(Exception exception) {
sink.error(exception);
consumer.nack();
}
};

Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setCredentialsProvider(FixedCredentialsProvider.create(credentials))
.build();

subscriber.startAsync().awaitRunning();

subscriber.addListener(
new ApiService.Listener() {
@Override
public void failed(ApiService.State from, Throwable failure) {
sink.error(failure);
}
}, MoreExecutors.directExecutor()
);

while (!this.ended(total, started)) {
if (sink.isCancelled()) {
subscriber.stopAsync().awaitTerminated();
return;
}
try {
Thread.sleep(100);
} catch (InterruptedException exception) {
Thread.currentThread().interrupt();
sink.error(exception);
}
}

subscriber.stopAsync().awaitTerminated();
sink.complete();
},
FluxSink.OverflowStrategy.BUFFER
)
.subscribeOn(Schedulers.boundedElastic());
}

private boolean ended(AtomicInteger count, ZonedDateTime start) {
if (this.maxRecords != null && count.get() >= this.maxRecords) {
return true;
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/io/kestra/plugin/gcp/pubsub/RealtimeTrigger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package io.kestra.plugin.gcp.pubsub;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.conditions.ConditionContext;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.models.triggers.*;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.gcp.pubsub.model.SerdeType;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;


@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "React and consume messages in a Pub/Sub topic."
)
@Plugin(
examples = {
@Example(
code = {
"topic: test-topic",
"maxRecords: 10"
}
)
}
)
public class RealtimeTrigger extends AbstractTrigger implements RealtimeTriggerInterface, TriggerOutput<Consume.Output>, PubSubConnectionInterface {

private String projectId;

private String serviceAccount;

@Builder.Default
private List<String> scopes = Collections.singletonList("https://www.googleapis.com/auth/cloud-platform");

private String topic;

@Schema(
title = "The Pub/Sub subscription",
description = "The Pub/Sub subscription. It will be created automatically if it didn't exist and 'autoCreateSubscription' is enabled."
)
@PluginProperty(dynamic = true)
private String subscription;

@Schema(
title = "Whether the Pub/Sub subscription should be created if not exist"
)
@PluginProperty
@Builder.Default
private Boolean autoCreateSubscription = true;

@Builder.Default
private final Duration interval = Duration.ofSeconds(60);

@PluginProperty
@Schema(title = "Max number of records, when reached the task will end.")
private Integer maxRecords;

@PluginProperty
@Schema(title = "Max duration in the Duration ISO format, after that the task will end.")
private Duration maxDuration;

@Builder.Default
@PluginProperty
@NotNull
@Schema(title = "The serializer/deserializer to use.")
private SerdeType serdeType = SerdeType.STRING;

@Override
public Publisher<Execution> evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception {
Consume task = Consume.builder()
.topic(this.topic)
.subscription(this.subscription)
.autoCreateSubscription(this.autoCreateSubscription)
.projectId(this.projectId)
.serviceAccount(this.serviceAccount)
.scopes(this.scopes)
.maxRecords(this.maxRecords)
.maxDuration(this.maxDuration)
.serdeType(this.serdeType)
.build();

return Flux.from(task.stream(conditionContext.getRunContext()))
.map(message -> TriggerService.generateRealtimeExecution(this, context, message))
.next();
}
}
3 changes: 2 additions & 1 deletion src/main/java/io/kestra/plugin/gcp/pubsub/model/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.pubsub.v1.PubsubMessage;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
Expand All @@ -19,7 +20,7 @@
@Getter
@Builder
@Jacksonized
public class Message {
public class Message implements Output {

@Schema(
title = "The message data, must be a string if serde type is 'STRING', otherwise a JSON object",
Expand Down
112 changes: 112 additions & 0 deletions src/test/java/io/kestra/plugin/gcp/pubsub/RealtimeTriggerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package io.kestra.plugin.gcp.pubsub;

import com.google.common.collect.ImmutableMap;
import io.kestra.core.models.executions.Execution;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.LocalFlowRepositoryLoader;
import io.kestra.core.runners.FlowListeners;
import io.kestra.core.runners.RunContextFactory;
import io.kestra.core.runners.Worker;
import io.kestra.core.schedulers.AbstractScheduler;
import io.kestra.core.schedulers.DefaultScheduler;
import io.kestra.core.schedulers.SchedulerTriggerStateInterface;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.TestsUtils;
import io.kestra.plugin.gcp.pubsub.model.Message;
import io.kestra.plugin.gcp.pubsub.model.SerdeType;
import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;

@MicronautTest
class RealtimeTriggerTest {

@Inject
private ApplicationContext applicationContext;

@Inject
private SchedulerTriggerStateInterface triggerState;

@Inject
private FlowListeners flowListenersService;

@Inject
@Named(QueueFactoryInterface.EXECUTION_NAMED)
private QueueInterface<Execution> executionQueue;

@Inject
protected LocalFlowRepositoryLoader repositoryLoader;

@Inject
private RunContextFactory runContextFactory;

@Value("${kestra.variables.globals.project}")
private String project;

@Test
void flow() throws Exception {
// mock flow listeners
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
try (
Worker worker = applicationContext.createBean(Worker.class, IdUtils.create(), 8, null);
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.triggerState
);
) {
AtomicReference<Execution> last = new AtomicReference<>();

// wait for execution
executionQueue.receive(RealtimeTriggerTest.class, execution -> {
last.set(execution.getLeft());

queueCount.countDown();
assertThat(execution.getLeft().getFlowId(), is("realtime-listen"));
});


worker.run();
scheduler.run();

repositoryLoader.load(Objects.requireNonNull(RealtimeTriggerTest.class.getClassLoader().getResource("flows/pubsub/realtime.yaml")));

// publish message to trigger the flow
Publish task = Publish.builder()
.id(Publish.class.getSimpleName())
.type(Publish.class.getName())
.topic("test-topic")
.projectId(this.project)
.from(
List.of(
Message.builder().data("Hello World").build()
)
)
.build();
task.run(TestsUtils.mockRunContext(runContextFactory, task, ImmutableMap.of()));

queueCount.await(1, TimeUnit.MINUTES);

Map<String, Object> variables = last.get().getTrigger().getVariables();
assertThat(new String(Base64.getDecoder().decode((String) variables.get("data")), StandardCharsets.UTF_8), is("Hello World"));
}
}
}
2 changes: 1 addition & 1 deletion src/test/java/io/kestra/plugin/gcp/pubsub/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void flow() throws Exception {
worker.run();
scheduler.run();

repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/pubsub")));
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/pubsub/pubsub-listen.yaml")));

// publish two messages to trigger the flow
Publish task = Publish.builder()
Expand Down
14 changes: 14 additions & 0 deletions src/test/resources/flows/pubsub/realtime.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
id: realtime-listen
namespace: io.kestra.tests

triggers:
- id: watch
type: io.kestra.plugin.gcp.pubsub.RealtimeTrigger
projectId: "{{globals.project}}"
topic: test-topic
subscription: test-subscription

tasks:
- id: end
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"

0 comments on commit aa48158

Please sign in to comment.