From 3985969ef6c3799b8210a2ff465a773dea657907 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 9 Jan 2023 15:41:27 +0100 Subject: [PATCH] Add grouping --- micro/example_handler_test.go | 23 ++- micro/example_package_test.go | 45 +++-- micro/example_test.go | 110 +++++----- micro/request.go | 2 +- micro/service.go | 369 ++++++++++++++++++++-------------- micro/test/service_test.go | 297 ++++++++++++--------------- 6 files changed, 454 insertions(+), 392 deletions(-) diff --git a/micro/example_handler_test.go b/micro/example_handler_test.go index 89ca2e5fc..004b25a1c 100644 --- a/micro/example_handler_test.go +++ b/micro/example_handler_test.go @@ -1,3 +1,16 @@ +// Copyright 2022-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package micro_test import ( @@ -33,16 +46,14 @@ func ExampleHandler() { Name: "RectangleAreaService", Version: "0.1.0", RootSubject: "area", - Endpoints: map[string]micro.Endpoint{ - "Rectangle": { - Subject: "rec", - Handler: rec, - }, - }, } svc, err := micro.AddService(nc, config) if err != nil { log.Fatal(err) } defer svc.Stop() + _, err = svc.AddEndpoint("Rectangle", rec) + if err != nil { + log.Fatal(err) + } } diff --git a/micro/example_package_test.go b/micro/example_package_test.go index 13a76a38c..6ec3a867c 100644 --- a/micro/example_package_test.go +++ b/micro/example_package_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -60,31 +60,32 @@ func Example() { Version: "0.1.0", Description: "Increment numbers", RootSubject: "numbers", - Endpoints: map[string]micro.Endpoint{ - "Increment": { - // service handler - Handler: micro.HandlerFunc(incrementHandler), - // a unique subject serving as a service endpoint - Subject: "increment", - }, - "Multiply": { - Handler: micro.HandlerFunc(multiply), - Subject: "multiply", - }, - }, } - // Multiple instances of the servcice with the same name can be created. - // Requests to a service with the same name will be load-balanced. - for i := 0; i < 5; i++ { - svc, err := micro.AddService(nc, config) - if err != nil { - log.Fatal(err) - } - defer svc.Stop() + svc, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + defer svc.Stop() + + // register endpoints on a service + _, err = svc.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) + if err != nil { + log.Fatal(err) + } + _, err = svc.AddEndpoint("Multiply", micro.HandlerFunc(multiply)) + if err != nil { + log.Fatal(err) + } + + // add a group + v1 := svc.AddGroup("v1") + _, err = v1.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) + if err != nil { + log.Fatal(err) } // send a request to a service - resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second) + resp, err := nc.Request("numbers.v1.increment", []byte("3"), 1*time.Second) if err != nil { log.Fatal(err) } diff --git a/micro/example_test.go b/micro/example_test.go index dca8dc8a1..54dff0e96 100644 --- a/micro/example_test.go +++ b/micro/example_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -29,22 +29,11 @@ func ExampleAddService() { } defer nc.Close() - echoHandler := func(req micro.Request) { - req.Respond(req.Data()) - } - config := micro.Config{ Name: "EchoService", Version: "1.0.0", Description: "Send back what you receive", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(echoHandler), - }, - }, - // DoneHandler can be set to customize behavior on stopping a service. DoneHandler: func(srv micro.Service) { info := srv.Info() @@ -65,6 +54,67 @@ func ExampleAddService() { defer srv.Stop() } +func ExampleService_AddEndpoint() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + + config := micro.Config{ + Name: "EchoService", + Version: "1.0.0", + RootSubject: "svc", + } + + srv, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + + endpoint, err := srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) + if err != nil { + log.Fatal(err) + } + // Endpoints can be stopped individually or all at once using srv.Stop() + defer endpoint.Stop() +} + +func ExampleService_AddGroup() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + + config := micro.Config{ + Name: "EchoService", + Version: "1.0.0", + RootSubject: "svc", + } + + srv, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + + v1 := srv.AddGroup("v1") + + // endpoint will be registered under "v1.Echo" subject + _, err = v1.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) + if err != nil { + log.Fatal(err) + } +} + func ExampleService_Info() { nc, err := nats.Connect("127.0.0.1:4222") if err != nil { @@ -75,12 +125,6 @@ func ExampleService_Info() { config := micro.Config{ Name: "EchoService", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) @@ -106,21 +150,15 @@ func ExampleService_Stats() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) - + srv.AddEndpoint("Echo", micro.HandlerFunc(func(r micro.Request) {})) // stats of a service instance stats := srv.Stats() - fmt.Println(stats.Endpoints["Echo"].AverageProcessingTime) - fmt.Println(stats.Endpoints["Echo"].ProcessingTime) + fmt.Println(stats.Endpoints[0].AverageProcessingTime) + fmt.Println(stats.Endpoints[0].ProcessingTime) } @@ -135,12 +173,6 @@ func ExampleService_Stop() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) @@ -169,12 +201,6 @@ func ExampleService_Stopped() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) @@ -201,12 +227,6 @@ func ExampleService_Reset() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) diff --git a/micro/request.go b/micro/request.go index dbc4ce0f7..2d1b05686 100644 --- a/micro/request.go +++ b/micro/request.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/micro/service.go b/micro/service.go index 03dfa4eb2..4894e243d 100644 --- a/micro/service.go +++ b/micro/service.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -31,14 +31,26 @@ import ( // This functionality is EXPERIMENTAL and may be changed in later releases. type ( + + // Service exposes methods to operate on a service instance. Service interface { + // AddEndpoint registers endpoint with given name on a specific service. + // If service has root subject, the endpoint will be registered on {root_subject}.{name} subject. + // Otherwise, the endpoint subject is the same as endpoint name. + AddEndpoint(string, Handler) (Endpoint, error) + + // AddGroup returns a Group interface, allowing for more complex endpoint topologies. + // A group can be used to register endpoints with given prefix. + // Root subject will be prepended to the prefix (if available). + AddGroup(string) Group + // Info returns the service info. Info() Info // Stats returns statistics for the service endpoint and all monitoring endpoints. Stats() Stats - // Reset resets all statistics on a service instance. + // Reset resets all statistics (for all endpoints) on a service instance. Reset() // Stop drains the endpoint subscriptions and marks the service as stopped. @@ -48,6 +60,19 @@ type ( Stopped() bool } + // Group allows for grouping endpoints on a service. + // + // Endpoints created using AddEndpoint will be grouped under common prefix (group name) + // New groups can also be derived from a group using AddGroup. + Group interface { + // AddGroup creates a new group, prefixed by this group's prefix. + AddGroup(string) Group + + // AddEndpoint registers new endpoints on a service. + // The endpoint's subject will be prefixed with the group prefix. + AddEndpoint(string, Handler) (Endpoint, error) + } + // ErrHandler is a function used to configure a custom error handler for a service, ErrHandler func(Service, *NATSError) @@ -65,21 +90,24 @@ type ( Version string `json:"version"` } + // Stats is the type returned by STATS monitoring endpoint. + // It contains stats of all registered endpoints. Stats struct { ServiceIdentity - Type string `json:"type"` - Endpoints map[string]*EndpointStats `json:"endpoints"` + Type string `json:"type"` + Started time.Time `json:"started"` + Endpoints []*EndpointStats `json:"endpoints"` } - // Stats is the type returned by STATS monitoring endpoint. - // It contains stats for a specific endpoint (either request handler or monitoring enpoints). + // EndpointStats contains stats for a specific endpoint. EndpointStats struct { + Name string `json:"name"` + Subject string `json:"subject"` NumRequests int `json:"num_requests"` NumErrors int `json:"num_errors"` LastError string `json:"last_error"` ProcessingTime time.Duration `json:"processing_time"` AverageProcessingTime time.Duration `json:"average_processing_time"` - Started time.Time `json:"started"` Data json.RawMessage `json:"data,omitempty"` } @@ -92,9 +120,10 @@ type ( // Info is the basic information about a service type. Info struct { ServiceIdentity - Type string `json:"type"` - Description string `json:"description"` - RootSubject string `json:"root_subject"` + Type string `json:"type"` + Description string `json:"description"` + RootSubject string `json:"root_subject"` + Endpoints []string `json:"endpoints"` } // SchemaResp is the response value for SCHEMA requests. @@ -111,10 +140,26 @@ type ( Response string `json:"response"` } - // Endpoint is used to configure a subject and handler for a service. - Endpoint struct { - Subject string `json:"subject"` - Handler Handler + // Endpoint manages a service endpoint. + // There is not need to call Stop on each endpoint on cleanup. + // Instead, Service.Stop will stop all endpoints registered on a service. + Endpoint interface { + Stop() error + Subject() string + } + + endpoint struct { + service *service + subject string + handler Handler + + stats EndpointStats + subscription *nats.Subscription + } + + group struct { + service *service + prefix string } // Verb represents a name of the monitoring service. @@ -125,7 +170,7 @@ type ( // Name represents the name of the service. Name string `json:"name"` - // RootSubject is the root subject of the service. + // RootSubject is the optional root subject of the service. // All endpoints will be prefixed with root subject. RootSubject string `json:"root_subject"` @@ -138,10 +183,6 @@ type ( // Service schema Schema Schema `json:"schema"` - // Endpoints is a collenction of service endpoints. - // Map key serves as endpoint name. - Endpoints map[string]Endpoint `json:"endpoint"` - // StatsHandler is a user-defined custom function // used to calculate additional service stats. StatsHandler StatsHandler @@ -170,10 +211,10 @@ type ( m sync.Mutex id string - endpointSubs map[string]*nats.Subscription + endpoints []*endpoint verbSubs map[string]*nats.Subscription - stats *Stats - conn *nats.Conn + started time.Time + nc *nats.Conn natsHandlers handlers stopped bool @@ -253,8 +294,8 @@ func (s Verb) String() string { } // AddService adds a microservice. -// It will enable internal common services (PING, STATS, INFO and SCHEMA) as well as -// the actual service handler on the subject provided in config.Endpoint +// It will enable internal common services (PING, STATS, INFO and SCHEMA). +// Request handlers have to be registered separately using Service.AddEndpoint. // A service name, version and Endpoint configuration are required to add a service. // AddService returns a [Service] interface, allowing service management. // Each service is assigned a unique ID. @@ -266,47 +307,24 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { id := nuid.Next() svc := &service{ Config: config, - conn: nc, + nc: nc, id: id, asyncDispatcher: asyncCallbacksHandler{ cbQueue: make(chan func(), 100), }, - endpointSubs: make(map[string]*nats.Subscription), - verbSubs: make(map[string]*nats.Subscription), + verbSubs: make(map[string]*nats.Subscription), + endpoints: make([]*endpoint, 0), } svcIdentity := ServiceIdentity{ Name: config.Name, ID: id, Version: config.Version, } - svc.stats = &Stats{ - ServiceIdentity: svcIdentity, - Endpoints: make(map[string]*EndpointStats), - } svc.setupAsyncCallbacks() go svc.asyncDispatcher.asyncCBDispatcher() - for name, endpoint := range config.Endpoints { - sub, err := nc.QueueSubscribe( - fmt.Sprintf("%s.%s", config.RootSubject, endpoint.Subject), - QG, - func(m *nats.Msg) { - svc.reqHandler(name, &request{msg: m}) - }, - ) - if err != nil { - svc.Stop() - svc.asyncDispatcher.close() - return nil, err - } - svc.endpointSubs[name] = sub - svc.stats.Endpoints[name] = &EndpointStats{ - Started: time.Now().UTC(), - } - } - // Setup internal subscriptions. infoHandler := func(req Request) { response, _ := json.Marshal(svc.Info()) @@ -371,9 +389,53 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { return nil, err } + svc.started = time.Now().UTC() return svc, nil } +func (s *service) AddEndpoint(name string, handler Handler) (Endpoint, error) { + subject := name + if s.RootSubject != "" { + subject = fmt.Sprintf("%s.%s", s.RootSubject, name) + } + return addEndpoint(s, name, subject, handler) +} + +func addEndpoint(s *service, name, subject string, handler Handler) (Endpoint, error) { + endpoint := &endpoint{ + service: s, + subject: subject, + handler: handler, + } + sub, err := s.nc.QueueSubscribe( + subject, + QG, + func(m *nats.Msg) { + s.reqHandler(endpoint, &request{msg: m}) + }, + ) + if err != nil { + return nil, err + } + endpoint.subscription = sub + s.endpoints = append(s.endpoints, endpoint) + endpoint.stats = EndpointStats{ + Name: name, + Subject: subject, + } + return endpoint, nil +} + +func (s *service) AddGroup(name string) Group { + if s.RootSubject != "" { + name = fmt.Sprintf("%s.%s", s.RootSubject, name) + } + return &group{ + service: s, + prefix: name, + } +} + // dispatch is responsible for calling any async callbacks func (ac *asyncCallbacksHandler) asyncCBDispatcher() { for { @@ -401,94 +463,67 @@ func (c *Config) valid() error { if !semVerRegexp.MatchString(c.Version) { return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation) } - if len(c.Endpoints) == 0 { - return fmt.Errorf("%w: endpoints: service should have at least one endpoint configured", ErrConfigValidation) - } - if c.RootSubject == "" { - return fmt.Errorf("%w: root subject: cannot be empty", ErrConfigValidation) - } - for name, endpoint := range c.Endpoints { - if name == "" { - return fmt.Errorf("%w: endpoint name cannot be empty", ErrConfigValidation) - } - if err := endpoint.valid(); err != nil { - return fmt.Errorf("%w: endpoint %q: %s", ErrConfigValidation, name, err) - } - } - return nil -} - -func (e *Endpoint) valid() error { - if e.Subject == "" { - return fmt.Errorf("%w: endpoint: subject is required", ErrConfigValidation) - } - if e.Handler == nil { - return fmt.Errorf("%w: endpoint: handler is required", ErrConfigValidation) - } return nil } -func (svc *service) setupAsyncCallbacks() { - svc.m.Lock() - defer svc.m.Unlock() - svc.natsHandlers.closed = svc.conn.ClosedHandler() - if svc.natsHandlers.closed != nil { - svc.conn.SetClosedHandler(func(c *nats.Conn) { - svc.Stop() - svc.natsHandlers.closed(c) +func (s *service) setupAsyncCallbacks() { + s.m.Lock() + defer s.m.Unlock() + s.natsHandlers.closed = s.nc.ClosedHandler() + if s.natsHandlers.closed != nil { + s.nc.SetClosedHandler(func(c *nats.Conn) { + s.Stop() + s.natsHandlers.closed(c) }) } else { - svc.conn.SetClosedHandler(func(c *nats.Conn) { - svc.Stop() + s.nc.SetClosedHandler(func(c *nats.Conn) { + s.Stop() }) } - svc.natsHandlers.asyncErr = svc.conn.ErrorHandler() - if svc.natsHandlers.asyncErr != nil { - svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { - if !svc.matchSubscriptionSubject(s.Subject) { - svc.natsHandlers.asyncErr(c, s, err) + s.natsHandlers.asyncErr = s.nc.ErrorHandler() + if s.natsHandlers.asyncErr != nil { + s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { + if !s.matchSubscriptionSubject(sub.Subject) { + s.natsHandlers.asyncErr(c, sub, err) } - if svc.Config.ErrorHandler != nil { - svc.Config.ErrorHandler(svc, &NATSError{ - Subject: s.Subject, + if s.Config.ErrorHandler != nil { + s.Config.ErrorHandler(s, &NATSError{ + Subject: sub.Subject, Description: err.Error(), }) } - svc.m.Lock() - for name, endpointSub := range svc.endpointSubs { - if endpointSub.Subject == s.Subject { - endpointStats := svc.stats.Endpoints[name] - endpointStats.NumErrors++ - endpointStats.LastError = err.Error() - svc.stats.Endpoints[name] = endpointStats + s.m.Lock() + for _, endpoint := range s.endpoints { + if endpoint.Subject() == sub.Subject { + endpoint.stats.NumErrors++ + endpoint.stats.LastError = err.Error() } } - svc.m.Unlock() - svc.Stop() - svc.natsHandlers.asyncErr(c, s, err) + s.m.Unlock() + s.Stop() + s.natsHandlers.asyncErr(c, sub, err) }) } else { - svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { - if !svc.matchSubscriptionSubject(s.Subject) { + s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { + if !s.matchSubscriptionSubject(sub.Subject) { return } - if svc.Config.ErrorHandler != nil { - svc.Config.ErrorHandler(svc, &NATSError{ - Subject: s.Subject, + if s.Config.ErrorHandler != nil { + s.Config.ErrorHandler(s, &NATSError{ + Subject: sub.Subject, Description: err.Error(), }) } - svc.m.Lock() - for name, endpointSub := range svc.endpointSubs { - if endpointSub.Subject == s.Subject { - endpointStats := svc.stats.Endpoints[name] - endpointStats.NumErrors++ - endpointStats.LastError = err.Error() + s.m.Lock() + for _, endpoint := range s.endpoints { + if endpoint.subject == sub.Subject { + endpoint.stats.NumErrors++ + endpoint.stats.LastError = err.Error() } } - svc.m.Unlock() - svc.Stop() + s.m.Unlock() + s.Stop() }) } } @@ -540,23 +575,18 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st } // reqHandler invokes the service request handler and modifies service stats -func (s *service) reqHandler(endpointName string, req *request) { - endpoint, ok := s.Endpoints[endpointName] - if !ok { - return - } +func (s *service) reqHandler(endpoint *endpoint, req *request) { start := time.Now() - endpoint.Handler.Handle(req) + endpoint.handler.Handle(req) s.m.Lock() - stats := s.stats.Endpoints[endpointName] - stats.NumRequests++ - stats.ProcessingTime += time.Since(start) - avgProcessingTime := stats.ProcessingTime.Nanoseconds() / int64(stats.NumRequests) - stats.AverageProcessingTime = time.Duration(avgProcessingTime) + endpoint.stats.NumRequests++ + endpoint.stats.ProcessingTime += time.Since(start) + avgProcessingTime := endpoint.stats.ProcessingTime.Nanoseconds() / int64(endpoint.stats.NumRequests) + endpoint.stats.AverageProcessingTime = time.Duration(avgProcessingTime) if req.respondError != nil { - stats.NumErrors++ - stats.LastError = req.respondError.Error() + endpoint.stats.NumErrors++ + endpoint.stats.LastError = req.respondError.Error() } s.m.Unlock() } @@ -568,11 +598,10 @@ func (s *service) Stop() error { if s.stopped { return nil } - for name, sub := range s.endpointSubs { - if err := sub.Drain(); err != nil { - return fmt.Errorf("draining subscription for request handler: %w", err) + for _, e := range s.endpoints { + if err := e.Stop(); err != nil { + return err } - delete(s.endpointSubs, name) } var keys []string for key, sub := range s.verbSubs { @@ -584,7 +613,7 @@ func (s *service) Stop() error { for _, key := range keys { delete(s.verbSubs, key) } - restoreAsyncHandlers(s.conn, s.natsHandlers) + restoreAsyncHandlers(s.nc, s.natsHandlers) s.stopped = true if s.DoneHandler != nil { s.asyncDispatcher.push(func() { s.DoneHandler(s) }) @@ -600,6 +629,10 @@ func restoreAsyncHandlers(nc *nats.Conn, handlers handlers) { // Info returns information about the service func (s *service) Info() Info { + endpoints := make([]string, 0, len(s.endpoints)) + for _, e := range s.endpoints { + endpoints = append(endpoints, e.subject) + } return Info{ ServiceIdentity: ServiceIdentity{ Name: s.Config.Name, @@ -609,6 +642,7 @@ func (s *service) Info() Info { Type: InfoResponseType, Description: s.Config.Description, RootSubject: s.Config.RootSubject, + Endpoints: endpoints, } } @@ -623,23 +657,21 @@ func (s *service) Stats() Stats { ID: info.ID, Version: info.Version, }, - Endpoints: make(map[string]*EndpointStats), + Endpoints: make([]*EndpointStats, 0), } - for name, endpoint := range s.Endpoints { - currentStats := s.stats.Endpoints[name] + for _, endpoint := range s.endpoints { endpointStats := &EndpointStats{ - NumRequests: currentStats.NumRequests, - NumErrors: currentStats.NumErrors, - LastError: currentStats.LastError, - ProcessingTime: currentStats.ProcessingTime, - AverageProcessingTime: currentStats.AverageProcessingTime, - Started: currentStats.Started, + NumRequests: endpoint.stats.NumRequests, + NumErrors: endpoint.stats.NumErrors, + LastError: endpoint.stats.LastError, + ProcessingTime: endpoint.stats.ProcessingTime, + AverageProcessingTime: endpoint.stats.AverageProcessingTime, } if s.StatsHandler != nil { data, _ := json.Marshal(s.StatsHandler(endpoint)) endpointStats.Data = data } - stats.Endpoints[name] = endpointStats + stats.Endpoints = append(stats.Endpoints, endpointStats) } return stats } @@ -647,12 +679,8 @@ func (s *service) Stats() Stats { // Reset resets all statistics on a service instance. func (s *service) Reset() { s.m.Lock() - s.stats = &Stats{ - ServiceIdentity: s.Info().ServiceIdentity, - Endpoints: make(map[string]*EndpointStats), - } - for name := range s.Endpoints { - s.stats.Endpoints[name] = &EndpointStats{} + for _, endpoint := range s.endpoints { + endpoint.reset() } s.m.Unlock() } @@ -664,6 +692,45 @@ func (s *service) Stopped() bool { return s.stopped } +func (e *NATSError) Error() string { + return fmt.Sprintf("%q: %s", e.Subject, e.Description) +} + +func (g *group) AddEndpoint(name string, handler Handler) (Endpoint, error) { + subject := fmt.Sprintf("%s.%s", g.prefix, name) + return addEndpoint(g.service, name, subject, handler) +} + +func (g *group) AddGroup(name string) Group { + return &group{ + service: g.service, + prefix: fmt.Sprintf("%s.%s", g.prefix, name), + } +} + +func (e *endpoint) Subject() string { + return e.subject +} + +func (e *endpoint) Stop() error { + if err := e.subscription.Drain(); err != nil { + return fmt.Errorf("draining subscription for request handler: %w", err) + } + for i, endpoint := range e.service.endpoints { + if endpoint.subject == e.subject { + e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...) + } + } + return nil +} + +func (e *endpoint) reset() { + e.stats = EndpointStats{ + Name: e.stats.Name, + Subject: e.stats.Subject, + } +} + // ControlSubject returns monitoring subjects used by the Service. // Providing a verb is mandatory (it should be one of Ping, Schema, Info or Stats). // Depending on whether kind and id are provided, ControlSubject will return one of the following: @@ -686,7 +753,3 @@ func ControlSubject(verb Verb, name, id string) (string, error) { } return fmt.Sprintf("%s.%s.%s.%s", APIPrefix, verbStr, name, id), nil } - -func (e *NATSError) Error() string { - return fmt.Sprintf("%q: %s", e.Subject, e.Description) -} diff --git a/micro/test/service_test.go b/micro/test/service_test.go index c1da1ecd6..e335fd795 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -67,13 +67,7 @@ func TestServiceBasics(t *testing.T) { Version: "0.1.0", Description: "Add things together", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Add": { - Subject: "add", - Handler: micro.HandlerFunc(doAdd), - }, - }, - Schema: micro.Schema{Request: "", Response: ""}, + Schema: micro.Schema{Request: "", Response: ""}, } for i := 0; i < 5; i++ { @@ -81,6 +75,10 @@ func TestServiceBasics(t *testing.T) { if err != nil { t.Fatalf("Expected to create Service, got %v", err) } + _, err = svc.AddEndpoint("add", micro.HandlerFunc(doAdd)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } defer svc.Stop() svcs = append(svcs, svc) } @@ -172,7 +170,7 @@ func TestServiceBasics(t *testing.T) { if err := json.Unmarshal(resp.Data, &srvStats); err != nil { t.Fatalf("Unexpected error: %v", err) } - requestsNum += srvStats.Endpoints["Add"].NumRequests + requestsNum += srvStats.Endpoints[0].NumRequests stats = append(stats, srvStats) } if len(stats) != 5 { @@ -186,7 +184,7 @@ func TestServiceBasics(t *testing.T) { // Reset stats for a service svcs[0].Reset() - if svcs[0].Stats().Endpoints["Add"].NumRequests != 0 { + if svcs[0].Stats().Endpoints[0].NumRequests != 0 { t.Fatalf("Expected empty stats after reset; got: %+v", svcs[0].Stats()) } @@ -202,6 +200,7 @@ func TestAddService(t *testing.T) { tests := []struct { name string givenConfig micro.Config + endpoints []string natsClosedHandler nats.ConnHandler natsErrorHandler nats.ErrHandler asyncErrorSubject string @@ -214,13 +213,8 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -235,21 +229,8 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func1": { - Subject: "func1", - Handler: micro.HandlerFunc(testHandler), - }, - "func2": { - Subject: "fun2", - Handler: micro.HandlerFunc(testHandler), - }, - "func3": { - Subject: "func3", - Handler: micro.HandlerFunc(testHandler), - }, - }, }, + endpoints: []string{"func1", "func2", "func3"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -264,16 +245,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -288,16 +264,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -313,16 +284,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -338,16 +304,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, natsClosedHandler: func(c *nats.Conn) { closedNats <- struct{}{} }, @@ -369,16 +330,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, natsClosedHandler: func(c *nats.Conn) { closedNats <- struct{}{} }, @@ -399,16 +355,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, natsClosedHandler: func(c *nats.Conn) { closedNats <- struct{}{} }, @@ -430,13 +381,8 @@ func TestAddService(t *testing.T) { Name: "test_service!", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, }, + endpoints: []string{"func"}, withError: micro.ErrConfigValidation, }, { @@ -445,58 +391,8 @@ func TestAddService(t *testing.T) { Name: "test_service!", Version: "abc", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, - }, - withError: micro.ErrConfigValidation, - }, - { - name: "validation error, empty subject", - givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "", - Handler: micro.HandlerFunc(testHandler), - }, - }, - }, - withError: micro.ErrConfigValidation, - }, - { - name: "validation error, no handler", - givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: nil, - }, - }, - }, - withError: micro.ErrConfigValidation, - }, - { - name: "validation error, no root subject", - givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: nil, - }, - }, }, + endpoints: []string{"func"}, withError: micro.ErrConfigValidation, }, } @@ -525,6 +421,11 @@ func TestAddService(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + for _, endpoint := range test.endpoints { + if _, err := srv.AddEndpoint(endpoint, micro.HandlerFunc(testHandler)); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } info := srv.Info() pingSubject, err := micro.ControlSubject(micro.PingVerb, info.Name, info.ID) @@ -610,6 +511,92 @@ func TestAddService(t *testing.T) { } } +func TestGroups(t *testing.T) { + tests := []struct { + name string + rootSubject string + endpointName string + groups []string + expectedSubject string + }{ + { + name: "empty root subject, no groups", + endpointName: "foo", + expectedSubject: "foo", + }, + { + name: "with root subject, no groups", + rootSubject: "root", + endpointName: "foo", + expectedSubject: "root.foo", + }, + { + name: "empty root subject, with group", + endpointName: "foo", + groups: []string{"g1"}, + expectedSubject: "g1.foo", + }, + { + name: "with root subject and group", + endpointName: "foo", + rootSubject: "root", + groups: []string{"g1"}, + expectedSubject: "root.g1.foo", + }, + { + name: "with root subject and multiple groups", + endpointName: "foo", + rootSubject: "root", + groups: []string{"g1", "g2", "g3"}, + expectedSubject: "root.g1.g2.g3.foo", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + srv, err := micro.AddService(nc, micro.Config{ + Name: "test_service", + RootSubject: test.rootSubject, + Version: "0.0.1", + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer srv.Stop() + + var endpoint micro.Endpoint + if len(test.groups) > 0 { + group := srv.AddGroup(test.groups[0]) + for _, g := range test.groups[1:] { + group = group.AddGroup(g) + } + endpoint, err = group.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } else { + endpoint, err = srv.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + if endpoint.Subject() != test.expectedSubject { + t.Fatalf("Invalid subject; want: %s, got: %s", test.expectedSubject, endpoint.Subject()) + } + }) + } +} + func TestMonitoringHandlers(t *testing.T) { s := RunServerOnPort(-1) defer s.Shutdown() @@ -629,12 +616,6 @@ func TestMonitoringHandlers(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, Schema: micro.Schema{ Request: "some_schema", }, @@ -644,6 +625,10 @@ func TestMonitoringHandlers(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + _, err = srv.AddEndpoint("func", micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } defer func() { srv.Stop() if !srv.Stopped() { @@ -706,6 +691,7 @@ func TestMonitoringHandlers(t *testing.T) { ID: info.ID, }, RootSubject: "test", + Endpoints: []string{"test.func"}, }, }, { @@ -719,6 +705,7 @@ func TestMonitoringHandlers(t *testing.T) { ID: info.ID, }, RootSubject: "test", + Endpoints: []string{"test.func"}, }, }, { @@ -732,6 +719,7 @@ func TestMonitoringHandlers(t *testing.T) { ID: info.ID, }, RootSubject: "test", + Endpoints: []string{"test.func"}, }, }, { @@ -856,12 +844,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, }, }, { @@ -870,12 +852,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, StatsHandler: func(e micro.Endpoint) interface{} { return map[string]interface{}{ "key": "val", @@ -892,12 +868,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, Schema: micro.Schema{ Request: "some_schema", }, @@ -909,12 +879,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, Schema: micro.Schema{ Request: "some_schema", }, @@ -945,6 +909,9 @@ func TestServiceStats(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + if _, err := srv.AddEndpoint("func", micro.HandlerFunc(handler)); err != nil { + t.Fatalf("Unexpected error: %v", err) + } defer srv.Stop() for i := 0; i < 10; i++ { if _, err := nc.Request("test.func", []byte("msg"), time.Second); err != nil { @@ -976,15 +943,15 @@ func TestServiceStats(t *testing.T) { if stats.ID != info.ID { t.Errorf("Unexpected service name; want: %s; got: %s", info.ID, stats.ID) } - if stats.Endpoints["func"].NumRequests != 11 { - t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.Endpoints["func"].NumRequests) + if stats.Endpoints[0].NumRequests != 11 { + t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.Endpoints[0].NumRequests) } - if stats.Endpoints["func"].NumErrors != 1 { - t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.Endpoints["func"].NumErrors) + if stats.Endpoints[0].NumErrors != 1 { + t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.Endpoints[0].NumErrors) } if test.expectedStats != nil { var data map[string]interface{} - if err := json.Unmarshal(stats.Endpoints["func"].Data, &data); err != nil { + if err := json.Unmarshal(stats.Endpoints[0].Data, &data); err != nil { t.Fatalf("Unexpected error: %v", err) } if !reflect.DeepEqual(data, test.expectedStats) { @@ -1098,6 +1065,9 @@ func TestRequestRespond(t *testing.T) { if val := req.Headers().Get("key"); val != "value" { t.Fatalf("Expected headers in the request") } + if !bytes.Equal(req.Data(), []byte("req")) { + t.Fatalf("Invalid request data; want: %q; got: %q", "req", req.Data()) + } if errCode == "" && errDesc == "" { if resp, ok := respData.([]byte); ok { err := req.Respond(resp, micro.WithHeaders(test.respondHeaders)) @@ -1142,21 +1112,18 @@ func TestRequestRespond(t *testing.T) { Version: "0.1.0", Description: "test service", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, }) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer svc.Stop() + if _, err := svc.AddEndpoint("func", micro.HandlerFunc(handler)); err != nil { + t.Fatalf("Unexpected error: %v", err) + } resp, err := nc.RequestMsg(&nats.Msg{ Subject: "test.func", - Data: nil, + Data: []byte("req"), Header: nats.Header{"key": []string{"value"}}, }, 50*time.Millisecond) if test.withRespondError != nil {