From a7233e9a2a2d818d45214fc0cbb7b84bb5b72a7d Mon Sep 17 00:00:00 2001 From: Maxime David Date: Fri, 28 Jul 2023 10:18:40 -0400 Subject: [PATCH 1/2] reproducer for logsDropped --- .../function/index.js | 13 ++++++--- .../telemetryApi/client.go | 4 +-- .../telemetryApi/dispatcher.go | 27 ++++++++----------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/go-example-telemetry-api-extension/function/index.js b/go-example-telemetry-api-extension/function/index.js index 73a294df..b4976d59 100644 --- a/go-example-telemetry-api-extension/function/index.js +++ b/go-example-telemetry-api-extension/function/index.js @@ -1,5 +1,12 @@ -console.log('Hello from function initalization'); +console.log("Hello from function initalization"); exports.handler = async (event, context) => { - console.log('Hello from function handler', {event}); -} + for (let i = 0; i < 1000; i++) { + console.log(`I'm log ${i}`); + } + const response = { + statusCode: 200, + body: "hello, world", + }; + return response; +}; diff --git a/go-example-telemetry-api-extension/telemetryApi/client.go b/go-example-telemetry-api-extension/telemetryApi/client.go index 5b7ff301..d601be13 100644 --- a/go-example-telemetry-api-extension/telemetryApi/client.go +++ b/go-example-telemetry-api-extension/telemetryApi/client.go @@ -117,8 +117,8 @@ type SubscribeResponse struct { func (c *Client) Subscribe(ctx context.Context, extensionId string, listenerUri string) (*SubscribeResponse, error) { eventTypes := []EventType{ Platform, - // Function, - // Extension, + Function, + Extension, } bufferingConfig := BufferingCfg{ diff --git a/go-example-telemetry-api-extension/telemetryApi/dispatcher.go b/go-example-telemetry-api-extension/telemetryApi/dispatcher.go index 753d1f44..a977a249 100644 --- a/go-example-telemetry-api-extension/telemetryApi/dispatcher.go +++ b/go-example-telemetry-api-extension/telemetryApi/dispatcher.go @@ -4,12 +4,12 @@ package telemetryApi import ( - "bytes" "context" "encoding/json" "net/http" "os" "strconv" + "strings" "github.com/golang-collections/go-datastructures/queue" ) @@ -40,20 +40,15 @@ func NewDispatcher() *Dispatcher { } func (d *Dispatcher) Dispatch(ctx context.Context, logEventsQueue *queue.Queue, force bool) { - if !logEventsQueue.Empty() && (force || logEventsQueue.Len() >= d.minBatchSize) { - l.Info("[dispatcher:Dispatch] Dispatching", logEventsQueue.Len(), "log events") - logEntries, _ := logEventsQueue.Get(logEventsQueue.Len()) - bodyBytes, _ := json.Marshal(logEntries) - req, err := http.NewRequestWithContext(ctx, "POST", d.postUri, bytes.NewBuffer(bodyBytes)) - if err != nil { - panic(err) - } - _, err = d.httpClient.Do(req) - if err != nil { - l.Error("[dispatcher:Dispatch] Failed to dispatch, returning to queue:", err) - for logEntry := range logEntries { - logEventsQueue.Put(logEntry) - } - } + l.Info("[dispatcher:Dispatch] Dispatching", logEventsQueue.Len(), "log events") + logEntries, _ := logEventsQueue.Get(logEventsQueue.Len()) + body, err := json.Marshal(logEntries) + if err != nil { + l.Error("[dispatcher:Dispatch] Failed to marshal log entries", err) + return + } + l.Info("[dispatcher:Dispatch] Dispatched", logEventsQueue.Len(), "log events") + if strings.Contains(string(body), "logsDropped") { + l.Info("[dispatcher:Dispatch] LOG DROPPED!") } } From c2d2f635db3dcdd3336d31d67dcc51ffb3fe9518 Mon Sep 17 00:00:00 2001 From: Ivan Topolcic Date: Fri, 28 Jul 2023 23:12:19 -0400 Subject: [PATCH 2/2] Add dropped logs --- go-example-telemetry-api-extension/Makefile | 2 +- .../telemetryApi/client.go | 6 +++--- .../telemetryApi/listener.go | 16 ++++++++++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) diff --git a/go-example-telemetry-api-extension/Makefile b/go-example-telemetry-api-extension/Makefile index 64d4fd70..c76c2bd7 100644 --- a/go-example-telemetry-api-extension/Makefile +++ b/go-example-telemetry-api-extension/Makefile @@ -13,5 +13,5 @@ buildAndDeployExtensionLayer: cd bin && zip -r extension.zip extensions aws lambda publish-layer-version \ - --layer-name "go-example-telemetry-api-extension-layer" \ + --layer-name "ivan-go-example-telemetry-api-extension-layer" \ --zip-file "fileb://bin/extension.zip" diff --git a/go-example-telemetry-api-extension/telemetryApi/client.go b/go-example-telemetry-api-extension/telemetryApi/client.go index d601be13..60197cc1 100644 --- a/go-example-telemetry-api-extension/telemetryApi/client.go +++ b/go-example-telemetry-api-extension/telemetryApi/client.go @@ -122,9 +122,9 @@ func (c *Client) Subscribe(ctx context.Context, extensionId string, listenerUri } bufferingConfig := BufferingCfg{ - MaxItems: 1000, - MaxBytes: 256 * 1024, - TimeoutMS: 1000, + MaxItems: 10000, + MaxBytes: 1024 * 1024, + TimeoutMS: 30000, } destination := Destination{ diff --git a/go-example-telemetry-api-extension/telemetryApi/listener.go b/go-example-telemetry-api-extension/telemetryApi/listener.go index 69f98b71..27ac263c 100644 --- a/go-example-telemetry-api-extension/telemetryApi/listener.go +++ b/go-example-telemetry-api-extension/telemetryApi/listener.go @@ -69,22 +69,38 @@ func (s *TelemetryApiListener) Start() (string, error) { // receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for // the printed lines which may create an infinite loop. func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Request) { + + w.WriteHeader(200) + + var requestStart = time.Now() + var ioReadTimeStart time.Time + var jsonUnmarshalTimeStart time.Time + var appendToQueueTimeStart time.Time + + ioReadTimeStart = time.Now() body, err := ioutil.ReadAll(r.Body) if err != nil { l.Error("[listener:http_handler] Error reading body:", err) return } + l.Infof("[time_taken] io_read_all: %v, %v bytes", time.Since(ioReadTimeStart), len(body)) + jsonUnmarshalTimeStart = time.Now() // Parse and put the log messages into the queue var slice []interface{} _ = json.Unmarshal(body, &slice) + l.Infof("[time_taken] json_unmarshal: %v, %v items", time.Since(jsonUnmarshalTimeStart), len(slice)) + appendToQueueTimeStart = time.Now() for _, el := range slice { s.LogEventsQueue.Put(el) } + l.Infof("[time_taken] append_to_queue: %v, %v items", time.Since(appendToQueueTimeStart), len(slice)) l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len()) slice = nil + + l.Infof("[time_taken] Time taken to send HTTP OK: %v", time.Since(requestStart)) } // Terminates the HTTP server listening for logs