Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CHANGED] Service schemas #1188

Merged
merged 1 commit into from
Jan 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
39 changes: 39 additions & 0 deletions micro/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,18 @@ func ExampleAddService() {
fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
},

// optionally, a URL pointing to API specification can be provided
APIURL: "http://api.com/v1",

// optional base handler
Endpoint: &micro.EndpointConfig{
Subject: "echo",
Handler: micro.HandlerFunc(echoHandler),
// Endpoint can be configured with optional request/response schemas
Schema: &micro.Schema{
Request: "request.json",
Response: "response.json",
},
},
}

Expand Down Expand Up @@ -119,6 +127,37 @@ func ExampleWithEndpointSubject() {
}
}

func ExampleWithEndpointSchema() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req micro.Request) {
req.Respond(req.Data())
}

config := micro.Config{
Name: "EchoService",
Version: "1.0.0",
}

srv, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}

schema := &micro.Schema{
Request: "request.json",
Response: "response.json",
}
err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSchema(schema))
if err != nil {
log.Fatal(err)
}
}

func ExampleService_AddGroup() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
Expand Down
91 changes: 73 additions & 18 deletions micro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -74,6 +75,7 @@ type (

endpointOpts struct {
subject string
schema *Schema
}

// ErrHandler is a function used to configure a custom error handler for a service,
Expand Down Expand Up @@ -131,8 +133,15 @@ type (
// SchemaResp is the response value for SCHEMA requests.
SchemaResp struct {
ServiceIdentity
Type string `json:"type"`
Schema Schema `json:"schema"`
Type string `json:"type"`
APIURL string `json:"api_url"`
Endpoints []EndpointSchema `json:"endpoints"`
}

EndpointSchema struct {
Name string `json:"name"`
Subject string `json:"subject"`
Schema Schema `json:"schema,omitempty"`
}

// Schema can be used to configure a schema for a service.
Expand Down Expand Up @@ -175,10 +184,10 @@ type (
// Description of the service.
Description string `json:"description"`

// Service schema
Schema Schema `json:"schema"`
// APIURL is an optional url pointing to API specification.
APIURL string `json:"api_url"`

// StatsHandler is a user-defined custom function
// StatsHandler is a user-defined custom function.
// used to calculate additional service stats.
StatsHandler StatsHandler

Expand All @@ -190,8 +199,14 @@ type (
}

EndpointConfig struct {
// Subject on which the endpoint is registered.
Subject string

// Handler used by the endpoint.
Handler Handler

// Schema is an optional request/response endpoint schema.
Schema *Schema
}

// NATSError represents an error returned by a NATS Subscription.
Expand Down Expand Up @@ -327,7 +342,11 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
go svc.asyncDispatcher.asyncCBDispatcher()

if config.Endpoint != nil {
if err := addEndpoint(svc, "default", config.Endpoint.Subject, config.Endpoint.Handler); err != nil {
opts := []EndpointOpt{WithEndpointSubject(config.Endpoint.Subject)}
if config.Endpoint.Schema != nil {
opts = append(opts, WithEndpointSchema(config.Endpoint.Schema))
}
if err := svc.AddEndpoint("default", config.Endpoint.Handler, opts...); err != nil {
svc.asyncDispatcher.close()
return nil, err
}
Expand Down Expand Up @@ -365,13 +384,8 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
}
}

schema := SchemaResp{
ServiceIdentity: svcIdentity,
Schema: config.Schema,
Type: SchemaResponseType,
}
schemaHandler := func(req Request) {
response, _ := json.Marshal(schema)
response, _ := json.Marshal(svc.schema())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject(), err.Error()}) })
Expand Down Expand Up @@ -412,10 +426,10 @@ func (s *service) AddEndpoint(name string, handler Handler, opts ...EndpointOpt)
if options.subject != "" {
subject = options.subject
}
return addEndpoint(s, name, subject, handler)
return addEndpoint(s, name, subject, handler, options.schema)
}

func addEndpoint(s *service, name, subject string, handler Handler) error {
func addEndpoint(s *service, name, subject string, handler Handler, schema *Schema) error {
if !nameRegexp.MatchString(name) {
return fmt.Errorf("%w: invalid endpoint name", ErrConfigValidation)
}
Expand All @@ -427,6 +441,7 @@ func addEndpoint(s *service, name, subject string, handler Handler) error {
EndpointConfig: EndpointConfig{
Subject: subject,
Handler: handler,
Schema: schema,
},
}
sub, err := s.nc.QueueSubscribe(
Expand Down Expand Up @@ -482,6 +497,9 @@ func (c *Config) valid() error {
if !semVerRegexp.MatchString(c.Version) {
return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation)
}
if _, err := url.Parse(c.APIURL); err != nil {
aricart marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("%w: api_url: invalid url: %s", ErrConfigValidation, err)
}
return nil
}

Expand Down Expand Up @@ -696,9 +714,13 @@ func (s *service) Stats() Stats {
Version: info.Version,
},
Endpoints: make([]*EndpointStats, 0),
Type: StatsResponseType,
Started: s.started,
}
for _, endpoint := range s.endpoints {
endpointStats := &EndpointStats{
Name: endpoint.stats.Name,
Subject: endpoint.stats.Subject,
NumRequests: endpoint.stats.NumRequests,
NumErrors: endpoint.stats.NumErrors,
LastError: endpoint.stats.LastError,
Expand Down Expand Up @@ -731,6 +753,27 @@ func (s *service) Stopped() bool {
return s.stopped
}

func (s *service) schema() SchemaResp {
endpoints := make([]EndpointSchema, 0, len(s.endpoints))
for _, e := range s.endpoints {
endpoints = append(endpoints, EndpointSchema{
Name: e.stats.Name,
Subject: e.stats.Subject,
Schema: Schema{
Request: e.Schema.Request,
Response: e.Schema.Response,
},
})
}

return SchemaResp{
ServiceIdentity: s.Info().ServiceIdentity,
Type: SchemaResponseType,
APIURL: s.APIURL,
Endpoints: endpoints,
}
}

func (e *NATSError) Error() string {
return fmt.Sprintf("%q: %s", e.Subject, e.Description)
}
Expand All @@ -750,7 +793,7 @@ func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) e
if g.prefix == "" {
endpointSubject = subject
}
return addEndpoint(g.service, name, endpointSubject, handler)
return addEndpoint(g.service, name, endpointSubject, handler, options.schema)
}

func (g *group) AddGroup(name string) Group {
Expand All @@ -773,9 +816,14 @@ func (e *Endpoint) stop() error {
if err := e.subscription.Drain(); err != nil {
return fmt.Errorf("draining subscription for request handler: %w", err)
}
for i, endpoint := range e.service.endpoints {
if endpoint.Subject == e.Subject {
e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...)
for i := 0; i < len(e.service.endpoints); i++ {
if e.service.endpoints[i].Subject == e.Subject {
if i != len(e.service.endpoints)-1 {
e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...)
} else {
e.service.endpoints = e.service.endpoints[:i]
}
i++
}
}
return nil
Expand Down Expand Up @@ -817,3 +865,10 @@ func WithEndpointSubject(subject string) EndpointOpt {
return nil
}
}

func WithEndpointSchema(schema *Schema) EndpointOpt {
return func(e *endpointOpts) error {
e.schema = schema
return nil
}
}