From a70a75fe0dd3b29f793d2152a8fa6d3782c0421b Mon Sep 17 00:00:00 2001 From: Piotr Piotrowski Date: Fri, 13 Jan 2023 14:16:50 +0100 Subject: [PATCH] Update service schemas, fix STATS response --- micro/example_test.go | 39 +++++++++++ micro/service.go | 91 +++++++++++++++++++----- micro/test/service_test.go | 138 ++++++++++++++++++++++++++++++------- 3 files changed, 226 insertions(+), 42 deletions(-) diff --git a/micro/example_test.go b/micro/example_test.go index 1e16170cb..9ead68f19 100644 --- a/micro/example_test.go +++ b/micro/example_test.go @@ -49,10 +49,18 @@ func ExampleAddService() { fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description) }, + // optionally, a URL pointing to API specification can be provided + APIURL: "http://api.com/v1", + // optional base handler Endpoint: µ.EndpointConfig{ Subject: "echo", Handler: micro.HandlerFunc(echoHandler), + // Endpoint can be configured with optional request/response schemas + Schema: µ.Schema{ + Request: "request.json", + Response: "response.json", + }, }, } @@ -119,6 +127,37 @@ func ExampleWithEndpointSubject() { } } +func ExampleWithEndpointSchema() { + nc, err := nats.Connect("127.0.0.1:4222") + if err != nil { + log.Fatal(err) + } + defer nc.Close() + + echoHandler := func(req micro.Request) { + req.Respond(req.Data()) + } + + config := micro.Config{ + Name: "EchoService", + Version: "1.0.0", + } + + srv, err := micro.AddService(nc, config) + if err != nil { + log.Fatal(err) + } + + schema := µ.Schema{ + Request: "request.json", + Response: "response.json", + } + err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSchema(schema)) + if err != nil { + log.Fatal(err) + } +} + func ExampleService_AddGroup() { nc, err := nats.Connect("127.0.0.1:4222") if err != nil { diff --git a/micro/service.go b/micro/service.go index ac257b6f4..e99ea3af3 100644 --- a/micro/service.go +++ b/micro/service.go @@ -17,6 +17,7 @@ import ( "encoding/json" "errors" "fmt" + "net/url" "regexp" "strings" "sync" @@ -74,6 +75,7 @@ type ( endpointOpts struct { subject string + schema *Schema } // ErrHandler is a function used to configure a custom error handler for a service, @@ -131,8 +133,15 @@ type ( // SchemaResp is the response value for SCHEMA requests. SchemaResp struct { ServiceIdentity - Type string `json:"type"` - Schema Schema `json:"schema"` + Type string `json:"type"` + APIURL string `json:"api_url"` + Endpoints []EndpointSchema `json:"endpoints"` + } + + EndpointSchema struct { + Name string `json:"name"` + Subject string `json:"subject"` + Schema Schema `json:"schema,omitempty"` } // Schema can be used to configure a schema for a service. @@ -175,10 +184,10 @@ type ( // Description of the service. Description string `json:"description"` - // Service schema - Schema Schema `json:"schema"` + // APIURL is an optional url pointing to API specification. + APIURL string `json:"api_url"` - // StatsHandler is a user-defined custom function + // StatsHandler is a user-defined custom function. // used to calculate additional service stats. StatsHandler StatsHandler @@ -190,8 +199,14 @@ type ( } EndpointConfig struct { + // Subject on which the endpoint is registered. Subject string + + // Handler used by the endpoint. Handler Handler + + // Schema is an optional request/response endpoint schema. + Schema *Schema } // NATSError represents an error returned by a NATS Subscription. @@ -327,7 +342,11 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { go svc.asyncDispatcher.asyncCBDispatcher() if config.Endpoint != nil { - if err := addEndpoint(svc, "default", config.Endpoint.Subject, config.Endpoint.Handler); err != nil { + opts := []EndpointOpt{WithEndpointSubject(config.Endpoint.Subject)} + if config.Endpoint.Schema != nil { + opts = append(opts, WithEndpointSchema(config.Endpoint.Schema)) + } + if err := svc.AddEndpoint("default", config.Endpoint.Handler, opts...); err != nil { svc.asyncDispatcher.close() return nil, err } @@ -365,13 +384,8 @@ func AddService(nc *nats.Conn, config Config) (Service, error) { } } - schema := SchemaResp{ - ServiceIdentity: svcIdentity, - Schema: config.Schema, - Type: SchemaResponseType, - } schemaHandler := func(req Request) { - response, _ := json.Marshal(schema) + response, _ := json.Marshal(svc.schema()) if err := req.Respond(response); err != nil { if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil { svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject(), err.Error()}) }) @@ -412,10 +426,10 @@ func (s *service) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) if options.subject != "" { subject = options.subject } - return addEndpoint(s, name, subject, handler) + return addEndpoint(s, name, subject, handler, options.schema) } -func addEndpoint(s *service, name, subject string, handler Handler) error { +func addEndpoint(s *service, name, subject string, handler Handler, schema *Schema) error { if !nameRegexp.MatchString(name) { return fmt.Errorf("%w: invalid endpoint name", ErrConfigValidation) } @@ -427,6 +441,7 @@ func addEndpoint(s *service, name, subject string, handler Handler) error { EndpointConfig: EndpointConfig{ Subject: subject, Handler: handler, + Schema: schema, }, } sub, err := s.nc.QueueSubscribe( @@ -482,6 +497,9 @@ func (c *Config) valid() error { if !semVerRegexp.MatchString(c.Version) { return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation) } + if _, err := url.Parse(c.APIURL); err != nil { + return fmt.Errorf("%w: api_url: invalid url: %s", ErrConfigValidation, err) + } return nil } @@ -696,9 +714,13 @@ func (s *service) Stats() Stats { Version: info.Version, }, Endpoints: make([]*EndpointStats, 0), + Type: StatsResponseType, + Started: s.started, } for _, endpoint := range s.endpoints { endpointStats := &EndpointStats{ + Name: endpoint.stats.Name, + Subject: endpoint.stats.Subject, NumRequests: endpoint.stats.NumRequests, NumErrors: endpoint.stats.NumErrors, LastError: endpoint.stats.LastError, @@ -731,6 +753,27 @@ func (s *service) Stopped() bool { return s.stopped } +func (s *service) schema() SchemaResp { + endpoints := make([]EndpointSchema, 0, len(s.endpoints)) + for _, e := range s.endpoints { + endpoints = append(endpoints, EndpointSchema{ + Name: e.stats.Name, + Subject: e.stats.Subject, + Schema: Schema{ + Request: e.Schema.Request, + Response: e.Schema.Response, + }, + }) + } + + return SchemaResp{ + ServiceIdentity: s.Info().ServiceIdentity, + Type: SchemaResponseType, + APIURL: s.APIURL, + Endpoints: endpoints, + } +} + func (e *NATSError) Error() string { return fmt.Sprintf("%q: %s", e.Subject, e.Description) } @@ -750,7 +793,7 @@ func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) e if g.prefix == "" { endpointSubject = subject } - return addEndpoint(g.service, name, endpointSubject, handler) + return addEndpoint(g.service, name, endpointSubject, handler, options.schema) } func (g *group) AddGroup(name string) Group { @@ -773,9 +816,14 @@ func (e *Endpoint) stop() error { if err := e.subscription.Drain(); err != nil { return fmt.Errorf("draining subscription for request handler: %w", err) } - for i, endpoint := range e.service.endpoints { - if endpoint.Subject == e.Subject { - e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...) + for i := 0; i < len(e.service.endpoints); i++ { + if e.service.endpoints[i].Subject == e.Subject { + if i != len(e.service.endpoints)-1 { + e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...) + } else { + e.service.endpoints = e.service.endpoints[:i] + } + i++ } } return nil @@ -817,3 +865,10 @@ func WithEndpointSubject(subject string) EndpointOpt { return nil } } + +func WithEndpointSchema(schema *Schema) EndpointOpt { + return func(e *endpointOpts) error { + e.schema = schema + return nil + } +} diff --git a/micro/test/service_test.go b/micro/test/service_test.go index 4ec52b3ee..19d1595da 100644 --- a/micro/test/service_test.go +++ b/micro/test/service_test.go @@ -66,7 +66,6 @@ func TestServiceBasics(t *testing.T) { Name: "CoolAddService", Version: "0.1.0", Description: "Add things together", - Schema: micro.Schema{Request: "", Response: ""}, Endpoint: µ.EndpointConfig{ Subject: "svc.add", Handler: micro.HandlerFunc(doAdd), @@ -396,19 +395,30 @@ func TestAddService(t *testing.T) { { name: "validation error, invalid version", givenConfig: micro.Config{ - Name: "test_service!", + Name: "test_service", Version: "abc", }, endpoints: []string{"func"}, withError: micro.ErrConfigValidation, }, { - name: "validation error, invalid endpoint name", + name: "validation error, invalid endpoint subject", givenConfig: micro.Config{ - Name: "test_service!", - Version: "abc", + Name: "test_service", + Version: "0.0.1", + Endpoint: µ.EndpointConfig{ + Subject: "endpoint subject", + }, + }, + withError: micro.ErrConfigValidation, + }, + { + name: "validation error, invalid API URL", + givenConfig: micro.Config{ + Name: "test_service", + Version: "0.0.1", + APIURL: "abc\r\n", }, - endpoints: []string{"endpoint name"}, withError: micro.ErrConfigValidation, }, } @@ -736,15 +746,17 @@ func TestMonitoringHandlers(t *testing.T) { } config := micro.Config{ - Name: "test_service", - Version: "0.1.0", - Schema: micro.Schema{ - Request: "some_schema", - }, + Name: "test_service", + Version: "0.1.0", + APIURL: "http://someapi.com/v1", ErrorHandler: errHandler, Endpoint: µ.EndpointConfig{ Subject: "test.func", Handler: micro.HandlerFunc(func(r micro.Request) {}), + Schema: µ.Schema{ + Request: "request_schema", + Response: "response_schema", + }, }, } srv, err := micro.AddService(nc, config) @@ -851,8 +863,16 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - Schema: micro.Schema{ - Request: "some_schema", + APIURL: "http://someapi.com/v1", + Endpoints: []micro.EndpointSchema{ + { + Name: "default", + Subject: "test.func", + Schema: micro.Schema{ + Request: "request_schema", + Response: "response_schema", + }, + }, }, }, }, @@ -866,8 +886,16 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - Schema: micro.Schema{ - Request: "some_schema", + APIURL: "http://someapi.com/v1", + Endpoints: []micro.EndpointSchema{ + { + Name: "default", + Subject: "test.func", + Schema: micro.Schema{ + Request: "request_schema", + Response: "response_schema", + }, + }, }, }, }, @@ -881,8 +909,16 @@ func TestMonitoringHandlers(t *testing.T) { Version: "0.1.0", ID: info.ID, }, - Schema: micro.Schema{ - Request: "some_schema", + APIURL: "http://someapi.com/v1", + Endpoints: []micro.EndpointSchema{ + { + Name: "default", + Subject: "test.func", + Schema: micro.Schema{ + Request: "request_schema", + Response: "response_schema", + }, + }, }, }, }, @@ -955,6 +991,7 @@ func TestServiceStats(t *testing.T) { tests := []struct { name string config micro.Config + schema *micro.Schema expectedStats map[string]interface{} }{ { @@ -984,25 +1021,47 @@ func TestServiceStats(t *testing.T) { config: micro.Config{ Name: "test_service", Version: "0.1.0", - Schema: micro.Schema{ - Request: "some_schema", + APIURL: "http://someapi.com/v1", + }, + schema: µ.Schema{ + Request: "some_request", + }, + }, + { + name: "with default endpoint", + config: micro.Config{ + Name: "test_service", + Version: "0.1.0", + APIURL: "http://someapi.com/v1", + Endpoint: µ.EndpointConfig{ + Subject: "test.func", + Handler: micro.HandlerFunc(handler), + Schema: µ.Schema{ + Request: "some_request", + Response: "some_response", + }, }, }, + schema: µ.Schema{ + Request: "some_request", + Response: "some_response", + }, }, { name: "with schema and stats handler", config: micro.Config{ Name: "test_service", Version: "0.1.0", - Schema: micro.Schema{ - Request: "some_schema", - }, + APIURL: "http://someapi.com/v1", StatsHandler: func(e *micro.Endpoint) interface{} { return map[string]interface{}{ "key": "val", } }, }, + schema: µ.Schema{ + Request: "some_request", + }, expectedStats: map[string]interface{}{ "key": "val", }, @@ -1024,8 +1083,14 @@ func TestServiceStats(t *testing.T) { if err != nil { t.Fatalf("Unexpected error: %v", err) } - if err := srv.AddEndpoint("func", micro.HandlerFunc(handler), micro.WithEndpointSubject("test.func")); err != nil { - t.Fatalf("Unexpected error: %v", err) + if test.config.Endpoint == nil { + opts := []micro.EndpointOpt{micro.WithEndpointSubject("test.func")} + if test.schema != nil { + opts = append(opts, micro.WithEndpointSchema(test.schema)) + } + if err := srv.AddEndpoint("func", micro.HandlerFunc(handler), opts...); err != nil { + t.Fatalf("Unexpected error: %v", err) + } } defer srv.Stop() for i := 0; i < 10; i++ { @@ -1052,18 +1117,43 @@ func TestServiceStats(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } + if len(stats.Endpoints) != 1 { + t.Fatalf("Unexpected number of endpoints: want: %d; got: %d", 1, len(stats.Endpoints)) + } if stats.Name != info.Name { t.Errorf("Unexpected service name; want: %s; got: %s", info.Name, stats.Name) } if stats.ID != info.ID { t.Errorf("Unexpected service name; want: %s; got: %s", info.ID, stats.ID) } + if test.config.Endpoint == nil && stats.Endpoints[0].Name != "func" { + t.Errorf("Invalid endpoint name; want: %s; got: %s", "func", stats.Endpoints[0].Name) + } + if test.config.Endpoint != nil && stats.Endpoints[0].Name != "default" { + t.Errorf("Invalid endpoint name; want: %s; got: %s", "default", stats.Endpoints[0].Name) + } + if stats.Endpoints[0].Subject != "test.func" { + t.Errorf("Invalid endpoint subject; want: %s; got: %s", "test.func", stats.Endpoints[0].Subject) + } if stats.Endpoints[0].NumRequests != 11 { t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.Endpoints[0].NumRequests) } if stats.Endpoints[0].NumErrors != 1 { t.Errorf("Unexpected num_errors; want: 1; got: %d", stats.Endpoints[0].NumErrors) } + if stats.Endpoints[0].AverageProcessingTime == 0 { + t.Errorf("Expected non-empty AverageProcessingTime") + } + if stats.Endpoints[0].ProcessingTime == 0 { + t.Errorf("Expected non-empty ProcessingTime") + } + if stats.Started.IsZero() { + t.Errorf("Expected non-empty start time") + } + if stats.Type != micro.StatsResponseType { + t.Errorf("Invalid response type; want: %s; got: %s", micro.StatsResponseType, stats.Type) + } + if test.expectedStats != nil { var data map[string]interface{} if err := json.Unmarshal(stats.Endpoints[0].Data, &data); err != nil {