Skip to content

Commit

Permalink
Return error from handler to be used in stats
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Dec 15, 2022
1 parent 65046c9 commit 1f3fcdc
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 39 deletions.
5 changes: 3 additions & 2 deletions micro/example_package_test.go
Expand Up @@ -34,15 +34,16 @@ func Example() {

// Service handler is a function which takes Service.Request as argument.
// req.Respond or req.Error should be used to respond to the request.
incrementHandler := func(req *Request) {
incrementHandler := func(req *Request) error {
val, err := strconv.Atoi(string(req.Data))
if err != nil {
req.Error("400", "request data should be a number", nil)
return
return nil
}

responseData := val + 1
req.Respond([]byte(strconv.Itoa(responseData)))
return nil
}

config := Config{
Expand Down
20 changes: 11 additions & 9 deletions micro/example_test.go
Expand Up @@ -27,8 +27,9 @@ func ExampleAddService() {
}
defer nc.Close()

echoHandler := func(req *Request) {
echoHandler := func(req *Request) error {
req.Respond(req.Data)
return nil
}

config := Config{
Expand Down Expand Up @@ -71,7 +72,7 @@ func ExampleService_Info() {
Name: "EchoService",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
Handler: func(*Request) error { return nil },
},
}

Expand Down Expand Up @@ -99,7 +100,7 @@ func ExampleService_Stats() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
Handler: func(*Request) error { return nil },
},
}

Expand All @@ -109,7 +110,7 @@ func ExampleService_Stats() {
stats := srv.Stats()

fmt.Println(stats.AverageProcessingTime)
fmt.Println(stats.TotalProcessingTime)
fmt.Println(stats.ProcessingTime)

}

Expand All @@ -125,7 +126,7 @@ func ExampleService_Stop() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
Handler: func(*Request) error { return nil },
},
}

Expand Down Expand Up @@ -156,7 +157,7 @@ func ExampleService_Stopped() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
Handler: func(*Request) error { return nil },
},
}

Expand Down Expand Up @@ -185,7 +186,7 @@ func ExampleService_Reset() {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "echo",
Handler: func(*Request) {},
Handler: func(*Request) error { return nil },
},
}

Expand Down Expand Up @@ -252,12 +253,13 @@ func ExampleRequest_RespondJSON() {
}

func ExampleRequest_Error() {
handler := func(req *Request) {
handler := func(req *Request) error {
// respond with an error
// Error sets Nats-Service-Error and Nats-Service-Error-Code headers in the response
if err := req.Error("400", "bad request", []byte(`{"error": "value should be a number"}`)); err != nil {
log.Fatal(err)
return err
}
return nil
}

fmt.Printf("%T", handler)
Expand Down
9 changes: 6 additions & 3 deletions micro/request.go
Expand Up @@ -24,11 +24,15 @@ import (
type (
Request struct {
*nats.Msg
errResponse bool
}

// RequestHandler is a function used as a Handler for a service.
RequestHandler func(*Request)
// It takes a request, which contains the data (payload and headers) of the request,
// as well as exposes methods to respond to the request.
//
// RequestHandler returns an error - if returned, the request will be accounted form in stats (in num_requests),
// and last_error will be set with the value.
RequestHandler func(*Request) error
)

var (
Expand Down Expand Up @@ -71,6 +75,5 @@ func (r *Request) Error(code, description string, data []byte) error {
},
}
response.Data = data
r.errResponse = true
return r.RespondMsg(response)
}
26 changes: 16 additions & 10 deletions micro/service.go
Expand Up @@ -71,7 +71,8 @@ type (
ServiceIdentity
NumRequests int `json:"num_requests"`
NumErrors int `json:"num_errors"`
TotalProcessingTime time.Duration `json:"total_processing_time"`
LastError error `json:"last_error"`
ProcessingTime time.Duration `json:"processing_time"`
AverageProcessingTime time.Duration `json:"average_processing_time"`
Started string `json:"started"`
Data interface{} `json:"data,omitempty"`
Expand Down Expand Up @@ -263,40 +264,44 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
Version: config.Version,
}

infoHandler := func(req *Request) {
infoHandler := func(req *Request) error {
response, _ := json.Marshal(svc.Info())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling INFO request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
}
}
return nil
}

pingHandler := func(req *Request) {
pingHandler := func(req *Request) error {
response, _ := json.Marshal(ping)
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling PING request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
}
}
return nil
}

statsHandler := func(req *Request) {
statsHandler := func(req *Request) error {
response, _ := json.Marshal(svc.Stats())
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling STATS request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
}
}
return nil
}

