Skip to content

Commit

Permalink
Merge a70a75f into 44509e6
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Jan 13, 2023
2 parents 44509e6 + a70a75f commit 34ad0e9
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 42 deletions.
39 changes: 39 additions & 0 deletions micro/example_test.go
Expand Up @@ -49,10 +49,18 @@ func ExampleAddService() {
fmt.Printf("Service %q returned an error on subject %q: %s", info.Name, err.Subject, err.Description)
},

// optionally, a URL pointing to API specification can be provided
APIURL: "http://api.com/v1",

// optional base handler
Endpoint: &micro.EndpointConfig{
Subject: "echo",
Handler: micro.HandlerFunc(echoHandler),
// Endpoint can be configured with optional request/response schemas
Schema: &micro.Schema{
Request: "request.json",
Response: "response.json",
},
},
}

Expand Down Expand Up @@ -119,6 +127,37 @@ func ExampleWithEndpointSubject() {
}
}

func ExampleWithEndpointSchema() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
log.Fatal(err)
}
defer nc.Close()

echoHandler := func(req micro.Request) {
req.Respond(req.Data())
}

config := micro.Config{
Name: "EchoService",
Version: "1.0.0",
}

srv, err := micro.AddService(nc, config)
if err != nil {
log.Fatal(err)
}

schema := &micro.Schema{
Request: "request.json",
Response: "response.json",
}
err = srv.AddEndpoint("Echo", micro.HandlerFunc(echoHandler), micro.WithEndpointSchema(schema))
if err != nil {
log.Fatal(err)
}
}

