Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions go-example-telemetry-api-extension/curl.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

# URL to be curled
url="<FUNCTION_URL_HERE>"

# Looping 100 times
for i in {1..100}
do
curl "$url"
done
13 changes: 9 additions & 4 deletions go-example-telemetry-api-extension/function/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
console.log('Hello from function initalization');

exports.handler = async (event, context) => {
console.log('Hello from function handler', {event});
}
for (let i = 0; i < 5000; i++) {
console.log("@".repeat(100));
}
const response = {
statusCode: 200,
body: "hello, world",
};
return response;
};
7 changes: 0 additions & 7 deletions go-example-telemetry-api-extension/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ func main() {
}
l.Info("[main] Subscription success")

dispatcher := telemetryApi.NewDispatcher()

// Will block until invoke or shutdown event is received or cancelled via the context.
for {
select {
Expand All @@ -88,16 +86,11 @@ func main() {
return
}

// Dispatching log events from previous invocations
dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, false)

l.Info("[main] Received event")

if res.EventType == extensionApi.Invoke {
handleInvoke(res)
} else if res.EventType == extensionApi.Shutdown {
// Dispatch all remaining telemetry, handle shutdown
dispatcher.Dispatch(ctx, telemetryListener.LogEventsQueue, true)
handleShutdown(res)
return
}
Expand Down
8 changes: 4 additions & 4 deletions go-example-telemetry-api-extension/telemetryApi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,14 @@ type SubscribeResponse struct {
func (c *Client) Subscribe(ctx context.Context, extensionId string, listenerUri string) (*SubscribeResponse, error) {
eventTypes := []EventType{
Platform,
// Function,
// Extension,
Function,
Extension,
}

bufferingConfig := BufferingCfg{
MaxItems: 1000,
MaxBytes: 256 * 1024,
TimeoutMS: 1000,
MaxBytes: 1024 * 1024,
TimeoutMS: 30000,
}

destination := Destination{
Expand Down
16 changes: 11 additions & 5 deletions go-example-telemetry-api-extension/telemetryApi/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io/ioutil"
"net/http"
"os"
"strings"
"time"

"github.com/golang-collections/go-datastructures/queue"
Expand Down Expand Up @@ -62,6 +63,9 @@ func (s *TelemetryApiListener) Start() (string, error) {
return fmt.Sprintf("http://%s/", address), nil
}

var numRequests = 0
var numLogsProcessed = 0

// http_handler handles the requests coming from the Telemetry API.
// Everytime Telemetry API sends log events, this function will read them from the response body
// and put into a synchronous queue to be dispatched later.
Expand All @@ -79,12 +83,14 @@ func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Reque
var slice []interface{}
_ = json.Unmarshal(body, &slice)

for _, el := range slice {
s.LogEventsQueue.Put(el)
}
numRequests += 1
numLogsProcessed += len(slice)

l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len())
slice = nil
l.Infof("[benchmark] processed request %v, size %v, currently processed %v logs", numRequests, len(slice), numLogsProcessed)

if strings.Contains(string(body), "logsDropped") {
l.Info("[benchmark] LOG DROPPED!")
}
}

// Terminates the HTTP server listening for logs
Expand Down