schemaHandler := func(req *Request) {
schemaHandler := func(req *Request) error {
response, _ := json.Marshal(svc.Schema)
if err := req.Respond(response); err != nil {
if err := req.Error("500", fmt.Sprintf("Error handling SCHEMA request: %s", err), nil); err != nil && config.ErrorHandler != nil {
svc.asyncDispatcher.push(func() { config.ErrorHandler(svc, &NATSError{req.Subject, err.Error()}) })
}
}
return nil
}

if err := svc.verbHandlers(nc, InfoVerb, infoHandler); err != nil {
Expand Down Expand Up @@ -456,15 +461,16 @@ func (s *service) addInternalHandler(nc *nats.Conn, verb Verb, kind, id, name st
// reqHandler itself
func (s *service) reqHandler(req *Request) {
start := time.Now()
s.Endpoint.Handler(req)
err := s.Endpoint.Handler(req)
s.m.Lock()
s.stats.NumRequests++
s.stats.TotalProcessingTime += time.Since(start)
avgProcessingTime := s.stats.TotalProcessingTime.Nanoseconds() / int64(s.stats.NumRequests)
s.stats.ProcessingTime += time.Since(start)
avgProcessingTime := s.stats.ProcessingTime.Nanoseconds() / int64(s.stats.NumRequests)
s.stats.AverageProcessingTime = time.Duration(avgProcessingTime)

if req.errResponse {
if err != nil {
s.stats.NumErrors++
s.stats.LastError = err
}
s.m.Unlock()
}
Expand Down Expand Up @@ -535,7 +541,7 @@ func (s *service) Stats() Stats {
},
NumRequests: s.stats.NumRequests,
NumErrors: s.stats.NumErrors,
TotalProcessingTime: s.stats.TotalProcessingTime,
ProcessingTime: s.stats.ProcessingTime,
AverageProcessingTime: s.stats.AverageProcessingTime,
Started: s.stats.Started,
Data: s.stats.Data,
Expand Down
45 changes: 30 additions & 15 deletions micro/service_test.go
Expand Up @@ -39,12 +39,14 @@ func TestServiceBasics(t *testing.T) {
defer nc.Close()

// Stub service.
doAdd := func(req *Request) {
doAdd := func(req *Request) error {
if rand.Intn(10) == 0 {
if err := req.Error("500", "Unexpected error!", nil); err != nil {
if err := req.Error("400", "client error!", nil); err != nil {
t.Fatalf("Unexpected error when sending error response: %v", err)
}
return

// for client-side errors, return nil to avoid tracking the errors in stats
return nil
}
// Happy Path.
// Random delay between 5-10ms
Expand All @@ -53,8 +55,9 @@ func TestServiceBasics(t *testing.T) {
if err := req.Error("500", "Unexpected error!", nil); err != nil {
t.Fatalf("Unexpected error when sending error response: %v", err)
}
return
return err
}
return nil
}

var svcs []Service
Expand Down Expand Up @@ -191,7 +194,7 @@ func TestServiceBasics(t *testing.T) {
}

func TestAddService(t *testing.T) {
testHandler := func(*Request) {}
testHandler := func(*Request) error { return nil }
errNats := make(chan struct{})
errService := make(chan struct{})
closedNats := make(chan struct{})
Expand Down Expand Up @@ -530,7 +533,7 @@ func TestMonitoringHandlers(t *testing.T) {
Version: "0.1.0",
Endpoint: Endpoint{
Subject: "test.sub",
Handler: func(*Request) {},
Handler: func(*Request) error { return nil },
},
Schema: Schema{
Request: "some_schema",
Expand Down Expand Up @@ -703,11 +706,19 @@ func TestMonitoringHandlers(t *testing.T) {
}

func TestServiceStats(t *testing.T) {
handler := func(r *Request) {
handler := func(r *Request) error {
if bytes.Equal(r.Data, []byte("err")) {
r.Error("500", "oops", nil)
return fmt.Errorf("oops")
}

// client errors (validation etc.) should not be accounted for in stats
if bytes.Equal(r.Data, []byte("client_err")) {
r.Error("400", "bad request", nil)
return nil
}
r.Respond([]byte("ok"))
return nil
}
tests := []struct {
name string
Expand Down Expand Up @@ -803,6 +814,9 @@ func TestServiceStats(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
}
if _, err := nc.Request(srv.Info().Subject, []byte("client_err"), time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := nc.Request(srv.Info().Subject, []byte("err"), time.Second); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
Expand All @@ -824,11 +838,11 @@ func TestServiceStats(t *testing.T) {
if stats.ID != info.ID {
t.Errorf("Unexpected service name; want: %s; got: %s", info.ID, stats.ID)
}
if stats.NumRequests != 11 {
t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.NumRequests)
if stats.NumRequests != 12 {
t.Errorf("Unexpected num_requests; want: 12; got: %d", stats.NumRequests)
}
if stats.NumErrors != 1 {
t.Errorf("Unexpected num_requests; want: 11; got: %d", stats.NumErrors)
t.Errorf("Unexpected num_requests; want: 1; got: %d", stats.NumErrors)
}
if test.expectedStats != nil {
if val, ok := stats.Data.(map[string]interface{}); !ok || !reflect.DeepEqual(val, test.expectedStats) {
Expand Down Expand Up @@ -920,7 +934,7 @@ func TestRequestRespond(t *testing.T) {
errDesc := test.errDescription
errData := test.errData
// Stub service.
handler := func(req *Request) {
handler := func(req *Request) error {
if errors.Is(test.withRespondError, ErrRespond) {
nc.Close()
}
Expand All @@ -931,7 +945,7 @@ func TestRequestRespond(t *testing.T) {
if !errors.Is(err, respError) {
t.Fatalf("Expected error: %v; got: %v", respError, err)
}
return
return nil
}
if err != nil {
t.Fatalf("Unexpected error when sending response: %v", err)
Expand All @@ -942,25 +956,26 @@ func TestRequestRespond(t *testing.T) {
if !errors.Is(err, respError) {
t.Fatalf("Expected error: %v; got: %v", respError, err)
}
return
return nil
}
if err != nil {
t.Fatalf("Unexpected error when sending response: %v", err)
}
}
return
return nil
}

err := req.Error(errCode, errDesc, errData)
if respError != nil {
if !errors.Is(err, respError) {
t.Fatalf("Expected error: %v; got: %v", respError, err)
}
return
return nil
}
if err != nil {
t.Fatalf("Unexpected error when sending response: %v", err)
}
return nil
}

svc, err := AddService(nc, Config{
Expand Down

0 comments on commit 1f3fcdc

Please sign in to comment.