From 7a7438443346cbe1c7c080012d1cebedbf59b3a8 Mon Sep 17 00:00:00 2001 From: boriwo Date: Mon, 11 Mar 2024 16:25:50 -0700 Subject: [PATCH 1/4] prevent existing tenant from being overwritten by distinguishing between PUT and POST --- internal/pkg/app/handlers_v1.go | 67 +++++++++++++++++++++++++++++---- 1 file changed, 59 insertions(+), 8 deletions(-) diff --git a/internal/pkg/app/handlers_v1.go b/internal/pkg/app/handlers_v1.go index cf4d54c8..2ac26713 100644 --- a/internal/pkg/app/handlers_v1.go +++ b/internal/pkg/app/handlers_v1.go @@ -36,7 +36,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/trace" - "io/ioutil" + "io" "net/http" "regexp" "strings" @@ -175,12 +175,16 @@ func NewAPIManager(routingMgr tablemgr.RoutingTableManager, tenantStorer tenant. api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}/fragments/{fragmentId}", api.getFragmentHandler).Methods(http.MethodGet) api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}/fragments", api.getAllTenantFragmentsHandler).Methods(http.MethodGet) + // old tenant APIs for backward compatibility + api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}/config", api.getTenantConfigHandler).Methods(http.MethodGet) - api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}/config", api.addTenantConfigHandler).Methods(http.MethodPut) + api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}/config", api.updateTenantConfigHandler).Methods(http.MethodPut) + api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}/config", api.addTenantConfigHandler).Methods(http.MethodPost) api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}/config", api.deleteTenantConfigHandler).Methods(http.MethodDelete) api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}", api.getTenantConfigHandler).Methods(http.MethodGet) - api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}", api.addTenantConfigHandler).Methods(http.MethodPut) + api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}", api.updateTenantConfigHandler).Methods(http.MethodPut) + api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}", api.addTenantConfigHandler).Methods(http.MethodPost) api.muxRouter.HandleFunc("/ears/v1/orgs/{orgId}/applications/{appId}", api.deleteTenantConfigHandler).Methods(http.MethodDelete) api.muxRouter.HandleFunc("/ears/v1/routes", api.getAllRoutesHandler).Methods(http.MethodGet) @@ -327,7 +331,7 @@ func (a *APIManager) webhookHandler(w http.ResponseWriter, r *http.Request) { resp.Respond(ctx, w, doYaml(r)) return } - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { log.Ctx(ctx).Error().Str("op", "webhookHandler").Msg(err.Error()) resp := ErrorResponse(&InternalServerError{err}) @@ -387,7 +391,7 @@ func (a *APIManager) sendEventHandler(w http.ResponseWriter, r *http.Request) { r.Body = http.MaxBytesReader(w, r.Body, maxEventSize) defer r.Body.Close() } - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { log.Ctx(ctx).Error().Str("op", "sendEventHandler").Msg(err.Error()) resp := ErrorResponse(&InternalServerError{err}) @@ -506,7 +510,7 @@ func (a *APIManager) addRouteHandler(w http.ResponseWriter, r *http.Request) { return } routeId := vars["routeId"] - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { log.Ctx(ctx).Error().Str("op", "addRouteHandler").Msg(err.Error()) a.addRouteFailureRecorder.Add(ctx, 1.0) @@ -872,7 +876,7 @@ func (a *APIManager) addFragmentHandler(w http.ResponseWriter, r *http.Request) return } fragmentId := vars["fragmentId"] - body, err := ioutil.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { log.Ctx(ctx).Error().Str("op", "addFragmentHandler").Msg(err.Error()) a.addRouteFailureRecorder.Add(ctx, 1.0) @@ -964,7 +968,15 @@ func (a *APIManager) addTenantConfigHandler(w http.ResponseWriter, r *http.Reque resp.Respond(ctx, w, doYaml(r)) return } - body, err := ioutil.ReadAll(r.Body) + config, err := a.tenantStorer.GetConfig(ctx, *tid) + if err == nil && config != nil { + err = errors.New("cannot update existing tenant " + tid.AppId) + log.Ctx(ctx).Error().Str("op", "addTenantConfigHandler").Str("error", err.Error()).Msg("tenant " + tid.AppId + "already exists") + resp := ErrorResponse(convertToApiError(ctx, err)) + resp.Respond(ctx, w, doYaml(r)) + return + } + body, err := io.ReadAll(r.Body) if err != nil { log.Ctx(ctx).Error().Str("op", "addTenantConfigHandler").Str("error", err.Error()).Msg("error reading request body") resp := ErrorResponse(&InternalServerError{err}) @@ -993,6 +1005,45 @@ func (a *APIManager) addTenantConfigHandler(w http.ResponseWriter, r *http.Reque resp.Respond(ctx, w, doYaml(r)) } +func (a *APIManager) updateTenantConfigHandler(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + vars := mux.Vars(r) + tid, apiErr := getTenant(ctx, vars) + if apiErr != nil { + log.Ctx(ctx).Error().Str("op", "updateTenantConfigHandler").Str("error", apiErr.Error()).Msg("orgId or appId empty") + resp := ErrorResponse(apiErr) + resp.Respond(ctx, w, doYaml(r)) + return + } + body, err := io.ReadAll(r.Body) + if err != nil { + log.Ctx(ctx).Error().Str("op", "updateTenantConfigHandler").Str("error", err.Error()).Msg("error reading request body") + resp := ErrorResponse(&InternalServerError{err}) + resp.Respond(ctx, w, doYaml(r)) + return + } + var tenantConfig tenant.Config + err = yaml.Unmarshal(body, &tenantConfig) + if err != nil { + log.Ctx(ctx).Error().Str("op", "updateTenantConfigHandler").Str("error", err.Error()).Msg("error unmarshal request body") + resp := ErrorResponse(&BadRequestError{"Cannot unmarshal request body", err}) + resp.Respond(ctx, w, doYaml(r)) + return + } + tenantConfig.Tenant = *tid + err = a.tenantStorer.SetConfig(ctx, tenantConfig) + if err != nil { + log.Ctx(ctx).Error().Str("op", "updateTenantConfigHandler").Str("error", err.Error()).Msg("error setting tenant config") + resp := ErrorResponse(convertToApiError(ctx, err)) + resp.Respond(ctx, w, doYaml(r)) + return + } + a.quotaManager.PublishQuota(ctx, *tid) + log.Ctx(ctx).Info().Str("op", "updateTenantConfigHandler").Msg("success") + resp := ItemResponse(tenantConfig) + resp.Respond(ctx, w, doYaml(r)) +} + func (a *APIManager) deleteTenantConfigHandler(w http.ResponseWriter, r *http.Request) { ctx := r.Context() vars := mux.Vars(r) From f3a1760b59310d09eb40e6969090d214625a3310 Mon Sep 17 00:00:00 2001 From: boriwo Date: Tue, 12 Mar 2024 10:48:28 -0700 Subject: [PATCH 2/4] dedicated field to configure client id for credentials api --- internal/pkg/appsecret/config_vault.go | 12 +++++++++--- pkg/tenant/types.go | 5 +++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/internal/pkg/appsecret/config_vault.go b/internal/pkg/appsecret/config_vault.go index 2e6d6bf9..ddc1de79 100644 --- a/internal/pkg/appsecret/config_vault.go +++ b/internal/pkg/appsecret/config_vault.go @@ -14,7 +14,7 @@ import ( "time" ) -//ConfigVault provides secrets from ears app configuration +// ConfigVault provides secrets from ears app configuration type ConfigVault struct { config config.Config } @@ -139,13 +139,19 @@ func (v *TenantConfigVault) getSatBearerToken(ctx context.Context) (string, erro if err != nil { return "", err } - if len(tenantConfig.ClientIds) == 0 { + clientId := "" + if tenantConfig.ClientId != "" { + clientId = tenantConfig.ClientId + } else if len(tenantConfig.ClientIds) > 0 { + clientId = tenantConfig.ClientIds[0] + } + if clientId == "" { return "", errors.New("tenant has no client IDs") } if tenantConfig.ClientSecret == "" { return "", errors.New("tenant has no client secret") } - req.Header.Add("X-Client-Id", tenantConfig.ClientIds[0]) + req.Header.Add("X-Client-Id", clientId) req.Header.Add("X-Client-Secret", tenantConfig.ClientSecret) req.Header.Add("Cache-Control", "no-cache") resp, err := v.httpClient.Do(req) diff --git a/pkg/tenant/types.go b/pkg/tenant/types.go index 7c01e1d0..b30c381e 100644 --- a/pkg/tenant/types.go +++ b/pkg/tenant/types.go @@ -58,8 +58,9 @@ func (id Id) ToString() string { type Config struct { Tenant Id `json:"tenant"` // tenant id Quota Quota `json:"quota"` // tenant quota - ClientIds []string `json:"clientIds,omitempty"` // jwt subjects or client IDs - ClientSecret string `json:"clientSecret"` // client secret for first client id in list, needed to make calls to credentials API + ClientIds []string `json:"clientIds,omitempty"` // list client IDs that can make EARS API calls for this tenant + ClientId string `json:"clientId,omitempty"` // client id used to make calls to credentials API (if blank or omitted, first client ID from ClientIds will be used instead) + ClientSecret string `json:"clientSecret"` // client secret for ClientId or first client id in ClientIds list to make calls to credentials API OpenEventApi bool `json:"openEventApi,omitempty"` // if true, allow unauthenticated calls to the event API for routes under that tenant Modified int64 `json:"modified,omitempty"` // last time when the tenant config is modified } From 51ff74f567b52568bc0dd507f6b2d2712786e6db Mon Sep 17 00:00:00 2001 From: boriwo Date: Tue, 12 Mar 2024 14:39:10 -0700 Subject: [PATCH 3/4] make execution of google profiler conditional --- internal/pkg/app/app.go | 80 +++++++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 38 deletions(-) diff --git a/internal/pkg/app/app.go b/internal/pkg/app/app.go index d3a10a8d..5c388026 100644 --- a/internal/pkg/app/app.go +++ b/internal/pkg/app/app.go @@ -253,44 +253,48 @@ func SetupAPIServer(lifecycle fx.Lifecycle, config config.Config, logger *zerolo Handler: mux, } - // initialize google profiler - profilerConfigMap := make(map[string]string, 0) - profilerConfigMap["type"] = config.GetString("ears.profiler.type") - profilerConfigMap["project_id"] = config.GetString("ears.profiler.project_id") - profilerConfigMap["private_key_id"] = config.GetString("ears.profiler.private_key_id") - profilerConfigMap["private_key"] = config.GetString("ears.profiler.private_key") - profilerConfigMap["client_email"] = config.GetString("ears.profiler.client_email") - profilerConfigMap["client_id"] = config.GetString("ears.profiler.client_id") - profilerConfigMap["auth_uri"] = config.GetString("ears.profiler.auth_uri") - profilerConfigMap["token_uri"] = config.GetString("ears.profiler.token_uri") - profilerConfigMap["auth_provider_x509_cert_url"] = config.GetString("ears.profiler.auth_provider_x509_cert_url") - profilerConfigMap["client_x509_cert_url"] = config.GetString("ears.profiler.client_x509_cert_url") - profilerConfigMap["universe_domain"] = config.GetString("ears.profiler.universe_domain") - buf, err := json.MarshalIndent(profilerConfigMap, "", "\t") - if err != nil { - logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) - } - err = os.WriteFile("google_profiler_config.json", buf, 0644) - if err != nil { - logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) - } - workingDir, err := os.Getwd() - if err != nil { - logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) - } - os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", filepath.Join(workingDir, "google_profiler_config.json")) - taskArn, err := GetTaskArn() - if err != nil { - logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) - } - cfg := profiler.Config{ - Service: "ears", - ServiceVersion: app.Version, - ProjectID: "ears-project", - Zone: config.GetString("ears.env") + "-" + taskArn, - } - if err := profiler.Start(cfg); err != nil { - logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) + if config.GetString("ears.profiler.active") == "yes" { + // initialize google profiler + profilerConfigMap := make(map[string]string, 0) + profilerConfigMap["type"] = config.GetString("ears.profiler.type") + profilerConfigMap["project_id"] = config.GetString("ears.profiler.project_id") + profilerConfigMap["private_key_id"] = config.GetString("ears.profiler.private_key_id") + profilerConfigMap["private_key"] = config.GetString("ears.profiler.private_key") + profilerConfigMap["client_email"] = config.GetString("ears.profiler.client_email") + profilerConfigMap["client_id"] = config.GetString("ears.profiler.client_id") + profilerConfigMap["auth_uri"] = config.GetString("ears.profiler.auth_uri") + profilerConfigMap["token_uri"] = config.GetString("ears.profiler.token_uri") + profilerConfigMap["auth_provider_x509_cert_url"] = config.GetString("ears.profiler.auth_provider_x509_cert_url") + profilerConfigMap["client_x509_cert_url"] = config.GetString("ears.profiler.client_x509_cert_url") + profilerConfigMap["universe_domain"] = config.GetString("ears.profiler.universe_domain") + buf, err := json.MarshalIndent(profilerConfigMap, "", "\t") + if err != nil { + logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) + } + err = os.WriteFile("google_profiler_config.json", buf, 0644) + if err != nil { + logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) + } + workingDir, err := os.Getwd() + if err != nil { + logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) + } + os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", filepath.Join(workingDir, "google_profiler_config.json")) + taskArn, err := GetTaskArn() + if err != nil { + logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) + } + cfg := profiler.Config{ + Service: "ears", + ServiceVersion: app.Version, + ProjectID: "ears-project", + Zone: config.GetString("ears.env") + "-" + taskArn, + } + if err := profiler.Start(cfg); err != nil { + logger.Error().Str("op", "SetupAPIServer.StartProfiler").Msg(err.Error()) + } + } else { + logger.Info().Msg("Google profiler disabled") } // initialize event logger From c9d32ed747f84c71f6f78b5b0a1055213f1d6cd8 Mon Sep 17 00:00:00 2001 From: boriwo Date: Tue, 12 Mar 2024 14:43:11 -0700 Subject: [PATCH 4/4] some clarification around the otel b3 propagator --- internal/pkg/app/middlewares.go | 3 ++ internal/pkg/app/middlewares_test.go | 2 +- .../pkg/tablemgr/routing_table_manager.go | 9 ++-- pkg/app/types.go | 20 ++++++++ pkg/event/event.go | 49 ++++++++++++------- pkg/filter/ws/webservice.go | 3 +- 6 files changed, 60 insertions(+), 26 deletions(-) create mode 100644 pkg/app/types.go diff --git a/internal/pkg/app/middlewares.go b/internal/pkg/app/middlewares.go index b8945088..438f58c7 100644 --- a/internal/pkg/app/middlewares.go +++ b/internal/pkg/app/middlewares.go @@ -38,7 +38,10 @@ var eventUrlValidator = regexp.MustCompile(`^\/ears\/v1\/orgs\/[a-zA-Z0-9][a-zA- func NewMiddleware(logger *zerolog.Logger, jwtManager jwt.JWTConsumer) []func(next http.Handler) http.Handler { middlewareLogger = logger jwtMgr = jwtManager + // for this extraction to work, X-B3-TraceId header must be present and exactly 32 byte hex and X-B3-SpanId + // must be present and exactly 16 byte hex otelMiddleware := otelmux.Middleware("ears", otelmux.WithPropagators(b3.New())) + //otelMiddleware := otelmux.Middleware("ears", otelmux.WithPropagators(otel.GetTextMapPropagator())) return []func(next http.Handler) http.Handler{ authenticateMiddleware, otelMiddleware, diff --git a/internal/pkg/app/middlewares_test.go b/internal/pkg/app/middlewares_test.go index e7eda274..061aaf42 100644 --- a/internal/pkg/app/middlewares_test.go +++ b/internal/pkg/app/middlewares_test.go @@ -87,7 +87,7 @@ func TestAuthMiddleware(t *testing.T) { w := httptest.NewRecorder() r := httptest.NewRequest(http.MethodGet, "/", nil) - r.Header.Set("X-B3-TraceId", "123456") + r.Header.Set(HeaderTraceId, "123456") m.ServeHTTP(w, r.WithContext(subCtx)) } diff --git a/internal/pkg/tablemgr/routing_table_manager.go b/internal/pkg/tablemgr/routing_table_manager.go index 0ccdab17..03010b17 100644 --- a/internal/pkg/tablemgr/routing_table_manager.go +++ b/internal/pkg/tablemgr/routing_table_manager.go @@ -227,9 +227,8 @@ func (r *DefaultRoutingTableManager) RouteEvent(ctx context.Context, tid tenant. } var wg sync.WaitGroup wg.Add(1) - //sctx, cancel := context.WithTimeout(ctx, 5*time.Second) - //sctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // no need to cancel context here because RouteEvent is only used synchronously via API call + //sctx, cancel := context.WithTimeout(ctx, 5*time.Second) e, err := event.New(ctx, payload, event.WithAck( func(evt event.Event) { r.logger.Info().Str("op", "routeEventWebhook").Str("tid", tid.ToString()).Msg("success") @@ -246,10 +245,10 @@ func (r *DefaultRoutingTableManager) RouteEvent(ctx context.Context, tid tenant. event.WithTracePayloadOnNack(false), ) if err != nil { - return "", errors.New("bad test event for route " + routeId) + return "", errors.New("invalid event for route " + routeId) } - traceId, _, _ := e.GetPathValue("trace.id") - traceIdStr, _ := traceId.(string) + actualTraceId, _, _ := e.GetPathValue("trace.id") + traceIdStr, _ := actualTraceId.(string) lrw.Receiver.Trigger(e) wg.Wait() return traceIdStr, nil diff --git a/pkg/app/types.go b/pkg/app/types.go new file mode 100644 index 00000000..26a6892a --- /dev/null +++ b/pkg/app/types.go @@ -0,0 +1,20 @@ +// Copyright 2021 Comcast Cable Communications Management, LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +const ( + HeaderTraceId = "X-B3-TraceId" + HeaderTenantId = "Application-Id" +) diff --git a/pkg/event/event.go b/pkg/event/event.go index 91e9dc68..97527e4d 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -52,6 +52,7 @@ type event struct { span trace.Span //Only valid in the root event tracePayloadOnNack bool tracePayload interface{} + traceId string created time.Time deepcopied bool } @@ -72,7 +73,7 @@ func GetEventLogger() *zerolog.Logger { return nil } -//Create a new event given a context, a payload, and other event options +// Create a new event given a context, a payload, and other event options func New(ctx context.Context, payload interface{}, options ...EventOption) (Event, error) { e := &event{ payload: payload, @@ -88,8 +89,6 @@ func New(ctx context.Context, payload interface{}, options ...EventOption) (Even return nil, err } } - traceId := uuid.New().String() - // enable otel tracing if e.spanName != "" { tracer := otel.Tracer(rtsemconv.EARSTracerName) @@ -97,32 +96,37 @@ func New(ctx context.Context, payload interface{}, options ...EventOption) (Even ctx, span = tracer.Start(ctx, e.spanName) span.SetAttributes(rtsemconv.EARSEventTrace) span.SetAttributes(rtsemconv.EARSOrgId.String(e.tid.OrgId), rtsemconv.EARSAppId.String(e.tid.AppId)) - traceId = span.SpanContext().TraceID().String() - span.SetAttributes(rtsemconv.EARSTraceId.String(traceId)) + if e.traceId == "" { + e.traceId = span.SpanContext().TraceID().String() + } + span.SetAttributes(rtsemconv.EARSTraceId.String(e.traceId)) e.span = span } - + // last resort, make up trace id here + if e.traceId == "" { + e.traceId = uuid.New().String() + } // setting up logger for the event parentLogger, ok := logger.Load().(*zerolog.Logger) if ok { ctx = logs.SubLoggerCtx(ctx, parentLogger) - logs.StrToLogCtx(ctx, rtsemconv.EarsLogTraceIdKey, traceId) + logs.StrToLogCtx(ctx, rtsemconv.EarsLogTraceIdKey, e.traceId) logs.StrToLogCtx(ctx, rtsemconv.EarsLogTenantIdKey, e.tid.ToString()) } e.SetContext(ctx) return e, nil } -//event acknowledge option with two completion functions, -//handledFn and errFn. An event with acknowledgement option will be notified through -//the handledFn when an event is handled, or through the errFn when there is an -//error handling it. -//An event is considered handled when it and all its child events (derived from the -//Clone function) have called the Ack function. -//An event is considered to have an error if it or any of its child events (derived from -//the Clone function) has called the Nack function. -//An event can also error out if it does not receive all the acknowledgements before -//the context timeout/cancellation. +// event acknowledge option with two completion functions, +// handledFn and errFn. An event with acknowledgement option will be notified through +// the handledFn when an event is handled, or through the errFn when there is an +// error handling it. +// An event is considered handled when it and all its child events (derived from the +// Clone function) have called the Ack function. +// An event is considered to have an error if it or any of its child events (derived from +// the Clone function) has called the Nack function. +// An event can also error out if it does not receive all the acknowledgements before +// the context timeout/cancellation. func WithAck(handledFn func(Event), errFn func(Event, error)) EventOption { return func(e *event) error { if handledFn == nil || errFn == nil { @@ -159,7 +163,6 @@ func WithTracePayloadOnNack(tracePayloadOnNack bool) EventOption { e.tracePayloadOnNack = tracePayloadOnNack if tracePayloadOnNack { e.tracePayload = deepcopy.DeepCopy(e.payload) - } return nil } @@ -172,6 +175,13 @@ func WithId(eid string) EventOption { } } +func WithTraceId(traceId string) EventOption { + return func(e *event) error { + e.traceId = traceId + return nil + } +} + func WithMetadata(metadata map[string]interface{}) EventOption { return func(e *event) error { e.SetMetadata(metadata) @@ -196,7 +206,7 @@ func WithTenant(tid tenant.Id) EventOption { } } -//WithOtelTracing enables opentelemtry tracing for the event +// WithOtelTracing enables opentelemtry tracing for the event func WithOtelTracing(spanName string) EventOption { return func(e *event) error { e.spanName = spanName @@ -583,6 +593,7 @@ func (e *event) Clone(ctx context.Context) (Event, error) { eid: e.eid, tid: e.tid, created: e.created, + traceId: e.traceId, }, nil } diff --git a/pkg/filter/ws/webservice.go b/pkg/filter/ws/webservice.go index 613c626c..2a77eab3 100644 --- a/pkg/filter/ws/webservice.go +++ b/pkg/filter/ws/webservice.go @@ -23,6 +23,7 @@ import ( "github.com/boriwo/deepcopy" "github.com/rs/zerolog/log" "github.com/xmidt-org/ears/internal/pkg/syncer" + "github.com/xmidt-org/ears/pkg/app" "github.com/xmidt-org/ears/pkg/event" "github.com/xmidt-org/ears/pkg/filter" "github.com/xmidt-org/ears/pkg/hasher" @@ -255,7 +256,7 @@ func (f *Filter) hitEndpoint(ctx context.Context, evt event.Event) (string, int, req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", "ears") // add trace header to outbound call - traceHeader := "X-B3-TraceId" + traceHeader := app.HeaderTraceId traceId := trace.SpanFromContext(ctx).SpanContext().TraceID().String() req.Header.Set(traceHeader, traceId) // add supplied headers