From 2b31f9500948da334ab9f613155d856da7249883 Mon Sep 17 00:00:00 2001 From: "laszlo.bica" Date: Tue, 6 Sep 2022 17:07:52 +0200 Subject: [PATCH 1/6] Address feature request in Issue 13 https://github.com/SumoLogic/sumologic-lambda-extensions/issues/13 Implement a feature switch to turn off log enhancement for JSON logs Feature switch defaults to `true`, so behaviour is unchanged --- lambda-extensions/config/config.go | 16 ++++++++++++++-- lambda-extensions/sumoclient/sumoclient.go | 8 ++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/lambda-extensions/config/config.go b/lambda-extensions/config/config.go index a6c8bea..53fc596 100644 --- a/lambda-extensions/config/config.go +++ b/lambda-extensions/config/config.go @@ -34,6 +34,7 @@ type LambdaExtensionConfig struct { MaxDataPayloadSize int LambdaRegion string SourceCategoryOverride string + EnhanceJsonLogs bool } var defaultLogTypes = []string{"platform", "function"} @@ -73,6 +74,7 @@ func (cfg *LambdaExtensionConfig) setDefaults() { maxConcurrentRequests := os.Getenv("SUMO_MAX_CONCURRENT_REQUESTS") enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER") logTypes := os.Getenv("SUMO_LOG_TYPES") + enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") if numRetry == "" { cfg.NumRetry = 3 @@ -99,9 +101,11 @@ func (cfg *LambdaExtensionConfig) setDefaults() { cfg.LogTypes = strings.Split(logTypes, ",") } if retrySleepTime == "" { - cfg.RetrySleepTime = 300 * time.Millisecond + cfg.RetrySleepTime = 300 * time.Millisecond + } + if enhanceJsonLogs == "" { + cfg.EnhanceJsonLogs = true } - } func (cfg *LambdaExtensionConfig) validateConfig() error { @@ -111,6 +115,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { maxConcurrentRequests := os.Getenv("SUMO_MAX_CONCURRENT_REQUESTS") enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER") retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS") + enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") var allErrors []string var err error @@ -189,6 +194,13 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } + if enhanceJsonLogs != "" { + cfg.EnhanceJsonLogs, err = strconv.ParseBool(enhanceJsonLogs) + if err != nil { + allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_ENHANCE_JSON_LOGS: %v", err)) + } + } + // test valid log format type for _, logType := range cfg.LogTypes { if !utils.StringInSlice(strings.TrimSpace(logType), validLogTypes) { diff --git a/lambda-extensions/sumoclient/sumoclient.go b/lambda-extensions/sumoclient/sumoclient.go index 4292667..b08b6dd 100644 --- a/lambda-extensions/sumoclient/sumoclient.go +++ b/lambda-extensions/sumoclient/sumoclient.go @@ -202,7 +202,11 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) { if err != nil { item["message"] = message } else { - item["message"] = json + if s.config.EnhanceJsonLogs { + item["message"] = json + } else { + item = json + } } } else if ok && logType == "platform.report" { s.createCWLogLine(item) @@ -287,7 +291,7 @@ func (s *sumoLogicClient) SendLogs(ctx context.Context, rawmsg []byte) error { } func (s *sumoLogicClient) SendAllLogs(ctx context.Context, allMessages [][]byte) error { - if (len(allMessages) == 0) { + if len(allMessages) == 0 { s.logger.Debugf("SendAllLogs: No messages to send") return nil } From 9bb28bf1b23eced714427c92a2de8f628122170a Mon Sep 17 00:00:00 2001 From: Nitin Pande Date: Wed, 9 Nov 2022 18:18:47 +0530 Subject: [PATCH 2/6] Checkin for lambda telemetry api support for extension. --- README.md | 2 +- lambda-extensions/config/config.go | 23 +++++++-- lambda-extensions/config/version.go | 2 +- .../lambdaapi/telemetryapiclient.go | 47 +++++++++++++++++++ .../lambdaapi/telemetryapiclient_test.go | 35 ++++++++++++++ lambda-extensions/sumoclient/sumoclient.go | 36 ++++++++++---- lambda-extensions/sumologic-extension.go | 9 ++-- 7 files changed, 134 insertions(+), 20 deletions(-) create mode 100644 lambda-extensions/lambdaapi/telemetryapiclient.go create mode 100644 lambda-extensions/lambdaapi/telemetryapiclient_test.go diff --git a/README.md b/README.md index 473492a..c78fcef 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ Releasing new layer versions `sh zip.sh` -- The new wheel package gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases). +- The new extension binary and zip files gets released automatically after the tags are pushed using Github actions(Refer tagged-release in https://github.com/marvinpinto/action-automatic-releases). Run below commands to create and push tags diff --git a/lambda-extensions/config/config.go b/lambda-extensions/config/config.go index 53fc596..b087641 100644 --- a/lambda-extensions/config/config.go +++ b/lambda-extensions/config/config.go @@ -35,6 +35,7 @@ type LambdaExtensionConfig struct { LambdaRegion string SourceCategoryOverride string EnhanceJsonLogs bool + EnableSpanDrops bool } var defaultLogTypes = []string{"platform", "function"} @@ -66,6 +67,7 @@ func GetConfig() (*LambdaExtensionConfig, error) { } return config, nil } + func (cfg *LambdaExtensionConfig) setDefaults() { numRetry := os.Getenv("SUMO_NUM_RETRIES") retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS") @@ -75,6 +77,7 @@ func (cfg *LambdaExtensionConfig) setDefaults() { enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER") logTypes := os.Getenv("SUMO_LOG_TYPES") enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") + enableSpanDrops := os.Getenv("SUMO_SPAN_DROP") if numRetry == "" { cfg.NumRetry = 3 @@ -88,7 +91,6 @@ func (cfg *LambdaExtensionConfig) setDefaults() { if maxConcurrentRequests == "" { cfg.MaxConcurrentRequests = 3 } - if enableFailover == "" { cfg.EnableFailover = false } @@ -106,6 +108,10 @@ func (cfg *LambdaExtensionConfig) setDefaults() { if enhanceJsonLogs == "" { cfg.EnhanceJsonLogs = true } + if enableSpanDrops == "" { + // by default, spans will not be dropped if user did not configure the env variable + cfg.EnableSpanDrops = false + } } func (cfg *LambdaExtensionConfig) validateConfig() error { @@ -116,6 +122,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { enableFailover := os.Getenv("SUMO_ENABLE_FAILOVER") retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS") enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") + enableSpanDrops := os.Getenv("SUMO_SPAN_DROP") var allErrors []string var err error @@ -139,7 +146,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } } - if cfg.EnableFailover == true { + if cfg.EnableFailover { if cfg.S3BucketName == "" { allErrors = append(allErrors, "SUMO_S3_BUCKET_NAME not set in environment variable") } @@ -173,8 +180,8 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } else { cfg.MaxDataQueueLength = int(customMaxDataQueueLength) } - } + if maxConcurrentRequests != "" { customMaxConcurrentRequests, err := strconv.ParseInt(maxConcurrentRequests, 10, 32) if err != nil { @@ -182,8 +189,8 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } else { cfg.MaxConcurrentRequests = int(customMaxConcurrentRequests) } - } + if logLevel != "" { customloglevel, err := logrus.ParseLevel(logLevel) if err != nil { @@ -191,7 +198,6 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } else { cfg.LogLevel = customloglevel } - } if enhanceJsonLogs != "" { @@ -201,6 +207,13 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } } + if enableSpanDrops != "" { + cfg.EnableSpanDrops, err = strconv.ParseBool(enableSpanDrops) + if err != nil { + allErrors = append(allErrors, fmt.Sprintf("Unable to parse SUMO_SPAN_DROP: %v", err)) + } + } + // test valid log format type for _, logType := range cfg.LogTypes { if !utils.StringInSlice(strings.TrimSpace(logType), validLogTypes) { diff --git a/lambda-extensions/config/version.go b/lambda-extensions/config/version.go index da98125..e9fe1ff 100644 --- a/lambda-extensions/config/version.go +++ b/lambda-extensions/config/version.go @@ -8,7 +8,7 @@ import ( // ExtensionName same as binary name or file name where main exists var ExtensionName = filepath.Base(os.Args[0]) -var layerVersion = "4" +var layerVersion = "7" // SumoLogicExtensionLayerVersionSuffix denotes the layer version published in AWS var SumoLogicExtensionLayerVersionSuffix string = fmt.Sprintf("%s-prod:%s", ExtensionName, layerVersion) diff --git a/lambda-extensions/lambdaapi/telemetryapiclient.go b/lambda-extensions/lambdaapi/telemetryapiclient.go new file mode 100644 index 0000000..f59d74b --- /dev/null +++ b/lambda-extensions/lambdaapi/telemetryapiclient.go @@ -0,0 +1,47 @@ +package lambdaapi + +import ( + "bytes" + "context" + "encoding/json" + "fmt" +) + +const ( + // Base URL for telemetry api extension + telemetryURL = "2022-07-01/telemetry" + // Subscription Body Constants. Subscribe to platform logs and receive them on ${local_ip}:4243 via HTTP protocol. + telemetry_timeoutMs = 1000 + telemetry_maxBytes = 1048576 + telemetry_maxItems = 10000 + telemetry_receiverPort = 4243 +) + +// SubscribeToLogsAPI is - Subscribe to Logs API to receive the Lambda Logs. +func (client *Client) SubscribeToTelemetryAPI(ctx context.Context, logEvents []string) ([]byte, error) { + URL := client.baseURL + telemetryURL + + reqBody, err := json.Marshal(map[string]interface{}{ + "destination": map[string]interface{}{"protocol": "HTTP", "URI": fmt.Sprintf("http://sandbox:%v", receiverPort)}, + "types": logEvents, + "buffering": map[string]interface{}{"timeoutMs": telemetry_timeoutMs, "maxBytes": telemetry_maxBytes, "maxItems": telemetry_maxItems}, + "schemaVersion": "2022-07-01", + }) + if err != nil { + return nil, err + } + headers := map[string]string{ + extensionIdentiferHeader: client.extensionID, + } + var response []byte + if ctx != nil { + response, err = client.MakeRequestWithContext(ctx, headers, bytes.NewBuffer(reqBody), "PUT", URL) + } else { + response, err = client.MakeRequest(headers, bytes.NewBuffer(reqBody), "PUT", URL) + } + if err != nil { + return nil, err + } + + return response, nil +} diff --git a/lambda-extensions/lambdaapi/telemetryapiclient_test.go b/lambda-extensions/lambdaapi/telemetryapiclient_test.go new file mode 100644 index 0000000..54e254c --- /dev/null +++ b/lambda-extensions/lambdaapi/telemetryapiclient_test.go @@ -0,0 +1,35 @@ +package lambdaapi + +import ( + "context" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" +) + +func TestSubscribeToTelemetryAPI(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assertEqual(t, r.Method, http.MethodPut, "Method is not PUT") + assertNotEmpty(t, r.Header.Get(extensionNameHeader), "Extension Name Header not present") + + reqBytes, err := ioutil.ReadAll(r.Body) + assertNoError(t, err, "Received error") + defer r.Body.Close() + assertNotEmpty(t, reqBytes, "Received error in request") + + w.Header().Add(extensionIdentiferHeader, "test-sumo-id") + w.WriteHeader(200) + })) + + defer srv.Close() + client := NewClient(srv.URL[7:], extensionName) + + // Without Context + response, err := client.SubscribeToTelemetryAPI(nil, []string{"platform", "function", "extension"}) + commonAsserts(t, client, response, err) + + // With Context + response, err = client.SubscribeToTelemetryAPI(context.Background(), []string{"platform", "function", "extension"}) + commonAsserts(t, client, response, err) +} diff --git a/lambda-extensions/sumoclient/sumoclient.go b/lambda-extensions/sumoclient/sumoclient.go index b08b6dd..781d6f5 100644 --- a/lambda-extensions/sumoclient/sumoclient.go +++ b/lambda-extensions/sumoclient/sumoclient.go @@ -101,7 +101,7 @@ func (s *sumoLogicClient) failoverHandler(buf *bytes.Buffer) error { } err = utils.UploadToS3(&s.config.S3BucketName, &keyName, buf) if err != nil { - err = fmt.Errorf("Failed to Send to S3 Bucket %s Path %s: %w", s.config.S3BucketName, keyName, err) + err = fmt.Errorf("failed to send to s3 bucket %s path %s: %w", s.config.S3BucketName, keyName, err) } return err } @@ -159,13 +159,22 @@ func (s *sumoLogicClient) createCWLogLine(item map[string]interface{}) { message, ok := item["record"].(map[string]interface{}) if ok { - delete(item, "record") + s.logger.Debug("Not dropping record, if logType is platform.report.") + // delete(item, "record") } + // Todo convert this to struct + // Updated cwMessageLine to also cover new field initDurationMs as record.metrics do have it. metric := message["metrics"].(map[string]interface{}) - cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB", - message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"]) - item["message"] = cwMessageLine + if metric["initDurationMs"] == nil { + cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB", + message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"]) + item["message"] = cwMessageLine + } else { + cwMessageLine := fmt.Sprintf("REPORT RequestId: %v Duration: %v ms Billed Duration: %v ms Memory Size: %v MB Max Memory Used: %v MB Init Duration: %v ms", + message["requestId"], metric["durationMs"], metric["billedDurationMs"], metric["memorySizeMB"], metric["maxMemoryUsedMB"], metric["initDurationMs"]) + item["message"] = cwMessageLine + } } func (s *sumoLogicClient) getLogGroup() string { @@ -210,6 +219,15 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) { } } else if ok && logType == "platform.report" { s.createCWLogLine(item) + } else if ok && logType == "platform.runtimeDone" { + message, ok := item["record"].(map[string]interface{}) + if ok { + _, ok := message["spans"] + if ok && s.config.EnableSpanDrops { + // dropping spans if its present and configured to drop + delete(message, "spans") + } + } } } } @@ -217,10 +235,10 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) { func (s *sumoLogicClient) transformBytesToArrayOfMap(rawmsg []byte) (responseBody, error) { s.logger.Debugln("Transforming bytes to array of maps") var msg responseBody - var err error - err = json.Unmarshal(rawmsg, &msg) + // var err error + var err error = json.Unmarshal(rawmsg, &msg) if err != nil { - return msg, fmt.Errorf("Error in parsing payload %s: %v", string(rawmsg), err) + return msg, fmt.Errorf("error in parsing payload %s: %v", string(rawmsg), err) } return msg, err } @@ -253,7 +271,7 @@ func (s *sumoLogicClient) createChunks(msgArr responseBody) ([]string, error) { } chunks = append(chunks, currentChunk.String()) if errorCount > 0 { - err = fmt.Errorf("Dropping %d messages due to json parsing error", errorCount) + err = fmt.Errorf("dropping %d messages due to json parsing error", errorCount) } s.logger.Debugf("Chunks created: %d NumOfParsingError: %d", len(chunks), errorCount) return chunks, err diff --git a/lambda-extensions/sumologic-extension.go b/lambda-extensions/sumologic-extension.go index 1bdf866..5fca432 100644 --- a/lambda-extensions/sumologic-extension.go +++ b/lambda-extensions/sumologic-extension.go @@ -64,13 +64,14 @@ func runTimeAPIInit() (int64, error) { } logger.Debug("Succcessfully Registered with Run Time API Client: ", utils.PrettyPrint(registerResponse)) - // Subscribe to Logs API - logger.Debug("Subscribing Extension to Logs API........") - subscribeResponse, err := extensionClient.SubscribeToLogsAPI(nil, config.LogTypes) + // Subscribe to Telemetry API + logger.Debug("Subscribing Extension to Telemetry API........") + subscribeResponse, err := extensionClient.SubscribeToTelemetryAPI(nil, config.LogTypes) if err != nil { return 0, err } - logger.Debug("Successfully subscribed to Logs API: ", utils.PrettyPrint(string(subscribeResponse))) + + logger.Debug("Successfully subscribed to Telemetry API: ", utils.PrettyPrint(string(subscribeResponse))) // Call next to say registration is successful and get the deadtimems nextResponse, err := nextEvent(nil) From 30abd3ddf6d6d9c99f90ce9d5f81f40060b39ace Mon Sep 17 00:00:00 2001 From: Nate Henjes Date: Thu, 10 Nov 2022 23:04:11 -0500 Subject: [PATCH 3/6] Add ability to use KMS encrypted endpoint --- go.mod | 4 ++ go.sum | 31 ++++++++- lambda-extensions/config/config.go | 27 +++++++- lambda-extensions/sumoclient/sumoclient.go | 76 +++++++++++++++++++++- 4 files changed, 134 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 9b44862..8438aaf 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,10 @@ go 1.15 require ( github.com/aws/aws-sdk-go v1.35.23 + github.com/aws/aws-sdk-go-v2 v1.17.1 + github.com/aws/aws-sdk-go-v2/config v1.17.11 + github.com/aws/aws-sdk-go-v2/service/kms v1.18.16 github.com/google/uuid v1.1.2 github.com/sirupsen/logrus v1.7.0 + golang.org/x/sys v0.2.0 // indirect ) diff --git a/go.sum b/go.sum index 8310e3f..abcef16 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,36 @@ github.com/aws/aws-sdk-go v1.35.23 h1:SCP0d0XvyJTDmfnHEQPvBaYi3kea1VNUo7uQmkVgFts= github.com/aws/aws-sdk-go v1.35.23/go.mod h1:tlPOdRjfxPBpNIwqDj61rmsnA85v9jc0Ps9+muhnW+k= +github.com/aws/aws-sdk-go-v2 v1.17.1 h1:02c72fDJr87N8RAC2s3Qu0YuvMRZKNZJ9F+lAehCazk= +github.com/aws/aws-sdk-go-v2 v1.17.1/go.mod h1:JLnGeGONAyi2lWXI1p0PCIOIy333JMVK1U7Hf0aRFLw= +github.com/aws/aws-sdk-go-v2/config v1.17.11 h1:9JQUKwRN8oUqeOFIrNaH6RSPmmcNk1+bQrDka/f/bPc= +github.com/aws/aws-sdk-go-v2/config v1.17.11/go.mod h1:cw6HIEr0FaZQfcoyRWYZpMfv4qAH19hZFZ5mglwWo3g= +github.com/aws/aws-sdk-go-v2/credentials v1.12.24 h1:yz4fhoMfgwymG0rU6q5eCydFhYNQxk9yrNjMA7L7xmg= +github.com/aws/aws-sdk-go-v2/credentials v1.12.24/go.mod h1:prZpUfBu1KZLBLVX482Sq4DpDXGugAre08TPEc21GUg= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19 h1:E3PXZSI3F2bzyj6XxUXdTIfvp425HHhwKsFvmzBwHgs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.19/go.mod h1:VihW95zQpeKQWVPGkwT+2+WJNQV8UXFfMTWdU6VErL8= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25 h1:nBO/RFxeq/IS5G9Of+ZrgucRciie2qpLy++3UGZ+q2E= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.25/go.mod h1:Zb29PYkf42vVYQY6pvSyJCJcFHlPIiY+YKdPtwnvMkY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19 h1:oRHDrwCTVT8ZXi4sr9Ld+EXk7N/KGssOr2ygNeojEhw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.19/go.mod h1:6Q0546uHDp421okhmmGfbxzq2hBqbXFNpi4k+Q1JnQA= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.26 h1:Mza+vlnZr+fPKFKRq/lKGVvM6B/8ZZmNdEopOwSQLms= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.26/go.mod h1:Y2OJ+P+MC1u1VKnavT+PshiEuGPyh/7DqxoDNij4/bg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.19 h1:GE25AWCdNUPh9AOJzI9KIJnja7IwUc1WyUqz/JTyJ/I= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.19/go.mod h1:02CP6iuYP+IVnBX5HULVdSAku/85eHB2Y9EsFhrkEwU= +github.com/aws/aws-sdk-go-v2/service/kms v1.18.16 h1:KHzeOb0G5ZvaIOewRSs3iyHR5MeAKkIZ75tUJCO9ijg= +github.com/aws/aws-sdk-go-v2/service/kms v1.18.16/go.mod h1:kZodDPTQjSH/qM6/OvyTfM5mms5JHB/EKYp5dhn/vI4= +github.com/aws/aws-sdk-go-v2/service/sso v1.11.25 h1:GFZitO48N/7EsFDt8fMa5iYdmWqkUDDB3Eje6z3kbG0= +github.com/aws/aws-sdk-go-v2/service/sso v1.11.25/go.mod h1:IARHuzTXmj1C0KS35vboR0FeJ89OkEy1M9mWbK2ifCI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8 h1:jcw6kKZrtNfBPJkaHrscDOZoe5gvi9wjudnxvozYFJo= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.13.8/go.mod h1:er2JHN+kBY6FcMfcBBKNGCT3CarImmdFzishsqBmSRI= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.2 h1:tpwEMRdMf2UsplengAOnmSIRdvAxf75oUFR+blBr92I= +github.com/aws/aws-sdk-go-v2/service/sts v1.17.2/go.mod h1:bXcN3koeVYiJcdDU89n3kCYILob7Y34AeLopUbZgLT4= +github.com/aws/smithy-go v1.13.4 h1:/RN2z1txIJWeXeOkzX+Hk/4Uuvv7dWtCjbmVJcrskyk= +github.com/aws/smithy-go v1.13.4/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= @@ -21,8 +49,9 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A= +golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/lambda-extensions/config/config.go b/lambda-extensions/config/config.go index b087641..3a8e0d1 100644 --- a/lambda-extensions/config/config.go +++ b/lambda-extensions/config/config.go @@ -17,6 +17,7 @@ import ( // LambdaExtensionConfig config for storing all configurable parameters type LambdaExtensionConfig struct { SumoHTTPEndpoint string + KMSKeyId string EnableFailover bool S3BucketName string S3BucketRegion string @@ -36,6 +37,7 @@ type LambdaExtensionConfig struct { SourceCategoryOverride string EnhanceJsonLogs bool EnableSpanDrops bool + KmsCacheSeconds int64 } var defaultLogTypes = []string{"platform", "function"} @@ -46,6 +48,7 @@ func GetConfig() (*LambdaExtensionConfig, error) { config := &LambdaExtensionConfig{ SumoHTTPEndpoint: os.Getenv("SUMO_HTTP_ENDPOINT"), + KMSKeyId: os.Getenv("KMS_KEY_ID"), S3BucketName: os.Getenv("SUMO_S3_BUCKET_NAME"), S3BucketRegion: os.Getenv("SUMO_S3_BUCKET_REGION"), AWSLambdaRuntimeAPI: os.Getenv("AWS_LAMBDA_RUNTIME_API"), @@ -78,40 +81,54 @@ func (cfg *LambdaExtensionConfig) setDefaults() { logTypes := os.Getenv("SUMO_LOG_TYPES") enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") enableSpanDrops := os.Getenv("SUMO_SPAN_DROP") + kmsCacheSeconds := os.Getenv("KMS_CACHE_SECONDS") if numRetry == "" { cfg.NumRetry = 3 } + if logLevel == "" { cfg.LogLevel = logrus.InfoLevel } + if maxDataQueueLength == "" { cfg.MaxDataQueueLength = 20 } + if maxConcurrentRequests == "" { cfg.MaxConcurrentRequests = 3 } + if enableFailover == "" { cfg.EnableFailover = false } + if cfg.AWSLambdaRuntimeAPI == "" { cfg.AWSLambdaRuntimeAPI = "127.0.0.1:9001" } + if logTypes == "" { cfg.LogTypes = defaultLogTypes } else { cfg.LogTypes = strings.Split(logTypes, ",") } + if retrySleepTime == "" { cfg.RetrySleepTime = 300 * time.Millisecond } + if enhanceJsonLogs == "" { cfg.EnhanceJsonLogs = true } + if enableSpanDrops == "" { // by default, spans will not be dropped if user did not configure the env variable cfg.EnableSpanDrops = false } + + if kmsCacheSeconds == "" { + cfg.KmsCacheSeconds = 5 + } } func (cfg *LambdaExtensionConfig) validateConfig() error { @@ -123,6 +140,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { retrySleepTime := os.Getenv("SUMO_RETRY_SLEEP_TIME_MS") enhanceJsonLogs := os.Getenv("SUMO_ENHANCE_JSON_LOGS") enableSpanDrops := os.Getenv("SUMO_SPAN_DROP") + kmsCacheSeconds := os.Getenv("KMS_CACHE_SECONDS") var allErrors []string var err error @@ -132,7 +150,7 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } // Todo test url valid - if cfg.SumoHTTPEndpoint != "" { + if cfg.SumoHTTPEndpoint != "" && cfg.KMSKeyId == "" { _, err = url.ParseRequestURI(cfg.SumoHTTPEndpoint) if err != nil { allErrors = append(allErrors, "SUMO_HTTP_ENDPOINT is not Valid") @@ -214,6 +232,13 @@ func (cfg *LambdaExtensionConfig) validateConfig() error { } } + if kmsCacheSeconds != "" { + cfg.KmsCacheSeconds, err = strconv.ParseInt(kmsCacheSeconds, 10, 32) + if err != nil { + allErrors = append(allErrors, fmt.Sprintf("Unable to parse KMS_CACHE_SECONDS: %v", err)) + } + } + // test valid log format type for _, logType := range cfg.LogTypes { if !utils.StringInSlice(strings.TrimSpace(logType), validLogTypes) { diff --git a/lambda-extensions/sumoclient/sumoclient.go b/lambda-extensions/sumoclient/sumoclient.go index 781d6f5..efd80ff 100644 --- a/lambda-extensions/sumoclient/sumoclient.go +++ b/lambda-extensions/sumoclient/sumoclient.go @@ -3,6 +3,7 @@ package sumoclient import ( "bytes" "context" + b64 "encoding/base64" "encoding/binary" "encoding/json" "fmt" @@ -14,11 +15,18 @@ import ( "github.com/SumoLogic/sumologic-lambda-extensions/lambda-extensions/config" + "github.com/aws/aws-sdk-go-v2/aws" + awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/kms" + uuid "github.com/google/uuid" "github.com/sirupsen/logrus" ) -var isColdStart = true +var isColdStart bool = true + +var decryptedSumoHttpEndpoint string +var kmsEndpointCacheTime = time.Now().Add(-5 * time.Minute) // LogSender interface which needs to be implemented to send logs type LogSender interface { @@ -37,6 +45,12 @@ type sumoLogicClient struct { // It is assumed that logs will be array of json objects and all channel payloads satisfy this format type responseBody []map[string]interface{} +type KMSDecryptAPI interface { + Decrypt(ctx context.Context, + params *kms.DecryptInput, + optFns ...func(*kms.Options)) (*kms.DecryptOutput, error) +} + // NewLogSenderClient returns interface pointing to the concrete version of LogSender client func NewLogSenderClient(logger *logrus.Entry, cfg *config.LambdaExtensionConfig) LogSender { // setting the cold start variable here since this function is called @@ -56,8 +70,12 @@ func (s *sumoLogicClient) getColdStart() bool { } func (s *sumoLogicClient) makeRequest(ctx context.Context, buf *bytes.Buffer) (*http.Response, error) { + endpoint, err := s.getHttpEndpoint() + if err != nil { + err = fmt.Errorf("Failed to get SUMO HTTP Endpoint", err) + } - request, err := http.NewRequestWithContext(ctx, "POST", s.config.SumoHTTPEndpoint, buf) + request, err := http.NewRequestWithContext(ctx, "POST", endpoint, buf) if err != nil { err = fmt.Errorf("http.NewRequest() error: %v", err) return nil, err @@ -74,6 +92,56 @@ func (s *sumoLogicClient) makeRequest(ctx context.Context, buf *bytes.Buffer) (* return response, err } +// Use cached KMS decrypted endpoint, refresh the cached endpoint, or return unencrypted endpoint +func (s *sumoLogicClient) getHttpEndpoint() (string, error) { + if s.config.KMSKeyId == "" { + return s.config.SumoHTTPEndpoint, nil + } + + if s.config.KMSKeyId != "" && time.Until(kmsEndpointCacheTime) > 0 { + return decryptedSumoHttpEndpoint, nil + } + + if s.config.KMSKeyId != "" && (time.Until(kmsEndpointCacheTime) <= 0 || s.config.KmsCacheSeconds == 0) { + + cfg, err := awsConfig.LoadDefaultConfig(context.TODO()) + if err != nil { + fmt.Errorf("Configuration error in aws client,", err) + } + + client := kms.NewFromConfig(cfg) + + blob, err := b64.StdEncoding.DecodeString(s.config.SumoHTTPEndpoint) + if err != nil { + fmt.Errorf("Error converting string to blob,", err) + } + + input := &kms.DecryptInput{ + CiphertextBlob: blob, + KeyId: aws.String(s.config.KMSKeyId), + } + + result, err := DecodeData(context.TODO(), client, input) + + if err != nil { + fmt.Errorf("Got error decrypting data: ", err) + return "", err + } + + // Set the decrypted endpoint var as decrypted string to use as cache + decryptedSumoHttpEndpoint := string(result.Plaintext) + + // Set new cache time + kmsEndpointCacheTime = time.Now() + + return decryptedSumoHttpEndpoint, nil + } + + err := fmt.Errorf("Failed to select a valid Sumo HTTP endpoint") + + return "", err +} + // getS3KeyName returns the key by combining function name, version, date and uuid(version 1) func (s *sumoLogicClient) getS3KeyName() (string, error) { currentTime := time.Now() @@ -414,3 +482,7 @@ func (s *sumoLogicClient) postToSumo(ctx context.Context, logStringToSend *strin return nil } + +func DecodeData(c context.Context, api KMSDecryptAPI, input *kms.DecryptInput) (*kms.DecryptOutput, error) { + return api.Decrypt(c, input) +} From 57efcfd43f57b589f5b1624bfd479e4b1f434670 Mon Sep 17 00:00:00 2001 From: Yohei Kitamura Date: Thu, 17 Aug 2023 15:30:00 -0400 Subject: [PATCH 4/6] switch EnhanceJsonLogs defalut to false --- lambda-extensions/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambda-extensions/config/config.go b/lambda-extensions/config/config.go index 7e85e18..202df1e 100644 --- a/lambda-extensions/config/config.go +++ b/lambda-extensions/config/config.go @@ -122,7 +122,7 @@ func (cfg *LambdaExtensionConfig) setDefaults() { } if enhanceJsonLogs == "" { - cfg.EnhanceJsonLogs = true + cfg.EnhanceJsonLogs = false } if enableSpanDrops == "" { From 15785d4e74ff4bf2412734f65620ec8145af185c Mon Sep 17 00:00:00 2001 From: Yohei Kitamura Date: Thu, 17 Aug 2023 15:58:22 -0400 Subject: [PATCH 5/6] try to extract JSON string --- lambda-extensions/utils/utils.go | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/lambda-extensions/utils/utils.go b/lambda-extensions/utils/utils.go index 083f0c1..26b1895 100644 --- a/lambda-extensions/utils/utils.go +++ b/lambda-extensions/utils/utils.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "encoding/json" "errors" + "strings" ) //------------------Retry Logic Code------------------------------- @@ -72,8 +73,30 @@ func PrettyPrint(v interface{}) string { return string(data) } +var errNonJsonString = errors.New("non JSON string") + // ParseJson to determine whether a string is valid JSON func ParseJson(s string) (js map[string]interface{}, err error) { - err = json.Unmarshal([]byte(s), &js) + json_string := ExtractJsonString(s) + if json_string == "" { + err = errNonJsonString + return + } + + err = json.Unmarshal([]byte(json_string), &js) return } + +// ExtractJsonString to extract a JSON string from a string +func ExtractJsonString(s string) string { + if s[len(s)-1:] != "}" { + return "" + } + + pos := strings.Index(s, "{") + if pos < 0 { + return "" + } + + return s[pos:] +} From 3e626fa916668b73b2866cf13fa6f1037c9f7cff Mon Sep 17 00:00:00 2001 From: Yohei Kitamura Date: Thu, 17 Aug 2023 17:57:55 -0400 Subject: [PATCH 6/6] fix overwriting --- lambda-extensions/sumoclient/sumoclient.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lambda-extensions/sumoclient/sumoclient.go b/lambda-extensions/sumoclient/sumoclient.go index 03c3d2c..e491b51 100644 --- a/lambda-extensions/sumoclient/sumoclient.go +++ b/lambda-extensions/sumoclient/sumoclient.go @@ -272,7 +272,9 @@ func (s *sumoLogicClient) enhanceLogs(msg responseBody) { if s.config.EnhanceJsonLogs { item["message"] = json } else { - item = json + for key, value := range json { + item[key] = value + } } } } else if ok && logType == "platform.report" {