From c8c6c62449a033457bc544c405d3d5da62878458 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Thu, 22 Feb 2024 09:16:07 -0700 Subject: [PATCH] Added sampling for account's trace destination If an account has a trace destination and an incoming message has the `traceparent` header with the proper sampled flag, a trace message would be triggered. The `sampling` field allows to trace a certain percentage of the traffic. The field `trace_dest` or now `msg_trace` can be a simple string representing the destination, and in this case sampling is 100% or it can be a structure with the `dest` and `sampling` fields. Sampling values that are negative or above 100 will trigger an error on configuration parsing. A value of 0 is converted to 100. If the sampling is specified without an account trace destination, it is set to 0 and a warning is issued when parsing configuration. There is similar support for the property set in a JWT account claim. Relates to #5014 Signed-off-by: Ivan Kozlovic --- go.mod | 2 +- go.sum | 4 +- server/accounts.go | 28 +++-- server/accounts_test.go | 9 +- server/client.go | 4 +- server/config_check_test.go | 90 ++++++++++++++- server/msgtrace.go | 24 +++- server/msgtrace_test.go | 213 ++++++++++++++++++++++++++++++++++++ server/opts.go | 82 +++++++++++++- server/stream.go | 2 +- 10 files changed, 432 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index d4e8dc7a04..74c401baa2 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.20 require ( github.com/klauspost/compress v1.17.6 github.com/minio/highwayhash v1.0.2 - github.com/nats-io/jwt/v2 v2.5.4 + github.com/nats-io/jwt/v2 v2.5.5 github.com/nats-io/nats.go v1.33.1 github.com/nats-io/nkeys v0.4.7 github.com/nats-io/nuid v1.0.1 diff --git a/go.sum b/go.sum index 78052a45b6..a90c64d723 100644 --- a/go.sum +++ b/go.sum @@ -3,8 +3,8 @@ github.com/klauspost/compress v1.17.6 h1:60eq2E/jlfwQXtvZEeBUYADs+BwKBWURIY+Gj2e github.com/klauspost/compress v1.17.6/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= -github.com/nats-io/jwt/v2 v2.5.4 h1:Bz+drKl2GbE30fxTOtb0NYl1BQ5RwZ+Zcqkg3mR5bbI= -github.com/nats-io/jwt/v2 v2.5.4/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= +github.com/nats-io/jwt/v2 v2.5.5 h1:ROfXb50elFq5c9+1ztaUbdlrArNFl2+fQWP6B8HGEq4= +github.com/nats-io/jwt/v2 v2.5.5/go.mod h1:ZdWS1nZa6WMZfFwwgpEaqBV8EPGVgOTDHN/wTbz0Y5A= github.com/nats-io/nats.go v1.33.1 h1:8TxLZZ/seeEfR97qV0/Bl939tpDnt2Z2fK3HkPypj70= github.com/nats-io/nats.go v1.33.1/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= diff --git a/server/accounts.go b/server/accounts.go index 754aa5bbfd..4600dcc306 100644 --- a/server/accounts.go +++ b/server/accounts.go @@ -96,7 +96,12 @@ type Account struct { nameTag string lastLimErr int64 routePoolIdx int - traceDest string + // If the trace destination is specified and a message with a traceParentHdr + // is received, and has the least significant bit of the last token set to 1, + // then if traceDestSampling is > 0 and < 100, a random value will be selected + // and if it falls between 0 and that value, message tracing will be triggered. + traceDest string + traceDestSampling int } const ( @@ -263,11 +268,12 @@ func (a *Account) setTraceDest(dest string) { a.mu.Unlock() } -func (a *Account) getTraceDest() string { +func (a *Account) getTraceDestAndSampling() (string, int) { a.mu.RLock() dest := a.traceDest + sampling := a.traceDestSampling a.mu.RUnlock() - return dest + return dest, sampling } // Used to create shallow copies of accounts for transfer @@ -278,7 +284,7 @@ func (a *Account) getTraceDest() string { func (a *Account) shallowCopy(na *Account) { na.Nkey = a.Nkey na.Issuer = a.Issuer - na.traceDest = a.traceDest + na.traceDest, na.traceDestSampling = a.traceDest, a.traceDestSampling if a.imports.streams != nil { na.imports.streams = make([]*streamImport, 0, len(a.imports.streams)) @@ -3239,12 +3245,18 @@ func (s *Server) updateAccountClaimsWithRefresh(a *Account, ac *jwt.AccountClaim a.nameTag = ac.Name a.tags = ac.Tags + var td string + var tds int if ac.Trace != nil { - // Update TraceDest - a.traceDest = string(ac.Trace.Destination) - } else { - a.traceDest = _EMPTY_ + // Update trace destination and sampling + td, tds = string(ac.Trace.Destination), ac.Trace.Sampling + if !IsValidPublishSubject(td) { + td, tds = _EMPTY_, 0 + } else if tds <= 0 || tds > 100 { + tds = 100 + } } + a.traceDest, a.traceDestSampling = td, tds // Check for external authorization. if ac.HasExternalAuthorization() { diff --git a/server/accounts_test.go b/server/accounts_test.go index 0f8c403ec3..eca0374a78 100644 --- a/server/accounts_test.go +++ b/server/accounts_test.go @@ -510,6 +510,7 @@ func TestAccountSimpleConfig(t *testing.T) { func TestAccountParseConfig(t *testing.T) { traceDest := "my.trace.dest" + traceSampling := 50 confFileName := createConfFile(t, []byte(fmt.Sprintf(` accounts { synadia { @@ -519,14 +520,14 @@ func TestAccountParseConfig(t *testing.T) { ] } nats.io { - trace_dest: %q + msg_trace: {dest: %q, sampling: %d%%} users = [ {user: derek, password: foo} {user: ivan, password: bar} ] } } - `, traceDest))) + `, traceDest, traceSampling))) opts, err := ProcessConfigFile(confFileName) if err != nil { t.Fatalf("Received an error processing config file: %v", err) @@ -550,7 +551,9 @@ func TestAccountParseConfig(t *testing.T) { if natsAcc == nil { t.Fatalf("Error retrieving account for 'nats.io'") } - require_Equal[string](t, natsAcc.getTraceDest(), traceDest) + td, tds := natsAcc.getTraceDestAndSampling() + require_Equal[string](t, td, traceDest) + require_Equal[int](t, tds, traceSampling) for _, u := range opts.Users { if u.Username == "derek" { diff --git a/server/client.go b/server/client.go index 09bacceaad..81b74fa7fb 100644 --- a/server/client.go +++ b/server/client.go @@ -4356,8 +4356,8 @@ func (c *client) processServiceImport(si *serviceImport, acc *Account, msg []byt // We also need to disable the message trace headers so that // if the message is routed, it does not initialize tracing in the // remote. - positions := mt.disableTraceHeaders(c, msg) - defer mt.enableTraceHeaders(c, msg, positions) + positions := disableTraceHeaders(c, msg) + defer enableTraceHeaders(c, msg, positions) } } } diff --git a/server/config_check_test.go b/server/config_check_test.go index 23d3b27e1c..aefebc74a7 100644 --- a/server/config_check_test.go +++ b/server/config_check_test.go @@ -920,7 +920,7 @@ func TestConfigCheck(t *testing.T) { A { trace_dest: 123 } } `, - err: errors.New(`interface conversion: interface {} is int64, not string`), + err: errors.New(`Expected account message trace "trace_dest" to be a string or a map/struct, got int64`), errorLine: 3, errorPos: 23, }, @@ -946,6 +946,94 @@ func TestConfigCheck(t *testing.T) { errorLine: 3, errorPos: 23, }, + { + name: "when account message trace dest is wrong type", + config: ` + accounts { + A { msg_trace: {dest: 123} } + } + `, + err: errors.New(`Field "dest" should be a string, got int64`), + errorLine: 3, + errorPos: 35, + }, + { + name: "when account message trace dest is invalid", + config: ` + accounts { + A { msg_trace: {dest: "invalid..dest"} } + } + `, + err: errors.New(`Trace destination "invalid..dest" is not valid`), + errorLine: 3, + errorPos: 35, + }, + { + name: "when account message trace sampling is wrong type", + config: ` + accounts { + A { msg_trace: {dest: "acc.dest", sampling: {wront: "type"}} } + } + `, + err: errors.New(`Trace destination sampling field "sampling" should be an integer or a percentage, got map[string]interface {}`), + errorLine: 3, + errorPos: 53, + }, + { + name: "when account message trace sampling is wrong string", + config: ` + accounts { + A { msg_trace: {dest: "acc.dest", sampling: abc%} } + } + `, + err: errors.New(`Invalid trace destination sampling value "abc%"`), + errorLine: 3, + errorPos: 53, + }, + { + name: "when account message trace sampling is negative", + config: ` + accounts { + A { msg_trace: {dest: "acc.dest", sampling: -1} } + } + `, + err: errors.New(`Ttrace destination sampling value -1 is invalid, needs to be [1..100]`), + errorLine: 3, + errorPos: 53, + }, + { + name: "when account message trace sampling is zero", + config: ` + accounts { + A { msg_trace: {dest: "acc.dest", sampling: 0} } + } + `, + err: errors.New(`Ttrace destination sampling value 0 is invalid, needs to be [1..100]`), + errorLine: 3, + errorPos: 53, + }, + { + name: "when account message trace sampling is more than 100", + config: ` + accounts { + A { msg_trace: {dest: "acc.dest", sampling: 101} } + } + `, + err: errors.New(`Ttrace destination sampling value 101 is invalid, needs to be [1..100]`), + errorLine: 3, + errorPos: 53, + }, + { + name: "when account message trace has unknown field", + config: ` + accounts { + A { msg_trace: {wrong: "field"} } + } + `, + err: errors.New(`Unknown field "wrong" parsing account message trace map/struct "msg_trace"`), + errorLine: 3, + errorPos: 35, + }, { name: "when user authorization config has both token and users", config: ` diff --git a/server/msgtrace.go b/server/msgtrace.go index 94a27ace95..d538212947 100644 --- a/server/msgtrace.go +++ b/server/msgtrace.go @@ -17,6 +17,7 @@ import ( "bytes" "encoding/json" "fmt" + "math/rand" "strconv" "strings" "sync/atomic" @@ -435,13 +436,21 @@ func (c *client) initMsgTrace() *msgTrace { // If external, we need to have the account's trace destination set, // otherwise, we are not enabling tracing. if external { + var sampling int if acc != nil { - dest = acc.getTraceDest() + dest, sampling = acc.getTraceDestAndSampling() } if dest == _EMPTY_ { // No account destination, no tracing for external trace headers. return nil } + // Check sampling, but only from origin server. + if c.kind == CLIENT && !sample(sampling) { + // Need to desactivate the traceParentHdr so that if the message + // is routed, it does possibly trigger a trace there. + disableTraceHeaders(c, hdr) + return nil + } } c.pa.trace = &msgTrace{ srv: c.srv, @@ -472,6 +481,15 @@ func (c *client) initMsgTrace() *msgTrace { return c.pa.trace } +func sample(sampling int) bool { + // Option parsing should ensure that sampling is [1..100], but consider + // any value outside of this range to be 100%. + if sampling <= 0 || sampling >= 100 { + return true + } + return rand.Int31n(100) <= int32(sampling) +} + // This function will return the header as a map (instead of http.Header because // we want to preserve the header names' case) and a boolean that indicates if // the headers have been lifted due to the presence of the external trace header @@ -637,7 +655,7 @@ func (t *msgTrace) setHopHeader(c *client, msg []byte) []byte { // Note that if `msg` can be either the header alone or the full message // (header and payload). This function will use c.pa.hdr to limit the // search to the header section alone. -func (t *msgTrace) disableTraceHeaders(c *client, msg []byte) []int { +func disableTraceHeaders(c *client, msg []byte) []int { // Code largely copied from getHeader(), except that we don't need the value if c.pa.hdr <= 0 { return []int{-1, -1} @@ -672,7 +690,7 @@ func (t *msgTrace) disableTraceHeaders(c *client, msg []byte) []int { // Changes back the character at the given position `pos` in the `msg` // byte slice to the first character of the MsgTraceSendTo header. -func (t *msgTrace) enableTraceHeaders(c *client, msg []byte, positions []int) { +func enableTraceHeaders(c *client, msg []byte, positions []int) { firstChar := [2]byte{MsgTraceDest[0], traceParentHdr[0]} for i, pos := range positions { if pos == -1 { diff --git a/server/msgtrace_test.go b/server/msgtrace_test.go index 6550f7e1f5..a5d001e5ed 100644 --- a/server/msgtrace_test.go +++ b/server/msgtrace_test.go @@ -4898,3 +4898,216 @@ func TestMsgTraceStreamJWTUpdate(t *testing.T) { }) } } + +func TestMsgTraceParseAccountDestWithSampling(t *testing.T) { + tmpl := ` + port: -1 + accounts { + A { + users: [{user: a, password: pwd}] + %s + } + } + ` + for _, test := range []struct { + name string + samplingStr string + want int + }{ + {"trace sampling no dest", `msg_trace: {sampling: 50}`, 0}, + {"trace dest only", `msg_trace: {dest: foo}`, 100}, + {"trace dest with number only", `msg_trace: {dest: foo, sampling: 20}`, 20}, + {"trace dest with percentage", `msg_trace: {dest: foo, sampling: 50%}`, 50}, + } { + t.Run(test.name, func(t *testing.T) { + conf := createConfFile(t, []byte(fmt.Sprintf(tmpl, test.samplingStr))) + o := LoadConfig(conf) + _, sampling := o.Accounts[0].getTraceDestAndSampling() + require_Equal[int](t, test.want, sampling) + }) + } +} + +func TestMsgTraceAccountDestWithSampling(t *testing.T) { + tmpl := ` + port: -1 + server_name: %s + accounts { + A { + users: [{user: a, password:pwd}] + msg_trace: {dest: "acc.dest"%s} + } + } + cluster { + port: -1 + %s + } + ` + conf1 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "A", _EMPTY_, _EMPTY_))) + s1, o1 := RunServerWithConfig(conf1) + defer s1.Shutdown() + + routes := fmt.Sprintf("routes: [\"nats://127.0.0.1:%d\"]", o1.Cluster.Port) + conf2 := createConfFile(t, []byte(fmt.Sprintf(tmpl, "B", _EMPTY_, routes))) + s2, _ := RunServerWithConfig(conf2) + defer s2.Shutdown() + + checkClusterFormed(t, s1, s2) + + nc2 := natsConnect(t, s2.ClientURL(), nats.UserInfo("a", "pwd")) + defer nc2.Close() + natsSub(t, nc2, "foo", func(_ *nats.Msg) {}) + natsFlush(t, nc2) + + nc1 := natsConnect(t, s1.ClientURL(), nats.UserInfo("a", "pwd")) + defer nc1.Close() + sub := natsSubSync(t, nc1, "acc.dest") + natsFlush(t, nc1) + + checkSubInterest(t, s1, "A", "foo", time.Second) + checkSubInterest(t, s2, "A", "acc.dest", time.Second) + + for iter, test := range []struct { + name string + samplingStr string + sampling int + }{ + // Sampling is considered 100% if not specified or <=0 or >= 100. + // To disable sampling, the account destination should not be specified. + {"no sampling specified", _EMPTY_, 100}, + {"sampling specified", ", sampling: \"25%\"", 25}, + {"no sampling again", _EMPTY_, 100}, + } { + t.Run(test.name, func(t *testing.T) { + if iter > 0 { + reloadUpdateConfig(t, s1, conf1, fmt.Sprintf(tmpl, "A", test.samplingStr, _EMPTY_)) + reloadUpdateConfig(t, s2, conf2, fmt.Sprintf(tmpl, "B", test.samplingStr, routes)) + } + msg := nats.NewMsg("foo") + msg.Header.Set(traceParentHdr, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + total := 400 + for i := 0; i < total; i++ { + err := nc1.PublishMsg(msg) + require_NoError(t, err) + } + // Wait a bit to make sure that we received all traces that should + // have been received. + time.Sleep(500 * time.Millisecond) + n, _, err := sub.Pending() + require_NoError(t, err) + fromClient := 0 + fromRoute := 0 + for i := 0; i < n; i++ { + msg = natsNexMsg(t, sub, time.Second) + var e MsgTraceEvent + err = json.Unmarshal(msg.Data, &e) + require_NoError(t, err) + ingress := e.Ingress() + require_True(t, ingress != nil) + switch ingress.Kind { + case CLIENT: + fromClient++ + case ROUTER: + fromRoute++ + default: + t.Fatalf("Unexpected ingress: %+v", ingress) + } + } + // There should be as many messages coming from the origin server + // and the routed server. This checks that if sampling is not 100% + // then when a message is routed, the header is properly deactivated. + require_Equal[int](t, fromClient, fromRoute) + // Now check that if sampling was 100%, we have the total number + // of published messages. + if test.sampling == 100 { + require_Equal[int](t, fromClient, total) + } else { + // Otherwise, we should have no more (but let's be conservative) + // than the sampling number. + require_LessThan[int](t, fromClient, int(float64(test.sampling*total/100)*1.35)) + } + }) + } +} + +func TestMsgTraceAccDestWithSamplingJWTUpdate(t *testing.T) { + // create system account + sysKp, _ := nkeys.CreateAccount() + sysPub, _ := sysKp.PublicKey() + sysCreds := newUser(t, sysKp) + // create account A + akp, _ := nkeys.CreateAccount() + aPub, _ := akp.PublicKey() + claim := jwt.NewAccountClaims(aPub) + claim.Trace = &jwt.MsgTrace{Destination: "acc.trace.dest"} + aJwt, err := claim.Encode(oKp) + require_NoError(t, err) + + dir := t.TempDir() + conf := createConfFile(t, []byte(fmt.Sprintf(` + listen: -1 + operator: %s + resolver: { + type: full + dir: '%s' + } + system_account: %s + `, ojwt, dir, sysPub))) + s, _ := RunServerWithConfig(conf) + defer s.Shutdown() + updateJwt(t, s.ClientURL(), sysCreds, aJwt, 1) + + nc := natsConnect(t, s.ClientURL(), createUserCreds(t, nil, akp)) + defer nc.Close() + + sub := natsSubSync(t, nc, "acc.trace.dest") + natsFlush(t, nc) + + for iter, test := range []struct { + name string + sampling int + }{ + {"no sampling specified", 100}, + {"sampling", 25}, + {"set back sampling to 0", 100}, + } { + t.Run(test.name, func(t *testing.T) { + if iter > 0 { + claim.Trace = &jwt.MsgTrace{Destination: "acc.trace.dest", Sampling: test.sampling} + aJwt, err = claim.Encode(oKp) + require_NoError(t, err) + updateJwt(t, s.ClientURL(), sysCreds, aJwt, 1) + } + + msg := nats.NewMsg("foo") + msg.Header.Set(traceParentHdr, "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01") + msg.Data = []byte("hello") + + total := 400 + for i := 0; i < total; i++ { + err := nc.PublishMsg(msg) + require_NoError(t, err) + } + // Wait a bit to make sure that we received all traces that should + // have been received. + time.Sleep(500 * time.Millisecond) + n, _, err := sub.Pending() + require_NoError(t, err) + for i := 0; i < n; i++ { + msg = natsNexMsg(t, sub, time.Second) + var e MsgTraceEvent + err = json.Unmarshal(msg.Data, &e) + require_NoError(t, err) + } + // Now check that if sampling was 100%, we have the total number + // of published messages. + if test.sampling == 100 { + require_Equal[int](t, n, total) + } else { + // Otherwise, we should have no more (but let's be conservative) + // than the sampling number. + require_LessThan[int](t, n, int(float64(test.sampling*total/100)*1.35)) + } + }) + } +} diff --git a/server/opts.go b/server/opts.go index c9b147e910..98f90db220 100644 --- a/server/opts.go +++ b/server/opts.go @@ -2896,6 +2896,69 @@ func parseAccountLimits(mv interface{}, acc *Account, errors *[]error, warnings return nil } +func parseAccountMsgTrace(mv any, topKey string, acc *Account) error { + processDest := func(tk token, k string, v any) error { + td, ok := v.(string) + if !ok { + return &configErr{tk, fmt.Sprintf("Field %q should be a string, got %T", k, v)} + } + if !IsValidPublishSubject(td) { + return &configErr{tk, fmt.Sprintf("Trace destination %q is not valid", td)} + } + acc.traceDest = td + return nil + } + processSampling := func(tk token, n int) error { + if n <= 0 || n > 100 { + return &configErr{tk, fmt.Sprintf("Ttrace destination sampling value %d is invalid, needs to be [1..100]", n)} + } + acc.traceDestSampling = n + return nil + } + + var lt token + tk, v := unwrapValue(mv, <) + switch vv := v.(type) { + case string: + return processDest(tk, topKey, v) + case map[string]any: + for k, v := range vv { + tk, v := unwrapValue(v, <) + switch strings.ToLower(k) { + case "dest": + if err := processDest(tk, k, v); err != nil { + return err + } + case "sampling": + switch vv := v.(type) { + case int64: + if err := processSampling(tk, int(vv)); err != nil { + return err + } + case string: + s := strings.TrimSuffix(vv, "%") + n, err := strconv.Atoi(s) + if err != nil { + return &configErr{tk, fmt.Sprintf("Invalid trace destination sampling value %q", vv)} + } + if err := processSampling(tk, n); err != nil { + return err + } + default: + return &configErr{tk, fmt.Sprintf("Trace destination sampling field %q should be an integer or a percentage, got %T", k, v)} + } + default: + if !tk.IsUsedVariable() { + return &configErr{tk, fmt.Sprintf("Unknown field %q parsing account message trace map/struct %q", k, topKey)} + } + } + } + default: + return &configErr{tk, fmt.Sprintf("Expected account message trace %q to be a string or a map/struct, got %T", topKey, v)} + } + return nil +} + // parseAccounts will parse the different accounts syntax. func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]error) error { var ( @@ -3025,14 +3088,23 @@ func parseAccounts(v interface{}, opts *Options, errors *[]error, warnings *[]er *errors = append(*errors, err) continue } - case "trace_dest", "trace_destination", "trace_subject": - td := mv.(string) - if !IsValidPublishSubject(td) { - err := &configErr{tk, fmt.Sprintf("Trace destination %q is not valid", mv)} + case "msg_trace", "trace_dest": + if err := parseAccountMsgTrace(tk, k, acc); err != nil { *errors = append(*errors, err) continue } - acc.traceDest = td + // If trace destination is set but no sampling, set it to 100%. + if acc.traceDest != _EMPTY_ && acc.traceDestSampling == 0 { + acc.traceDestSampling = 100 + } else if acc.traceDestSampling > 0 && acc.traceDest == _EMPTY_ { + // If no trace destination is provided, no trace would be + // triggered, so if the user set a sampling value expecting + // something to happen, want and set the value to 0 for good + // measure. + *warnings = append(*warnings, + &configErr{tk, "Trace destination sampling ignored since no destination was set"}) + acc.traceDestSampling = 0 + } default: if !tk.IsUsedVariable() { err := &unknownConfigFieldErr{ diff --git a/server/stream.go b/server/stream.go index dc3776f21d..405ee9587f 100644 --- a/server/stream.go +++ b/server/stream.go @@ -4224,7 +4224,7 @@ func (mset *stream) processInboundJetStreamMsg(_ *subscription, c *client, _ *Ac // to prevent a trace event to be generated when a stored message // is delivered to a consumer and routed. if !traceOnly { - mt.disableTraceHeaders(c, hdr) + disableTraceHeaders(c, hdr) } // This will add the jetstream event while in the client read loop. // Since the event will be updated in a different go routine, the