Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Funqy KNative events support for attribute filtering on incoming events #17655

Merged
merged 1 commit into from
Jun 30, 2021

Conversation

mswiderski
Copy link
Contributor

No description provided.

@geoand
Copy link
Contributor

geoand commented Jun 3, 2021

cc @matejvasek

import io.quarkus.funqy.knative.events.EventAttribute;
import io.smallrye.mutiny.Uni;

public class WithAttributeFilter {
Copy link
Contributor

@matejvasek matejvasek Jun 3, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mswiderski as I understand it, it is possible to filter on multiple attributes. Test cases should test that too (e.g. filter on source and subject).

Is dynamic dispatch supported? What I mean by dynamic dispatch: if there are two methods with different filters the runtime can pick automatically matching function (same way it works for trigger(type) now).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matejvasek test case for filtering by multiple attributes is here https://github.com/quarkusio/quarkus/pull/17655/files#diff-02c86bd7c06f51dbb63cbeb424137c6685d25ab51722469c3c175219bd9a6b44R66 it tests one standard CE attribute (source) and one extension attribute (custom)

yes, dynamic dispatch is there as function invokers are grouped by type and then filtered out based on matching attributes if any are defined.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test case for filtering by multiple attributes is here

sorry I missed that @EventAttribute(name = "custom", value = "hello") }) in diff 🤦‍♂️

yes, dynamic dispatch is there as function invokers are grouped by type and then filtered out based on matching attributes if any are defined.

Is this tested? I see two functions with different type (so dispatch is carried over it). Maybe there could be another function toSemicolonSeparated() with the same trigger as toCommaSeparated() but with different source.

Also is it possible that some filters are subset of another meaning that multiple functions match? If so which one is picked? The first declared?

Note I didn't review actual code. I am now just looking at test I trying to understand what it's supposed to do not how.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this tested? I see two functions with different type (so dispatch is carried over it). Maybe there could be another function toSemicolonSeparated() with the same trigger as toCommaSeparated() but with different source.

you're right there is no test covering that and I agree that it makes sense to have it in

Also is it possible that some filters are subset of another meaning that multiple functions match? If so which one is picked? The first declared?

yes, there can be multiple functions matching and at the moment they are all invoked. This might be to optimistic as it only makes sense to invoke all if they are without output.
So I think we can do following

  • invoke all if they don't have the output
  • throw an error (409 - conflict) in case there are multiple functions matching and have outputs

what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like having different behavior for functions with and without output.

Maybe we could pick function with the most specific filter.
Example: if there is a filter on source and a filter on source & subject, the latter would be picked.
This assumes that functions with identical filter doesn't exist (it could be compilation error maybe?)
Also this approach might not work with the CESQL if we decided to implement it in future,
because detection whether some expression is a specialization of some other expression might be difficult/impossible.

So IMHO most simple solution would be outright forbid multiple matches (preferably in compile time).

@patriot1burke wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matejvasek I have updated tests to cover what we discussed and now there is also build check if there are no duplicates for the same type and filters.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am still not sure what should we do with non-equivalent but conflicting. Shall we also forbid that too? wdyt @patriot1burke

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matejvasek could you please give an example of "non-equivalent but conflicting"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is a filter on source and a filter on source & subject

I mean this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks @matejvasek I will try to cover this case as well


import io.quarkus.funqy.knative.events.CloudEvent;

public class CEAttributeLiteralEqualsFilter implements Predicate<Object> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we merge these two filters?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also could be this Predicate<CloudEvent> ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it could be merged indeed. About the type to be set to CloudEvent it then collides with the main type which is Predicate<Object> as it resides outside of the Knative events module. So I kept it really generic (Object). If needed I could make it more specific by introducing an interface that the cloud event would implement. Not sure it's worth it....

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason why we should modify code outside Knative events module. Changes done in the Common module are used only in the Knative events module. The type/trigger info is not stored in Common so the filters shouldn't be.

diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java
index ec3b1af7ee..8d39417d14 100644
--- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java
+++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java
@@ -57,6 +57,7 @@ public class KnativeEventsBindingRecorder {
     private static ObjectMapper objectMapper;
     private static QueryObjectMapper queryMapper;
     private static Map<String, Collection<FunctionInvoker>> typeTriggers;
+    private static Map<FunctionInvoker, List<Predicate<CloudEvent>>> invokersFilters;
 
     public static final String RESPONSE_TYPE = "response.cloud.event.type";
     public static final String RESPONSE_SOURCE = "response.cloud.event.source";
@@ -67,6 +68,7 @@ public class KnativeEventsBindingRecorder {
 
     public void init() {
         typeTriggers = new HashMap<>();
+        invokersFilters = new HashMap<>();
         objectMapper = getObjectMapper()
                 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
                 .configure(MapperFeature.ACCEPT_CASE_INSENSITIVE_PROPERTIES, true);
@@ -75,12 +77,15 @@ public class KnativeEventsBindingRecorder {
             Method method = invoker.getMethod();
             String trigger;
             CloudEventMapping annotation = method.getAnnotation(CloudEventMapping.class);
+            final List<Predicate<CloudEvent>> filter;
             if (annotation != null && !annotation.trigger().isEmpty()) {
                 trigger = annotation.trigger();
-                invoker.setFilterExpression(filter(invoker.getName(), annotation));
+                filter = filter(invoker.getName(), annotation);
             } else {
                 trigger = invoker.getName();
+                filter = null;
             }
+            invokersFilters.put(invoker, filter);
             typeTriggers.compute(trigger, (k, v) -> {
                 if (v == null) {
                     v = new ArrayList<>();
@@ -88,7 +93,7 @@ public class KnativeEventsBindingRecorder {
                 // validate if there are no duplicates for the same type (trigger) and defined filters
                 // as resolution based on trigger (ce-type) and optional filters (on ce-attributes) can return only
                 // one function invoker
-                if (v.stream().anyMatch(i -> hasSameFilters(i.getFilterExpressions(), invoker.getFilterExpressions()))) {
+                if (v.stream().anyMatch(i -> hasSameFilters(invokersFilters.get(i), filter))) {
                     throw new IllegalStateException("Function for trigger '" + trigger + "' has multiple matching invokers");
                 }
 
@@ -221,17 +226,17 @@ public class KnativeEventsBindingRecorder {
 
         Handler<RoutingContext> handler = new VertxRequestHandler(vertx.get(), rootPath, beanContainer, objectMapper,
                 eventsConfig,
-                defaultInvoker, typeTriggers, executor);
+                defaultInvoker, typeTriggers, invokersFilters, executor);
 
         return handler;
     }
 
-    private List<Predicate<Object>> filter(String functionName, CloudEventMapping mapping) {
+    private List<Predicate<CloudEvent>> filter(String functionName, CloudEventMapping mapping) {
 
         if (mapping.attributes() == null || mapping.attributes().length == 0) {
             return Collections.emptyList();
         }
-        List<Predicate<Object>> filters = new ArrayList<>();
+        List<Predicate<CloudEvent>> filters = new ArrayList<>();
         for (EventAttribute attribute : mapping.attributes()) {
             Objects.requireNonNull(attribute.name(),
                     "Attribute name of the EventAttribure on function " + functionName + " is required");
@@ -249,7 +254,7 @@ public class KnativeEventsBindingRecorder {
         return filters;
     }
 
-    private boolean hasSameFilters(List<Predicate<Object>> one, List<Predicate<Object>> two) {
+    private boolean hasSameFilters(List<Predicate<CloudEvent>> one, List<Predicate<CloudEvent>> two) {
         if (one == null && two == null) {
             return true;
         }
diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java
index 488dd18bad..ed12ea3466 100644
--- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java
+++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java
@@ -12,13 +12,10 @@ import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBind
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
 
 import javax.enterprise.inject.Instance;
 import javax.enterprise.inject.spi.CDI;
@@ -64,6 +61,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
     protected final Executor executor;
     protected final FunctionInvoker defaultInvoker;
     protected final Map<String, Collection<FunctionInvoker>> typeTriggers;
+    protected final Map<FunctionInvoker, List<Predicate<CloudEvent>>> invokersFilters;
     protected final String rootPath;
 
     public VertxRequestHandler(Vertx vertx,
@@ -73,6 +71,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
             FunqyKnativeEventsConfig config,
             FunctionInvoker defaultInvoker,
             Map<String, Collection<FunctionInvoker>> typeTriggers,
+            Map<FunctionInvoker, List<Predicate<CloudEvent>>> invokersFilters,
             Executor executor) {
         this.rootPath = rootPath;
         this.defaultInvoker = defaultInvoker;
@@ -81,6 +80,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
         this.executor = executor;
         this.mapper = mapper;
         this.typeTriggers = typeTriggers;
+        this.invokersFilters = invokersFilters;
         Instance<CurrentIdentityAssociation> association = CDI.current().select(CurrentIdentityAssociation.class);
         this.association = association.isResolvable() ? association.get() : null;
         this.currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get();
@@ -188,7 +188,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
 
                     if (!match(invoker, inputCloudEvent)) {
                         log.debug(
-                                "Filters didn't match the cloud event. filters: '" + invoker.getFilterExpressions() + "' event "
+                                "Filters didn't match the cloud event. filters: '" + invokersFilters.get(invoker) + "' event "
                                         + inputCloudEvent + " skipping function " + invoker.getName());
                         continue;
                     }
@@ -386,11 +386,11 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
 
     private boolean match(FunctionInvoker invoker, CloudEvent<?> inputCloudEvent) {
 
-        if (invoker.getFilterExpressions() == null || invoker.getFilterExpressions().isEmpty()) {
+        if (invokersFilters.get(invoker) == null || invokersFilters.get(invoker).isEmpty()) {
             return true;
         }
 
-        return invoker.getFilterExpressions().stream().allMatch(p -> p.test(inputCloudEvent));
+        return invokersFilters.get(invoker).stream().allMatch(p -> p.test(inputCloudEvent));
     }
 
     private void regularFunqyHttp(RoutingContext routingContext) {
diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CEAttributeLiteralEqualsFilter.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CEAttributeLiteralEqualsFilter.java
index 3112e69329..d982d08aa0 100644
--- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CEAttributeLiteralEqualsFilter.java
+++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CEAttributeLiteralEqualsFilter.java
@@ -4,7 +4,7 @@ import java.util.function.Predicate;
 
 import io.quarkus.funqy.knative.events.CloudEvent;
 
-public class CEAttributeLiteralEqualsFilter implements Predicate<Object> {
+public class CEAttributeLiteralEqualsFilter implements Predicate<CloudEvent> {
 
     private String attributeName;
     private String expectedValue;
@@ -15,7 +15,7 @@ public class CEAttributeLiteralEqualsFilter implements Predicate<Object> {
     }
 
     @Override
-    public boolean test(Object event) {
+    public boolean test(CloudEvent event) {
 
         CloudEvent<?> ceEvent = (CloudEvent<?>) event;
         String value;
diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CustomAttributeLiteralEqualsFilter.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CustomAttributeLiteralEqualsFilter.java
index a2abba27fa..2e0aeb1509 100644
--- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CustomAttributeLiteralEqualsFilter.java
+++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/filters/CustomAttributeLiteralEqualsFilter.java
@@ -4,7 +4,7 @@ import java.util.function.Predicate;
 
 import io.quarkus.funqy.knative.events.CloudEvent;
 
-public class CustomAttributeLiteralEqualsFilter implements Predicate<Object> {
+public class CustomAttributeLiteralEqualsFilter implements Predicate<CloudEvent> {
 
     private String attributeName;
     private String expectedValue;
@@ -15,7 +15,7 @@ public class CustomAttributeLiteralEqualsFilter implements Predicate<Object> {
     }
 
     @Override
-    public boolean test(Object event) {
+    public boolean test(CloudEvent event) {
 
         CloudEvent<?> ceEvent = (CloudEvent<?>) event;
         String value = ceEvent.extensions().get(attributeName);
diff --git a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java
index ba372b5a15..f633eb47d6 100644
--- a/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java
+++ b/extensions/funqy/funqy-server-common/runtime/src/main/java/io/quarkus/funqy/runtime/FunctionInvoker.java
@@ -1,15 +1,10 @@
 package io.quarkus.funqy.runtime;
 
 import java.lang.annotation.Annotation;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
+import java.lang.reflect.*;
 import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Predicate;
 
 import io.smallrye.mutiny.Uni;
 
@@ -23,8 +18,6 @@ public class FunctionInvoker {
     protected Type outputType;
     protected boolean isAsync;
 
-    protected List<Predicate<Object>> filterExpressions = new ArrayList<>();
-
     protected Map<String, Object> bindingContext = new ConcurrentHashMap<>();
 
     public FunctionInvoker(String name, Class<?> targetClass, Method method) {
@@ -113,14 +106,6 @@ public class FunctionInvoker {
         return method;
     }
 
-    public List<Predicate<Object>> getFilterExpressions() {
-        return filterExpressions;
-    }
-
-    public void setFilterExpression(List<Predicate<Object>> filterExpressions) {
-        this.filterExpressions = filterExpressions;
-    }
-
     public void invoke(FunqyServerRequest request, FunqyServerResponse response) {
         Object[] args = null;
         if (parameterInjectors != null) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind having filters on the FunctionInvoker was to allow filtering on any binding and not just knative events. But if you don't see this to be needed we can certainly move this as you suggested. One question though - should be use invoker name as key instead of the complete invoker instance?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind having filters on the FunctionInvoker was to allow filtering on any binding and not just knative events.

I see. However right now we only need filter in Knative events, so I would rather not do any changes to Common. Maybe there will change in future. We can always change it once we need it.

One question though - should be use invoker name as key instead of the complete invoker instance?

👍 That's probably good idea (although whole object with its default "addressof" equals()/hashCode() should work too).

what is your opinion @patriot1burke

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matejvasek all changes applied, please have a look again, thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea behind having filters on the FunctionInvoker was to allow filtering on any binding and not just knative events.

I see. However right now we only need filter in Knative events, so I would rather not do any changes to Common. Maybe there will change in future. We can always change it once we need it.

One question though - should be use invoker name as key instead of the complete invoker instance?

👍 That's probably good idea (although whole object with its default "addressof" equals()/hashCode() should work too).

what is your opinion @patriot1burke

Only for Knative events please. You can invoke different functions with Funqy HTTP binding by change your path. That is good enough. Lambda, Azure and Google are only meant to have one endpoint.

@mswiderski
Copy link
Contributor Author

@matejvasek @patriot1burke any feedback?

@matejvasek
Copy link
Contributor

@mswiderski sorry I haven't rereviewed this yet, will try today or tomorrow.

import io.quarkus.funqy.knative.events.CloudEventMapping;
import io.quarkus.funqy.knative.events.EventAttribute;

public class WithOverlapingAttributeFilter {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo

import io.quarkus.funqy.knative.events.EventAttribute;

public class WithOverlapingAttributeFilter {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mswiderski we don't want test whether there is an overlap.

We want to know whether one filter is super-set of another.

Example A:

    @Funq
    @CloudEventMapping(trigger = "listOfStrings", attributes = {
            @EventAttribute(name = "source", value = "test"),
            @EventAttribute(name = "custom", value = "value") })
    public String foo(List<Identity> identityList) {
        return "";
    }

    @Funq
    @CloudEventMapping(trigger = "listOfStrings", attributes = {
            @EventAttribute(name = "source", value = "test"),
            @EventAttribute(name = "custom", value = "someOtherValue") })
    public String bar(List<Identity> identityList) {
        return "";
    }

This won't compile because there is overlap. However those filters are not in conflict.

Example B:

    @Funq
    @CloudEventMapping(trigger = "listOfStrings")
    public String foo(List<Identity> identityList) {
        return "";
    }

    @Funq
    @CloudEventMapping(trigger = "listOfStrings", attributes = {
            @EventAttribute(name = "source", value = "test") })
    public String bar(List<Identity> identityList) {
        return "";
    }

This would compile since there is no overlap. However this should fail since the foo() matches all that is matched by bar() (and more).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well actually checking if one list is super-set of the another is not quite solution:

Example C:

    @Funq
    @CloudEventMapping(trigger = "listOfStrings", attributes = {
            @EventAttribute(name = "source", value = "test"),
            @EventAttribute(name = "customA", value = "value") })
    public String foo(List<Identity> identityList) {
        return "";
    }

    @Funq
    @CloudEventMapping(trigger = "listOfStrings", attributes = {
            @EventAttribute(name = "source", value = "test"),
            @EventAttribute(name = "customB", value = "value") })
    public String bar(List<Identity> identityList) {
        return "";
    }

here is no strict sub-set relation between the two filters, still they are conflicting:
there may be an CloudEvent that matches both.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have test for A,B,C.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These three examples are describing three situation:

A -- every CE that matches foo doesn't match filter bar and vice versa
there is never a conflict, IMHO this should compile

B -- every CE that matches filter bar also matches foo
there is always conflict, IMHO this shouldn't compile

C -- there may be CEs that matches both, one, or none of the two.
there is sometime conflict, I am not sure what to do with this. Maybe it can compile, but return 409 in runtime?

return filters;
}

private boolean hasSameFilters(List<Predicate<CloudEvent>> one, List<Predicate<CloudEvent>> two) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to test whether intersection of those two list is non-empty. I don't think that's what we want. See my comment on class WithOverlapingAttributeFilter. We should test if one list is super-set of the another.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe what we want to check is that there exist two elements ea1 ∊ one, ea2 ∊ two such that ea1.name = ea2.name and ea1.value ≠ ea2.value.

If the above is true then the two lists are not conflicting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is getting complicated. Maybe we could be satisfied with runtime conflict checking after all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we could prevent at build time the duplicates in case someone defined the same filters so at least that would be covered, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we should throw build time error only if there is guaranteed conflict. It happens when set of elements form one list is sub-set of the set of elements from the another. I described this as Example B in some other comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java
index 252fa53523..6b6e78759a 100644
--- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java
+++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/KnativeEventsBindingRecorder.java
@@ -3,14 +3,7 @@ package io.quarkus.funqy.runtime.bindings.knative.events;
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.Executor;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
@@ -255,22 +248,15 @@ public class KnativeEventsBindingRecorder {
     }
 
     private boolean hasSameFilters(List<Predicate<CloudEvent>> one, List<Predicate<CloudEvent>> two) {
-        System.out.println(one + " " + two);
-        if (one == null && two == null) {
-            return true;
-        }
-
-        if (one != null && two != null) {
 
-            if (one.isEmpty() && two.isEmpty()) {
-                return true;
-            }
+        final List<Predicate<CloudEvent>> first = one != null ? one : Collections.emptyList();
+        final List<Predicate<CloudEvent>> second = two != null ? two : Collections.emptyList();
 
-            if (one.stream().anyMatch(item -> two.contains(item))) {
-                return true;
-            }
+        // empty set is sub-set of any set
+        if (first.size() <= 0 || second.size() <= 0) {
+            return true;
         }
 
-        return false;
+        return first.size() <= second.size() ? second.containsAll(first) : first.containsAll(second);
     }
 }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, will include these updates and push it by the end of today

final ObjectReader reader = (ObjectReader) invoker.getBindingContext().get(DATA_OBJECT_READER);
final ObjectWriter writer = (ObjectWriter) invoker.getBindingContext().get(DATA_OBJECT_WRITER);
boolean anyMatched = false;
for (FunctionInvoker invoker : candidates) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this all should be in for loop.

We should first check how many matching candidates are there.
zero -- return 404
one -- execute the single matching candidate
two or more -- return 409

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm looking more carefully it might be difficult to get rid of the loop, maybe leave it as it is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same round of tries and ended up with the loop :) so let's keep the loop and I think it should return directly after first matching invoker, agreed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so let's keep the loop and I think it should return directly after first matching invoker, agreed?

I don't like the idea. It would pick single function basically at random, I think it depends on order in which annotations are processed. There should be just one match and if there are multiple we should return 409.

What about this:

diff against upstream not your branch:

diff --git a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java
index 5de658751d..a584cdc3c7 100644
--- a/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java
+++ b/extensions/funqy/funqy-knative-events/runtime/src/main/java/io/quarkus/funqy/runtime/bindings/knative/events/VertxRequestHandler.java
@@ -12,11 +12,11 @@ import static io.quarkus.funqy.runtime.bindings.knative.events.KnativeEventsBind
 import java.io.IOException;
 import java.io.InputStream;
 import java.lang.reflect.Type;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import javax.enterprise.inject.Instance;
 import javax.enterprise.inject.spi.CDI;
@@ -61,7 +61,8 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
     protected final CurrentVertxRequest currentVertxRequest;
     protected final Executor executor;
     protected final FunctionInvoker defaultInvoker;
-    protected final Map<String, FunctionInvoker> typeTriggers;
+    protected final Map<String, Collection<FunctionInvoker>> typeTriggers;
+    protected final Map<String, List<Predicate<CloudEvent>>> invokersFilters;
     protected final String rootPath;
 
     public VertxRequestHandler(Vertx vertx,
@@ -70,7 +71,8 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
             ObjectMapper mapper,
             FunqyKnativeEventsConfig config,
             FunctionInvoker defaultInvoker,
-            Map<String, FunctionInvoker> typeTriggers,
+            Map<String, Collection<FunctionInvoker>> typeTriggers,
+            Map<String, List<Predicate<CloudEvent>>> invokersFilters,
             Executor executor) {
         this.rootPath = rootPath;
         this.defaultInvoker = defaultInvoker;
@@ -79,6 +81,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
         this.executor = executor;
         this.mapper = mapper;
         this.typeTriggers = typeTriggers;
+        this.invokersFilters = invokersFilters;
         Instance<CurrentIdentityAssociation> association = CDI.current().select(CurrentIdentityAssociation.class);
         this.association = association.isResolvable() ? association.get() : null;
         this.currentVertxRequest = CDI.current().select(CurrentVertxRequest.class).get();
@@ -145,18 +148,52 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
                     log.warnf("Unexpected CloudEvent spec-version '%s'.", ceSpecVersion);
                 }
 
-                final FunctionInvoker invoker;
+                final Collection<FunctionInvoker> candidates;
                 if (defaultInvoker != null) {
-                    invoker = defaultInvoker;
+                    candidates = Collections.singleton(defaultInvoker);
                 } else {
-                    invoker = typeTriggers.get(ceType);
-                    if (invoker == null) {
+                    candidates = typeTriggers.get(ceType);
+
+                    if (candidates == null || candidates.isEmpty()) {
                         routingContext.fail(404);
                         log.error("Couldn't map CloudEvent type: '" + ceType + "' to a function.");
                         return;
                     }
                 }
 
+                final CloudEvent<?> evt;
+                if (binaryCE) {
+                    evt = new HeaderCloudEventImpl<>(
+                            httpRequest.headers(),
+                            null,
+                            null,
+                            null,
+                            null);
+                } else {
+                    evt = new JsonCloudEventImpl<>(
+                            structuredPayload,
+                            null,
+                            null,
+                            null);
+                }
+                List<FunctionInvoker> matchingCandidates = candidates
+                        .stream()
+                        .filter(fi -> match(fi, evt))
+                        .collect(Collectors.toList());
+
+                if (matchingCandidates.size() <= 0) {
+                    routingContext.fail(404);
+                    log.error("Couldn't map CloudEvent type: '" + ceType + "' to any function.");
+                    return;
+                }
+                if (matchingCandidates.size() > 1) {
+                    routingContext.fail(409);
+                    log.error("CloudEvent type: '" + ceType + "' matches multiple function.");
+                    return;
+                }
+
+                final FunctionInvoker invoker = matchingCandidates.get(0);
+
                 final Type inputCeDataType = (Type) invoker.getBindingContext().get(INPUT_CE_DATA_TYPE);
                 final Type outputCeDataType = (Type) invoker.getBindingContext().get(OUTPUT_CE_DATA_TYPE);
                 final Type innerInputType = inputCeDataType != null ? inputCeDataType : invoker.getInputType();
@@ -165,6 +202,7 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
                 final ObjectWriter writer = (ObjectWriter) invoker.getBindingContext().get(DATA_OBJECT_WRITER);
 
                 final CloudEvent<?> inputCloudEvent;
+
                 final Object input;
                 if (invoker.hasInput()) {
                     if (binaryCE) {
@@ -363,6 +401,15 @@ public class VertxRequestHandler implements Handler<RoutingContext> {
 
     }
 
+    private boolean match(FunctionInvoker invoker, CloudEvent<?> inputCloudEvent) {
+
+        if (invokersFilters.get(invoker.getName()) == null || invokersFilters.get(invoker.getName()).isEmpty()) {
+            return true;
+        }
+
+        return invokersFilters.get(invoker.getName()).stream().allMatch(p -> p.test(inputCloudEvent));
+    }
+
     private void regularFunqyHttp(RoutingContext routingContext) {
         String path = routingContext.request().path();
         if (path == null) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we create the cloud events with basic info required to evaluate filters and then build them again fully populated, did I got it right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's the idea, we just need attributes other than data.

@@ -0,0 +1,84 @@
package io.quarkus.funqy.runtime.bindings.knative.events.filters;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I probably ask this, but we can merge those two predicates right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, already have it done locally and will push with all other updates

@mswiderski
Copy link
Contributor Author

@matejvasek I believe I address all comments, please have a look. Thanks!

@mswiderski
Copy link
Contributor Author

@matejvasek @patriot1burke are there any chances to get this reviewed (fix if required) and merge for 2.1 version of Quarkus?

@matejvasek
Copy link
Contributor

@mswiderski sorry, I am going to look at this now. Note that I am not approver here, so we will need final nod from patriot1burke.

@@ -165,22 +207,23 @@ private void processCloudEvent(RoutingContext routingContext) {
final ObjectWriter writer = (ObjectWriter) invoker.getBindingContext().get(DATA_OBJECT_WRITER);

final CloudEvent<?> inputCloudEvent;
if (binaryCE) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be pull out of the if?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure I got the question, would you mind elaborate bit more?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before inputCloudEvent was initialized conditionally , if was created only if the function accepted some input, invoker.hasInput() == true, otherwise it was set to null in else branch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah you're right, make sense to move it into the if, thanks for details

value = ceEvent.extensions().get(attributeName);
break;
}
if (value == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this would be useful but: what if user wants to check that extension is not present? Do you think that it could be useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd defer this to later as this filter is mainly designed to do the equals comparison - see class name. Later on we could expand to more filter types.

@mswiderski
Copy link
Contributor Author

@mswiderski sorry, I am going to look at this now. Note that I am not approver here, so we will need final nod from patriot1burke.

understood but you provide very valuable feedback so having your "good to go" is already of huge help

} else {
typeTriggers.put(invoker.getName(), invoker);
trigger = invoker.getName();
filter = null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I would prefer to have Collctions.emptyList() here to prevent NPE.
IIRC emptyList() doesn't do any allocation and even if it did it just executed once at startup/compilation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will update to use the Collctions.emptyList

@mswiderski
Copy link
Contributor Author

@matejvasek changes applied

@matejvasek
Copy link
Contributor

In general looks good. But I need go trough tests more carefully now.

import io.quarkus.test.QuarkusUnitTest;
import io.restassured.RestAssured;

public class WithInvalidAttributeFilterTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this name is not descriptive. I would almost say that it can be called overlapping, but that is not true either. The case we are testing here is that two predicates are in sub-set relation. Can you think of any better name, or maybe adding comment would be enough.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about WithConflictingAttributeFilterTest?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, that's better, or maybe WithAlwaysConflictingAttributeFilterTest, but that's quite mouthful.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to WithConflictingAttributeFilterTest

if (v == null) {
v = new ArrayList<>();
}
// validate if there are no duplicates for the same type (trigger) and defined filters
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think duplicates is not correct anymore. We are checking for sub-set relation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replaced with conflicting

@mswiderski
Copy link
Contributor Author

@matejvasek I guess you're ok with the changes, right?

@patriot1burke would you mind having a look to move this forward?

@matejvasek
Copy link
Contributor

LGTM

@gsmet
Copy link
Member

gsmet commented Jun 28, 2021

@patriot1burke any chance you could have a look at this one?

@patriot1burke
Copy link
Contributor

I've been following. Matej has done a good drop driving this PR with @mswiderski LGTM once CI passes.

@mswiderski
Copy link
Contributor Author

@patriot1burke @gsmet it looks like CI is green but I am not sure it's a complete CI cycle done. Maybe it requires approval before the complete CI will run?

@patriot1burke
Copy link
Contributor

@matejvasek @mswiderski Ok to merge?

@mswiderski
Copy link
Contributor Author

@matejvasek @mswiderski Ok to merge?

big yes from my side :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants