From a8d695d0ccb69c5999a7171fe140c25ed01e5384 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Wed, 4 Jan 2023 17:53:18 +0100 Subject: [PATCH 1/6] [ADDED] Support for multi-endpoint services --- micro/example_handler_test.go | 13 +- micro/example_package_test.go | 31 +++- micro/example_test.go | 78 +++++---- micro/service.go | 197 +++++++++++++++------- micro/test/service_test.go | 304 ++++++++++++++++++++++------------ 5 files changed, 417 insertions(+), 206 deletions(-) diff --git a/micro/example_handler_test.go b/micro/example_handler_test.go index 6eff36634..89ca2e5fc 100644 --- a/micro/example_handler_test.go +++ b/micro/example_handler_test.go @@ -30,11 +30,14 @@ func ExampleHandler() { rec := rectangle{10, 5} config := micro.Config{ - Name: "RectangleAreaService", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Handler: rec, - Subject: "rectangle.area", + Name: "RectangleAreaService", + Version: "0.1.0", + RootSubject: "area", + Endpoints: map[string]micro.Endpoint{ + "Rectangle": { + Subject: "rec", + Handler: rec, + }, }, } svc, err := micro.AddService(nc, config) diff --git a/micro/example_package_test.go b/micro/example_package_test.go index 9b8cdf868..13a76a38c 100644 --- a/micro/example_package_test.go +++ b/micro/example_package_test.go @@ -30,7 +30,7 @@ func Example() { } defer nc.Close() - // service handler - in this case, HandlerFunc is used, + // endpoint handler - in this case, HandlerFunc is used, // which is a built-in implementation of Handler interface incrementHandler := func(req micro.Request) { val, err := strconv.Atoi(string(req.Data())) @@ -43,15 +43,34 @@ func Example() { req.Respond([]byte(strconv.Itoa(responseData))) } + // second endpoint + multiply := func(req micro.Request) { + val, err := strconv.Atoi(string(req.Data())) + if err != nil { + req.Error("400", "request data should be a number", nil) + return + } + + responseData := val * 2 + req.Respond([]byte(strconv.Itoa(responseData))) + } + config := micro.Config{ Name: "IncrementService", Version: "0.1.0", Description: "Increment numbers", - Endpoint: micro.Endpoint{ - // service handler - Handler: micro.HandlerFunc(incrementHandler), - // a unique subject serving as a service endpoint - Subject: "numbers.increment", + RootSubject: "numbers", + Endpoints: map[string]micro.Endpoint{ + "Increment": { + // service handler + Handler: micro.HandlerFunc(incrementHandler), + // a unique subject serving as a service endpoint + Subject: "increment", + }, + "Multiply": { + Handler: micro.HandlerFunc(multiply), + Subject: "multiply", + }, }, } // Multiple instances of the servcice with the same name can be created. diff --git a/micro/example_test.go b/micro/example_test.go index 43a324848..dca8dc8a1 100644 --- a/micro/example_test.go +++ b/micro/example_test.go @@ -37,9 +37,12 @@ func ExampleAddService() { Name: "EchoService", Version: "1.0.0", Description: "Send back what you receive", - Endpoint: micro.Endpoint{ - Subject: "echo", - Handler: micro.HandlerFunc(echoHandler), + RootSubject: "svc", + Endpoints: map[string]micro.Endpoint{ + "Echo": { + Subject: "echo", + Handler: micro.HandlerFunc(echoHandler), + }, }, // DoneHandler can be set to customize behavior on stopping a service. @@ -70,10 +73,13 @@ func ExampleService_Info() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Endpoint: micro.Endpoint{ - Subject: "echo", - Handler: micro.HandlerFunc(func(micro.Request) {}), + Name: "EchoService", + RootSubject: "svc", + Endpoints: map[string]micro.Endpoint{ + "Echo": { + Subject: "echo", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, }, } @@ -86,7 +92,7 @@ func ExampleService_Info() { fmt.Println(info.Name) fmt.Println(info.Description) fmt.Println(info.Version) - fmt.Println(info.Subject) + fmt.Println(info.RootSubject) } func ExampleService_Stats() { @@ -97,11 +103,14 @@ func ExampleService_Stats() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "echo", - Handler: micro.HandlerFunc(func(micro.Request) {}), + Name: "EchoService", + Version: "0.1.0", + RootSubject: "svc", + Endpoints: map[string]micro.Endpoint{ + "Echo": { + Subject: "echo", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, }, } @@ -110,8 +119,8 @@ func ExampleService_Stats() { // stats of a service instance stats := srv.Stats() - fmt.Println(stats.AverageProcessingTime) - fmt.Println(stats.ProcessingTime) + fmt.Println(stats.Endpoints["Echo"].AverageProcessingTime) + fmt.Println(stats.Endpoints["Echo"].ProcessingTime) } @@ -123,11 +132,14 @@ func ExampleService_Stop() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "echo", - Handler: micro.HandlerFunc(func(micro.Request) {}), + Name: "EchoService", + Version: "0.1.0", + RootSubject: "svc", + Endpoints: map[string]micro.Endpoint{ + "Echo": { + Subject: "echo", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, }, } @@ -154,11 +166,14 @@ func ExampleService_Stopped() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "echo", - Handler: micro.HandlerFunc(func(micro.Request) {}), + Name: "EchoService", + Version: "0.1.0", + RootSubject: "svc", + Endpoints: map[string]micro.Endpoint{ + "Echo": { + Subject: "echo", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, }, } @@ -183,11 +198,14 @@ func ExampleService_Reset() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "echo", - Handler: micro.HandlerFunc(func(micro.Request) {}), + Name: "EchoService", + Version: "0.1.0", + RootSubject: "svc", + Endpoints: map[string]micro.Endpoint{ + "Echo": { + Subject: "echo", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, }, } diff --git a/micro/service.go b/micro/service.go index 7dd401f24..03dfa4eb2 100644 --- a/micro/service.go +++ b/micro/service.go @@ -18,6 +18,7 @@ import ( "errors" "fmt" "regexp" + "strings" "sync" "time" @@ -64,11 +65,15 @@ type ( Version string `json:"version"` } - // Stats is the type returned by STATS monitoring endpoint. - // It contains stats for a specific endpoint (either request handler or monitoring enpoints). Stats struct { ServiceIdentity - Type string `json:"type"` + Type string `json:"type"` + Endpoints map[string]*EndpointStats `json:"endpoints"` + } + + // Stats is the type returned by STATS monitoring endpoint. + // It contains stats for a specific endpoint (either request handler or monitoring enpoints). + EndpointStats struct { NumRequests int `json:"num_requests"` NumErrors int `json:"num_errors"` LastError string `json:"last_error"` @@ -89,7 +94,7 @@ type ( ServiceIdentity Type string `json:"type"` Description string `json:"description"` - Subject string `json:"subject"` + RootSubject string `json:"root_subject"` } // SchemaResp is the response value for SCHEMA requests. @@ -117,13 +122,34 @@ type ( // Config is a configuration of a service. Config struct { - Name string `json:"name"` - Version string `json:"version"` - Description string `json:"description"` - Schema Schema `json:"schema"` - Endpoint Endpoint `json:"endpoint"` + // Name represents the name of the service. + Name string `json:"name"` + + // RootSubject is the root subject of the service. + // All endpoints will be prefixed with root subject. + RootSubject string `json:"root_subject"` + + // Version is a SemVer compatible version string. + Version string `json:"version"` + + // Description of the service. + Description string `json:"description"` + + // Service schema + Schema Schema `json:"schema"` + + // Endpoints is a collenction of service endpoints. + // Map key serves as endpoint name. + Endpoints map[string]Endpoint `json:"endpoint"` + + // StatsHandler is a user-defined custom function + // used to calculate additional service stats. StatsHandler StatsHandler - DoneHandler DoneHandler + + // DoneHandler is invoked when all service subscription are stopped. + DoneHandler DoneHandler + + // ErrorHandler is invoked on any nats-related service error. ErrorHandler ErrHandler } @@ -144,7 +170,7 @@ type ( m sync.Mutex id string - reqSub *nats.Subscription + endpointSubs map[string]*nats.Subscription verbSubs map[string]*nats.Subscription stats *Stats conn *nats.Conn @@ -245,37 +271,43 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { asyncDispatcher: asyncCallbacksHandler{ cbQueue: make(chan func(), 100), }, + endpointSubs: make(map[string]*nats.Subscription), + verbSubs: make(map[string]*nats.Subscription), } svcIdentity := ServiceIdentity{ Name: config.Name, ID: id, Version: config.Version, } - svc.verbSubs = make(map[string]*nats.Subscription) svc.stats = &Stats{ ServiceIdentity: svcIdentity, + Endpoints: make(map[string]*EndpointStats), } svc.setupAsyncCallbacks() go svc.asyncDispatcher.asyncCBDispatcher() - // Setup internal subscriptions. - var err error - - svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) { - svc.reqHandler(&request{msg: m}) - }) - if err != nil { - svc.asyncDispatcher.close() - return nil, err - } - - ping := Ping{ - ServiceIdentity: svcIdentity, - Type: PingResponseType, + for name, endpoint := range config.Endpoints { + sub, err := nc.QueueSubscribe( + fmt.Sprintf("%s.%s", config.RootSubject, endpoint.Subject), + QG, + func(m *nats.Msg) { + svc.reqHandler(name, &request{msg: m}) + }, + ) + if err != nil { + svc.Stop() + svc.asyncDispatcher.close() + return nil, err + } + svc.endpointSubs[name] = sub + svc.stats.Endpoints[name] = &EndpointStats{ + Started: time.Now().UTC(), + } } + // Setup internal subscriptions. infoHandler := func(req Request) { response, _ := json.Marshal(svc.Info()) if err := req.Respond(response); err != nil { @@ -285,6 +317,10 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { } } + ping := Ping{ + ServiceIdentity: svcIdentity, + Type: PingResponseType, + } pingHandler := func(req Request) { response, _ := json.Marshal(ping) if err := req.Respond(response); err != nil { @@ -334,7 +370,6 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { svc.asyncDispatcher.close() return nil, err } - svc.stats.Started = time.Now().UTC() return svc, nil } @@ -359,14 +394,28 @@ func (ac *asyncCallbacksHandler) close() { close(ac.cbQueue) } -func (s *Config) valid() error { - if !serviceNameRegexp.MatchString(s.Name) { +func (c *Config) valid() error { + if !serviceNameRegexp.MatchString(c.Name) { return fmt.Errorf("%w: service name: name should not be empty and should consist of alphanumerical charactest, dashes and underscores", ErrConfigValidation) } - if !semVerRegexp.MatchString(s.Version) { + if !semVerRegexp.MatchString(c.Version) { return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation) } - return s.Endpoint.valid() + if len(c.Endpoints) == 0 { + return fmt.Errorf("%w: endpoints: service should have at least one endpoint configured", ErrConfigValidation) + } + if c.RootSubject == "" { + return fmt.Errorf("%w: root subject: cannot be empty", ErrConfigValidation) + } + for name, endpoint := range c.Endpoints { + if name == "" { + return fmt.Errorf("%w: endpoint name cannot be empty", ErrConfigValidation) + } + if err := endpoint.valid(); err != nil { + return fmt.Errorf("%w: endpoint %q: %s", ErrConfigValidation, name, err) + } + } + return nil } func (e *Endpoint) valid() error { @@ -407,8 +456,14 @@ func (svc *service) setupAsyncCallbacks() { }) } svc.m.Lock() - svc.stats.NumErrors++ - svc.stats.LastError = err.Error() + for name, endpointSub := range svc.endpointSubs { + if endpointSub.Subject == s.Subject { + endpointStats := svc.stats.Endpoints[name] + endpointStats.NumErrors++ + endpointStats.LastError = err.Error() + svc.stats.Endpoints[name] = endpointStats + } + } svc.m.Unlock() svc.Stop() svc.natsHandlers.asyncErr(c, s, err) @@ -425,8 +480,13 @@ func (svc *service) setupAsyncCallbacks() { }) } svc.m.Lock() - svc.stats.NumErrors++ - svc.stats.LastError = err.Error() + for name, endpointSub := range svc.endpointSubs { + if endpointSub.Subject == s.Subject { + endpointStats := svc.stats.Endpoints[name] + endpointStats.NumErrors++ + endpointStats.LastError = err.Error() + } + } svc.m.Unlock() svc.Stop() }) @@ -434,7 +494,7 @@ func (svc *service) setupAsyncCallbacks() { } func (svc *service) matchSubscriptionSubject(subj string) bool { - if svc.reqSub.Subject == subj { + if strings.HasPrefix(subj, svc.RootSubject) { return true } for _, verbSub := range svc.verbSubs { @@ -480,18 +540,23 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st } // reqHandler invokes the service request handler and modifies service stats -func (s *service) reqHandler(req *request) { +func (s *service) reqHandler(endpointName string, req *request) { + endpoint, ok := s.Endpoints[endpointName] + if !ok { + return + } start := time.Now() - s.Endpoint.Handler.Handle(req) + endpoint.Handler.Handle(req) s.m.Lock() - s.stats.NumRequests++ - s.stats.ProcessingTime += time.Since(start) - avgProcessingTime := s.stats.ProcessingTime.Nanoseconds() / int64(s.stats.NumRequests) - s.stats.AverageProcessingTime = time.Duration(avgProcessingTime) + stats := s.stats.Endpoints[endpointName] + stats.NumRequests++ + stats.ProcessingTime += time.Since(start) + avgProcessingTime := stats.ProcessingTime.Nanoseconds() / int64(stats.NumRequests) + stats.AverageProcessingTime = time.Duration(avgProcessingTime) if req.respondError != nil { - s.stats.NumErrors++ - s.stats.LastError = req.respondError.Error() + stats.NumErrors++ + stats.LastError = req.respondError.Error() } s.m.Unlock() } @@ -499,15 +564,15 @@ func (s *service) reqHandler(req *request) { // Stop drains the endpoint subscriptions and marks the service as stopped. func (s *service) Stop() error { s.m.Lock() + defer s.m.Unlock() if s.stopped { return nil } - defer s.m.Unlock() - if s.reqSub != nil { - if err := s.reqSub.Drain(); err != nil { + for name, sub := range s.endpointSubs { + if err := sub.Drain(); err != nil { return fmt.Errorf("draining subscription for request handler: %w", err) } - s.reqSub = nil + delete(s.endpointSubs, name) } var keys []string for key, sub := range s.verbSubs { @@ -543,7 +608,7 @@ func (s *service) Info() Info { }, Type: InfoResponseType, Description: s.Config.Description, - Subject: s.Config.Endpoint.Subject, + RootSubject: s.Config.RootSubject, } } @@ -551,24 +616,32 @@ func (s *service) Info() Info { func (s *service) Stats() Stats { s.m.Lock() defer s.m.Unlock() - if s.StatsHandler != nil { - s.stats.Data, _ = json.Marshal(s.StatsHandler(s.Endpoint)) - } info := s.Info() - return Stats{ + stats := Stats{ ServiceIdentity: ServiceIdentity{ Name: info.Name, ID: info.ID, Version: info.Version, }, - Type: StatsResponseType, - NumRequests: s.stats.NumRequests, - NumErrors: s.stats.NumErrors, - ProcessingTime: s.stats.ProcessingTime, - AverageProcessingTime: s.stats.AverageProcessingTime, - Started: s.stats.Started, - Data: s.stats.Data, + Endpoints: make(map[string]*EndpointStats), + } + for name, endpoint := range s.Endpoints { + currentStats := s.stats.Endpoints[name] + endpointStats := &EndpointStats{ + NumRequests: currentStats.NumRequests, + NumErrors: currentStats.NumErrors, + LastError: currentStats.LastError, + ProcessingTime: currentStats.ProcessingTime, + AverageProcessingTime: currentStats.AverageProcessingTime, + Started: currentStats.Started, + } + if s.StatsHandler != nil { + data, _ := json.Marshal(s.StatsHandler(endpoint)) + endpointStats.Data = data + } + stats.Endpoints[name] = endpointStats } + return stats } // Reset resets all statistics on a service instance. @@ -576,6 +649,10 @@ func (s *service) Reset() { s.m.Lock() s.stats = &Stats{ ServiceIdentity: s.Info().ServiceIdentity, + Endpoints: make(map[string]*EndpointStats), + } + for name := range s.Endpoints { + s.stats.Endpoints[name] = &EndpointStats{} } s.m.Unlock() } diff --git a/micro/test/service_test.go b/micro/test/service_test.go index 38816cc08..c1da1ecd6 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -66,9 +66,12 @@ func TestServiceBasics(t *testing.T) { Name: "CoolAddService", Version: "0.1.0", Description: "Add things together", - Endpoint: micro.Endpoint{ - Subject: "svc.add", - Handler: micro.HandlerFunc(doAdd), + RootSubject: "svc", + Endpoints: map[string]micro.Endpoint{ + "Add": { + Subject: "add", + Handler: micro.HandlerFunc(doAdd), + }, }, Schema: micro.Schema{Request: "", Response: ""}, } @@ -114,8 +117,8 @@ func TestServiceBasics(t *testing.T) { if err := json.Unmarshal(info.Data, &inf); err != nil { t.Fatalf("Unexpected error: %v", err) } - if inf.Subject != "svc.add" { - t.Fatalf("expected service subject to be srv.add: %s", inf.Subject) + if inf.RootSubject != "svc" { + t.Fatalf("expected service subject to be srv.add: %s", inf.RootSubject) } // Ping all services. Multiple responses. @@ -169,7 +172,7 @@ func TestServiceBasics(t *testing.T) { if err := json.Unmarshal(resp.Data, &srvStats); err != nil { t.Fatalf("Unexpected error: %v", err) } - requestsNum += srvStats.NumRequests + requestsNum += srvStats.Endpoints["Add"].NumRequests stats = append(stats, srvStats) } if len(stats) != 5 { @@ -182,12 +185,8 @@ func TestServiceBasics(t *testing.T) { } // Reset stats for a service svcs[0].Reset() - emptyStats := micro.Stats{ - Type: micro.StatsResponseType, - ServiceIdentity: svcs[0].Info().ServiceIdentity, - } - if !reflect.DeepEqual(svcs[0].Stats(), emptyStats) { + if svcs[0].Stats().Endpoints["Add"].NumRequests != 0 { t.Fatalf("Expected empty stats after reset; got: %+v", svcs[0].Stats()) } @@ -212,11 +211,43 @@ func TestAddService(t *testing.T) { { name: "minimal config", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, + }, + }, + expectedPing: micro.Ping{ + Type: micro.PingResponseType, + ServiceIdentity: micro.ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + }, + }, + }, + { + name: "multiple endpoints", + givenConfig: micro.Config{ + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func1": { + Subject: "func1", + Handler: micro.HandlerFunc(testHandler), + }, + "func2": { + Subject: "fun2", + Handler: micro.HandlerFunc(testHandler), + }, + "func3": { + Subject: "func3", + Handler: micro.HandlerFunc(testHandler), + }, }, }, expectedPing: micro.Ping{ @@ -230,11 +261,14 @@ func TestAddService(t *testing.T) { { name: "with done handler, no handlers on nats connection", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, DoneHandler: func(micro.Service) { doneService <- struct{}{} @@ -251,11 +285,14 @@ func TestAddService(t *testing.T) { { name: "with error handler, no handlers on nats connection", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} @@ -273,11 +310,14 @@ func TestAddService(t *testing.T) { { name: "with error handler, no handlers on nats connection, error on monitoring subject", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} @@ -295,11 +335,14 @@ func TestAddService(t *testing.T) { { name: "with done handler, append to nats handlers", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, DoneHandler: func(micro.Service) { doneService <- struct{}{} @@ -323,11 +366,14 @@ func TestAddService(t *testing.T) { { name: "with error handler, append to nats handlers", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, DoneHandler: func(micro.Service) { doneService <- struct{}{} @@ -350,11 +396,14 @@ func TestAddService(t *testing.T) { { name: "with error handler, append to nats handlers, error on monitoring subject", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, DoneHandler: func(micro.Service) { doneService <- struct{}{} @@ -378,11 +427,14 @@ func TestAddService(t *testing.T) { { name: "validation error, invalid service name", givenConfig: micro.Config{ - Name: "test_service!", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service!", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, }, withError: micro.ErrConfigValidation, @@ -390,11 +442,14 @@ func TestAddService(t *testing.T) { { name: "validation error, invalid version", givenConfig: micro.Config{ - Name: "test_service!", - Version: "abc", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service!", + Version: "abc", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(testHandler), + }, }, }, withError: micro.ErrConfigValidation, @@ -402,11 +457,14 @@ func TestAddService(t *testing.T) { { name: "validation error, empty subject", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "", - Handler: micro.HandlerFunc(testHandler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "", + Handler: micro.HandlerFunc(testHandler), + }, }, }, withError: micro.ErrConfigValidation, @@ -414,11 +472,29 @@ func TestAddService(t *testing.T) { { name: "validation error, no handler", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test_subject", - Handler: nil, + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: nil, + }, + }, + }, + withError: micro.ErrConfigValidation, + }, + { + name: "validation error, no root subject", + givenConfig: micro.Config{ + Name: "test_service", + Version: "0.1.0", + RootSubject: "", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: nil, + }, }, }, withError: micro.ErrConfigValidation, @@ -550,11 +626,14 @@ func TestMonitoringHandlers(t *testing.T) { } config := micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(func(micro.Request) {}), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, }, Schema: micro.Schema{ Request: "some_schema", @@ -626,7 +705,7 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - Subject: "test.sub", + RootSubject: "test", }, }, { @@ -639,7 +718,7 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - Subject: "test.sub", + RootSubject: "test", }, }, { @@ -652,7 +731,7 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - Subject: "test.sub", + RootSubject: "test", }, }, { @@ -774,22 +853,28 @@ func TestServiceStats(t *testing.T) { { name: "without schema or stats handler", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(handler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(handler), + }, }, }, }, { name: "with stats handler", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(handler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(handler), + }, }, StatsHandler: func(e micro.Endpoint) interface{} { return map[string]interface{}{ @@ -804,11 +889,14 @@ func TestServiceStats(t *testing.T) { { name: "with schema", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(handler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(handler), + }, }, Schema: micro.Schema{ Request: "some_schema", @@ -818,11 +906,14 @@ func TestServiceStats(t *testing.T) { { name: "with schema and stats handler", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - Endpoint: micro.Endpoint{ - Subject: "test.sub", - Handler: micro.HandlerFunc(handler), + Name: "test_service", + Version: "0.1.0", + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(handler), + }, }, Schema: micro.Schema{ Request: "some_schema", @@ -856,14 +947,14 @@ func TestServiceStats(t *testing.T) { } defer srv.Stop() for i := 0; i < 10; i++ { - if _, err := nc.Request(srv.Info().Subject, []byte("msg"), time.Second); err != nil { + if _, err := nc.Request("test.func", []byte("msg"), time.Second); err != nil { t.Fatalf("Unexpected error: %v", err) } } // Malformed request, missing reply subjtct // This should be reflected in errors - if err := nc.Publish(srv.Info().Subject, []byte("err")); err != nil { + if err := nc.Publish("test.func", []byte("err")); err != nil { t.Fatalf("Unexpected error: %v", err) } time.Sleep(10 * time.Millisecond) @@ -885,15 +976,15 @@ func TestServiceStats(t *testing.T) { if stats.ID != info.ID { t.Errorf("Unexpected service name; want: %s; got: %s", info.ID, stats.ID) } - if stats.NumRequests != 11 { - t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.NumRequests) + if stats.Endpoints["func"].NumRequests != 11 { + t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.Endpoints["func"].NumRequests) } - if stats.NumErrors != 1 { - t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.NumErrors) + if stats.Endpoints["func"].NumErrors != 1 { + t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.Endpoints["func"].NumErrors) } if test.expectedStats != nil { var data map[string]interface{} - if err := json.Unmarshal(stats.Data, &data); err != nil { + if err := json.Unmarshal(stats.Endpoints["func"].Data, &data); err != nil { t.Fatalf("Unexpected error: %v", err) } if !reflect.DeepEqual(data, test.expectedStats) { @@ -1050,9 +1141,12 @@ func TestRequestRespond(t *testing.T) { Name: "CoolService", Version: "0.1.0", Description: "test service", - Endpoint: micro.Endpoint{ - Subject: "svc.test", - Handler: micro.HandlerFunc(handler), + RootSubject: "test", + Endpoints: map[string]micro.Endpoint{ + "func": { + Subject: "func", + Handler: micro.HandlerFunc(handler), + }, }, }) if err != nil { @@ -1061,7 +1155,7 @@ func TestRequestRespond(t *testing.T) { defer svc.Stop() resp, err := nc.RequestMsg(&nats.Msg{ - Subject: svc.Info().Subject, + Subject: "test.func", Data: nil, Header: nats.Header{"key": []string{"value"}}, }, 50*time.Millisecond) From 3985969ef6c3799b8210a2ff465a773dea657907 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Mon, 9 Jan 2023 15:41:27 +0100 Subject: [PATCH 2/6] Add grouping --- micro/example_handler_test.go | 23 ++- micro/example_package_test.go | 45 +++-- micro/example_test.go | 110 +++++----- micro/request.go | 2 +- micro/service.go | 369 ++++++++++++++++++++-------------- micro/test/service_test.go | 297 ++++++++++++--------------- 6 files changed, 454 insertions(+), 392 deletions(-) diff --git a/micro/example_handler_test.go b/micro/example_handler_test.go index 89ca2e5fc..004b25a1c 100644 --- a/micro/example_handler_test.go +++ b/micro/example_handler_test.go @@ -1,3 +1,16 @@ +// Copyright 2022-2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package micro_test import ( @@ -33,16 +46,14 @@ func ExampleHandler() { Name: "RectangleAreaService", Version: "0.1.0", RootSubject: "area", - Endpoints: map[string]micro.Endpoint{ - "Rectangle": { - Subject: "rec", - Handler: rec, - }, - }, } svc, err := micro.AddService(nc, config) if err != nil { log.Fatal(err) } defer svc.Stop() + _, err = svc.AddEndpoint("Rectangle", rec) + if err != nil { + log.Fatal(err) + } } diff --git a/micro/example_package_test.go b/micro/example_package_test.go index 13a76a38c..6ec3a867c 100644 --- a/micro/example_package_test.go +++ b/micro/example_package_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -60,31 +60,32 @@ func Example() { Version: "0.1.0", Description: "Increment numbers", RootSubject: "numbers", - Endpoints: map[string]micro.Endpoint{ - "Increment": { - // service handler - Handler: micro.HandlerFunc(incrementHandler), - // a unique subject serving as a service endpoint - Subject: "increment", - }, - "Multiply": { - Handler: micro.HandlerFunc(multiply), - Subject: "multiply", - }, - }, } - // Multiple instances of the servcice with the same name can be created. - // Requests to a service with the same name will be load-balanced. - for i := 0; i < 5; i++ { - svc, err := micro.AddService(nc, config) - if err != nil { - log.Fatal(err) - } - defer svc.Stop() + svc, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + defer svc.Stop() + + // register endpoints on a service + _, err = svc.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) + if err != nil { + log.Fatal(err) + } + _, err = svc.AddEndpoint("Multiply", micro.HandlerFunc(multiply)) + if err != nil { + log.Fatal(err) + } + + // add a group + v1 := svc.AddGroup("v1") + _, err = v1.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) + if err != nil { + log.Fatal(err) } // send a request to a service - resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second) + resp, err := nc.Request("numbers.v1.increment", []byte("3"), 1*time.Second) if err != nil { log.Fatal(err) } diff --git a/micro/example_test.go b/micro/example_test.go index dca8dc8a1..54dff0e96 100644 --- a/micro/example_test.go +++ b/micro/example_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -29,22 +29,11 @@ func ExampleAddService() { } defer nc.Close() - echoHandler := func(req micro.Request) { - req.Respond(req.Data()) - } - config := micro.Config{ Name: "EchoService", Version: "1.0.0", Description: "Send back what you receive", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(echoHandler), - }, - }, - // DoneHandler can be set to customize behavior on stopping a service. DoneHandler: func(srv micro.Service) { info := srv.Info() @@ -65,6 +54,67 @@ func ExampleAddService() { defer srv.Stop() } +func ExampleService_AddEndpoint() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + + config := micro.Config{ + Name: "EchoService", + Version: "1.0.0", + RootSubject: "svc", + } + + srv, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + + endpoint, err := srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) + if err != nil { + log.Fatal(err) + } + // Endpoints can be stopped individually or all at once using srv.Stop() + defer endpoint.Stop() +} + +func ExampleService_AddGroup() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + + config := micro.Config{ + Name: "EchoService", + Version: "1.0.0", + RootSubject: "svc", + } + + srv, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + + v1 := srv.AddGroup("v1") + + // endpoint will be registered under "v1.Echo" subject + _, err = v1.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) + if err != nil { + log.Fatal(err) + } +} + func ExampleService_Info() { nc, err := nats.Connect("127.0.0.1:4222") if err != nil { @@ -75,12 +125,6 @@ func ExampleService_Info() { config := micro.Config{ Name: "EchoService", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) @@ -106,21 +150,15 @@ func ExampleService_Stats() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) - + srv.AddEndpoint("Echo", micro.HandlerFunc(func(r micro.Request) {})) // stats of a service instance stats := srv.Stats() - fmt.Println(stats.Endpoints["Echo"].AverageProcessingTime) - fmt.Println(stats.Endpoints["Echo"].ProcessingTime) + fmt.Println(stats.Endpoints[0].AverageProcessingTime) + fmt.Println(stats.Endpoints[0].ProcessingTime) } @@ -135,12 +173,6 @@ func ExampleService_Stop() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) @@ -169,12 +201,6 @@ func ExampleService_Stopped() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) @@ -201,12 +227,6 @@ func ExampleService_Reset() { Name: "EchoService", Version: "0.1.0", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Echo": { - Subject: "echo", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, } srv, _ := micro.AddService(nc, config) diff --git a/micro/request.go b/micro/request.go index dbc4ce0f7..2d1b05686 100644 --- a/micro/request.go +++ b/micro/request.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/micro/service.go b/micro/service.go index 03dfa4eb2..4894e243d 100644 --- a/micro/service.go +++ b/micro/service.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -31,14 +31,26 @@ import ( // This functionality is EXPERIMENTAL and may be changed in later releases. type ( + + // Service exposes methods to operate on a service instance. Service interface { + // AddEndpoint registers endpoint with given name on a specific service. + // If service has root subject, the endpoint will be registered on {root_subject}.{name} subject. + // Otherwise, the endpoint subject is the same as endpoint name. + AddEndpoint(string, Handler) (Endpoint, error) + + // AddGroup returns a Group interface, allowing for more complex endpoint topologies. + // A group can be used to register endpoints with given prefix. + // Root subject will be prepended to the prefix (if available). + AddGroup(string) Group + // Info returns the service info. Info() Info // Stats returns statistics for the service endpoint and all monitoring endpoints. Stats() Stats - // Reset resets all statistics on a service instance. + // Reset resets all statistics (for all endpoints) on a service instance. Reset() // Stop drains the endpoint subscriptions and marks the service as stopped. @@ -48,6 +60,19 @@ type ( Stopped() bool } + // Group allows for grouping endpoints on a service. + // + // Endpoints created using AddEndpoint will be grouped under common prefix (group name) + // New groups can also be derived from a group using AddGroup. + Group interface { + // AddGroup creates a new group, prefixed by this group's prefix. + AddGroup(string) Group + + // AddEndpoint registers new endpoints on a service. + // The endpoint's subject will be prefixed with the group prefix. + AddEndpoint(string, Handler) (Endpoint, error) + } + // ErrHandler is a function used to configure a custom error handler for a service, ErrHandler func(Service, *NATSError) @@ -65,21 +90,24 @@ type ( Version string `json:"version"` } + // Stats is the type returned by STATS monitoring endpoint. + // It contains stats of all registered endpoints. Stats struct { ServiceIdentity - Type string `json:"type"` - Endpoints map[string]*EndpointStats `json:"endpoints"` + Type string `json:"type"` + Started time.Time `json:"started"` + Endpoints []*EndpointStats `json:"endpoints"` } - // Stats is the type returned by STATS monitoring endpoint. - // It contains stats for a specific endpoint (either request handler or monitoring enpoints). + // EndpointStats contains stats for a specific endpoint. EndpointStats struct { + Name string `json:"name"` + Subject string `json:"subject"` NumRequests int `json:"num_requests"` NumErrors int `json:"num_errors"` LastError string `json:"last_error"` ProcessingTime time.Duration `json:"processing_time"` AverageProcessingTime time.Duration `json:"average_processing_time"` - Started time.Time `json:"started"` Data json.RawMessage `json:"data,omitempty"` } @@ -92,9 +120,10 @@ type ( // Info is the basic information about a service type. Info struct { ServiceIdentity - Type string `json:"type"` - Description string `json:"description"` - RootSubject string `json:"root_subject"` + Type string `json:"type"` + Description string `json:"description"` + RootSubject string `json:"root_subject"` + Endpoints []string `json:"endpoints"` } // SchemaResp is the response value for SCHEMA requests. @@ -111,10 +140,26 @@ type ( Response string `json:"response"` } - // Endpoint is used to configure a subject and handler for a service. - Endpoint struct { - Subject string `json:"subject"` - Handler Handler + // Endpoint manages a service endpoint. + // There is not need to call Stop on each endpoint on cleanup. + // Instead, Service.Stop will stop all endpoints registered on a service. + Endpoint interface { + Stop() error + Subject() string + } + + endpoint struct { + service *service + subject string + handler Handler + + stats EndpointStats + subscription *nats.Subscription + } + + group struct { + service *service + prefix string } // Verb represents a name of the monitoring service. @@ -125,7 +170,7 @@ type ( // Name represents the name of the service. Name string `json:"name"` - // RootSubject is the root subject of the service. + // RootSubject is the optional root subject of the service. // All endpoints will be prefixed with root subject. RootSubject string `json:"root_subject"` @@ -138,10 +183,6 @@ type ( // Service schema Schema Schema `json:"schema"` - // Endpoints is a collenction of service endpoints. - // Map key serves as endpoint name. - Endpoints map[string]Endpoint `json:"endpoint"` - // StatsHandler is a user-defined custom function // used to calculate additional service stats. StatsHandler StatsHandler @@ -170,10 +211,10 @@ type ( m sync.Mutex id string - endpointSubs map[string]*nats.Subscription + endpoints []*endpoint verbSubs map[string]*nats.Subscription - stats *Stats - conn *nats.Conn + started time.Time + nc *nats.Conn natsHandlers handlers stopped bool @@ -253,8 +294,8 @@ func (s Verb) String() string { } // AddService adds a microservice. -// It will enable internal common services (PING, STATS, INFO and SCHEMA) as well as -// the actual service handler on the subject provided in config.Endpoint +// It will enable internal common services (PING, STATS, INFO and SCHEMA). +// Request handlers have to be registered separately using Service.AddEndpoint. // A service name, version and Endpoint configuration are required to add a service. // AddService returns a [Service] interface, allowing service management. // Each service is assigned a unique ID. @@ -266,47 +307,24 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { id := nuid.Next() svc := &service{ Config: config, - conn: nc, + nc: nc, id: id, asyncDispatcher: asyncCallbacksHandler{ cbQueue: make(chan func(), 100), }, - endpointSubs: make(map[string]*nats.Subscription), - verbSubs: make(map[string]*nats.Subscription), + verbSubs: make(map[string]*nats.Subscription), + endpoints: make([]*endpoint, 0), } svcIdentity := ServiceIdentity{ Name: config.Name, ID: id, Version: config.Version, } - svc.stats = &Stats{ - ServiceIdentity: svcIdentity, - Endpoints: make(map[string]*EndpointStats), - } svc.setupAsyncCallbacks() go svc.asyncDispatcher.asyncCBDispatcher() - for name, endpoint := range config.Endpoints { - sub, err := nc.QueueSubscribe( - fmt.Sprintf("%s.%s", config.RootSubject, endpoint.Subject), - QG, - func(m *nats.Msg) { - svc.reqHandler(name, &request{msg: m}) - }, - ) - if err != nil { - svc.Stop() - svc.asyncDispatcher.close() - return nil, err - } - svc.endpointSubs[name] = sub - svc.stats.Endpoints[name] = &EndpointStats{ - Started: time.Now().UTC(), - } - } - // Setup internal subscriptions. infoHandler := func(req Request) { response, _ := json.Marshal(svc.Info()) @@ -371,9 +389,53 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { return nil, err } + svc.started = time.Now().UTC() return svc, nil } +func (s *service) AddEndpoint(name string, handler Handler) (Endpoint, error) { + subject := name + if s.RootSubject != "" { + subject = fmt.Sprintf("%s.%s", s.RootSubject, name) + } + return addEndpoint(s, name, subject, handler) +} + +func addEndpoint(s *service, name, subject string, handler Handler) (Endpoint, error) { + endpoint := &endpoint{ + service: s, + subject: subject, + handler: handler, + } + sub, err := s.nc.QueueSubscribe( + subject, + QG, + func(m *nats.Msg) { + s.reqHandler(endpoint, &request{msg: m}) + }, + ) + if err != nil { + return nil, err + } + endpoint.subscription = sub + s.endpoints = append(s.endpoints, endpoint) + endpoint.stats = EndpointStats{ + Name: name, + Subject: subject, + } + return endpoint, nil +} + +func (s *service) AddGroup(name string) Group { + if s.RootSubject != "" { + name = fmt.Sprintf("%s.%s", s.RootSubject, name) + } + return &group{ + service: s, + prefix: name, + } +} + // dispatch is responsible for calling any async callbacks func (ac *asyncCallbacksHandler) asyncCBDispatcher() { for { @@ -401,94 +463,67 @@ func (c *Config) valid() error { if !semVerRegexp.MatchString(c.Version) { return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation) } - if len(c.Endpoints) == 0 { - return fmt.Errorf("%w: endpoints: service should have at least one endpoint configured", ErrConfigValidation) - } - if c.RootSubject == "" { - return fmt.Errorf("%w: root subject: cannot be empty", ErrConfigValidation) - } - for name, endpoint := range c.Endpoints { - if name == "" { - return fmt.Errorf("%w: endpoint name cannot be empty", ErrConfigValidation) - } - if err := endpoint.valid(); err != nil { - return fmt.Errorf("%w: endpoint %q: %s", ErrConfigValidation, name, err) - } - } - return nil -} - -func (e *Endpoint) valid() error { - if e.Subject == "" { - return fmt.Errorf("%w: endpoint: subject is required", ErrConfigValidation) - } - if e.Handler == nil { - return fmt.Errorf("%w: endpoint: handler is required", ErrConfigValidation) - } return nil } -func (svc *service) setupAsyncCallbacks() { - svc.m.Lock() - defer svc.m.Unlock() - svc.natsHandlers.closed = svc.conn.ClosedHandler() - if svc.natsHandlers.closed != nil { - svc.conn.SetClosedHandler(func(c *nats.Conn) { - svc.Stop() - svc.natsHandlers.closed(c) +func (s *service) setupAsyncCallbacks() { + s.m.Lock() + defer s.m.Unlock() + s.natsHandlers.closed = s.nc.ClosedHandler() + if s.natsHandlers.closed != nil { + s.nc.SetClosedHandler(func(c *nats.Conn) { + s.Stop() + s.natsHandlers.closed(c) }) } else { - svc.conn.SetClosedHandler(func(c *nats.Conn) { - svc.Stop() + s.nc.SetClosedHandler(func(c *nats.Conn) { + s.Stop() }) } - svc.natsHandlers.asyncErr = svc.conn.ErrorHandler() - if svc.natsHandlers.asyncErr != nil { - svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { - if !svc.matchSubscriptionSubject(s.Subject) { - svc.natsHandlers.asyncErr(c, s, err) + s.natsHandlers.asyncErr = s.nc.ErrorHandler() + if s.natsHandlers.asyncErr != nil { + s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { + if !s.matchSubscriptionSubject(sub.Subject) { + s.natsHandlers.asyncErr(c, sub, err) } - if svc.Config.ErrorHandler != nil { - svc.Config.ErrorHandler(svc, &NATSError{ - Subject: s.Subject, + if s.Config.ErrorHandler != nil { + s.Config.ErrorHandler(s, &NATSError{ + Subject: sub.Subject, Description: err.Error(), }) } - svc.m.Lock() - for name, endpointSub := range svc.endpointSubs { - if endpointSub.Subject == s.Subject { - endpointStats := svc.stats.Endpoints[name] - endpointStats.NumErrors++ - endpointStats.LastError = err.Error() - svc.stats.Endpoints[name] = endpointStats + s.m.Lock() + for _, endpoint := range s.endpoints { + if endpoint.Subject() == sub.Subject { + endpoint.stats.NumErrors++ + endpoint.stats.LastError = err.Error() } } - svc.m.Unlock() - svc.Stop() - svc.natsHandlers.asyncErr(c, s, err) + s.m.Unlock() + s.Stop() + s.natsHandlers.asyncErr(c, sub, err) }) } else { - svc.conn.SetErrorHandler(func(c *nats.Conn, s *nats.Subscription, err error) { - if !svc.matchSubscriptionSubject(s.Subject) { + s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { + if !s.matchSubscriptionSubject(sub.Subject) { return } - if svc.Config.ErrorHandler != nil { - svc.Config.ErrorHandler(svc, &NATSError{ - Subject: s.Subject, + if s.Config.ErrorHandler != nil { + s.Config.ErrorHandler(s, &NATSError{ + Subject: sub.Subject, Description: err.Error(), }) } - svc.m.Lock() - for name, endpointSub := range svc.endpointSubs { - if endpointSub.Subject == s.Subject { - endpointStats := svc.stats.Endpoints[name] - endpointStats.NumErrors++ - endpointStats.LastError = err.Error() + s.m.Lock() + for _, endpoint := range s.endpoints { + if endpoint.subject == sub.Subject { + endpoint.stats.NumErrors++ + endpoint.stats.LastError = err.Error() } } - svc.m.Unlock() - svc.Stop() + s.m.Unlock() + s.Stop() }) } } @@ -540,23 +575,18 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st } // reqHandler invokes the service request handler and modifies service stats -func (s *service) reqHandler(endpointName string, req *request) { - endpoint, ok := s.Endpoints[endpointName] - if !ok { - return - } +func (s *service) reqHandler(endpoint *endpoint, req *request) { start := time.Now() - endpoint.Handler.Handle(req) + endpoint.handler.Handle(req) s.m.Lock() - stats := s.stats.Endpoints[endpointName] - stats.NumRequests++ - stats.ProcessingTime += time.Since(start) - avgProcessingTime := stats.ProcessingTime.Nanoseconds() / int64(stats.NumRequests) - stats.AverageProcessingTime = time.Duration(avgProcessingTime) + endpoint.stats.NumRequests++ + endpoint.stats.ProcessingTime += time.Since(start) + avgProcessingTime := endpoint.stats.ProcessingTime.Nanoseconds() / int64(endpoint.stats.NumRequests) + endpoint.stats.AverageProcessingTime = time.Duration(avgProcessingTime) if req.respondError != nil { - stats.NumErrors++ - stats.LastError = req.respondError.Error() + endpoint.stats.NumErrors++ + endpoint.stats.LastError = req.respondError.Error() } s.m.Unlock() } @@ -568,11 +598,10 @@ func (s *service) Stop() error { if s.stopped { return nil } - for name, sub := range s.endpointSubs { - if err := sub.Drain(); err != nil { - return fmt.Errorf("draining subscription for request handler: %w", err) + for _, e := range s.endpoints { + if err := e.Stop(); err != nil { + return err } - delete(s.endpointSubs, name) } var keys []string for key, sub := range s.verbSubs { @@ -584,7 +613,7 @@ func (s *service) Stop() error { for _, key := range keys { delete(s.verbSubs, key) } - restoreAsyncHandlers(s.conn, s.natsHandlers) + restoreAsyncHandlers(s.nc, s.natsHandlers) s.stopped = true if s.DoneHandler != nil { s.asyncDispatcher.push(func() { s.DoneHandler(s) }) @@ -600,6 +629,10 @@ func restoreAsyncHandlers(nc *nats.Conn, handlers handlers) { // Info returns information about the service func (s *service) Info() Info { + endpoints := make([]string, 0, len(s.endpoints)) + for _, e := range s.endpoints { + endpoints = append(endpoints, e.subject) + } return Info{ ServiceIdentity: ServiceIdentity{ Name: s.Config.Name, @@ -609,6 +642,7 @@ func (s *service) Info() Info { Type: InfoResponseType, Description: s.Config.Description, RootSubject: s.Config.RootSubject, + Endpoints: endpoints, } } @@ -623,23 +657,21 @@ func (s *service) Stats() Stats { ID: info.ID, Version: info.Version, }, - Endpoints: make(map[string]*EndpointStats), + Endpoints: make([]*EndpointStats, 0), } - for name, endpoint := range s.Endpoints { - currentStats := s.stats.Endpoints[name] + for _, endpoint := range s.endpoints { endpointStats := &EndpointStats{ - NumRequests: currentStats.NumRequests, - NumErrors: currentStats.NumErrors, - LastError: currentStats.LastError, - ProcessingTime: currentStats.ProcessingTime, - AverageProcessingTime: currentStats.AverageProcessingTime, - Started: currentStats.Started, + NumRequests: endpoint.stats.NumRequests, + NumErrors: endpoint.stats.NumErrors, + LastError: endpoint.stats.LastError, + ProcessingTime: endpoint.stats.ProcessingTime, + AverageProcessingTime: endpoint.stats.AverageProcessingTime, } if s.StatsHandler != nil { data, _ := json.Marshal(s.StatsHandler(endpoint)) endpointStats.Data = data } - stats.Endpoints[name] = endpointStats + stats.Endpoints = append(stats.Endpoints, endpointStats) } return stats } @@ -647,12 +679,8 @@ func (s *service) Stats() Stats { // Reset resets all statistics on a service instance. func (s *service) Reset() { s.m.Lock() - s.stats = &Stats{ - ServiceIdentity: s.Info().ServiceIdentity, - Endpoints: make(map[string]*EndpointStats), - } - for name := range s.Endpoints { - s.stats.Endpoints[name] = &EndpointStats{} + for _, endpoint := range s.endpoints { + endpoint.reset() } s.m.Unlock() } @@ -664,6 +692,45 @@ func (s *service) Stopped() bool { return s.stopped } +func (e *NATSError) Error() string { + return fmt.Sprintf("%q: %s", e.Subject, e.Description) +} + +func (g *group) AddEndpoint(name string, handler Handler) (Endpoint, error) { + subject := fmt.Sprintf("%s.%s", g.prefix, name) + return addEndpoint(g.service, name, subject, handler) +} + +func (g *group) AddGroup(name string) Group { + return &group{ + service: g.service, + prefix: fmt.Sprintf("%s.%s", g.prefix, name), + } +} + +func (e *endpoint) Subject() string { + return e.subject +} + +func (e *endpoint) Stop() error { + if err := e.subscription.Drain(); err != nil { + return fmt.Errorf("draining subscription for request handler: %w", err) + } + for i, endpoint := range e.service.endpoints { + if endpoint.subject == e.subject { + e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...) + } + } + return nil +} + +func (e *endpoint) reset() { + e.stats = EndpointStats{ + Name: e.stats.Name, + Subject: e.stats.Subject, + } +} + // ControlSubject returns monitoring subjects used by the Service. // Providing a verb is mandatory (it should be one of Ping, Schema, Info or Stats). // Depending on whether kind and id are provided, ControlSubject will return one of the following: @@ -686,7 +753,3 @@ func ControlSubject(verb Verb, name, id string) (string, error) { } return fmt.Sprintf("%s.%s.%s.%s", APIPrefix, verbStr, name, id), nil } - -func (e *NATSError) Error() string { - return fmt.Sprintf("%q: %s", e.Subject, e.Description) -} diff --git a/micro/test/service_test.go b/micro/test/service_test.go index c1da1ecd6..e335fd795 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -1,4 +1,4 @@ -// Copyright 2022 The NATS Authors +// Copyright 2022-2023 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -67,13 +67,7 @@ func TestServiceBasics(t *testing.T) { Version: "0.1.0", Description: "Add things together", RootSubject: "svc", - Endpoints: map[string]micro.Endpoint{ - "Add": { - Subject: "add", - Handler: micro.HandlerFunc(doAdd), - }, - }, - Schema: micro.Schema{Request: "", Response: ""}, + Schema: micro.Schema{Request: "", Response: ""}, } for i := 0; i < 5; i++ { @@ -81,6 +75,10 @@ func TestServiceBasics(t *testing.T) { if err != nil { t.Fatalf("Expected to create Service, got %v", err) } + _, err = svc.AddEndpoint("add", micro.HandlerFunc(doAdd)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } defer svc.Stop() svcs = append(svcs, svc) } @@ -172,7 +170,7 @@ func TestServiceBasics(t *testing.T) { if err := json.Unmarshal(resp.Data, &srvStats); err != nil { t.Fatalf("Unexpected error: %v", err) } - requestsNum += srvStats.Endpoints["Add"].NumRequests + requestsNum += srvStats.Endpoints[0].NumRequests stats = append(stats, srvStats) } if len(stats) != 5 { @@ -186,7 +184,7 @@ func TestServiceBasics(t *testing.T) { // Reset stats for a service svcs[0].Reset() - if svcs[0].Stats().Endpoints["Add"].NumRequests != 0 { + if svcs[0].Stats().Endpoints[0].NumRequests != 0 { t.Fatalf("Expected empty stats after reset; got: %+v", svcs[0].Stats()) } @@ -202,6 +200,7 @@ func TestAddService(t *testing.T) { tests := []struct { name string givenConfig micro.Config + endpoints []string natsClosedHandler nats.ConnHandler natsErrorHandler nats.ErrHandler asyncErrorSubject string @@ -214,13 +213,8 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -235,21 +229,8 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func1": { - Subject: "func1", - Handler: micro.HandlerFunc(testHandler), - }, - "func2": { - Subject: "fun2", - Handler: micro.HandlerFunc(testHandler), - }, - "func3": { - Subject: "func3", - Handler: micro.HandlerFunc(testHandler), - }, - }, }, + endpoints: []string{"func1", "func2", "func3"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -264,16 +245,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -288,16 +264,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -313,16 +284,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} }, }, + endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -338,16 +304,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, natsClosedHandler: func(c *nats.Conn) { closedNats <- struct{}{} }, @@ -369,16 +330,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, natsClosedHandler: func(c *nats.Conn) { closedNats <- struct{}{} }, @@ -399,16 +355,11 @@ func TestAddService(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, DoneHandler: func(micro.Service) { doneService <- struct{}{} }, }, + endpoints: []string{"func"}, natsClosedHandler: func(c *nats.Conn) { closedNats <- struct{}{} }, @@ -430,13 +381,8 @@ func TestAddService(t *testing.T) { Name: "test_service!", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, }, + endpoints: []string{"func"}, withError: micro.ErrConfigValidation, }, { @@ -445,58 +391,8 @@ func TestAddService(t *testing.T) { Name: "test_service!", Version: "abc", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(testHandler), - }, - }, - }, - withError: micro.ErrConfigValidation, - }, - { - name: "validation error, empty subject", - givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "", - Handler: micro.HandlerFunc(testHandler), - }, - }, - }, - withError: micro.ErrConfigValidation, - }, - { - name: "validation error, no handler", - givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: nil, - }, - }, - }, - withError: micro.ErrConfigValidation, - }, - { - name: "validation error, no root subject", - givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: nil, - }, - }, }, + endpoints: []string{"func"}, withError: micro.ErrConfigValidation, }, } @@ -525,6 +421,11 @@ func TestAddService(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + for _, endpoint := range test.endpoints { + if _, err := srv.AddEndpoint(endpoint, micro.HandlerFunc(testHandler)); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } info := srv.Info() pingSubject, err := micro.ControlSubject(micro.PingVerb, info.Name, info.ID) @@ -610,6 +511,92 @@ func TestAddService(t *testing.T) { } } +func TestGroups(t *testing.T) { + tests := []struct { + name string + rootSubject string + endpointName string + groups []string + expectedSubject string + }{ + { + name: "empty root subject, no groups", + endpointName: "foo", + expectedSubject: "foo", + }, + { + name: "with root subject, no groups", + rootSubject: "root", + endpointName: "foo", + expectedSubject: "root.foo", + }, + { + name: "empty root subject, with group", + endpointName: "foo", + groups: []string{"g1"}, + expectedSubject: "g1.foo", + }, + { + name: "with root subject and group", + endpointName: "foo", + rootSubject: "root", + groups: []string{"g1"}, + expectedSubject: "root.g1.foo", + }, + { + name: "with root subject and multiple groups", + endpointName: "foo", + rootSubject: "root", + groups: []string{"g1", "g2", "g3"}, + expectedSubject: "root.g1.g2.g3.foo", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + + srv, err := micro.AddService(nc, micro.Config{ + Name: "test_service", + RootSubject: test.rootSubject, + Version: "0.0.1", + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer srv.Stop() + + var endpoint micro.Endpoint + if len(test.groups) > 0 { + group := srv.AddGroup(test.groups[0]) + for _, g := range test.groups[1:] { + group = group.AddGroup(g) + } + endpoint, err = group.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } else { + endpoint, err = srv.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + if endpoint.Subject() != test.expectedSubject { + t.Fatalf("Invalid subject; want: %s, got: %s", test.expectedSubject, endpoint.Subject()) + } + }) + } +} + func TestMonitoringHandlers(t *testing.T) { s := RunServerOnPort(-1) defer s.Shutdown() @@ -629,12 +616,6 @@ func TestMonitoringHandlers(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(func(r micro.Request) {}), - }, - }, Schema: micro.Schema{ Request: "some_schema", }, @@ -644,6 +625,10 @@ func TestMonitoringHandlers(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + _, err = srv.AddEndpoint("func", micro.HandlerFunc(func(r micro.Request) {})) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } defer func() { srv.Stop() if !srv.Stopped() { @@ -706,6 +691,7 @@ func TestMonitoringHandlers(t *testing.T) { ID: info.ID, }, RootSubject: "test", + Endpoints: []string{"test.func"}, }, }, { @@ -719,6 +705,7 @@ func TestMonitoringHandlers(t *testing.T) { ID: info.ID, }, RootSubject: "test", + Endpoints: []string{"test.func"}, }, }, { @@ -732,6 +719,7 @@ func TestMonitoringHandlers(t *testing.T) { ID: info.ID, }, RootSubject: "test", + Endpoints: []string{"test.func"}, }, }, { @@ -856,12 +844,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, }, }, { @@ -870,12 +852,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, StatsHandler: func(e micro.Endpoint) interface{} { return map[string]interface{}{ "key": "val", @@ -892,12 +868,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, Schema: micro.Schema{ Request: "some_schema", }, @@ -909,12 +879,6 @@ func TestServiceStats(t *testing.T) { Name: "test_service", Version: "0.1.0", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, Schema: micro.Schema{ Request: "some_schema", }, @@ -945,6 +909,9 @@ func TestServiceStats(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } + if _, err := srv.AddEndpoint("func", micro.HandlerFunc(handler)); err != nil { + t.Fatalf("Unexpected error: %v", err) + } defer srv.Stop() for i := 0; i < 10; i++ { if _, err := nc.Request("test.func", []byte("msg"), time.Second); err != nil { @@ -976,15 +943,15 @@ func TestServiceStats(t *testing.T) { if stats.ID != info.ID { t.Errorf("Unexpected service name; want: %s; got: %s", info.ID, stats.ID) } - if stats.Endpoints["func"].NumRequests != 11 { - t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.Endpoints["func"].NumRequests) + if stats.Endpoints[0].NumRequests != 11 { + t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.Endpoints[0].NumRequests) } - if stats.Endpoints["func"].NumErrors != 1 { - t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.Endpoints["func"].NumErrors) + if stats.Endpoints[0].NumErrors != 1 { + t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.Endpoints[0].NumErrors) } if test.expectedStats != nil { var data map[string]interface{} - if err := json.Unmarshal(stats.Endpoints["func"].Data, &data); err != nil { + if err := json.Unmarshal(stats.Endpoints[0].Data, &data); err != nil { t.Fatalf("Unexpected error: %v", err) } if !reflect.DeepEqual(data, test.expectedStats) { @@ -1098,6 +1065,9 @@ func TestRequestRespond(t *testing.T) { if val := req.Headers().Get("key"); val != "value" { t.Fatalf("Expected headers in the request") } + if !bytes.Equal(req.Data(), []byte("req")) { + t.Fatalf("Invalid request data; want: %q; got: %q", "req", req.Data()) + } if errCode == "" && errDesc == "" { if resp, ok := respData.([]byte); ok { err := req.Respond(resp, micro.WithHeaders(test.respondHeaders)) @@ -1142,21 +1112,18 @@ func TestRequestRespond(t *testing.T) { Version: "0.1.0", Description: "test service", RootSubject: "test", - Endpoints: map[string]micro.Endpoint{ - "func": { - Subject: "func", - Handler: micro.HandlerFunc(handler), - }, - }, }) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer svc.Stop() + if _, err := svc.AddEndpoint("func", micro.HandlerFunc(handler)); err != nil { + t.Fatalf("Unexpected error: %v", err) + } resp, err := nc.RequestMsg(&nats.Msg{ Subject: "test.func", - Data: nil, + Data: []byte("req"), Header: nats.Header{"key": []string{"value"}}, }, 50*time.Millisecond) if test.withRespondError != nil { From 36cfad796d8d49544d6f7b07be8b9ffbc44c8f60 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 10 Jan 2023 12:53:53 +0100 Subject: [PATCH 3/6] Add base handler, fix error handler --- micro/example_handler_test.go | 13 +- micro/example_package_test.go | 36 ++-- micro/example_test.go | 59 ++++--- micro/service.go | 164 ++++++++++-------- micro/test/service_test.go | 309 ++++++++++++++++++++++------------ 5 files changed, 354 insertions(+), 227 deletions(-) diff --git a/micro/example_handler_test.go b/micro/example_handler_test.go index 004b25a1c..27b1db557 100644 --- a/micro/example_handler_test.go +++ b/micro/example_handler_test.go @@ -43,17 +43,16 @@ func ExampleHandler() { rec := rectangle{10, 5} config := micro.Config{ - Name: "RectangleAreaService", - Version: "0.1.0", - RootSubject: "area", + Name: "RectangleAreaService", + Version: "0.1.0", + Endpoint: µ.EndpointConfig{ + Subject: "area.rectangle", + Handler: rec, + }, } svc, err := micro.AddService(nc, config) if err != nil { log.Fatal(err) } defer svc.Stop() - _, err = svc.AddEndpoint("Rectangle", rec) - if err != nil { - log.Fatal(err) - } } diff --git a/micro/example_package_test.go b/micro/example_package_test.go index 6ec3a867c..5fe7644ad 100644 --- a/micro/example_package_test.go +++ b/micro/example_package_test.go @@ -1,4 +1,4 @@ -// Copyright 2022-2023 The NATS Authors +// Copyright 2022 The NATS Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -32,6 +32,11 @@ func Example() { // endpoint handler - in this case, HandlerFunc is used, // which is a built-in implementation of Handler interface + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + + // second endpoint incrementHandler := func(req micro.Request) { val, err := strconv.Atoi(string(req.Data())) if err != nil { @@ -43,8 +48,8 @@ func Example() { req.Respond([]byte(strconv.Itoa(responseData))) } - // second endpoint - multiply := func(req micro.Request) { + // third endpoint + multiplyHandler := func(req micro.Request) { val, err := strconv.Atoi(string(req.Data())) if err != nil { req.Error("400", "request data should be a number", nil) @@ -59,7 +64,12 @@ func Example() { Name: "IncrementService", Version: "0.1.0", Description: "Increment numbers", - RootSubject: "numbers", + + // base handler - for simple services with single endpoints this is sufficient + Endpoint: µ.EndpointConfig{ + Subject: "echo", + Handler: micro.HandlerFunc(echoHandler), + }, } svc, err := micro.AddService(nc, config) if err != nil { @@ -67,25 +77,21 @@ func Example() { } defer svc.Stop() - // register endpoints on a service - _, err = svc.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) - if err != nil { - log.Fatal(err) - } - _, err = svc.AddEndpoint("Multiply", micro.HandlerFunc(multiply)) + // add a group to aggregate endpoints under common prefix + numbers := svc.AddGroup("numbers") + + // register endpoints in a group + err = numbers.AddEndpoint("Increment", "increment", micro.HandlerFunc(incrementHandler)) if err != nil { log.Fatal(err) } - - // add a group - v1 := svc.AddGroup("v1") - _, err = v1.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) + err = numbers.AddEndpoint("Multiply", "multiply", micro.HandlerFunc(multiplyHandler)) if err != nil { log.Fatal(err) } // send a request to a service - resp, err := nc.Request("numbers.v1.increment", []byte("3"), 1*time.Second) + resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second) if err != nil { log.Fatal(err) } diff --git a/micro/example_test.go b/micro/example_test.go index 54dff0e96..0b9a4637f 100644 --- a/micro/example_test.go +++ b/micro/example_test.go @@ -29,11 +29,14 @@ func ExampleAddService() { } defer nc.Close() + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + config := micro.Config{ Name: "EchoService", Version: "1.0.0", Description: "Send back what you receive", - RootSubject: "svc", // DoneHandler can be set to customize behavior on stopping a service. DoneHandler: func(srv micro.Service) { info := srv.Info() @@ -45,6 +48,12 @@ func ExampleAddService() { info := srv.Info() fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description) }, + + // optional base handler + Endpoint: µ.EndpointConfig{ + Subject: "echo", + Handler: micro.HandlerFunc(echoHandler), + }, } srv, err := micro.AddService(nc, config) @@ -66,9 +75,8 @@ func ExampleService_AddEndpoint() { } config := micro.Config{ - Name: "EchoService", - Version: "1.0.0", - RootSubject: "svc", + Name: "EchoService", + Version: "1.0.0", } srv, err := micro.AddService(nc, config) @@ -76,12 +84,10 @@ func ExampleService_AddEndpoint() { log.Fatal(err) } - endpoint, err := srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) + err = srv.AddEndpoint("Echo", "echo", micro.HandlerFunc(echoHandler)) if err != nil { log.Fatal(err) } - // Endpoints can be stopped individually or all at once using srv.Stop() - defer endpoint.Stop() } func ExampleService_AddGroup() { @@ -96,9 +102,8 @@ func ExampleService_AddGroup() { } config := micro.Config{ - Name: "EchoService", - Version: "1.0.0", - RootSubject: "svc", + Name: "EchoService", + Version: "1.0.0", } srv, err := micro.AddService(nc, config) @@ -108,8 +113,8 @@ func ExampleService_AddGroup() { v1 := srv.AddGroup("v1") - // endpoint will be registered under "v1.Echo" subject - _, err = v1.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) + // endpoint will be registered under "v1.echo" subject + err = v1.AddEndpoint("Echo", "echo", micro.HandlerFunc(echoHandler)) if err != nil { log.Fatal(err) } @@ -123,8 +128,7 @@ func ExampleService_Info() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - RootSubject: "svc", + Name: "EchoService", } srv, _ := micro.AddService(nc, config) @@ -136,7 +140,7 @@ func ExampleService_Info() { fmt.Println(info.Name) fmt.Println(info.Description) fmt.Println(info.Version) - fmt.Println(info.RootSubject) + fmt.Println(info.Subjects) } func ExampleService_Stats() { @@ -147,13 +151,15 @@ func ExampleService_Stats() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - RootSubject: "svc", + Name: "EchoService", + Version: "0.1.0", + Endpoint: µ.EndpointConfig{ + Subject: "echo", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, } srv, _ := micro.AddService(nc, config) - srv.AddEndpoint("Echo", micro.HandlerFunc(func(r micro.Request) {})) // stats of a service instance stats := srv.Stats() @@ -170,9 +176,8 @@ func ExampleService_Stop() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - RootSubject: "svc", + Name: "EchoService", + Version: "0.1.0", } srv, _ := micro.AddService(nc, config) @@ -198,9 +203,8 @@ func ExampleService_Stopped() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - RootSubject: "svc", + Name: "EchoService", + Version: "0.1.0", } srv, _ := micro.AddService(nc, config) @@ -224,9 +228,8 @@ func ExampleService_Reset() { defer nc.Close() config := micro.Config{ - Name: "EchoService", - Version: "0.1.0", - RootSubject: "svc", + Name: "EchoService", + Version: "0.1.0", } srv, _ := micro.AddService(nc, config) diff --git a/micro/service.go b/micro/service.go index 4894e243d..d62272fb2 100644 --- a/micro/service.go +++ b/micro/service.go @@ -34,14 +34,11 @@ type ( // Service exposes methods to operate on a service instance. Service interface { - // AddEndpoint registers endpoint with given name on a specific service. - // If service has root subject, the endpoint will be registered on {root_subject}.{name} subject. - // Otherwise, the endpoint subject is the same as endpoint name. - AddEndpoint(string, Handler) (Endpoint, error) + // AddEndpoint registers endpoint with given name on a specific subject. + AddEndpoint(name, subject string, handler Handler) error // AddGroup returns a Group interface, allowing for more complex endpoint topologies. // A group can be used to register endpoints with given prefix. - // Root subject will be prepended to the prefix (if available). AddGroup(string) Group // Info returns the service info. @@ -70,7 +67,7 @@ type ( // AddEndpoint registers new endpoints on a service. // The endpoint's subject will be prefixed with the group prefix. - AddEndpoint(string, Handler) (Endpoint, error) + AddEndpoint(name, subject string, handler Handler) error } // ErrHandler is a function used to configure a custom error handler for a service, @@ -81,7 +78,7 @@ type ( // StatsHandler is a function used to configure a custom STATS endpoint. // It should return a value which can be serialized to JSON. - StatsHandler func(Endpoint) interface{} + StatsHandler func(*Endpoint) interface{} // ServiceIdentity contains fields helping to identity a service instance. ServiceIdentity struct { @@ -122,8 +119,7 @@ type ( ServiceIdentity Type string `json:"type"` Description string `json:"description"` - RootSubject string `json:"root_subject"` - Endpoints []string `json:"endpoints"` + Subjects []string `json:"subjects"` } // SchemaResp is the response value for SCHEMA requests. @@ -141,17 +137,9 @@ type ( } // Endpoint manages a service endpoint. - // There is not need to call Stop on each endpoint on cleanup. - // Instead, Service.Stop will stop all endpoints registered on a service. - Endpoint interface { - Stop() error - Subject() string - } - - endpoint struct { + Endpoint struct { + EndpointConfig service *service - subject string - handler Handler stats EndpointStats subscription *nats.Subscription @@ -170,9 +158,10 @@ type ( // Name represents the name of the service. Name string `json:"name"` - // RootSubject is the optional root subject of the service. - // All endpoints will be prefixed with root subject. - RootSubject string `json:"root_subject"` + // Endpoint is an optional endpoint configuration. + // More complex, multi-endpoint services can be configured using + // Service.AddGroup and Service.AddEndpoint methods. + Endpoint *EndpointConfig `json:"endpoint"` // Version is a SemVer compatible version string. Version string `json:"version"` @@ -194,6 +183,11 @@ type ( ErrorHandler ErrHandler } + EndpointConfig struct { + Subject string + Handler Handler + } + // NATSError represents an error returned by a NATS Subscription. // It contains a subject on which the subscription failed, so that // it can be linked with a specific service endpoint. @@ -211,7 +205,7 @@ type ( m sync.Mutex id string - endpoints []*endpoint + endpoints []*Endpoint verbSubs map[string]*nats.Subscription started time.Time nc *nats.Conn @@ -262,8 +256,9 @@ const ( var ( // this regular expression is suggested regexp for semver validation: https://semver.org/ - semVerRegexp = regexp.MustCompile(`^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$`) - serviceNameRegexp = regexp.MustCompile(`^[A-Za-z0-9\-_]+$`) + semVerRegexp = regexp.MustCompile(`^(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(?:-((?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(?:\.(?:0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*))?(?:\+([0-9a-zA-Z-]+(?:\.[0-9a-zA-Z-]+)*))?$`) + nameRegexp = regexp.MustCompile(`^[A-Za-z0-9\-_]+$`) + subjectRegexp = regexp.MustCompile(`^[^ >]+[>]?$`) ) // Common errors returned by the Service framework. @@ -313,7 +308,7 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { cbQueue: make(chan func(), 100), }, verbSubs: make(map[string]*nats.Subscription), - endpoints: make([]*endpoint, 0), + endpoints: make([]*Endpoint, 0), } svcIdentity := ServiceIdentity{ Name: config.Name, @@ -325,6 +320,13 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { go svc.asyncDispatcher.asyncCBDispatcher() + if config.Endpoint != nil { + if err := addEndpoint(svc, "default", config.Endpoint.Subject, config.Endpoint.Handler); err != nil { + svc.asyncDispatcher.close() + return nil, err + } + } + // Setup internal subscriptions. infoHandler := func(req Request) { response, _ := json.Marshal(svc.Info()) @@ -393,19 +395,23 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { return svc, nil } -func (s *service) AddEndpoint(name string, handler Handler) (Endpoint, error) { - subject := name - if s.RootSubject != "" { - subject = fmt.Sprintf("%s.%s", s.RootSubject, name) - } +func (s *service) AddEndpoint(name, subject string, handler Handler) error { return addEndpoint(s, name, subject, handler) } -func addEndpoint(s *service, name, subject string, handler Handler) (Endpoint, error) { - endpoint := &endpoint{ +func addEndpoint(s *service, name, subject string, handler Handler) error { + if !nameRegexp.MatchString(name) { + return fmt.Errorf("%w: invalid endpoint name", ErrConfigValidation) + } + if !subjectRegexp.MatchString(subject) { + return fmt.Errorf("%w: invalid endpoint subject", ErrConfigValidation) + } + endpoint := &Endpoint{ service: s, - subject: subject, - handler: handler, + EndpointConfig: EndpointConfig{ + Subject: subject, + Handler: handler, + }, } sub, err := s.nc.QueueSubscribe( subject, @@ -415,7 +421,7 @@ func addEndpoint(s *service, name, subject string, handler Handler) (Endpoint, e }, ) if err != nil { - return nil, err + return err } endpoint.subscription = sub s.endpoints = append(s.endpoints, endpoint) @@ -423,13 +429,10 @@ func addEndpoint(s *service, name, subject string, handler Handler) (Endpoint, e Name: name, Subject: subject, } - return endpoint, nil + return nil } func (s *service) AddGroup(name string) Group { - if s.RootSubject != "" { - name = fmt.Sprintf("%s.%s", s.RootSubject, name) - } return &group{ service: s, prefix: name, @@ -457,7 +460,7 @@ func (ac *asyncCallbacksHandler) close() { } func (c *Config) valid() error { - if !serviceNameRegexp.MatchString(c.Name) { + if !nameRegexp.MatchString(c.Name) { return fmt.Errorf("%w: service name: name should not be empty and should consist of alphanumerical charactest, dashes and underscores", ErrConfigValidation) } if !semVerRegexp.MatchString(c.Version) { @@ -484,8 +487,10 @@ func (s *service) setupAsyncCallbacks() { s.natsHandlers.asyncErr = s.nc.ErrorHandler() if s.natsHandlers.asyncErr != nil { s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - if !s.matchSubscriptionSubject(sub.Subject) { + endpoint, match := s.matchSubscriptionSubject(sub.Subject) + if !match { s.natsHandlers.asyncErr(c, sub, err) + return } if s.Config.ErrorHandler != nil { s.Config.ErrorHandler(s, &NATSError{ @@ -494,11 +499,9 @@ func (s *service) setupAsyncCallbacks() { }) } s.m.Lock() - for _, endpoint := range s.endpoints { - if endpoint.Subject() == sub.Subject { - endpoint.stats.NumErrors++ - endpoint.stats.LastError = err.Error() - } + if endpoint != nil { + endpoint.stats.NumErrors++ + endpoint.stats.LastError = err.Error() } s.m.Unlock() s.Stop() @@ -506,7 +509,8 @@ func (s *service) setupAsyncCallbacks() { }) } else { s.nc.SetErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { - if !s.matchSubscriptionSubject(sub.Subject) { + endpoint, match := s.matchSubscriptionSubject(sub.Subject) + if !match { return } if s.Config.ErrorHandler != nil { @@ -516,11 +520,9 @@ func (s *service) setupAsyncCallbacks() { }) } s.m.Lock() - for _, endpoint := range s.endpoints { - if endpoint.subject == sub.Subject { - endpoint.stats.NumErrors++ - endpoint.stats.LastError = err.Error() - } + if endpoint != nil { + endpoint.stats.NumErrors++ + endpoint.stats.LastError = err.Error() } s.m.Unlock() s.Stop() @@ -528,16 +530,35 @@ func (s *service) setupAsyncCallbacks() { } } -func (svc *service) matchSubscriptionSubject(subj string) bool { - if strings.HasPrefix(subj, svc.RootSubject) { - return true - } - for _, verbSub := range svc.verbSubs { +func (s *service) matchSubscriptionSubject(subj string) (*Endpoint, bool) { + for _, verbSub := range s.verbSubs { if verbSub.Subject == subj { + return nil, true + } + } + for _, e := range s.endpoints { + if matchEndpointSubject(e.Subject, subj) { + return e, true + } + } + return nil, false +} + +func matchEndpointSubject(endpointSubject, literalSubject string) bool { + subjectTokens := strings.Split(literalSubject, ".") + endpointTokens := strings.Split(endpointSubject, ".") + if len(endpointTokens) > len(subjectTokens) { + return false + } + for i, et := range endpointTokens { + if i == len(endpointTokens)-1 && et == ">" { return true } + if et != subjectTokens[i] && et != "*" { + return false + } } - return false + return true } // verbHandlers generates control handlers for a specific verb. @@ -575,9 +596,9 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st } // reqHandler invokes the service request handler and modifies service stats -func (s *service) reqHandler(endpoint *endpoint, req *request) { +func (s *service) reqHandler(endpoint *Endpoint, req *request) { start := time.Now() - endpoint.handler.Handle(req) + endpoint.Handler.Handle(req) s.m.Lock() endpoint.stats.NumRequests++ endpoint.stats.ProcessingTime += time.Since(start) @@ -599,7 +620,7 @@ func (s *service) Stop() error { return nil } for _, e := range s.endpoints { - if err := e.Stop(); err != nil { + if err := e.stop(); err != nil { return err } } @@ -631,7 +652,7 @@ func restoreAsyncHandlers(nc *nats.Conn, handlers handlers) { func (s *service) Info() Info { endpoints := make([]string, 0, len(s.endpoints)) for _, e := range s.endpoints { - endpoints = append(endpoints, e.subject) + endpoints = append(endpoints, e.Subject) } return Info{ ServiceIdentity: ServiceIdentity{ @@ -641,8 +662,7 @@ func (s *service) Info() Info { }, Type: InfoResponseType, Description: s.Config.Description, - RootSubject: s.Config.RootSubject, - Endpoints: endpoints, + Subjects: endpoints, } } @@ -696,8 +716,8 @@ func (e *NATSError) Error() string { return fmt.Sprintf("%q: %s", e.Subject, e.Description) } -func (g *group) AddEndpoint(name string, handler Handler) (Endpoint, error) { - subject := fmt.Sprintf("%s.%s", g.prefix, name) +func (g *group) AddEndpoint(name, subject string, handler Handler) error { + subject = fmt.Sprintf("%s.%s", g.prefix, subject) return addEndpoint(g.service, name, subject, handler) } @@ -708,23 +728,19 @@ func (g *group) AddGroup(name string) Group { } } -func (e *endpoint) Subject() string { - return e.subject -} - -func (e *endpoint) Stop() error { +func (e *Endpoint) stop() error { if err := e.subscription.Drain(); err != nil { return fmt.Errorf("draining subscription for request handler: %w", err) } for i, endpoint := range e.service.endpoints { - if endpoint.subject == e.subject { + if endpoint.Subject == e.Subject { e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...) } } return nil } -func (e *endpoint) reset() { +func (e *Endpoint) reset() { e.stats = EndpointStats{ Name: e.stats.Name, Subject: e.stats.Subject, diff --git a/micro/test/service_test.go b/micro/test/service_test.go index e335fd795..322a7daff 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -66,8 +66,11 @@ func TestServiceBasics(t *testing.T) { Name: "CoolAddService", Version: "0.1.0", Description: "Add things together", - RootSubject: "svc", Schema: micro.Schema{Request: "", Response: ""}, + Endpoint: µ.EndpointConfig{ + Subject: "svc.add", + Handler: micro.HandlerFunc(doAdd), + }, } for i := 0; i < 5; i++ { @@ -75,10 +78,6 @@ func TestServiceBasics(t *testing.T) { if err != nil { t.Fatalf("Expected to create Service, got %v", err) } - _, err = svc.AddEndpoint("add", micro.HandlerFunc(doAdd)) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } defer svc.Stop() svcs = append(svcs, svc) } @@ -115,9 +114,6 @@ func TestServiceBasics(t *testing.T) { if err := json.Unmarshal(info.Data, &inf); err != nil { t.Fatalf("Unexpected error: %v", err) } - if inf.RootSubject != "svc" { - t.Fatalf("expected service subject to be srv.add: %s", inf.RootSubject) - } // Ping all services. Multiple responses. inbox := nats.NewInbox() @@ -210,11 +206,9 @@ func TestAddService(t *testing.T) { { name: "minimal config", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", }, - endpoints: []string{"func"}, expectedPing: micro.Ping{ Type: micro.PingResponseType, ServiceIdentity: micro.ServiceIdentity{ @@ -224,11 +218,32 @@ func TestAddService(t *testing.T) { }, }, { - name: "multiple endpoints", + name: "with single base endpoint", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", + Endpoint: µ.EndpointConfig{ + Subject: "test", + Handler: micro.HandlerFunc(testHandler), + }, + }, + expectedPing: micro.Ping{ + Type: micro.PingResponseType, + ServiceIdentity: micro.ServiceIdentity{ + Name: "test_service", + Version: "0.1.0", + }, + }, + }, + { + name: "with base endpoint and additional endpoints", + givenConfig: micro.Config{ + Name: "test_service", + Version: "0.1.0", + Endpoint: µ.EndpointConfig{ + Subject: "test", + Handler: micro.HandlerFunc(testHandler), + }, }, endpoints: []string{"func1", "func2", "func3"}, expectedPing: micro.Ping{ @@ -242,9 +257,8 @@ func TestAddService(t *testing.T) { { name: "with done handler, no handlers on nats connection", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", DoneHandler: func(micro.Service) { doneService <- struct{}{} }, @@ -261,9 +275,8 @@ func TestAddService(t *testing.T) { { name: "with error handler, no handlers on nats connection", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} }, @@ -276,14 +289,13 @@ func TestAddService(t *testing.T) { Version: "0.1.0", }, }, - asyncErrorSubject: "test.sub", + asyncErrorSubject: "test.func", }, { name: "with error handler, no handlers on nats connection, error on monitoring subject", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", ErrorHandler: func(micro.Service, *micro.NATSError) { errService <- struct{}{} }, @@ -296,14 +308,13 @@ func TestAddService(t *testing.T) { Version: "0.1.0", }, }, - asyncErrorSubject: "$SVC.PING.TEST_SERVICE", + asyncErrorSubject: "$SRV.PING.test_service", }, { name: "with done handler, append to nats handlers", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", DoneHandler: func(micro.Service) { doneService <- struct{}{} }, @@ -327,9 +338,8 @@ func TestAddService(t *testing.T) { { name: "with error handler, append to nats handlers", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", DoneHandler: func(micro.Service) { doneService <- struct{}{} }, @@ -352,9 +362,8 @@ func TestAddService(t *testing.T) { { name: "with error handler, append to nats handlers, error on monitoring subject", givenConfig: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", DoneHandler: func(micro.Service) { doneService <- struct{}{} }, @@ -373,14 +382,13 @@ func TestAddService(t *testing.T) { Version: "0.1.0", }, }, - asyncErrorSubject: "$SVC.PING.TEST_SERVICE", + asyncErrorSubject: "$SRV.PING.TEST_SERVICE", }, { name: "validation error, invalid service name", givenConfig: micro.Config{ - Name: "test_service!", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service!", + Version: "0.1.0", }, endpoints: []string{"func"}, withError: micro.ErrConfigValidation, @@ -388,13 +396,21 @@ func TestAddService(t *testing.T) { { name: "validation error, invalid version", givenConfig: micro.Config{ - Name: "test_service!", - Version: "abc", - RootSubject: "test", + Name: "test_service!", + Version: "abc", }, endpoints: []string{"func"}, withError: micro.ErrConfigValidation, }, + { + name: "validation error, invalid endpoint name", + givenConfig: micro.Config{ + Name: "test_service!", + Version: "abc", + }, + endpoints: []string{"endpoint name"}, + withError: micro.ErrConfigValidation, + }, } for _, test := range tests { @@ -422,12 +438,19 @@ func TestAddService(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } for _, endpoint := range test.endpoints { - if _, err := srv.AddEndpoint(endpoint, micro.HandlerFunc(testHandler)); err != nil { + if err := srv.AddEndpoint(endpoint, fmt.Sprintf("test.%s", endpoint), micro.HandlerFunc(testHandler)); err != nil { t.Fatalf("Unexpected error: %v", err) } } info := srv.Info() + subjectsNum := len(test.endpoints) + if test.givenConfig.Endpoint != nil { + subjectsNum += 1 + } + if subjectsNum != len(info.Subjects) { + t.Fatalf("Invalid number of registered endpoints; want: %d; got: %d", subjectsNum, len(info.Subjects)) + } pingSubject, err := micro.ControlSubject(micro.PingVerb, info.Name, info.ID) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -511,44 +534,130 @@ func TestAddService(t *testing.T) { } } +func TestErrHandlerSubjectMatch(t *testing.T) { + tests := []struct { + name string + endpointSubject string + errSubject string + expectServiceErr bool + }{ + { + name: "exact match", + endpointSubject: "foo.bar.baz", + errSubject: "foo.bar.baz", + expectServiceErr: true, + }, + { + name: "match with *", + endpointSubject: "foo.*.baz", + errSubject: "foo.bar.baz", + expectServiceErr: true, + }, + { + name: "match with >", + endpointSubject: "foo.bar.>", + errSubject: "foo.bar.baz.1", + expectServiceErr: true, + }, + { + name: "monitoring handler", + endpointSubject: "foo.bar.>", + errSubject: "$SRV.PING", + expectServiceErr: true, + }, + { + name: "endpoint longer than subject", + endpointSubject: "foo.bar.baz", + errSubject: "foo.bar", + expectServiceErr: false, + }, + { + name: "no match", + endpointSubject: "foo.bar.baz", + errSubject: "foo.baz.bar", + expectServiceErr: false, + }, + { + name: "no match with *", + endpointSubject: "foo.*.baz", + errSubject: "foo.bar.foo", + expectServiceErr: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + coreNatsAsyncErrors := []nats.ErrHandler{nil, func(c *nats.Conn, s *nats.Subscription, err error) {}} + for _, cb := range coreNatsAsyncErrors { + errChan := make(chan struct{}) + errHandler := func(s micro.Service, err *micro.NATSError) { + errChan <- struct{}{} + } + s := RunServerOnPort(-1) + defer s.Shutdown() + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Expected to connect to server, got %v", err) + } + defer nc.Close() + nc.SetErrorHandler(cb) + svc, err := micro.AddService(nc, micro.Config{ + Name: "test_service", + Version: "0.0.1", + ErrorHandler: micro.ErrHandler(errHandler), + Endpoint: µ.EndpointConfig{ + Subject: test.endpointSubject, + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer svc.Stop() + + go nc.Opts.AsyncErrorCB(nc, &nats.Subscription{Subject: test.errSubject}, fmt.Errorf("oops")) + if test.expectServiceErr { + select { + case <-errChan: + case <-time.After(10 * time.Millisecond): + t.Fatalf("Expected service error callback") + } + } else { + select { + case <-errChan: + t.Fatalf("Expected no service error callback") + case <-time.After(10 * time.Millisecond): + } + } + } + }) + } +} + func TestGroups(t *testing.T) { tests := []struct { name string - rootSubject string endpointName string groups []string expectedSubject string }{ { - name: "empty root subject, no groups", + name: "no groups", endpointName: "foo", expectedSubject: "foo", }, { - name: "with root subject, no groups", - rootSubject: "root", - endpointName: "foo", - expectedSubject: "root.foo", - }, - { - name: "empty root subject, with group", + name: "single group", endpointName: "foo", groups: []string{"g1"}, expectedSubject: "g1.foo", }, { - name: "with root subject and group", + name: "multiple groups", endpointName: "foo", - rootSubject: "root", - groups: []string{"g1"}, - expectedSubject: "root.g1.foo", - }, - { - name: "with root subject and multiple groups", - endpointName: "foo", - rootSubject: "root", groups: []string{"g1", "g2", "g3"}, - expectedSubject: "root.g1.g2.g3.foo", + expectedSubject: "g1.g2.g3.foo", }, } @@ -564,34 +673,36 @@ func TestGroups(t *testing.T) { defer nc.Close() srv, err := micro.AddService(nc, micro.Config{ - Name: "test_service", - RootSubject: test.rootSubject, - Version: "0.0.1", + Name: "test_service", + Version: "0.0.1", }) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer srv.Stop() - var endpoint micro.Endpoint if len(test.groups) > 0 { group := srv.AddGroup(test.groups[0]) for _, g := range test.groups[1:] { group = group.AddGroup(g) } - endpoint, err = group.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + err = group.AddEndpoint(test.endpointName, test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) if err != nil { t.Fatalf("Unexpected error: %v", err) } } else { - endpoint, err = srv.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + err = srv.AddEndpoint(test.endpointName, test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) if err != nil { t.Fatalf("Unexpected error: %v", err) } } - if endpoint.Subject() != test.expectedSubject { - t.Fatalf("Invalid subject; want: %s, got: %s", test.expectedSubject, endpoint.Subject()) + info := srv.Info() + if len(info.Subjects) != 1 { + t.Fatalf("Expected 1 registered endpoint; got: %d", len(info.Subjects)) + } + if info.Subjects[0] != test.expectedSubject { + t.Fatalf("Invalid subject; want: %s, got: %s", test.expectedSubject, info.Subjects[0]) } }) } @@ -613,22 +724,21 @@ func TestMonitoringHandlers(t *testing.T) { } config := micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", Schema: micro.Schema{ Request: "some_schema", }, ErrorHandler: errHandler, + Endpoint: µ.EndpointConfig{ + Subject: "test.func", + Handler: micro.HandlerFunc(func(r micro.Request) {}), + }, } srv, err := micro.AddService(nc, config) if err != nil { t.Fatalf("Unexpected error: %v", err) } - _, err = srv.AddEndpoint("func", micro.HandlerFunc(func(r micro.Request) {})) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } defer func() { srv.Stop() if !srv.Stopped() { @@ -690,8 +800,7 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - RootSubject: "test", - Endpoints: []string{"test.func"}, + Subjects: []string{"test.func"}, }, }, { @@ -704,8 +813,7 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - RootSubject: "test", - Endpoints: []string{"test.func"}, + Subjects: []string{"test.func"}, }, }, { @@ -718,8 +826,7 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - RootSubject: "test", - Endpoints: []string{"test.func"}, + Subjects: []string{"test.func"}, }, }, { @@ -841,18 +948,16 @@ func TestServiceStats(t *testing.T) { { name: "without schema or stats handler", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", }, }, { name: "with stats handler", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", - StatsHandler: func(e micro.Endpoint) interface{} { + Name: "test_service", + Version: "0.1.0", + StatsHandler: func(e *micro.Endpoint) interface{} { return map[string]interface{}{ "key": "val", } @@ -865,9 +970,8 @@ func TestServiceStats(t *testing.T) { { name: "with schema", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", Schema: micro.Schema{ Request: "some_schema", }, @@ -876,13 +980,12 @@ func TestServiceStats(t *testing.T) { { name: "with schema and stats handler", config: micro.Config{ - Name: "test_service", - Version: "0.1.0", - RootSubject: "test", + Name: "test_service", + Version: "0.1.0", Schema: micro.Schema{ Request: "some_schema", }, - StatsHandler: func(e micro.Endpoint) interface{} { + StatsHandler: func(e *micro.Endpoint) interface{} { return map[string]interface{}{ "key": "val", } @@ -909,7 +1012,7 @@ func TestServiceStats(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if _, err := srv.AddEndpoint("func", micro.HandlerFunc(handler)); err != nil { + if err := srv.AddEndpoint("func", "test.func", micro.HandlerFunc(handler)); err != nil { t.Fatalf("Unexpected error: %v", err) } defer srv.Stop() @@ -1111,15 +1214,15 @@ func TestRequestRespond(t *testing.T) { Name: "CoolService", Version: "0.1.0", Description: "test service", - RootSubject: "test", + Endpoint: µ.EndpointConfig{ + Subject: "test.func", + Handler: micro.HandlerFunc(handler), + }, }) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer svc.Stop() - if _, err := svc.AddEndpoint("func", micro.HandlerFunc(handler)); err != nil { - t.Fatalf("Unexpected error: %v", err) - } resp, err := nc.RequestMsg(&nats.Msg{ Subject: "test.func", From 7d69e57392364a0b2d7e111cd77417405ff3fc6b Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 10 Jan 2023 13:44:31 +0100 Subject: [PATCH 4/6] Fix race --- micro/service.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/micro/service.go b/micro/service.go index d62272fb2..0648c4bf7 100644 --- a/micro/service.go +++ b/micro/service.go @@ -531,6 +531,8 @@ func (s *service) setupAsyncCallbacks() { } func (s *service) matchSubscriptionSubject(subj string) (*Endpoint, bool) { + s.m.Lock() + defer s.m.Unlock() for _, verbSub := range s.verbSubs { if verbSub.Subject == subj { return nil, true @@ -702,6 +704,7 @@ func (s *service) Reset() { for _, endpoint := range s.endpoints { endpoint.reset() } + s.started = time.Now().UTC() s.m.Unlock() } From 2e784be2905eddfc304b194e36bc7943c6b403ac Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Tue, 10 Jan 2023 15:53:03 +0100 Subject: [PATCH 5/6] Add setting endpoint subject as an option --- micro/example_package_test.go | 6 ++--- micro/example_test.go | 35 +++++++++++++++++++++++++++--- micro/service.go | 41 +++++++++++++++++++++++++++++++---- micro/test/service_test.go | 10 ++++----- 4 files changed, 77 insertions(+), 15 deletions(-) diff --git a/micro/example_package_test.go b/micro/example_package_test.go index 5fe7644ad..1472c8ac4 100644 --- a/micro/example_package_test.go +++ b/micro/example_package_test.go @@ -81,17 +81,17 @@ func Example() { numbers := svc.AddGroup("numbers") // register endpoints in a group - err = numbers.AddEndpoint("Increment", "increment", micro.HandlerFunc(incrementHandler)) + err = numbers.AddEndpoint("Increment", micro.HandlerFunc(incrementHandler)) if err != nil { log.Fatal(err) } - err = numbers.AddEndpoint("Multiply", "multiply", micro.HandlerFunc(multiplyHandler)) + err = numbers.AddEndpoint("Multiply", micro.HandlerFunc(multiplyHandler)) if err != nil { log.Fatal(err) } // send a request to a service - resp, err := nc.Request("numbers.increment", []byte("3"), 1*time.Second) + resp, err := nc.Request("numbers.Increment", []byte("3"), 1*time.Second) if err != nil { log.Fatal(err) } diff --git a/micro/example_test.go b/micro/example_test.go index 0b9a4637f..1e16170cb 100644 --- a/micro/example_test.go +++ b/micro/example_test.go @@ -84,7 +84,36 @@ func ExampleService_AddEndpoint() { log.Fatal(err) } - err = srv.AddEndpoint("Echo", "echo", micro.HandlerFunc(echoHandler)) + // endpoint will be registered under "Echo" subject + err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) + if err != nil { + log.Fatal(err) + } +} + +func ExampleWithEndpointSubject() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + + config := micro.Config{ + Name: "EchoService", + Version: "1.0.0", + } + + srv, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + + // endpoint will be registered under "service.echo" subject + err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSubject("service.echo")) if err != nil { log.Fatal(err) } @@ -113,8 +142,8 @@ func ExampleService_AddGroup() { v1 := srv.AddGroup("v1") - // endpoint will be registered under "v1.echo" subject - err = v1.AddEndpoint("Echo", "echo", micro.HandlerFunc(echoHandler)) + // endpoint will be registered under "v1.Echo" subject + err = v1.AddEndpoint("Echo", micro.HandlerFunc(echoHandler)) if err != nil { log.Fatal(err) } diff --git a/micro/service.go b/micro/service.go index 0648c4bf7..05e890f48 100644 --- a/micro/service.go +++ b/micro/service.go @@ -35,7 +35,7 @@ type ( // Service exposes methods to operate on a service instance. Service interface { // AddEndpoint registers endpoint with given name on a specific subject. - AddEndpoint(name, subject string, handler Handler) error + AddEndpoint(string, Handler, ...EndpointOpt) error // AddGroup returns a Group interface, allowing for more complex endpoint topologies. // A group can be used to register endpoints with given prefix. @@ -67,7 +67,13 @@ type ( // AddEndpoint registers new endpoints on a service. // The endpoint's subject will be prefixed with the group prefix. - AddEndpoint(name, subject string, handler Handler) error + AddEndpoint(string, Handler, ...EndpointOpt) error + } + + EndpointOpt func(*endpointOpts) error + + endpointOpts struct { + subject string } // ErrHandler is a function used to configure a custom error handler for a service, @@ -395,7 +401,17 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { return svc, nil } -func (s *service) AddEndpoint(name, subject string, handler Handler) error { +func (s *service) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) error { + var options endpointOpts + for _, opt := range opts { + if err := opt(&options); err != nil { + return err + } + } + subject := name + if options.subject != "" { + subject = options.subject + } return addEndpoint(s, name, subject, handler) } @@ -719,7 +735,17 @@ func (e *NATSError) Error() string { return fmt.Sprintf("%q: %s", e.Subject, e.Description) } -func (g *group) AddEndpoint(name, subject string, handler Handler) error { +func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) error { + var options endpointOpts + for _, opt := range opts { + if err := opt(&options); err != nil { + return err + } + } + subject := name + if options.subject != "" { + subject = options.subject + } subject = fmt.Sprintf("%s.%s", g.prefix, subject) return addEndpoint(g.service, name, subject, handler) } @@ -772,3 +798,10 @@ func ControlSubject(verb Verb, name, id string) (string, error) { } return fmt.Sprintf("%s.%s.%s.%s", APIPrefix, verbStr, name, id), nil } + +func WithEndpointSubject(subject string) EndpointOpt { + return func(e *endpointOpts) error { + e.subject = subject + return nil + } +} diff --git a/micro/test/service_test.go b/micro/test/service_test.go index 322a7daff..29cc3de89 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -289,7 +289,7 @@ func TestAddService(t *testing.T) { Version: "0.1.0", }, }, - asyncErrorSubject: "test.func", + asyncErrorSubject: "func", }, { name: "with error handler, no handlers on nats connection, error on monitoring subject", @@ -438,7 +438,7 @@ func TestAddService(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } for _, endpoint := range test.endpoints { - if err := srv.AddEndpoint(endpoint, fmt.Sprintf("test.%s", endpoint), micro.HandlerFunc(testHandler)); err != nil { + if err := srv.AddEndpoint(endpoint, micro.HandlerFunc(testHandler)); err != nil { t.Fatalf("Unexpected error: %v", err) } } @@ -686,12 +686,12 @@ func TestGroups(t *testing.T) { for _, g := range test.groups[1:] { group = group.AddGroup(g) } - err = group.AddEndpoint(test.endpointName, test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + err = group.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) if err != nil { t.Fatalf("Unexpected error: %v", err) } } else { - err = srv.AddEndpoint(test.endpointName, test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) + err = srv.AddEndpoint(test.endpointName, micro.HandlerFunc(func(r micro.Request) {})) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -1012,7 +1012,7 @@ func TestServiceStats(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if err := srv.AddEndpoint("func", "test.func", micro.HandlerFunc(handler)); err != nil { + if err := srv.AddEndpoint("func", micro.HandlerFunc(handler), micro.WithEndpointSubject("test.func")); err != nil { t.Fatalf("Unexpected error: %v", err) } defer srv.Stop() From 7ce2839ec4402c6a36b9abde04295fbc628e11f1 Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Wed, 11 Jan 2023 12:50:38 +0100 Subject: [PATCH 6/6] Add handling empty groups --- micro/service.go | 18 +++++++++++++++--- micro/test/service_test.go | 12 ++++++++++++ 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/micro/service.go b/micro/service.go index 05e890f48..ac257b6f4 100644 --- a/micro/service.go +++ b/micro/service.go @@ -746,14 +746,26 @@ func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) e if options.subject != "" { subject = options.subject } - subject = fmt.Sprintf("%s.%s", g.prefix, subject) - return addEndpoint(g.service, name, subject, handler) + endpointSubject := fmt.Sprintf("%s.%s", g.prefix, subject) + if g.prefix == "" { + endpointSubject = subject + } + return addEndpoint(g.service, name, endpointSubject, handler) } func (g *group) AddGroup(name string) Group { + parts := make([]string, 0, 2) + if g.prefix != "" { + parts = append(parts, g.prefix) + } + if name != "" { + parts = append(parts, name) + } + prefix := strings.Join(parts, ".") + return &group{ service: g.service, - prefix: fmt.Sprintf("%s.%s", g.prefix, name), + prefix: prefix, } } diff --git a/micro/test/service_test.go b/micro/test/service_test.go index 29cc3de89..4ec52b3ee 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -653,6 +653,18 @@ func TestGroups(t *testing.T) { groups: []string{"g1"}, expectedSubject: "g1.foo", }, + { + name: "single empty group", + endpointName: "foo", + groups: []string{""}, + expectedSubject: "foo", + }, + { + name: "empty groups", + endpointName: "foo", + groups: []string{"", "g1", ""}, + expectedSubject: "g1.foo", + }, { name: "multiple groups", endpointName: "foo",