Skip to content

Commit

Permalink
More updates
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Wessendorf <mwessend@redhat.com>
  • Loading branch information
matzew committed Apr 18, 2023
1 parent cd50c89 commit ba33e0d
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 8 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/google/gofuzz v1.2.0
github.com/kedacore/keda/v2 v2.8.1
knative.dev/eventing v0.36.1-0.20230418050954-81f8e6732431
knative.dev/eventing v0.36.1-0.20230418085055-cc3d254d555c
knative.dev/hack v0.0.0-20230417170854-f591fea109b3
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0
knative.dev/reconciler-test v0.0.0-20230413132853-06956b6259d6
knative.dev/reconciler-test v0.0.0-20230418082056-9fbd79e5dbe1
sigs.k8s.io/controller-runtime v0.12.3
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1518,14 +1518,14 @@ k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2 h1:GfD9OzL11kvZN5iArC6oTS7RTj7oJOIfnislxYlqTj8=
k8s.io/utils v0.0.0-20221108210102-8e77b1f39fe2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
knative.dev/eventing v0.36.1-0.20230418050954-81f8e6732431 h1:nowz8xwbVnevFR345aYyrK1C3ELNMyrYWz+jtHNlq1Q=
knative.dev/eventing v0.36.1-0.20230418050954-81f8e6732431/go.mod h1:JSLxUv1myIkTh+TKRiwa8znODao1X+jndy12KfWYyqI=
knative.dev/eventing v0.36.1-0.20230418085055-cc3d254d555c h1:9XRvapQa0EA2Gu92v3JMsW0eiF2ZTEBPx5wvBCDKcHw=
knative.dev/eventing v0.36.1-0.20230418085055-cc3d254d555c/go.mod h1:LxPAmS/FakpbFZvC4kkBuV6QSL5EyhwQYV62XLQQaQw=
knative.dev/hack v0.0.0-20230417170854-f591fea109b3 h1:+W4WBOq83tfGXKhtv8OB/uJeYqze3zh69GKiz1ucuqk=
knative.dev/hack v0.0.0-20230417170854-f591fea109b3/go.mod h1:yk2OjGDsbEnQjfxdm0/HJKS2WqTLEFg/N6nUs6Rqx3Q=
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0 h1:EFQcoUo8I4bc+U3y6tR1B3ONYZSHWUdAfI7Vh7dae8g=
knative.dev/pkg v0.0.0-20230418073056-dfad48eaa5d0/go.mod h1:2qWPP9Gjh9Q7ETti+WRHnBnGCSCq+6q7m3p/nmUQviE=
knative.dev/reconciler-test v0.0.0-20230413132853-06956b6259d6 h1:zUBZkr9kTSzYBasHZw2WnMcBJy5COZS5Xau9ThmByFo=
knative.dev/reconciler-test v0.0.0-20230413132853-06956b6259d6/go.mod h1:JwK7KUivj9TX7gJ6SAFfNxhmAfYc45kyASeRT8OG+pM=
knative.dev/reconciler-test v0.0.0-20230418082056-9fbd79e5dbe1 h1:ILiEZPy3UUDOb/wptcDmj/TEHRkdYZrdj1jKaU56oIw=
knative.dev/reconciler-test v0.0.0-20230418082056-9fbd79e5dbe1/go.mod h1:oEXVO+rWs2/SGEOttJ9ikIabrDuioX7n883K+trfgZ0=
pgregory.net/rapid v0.3.3 h1:jCjBsY4ln4Atz78QoBWxUEvAHaFyNDQg9+WU62aCn1U=
pgregory.net/rapid v0.3.3/go.mod h1:UYpPVyjFHzYBGHIxLFoupi8vwk6rXNzRY9OMvVxFIOU=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
196 changes: 196 additions & 0 deletions vendor/knative.dev/eventing/cmd/heartbeats/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
Copyright 2018 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"encoding/json"
"errors"
"flag"
"fmt"
"log"
"net/http"
"os"
"strconv"
"syscall"
"time"

"go.uber.org/zap"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/signals"
"knative.dev/pkg/tracing"
"knative.dev/pkg/tracing/config"

"github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/kelseyhightower/envconfig"
)

type Heartbeat struct {
Sequence int `json:"id"`
Label string `json:"label"`
Msg string `json:"msg,omitempty"`
}

var (
eventSource string
eventType string
sink string
label string
periodStr string
msg string
)

func init() {
flag.StringVar(&eventSource, "eventSource", "", "the event-source (CloudEvents)")
flag.StringVar(&eventType, "eventType", "dev.knative.eventing.samples.heartbeat", "the event-type (CloudEvents)")
flag.StringVar(&sink, "sink", "", "the host url to heartbeat to")
flag.StringVar(&label, "label", "", "a special label")
flag.StringVar(&periodStr, "period", "5s", "the duration between heartbeats. Supported formats: Go (https://pkg.go.dev/time#ParseDuration), integers (interpreted as seconds)")
flag.StringVar(&msg, "msg", "", "message content in data.msg")
}

