diff --git a/go-example-telemetry-api-extension/curl.sh b/go-example-telemetry-api-extension/curl.sh new file mode 100755 index 00000000..22a0c7a5 --- /dev/null +++ b/go-example-telemetry-api-extension/curl.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +# URL to be curled +url="" + +# Looping 100 times +for i in {1..100} +do + curl "$url" +done diff --git a/go-example-telemetry-api-extension/function/index.js b/go-example-telemetry-api-extension/function/index.js index 73a294df..cfc55663 100644 --- a/go-example-telemetry-api-extension/function/index.js +++ b/go-example-telemetry-api-extension/function/index.js @@ -1,5 +1,10 @@ -console.log('Hello from function initalization'); - exports.handler = async (event, context) => { - console.log('Hello from function handler', {event}); -} + for (let i = 0; i < 5000; i++) { + console.log("@".repeat(100)); + } + const response = { + statusCode: 200, + body: "hello, world", + }; + return response; +}; diff --git a/go-example-telemetry-api-extension/main.go b/go-example-telemetry-api-extension/main.go index a09c99e0..97f3d9f3 100644 --- a/go-example-telemetry-api-extension/main.go +++ b/go-example-telemetry-api-extension/main.go @@ -71,8 +71,6 @@ func main() { } l.Info("[main] Subscription success") - dispatcher := telemetryApi.NewDispatcher() - // Will block until invoke or shutdown event is received or cancelled via the context. for { select { @@ -88,16 +86,11 @@ func main() { return } - // Dispatching log events from previous invocations - dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, false) - l.Info("[main] Received event") if res.EventType == extensionApi.Invoke { handleInvoke(res) } else if res.EventType == extensionApi.Shutdown { - // Dispatch all remaining telemetry, handle shutdown - dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, true) handleShutdown(res) return } diff --git a/go-example-telemetry-api-extension/telemetryApi/client.go b/go-example-telemetry-api-extension/telemetryApi/client.go index 5b7ff301..3064b61a 100644 --- a/go-example-telemetry-api-extension/telemetryApi/client.go +++ b/go-example-telemetry-api-extension/telemetryApi/client.go @@ -117,14 +117,14 @@ type SubscribeResponse struct { func (c *Client) Subscribe(ctx context.Context, extensionId string, listenerUri string) (*SubscribeResponse, error) { eventTypes := []EventType{ Platform, - // Function, - // Extension, + Function, + Extension, } bufferingConfig := BufferingCfg{ MaxItems: 1000, - MaxBytes: 256 * 1024, - TimeoutMS: 1000, + MaxBytes: 1024 * 1024, + TimeoutMS: 30000, } destination := Destination{ diff --git a/go-example-telemetry-api-extension/telemetryApi/listener.go b/go-example-telemetry-api-extension/telemetryApi/listener.go index 69f98b71..ac08f058 100644 --- a/go-example-telemetry-api-extension/telemetryApi/listener.go +++ b/go-example-telemetry-api-extension/telemetryApi/listener.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net/http" "os" + "strings" "time" "github.com/golang-collections/go-datastructures/queue" @@ -62,6 +63,9 @@ func (s *TelemetryApiListener) Start() (string, error) { return fmt.Sprintf("http://%s/", address), nil } +var numRequests = 0 +var numLogsProcessed = 0 + // http_handler handles the requests coming from the Telemetry API. // Everytime Telemetry API sends log events, this function will read them from the response body // and put into a synchronous queue to be dispatched later. @@ -79,12 +83,14 @@ func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Reque var slice []interface{} _ = json.Unmarshal(body, &slice) - for _, el := range slice { - s.LogEventsQueue.Put(el) - } + numRequests += 1 + numLogsProcessed += len(slice) - l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len()) - slice = nil + l.Infof("[benchmark] processed request %v, size %v, currently processed %v logs", numRequests, len(slice), numLogsProcessed) + + if strings.Contains(string(body), "logsDropped") { + l.Info("[benchmark] LOG DROPPED!") + } } // Terminates the HTTP server listening for logs