From 97b26996371a268adeb138fd4b699cbedfd1a214 Mon Sep 17 00:00:00 2001 From: Ivan Topolcic Date: Mon, 7 Aug 2023 16:33:40 -0400 Subject: [PATCH] Update http server --- go-example-telemetry-api-extension/Makefile | 2 +- go-example-telemetry-api-extension/go.mod | 15 +++++-- go-example-telemetry-api-extension/go.sum | 15 ++++++- .../telemetryApi/listener.go | 44 +++++++------------ 4 files changed, 42 insertions(+), 34 deletions(-) diff --git a/go-example-telemetry-api-extension/Makefile b/go-example-telemetry-api-extension/Makefile index 64d4fd70..c76c2bd7 100644 --- a/go-example-telemetry-api-extension/Makefile +++ b/go-example-telemetry-api-extension/Makefile @@ -13,5 +13,5 @@ buildAndDeployExtensionLayer: cd bin && zip -r extension.zip extensions aws lambda publish-layer-version \ - --layer-name "go-example-telemetry-api-extension-layer" \ + --layer-name "ivan-go-example-telemetry-api-extension-layer" \ --zip-file "fileb://bin/extension.zip" diff --git a/go-example-telemetry-api-extension/go.mod b/go-example-telemetry-api-extension/go.mod index d7d3f492..bf8ca34c 100644 --- a/go-example-telemetry-api-extension/go.mod +++ b/go-example-telemetry-api-extension/go.mod @@ -3,8 +3,15 @@ module aws-lambda-extensions/go-example-telemetry-api-extension go 1.18 require ( - github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 // indirect - github.com/pkg/errors v0.9.1 // indirect - github.com/sirupsen/logrus v1.9.0 // indirect - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect + github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 + github.com/pkg/errors v0.9.1 + github.com/sirupsen/logrus v1.9.0 + github.com/valyala/fasthttp v1.48.0 +) + +require ( + github.com/andybalholm/brotli v1.0.5 // indirect + github.com/klauspost/compress v1.16.3 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + golang.org/x/sys v0.6.0 // indirect ) diff --git a/go-example-telemetry-api-extension/go.sum b/go-example-telemetry-api-extension/go.sum index 47e4990e..4ded9a32 100644 --- a/go-example-telemetry-api-extension/go.sum +++ b/go-example-telemetry-api-extension/go.sum @@ -1,15 +1,28 @@ +github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs= +github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI= github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259/go.mod h1:9Qcha0gTWLw//0VNka1Cbnjvg3pNKGFdAm7E9sBabxE= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79Tc= +github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go-example-telemetry-api-extension/telemetryApi/listener.go b/go-example-telemetry-api-extension/telemetryApi/listener.go index 69f98b71..a1ee8570 100644 --- a/go-example-telemetry-api-extension/telemetryApi/listener.go +++ b/go-example-telemetry-api-extension/telemetryApi/listener.go @@ -5,14 +5,13 @@ package telemetryApi import ( "context" - "encoding/json" "fmt" - "io/ioutil" - "net/http" "os" + "strings" "time" "github.com/golang-collections/go-datastructures/queue" + "github.com/valyala/fasthttp" ) const defaultListenerPort = "4323" @@ -20,14 +19,14 @@ const initialQueueSize = 5 // Used to listen to the Telemetry API type TelemetryApiListener struct { - httpServer *http.Server + fasthttpServer *fasthttp.Server // LogEventsQueue is a synchronous queue and is used to put the received log events to be dispatched later LogEventsQueue *queue.Queue } func NewTelemetryApiListener() *TelemetryApiListener { return &TelemetryApiListener{ - httpServer: nil, + fasthttpServer: nil, LogEventsQueue: queue.New(initialQueueSize), } } @@ -48,11 +47,11 @@ func listenOnAddress() string { func (s *TelemetryApiListener) Start() (string, error) { address := listenOnAddress() l.Info("[listener:Start] Starting on address", address) - s.httpServer = &http.Server{Addr: address} - http.HandleFunc("/", s.http_handler) + s.fasthttpServer = &fasthttp.Server{ + Handler: s.http_handler, + } go func() { - err := s.httpServer.ListenAndServe() - if err != http.ErrServerClosed { + if err := s.fasthttpServer.ListenAndServe(address); err != nil { l.Error("[listener:goroutine] Unexpected stop on Http Server:", err) s.Shutdown() } else { @@ -68,34 +67,23 @@ func (s *TelemetryApiListener) Start() (string, error) { // Logging or printing besides the error cases below is not recommended if you have subscribed to // receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for // the printed lines which may create an infinite loop. -func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Request) { - body, err := ioutil.ReadAll(r.Body) - if err != nil { - l.Error("[listener:http_handler] Error reading body:", err) - return - } +func (s *TelemetryApiListener) http_handler(ctx *fasthttp.RequestCtx) { + body := ctx.PostBody() - // Parse and put the log messages into the queue - var slice []interface{} - _ = json.Unmarshal(body, &slice) - - for _, el := range slice { - s.LogEventsQueue.Put(el) + if strings.Contains(string(body), "logsDropped") { + l.Info("Dropped: %v", body) } - - l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len()) - slice = nil } // Terminates the HTTP server listening for logs func (s *TelemetryApiListener) Shutdown() { - if s.httpServer != nil { - ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) - err := s.httpServer.Shutdown(ctx) + if s.fasthttpServer != nil { + _, _ = context.WithTimeout(context.Background(), 1*time.Second) + err := s.fasthttpServer.Shutdown() if err != nil { l.Error("[listener:Shutdown] Failed to shutdown http server gracefully:", err) } else { - s.httpServer = nil + s.fasthttpServer = nil } } }