type envConfig struct {
// Sink URL where to send heartbeat cloudevents
Sink string `envconfig:"K_SINK"`

// CEOverrides are the CloudEvents overrides to be applied to the outbound event.
CEOverrides string `envconfig:"K_CE_OVERRIDES"`

// Name of this pod.
Name string `envconfig:"POD_NAME" required:"true"`

// Namespace this pod exists in.
Namespace string `envconfig:"POD_NAMESPACE" required:"true"`

// Whether to run continuously or exit.
OneShot bool `envconfig:"ONE_SHOT" default:"false"`

// JSON configuration for tracing
TracingConfig string `envconfig:"K_CONFIG_TRACING"`
}

func main() {
flag.Parse()

ctx := signals.NewContext()
ctx = cloudevents.ContextWithRetriesExponentialBackoff(ctx, 20*time.Millisecond, 10)

defer maybeQuitIstioProxy()

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Printf("[ERROR] Failed to process env var: %s", err)
os.Exit(1)
}

if env.Sink != "" {
sink = env.Sink
}

var ceOverrides *duckv1.CloudEventOverrides
if len(env.CEOverrides) > 0 {
overrides := duckv1.CloudEventOverrides{}
err := json.Unmarshal([]byte(env.CEOverrides), &overrides)
if err != nil {
log.Printf("[ERROR] Unparseable CloudEvents overrides %s: %v", env.CEOverrides, err)
os.Exit(1)
}
ceOverrides = &overrides
}

conf, err := config.JSONToTracingConfig(env.TracingConfig)
if err != nil {
log.Printf("Failed to read tracing config, using the no-op default: %v", err)
}
tracer, err := tracing.SetupPublishingWithStaticConfig(zap.L().Sugar(), "", conf)
if err != nil {
log.Fatalf("Failed to initialize tracing: %v", err)
}
defer tracer.Shutdown(ctx)
c, err := client.NewClientHTTP([]cehttp.Option{cloudevents.WithTarget(sink)}, nil)
if err != nil {
log.Fatalf("failed to create client: %s", err.Error())
}

// default to 5s if unset, try to parse as a duration, then as an int
var period time.Duration
if periodStr == "" {
period = 5 * time.Second
} else if p, err := time.ParseDuration(periodStr); err == nil {
period = p
} else if p, err := strconv.Atoi(periodStr); err == nil {
period = time.Duration(p) * time.Second
} else {
log.Fatalf("Invalid period interval provided: %q", periodStr)
}

if eventSource == "" {
eventSource = fmt.Sprintf("https://knative.dev/eventing-contrib/cmd/heartbeats/#%s/%s", env.Namespace, env.Name)
log.Printf("Heartbeats Source: %s", eventSource)
}

if len(label) > 0 && label[0] == '"' {
label, _ = strconv.Unquote(label)
}
hb := &Heartbeat{
Sequence: 0,
Label: label,
Msg: msg,
}
ticker := time.NewTicker(period)
for {
hb.Sequence++

event := cloudevents.NewEvent("1.0")
event.SetType(eventType)
event.SetSource(eventSource)
event.SetExtension("the", 42)
event.SetExtension("heart", "yes")
event.SetExtension("beats", true)

if ceOverrides != nil && ceOverrides.Extensions != nil {
for n, v := range ceOverrides.Extensions {
event.SetExtension(n, v)
}
}

if err := event.SetData(cloudevents.ApplicationJSON, hb); err != nil {
log.Printf("failed to set cloudevents msg: %s", err.Error())
}

log.Printf("sending cloudevent to %s", sink)
if res := c.Send(ctx, event); !cloudevents.IsACK(res) {
log.Printf("failed to send cloudevent: %v", res)
}

if env.OneShot {
return
}

// Wait for next tick
<-ticker.C
}
}

// maybeQuitIstioProxy shuts down Istio's proxy when available.
func maybeQuitIstioProxy() {
_, err := http.DefaultClient.Get("http://localhost:15020/quitquitquit")
if err != nil && !errors.Is(err, syscall.ECONNREFUSED) {
log.Println("[Ignore this warning if Istio proxy is not used on this pod]", err)
}
}
5 changes: 3 additions & 2 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1221,9 +1221,10 @@ k8s.io/utils/net
k8s.io/utils/pointer
k8s.io/utils/strings/slices
k8s.io/utils/trace
# knative.dev/eventing v0.36.1-0.20230418050954-81f8e6732431
# knative.dev/eventing v0.36.1-0.20230418085055-cc3d254d555c
## explicit; go 1.18
knative.dev/eventing/cmd/event_display
knative.dev/eventing/cmd/heartbeats
knative.dev/eventing/pkg/apis/config
knative.dev/eventing/pkg/apis/duck
knative.dev/eventing/pkg/apis/duck/v1
Expand Down Expand Up @@ -1480,7 +1481,7 @@ knative.dev/pkg/webhook/json
knative.dev/pkg/webhook/resourcesemantics
knative.dev/pkg/webhook/resourcesemantics/defaulting
knative.dev/pkg/webhook/resourcesemantics/validation
# knative.dev/reconciler-test v0.0.0-20230413132853-06956b6259d6
# knative.dev/reconciler-test v0.0.0-20230418082056-9fbd79e5dbe1
## explicit; go 1.18
knative.dev/reconciler-test/cmd/eventshub
knative.dev/reconciler-test/pkg/environment
Expand Down

0 comments on commit ba33e0d

Please sign in to comment.