-
-
Notifications
You must be signed in to change notification settings - Fork 36
/
kafka-to-elasticsearch.go
70 lines (58 loc) · 1.66 KB
/
kafka-to-elasticsearch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package main
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"

	"github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
	"github.com/elastic/go-elasticsearch/v8"
)
// Service carries the dependencies needed by the CloudEvents receiver
// callback (see display).
type Service struct {
	// elastic is the Elasticsearch client that display uses to index events.
	elastic *elasticsearch.Client
}
// main builds the Elasticsearch client, wraps it in a Service, creates the
// CloudEvents HTTP client (with the health-check middleware installed) and
// starts the receiver with Service.display as the callback.
//
// A failure to create either client, or an error returned by StartReceiver,
// is logged and terminates the process.
func main() {
	ctx := context.Background()

	// Init elastic client
	elasticClient, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{
			"http://shortlink-master.elasticsearch:9200",
		},
	})
	if err != nil {
		// Check this error before err is reused below; otherwise a broken
		// Elasticsearch configuration would go unnoticed until the first event.
		log.Fatalf("Failed to create elasticsearch client: %v", err)
	}

	service := &Service{
		elastic: elasticClient,
	}

	c, err := client.NewClientHTTP(
		[]cehttp.Option{cehttp.WithMiddleware(healthzMiddleware)}, nil,
	)
	if err != nil {
		log.Fatal("Failed to create client, ", err)
	}

	if err := c.StartReceiver(ctx, service.display); err != nil {
		log.Fatalf("Error during receiver's runtime: %v", err)
	}
}
// display prints the given Event in a human-readable format and indexes it,
// serialized as JSON, into the "shortlink.event.link.new" Elasticsearch index.
//
// Error handling:
//   - an event that cannot be encoded is logged and skipped;
//   - a failure to perform the index request terminates the process;
//   - an error status returned by Elasticsearch is logged and the event is
//     dropped.
func (s *Service) display(event cloudevents.Event) {
	fmt.Printf("☁️ cloudevents.Event\n%s", event)

	// Index needs the document as an io.Reader, so encode the event first.
	payload, err := json.Marshal(event)
	if err != nil {
		log.Printf("Error encoding event: %v", err)
		return
	}

	// send event to elastic
	res, err := s.elastic.Index("shortlink.event.link.new", bytes.NewReader(payload))
	if err != nil {
		// TODO: use logger
		log.Fatalf("Error indexing document: %v", err)
	}
	// The response body must always be closed to release the connection.
	defer res.Body.Close()

	if res.IsError() {
		// The request reached Elasticsearch but was rejected; String()
		// includes the status and the response body.
		log.Printf("Error indexing document: %s", res.String())
		return
	}

	// Drain the body so the underlying connection can be reused; the
	// content of a successful response is not needed here.
	_, _ = io.Copy(io.Discard, res.Body)
}
// healthzPath is the HTTP path of the health endpoint used for probing the service.
const healthzPath = "/healthz"

// healthzMiddleware is a cehttp.Middleware which exposes a health endpoint.
//
// A request whose URL path equals healthzPath is answered directly with
// 204 No Content and is not passed on; every other request is forwarded to
// next unchanged.
func healthzMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		// Compare the parsed path rather than the raw RequestURI so that a
		// probe carrying a query string (e.g. "/healthz?probe=1") is still
		// recognized as a health check.
		if req.URL.Path == healthzPath {
			w.WriteHeader(http.StatusNoContent)
			return
		}
		next.ServeHTTP(w, req)
	})
}