diff --git a/collector/internal/telemetryapi/types.go b/collector/internal/telemetryapi/types.go index 151589d338..1cd4583c9b 100644 --- a/collector/internal/telemetryapi/types.go +++ b/collector/internal/telemetryapi/types.go @@ -24,6 +24,8 @@ const ( PlatformInitStart EventType = Platform + ".initStart" // PlatformInitRuntimeDone is used when function initialization ended. PlatformInitRuntimeDone EventType = Platform + ".initRuntimeDone" + // PlatformReport is used when a report of function invocation is received. + PlatformReport EventType = Platform + ".report" // Function invocation started. PlatformStart EventType = Platform + ".start" // The runtime finished processing an event with either success or failure. @@ -95,3 +97,15 @@ type Event struct { Type string `json:"type"` Record map[string]any `json:"record"` } + +// MetricType represents the type of metric in the platform.report event +// see https://docs.aws.amazon.com/lambda/latest/dg/telemetry-schema-reference.html#ReportMetrics +type MetricType string + +const ( + MetricBilledDurationMs MetricType = "billedDurationMs" + MetricDurationMs MetricType = "durationMs" + MetricMaxMemoryUsedMB MetricType = "maxMemoryUsedMB" + MetricMemorySizeMB MetricType = "memorySizeMB" + MetricInitDurationMs MetricType = "initDurationMs" +) diff --git a/collector/receiver/telemetryapireceiver/config.go b/collector/receiver/telemetryapireceiver/config.go index b51ef1ed57..246b8dde39 100644 --- a/collector/receiver/telemetryapireceiver/config.go +++ b/collector/receiver/telemetryapireceiver/config.go @@ -23,6 +23,7 @@ type Config struct { extensionID string Port int `mapstructure:"port"` Types []string `mapstructure:"types"` + LogReport bool `mapstructure:"log_report"` } // Validate validates the configuration by checking for missing or invalid fields diff --git a/collector/receiver/telemetryapireceiver/receiver.go b/collector/receiver/telemetryapireceiver/receiver.go index b8631809d0..773a335b20 100644 --- a/collector/receiver/telemetryapireceiver/receiver.go +++ b/collector/receiver/telemetryapireceiver/receiver.go @@ -41,8 +41,11 @@ import ( "github.com/open-telemetry/opentelemetry-lambda/collector/internal/telemetryapi" ) -const initialQueueSize = 5 -const scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" +const ( + initialQueueSize = 5 + scopeName = "github.com/open-telemetry/opentelemetry-lambda/collector/receiver/telemetryapi" + logReportFmt = "REPORT RequestId: %s Duration: %.2f ms Billed Duration: %.0f ms Memory Size: %.0f MB Max Memory Used: %.0f MB" +) type telemetryAPIReceiver struct { httpServer *http.Server @@ -57,6 +60,7 @@ type telemetryAPIReceiver struct { types []telemetryapi.EventType resource pcommon.Resource currentFaasInvocationID string + logReport bool } func (r *telemetryAPIReceiver) Start(ctx context.Context, host component.Host) error { @@ -242,12 +246,81 @@ func (r *telemetryAPIReceiver) createLogs(slice []event) (plog.Logs, error) { } } else if el.Type == string(telemetryapi.PlatformRuntimeDone) { r.currentFaasInvocationID = "" + } else if el.Type == string(telemetryapi.PlatformReport) && r.logReport { + if record, ok := el.Record.(map[string]interface{}); ok { + if logRecord := createReportLogRecord(&scopeLog, record); logRecord != nil { + logRecord.Attributes().PutStr("type", el.Type) + if t, err := time.Parse(time.RFC3339, el.Time); err == nil { + logRecord.SetTimestamp(pcommon.NewTimestampFromTime(t)) + logRecord.SetObservedTimestamp(pcommon.NewTimestampFromTime(time.Now())) + } else { + continue + } + } + } } } } return log, nil } +// createReportLogRecord creates a log record for the platform.report event +// returns the log record if successful, otherwise nil +func createReportLogRecord(scopeLog *plog.ScopeLogs, record map[string]interface{}) *plog.LogRecord { + // gathering metrics + metrics, ok := record["metrics"].(map[string]interface{}) + if !ok { + return nil + } + var durationMs, billedDurationMs, memorySizeMB, maxMemoryUsedMB float64 + if durationMs, ok = metrics[string(telemetryapi.MetricDurationMs)].(float64); !ok { + return nil + } + if billedDurationMs, ok = metrics[string(telemetryapi.MetricBilledDurationMs)].(float64); !ok { + return nil + } + if memorySizeMB, ok = metrics[string(telemetryapi.MetricMemorySizeMB)].(float64); !ok { + return nil + } + if maxMemoryUsedMB, ok = metrics[string(telemetryapi.MetricMaxMemoryUsedMB)].(float64); !ok { + return nil + } + + // optionally gather information about cold start time + var initDurationMs float64 + if initDurationMsVal, exists := metrics[string(telemetryapi.MetricInitDurationMs)]; exists { + if val, ok := initDurationMsVal.(float64); ok { + initDurationMs = val + } + } + + // gathering requestId + requestId := "" + if requestId, ok = record["requestId"].(string); !ok { + return nil + } + + // we have all information available, we can create the log record + logRecord := scopeLog.LogRecords().AppendEmpty() + logRecord.Attributes().PutStr(semconv.AttributeFaaSInvocationID, requestId) + + // building the body of the log record, optionally adding the init duration + body := fmt.Sprintf( + logReportFmt, + requestId, + durationMs, + billedDurationMs, + memorySizeMB, + maxMemoryUsedMB, + ) + if initDurationMs > 0 { + body += fmt.Sprintf(" Init Duration: %.2f ms", initDurationMs) + } + logRecord.Body().SetStr(body) + + return &logRecord +} + func severityTextToNumber(severityText string) plog.SeverityNumber { mapping := map[string]plog.SeverityNumber{ "TRACE": plog.SeverityNumberTrace, @@ -366,6 +439,7 @@ func newTelemetryAPIReceiver( port: cfg.Port, types: subscribedTypes, resource: r, + logReport: cfg.LogReport, }, nil } diff --git a/collector/receiver/telemetryapireceiver/receiver_test.go b/collector/receiver/telemetryapireceiver/receiver_test.go index 0a63f14e53..346ec2fc2b 100644 --- a/collector/receiver/telemetryapireceiver/receiver_test.go +++ b/collector/receiver/telemetryapireceiver/receiver_test.go @@ -516,6 +516,244 @@ func TestCreateLogs(t *testing.T) { } } +func TestCreateLogsWithLogReport(t *testing.T) { + t.Parallel() + + testCases := []struct { + desc string + slice []event + logReport bool + expectedLogRecords int + expectedType string + expectedTimestamp string + expectedBody string + expectedAttributes map[string]interface{} + expectError bool + }{ + { + desc: "platform.report with logReport enabled - valid metrics", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": float64(124), + "memorySizeMB": float64(512), + "maxMemoryUsedMB": float64(256), + }, + }, + }, + }, + logReport: true, + expectedLogRecords: 1, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB", + expectError: false, + }, + { + desc: "platform.report with logReport disabled", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - missing requestId", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - invalid timestamp", + slice: []event{ + { + Time: "invalid-timestamp", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - missing metrics", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - invalid metrics format", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": "invalid", + "billedDurationMs": 124, + "memorySizeMB": 512, + "maxMemoryUsedMB": 256, + }, + }, + }, + }, + logReport: false, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - record not a map", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: "invalid record format", + }, + }, + logReport: true, + expectedLogRecords: 0, + expectError: false, + }, + { + desc: "platform.report with logReport enabled - with initDurationMs", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124.0, + "memorySizeMB": 512.0, + "maxMemoryUsedMB": 256.0, + "initDurationMs": 50.5, + }, + }, + }, + }, + logReport: true, + expectedLogRecords: 1, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB Init Duration: 50.50 ms", + expectError: false, + }, + { + desc: "platform.report with logReport enabled - with invalid initDurationMs type", + slice: []event{ + { + Time: "2022-10-12T00:03:50.000Z", + Type: "platform.report", + Record: map[string]any{ + "requestId": "test-request-id-123", + "metrics": map[string]any{ + "durationMs": 123.45, + "billedDurationMs": 124.0, + "memorySizeMB": 512.0, + "maxMemoryUsedMB": 256.0, + "initDurationMs": "invalid-string", + }, + }, + }, + }, + logReport: true, + expectedLogRecords: 1, + expectedType: "platform.report", + expectedTimestamp: "2022-10-12T00:03:50.000Z", + expectedBody: "REPORT RequestId: test-request-id-123 Duration: 123.45 ms Billed Duration: 124 ms Memory Size: 512 MB Max Memory Used: 256 MB", + expectError: false, + }, + } + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + r, err := newTelemetryAPIReceiver( + &Config{LogReport: tc.logReport}, + receivertest.NewNopSettings(Type), + ) + require.NoError(t, err) + log, err := r.createLogs(tc.slice) + if tc.expectError { + require.Error(t, err) + } else { + require.NoError(t, err) + require.Equal(t, 1, log.ResourceLogs().Len()) + resourceLog := log.ResourceLogs().At(0) + require.Equal(t, 1, resourceLog.ScopeLogs().Len()) + scopeLog := resourceLog.ScopeLogs().At(0) + require.Equal(t, scopeName, scopeLog.Scope().Name()) + require.Equal(t, tc.expectedLogRecords, scopeLog.LogRecords().Len()) + if scopeLog.LogRecords().Len() > 0 { + logRecord := scopeLog.LogRecords().At(0) + attr, ok := logRecord.Attributes().Get("type") + require.True(t, ok) + require.Equal(t, tc.expectedType, attr.Str()) + if tc.expectedTimestamp != "" { + expectedTime, err := time.Parse(time.RFC3339, tc.expectedTimestamp) + require.NoError(t, err) + require.Equal(t, pcommon.NewTimestampFromTime(expectedTime), logRecord.Timestamp()) + } else { + // For invalid timestamps, no timestamp should be set (zero value) + require.Equal(t, pcommon.Timestamp(0), logRecord.Timestamp()) + } + require.Equal(t, tc.expectedBody, logRecord.Body().Str()) + } + } + }) + } +} + func TestSeverityTextToNumber(t *testing.T) { t.Parallel()