Skip to content
Merged
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Releasing new layer versions
`sh zip.sh`


- The new wheel package gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases).
- The new extension binary and zip files gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases).

Run below commands to create and push tags

Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ go 1.15

require (
github.com/aws/aws-sdk-go v1.35.23
github.com/aws/aws-sdk-go-v2 v1.17.1
github.com/aws/aws-sdk-go-v2/config v1.17.11
github.com/aws/aws-sdk-go-v2/service/kms v1.18.16
github.com/google/uuid v1.1.2
github.com/sirupsen/logrus v1.7.0
golang.org/x/sys v0.2.0 // indirect
)
31 changes: 30 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,36 @@
github.com/aws/aws-sdk-go v1.35.23 h1:SCP0d0XvyJTDmfnHEQPvBaYi3kea1VNUo7uQmkVgFts=
github.com/aws/aws-sdk-go v1.35.23/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k=
github.com/aws/aws-sdk-go-v2 v1.17.1 h1:02c72fDJr87N8RAC2s3Qu0YuvMRZKNZJ9F+lAehCazk=
github.com/aws/aws-sdk-go-v2 v1.17.1/go.mod h1:JLnGeGONAyi2lWXI1p0PCIOIy333JMVK1U7Hf0aRFLw=
github.com/aws/aws-sdk-go-v2/config v1.17.11 h1:9JQUKwRN8oUqeOFIrNaH6RSPmmcNk1+bQrDka/f/bPc=
github.com/aws/aws-sdk-go-v2/config v1.17.11/go.mod h1:cw6HIEr0FaZQfcoyRWYZpMfv4qAH19hZFZ5mglwWo3g=
github.com/aws/aws-sdk-go-v2/credentials v1.12.24 h1:yz4fhoMfgwymG0rU6q5eCydFhYNQxk9yrNjMA7L7xmg=
github.com/aws/aws-sdk-go-v2/credentials v1.12.24/go.mod h1:prZpUfBu1KZLBLVX482Sq4DpDXGugAre08TPEc21GUg=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19 h1:E3PXZSI3F2bzyj6XxUXdTIfvp425HHhwKsFvmzBwHgs=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19/go.mod h1:VihW95zQpeKQWVPGkwT+2+WJNQV8UXFfMTWdU6VErL8=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25 h1:nBO/RFxeq/IS5G9Of+ZrgucRciie2qpLy++3UGZ+q2E=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25/go.mod h1:Zb29PYkf42vVYQY6pvSyJCJcFHlPIiY+YKdPtwnvMkY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19 h1:oRHDrwCTVT8ZXi4sr9Ld+EXk7N/KGssOr2ygNeojEhw=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19/go.mod h1:6Q0546uHDp421okhmmGfbxzq2hBqbXFNpi4k+Q1JnQA=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.26 h1:Mza+vlnZr+fPKFKRq/lKGVvM6B/8ZZmNdEopOwSQLms=
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.26/go.mod h1:Y2OJ+P+MC1u1VKnavT+PshiEuGPyh/7DqxoDNij4/bg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.19 h1:GE25AWCdNUPh9AOJzI9KIJnja7IwUc1WyUqz/JTyJ/I=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.19/go.mod h1:02CP6iuYP+IVnBX5HULVdSAku/85eHB2Y9EsFhrkEwU=
github.com/aws/aws-sdk-go-v2/service/kms v1.18.16 h1:KHzeOb0G5ZvaIOewRSs3iyHR5MeAKkIZ75tUJCO9ijg=
github.com/aws/aws-sdk-go-v2/service/kms v1.18.16/go.mod h1:kZodDPTQjSH/qM6/OvyTfM5mms5JHB/EKYp5dhn/vI4=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.25 h1:GFZitO48N/7EsFDt8fMa5iYdmWqkUDDB3Eje6z3kbG0=
github.com/aws/aws-sdk-go-v2/service/sso v1.11.25/go.mod h1:IARHuzTXmj1C0KS35vboR0FeJ89OkEy1M9mWbK2ifCI=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8 h1:jcw6kKZrtNfBPJkaHrscDOZoe5gvi9wjudnxvozYFJo=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8/go.mod h1:er2JHN+kBY6FcMfcBBKNGCT3CarImmdFzishsqBmSRI=
github.com/aws/aws-sdk-go-v2/service/sts v1.17.2 h1:tpwEMRdMf2UsplengAOnmSIRdvAxf75oUFR+blBr92I=
github.com/aws/aws-sdk-go-v2/service/sts v1.17.2/go.mod h1:bXcN3koeVYiJcdDU89n3kCYILob7Y34AeLopUbZgLT4=
github.com/aws/smithy-go v1.13.4 h1:/RN2z1txIJWeXeOkzX+Hk/4Uuvv7dWtCjbmVJcrskyk=
github.com/aws/smithy-go v1.13.4/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
Expand All @@ -21,8 +49,9 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
Expand Down
60 changes: 55 additions & 5 deletions lambda-extensions/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
// LambdaExtensionConfig config for storing all configurable parameters
type LambdaExtensionConfig struct {
SumoHTTPEndpoint string
KMSKeyId string
EnableFailover bool
S3BucketName string
S3BucketRegion string
Expand All @@ -34,6 +35,9 @@ type LambdaExtensionConfig struct {
MaxDataPayloadSize int
LambdaRegion string
SourceCategoryOverride string
EnhanceJsonLogs bool
EnableSpanDrops bool
KmsCacheSeconds int64
}

var defaultLogTypes = []string{"platform", "function"}
Expand All @@ -48,6 +52,7 @@ func GetConfig() (*LambdaExtensionConfig, error) {

config := &LambdaExtensionConfig{
SumoHTTPEndpoint: sumoHttpEndpoint,
KMSKeyId: os.Getenv("KMS_KEY_ID"),
S3BucketName: os.Getenv("SUMO_S3_BUCKET_NAME"),
S3BucketRegion: os.Getenv("SUMO_S3_BUCKET_REGION"),
AWSLambdaRuntimeAPI: os.Getenv("AWS_LAMBDA_RUNTIME_API"),
Expand All @@ -69,6 +74,7 @@ func GetConfig() (*LambdaExtensionConfig, error) {
}
return config, nil
}

func (cfg *LambdaExtensionConfig) setDefaults() {
numRetry := os.Getenv("SUMO_NUM_RETRIES")
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
Expand All @@ -77,35 +83,56 @@ func (cfg *LambdaExtensionConfig) setDefaults() {
maxConcurrentRequests := os.Getenv("SUMO_MAX_CONCURRENT_REQUESTS")
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
logTypes := os.Getenv("SUMO_LOG_TYPES")
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")
kmsCacheSeconds := os.Getenv("KMS_CACHE_SECONDS")

if numRetry == "" {
cfg.NumRetry = 3
}

if logLevel == "" {
cfg.LogLevel = logrus.InfoLevel
}

if maxDataQueueLength == "" {
cfg.MaxDataQueueLength = 20
}

if maxConcurrentRequests == "" {
cfg.MaxConcurrentRequests = 3
}

if enableFailover == "" {
cfg.EnableFailover = false
}

if cfg.AWSLambdaRuntimeAPI == "" {
cfg.AWSLambdaRuntimeAPI = "127.0.0.1:9001"
}

if logTypes == "" {
cfg.LogTypes = defaultLogTypes
} else {
cfg.LogTypes = strings.Split(logTypes, ",")
}

if retrySleepTime == "" {
cfg.RetrySleepTime = 300 * time.Millisecond
cfg.RetrySleepTime = 300 * time.Millisecond
}

if enhanceJsonLogs == "" {
cfg.EnhanceJsonLogs = false
}

if enableSpanDrops == "" {
// by default, spans will not be dropped if user did not configure the env variable
cfg.EnableSpanDrops = false
}

if kmsCacheSeconds == "" {
cfg.KmsCacheSeconds = 5
}
}

func (cfg *LambdaExtensionConfig) validateConfig() error {
Expand All @@ -115,6 +142,9 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
maxConcurrentRequests := os.Getenv("SUMO_MAX_CONCURRENT_REQUESTS")
enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER")
retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS")
enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS")
enableSpanDrops := os.Getenv("SUMO_SPAN_DROP")
kmsCacheSeconds := os.Getenv("KMS_CACHE_SECONDS")

var allErrors []string
var err error
Expand All @@ -124,7 +154,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
}

// Todo test url valid
if cfg.SumoHTTPEndpoint != "" {
if cfg.SumoHTTPEndpoint != "" && cfg.KMSKeyId == "" {
_, err = url.ParseRequestURI(cfg.SumoHTTPEndpoint)
if err != nil {
allErrors = append(allErrors, "SUMO_HTTP_ENDPOINT is not Valid")
Expand All @@ -138,7 +168,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
}
}

if cfg.EnableFailover == true {
if cfg.EnableFailover {
if cfg.S3BucketName == "" {
allErrors = append(allErrors, "SUMO_S3_BUCKET_NAME not set in environment variable")
}
Expand Down Expand Up @@ -172,25 +202,45 @@ func (cfg *LambdaExtensionConfig) validateConfig() error {
} else {
cfg.MaxDataQueueLength = int(customMaxDataQueueLength)
}

}

if maxConcurrentRequests != "" {
customMaxConcurrentRequests, err := strconv.ParseInt(maxConcurrentRequests, 10, 32)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_MAX_CONCURRENT_REQUESTS: %v", err))
} else {
cfg.MaxConcurrentRequests = int(customMaxConcurrentRequests)
}

}

if logLevel != "" {
customloglevel, err := logrus.ParseLevel(logLevel)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_LOG_LEVEL: %v", err))
} else {
cfg.LogLevel = customloglevel
}
}

if enhanceJsonLogs != "" {
cfg.EnhanceJsonLogs, err = strconv.ParseBool(enhanceJsonLogs)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_ENHANCE_JSON_LOGS: %v", err))
}
}

if enableSpanDrops != "" {
cfg.EnableSpanDrops, err = strconv.ParseBool(enableSpanDrops)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_SPAN_DROP: %v", err))
}
}

if kmsCacheSeconds != "" {
cfg.KmsCacheSeconds, err = strconv.ParseInt(kmsCacheSeconds, 10, 32)
if err != nil {
allErrors = append(allErrors, fmt.Sprintf("Unable to parse KMS_CACHE_SECONDS: %v", err))
}
}

// test valid log format type
Expand Down
2 changes: 1 addition & 1 deletion lambda-extensions/config/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// ExtensionName same as binary name or file name where main exists
var ExtensionName = filepath.Base(os.Args[0])
var layerVersion = "4"
var layerVersion = "7"

// SumoLogicExtensionLayerVersionSuffix denotes the layer version published in AWS
var SumoLogicExtensionLayerVersionSuffix string = fmt.Sprintf("%s-prod:%s", ExtensionName, layerVersion)
47 changes: 47 additions & 0 deletions lambda-extensions/lambdaapi/telemetryapiclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package lambdaapi

import (
"bytes"
"context"
"encoding/json"
"fmt"
)

const (
// Base URL for telemetry api extension
telemetryURL = "2022-07-01/telemetry"
// Subscription Body Constants. Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol.
telemetry_timeoutMs = 1000
telemetry_maxBytes = 1048576
telemetry_maxItems = 10000
telemetry_receiverPort = 4243
)

// SubscribeToLogsAPI is - Subscribe to Logs API to receive the Lambda Logs.
func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string) ([]byte, error) {
URL := client.baseURL + telemetryURL

reqBody, err := json.Marshal(map[string]interface{}{
"destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)},
"types": logEvents,
"buffering": map[string]interface{}{"timeoutMs": telemetry_timeoutMs, "maxBytes": telemetry_maxBytes, "maxItems": telemetry_maxItems},
"schemaVersion": "2022-07-01",
})
if err != nil {
return nil, err
}
headers := map[string]string{
extensionIdentiferHeader: client.extensionID,
}
var response []byte
if ctx != nil {
response, err = client.MakeRequestWithContext(ctx, headers, bytes.NewBuffer(reqBody), "PUT", URL)
} else {
response, err = client.MakeRequest(headers, bytes.NewBuffer(reqBody), "PUT", URL)
}
if err != nil {
return nil, err
}

return response, nil
}
35 changes: 35 additions & 0 deletions lambda-extensions/lambdaapi/telemetryapiclient_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package lambdaapi

import (
"context"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
)

func TestSubscribeToTelemetryAPI(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assertEqual(t, r.Method, http.MethodPut, "Method is not PUT")
assertNotEmpty(t, r.Header.Get(extensionNameHeader), "Extension Name Header not present")

reqBytes, err := ioutil.ReadAll(r.Body)
assertNoError(t, err, "Received error")
defer r.Body.Close()
assertNotEmpty(t, reqBytes, "Received error in request")

w.Header().Add(extensionIdentiferHeader, "test-sumo-id")
w.WriteHeader(200)
}))

defer srv.Close()
client := NewClient(srv.URL[7:], extensionName)

// Without Context
response, err := client.SubscribeToTelemetryAPI(nil, []string{"platform", "function", "extension"})
commonAsserts(t, client, response, err)

// With Context
response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"})
commonAsserts(t, client, response, err)
}
Loading