diff --git a/internal/server/api_content.go b/internal/server/api_content.go index a5bf07dc33..2584cfa425 100644 --- a/internal/server/api_content.go +++ b/internal/server/api_content.go @@ -3,7 +3,6 @@ package server import ( "context" "errors" - "io/ioutil" "net/http" "github.com/gorilla/mux" @@ -13,7 +12,7 @@ import ( "github.com/kopia/kopia/repo/content" ) -func (s *Server) handleContentGet(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleContentGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { dr, ok := s.rep.(*repo.DirectRepository) if !ok { return nil, notFoundError("content not found") @@ -29,7 +28,7 @@ func (s *Server) handleContentGet(ctx context.Context, r *http.Request) (interfa return data, nil } -func (s *Server) handleContentInfo(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleContentInfo(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { dr, ok := s.rep.(*repo.DirectRepository) if !ok { return nil, notFoundError("content not found") @@ -50,7 +49,7 @@ func (s *Server) handleContentInfo(ctx context.Context, r *http.Request) (interf } } -func (s *Server) handleContentPut(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleContentPut(ctx context.Context, r *http.Request, data []byte) (interface{}, *apiError) { dr, ok := s.rep.(*repo.DirectRepository) if !ok { return nil, notFoundError("content not found") @@ -59,11 +58,6 @@ func (s *Server) handleContentPut(ctx context.Context, r *http.Request) (interfa cid := content.ID(mux.Vars(r)["contentID"]) prefix := cid.Prefix() - data, err := ioutil.ReadAll(r.Body) - if err != nil { - return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body") - } - actualCID, err := dr.Content.WriteContent(ctx, data, prefix) if err != nil { return nil, internalServerError(err) diff --git a/internal/server/api_manifest.go b/internal/server/api_manifest.go index 4fa585d581..02e3ac7ac2 100644 --- a/internal/server/api_manifest.go +++ b/internal/server/api_manifest.go @@ -13,7 +13,7 @@ import ( "github.com/kopia/kopia/repo/manifest" ) -func (s *Server) handleManifestGet(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleManifestGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { // password already validated by a wrapper, no need to check here. userAtHost, _, _ := r.BasicAuth() @@ -40,7 +40,7 @@ func (s *Server) handleManifestGet(ctx context.Context, r *http.Request) (interf }, nil } -func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { mid := manifest.ID(mux.Vars(r)["manifestID"]) err := s.rep.DeleteManifest(ctx, mid) @@ -55,7 +55,7 @@ func (s *Server) handleManifestDelete(ctx context.Context, r *http.Request) (int return &serverapi.Empty{}, nil } -func (s *Server) handleManifestList(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleManifestList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { // password already validated by a wrapper, no need to check here. userAtHost, _, _ := r.BasicAuth() @@ -95,10 +95,10 @@ func filterManifests(manifests []*manifest.EntryMetadata, userAtHost string) []* return result } -func (s *Server) handleManifestCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleManifestCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { var req remoterepoapi.ManifestWithMetadata - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + if err := json.Unmarshal(body, &req); err != nil { return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request") } diff --git a/internal/server/api_policies.go b/internal/server/api_policies.go index f667b025c9..4758fcebea 100644 --- a/internal/server/api_policies.go +++ b/internal/server/api_policies.go @@ -12,7 +12,7 @@ import ( "github.com/kopia/kopia/snapshot/policy" ) -func (s *Server) handlePolicyList(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handlePolicyList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { policies, err := policy.ListPolicies(ctx, s.rep) if err != nil { return nil, internalServerError(err) @@ -50,7 +50,7 @@ func getPolicyTargetFromURL(u *url.URL) snapshot.SourceInfo { } } -func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { pol, err := policy.GetDefinedPolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL)) if errors.Is(err, policy.ErrPolicyNotFound) { return nil, requestError(serverapi.ErrorNotFound, "policy not found") @@ -59,7 +59,7 @@ func (s *Server) handlePolicyGet(ctx context.Context, r *http.Request) (interfac return pol, nil } -func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if err := policy.RemovePolicy(ctx, s.rep, getPolicyTargetFromURL(r.URL)); err != nil { return nil, internalServerError(err) } @@ -71,9 +71,9 @@ func (s *Server) handlePolicyDelete(ctx context.Context, r *http.Request) (inter return &serverapi.Empty{}, nil } -func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handlePolicyPut(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { newPolicy := &policy.Policy{} - if err := json.NewDecoder(r.Body).Decode(newPolicy); err != nil { + if err := json.Unmarshal(body, newPolicy); err != nil { return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body") } diff --git a/internal/server/api_repo.go b/internal/server/api_repo.go index 499839d4e3..4b51a59e50 100644 --- a/internal/server/api_repo.go +++ b/internal/server/api_repo.go @@ -20,7 +20,7 @@ import ( "github.com/kopia/kopia/snapshot/policy" ) -func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { dr, ok := s.rep.(*repo.DirectRepository) if !ok { return &serverapi.StatusResponse{ @@ -37,7 +37,7 @@ func (s *Server) handleRepoParameters(ctx context.Context, r *http.Request) (int return rp, nil } -func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRepoStatus(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if s.rep == nil { return &serverapi.StatusResponse{ Connected: false, @@ -79,14 +79,14 @@ func maybeDecodeToken(req *serverapi.ConnectRepositoryRequest) *apiError { return nil } -func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if s.rep != nil { return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected") } var req serverapi.CreateRepositoryRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + if err := json.Unmarshal(body, &req); err != nil { return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error()) } @@ -125,17 +125,17 @@ func (s *Server) handleRepoCreate(ctx context.Context, r *http.Request) (interfa return nil, internalServerError(errors.Wrap(err, "flush")) } - return s.handleRepoStatus(ctx, r) + return s.handleRepoStatus(ctx, r, nil) } -func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if s.rep != nil { return nil, requestError(serverapi.ErrorAlreadyConnected, "already connected") } var req serverapi.ConnectRepositoryRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + if err := json.Unmarshal(body, &req); err != nil { return nil, requestError(serverapi.ErrorMalformedRequest, "unable to decode request: "+err.Error()) } @@ -147,10 +147,10 @@ func (s *Server) handleRepoConnect(ctx context.Context, r *http.Request) (interf return nil, err } - return s.handleRepoStatus(ctx, r) + return s.handleRepoStatus(ctx, r, nil) } -func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRepoSupportedAlgorithms(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { res := &serverapi.SupportedAlgorithmsResponse{ DefaultHashAlgorithm: hashing.DefaultAlgorithm, HashAlgorithms: hashing.SupportedAlgorithms(), @@ -200,7 +200,7 @@ func (s *Server) connectAndOpen(ctx context.Context, conn blob.ConnectionInfo, p return nil } -func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { // release shared lock so that SetRepository can acquire exclusive lock s.mu.RUnlock() err := s.SetRepository(ctx, nil) @@ -217,7 +217,7 @@ func (s *Server) handleRepoDisconnect(ctx context.Context, r *http.Request) (int return &serverapi.Empty{}, nil } -func (s *Server) handleRepoSync(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRepoSync(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if err := s.rep.Refresh(ctx); err != nil { return nil, internalServerError(errors.Wrap(err, "unable to refresh repository")) } diff --git a/internal/server/api_snapshots.go b/internal/server/api_snapshots.go index 36cb30fd7c..cf2d4b9942 100644 --- a/internal/server/api_snapshots.go +++ b/internal/server/api_snapshots.go @@ -10,7 +10,7 @@ import ( "github.com/kopia/kopia/snapshot/policy" ) -func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleSnapshotList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { manifestIDs, err := snapshot.ListSnapshotManifests(ctx, s.rep, nil) if err != nil { return nil, internalServerError(err) diff --git a/internal/server/api_sources.go b/internal/server/api_sources.go index 1c0747d6e6..8223367ba4 100644 --- a/internal/server/api_sources.go +++ b/internal/server/api_sources.go @@ -15,7 +15,7 @@ import ( "github.com/kopia/kopia/snapshot/policy" ) -func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleSourcesList(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { resp := &serverapi.SourcesResponse{ Sources: []*serverapi.SourceStatus{}, LocalHost: s.rep.Hostname(), @@ -37,10 +37,10 @@ func (s *Server) handleSourcesList(ctx context.Context, r *http.Request) (interf return resp, nil } -func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleSourcesCreate(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { var req serverapi.CreateSnapshotSourceRequest - if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + if err := json.Unmarshal(body, &req); err != nil { return nil, requestError(serverapi.ErrorMalformedRequest, "malformed request body") } diff --git a/internal/server/server.go b/internal/server/server.go index 050632c04d..3d068449f4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -4,6 +4,7 @@ package server import ( "context" "encoding/json" + "io/ioutil" "net/http" "net/url" "sync" @@ -25,6 +26,8 @@ var log = logging.GetContextLoggerFunc("kopia/server") const maintenanceAttemptFrequency = 10 * time.Minute +type apiRequestFunc func(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) + // Server exposes simple HTTP API for programmatically accessing Kopia features. type Server struct { OnShutdown func(ctx context.Context) error @@ -85,30 +88,39 @@ func (s *Server) APIHandlers() http.Handler { return m } -func (s *Server) handleAPI(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc { - return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleAPI(f apiRequestFunc) http.HandlerFunc { + return s.handleAPIPossiblyNotConnected(func(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if s.rep == nil { return nil, requestError(serverapi.ErrorNotConnected, "not connected") } - return f(ctx, r) + return f(ctx, r, body) }) } -func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *http.Request) (interface{}, *apiError)) http.HandlerFunc { +func (s *Server) handleAPIPossiblyNotConnected(f apiRequestFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + // we must pre-read request body before acquiring the lock as it sometimes leads to deadlock + // in HTTP/2 server. + // See https://github.com/golang/go/issues/40816 + body, berr := ioutil.ReadAll(r.Body) + if berr != nil { + http.Error(w, "error reading request body", http.StatusInternalServerError) + return + } + s.mu.RLock() defer s.mu.RUnlock() ctx := r.Context() - log(ctx).Debugf("request %v", r.URL) + log(ctx).Debugf("request %v (%v bytes)", r.URL, len(body)) w.Header().Set("Content-Type", "application/json") e := json.NewEncoder(w) e.SetIndent("", " ") - v, err := f(ctx, r) + v, err := f(ctx, r, body) if err == nil { if b, ok := v.([]byte); ok { @@ -134,7 +146,7 @@ func (s *Server) handleAPIPossiblyNotConnected(f func(ctx context.Context, r *ht } } -func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleRefresh(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if err := s.rep.Refresh(ctx); err != nil { return nil, internalServerError(err) } @@ -142,7 +154,7 @@ func (s *Server) handleRefresh(ctx context.Context, r *http.Request) (interface{ return &serverapi.Empty{}, nil } -func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleFlush(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { if err := s.rep.Flush(ctx); err != nil { return nil, internalServerError(err) } @@ -150,7 +162,7 @@ func (s *Server) handleFlush(ctx context.Context, r *http.Request) (interface{}, return &serverapi.Empty{}, nil } -func (s *Server) handleShutdown(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleShutdown(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { log(ctx).Infof("shutting down due to API request") if f := s.OnShutdown; f != nil { @@ -180,11 +192,11 @@ func (s *Server) forAllSourceManagersMatchingURLFilter(ctx context.Context, c fu return resp, nil } -func (s *Server) handleUpload(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleUpload(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { return s.forAllSourceManagersMatchingURLFilter(ctx, (*sourceManager).upload, r.URL.Query()) } -func (s *Server) handleCancel(ctx context.Context, r *http.Request) (interface{}, *apiError) { +func (s *Server) handleCancel(ctx context.Context, r *http.Request, body []byte) (interface{}, *apiError) { return s.forAllSourceManagersMatchingURLFilter(ctx, (*sourceManager).cancel, r.URL.Query()) }