From 0b5586e80b4618ccb5fa4c2678bcc1286abeae56 Mon Sep 17 00:00:00 2001 From: wanjunlei <53003665+wanjunlei@users.noreply.github.com> Date: Mon, 12 Jun 2023 13:45:19 +0800 Subject: [PATCH] add support OpenFunction v1beta2 API (#15) * add support OpenFunction v1beta2 API Signed-off-by: wanjunlei * add support OpenFunction v1beta2 API Signed-off-by: wanjunlei --------- Signed-off-by: wanjunlei --- functions-framework-api/pom.xml | 2 +- .../dev/openfunction/functions/Component.java | 41 ++++- .../dev/openfunction/functions/Context.java | 10 ++ .../java/dev/openfunction/functions/Hook.java | 55 ++++++ .../dev/openfunction/functions/Plugin.java | 1 + functions-framework-invoker/pom.xml | 4 +- .../{runtime => }/JsonEventFormat.java | 2 +- .../java/dev/openfunction/invoker/Runner.java | 42 +++-- .../invoker/context/FunctionContext.java | 139 ++++++++++++++- .../invoker/context/RuntimeContext.java | 163 ++++++++++++------ .../invoker/context/UserContext.java | 144 +++++++++------- .../tracing/OpenTelemetryProvider.java | 22 ++- .../invoker/tracing/SkywalkingProvider.java | 10 +- .../invoker/tracing/TracingProvider.java | 7 +- .../DaprTrigger.java} | 35 ++-- .../HttpTrigger.java} | 12 +- .../Runtime.java => trigger/Trigger.java} | 4 +- samples/pom.xml | 2 +- .../samples/OpenFunctionImpl.java | 16 +- .../samples/hooks/ExampleHook.java | 74 ++++++++ .../samples/plugins/ExamplePlugin.java | 8 +- 21 files changed, 601 insertions(+), 192 deletions(-) create mode 100644 functions-framework-api/src/main/java/dev/openfunction/functions/Hook.java rename functions-framework-invoker/src/main/java/dev/openfunction/invoker/{runtime => }/JsonEventFormat.java (99%) rename functions-framework-invoker/src/main/java/dev/openfunction/invoker/{runtime/AsynchronousRuntime.java => trigger/DaprTrigger.java} (86%) rename functions-framework-invoker/src/main/java/dev/openfunction/invoker/{runtime/SynchronizeRuntime.java => trigger/HttpTrigger.java} (92%) rename functions-framework-invoker/src/main/java/dev/openfunction/invoker/{runtime/Runtime.java => trigger/Trigger.java} (90%) create mode 100644 samples/src/main/java/dev/openfunction/samples/hooks/ExampleHook.java diff --git a/functions-framework-api/pom.xml b/functions-framework-api/pom.xml index 279cc849..26b466a8 100644 --- a/functions-framework-api/pom.xml +++ b/functions-framework-api/pom.xml @@ -38,7 +38,7 @@ dev.openfunction.functions functions-framework-api - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT UTF-8 diff --git a/functions-framework-api/src/main/java/dev/openfunction/functions/Component.java b/functions-framework-api/src/main/java/dev/openfunction/functions/Component.java index ff61628b..34779636 100644 --- a/functions-framework-api/src/main/java/dev/openfunction/functions/Component.java +++ b/functions-framework-api/src/main/java/dev/openfunction/functions/Component.java @@ -16,20 +16,35 @@ package dev.openfunction.functions; +import org.apache.commons.lang3.StringUtils; + import java.util.Map; +import java.util.Objects; public class Component { + private static final String ComponentTypeBinding = "bindings"; + private static final String ComponentTypePubsub = "pubsub"; + + @Deprecated private String uri; + private String topic; private String componentName; private String componentType; private Map metadata; private String operation; - + @Deprecated public String getUri() { - return uri; + if (!StringUtils.isBlank(uri)) { + return uri; + } else if (!StringUtils.isBlank(topic)) { + return topic; + } else { + return componentName; + } } + @Deprecated public void setUri(String uri) { this.uri = uri; } @@ -65,5 +80,27 @@ public String getOperation() { public void setOperation(String operation) { this.operation = operation; } + + public String getTopic() { + if (StringUtils.isNotBlank(topic)) { + return topic; + } else if (StringUtils.isNotBlank(uri) && !Objects.equals(uri, componentName)) { + return uri; + } else { + return null; + } + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public boolean isPubsub() { + return componentType.startsWith(ComponentTypePubsub); + } + public boolean isBinding() { + return componentType.startsWith(ComponentTypeBinding); + } + } diff --git a/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java b/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java index 6535e5ad..d3863eb9 100644 --- a/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java +++ b/functions-framework-api/src/main/java/dev/openfunction/functions/Context.java @@ -32,6 +32,7 @@ public interface Context { * @param data Data String * @return Error */ + @Deprecated Error send(String outputName, String data); /** @@ -90,6 +91,13 @@ public interface Context { */ String getHttpPattern(); + /** + * getInputs returns the inputs of function. + * + * @return Inputs + */ + Map getInputs(); + /** * getOutputs returns the Outputs of function. * @@ -112,4 +120,6 @@ public interface Context { * @return Dapr client */ DaprClient getDaprClient(); + + CloudEvent packageAsCloudevent(String payload); } diff --git a/functions-framework-api/src/main/java/dev/openfunction/functions/Hook.java b/functions-framework-api/src/main/java/dev/openfunction/functions/Hook.java new file mode 100644 index 00000000..cf041395 --- /dev/null +++ b/functions-framework-api/src/main/java/dev/openfunction/functions/Hook.java @@ -0,0 +1,55 @@ +/* +Copyright 2022 The OpenFunction Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dev.openfunction.functions; + +import java.util.Map; + +public interface Hook { + /** + * name return the name of this plugin. + * + * @return Plugin name + */ + String name(); + + /** + * version return the version of this plugin. + * + * @return Plugin name + */ + String version(); + + /** + * init will create a new plugin, and execute hook in this calling. + * If you do not want to use a new plugin to execute hook, just return `this`. + * + * @return Plugin + */ + Hook init(); + + /** + * execute executes the hook. + * + * @param ctx Runtime context + * @return error + */ + Error execute(Context ctx); + + Boolean needToTracing(); + + Map tagsAddToTracing(); +} diff --git a/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java b/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java index d19b3bf5..0ce5a40f 100644 --- a/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java +++ b/functions-framework-api/src/main/java/dev/openfunction/functions/Plugin.java @@ -18,6 +18,7 @@ import java.util.Map; +@Deprecated public interface Plugin { /** * name return the name of this plugin. diff --git a/functions-framework-invoker/pom.xml b/functions-framework-invoker/pom.xml index 1771a007..35a70084 100644 --- a/functions-framework-invoker/pom.xml +++ b/functions-framework-invoker/pom.xml @@ -10,7 +10,7 @@ dev.openfunction.functions functions-framework-invoker - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT 3.8.0 @@ -60,7 +60,7 @@ dev.openfunction.functions functions-framework-api - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT io.cloudevents diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/JsonEventFormat.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/JsonEventFormat.java similarity index 99% rename from functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/JsonEventFormat.java rename to functions-framework-invoker/src/main/java/dev/openfunction/invoker/JsonEventFormat.java index 52cb383e..d18d7785 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/JsonEventFormat.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/JsonEventFormat.java @@ -14,7 +14,7 @@ limitations under the License. */ -package dev.openfunction.invoker.runtime; +package dev.openfunction.invoker; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java index eaa6321e..efb73116 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/Runner.java @@ -18,9 +18,10 @@ import dev.openfunction.invoker.context.RuntimeContext; -import dev.openfunction.invoker.runtime.AsynchronousRuntime; -import dev.openfunction.invoker.runtime.Runtime; -import dev.openfunction.invoker.runtime.SynchronizeRuntime; +import dev.openfunction.invoker.trigger.DaprTrigger; +import dev.openfunction.invoker.trigger.HttpTrigger; +import dev.openfunction.invoker.trigger.Trigger; +import org.apache.commons.lang3.StringUtils; import java.io.File; import java.io.IOException; @@ -42,6 +43,7 @@ public class Runner { private static final Logger logger = Logger.getLogger(Runner.class.getName()); private static final String FunctionContext = "FUNC_CONTEXT"; + private static final String FunctionContextV1beta2 = "FUNC_CONTEXT_V1BETA2"; private static final String FunctionTarget = "FUNCTION_TARGET"; private static final String FunctionClasspath = "FUNCTION_CLASSPATH"; @@ -53,26 +55,36 @@ public static void main(String[] args) { } String target = System.getenv(FunctionTarget); - if (!System.getenv().containsKey(FunctionContext)) { - throw new Error(FunctionContext + " not set"); + String functionContext = ""; + if (System.getenv().containsKey(FunctionContext)) { + functionContext = System.getenv(FunctionContext); + } + + if (System.getenv().containsKey(FunctionContextV1beta2)) { + functionContext = System.getenv(FunctionContextV1beta2); + } + + if (StringUtils.isEmpty(functionContext)) { + throw new Error("Function context not set"); } - String functionContext = System.getenv(FunctionContext); String classPath = System.getenv().getOrDefault(FunctionClasspath, System.getProperty("user.dir") + "/*"); ClassLoader functionClassLoader = new URLClassLoader(classpathToUrls(classPath)); RuntimeContext runtimeContext = new RuntimeContext(functionContext, functionClassLoader); - Runtime runtime; Class[] functionClasses = loadTargets(target, functionClassLoader); - if (Objects.equals(runtimeContext.getRuntime(), RuntimeContext.SyncRuntime)) { - runtime = new SynchronizeRuntime(runtimeContext, functionClasses); - } else if (Objects.equals(runtimeContext.getRuntime(), RuntimeContext.AsyncRuntime)) { - runtime = new AsynchronousRuntime(runtimeContext, functionClasses); - } else { - throw new Exception("Unknown runtime"); + Set triggers = new HashSet<>(); + if (runtimeContext.hasHttpTrigger()) { + triggers.add(new HttpTrigger(runtimeContext, functionClasses)); + } + + if (runtimeContext.hasDaprTrigger()) { + triggers.add(new DaprTrigger(runtimeContext, functionClasses)); } - runtime.start(); + for (Trigger trigger : triggers) { + trigger.start(); + } } catch (Exception e) { logger.log(Level.SEVERE, "Failed to run function", e); e.printStackTrace(); @@ -82,7 +94,7 @@ public static void main(String[] args) { private static Class[] loadTargets(String target, ClassLoader functionClassLoader) throws ClassNotFoundException { String[] targets = target.split(","); Class[] classes = new Class[targets.length]; - for (int i=0; i < targets.length; i++) { + for (int i = 0; i < targets.length; i++) { classes[i] = functionClassLoader.loadClass(targets[i]); } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java index b4c1e530..8ef77d22 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/FunctionContext.java @@ -27,13 +27,24 @@ class FunctionContext { private Map inputs; private Map outputs; private Map states; + @Deprecated private String runtime; - private String port; + @Deprecated + private String port = "8080"; + @Deprecated private String[] prePlugins; + @Deprecated private String[] postPlugins; + @Deprecated private TracingConfig pluginsTracing; + private String[] preHooks; + private String[] postHooks; + private TracingConfig tracing; + + private Triggers triggers; + public String getName() { return name; } @@ -75,39 +86,56 @@ public void setRuntime(String runtime) { } public String getPort() { - return port; + if (triggers != null && triggers.http != null) { + return triggers.http.port; + } else { + return port; + } } + @Deprecated public void setPort(String port) { this.port = port; } + @Deprecated public String[] getPrePlugins() { return prePlugins; } + @Deprecated public void setPrePlugins(String[] prePlugins) { this.prePlugins = prePlugins; } + @Deprecated public String[] getPostPlugins() { return postPlugins; } + @Deprecated public void setPostPlugins(String[] postPlugins) { this.postPlugins = postPlugins; } + @Deprecated public TracingConfig getPluginsTracing() { return pluginsTracing; } + @Deprecated public void setPluginsTracing(TracingConfig pluginsTracing) { this.pluginsTracing = pluginsTracing; } public boolean isTracingEnabled() { - return pluginsTracing != null && pluginsTracing.isEnabled(); + if (tracing != null ) { + return tracing.isEnabled(); + } else if (pluginsTracing != null) { + return pluginsTracing.isEnabled(); + } else { + return false; + } } public Map getStates() { @@ -117,4 +145,109 @@ public Map getStates() { public void setStates(Map states) { this.states = states; } + + public String[] getPreHooks() { + return preHooks; + } + + public void setPreHooks(String[] preHooks) { + this.preHooks = preHooks; + } + + public String[] getPostHooks() { + return postHooks; + } + + public void setPostHooks(String[] postHooks) { + this.postHooks = postHooks; + } + + public TracingConfig getTracing() { + return tracing; + } + + public void setTracing(TracingConfig tracing) { + this.tracing = tracing; + } + + public Triggers getTriggers() { + return triggers; + } + + public void setTriggers(Triggers triggers) { + this.triggers = triggers; + } + + static class Triggers { + private HttpTrigger http; + private DaprTrigger[] dapr; + + public HttpTrigger getHttp() { + return http; + } + + public void setHttp(HttpTrigger http) { + this.http = http; + } + + public DaprTrigger[] getDapr() { + return dapr; + } + + public void setDapr(DaprTrigger[] dapr) { + this.dapr = dapr; + } + } + + static class HttpTrigger { + private String port; + + + public String getPort() { + return port; + } + + public void setPort(String port) { + this.port = port; + } + } + + static class DaprTrigger { + private String name; + private String type; + private String topic; + private String inputName; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getInputName() { + return inputName; + } + + public void setInputName(String inputName) { + this.inputName = inputName; + } + } } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java index 2b458644..8a0cec8d 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/RuntimeContext.java @@ -16,19 +16,21 @@ package dev.openfunction.invoker.context; +import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import dev.openfunction.functions.*; import dev.openfunction.invoker.Callback; -import dev.openfunction.invoker.runtime.JsonEventFormat; +import dev.openfunction.invoker.JsonEventFormat; import dev.openfunction.invoker.tracing.OpenTelemetryProvider; import dev.openfunction.invoker.tracing.SkywalkingProvider; import dev.openfunction.invoker.tracing.TracingProvider; import io.cloudevents.CloudEvent; import io.cloudevents.core.provider.EventFormatProvider; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.ArrayUtils; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.logging.Level; import java.util.logging.Logger; @@ -38,28 +40,31 @@ public class RuntimeContext { static final String PodNameEnvName = "POD_NAME"; static final String PodNamespaceEnvName = "POD_NAMESPACE"; + + @Deprecated public static final String SyncRuntime = "Knative"; + @Deprecated public static final String AsyncRuntime = "Async"; private static final String TracingSkywalking = "skywalking"; private static final String TracingOpentelemetry = "opentelemetry"; private final FunctionContext functionContext; - private final int port; - private Map prePlugins; - private Map postPlugins; private TracingProvider tracingProvider; - public RuntimeContext(String context, ClassLoader classLoader) throws Exception { - functionContext = new ObjectMapper().readValue(context, FunctionContext.class); + private Map preHooks; + private Map postHooks; - prePlugins = new HashMap<>(); - postPlugins = new HashMap<>(); + public RuntimeContext(String context, ClassLoader classLoader) throws Exception { + functionContext = new ObjectMapper(). + configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false). + readValue(context, FunctionContext.class); - port = Integer.parseInt(functionContext.getPort()); + preHooks = new HashMap<>(); + postHooks = new HashMap<>(); - loadPlugins(classLoader); + loadHooks(classLoader); if (functionContext.isTracingEnabled() && functionContext.getPluginsTracing().getProvider() != null) { String provider = functionContext.getPluginsTracing().getProvider().getName(); @@ -72,7 +77,7 @@ public RuntimeContext(String context, ClassLoader classLoader) throws Exception tracingProvider = new SkywalkingProvider(); case TracingOpentelemetry: tracingProvider = new OpenTelemetryProvider(functionContext.getPluginsTracing(), - getName(), + functionContext.getName(), System.getenv(RuntimeContext.PodNameEnvName), System.getenv(RuntimeContext.PodNamespaceEnvName)); } @@ -81,71 +86,59 @@ public RuntimeContext(String context, ClassLoader classLoader) throws Exception EventFormatProvider.getInstance().registerFormat(new JsonEventFormat()); } - private void loadPlugins(ClassLoader classLoader) { - prePlugins = loadPlugins(classLoader, functionContext.getPrePlugins()); - postPlugins = loadPlugins(classLoader, functionContext.getPostPlugins()); - } + private void loadHooks(ClassLoader classLoader) { + String[] preHookNames = functionContext.getPreHooks(); + if (ArrayUtils.isEmpty(preHookNames)) { + preHookNames = functionContext.getPrePlugins(); + } - private Map loadPlugins(ClassLoader classLoader, String[] pluginNames) { - Map plugins = new HashMap<>(); - if (pluginNames == null) { - return plugins; + String[] postHookNames = functionContext.getPostHooks(); + if (ArrayUtils.isEmpty(postHookNames)) { + postHookNames = functionContext.getPostPlugins(); } + preHooks = loadHooks(classLoader, preHookNames); + postHooks = loadHooks(classLoader, postHookNames); + } - for (String name : pluginNames) { - if (Objects.equals(name, TracingOpentelemetry) || Objects.equals(name, TracingSkywalking)) { - continue; - } + private Map loadHooks(ClassLoader classLoader, String[] hookNames) { + Map hooks = new HashMap<>(); + if (ArrayUtils.isEmpty(hookNames)) { + return hooks; + } + for (String name : hookNames) { try { - Class pluginClass = classLoader.loadClass(name); - Class pluginImplClass = pluginClass.asSubclass(Plugin.class); - plugins.put(name, pluginImplClass.getConstructor().newInstance()); + Class hookClass = classLoader.loadClass(name); + if (Hook.class.isAssignableFrom(hookClass)) { + Class hookImplClass = hookClass.asSubclass(Hook.class); + hooks.put(name, hookImplClass.getConstructor().newInstance()); + } + + if (Plugin.class.isAssignableFrom(hookClass)) { + Class pluginImplClass = hookClass.asSubclass(Plugin.class); + hooks.put(name, pluginImplClass.getConstructor().newInstance()); + } } catch (Exception e) { - logger.log(Level.WARNING, "load plugin " + name + " error, " + e.getMessage()); + logger.log(Level.WARNING, "load hook " + name + " error, " + e.getMessage()); e.printStackTrace(); } } - return plugins; - } - - public Map getPrePlugins() { - return prePlugins; - } - - public Map getPostPlugins() { - return postPlugins; + return hooks; } public int getPort() { - return port; + return Integer.parseInt(functionContext.getPort()); } public String getName() { return functionContext.getName(); } - public String getRuntime() { - return functionContext.getRuntime(); - } - public Map getInputs() { return functionContext.getInputs(); } - public Map getOutputs() { - return functionContext.getOutputs(); - } - - public Map getStates() { - return functionContext.getStates(); - } - - public TracingProvider getTracingProvider() { - return tracingProvider; - } - public void executeWithTracing(Object obj, Callback callback) throws Exception { if (tracingProvider != null) { if (obj == null) { @@ -162,6 +155,8 @@ public void executeWithTracing(Object obj, Callback callback) throws Exception { tracingProvider.executeWithTracing((UserContext) obj, callback); } else if (obj instanceof Plugin) { tracingProvider.executeWithTracing((Plugin) obj, callback); + } else if (obj instanceof Hook) { + tracingProvider.executeWithTracing((Hook) obj, callback); } } else { Error error = callback.execute(); @@ -170,4 +165,60 @@ public void executeWithTracing(Object obj, Callback callback) throws Exception { } } } + + public Map getPreHooks() { + return preHooks; + } + + public Map getPostHooks() { + return postHooks; + } + + public boolean hasHttpTrigger() { + if (Objects.equals(functionContext.getRuntime(), SyncRuntime)) { + return true; + } + + return functionContext.getTriggers() != null && functionContext.getTriggers().getHttp() != null; + } + + public boolean hasDaprTrigger() { + if (Objects.equals(functionContext.getRuntime(), AsyncRuntime)) { + return true; + } + + return functionContext.getTriggers() != null && ArrayUtils.isEmpty(functionContext.getTriggers().getDapr()); + } + + public Map getDaprTrigger() { + if (Objects.equals(functionContext.getRuntime(), AsyncRuntime)) { + return functionContext.getInputs(); + } + + if (functionContext.getTriggers() != null && + ArrayUtils.isNotEmpty(functionContext.getTriggers().getDapr())) { + Map triggers = new HashMap<>(); + for (FunctionContext.DaprTrigger trigger : functionContext.getTriggers().getDapr()) { + Component component = new Component(); + component.setComponentName(trigger.getName()); + component.setComponentType(trigger.getType()); + component.setTopic(trigger.getTopic()); + triggers.put(component.getComponentName(), component); + } + + return triggers; + } + + return null; + } + + public FunctionContext getFunctionContext() { + return functionContext; + } + + public boolean needToCreateDaprClient() { + return (MapUtils.isNotEmpty(functionContext.getInputs())) || + (MapUtils.isNotEmpty(functionContext.getOutputs())) || + (MapUtils.isNotEmpty(functionContext.getStates())); + } } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java index c8925343..b8d14f32 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/context/UserContext.java @@ -18,7 +18,7 @@ import dev.openfunction.functions.*; import dev.openfunction.invoker.Callback; -import dev.openfunction.invoker.runtime.JsonEventFormat; +import dev.openfunction.invoker.JsonEventFormat; import io.cloudevents.CloudEvent; import io.cloudevents.core.v03.CloudEventBuilder; import io.dapr.client.DaprClient; @@ -33,17 +33,14 @@ public class UserContext implements Context { private static final Logger logger = Logger.getLogger("dev.openfunction.invoker"); - public static final String OpenFuncBinding = "bindings"; - public static final String OpenFuncTopic = "pubsub"; - - private static final Set MiddlewaresCloudEventFormatReqired = Set.of( - "kafka", - "kubemq", - "mqtt3", - "rabbitmq", - "redis", - "gcp.pubsub", - "azure.eventhubs" + private static final Set MiddlewaresCloudEventFormatRequired = Set.of( + "bindings.kafka", + "bindings.kubemq", + "bindings.mqtt3", + "bindings.rabbitmq", + "bindings.redis", + "bindings.gcp.pubsub", + "bindings.azure.eventhubs" ); private final RuntimeContext runtimeContext; @@ -82,11 +79,12 @@ public UserContext withTopicEvent(TopicEvent event) { } @Override + @Deprecated public Error send(String outputName, String data) { if (data == null) { return null; } - Map outputs = runtimeContext.getOutputs(); + Map outputs = runtimeContext.getFunctionContext().getOutputs(); if (outputs.isEmpty()) { return new Error("no output"); } @@ -96,21 +94,14 @@ public Error send(String outputName, String data) { return new Error("output " + outputName + " not found"); } - if (output.getComponentType().startsWith(OpenFuncTopic)) { - daprClient.publishEvent(output.getComponentName(), output.getUri(), data); - } else if (output.getComponentType().startsWith(OpenFuncBinding)) { + if (output.isPubsub()) { + daprClient.publishEvent(output.getComponentName(), output.getTopic(), data); + } else if (output.isBinding()) { // If a middleware supports both binding and pubsub, then the data send to // binding must be in CloudEvent format, otherwise pubsub cannot parse the data. byte[] payload = data.getBytes(); - if (MiddlewaresCloudEventFormatReqired.contains(output.getComponentType().substring(OpenFuncBinding.length() + 1))) { - CloudEvent event = new CloudEventBuilder() - .withId(UUID.randomUUID().toString()) - .withType("dapr.invoke") - .withSource(URI.create("openfunction/invokeBinding")) - .withData(data.getBytes()) - .withDataContentType(JsonEventFormat.CONTENT_TYPE) - .withSubject(output.getUri()) - .build(); + if (MiddlewaresCloudEventFormatRequired.contains(output.getComponentType())) { + CloudEvent event = packageAsCloudevent(data); payload = new JsonEventFormat().serialize(event); } @@ -149,7 +140,7 @@ public CloudEvent getCloudEvent() { @Override public String getName() { - return runtimeContext.getName(); + return runtimeContext.getFunctionContext().getName(); } @Override @@ -164,12 +155,12 @@ public String getHttpPattern() { @Override public Map getOutputs() { - return runtimeContext.getOutputs(); + return runtimeContext.getFunctionContext().getOutputs(); } @Override public Map getStates() { - return runtimeContext.getStates(); + return runtimeContext.getFunctionContext().getStates(); } @Override @@ -177,6 +168,18 @@ public DaprClient getDaprClient() { return daprClient; } + @Override + public CloudEvent packageAsCloudevent(String payload) { + return new CloudEventBuilder() + .withId(UUID.randomUUID().toString()) + .withType("dapr.invoke") + .withSource(URI.create("openfunction/invokeBinding")) + .withData(payload.getBytes()) + .withDataContentType(JsonEventFormat.CONTENT_TYPE) + .build(); + } + + @Override public Map getInputs() { return runtimeContext.getInputs(); } @@ -185,44 +188,67 @@ public Class getFunctionClass() { return function.getClass(); } - private void executePrePlugins() throws Exception { - for (String name : runtimeContext.getPrePlugins().keySet()) { - Plugin plugin = runtimeContext.getPrePlugins().get(name).init(); - if (plugin.needToTracing()) { - runtimeContext.executeWithTracing(plugin, () -> { - Error error = plugin.execPreHook(UserContext.this); - if (error != null) { - logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); - } - - return error; - }); - } else { - Error error = plugin.execPreHook(UserContext.this); + private void executeHooks(boolean pre) throws Exception { + Map hooks; + if (pre) { + hooks = runtimeContext.getPreHooks(); + } else { + hooks = runtimeContext.getPostHooks(); + } + for (String name : hooks.keySet()) { + Object obj = hooks.get(name); + if (Hook.class.isAssignableFrom(obj.getClass())) { + executeHook(((Hook) obj).init()); + } + + if (Plugin.class.isAssignableFrom(obj.getClass())) { + executePlugin(((Plugin) obj).init(), pre); + } + } + } + + private void executePlugin(Plugin plugin, boolean pre) throws Exception { + if (plugin.needToTracing()) { + runtimeContext.executeWithTracing(plugin, () -> { + Error error; + if (pre) { + error = plugin.execPreHook(UserContext.this); + } else { + error = plugin.execPostHook(UserContext.this); + } if (error != null) { logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); } + + return error; + }); + } else { + Error error; + if (pre) { + error = plugin.execPreHook(UserContext.this); + } else { + error = plugin.execPostHook(UserContext.this); + } + if (error != null) { + logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); } } } - private void executePostPlugins() throws Exception { - for (String name : runtimeContext.getPostPlugins().keySet()) { - Plugin plugin = runtimeContext.getPostPlugins().get(name).init(); - if (plugin.needToTracing()) { - runtimeContext.executeWithTracing(plugin, () -> { - Error error = plugin.execPostHook(UserContext.this); - if (error != null) { - logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); - } - - return error; - }); - } else { - Error error = plugin.execPostHook(UserContext.this); + private void executeHook(Hook hook) throws Exception { + if (hook.needToTracing()) { + runtimeContext.executeWithTracing(hook, () -> { + Error error = hook.execute(UserContext.this); if (error != null) { - logger.log(Level.SEVERE, "execute plugin " + plugin.name() + ":" + plugin.version() + " error", error); + logger.log(Level.SEVERE, "execute hook " + hook.name() + ":" + hook.version() + " error", error); } + + return error; + }); + } else { + Error error = hook.execute(UserContext.this); + if (error != null) { + logger.log(Level.SEVERE, "execute hook " + hook.name() + ":" + hook.version() + " error", error); } } } @@ -272,9 +298,9 @@ public void executeFunction(OpenFunction function, String payload) throws Except private void executeFunction(Callback callBack) throws Exception { runtimeContext.executeWithTracing(this, () -> { - executePrePlugins(); + executeHooks(true); runtimeContext.executeWithTracing(null, callBack); - executePostPlugins(); + executeHooks(false); return null; }); } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/OpenTelemetryProvider.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/OpenTelemetryProvider.java index 5dccb9a1..50d7a107 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/OpenTelemetryProvider.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/OpenTelemetryProvider.java @@ -217,17 +217,21 @@ public void executeWithTracing(Plugin plugin, Callback callback) throws Exceptio } @Override - public void executeWithTracing(UserContext ctx, Callback callback) throws Exception { - SpanKind kind = SpanKind.SERVER; - if (ctx.getHttpRequest() == null) { - Map inputs = ctx.getInputs(); - if (inputs != null && !inputs.isEmpty()) { - kind = SpanKind.CONSUMER; - } else { - kind = SpanKind.PRODUCER; - } + public void executeWithTracing(Hook hook, Callback callback) throws Exception { + Map tags = new HashMap<>(); + tags.put("kind", "Hook"); + tags.put("name", hook.name()); + tags.put("version", hook.version()); + if (hook.tagsAddToTracing() != null) { + tags.putAll(hook.tagsAddToTracing()); } + executeWithTracing(hook.name(), SpanKind.INTERNAL, tags, callback); + } + + @Override + public void executeWithTracing(UserContext ctx, Callback callback) throws Exception { + SpanKind kind = SpanKind.SERVER; Map tags = new HashMap<>(); tags.put("function", ctx.getFunctionClass().getName()); diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/SkywalkingProvider.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/SkywalkingProvider.java index c349fcc0..8b102300 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/SkywalkingProvider.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/SkywalkingProvider.java @@ -16,10 +16,7 @@ package dev.openfunction.invoker.tracing; -import dev.openfunction.functions.BindingEvent; -import dev.openfunction.functions.HttpRequest; -import dev.openfunction.functions.Plugin; -import dev.openfunction.functions.TopicEvent; +import dev.openfunction.functions.*; import dev.openfunction.invoker.Callback; import dev.openfunction.invoker.context.UserContext; import io.cloudevents.CloudEvent; @@ -69,6 +66,11 @@ public void executeWithTracing(Plugin plugin, Callback callback) throws Exceptio } + @Override + public void executeWithTracing(Hook hook, Callback callback) throws Exception { + + } + @Override public void executeWithTracing(UserContext ctx, Callback callback) throws Exception { diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/TracingProvider.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/TracingProvider.java index 9bf54b20..efaf50c5 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/TracingProvider.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/tracing/TracingProvider.java @@ -1,9 +1,6 @@ package dev.openfunction.invoker.tracing; -import dev.openfunction.functions.BindingEvent; -import dev.openfunction.functions.HttpRequest; -import dev.openfunction.functions.Plugin; -import dev.openfunction.functions.TopicEvent; +import dev.openfunction.functions.*; import dev.openfunction.invoker.Callback; import dev.openfunction.invoker.context.UserContext; import io.cloudevents.CloudEvent; @@ -23,5 +20,7 @@ public interface TracingProvider { void executeWithTracing(Plugin plugin, Callback callback)throws Exception; + void executeWithTracing(Hook hook, Callback callback)throws Exception; + void executeWithTracing(UserContext ctx, Callback callback)throws Exception; } diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/AsynchronousRuntime.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/DaprTrigger.java similarity index 86% rename from functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/AsynchronousRuntime.java rename to functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/DaprTrigger.java index c2eaf133..6f5abffc 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/AsynchronousRuntime.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/DaprTrigger.java @@ -14,7 +14,7 @@ limitations under the License. */ -package dev.openfunction.invoker.runtime; +package dev.openfunction.invoker.trigger; import com.google.protobuf.Value; import dev.openfunction.functions.BindingEvent; @@ -30,6 +30,7 @@ import io.grpc.Server; import io.grpc.ServerBuilder; import io.grpc.stub.StreamObserver; +import org.apache.commons.collections.MapUtils; import java.util.*; import java.util.logging.Level; @@ -38,8 +39,7 @@ /** * Executes the user's asynchronous function. */ -public final class AsynchronousRuntime implements Runtime { - +public final class DaprTrigger implements Trigger { private static final Logger logger = Logger.getLogger("dev.openfunction.invoker"); private final RuntimeContext runtimeContext; @@ -48,7 +48,7 @@ public final class AsynchronousRuntime implements Runtime { private final Service service; - public AsynchronousRuntime(RuntimeContext runtimeContext, Class[] functionClasses) { + public DaprTrigger(RuntimeContext runtimeContext, Class[] functionClasses) { this.runtimeContext = runtimeContext; functions = new ArrayList<>(); @@ -70,10 +70,8 @@ public AsynchronousRuntime(RuntimeContext runtimeContext, Class[] functionCla @Override public void start() throws Exception { - Map inputs = runtimeContext.getInputs(); - - if (inputs == null || inputs.isEmpty()) { - throw new Error("no inputs defined for the function"); + if (MapUtils.isEmpty(runtimeContext.getDaprTrigger())) { + throw new Error("no dapr trigger defined for the function"); } this.service.start(runtimeContext.getPort()); @@ -90,7 +88,6 @@ private class Service extends AppCallbackGrpc.AppCallbackImplBase { private DaprClient daprClient; public void start(int port) throws Exception { - daprServer = ServerBuilder .forPort(port) .addService(Service.this) @@ -114,15 +111,15 @@ public void start(int port) throws Exception { public void listInputBindings(com.google.protobuf.Empty request, io.grpc.stub.StreamObserver responseObserver) { - List inputs = new ArrayList<>(); - for (String key : runtimeContext.getInputs().keySet()) { - Component input = runtimeContext.getInputs().get(key); - if (input.getComponentType().startsWith(UserContext.OpenFuncBinding)) { - inputs.add(runtimeContext.getInputs().get(key).getComponentName()); + List bindings = new ArrayList<>(); + for (String key : runtimeContext.getDaprTrigger().keySet()) { + Component component = runtimeContext.getDaprTrigger().get(key); + if (component.isBinding()) { + bindings.add(component.getComponentName()); } } - responseObserver.onNext(DaprAppCallbackProtos.ListInputBindingsResponse.newBuilder().addAllBindings(inputs).build()); + responseObserver.onNext(DaprAppCallbackProtos.ListInputBindingsResponse.newBuilder().addAllBindings(bindings).build()); responseObserver.onCompleted(); } @@ -154,10 +151,10 @@ public void onBindingEvent(DaprAppCallbackProtos.BindingEventRequest request, public void listTopicSubscriptions(com.google.protobuf.Empty request, io.grpc.stub.StreamObserver responseObserver) { List subscriptions = new ArrayList<>(); - for (String key : runtimeContext.getInputs().keySet()) { - Component input = runtimeContext.getInputs().get(key); - if (input.getComponentType().startsWith(UserContext.OpenFuncTopic)) { - subscriptions.add(DaprAppCallbackProtos.TopicSubscription.newBuilder().setTopic(input.getUri()).setPubsubName(input.getComponentName()).build()); + for (String key : runtimeContext.getDaprTrigger().keySet()) { + Component component = runtimeContext.getDaprTrigger().get(key); + if (component.isPubsub()) { + subscriptions.add(DaprAppCallbackProtos.TopicSubscription.newBuilder().setTopic(component.getTopic()).setPubsubName(component.getComponentName()).build()); } } responseObserver.onNext(DaprAppCallbackProtos.ListTopicSubscriptionsResponse.newBuilder().addAllSubscriptions(subscriptions).build()); diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/SynchronizeRuntime.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/HttpTrigger.java similarity index 92% rename from functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/SynchronizeRuntime.java rename to functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/HttpTrigger.java index 6c273e02..b7bc80ac 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/SynchronizeRuntime.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/HttpTrigger.java @@ -14,7 +14,7 @@ limitations under the License. */ -package dev.openfunction.invoker.runtime; +package dev.openfunction.invoker.trigger; import dev.openfunction.functions.CloudEventFunction; import dev.openfunction.functions.HttpFunction; @@ -45,7 +45,7 @@ /** * Executes the user's synchronize method. */ -public class SynchronizeRuntime extends HttpServlet implements Runtime { +public class HttpTrigger extends HttpServlet implements Trigger { private static final Logger logger = Logger.getLogger("dev.openfunction..invoker"); private final Class[] functionClasses; @@ -54,18 +54,16 @@ public class SynchronizeRuntime extends HttpServlet implements Runtime { private DaprClient daprClient; - public SynchronizeRuntime(RuntimeContext runtimeContext, Class[] functionClasses) { + public HttpTrigger(RuntimeContext runtimeContext, Class[] functionClasses) { this.runtimeContext = runtimeContext; this.functionClasses = functionClasses; } @Override public void start() throws Exception { - if ((runtimeContext.getInputs() != null && !runtimeContext.getInputs().isEmpty()) || - (runtimeContext.getOutputs() != null && !runtimeContext.getOutputs().isEmpty()) || - (runtimeContext.getStates() != null && !runtimeContext.getStates().isEmpty())) { + if (runtimeContext.needToCreateDaprClient()) { daprClient = new DaprClientBuilder().build(); - daprClient.waitForSidecar(Runtime.WaitDaprSidecarTimeout); + daprClient.waitForSidecar(Trigger.WaitDaprSidecarTimeout); } ServletContextHandler handler = new ServletContextHandler(); diff --git a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/Runtime.java b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/Trigger.java similarity index 90% rename from functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/Runtime.java rename to functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/Trigger.java index a957209c..e82f2265 100644 --- a/functions-framework-invoker/src/main/java/dev/openfunction/invoker/runtime/Runtime.java +++ b/functions-framework-invoker/src/main/java/dev/openfunction/invoker/trigger/Trigger.java @@ -14,9 +14,9 @@ limitations under the License. */ -package dev.openfunction.invoker.runtime; +package dev.openfunction.invoker.trigger; -public interface Runtime { +public interface Trigger { int WaitDaprSidecarTimeout = 60000; diff --git a/samples/pom.xml b/samples/pom.xml index abab19d6..031f8660 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -31,7 +31,7 @@ dev.openfunction.functions functions-framework-api - 1.1.0-SNAPSHOT + 1.2.0-SNAPSHOT diff --git a/samples/src/main/java/dev/openfunction/samples/OpenFunctionImpl.java b/samples/src/main/java/dev/openfunction/samples/OpenFunctionImpl.java index 827bedb7..8c685ab3 100644 --- a/samples/src/main/java/dev/openfunction/samples/OpenFunctionImpl.java +++ b/samples/src/main/java/dev/openfunction/samples/OpenFunctionImpl.java @@ -16,9 +16,11 @@ package dev.openfunction.samples; +import dev.openfunction.functions.Component; import dev.openfunction.functions.OpenFunction; import dev.openfunction.functions.Context; import dev.openfunction.functions.Out; +import io.dapr.client.DaprClient; public class OpenFunctionImpl implements OpenFunction { @@ -26,11 +28,19 @@ public class OpenFunctionImpl implements OpenFunction { public Out accept(Context context, String payload) throws Exception { System.out.printf("receive event: %s", payload).println(); + DaprClient daprClient = context.getDaprClient(); + if (daprClient == null) { + return new Out(); + } + if (context.getOutputs() != null) { for (String key : context.getOutputs().keySet()) { - Error error = context.send(key, payload); - if (error != null) { - System.out.println("send to output " + key + " error, " + error.getMessage()); + Component output = context.getOutputs().get(key); + if (output.isPubsub()) { + daprClient.publishEvent(output.getComponentName(), output.getTopic(), payload, output.getMetadata()); + } else if (output.isBinding()) { + // We recommend using CloudEvents to pass data between Dapr components. + daprClient.invokeBinding(output.getComponentName(), output.getOperation(), context.packageAsCloudevent(payload)); } } } diff --git a/samples/src/main/java/dev/openfunction/samples/hooks/ExampleHook.java b/samples/src/main/java/dev/openfunction/samples/hooks/ExampleHook.java new file mode 100644 index 00000000..15e06013 --- /dev/null +++ b/samples/src/main/java/dev/openfunction/samples/hooks/ExampleHook.java @@ -0,0 +1,74 @@ +/* +Copyright 2022 The OpenFunction Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dev.openfunction.samples.hooks; + +import dev.openfunction.functions.Context; +import dev.openfunction.functions.Hook; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Map; + +public class ExampleHook implements Hook { + private int seq = 0; + + @Override + public String name() { + return "hook-example"; + } + + @Override + public String version() { + return "v1.0.0"; + } + + @Override + public Hook init() { + return this; + } + + @Override + public Error execute(Context ctx) { + String ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.XXX").format(new Date()); + if (ctx.getBindingEvent() != null) { + System.out.printf("hook %s:%s exec for binding %s at %s, seq %d, function %s", name(), version(), ctx.getBindingEvent().getName(), ts, seq, ctx.getName()).println(); + } else if (ctx.getTopicEvent() != null) { + System.out.printf("hook %s:%s exec for pubsub %s at %s, seq %d, function %s", name(), version(), ctx.getTopicEvent().getName(), ts, seq, ctx.getName()).println(); + } else if (ctx.getHttpRequest() != null) { + if (ctx.getCloudEvent() != null) { + System.out.printf("hook %s:%s exec for cloudevent function %s at %s, seq %d", name(), version(), ctx.getName(), ts, seq).println(); + } else { + System.out.printf("hook %s:%s exec for http function %s at %s, seq %d", name(), version(), ctx.getName(), ts, seq).println(); + } + } else { + System.out.println("unknown function type"); + } + seq++; + + return null; + } + + @Override + public Boolean needToTracing() { + return true; + } + + @Override + public Map tagsAddToTracing() { + return null; + } +} diff --git a/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java b/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java index a978fdef..d69b097a 100644 --- a/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java +++ b/samples/src/main/java/dev/openfunction/samples/plugins/ExamplePlugin.java @@ -56,14 +56,14 @@ public Error execPostHook(Context ctx) { private void execHook(Context ctx, String type) { String ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.XXX").format(new Date()); if (ctx.getBindingEvent() != null) { - System.out.printf("plugin %s:%s exec %s hook for binding %s at %s, seq %d", name(), version(), type, ctx.getBindingEvent().getName(), ts, seq).println(); + System.out.printf("%s plugin %s:%s exec for binding %s at %s, seq %d, function %s", type, name(), version(), ctx.getBindingEvent().getName(), ts, seq, ctx.getName()).println(); } else if (ctx.getTopicEvent() != null) { - System.out.printf("plugin %s:%s exec %s hook for pubsub %s at %s, seq %d", name(), version(), type, ctx.getTopicEvent().getName(), ts, seq).println(); + System.out.printf("%s plugin %s:%s exec for pubsub %s at %s, seq %d, function %s", type, name(), version(), ctx.getTopicEvent().getName(), ts, seq, ctx.getName()).println(); } else if (ctx.getHttpRequest() != null) { if (ctx.getCloudEvent() != null) { - System.out.printf("plugin %s:%s exec %s hook for cloudevent function at %s, seq %d", name(), version(), type, ts, seq).println(); + System.out.printf("%s plugin %s:%s exec for cloudevent function %s at %s, seq %d", type, name(), version(), ctx.getName(), ts, seq).println(); } else { - System.out.printf("plugin %s:%s exec %s hook for http function at %s, seq %d", name(), version(), type, ts, seq).println(); + System.out.printf("%s plugin %s:%s exec for http function %s at %s, seq %d", type, name(), version(), ctx.getName(), ts, seq).println(); } } else { System.out.println("unknown function type");