func ExampleService_AddGroup() {
nc, err := nats.Connect("127.0.0.1:4222")
if err != nil {
Expand Down
91 changes: 73 additions & 18 deletions micro/service.go
Expand Up @@ -17,6 +17,7 @@ import (
"encoding/json"
"errors"
"fmt"
"net/url"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -74,6 +75,7 @@ type (

endpointOpts struct {
subject string
schema *Schema
}

// ErrHandler is a function used to configure a custom error handler for a service,
Expand Down Expand Up @@ -131,8 +133,15 @@ type (
// SchemaResp is the response value for SCHEMA requests.
SchemaResp struct {
ServiceIdentity
Type string `json:"type"`
Schema Schema `json:"schema"`
Type string `json:"type"`
APIURL string `json:"api_url"`
Endpoints []EndpointSchema `json:"endpoints"`
}

EndpointSchema struct {
Name string `json:"name"`
Subject string `json:"subject"`
Schema Schema `json:"schema,omitempty"`
}

// Schema can be used to configure a schema for a service.
Expand Down Expand Up @@ -175,10 +184,10 @@ type (
// Description of the service.
Description string `json:"description"`

// Service schema
Schema Schema `json:"schema"`
// APIURL is an optional url pointing to API specification.
APIURL string `json:"api_url"`

// StatsHandler is a user-defined custom function
// StatsHandler is a user-defined custom function.
// used to calculate additional service stats.
StatsHandler StatsHandler

Expand All @@ -190,8 +199,14 @@ type (
}

EndpointConfig struct {
// Subject on which the endpoint is registered.
Subject string

// Handler used by the endpoint.
Handler Handler

// Schema is an optional request/response endpoint schema.
Schema *Schema
}

// NATSError represents an error returned by a NATS Subscription.
Expand Down Expand Up @@ -327,7 +342,11 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
go svc.asyncDispatcher.asyncCBDispatcher()

if config.Endpoint != nil {
if err := addEndpoint(svc, "default", config.Endpoint.Subject, config.Endpoint.Handler); err != nil {
opts := []EndpointOpt{WithEndpointSubject(config.Endpoint.Subject)}
if config.Endpoint.Schema != nil {
opts = append(opts, WithEndpointSchema(config.Endpoint.Schema))
}
if err := svc.AddEndpoint("default", config.Endpoint.Handler, opts...); err != nil {
svc.asyncDispatcher.close()
return nil, err
}
Expand Down Expand Up @@ -365,13 +384,8 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
}
}

schema := SchemaResp{
ServiceIdentity: svcIdentity,
Schema: config.Schema,
Type: SchemaResponseType,
}
schemaHandler := func(req Request) {
response, _ := json.Marshal(schema)
response, _ := json.Marshal(svc.schema())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject(), err.Error()}) })
Expand Down Expand Up @@ -412,10 +426,10 @@ func (s *service) AddEndpoint(name string, handler Handler, opts ...EndpointOpt)
if options.subject != "" {
subject = options.subject
}
return addEndpoint(s, name, subject, handler)
return addEndpoint(s, name, subject, handler, options.schema)
}

func addEndpoint(s *service, name, subject string, handler Handler) error {
func addEndpoint(s *service, name, subject string, handler Handler, schema *Schema) error {
if !nameRegexp.MatchString(name) {
return fmt.Errorf("%w: invalid endpoint name", ErrConfigValidation)
}
Expand All @@ -427,6 +441,7 @@ func addEndpoint(s *service, name, subject string, handler Handler) error {
EndpointConfig: EndpointConfig{
Subject: subject,
Handler: handler,
Schema: schema,
},
}
sub, err := s.nc.QueueSubscribe(
Expand Down Expand Up @@ -482,6 +497,9 @@ func (c *Config) valid() error {
if !semVerRegexp.MatchString(c.Version) {
return fmt.Errorf("%w: version: version should not be empty should match the SemVer format", ErrConfigValidation)
}
if _, err := url.Parse(c.APIURL); err != nil {
return fmt.Errorf("%w: api_url: invalid url: %s", ErrConfigValidation, err)
}
return nil
}

Expand Down Expand Up @@ -696,9 +714,13 @@ func (s *service) Stats() Stats {
Version: info.Version,
},
Endpoints: make([]*EndpointStats, 0),
Type: StatsResponseType,
Started: s.started,
}
for _, endpoint := range s.endpoints {
endpointStats := &EndpointStats{
Name: endpoint.stats.Name,
Subject: endpoint.stats.Subject,
NumRequests: endpoint.stats.NumRequests,
NumErrors: endpoint.stats.NumErrors,
LastError: endpoint.stats.LastError,
Expand Down Expand Up @@ -731,6 +753,27 @@ func (s *service) Stopped() bool {
return s.stopped
}

func (s *service) schema() SchemaResp {
endpoints := make([]EndpointSchema, 0, len(s.endpoints))
for _, e := range s.endpoints {
endpoints = append(endpoints, EndpointSchema{
Name: e.stats.Name,
Subject: e.stats.Subject,
Schema: Schema{
Request: e.Schema.Request,
Response: e.Schema.Response,
},
})
}

return SchemaResp{
ServiceIdentity: s.Info().ServiceIdentity,
Type: SchemaResponseType,
APIURL: s.APIURL,
Endpoints: endpoints,
}
}

func (e *NATSError) Error() string {
return fmt.Sprintf("%q: %s", e.Subject, e.Description)
}
Expand All @@ -750,7 +793,7 @@ func (g *group) AddEndpoint(name string, handler Handler, opts ...EndpointOpt) e
if g.prefix == "" {
endpointSubject = subject
}
return addEndpoint(g.service, name, endpointSubject, handler)
return addEndpoint(g.service, name, endpointSubject, handler, options.schema)
}

func (g *group) AddGroup(name string) Group {
Expand All @@ -773,9 +816,14 @@ func (e *Endpoint) stop() error {
if err := e.subscription.Drain(); err != nil {
return fmt.Errorf("draining subscription for request handler: %w", err)
}
for i, endpoint := range e.service.endpoints {
if endpoint.Subject == e.Subject {
e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...)
for i := 0; i < len(e.service.endpoints); i++ {
if e.service.endpoints[i].Subject == e.Subject {
if i != len(e.service.endpoints)-1 {
e.service.endpoints = append(e.service.endpoints[:i], e.service.endpoints[i+1:]...)
} else {
e.service.endpoints = e.service.endpoints[:i]
}
i++
}
}
return nil
Expand Down Expand Up @@ -817,3 +865,10 @@ func WithEndpointSubject(subject string) EndpointOpt {
return nil
}
}

func WithEndpointSchema(schema *Schema) EndpointOpt {
return func(e *endpointOpts) error {
e.schema = schema
return nil
}
}

0 comments on commit 34ad0e9

Please sign in to comment.