Skip to content

Commit

Permalink
Merge b68dad6 into 61a9345
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Dec 21, 2022
2 parents 61a9345 + b68dad6 commit f6f823e
Show file tree
Hide file tree
Showing 5 changed files with 177 additions and 108 deletions.
7 changes: 3 additions & 4 deletions micro/example_package_test.go
Expand Up @@ -34,16 +34,15 @@ func Example() {

// Service handler is a function which takes Service.Request as argument.
// req.Respond or req.Error should be used to respond to the request.
incrementHandler := func(req *Request) error {
val, err := strconv.Atoi(string(req.Data))
incrementHandler := func(req *Request) {
val, err := strconv.Atoi(string(req.Data()))
if err != nil {
req.Error("400", "request data should be a number", nil)
return nil
return
}

responseData := val + 1
req.Respond([]byte(strconv.Itoa(responseData)))
return nil
}

config := Config{
Expand Down
26 changes: 12 additions & 14 deletions micro/example_test.go
Expand Up @@ -28,9 +28,8 @@ func ExampleAddService() {
}
defer nc.Close()

echoHandler := func(req *Request) error {
req.Respond(req.Data)
return nil
echoHandler := func(req *Request) {
req.Respond(req.Data())
}

config := Config{
Expand Down Expand Up @@ -73,7 +72,7 @@ func ExampleService_Info() {
Name: "EchoService",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -101,7 +100,7 @@ func ExampleService_Stats() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand All @@ -127,7 +126,7 @@ func ExampleService_Stop() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -158,7 +157,7 @@ func ExampleService_Stopped() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -187,7 +186,7 @@ func ExampleService_Reset() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) error { return nil },
Handler: func(*Request) {},
},
}

Expand Down Expand Up @@ -220,14 +219,14 @@ func ExampleControlSubject() {

// Output:
// $SRV.PING
// $SRV.PING.COOLSERVICE
// $SRV.PING.COOLSERVICE.123
// $SRV.PING.CoolService
// $SRV.PING.CoolService.123
}

func ExampleRequest_Respond() {
handler := func(req *Request) {
// respond to the request
if err := req.Respond(req.Data); err != nil {
if err := req.Respond(req.Data()); err != nil {
log.Fatal(err)
}
}
Expand All @@ -254,13 +253,12 @@ func ExampleRequest_RespondJSON() {
}

func ExampleRequest_Error() {
handler := func(req *Request) error {
handler := func(req *Request) {
// respond with an error
// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil {
return err
log.Fatal(err)
}
return nil
}

fmt.Printf("%T", handler)
Expand Down
83 changes: 68 additions & 15 deletions micro/request.go
Expand Up @@ -22,17 +22,19 @@ import (
)

type (
// Request represents service request available in the service handler.
// It exposes methods to respond to the request, as well as
// getting the request data and headers.
Request struct {
*nats.Msg
msg *nats.Msg
respondError error
}

// RequestHandler is a function used as a Handler for a service.
// It takes a request, which contains the data (payload and headers) of the request,
// as well as exposes methods to respond to the request.
//
// RequestHandler returns an error - if returned, the request will be accounted form in stats (in num_requests),
// and last_error will be set with the value.
RequestHandler func(*Request) error
RequestHandler func(*Request)

// Headers is a wrapper around [*nats.Header]
Headers nats.Header
)

var (
Expand All @@ -41,27 +43,37 @@ var (
ErrArgRequired = errors.New("argument required")
)

func (r *Request) Respond(response []byte) error {
if err := r.Msg.Respond(response); err != nil {
return fmt.Errorf("%w: %s", ErrRespond, err)
// RespondOpt is a
type RespondOpt func(*nats.Msg)

func (r *Request) Respond(response []byte, opts ...RespondOpt) error {
respMsg := &nats.Msg{
Data: response,
}
for _, opt := range opts {
opt(respMsg)
}

if err := r.msg.RespondMsg(respMsg); err != nil {
r.respondError = fmt.Errorf("%w: %s", ErrRespond, err)
return r.respondError
}

return nil
}

func (r *Request) RespondJSON(response interface{}) error {
func (r *Request) RespondJSON(response interface{}, opts ...RespondOpt) error {
resp, err := json.Marshal(response)
if err != nil {
return ErrMarshalResponse
}

return r.Respond(resp)
return r.Respond(resp, opts...)
}

// Error prepares and publishes error response from a handler.
// A response error should be set containing an error code and description.
// Optionally, data can be set as response payload.
func (r *Request) Error(code, description string, data []byte) error {
func (r *Request) Error(code, description string, data []byte, opts ...RespondOpt) error {
if code == "" {
return fmt.Errorf("%w: error code", ErrArgRequired)
}
Expand All @@ -74,6 +86,47 @@ func (r *Request) Error(code, description string, data []byte) error {
ErrorCodeHeader: []string{code},
},
}
for _, opt := range opts {
opt(response)
}

response.Data = data
return r.RespondMsg(response)
if err := r.msg.RespondMsg(response); err != nil {
r.respondError = err
return err
}
return nil
}

func WithHeaders(headers Headers) RespondOpt {
return func(m *nats.Msg) {
if m.Header == nil {
m.Header = nats.Header(headers)
return
}

for k, v := range headers {
m.Header[k] = v
}
}
}

func (r *Request) Data() []byte {
return r.msg.Data
}

func (r *Request) Headers() Headers {
return Headers(r.msg.Header)
}

// Get gets the first value associated with the given key.
// It is case-sensitive.
func (h Headers) Get(key string) string {
return nats.Header(h).Get(key)
}

// Values returns all values associated with the given key.
// It is case-sensitive.
func (h Headers) Values(key string) []string {
return nats.Header(h).Values(key)
}
44 changes: 24 additions & 20 deletions micro/service.go
Expand Up @@ -18,7 +18,6 @@ import (
"errors"
"fmt"
"regexp"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -252,7 +251,7 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
var err error

svc.reqSub, err = nc.QueueSubscribe(config.Endpoint.Subject, QG, func(m *nats.Msg) {
svc.reqHandler(&Request{Msg: m})
svc.reqHandler(&Request{msg: m})
})
if err != nil {
svc.asyncDispatcher.close()
Expand All @@ -261,48 +260,44 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {

ping := Ping(svcIdentity)

infoHandler := func(req *Request) error {
infoHandler := func(req *Request) {
response, _ := json.Marshal(svc.Info())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling INFO request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

pingHandler := func(req *Request) error {
pingHandler := func(req *Request) {
response, _ := json.Marshal(ping)
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling PING request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

statsHandler := func(req *Request) error {
statsHandler := func(req *Request) {
response, _ := json.Marshal(svc.Stats())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling STATS request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

schema := SchemaResp{
ServiceIdentity: svcIdentity,
Schema: config.Schema,
}
schemaHandler := func(req *Request) error {
schemaHandler := func(req *Request) {
response, _ := json.Marshal(schema)
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.msg.Subject, err.Error()}) })
}
}
return nil
}

if err := svc.verbHandlers(nc, InfoVerb, infoHandler); err != nil {
Expand Down Expand Up @@ -368,6 +363,8 @@ func (e *Endpoint) valid() error {
}

func (svc *service) setupAsyncCallbacks() {
svc.m.Lock()
defer svc.m.Unlock()
svc.natsHandlers.closed = svc.conn.ClosedHandler()
if svc.natsHandlers.closed != nil {
svc.conn.SetClosedHandler(func(c *nats.Conn) {
Expand All @@ -392,6 +389,10 @@ func (svc *service) setupAsyncCallbacks() {
Description: err.Error(),
})
}
svc.m.Lock()
svc.stats.NumErrors++
svc.stats.LastError = err.Error()
svc.m.Unlock()
svc.Stop()
svc.natsHandlers.asyncErr(c, s, err)
})
Expand All @@ -406,6 +407,10 @@ func (svc *service) setupAsyncCallbacks() {
Description: err.Error(),
})
}
svc.m.Lock()
svc.stats.NumErrors++
svc.stats.LastError = err.Error()
svc.m.Unlock()
svc.Stop()
})
}
Expand Down Expand Up @@ -448,7 +453,7 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st
}

s.verbSubs[name], err = nc.Subscribe(subj, func(msg *nats.Msg) {
handler(&Request{Msg: msg})
handler(&Request{msg: msg})
})
if err != nil {
s.Stop()
Expand All @@ -457,19 +462,19 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st
return nil
}

// reqHandler itself
// reqHandller invokes the service request handler and modifies service stats
func (s *service) reqHandler(req *Request) {
start := time.Now()
err := s.Endpoint.Handler(req)
s.Endpoint.Handler(req)
s.m.Lock()
s.stats.NumRequests++
s.stats.ProcessingTime += time.Since(start)
avgProcessingTime := s.stats.ProcessingTime.Nanoseconds() / int64(s.stats.NumRequests)
s.stats.AverageProcessingTime = time.Duration(avgProcessingTime)

if err != nil {
if req.respondError != nil {
s.stats.NumErrors++
s.stats.LastError = err.Error()
s.stats.LastError = req.respondError.Error()
}
s.m.Unlock()
}
Expand Down Expand Up @@ -577,7 +582,6 @@ func ControlSubject(verb Verb, name, id string) (string, error) {
if name == "" && id != "" {
return "", ErrServiceNameRequired
}
name = strings.ToUpper(name)
if name == "" && id == "" {
return fmt.Sprintf("%s.%s", APIPrefix, verbStr), nil
}
Expand Down

0 comments on commit f6f823e

Please sign in to comment.