diff --git a/proxy/merging.go b/proxy/merging.go index fc235ed34..36db281f5 100644 --- a/proxy/merging.go +++ b/proxy/merging.go @@ -88,6 +88,7 @@ func sequentialMerge(patterns []string, timeout time.Duration, rc ResponseCombin errCh := make(chan error, 1) acc := newIncrementalMergeAccumulator(len(next), rc) + TxLoop: for i, n := range next { if i > 0 { for _, match := range reMergeKey.FindAllStringSubmatch(patterns[i], -1) { @@ -109,12 +110,16 @@ func sequentialMerge(patterns []string, timeout time.Duration, rc ResponseCombin requestPart(localCtx, n, request, out, errCh) select { case err := <-errCh: + if i == 0 { + cancel() + return nil, err + } acc.Merge(nil, err) - break + break TxLoop case response := <-out: acc.Merge(response, nil) if !response.IsComplete { - break + break TxLoop } parts[i] = response } diff --git a/proxy/merging_test.go b/proxy/merging_test.go index 745f6db47..c9b2623dd 100644 --- a/proxy/merging_test.go +++ b/proxy/merging_test.go @@ -209,6 +209,53 @@ func TestNewMergeDataMiddleware_sequential_erroredBackend(t *testing.T) { } } +func TestNewMergeDataMiddleware_sequential_erroredFirstBackend(t *testing.T) { + timeout := 500 + endpoint := config.EndpointConfig{ + Backend: []*config.Backend{ + {URLPattern: "/"}, + {URLPattern: "/aaa/{{.Resp0_supu}}"}, + {URLPattern: "/aaa/{{.Resp0_supu}}?x={{.Resp1_tupu}}"}, + }, + Timeout: time.Duration(timeout) * time.Millisecond, + ExtraConfig: config.ExtraConfig{ + Namespace: map[string]interface{}{ + isSequentialKey: true, + }, + }, + } + expecterErr := errors.New("wait for me") + mw := NewMergeDataMiddleware(&endpoint) + p := mw( + func(ctx context.Context, _ *Request) (*Response, error) { + return nil, expecterErr + }, + func(ctx context.Context, r *Request) (*Response, error) { + t.Error("this backend should never be called") + return nil, nil + }, + func(ctx context.Context, r *Request) (*Response, error) { + t.Error("this backend should never be called") + return nil, nil + }, + ) + mustEnd := time.After(time.Duration(2*timeout) * time.Millisecond) + out, err := p(context.Background(), &Request{Params: map[string]string{}}) + if err != expecterErr { + t.Errorf("The middleware did not propagate the expected error: %v\n", err) + return + } + if out != nil { + t.Errorf("The proxy returned a not null result %v", out) + return + } + select { + case <-mustEnd: + t.Errorf("We were expecting a response but we got none\n") + default: + } +} + func TestNewMergeDataMiddleware_mergeIncompleteResults(t *testing.T) { timeout := 500 backend := config.Backend{} diff --git a/test/integration_test.go b/test/integration_test.go index 8e5597d76..4328a7656 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -115,13 +115,14 @@ func testKrakenD(t *testing.T, runRouter func(logging.Logger, *config.ServiceCon } for _, tc := range []struct { - name string - url string - method string - headers map[string]string - body string - expBody string - expHeaders map[string]string + name string + url string + method string + headers map[string]string + body string + expBody string + expHeaders map[string]string + expStatusCode int }{ { name: "static", @@ -237,11 +238,26 @@ func testKrakenD(t *testing.T, runRouter func(logging.Logger, *config.ServiceCon expBody: `{"headers":{"Accept-Encoding":["gzip"],"User-Agent":["KrakenD Version undefined"],"X-Test-1":["some"],"X-Test-2":["none"]},"path":"/all-params","query":{}}`, }, { - name: "sequential", - url: "/sequential/foo", + name: "sequential ok", + url: "/sequential/ok/foo", expHeaders: defaultHeaders, expBody: `{"path":"/recipient/42","random":42}`, }, + { + name: "sequential ko first", + url: "/sequential/ko/first/foo", + expHeaders: map[string]string{ + "X-KrakenD-Completed": "false", + "X-Krakend": "Version undefined", + }, + expStatusCode: 500, + }, + { + name: "sequential ko last", + url: "/sequential/ko/last/foo", + expHeaders: incompleteHeader, + expBody: `{"random":42}`, + }, { name: "redirect", url: "/redirect", @@ -282,9 +298,12 @@ func testKrakenD(t *testing.T, runRouter func(logging.Logger, *config.ServiceCon t.Errorf("%s: nil response", resp.Request.URL.Path) return } - - if resp.StatusCode != http.StatusOK { - t.Errorf("%s: unexpected status code: %d", resp.Request.URL.Path, resp.StatusCode) + expectedStatusCode := http.StatusOK + if tc.expStatusCode != 0 { + expectedStatusCode = tc.expStatusCode + } + if resp.StatusCode != expectedStatusCode { + t.Errorf("%s: unexpected status code. have: %d, want: %d", resp.Request.URL.Path, resp.StatusCode, expectedStatusCode) } for k, v := range tc.expHeaders { @@ -292,6 +311,10 @@ func testKrakenD(t *testing.T, runRouter func(logging.Logger, *config.ServiceCon t.Errorf("%s: unexpected header %s: %s", resp.Request.URL.Path, k, c) } } + if tc.expBody == "" { + return + } + b, _ := ioutil.ReadAll(resp.Body) resp.Body.Close() if tc.expBody != string(b) { diff --git a/test/krakend.json b/test/krakend.json index cbeaf3176..21001d159 100644 --- a/test/krakend.json +++ b/test/krakend.json @@ -227,7 +227,7 @@ ] }, { - "endpoint": "/sequential/{param}", + "endpoint": "/sequential/ok/{param}", "extra_config": { "github.com/devopsfaith/krakend/proxy": { "sequential": true @@ -246,6 +246,46 @@ } ] }, + { + "endpoint": "/sequential/ko/first/{param}", + "extra_config": { + "github.com/devopsfaith/krakend/proxy": { + "sequential": true + } + }, + "backend": [ + { + "url_pattern": "/provider/{param}", + "host": [ "{{.b4}}" ], + "whitelist": [ "random" ] + }, + { + "url_pattern": "/recipient/{resp0_random}", + "host": [ "{{.b7}}" ], + "whitelist": [ "path" ] + } + ] + }, + { + "endpoint": "/sequential/ko/last/{param}", + "extra_config": { + "github.com/devopsfaith/krakend/proxy": { + "sequential": true + } + }, + "backend": [ + { + "url_pattern": "/provider/{param}", + "host": [ "{{.b7}}" ], + "whitelist": [ "random" ] + }, + { + "url_pattern": "/recipient/{resp0_random}", + "host": [ "{{.b4}}" ], + "whitelist": [ "path" ] + } + ] + }, { "endpoint": "/redirect", "backend": [