Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

make controller variables available to kafka trigger #41

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/kafka-trigger-controller/kafka-controller.go
Expand Up @@ -22,12 +22,12 @@ import (
"os/signal"
"syscall"

"github.com/kubeless/kafka-trigger/pkg/controller"
kafkautils "github.com/kubeless/kafka-trigger/pkg/utils"
"github.com/kubeless/kafka-trigger/pkg/version"
kubelessutils "github.com/kubeless/kubeless/pkg/utils"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/kubeless/kafka-trigger/pkg/controller"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these changes in the imports order causes a gofmt issue, you'd need to set it back:

These files are not properly gofmt'd:
 - cmd/kafka-trigger-controller/kafka-controller.go
 - pkg/controller/kafka_trigger_controller.go

kafkautils "github.com/kubeless/kafka-trigger/pkg/utils"
"github.com/kubeless/kafka-trigger/pkg/version"
)

const (
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -10,6 +10,7 @@ require (
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/imdario/mergo v0.3.7
github.com/kubeless/kubeless v1.0.7
github.com/mitchellh/gox v1.0.1 // indirect
uedun marked this conversation as resolved.
Show resolved Hide resolved
github.com/sirupsen/logrus v1.2.0
github.com/spf13/cobra v0.0.3
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Expand Up @@ -122,6 +122,8 @@ github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:Fecb
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-version v1.0.0 h1:21MVWPKDphxa7ineQQTrCU5brh7OuVVAzGOCnnCPtE8=
github.com/hashicorp/go-version v1.0.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hashicorp/golang-lru v0.0.0-20160207214719-a0d98a5f2880 h1:OaRuzt9oCKNui8cCskZijoKUwe+aCuuCwvx1ox8FNyw=
github.com/hashicorp/golang-lru v0.0.0-20160207214719-a0d98a5f2880/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.0 h1:CL2msUPvZTLb5O648aiLNJw3hnBxN2+1Jq8rCOH9wdo=
Expand Down Expand Up @@ -176,6 +178,10 @@ github.com/matttproud/golang_protobuf_extensions v0.0.0-20150406173934-fc2b8d3a7
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
github.com/mitchellh/gox v1.0.1 h1:x0jD3dcHk9a9xPSDN6YEL4xL6Qz0dvNYm8yZqui5chI=
github.com/mitchellh/gox v1.0.1/go.mod h1:ED6BioOGXMswlXa2zxfh/xdd5QhwYliBFn9V18Ap4z4=
github.com/mitchellh/iochan v1.0.0 h1:C+X3KsSTLFVBr/tK1eYN/vs4rJcvsiLU338UhYPJWeY=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg=
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/kafka_trigger_controller.go
Expand Up @@ -31,15 +31,15 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

kubelessApi "github.com/kubeless/kubeless/pkg/apis/kubeless/v1beta1"
kubelessversioned "github.com/kubeless/kubeless/pkg/client/clientset/versioned"
kubelessInformers "github.com/kubeless/kubeless/pkg/client/informers/externalversions/kubeless/v1beta1"
kubelessutils "github.com/kubeless/kubeless/pkg/utils"
kafkaApi "github.com/kubeless/kafka-trigger/pkg/apis/kubeless/v1beta1"
"github.com/kubeless/kafka-trigger/pkg/client/clientset/versioned"
kafkaInformers "github.com/kubeless/kafka-trigger/pkg/client/informers/externalversions/kubeless/v1beta1"
"github.com/kubeless/kafka-trigger/pkg/event-consumers/kafka"
"github.com/kubeless/kafka-trigger/pkg/utils"
kubelessApi "github.com/kubeless/kubeless/pkg/apis/kubeless/v1beta1"
kubelessversioned "github.com/kubeless/kubeless/pkg/client/clientset/versioned"
kubelessInformers "github.com/kubeless/kubeless/pkg/client/informers/externalversions/kubeless/v1beta1"
kubelessutils "github.com/kubeless/kubeless/pkg/utils"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion pkg/event-consumers/kafka/kafka-consumer.go
Expand Up @@ -230,7 +230,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
b := getBackOff(c.backoff)

for msg := range claim.Messages() {
req, err := utils.GetHTTPReq(c.funcName, c.funcPort, msg.Topic, c.ns, kafkatriggersNamespace, "POST", string(msg.Value))
req, err := utils.GetHTTPReq(c.funcName, c.funcPort, msg, c.ns, kafkatriggersNamespace)
if err != nil {
logrus.Errorf("Unable to elaborate request (namespace = %v function = %v topic = %v partition = %v offset = %v): %v", c.ns, c.funcName, msg.Topic, msg.Partition, msg.Offset, err)
continue
Expand Down
15 changes: 12 additions & 3 deletions pkg/utils/event_sender.go
Expand Up @@ -20,12 +20,14 @@ import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

"github.com/Shopify/sarama"
kubelessutil "github.com/kubeless/kubeless/pkg/utils"
)

Expand All @@ -46,8 +48,9 @@ func GetFunctionPort(clientset kubernetes.Interface, namespace, functionName str
}

// GetHTTPReq returns the http request object that can be used to send a event with payload to function service
func GetHTTPReq(funcName string, funcPort int, kafkaTopic, namespace, eventNamespace, method, body string) (*http.Request, error) {
req, err := http.NewRequest(method, fmt.Sprintf("http://%s.%s.svc.cluster.local:%d", funcName, namespace, funcPort), strings.NewReader(body))
func GetHTTPReq(funcName string, funcPort int, kafkaMessage *sarama.ConsumerMessage, namespace, eventNamespace string) (*http.Request, error) {
body := string(kafkaMessage.Value)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s.%s.svc.cluster.local:%d", funcName, namespace, funcPort), strings.NewReader(body))
if err != nil {
return nil, fmt.Errorf("Unable to create request %v", err)
}
Expand All @@ -56,10 +59,16 @@ func GetHTTPReq(funcName string, funcPort int, kafkaTopic, namespace, eventNames
if err != nil {
return nil, fmt.Errorf("Failed to create a event-ID %v", err)
}
messageTimestamp := kafkaMessage.Timestamp.UTC().Format(time.RFC3339)
req.Header.Add("event-id", eventID)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure what is the intention of having a n id, which has nothing to do with the message. It's not a X-Request-ID. Should we remove it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The event id was always present. Are you suggesting it is redundant?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but that is @andresmgot call 🤷‍♂️

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of the eventID was to simply have a unique ID for the message. I would leave it (regardless of its usefulness) to avoid breaking changes.

req.Header.Add("event-time", timestamp.String())
uedun marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I was misunderstood. What I had in mind is to use the message timestamp instead of the current time

Suggested change
req.Header.Add("event-time", timestamp.String())
req.Header.Add("event-time", kafkaMessage.Timestamp.UTC().Format(time.RFC3339))

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sepetrov re event-id and event-time: removing or changing the values would be changing the meaning of those fields and represent a breaking change

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by adding the more relevant fields and new headers, we keep backwards compat but start putting in more useful info

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is true. My suggestions are BC-breaking, because I personally to see any value in current implementation. That is @andresmgot call again. I would be happy to have the new metadata available regardless the name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, let's avoid breaking changes if possible. Some functions may rely in the current format.

req.Header.Add("event-namespace", eventNamespace)
req.Header.Add("event-topic", kafkaTopic)
req.Header.Add("event-topic", kafkaMessage.Topic)
req.Header.Add("event-partition", strconv.FormatInt(int64(kafkaMessage.Partition), 10))
req.Header.Add("event-offset", strconv.FormatInt(kafkaMessage.Offset, 10))
req.Header.Add("event-key", string(kafkaMessage.Key))
req.Header.Add("event-message-timestamp", messageTimestamp)

if IsJSON(body) {
req.Header.Add("Content-Type", "application/json")
req.Header.Add("event-type", "application/json")
Expand Down
36 changes: 34 additions & 2 deletions pkg/utils/event_sender_test.go
Expand Up @@ -16,12 +16,32 @@ limitations under the License.
package utils

import (
"fmt"
"io/ioutil"
"testing"
"time"

"github.com/Shopify/sarama"
)

func GenConsumerMessageWithBody(body string) *sarama.ConsumerMessage {
timestamp, err := time.Parse(time.RFC3339, "2020-11-27T14:41:29+02:00")
if err != nil {
fmt.Println("Unable to parse time")
}
return &sarama.ConsumerMessage{
Offset: 1023435314301,
Partition: 2,
Topic: "mytopic",
Value: []byte(body),
Key: []byte("1234"),
Timestamp: timestamp,
}
}
func TestGetHTTPRequest(t *testing.T) {
req, err := GetHTTPReq("foo", 1234, "mytopic", "myns", "kafkatriggers.kubeless.io", "POST", "my msg")
value := "my msg"
msg := GenConsumerMessageWithBody(value)
req, err := GetHTTPReq("foo", 1234, msg, "myns", "kafkatriggers.kubeless.io")
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand Down Expand Up @@ -56,10 +76,22 @@ func TestGetHTTPRequest(t *testing.T) {
if req.Header.Get("event-topic") != "mytopic" {
t.Errorf("Unexpected event-topic %s", req.Header.Get("event-topic"))
}
if req.Header.Get("event-offset") != "1023435314301" {
t.Errorf("Unexpected event-offset %s", req.Header.Get("event-offset"))
}
if req.Header.Get("event-message-timestamp") != "2020-11-27T12:41:29Z" {
t.Errorf("Unexpected event-message-timestamp %s", req.Header.Get("event-message-timestamp"))
}
if req.Header.Get("event-key") != "1234" {
t.Errorf("Unexpected event-key %s", req.Header.Get("event-key"))
}
}

func TestGetJSONHTTPRequest(t *testing.T) {
req, err := GetHTTPReq("foo", 1234, "mytopic", "myns", "kafkatriggers.kubeless.io", "POST", `{"hello": "world"}`)
value := `{"hello": "world"}`
msg := GenConsumerMessageWithBody(value)
req, err := GetHTTPReq("foo", int(1234), msg, "myns", "kafkatriggers.kubeless.io")
// req, err := GetHTTPReq("foo", 1234, msg, "myns", "kafkatriggers.kubeless.io", "POST", `{"hello": "world"}`)
uedun marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand Down