Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go-example-telemetry-api-extension/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ buildAndDeployExtensionLayer:
cd bin && zip -r extension.zip extensions

aws lambda publish-layer-version \
--layer-name "go-example-telemetry-api-extension-layer" \
--layer-name "ivan-go-example-telemetry-api-extension-layer" \
--zip-file "fileb://bin/extension.zip"
15 changes: 11 additions & 4 deletions go-example-telemetry-api-extension/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,15 @@ module aws-lambda-extensions/go-example-telemetry-api-extension
go 1.18

require (
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.0
github.com/valyala/fasthttp v1.48.0
)

require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/klauspost/compress v1.16.3 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/sys v0.6.0 // indirect
)
15 changes: 14 additions & 1 deletion go-example-telemetry-api-extension/go.sum
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
github.com/andybalholm/brotli v1.0.5 h1:8uQZIdzKmjc/iuPu7O2ioW48L81FgatrcpfFmiq/cCs=
github.com/andybalholm/brotli v1.0.5/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259 h1:ZHJ7+IGpuOXtVf6Zk/a3WuHQgkC+vXwaqfUBDFwahtI=
github.com/golang-collections/go-datastructures v0.0.0-20150211160725-59788d5eb259/go.mod h1:9Qcha0gTWLw//0VNka1Cbnjvg3pNKGFdAm7E9sBabxE=
github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY=
github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0=
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 h1:0A+M6Uqn+Eje4kHMK80dtF3JCXC4ykBgQG4Fe06QRhQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.48.0 h1:oJWvHb9BIZToTQS3MuQ2R3bJZiNSa2KiNdeI8A+79Tc=
github.com/valyala/fasthttp v1.48.0/go.mod h1:k2zXd82h/7UZc3VOdJ2WaUqt1uZ/XpXAfE9i+HBC3lA=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
44 changes: 16 additions & 28 deletions go-example-telemetry-api-extension/telemetryApi/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,28 @@ package telemetryApi

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strings"
"time"

"github.com/golang-collections/go-datastructures/queue"
"github.com/valyala/fasthttp"
)

const defaultListenerPort = "4323"
const initialQueueSize = 5

// Used to listen to the Telemetry API
type TelemetryApiListener struct {
httpServer *http.Server
fasthttpServer *fasthttp.Server
// LogEventsQueue is a synchronous queue and is used to put the received log events to be dispatched later
LogEventsQueue *queue.Queue
}

func NewTelemetryApiListener() *TelemetryApiListener {
return &TelemetryApiListener{
httpServer: nil,
fasthttpServer: nil,
LogEventsQueue: queue.New(initialQueueSize),
}
}
Expand All @@ -48,11 +47,11 @@ func listenOnAddress() string {
func (s *TelemetryApiListener) Start() (string, error) {
address := listenOnAddress()
l.Info("[listener:Start] Starting on address", address)
s.httpServer = &http.Server{Addr: address}
http.HandleFunc("/", s.http_handler)
s.fasthttpServer = &fasthttp.Server{
Handler: s.http_handler,
}
go func() {
err := s.httpServer.ListenAndServe()
if err != http.ErrServerClosed {
if err := s.fasthttpServer.ListenAndServe(address); err != nil {
l.Error("[listener:goroutine] Unexpected stop on Http Server:", err)
s.Shutdown()
} else {
Expand All @@ -68,34 +67,23 @@ func (s *TelemetryApiListener) Start() (string, error) {
// Logging or printing besides the error cases below is not recommended if you have subscribed to
// receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for
// the printed lines which may create an infinite loop.
func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
l.Error("[listener:http_handler] Error reading body:", err)
return
}
func (s *TelemetryApiListener) http_handler(ctx *fasthttp.RequestCtx) {
body := ctx.PostBody()

// Parse and put the log messages into the queue
var slice []interface{}
_ = json.Unmarshal(body, &slice)

for _, el := range slice {
s.LogEventsQueue.Put(el)
if strings.Contains(string(body), "logsDropped") {
l.Info("Dropped: %v", body)
}

l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len())
slice = nil
}

// Terminates the HTTP server listening for logs
func (s *TelemetryApiListener) Shutdown() {
if s.httpServer != nil {
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
err := s.httpServer.Shutdown(ctx)
if s.fasthttpServer != nil {
_, _ = context.WithTimeout(context.Background(), 1*time.Second)
err := s.fasthttpServer.Shutdown()
if err != nil {
l.Error("[listener:Shutdown] Failed to shutdown http server gracefully:", err)
} else {
s.httpServer = nil
s.fasthttpServer = nil
}
}
}