From 46e81f0d4a1cc063c59aaf6b15276ecf5c5146c9 Mon Sep 17 00:00:00 2001 From: Pablo Chico de Guzman Date: Sun, 4 Apr 2021 14:27:56 +0200 Subject: [PATCH] Refactor wait for completion logic (#1373) Signed-off-by: Pablo Chico de Guzman --- cmd/up/syncthing.go | 13 +- pkg/cmd/status/run.go | 20 ++- pkg/syncthing/api.go | 39 +++-- pkg/syncthing/completion.go | 189 ++++++++++++++++++++++ pkg/syncthing/completion_test.go | 260 +++++++++++++++++++++++++++++++ pkg/syncthing/syncthing.go | 186 +++------------------- 6 files changed, 513 insertions(+), 194 deletions(-) create mode 100644 pkg/syncthing/completion.go create mode 100644 pkg/syncthing/completion_test.go diff --git a/cmd/up/syncthing.go b/cmd/up/syncthing.go index fb36b2ee2fe8..c944d8a156c3 100644 --- a/cmd/up/syncthing.go +++ b/cmd/up/syncthing.go @@ -105,7 +105,6 @@ func (up *upContext) startSyncthing(ctx context.Context) error { if err := up.Sy.ResetDatabase(ctx, up.Dev); err != nil { return err } - up.resetSyncthing = false } @@ -152,15 +151,11 @@ func (up *upContext) synchronizeFiles(ctx context.Context) error { reporter := make(chan float64) go func() { - var previous float64 for c := range reporter { - if c > previous { - value := int64(c) - if value > 0 && value < 100 { - spinner.Stop() - progressBar.SetCurrent(value) - previous = c - } + value := int64(c) + if value > 0 && value < 100 { + spinner.Stop() + progressBar.SetCurrent(value) } } quit <- true diff --git a/pkg/cmd/status/run.go b/pkg/cmd/status/run.go index 3db7338fa693..1c1a9901714a 100644 --- a/pkg/cmd/status/run.go +++ b/pkg/cmd/status/run.go @@ -27,12 +27,12 @@ import ( //Run runs the "okteto status" sequence func Run(ctx context.Context, dev *model.Dev, sy *syncthing.Syncthing) (float64, error) { - progressLocal, err := sy.GetCompletionProgress(ctx, true) + progressLocal, err := getCompletionProgress(ctx, sy, true) if err != nil { log.Infof("error accessing local syncthing status: %s", err) return 0, err } - progressRemote, err := sy.GetCompletionProgress(ctx, false) + progressRemote, err := getCompletionProgress(ctx, sy, false) if err != nil { log.Infof("error accessing remote syncthing status: %s", err) return 0, err @@ -41,6 +41,22 @@ func Run(ctx context.Context, dev *model.Dev, sy *syncthing.Syncthing) (float64, return computeProgress(progressLocal, progressRemote), nil } +func getCompletionProgress(ctx context.Context, s *syncthing.Syncthing, local bool) (float64, error) { + device := syncthing.DefaultRemoteDeviceID + if local { + device = syncthing.LocalDeviceID + } + completion, err := s.GetCompletion(ctx, local, device) + if err != nil { + return 0, err + } + if completion.GlobalBytes == 0 { + return 100, nil + } + progress := (float64(completion.GlobalBytes-completion.NeedBytes) / float64(completion.GlobalBytes)) * 100 + return progress, nil +} + func computeProgress(local, remote float64) float64 { if local == 100 && remote == 100 { return 100 diff --git a/pkg/syncthing/api.go b/pkg/syncthing/api.go index de1e61514723..1916cb921e96 100644 --- a/pkg/syncthing/api.go +++ b/pkg/syncthing/api.go @@ -38,7 +38,7 @@ func (akt *addAPIKeyTransport) RoundTrip(req *http.Request) (*http.Response, err //NewAPIClient returns a new syncthing api client configured to call the syncthing api func NewAPIClient() *http.Client { return &http.Client{ - Timeout: 30 * time.Second, + Timeout: 10 * time.Second, Transport: &addAPIKeyTransport{http.DefaultTransport}, } } @@ -46,23 +46,29 @@ func NewAPIClient() *http.Client { // APICall calls the syncthing API and returns the parsed json or an error func (s *Syncthing) APICall(ctx context.Context, url, method string, code int, params map[string]string, local bool, body []byte, readBody bool, maxRetries int) ([]byte, error) { retries := 0 + ticker := time.NewTicker(200 * time.Millisecond) for { - result, err := s.callWithRetry(ctx, url, method, code, params, local, body, readBody) - if err == nil { - return result, nil + select { + case <-ticker.C: + result, err := s.callWithRetry(ctx, url, method, code, params, local, body, readBody) + if err == nil { + return result, nil + } + + if retries >= maxRetries { + return nil, err + } + retries++ + + if strings.Contains(err.Error(), "connection refused") { + log.Infof("syncthing is not ready, retrying local=%t", local) + } else { + log.Infof("retrying syncthing call[%s] local=%t: %s", url, local, err.Error()) + } + case <-ctx.Done(): + log.Infof("call to syncthing.APICall %s canceled", url) + return nil, ctx.Err() } - if retries == maxRetries { - return nil, err - } - - if strings.Contains(err.Error(), "connection refused") { - log.Infof("syncthing is not ready, retrying local=%t", local) - } else { - log.Infof("retrying syncthing call[%s] local=%t: %s", url, local, err.Error()) - } - - time.Sleep(200 * time.Millisecond) - retries++ } } @@ -73,7 +79,6 @@ func (s *Syncthing) callWithRetry(ctx context.Context, url, method string, code s.Client.Timeout = 3 * time.Second } else { urlPath = path.Join(s.RemoteGUIAddress, url) - s.Client.Timeout = 25 * time.Second if url == "rest/db/ignores" || url == "rest/system/ping" { s.Client.Timeout = 5 * time.Second } diff --git a/pkg/syncthing/completion.go b/pkg/syncthing/completion.go new file mode 100644 index 000000000000..1bcc3422e2b3 --- /dev/null +++ b/pkg/syncthing/completion.go @@ -0,0 +1,189 @@ +// Copyright 2020 The Okteto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncthing + +import ( + "context" + "fmt" + "time" + + "github.com/okteto/okteto/pkg/analytics" + "github.com/okteto/okteto/pkg/errors" + "github.com/okteto/okteto/pkg/log" + "github.com/okteto/okteto/pkg/model" +) + +// Completion represents the completion of a syncthing folder. +type Completion struct { + Completion float64 `json:"completion"` + GlobalBytes int64 `json:"globalBytes"` + NeedBytes int64 `json:"needBytes"` + GlobalItems int64 `json:"globalItems"` + NeedItems int64 `json:"needItems"` + NeedDeletes int64 `json:"needDeletes"` +} + +//waitForCompletion represents a wait for completion iteration +type waitForCompletion struct { + localCompletion *Completion + remoteCompletion *Completion + previousLocalGlobalBytes int64 + previousRemoteGlobalBytes int64 + globalBytesRetries int64 + needDeletesRetries int64 + retries int64 + progress float64 + sy *Syncthing + hasBeenReset bool +} + +// WaitForCompletion waits for the remote to be totally synched +func (s *Syncthing) WaitForCompletion(ctx context.Context, dev *model.Dev, reporter chan float64) error { + defer close(reporter) + ticker := time.NewTicker(250 * time.Millisecond) + wfc := &waitForCompletion{sy: s} + for { + select { + case <-ticker.C: + wfc.retries++ + if wfc.retries%50 == 0 { + log.Info("checking syncthing for error....") + if err := s.IsHealthy(ctx, false, 3); err != nil { + return err + } + } + + if err := s.Overwrite(ctx, dev); err != nil { + if err != errors.ErrBusySyncthing { + return err + } + } + if err := wfc.computeProgress(ctx); err != nil { + if err != errors.ErrBusySyncthing { + return err + } + continue + } + + reporter <- wfc.progress + + if wfc.needsDatabaseReset() { + err := wfc.resetDatabase(ctx, dev) + analytics.TrackResetDatabase(err == nil) + continue + } + + if wfc.isCompleted() { + return nil + } + + case <-ctx.Done(): + log.Info("call to syncthing.WaitForCompletion canceled") + return ctx.Err() + } + } +} + +func (wfc *waitForCompletion) computeProgress(ctx context.Context) error { + localCompletion, err := wfc.sy.GetCompletion(ctx, true, DefaultRemoteDeviceID) + if err != nil { + return err + } + wfc.localCompletion = localCompletion + log.Infof("syncthing status in local: globalBytes %d, needBytes %d, globalItems %d, needItems %d, needDeletes %d", localCompletion.GlobalBytes, localCompletion.NeedBytes, localCompletion.GlobalItems, localCompletion.NeedItems, localCompletion.NeedDeletes) + + remoteCompletion, err := wfc.sy.GetCompletion(ctx, false, DefaultRemoteDeviceID) + if err != nil { + return err + } + wfc.remoteCompletion = remoteCompletion + log.Infof("syncthing status in remote: globalBytes %d, needBytes %d, globalItems %d, needItems %d, needDeletes %d", + remoteCompletion.GlobalBytes, + remoteCompletion.NeedBytes, + remoteCompletion.GlobalItems, + remoteCompletion.NeedItems, + remoteCompletion.NeedDeletes, + ) + if localCompletion.GlobalBytes == 0 { + wfc.progress = 100 + } else { + wfc.progress = (float64(localCompletion.GlobalBytes-localCompletion.NeedBytes) / float64(localCompletion.GlobalBytes)) * 100 + } + return nil +} + +func (wfc *waitForCompletion) needsDatabaseReset() bool { + if wfc.localCompletion.GlobalBytes == wfc.remoteCompletion.GlobalBytes { + wfc.globalBytesRetries = 0 + wfc.previousLocalGlobalBytes = wfc.localCompletion.GlobalBytes + wfc.previousRemoteGlobalBytes = wfc.remoteCompletion.GlobalBytes + return false + } + log.Infof("local globalBytes %d, remote global bytes %d", wfc.localCompletion.GlobalBytes, wfc.remoteCompletion.GlobalBytes) + if wfc.localCompletion.GlobalBytes != wfc.previousLocalGlobalBytes { + log.Infof("local globalBytes has changed %d vs %d", wfc.localCompletion.GlobalBytes, wfc.previousLocalGlobalBytes) + wfc.previousLocalGlobalBytes = wfc.localCompletion.GlobalBytes + wfc.previousRemoteGlobalBytes = wfc.remoteCompletion.GlobalBytes + wfc.globalBytesRetries = 0 + return false + } + if wfc.remoteCompletion.GlobalBytes != wfc.previousRemoteGlobalBytes { + log.Infof("remote globalBytes has changed %d vs %d", wfc.remoteCompletion.GlobalBytes, wfc.previousRemoteGlobalBytes) + wfc.previousLocalGlobalBytes = wfc.localCompletion.GlobalBytes + wfc.previousRemoteGlobalBytes = wfc.remoteCompletion.GlobalBytes + wfc.globalBytesRetries = 0 + return false + } + wfc.globalBytesRetries++ + log.Infof("globalBytesRetries %d", wfc.globalBytesRetries) + return wfc.globalBytesRetries > 360 // 90 seconds +} + +func (wfc *waitForCompletion) resetDatabase(ctx context.Context, dev *model.Dev) error { + if wfc.hasBeenReset { + return fmt.Errorf("inconsistent syncthing state") + } + log.Info("resetting syncthing database in wait for completion loop...") + if err := wfc.sy.ResetDatabase(ctx, dev); err != nil { + return err + } + wfc.hasBeenReset = true + wfc.globalBytesRetries = 0 + if err := wfc.sy.WaitForScanning(ctx, dev, true); err != nil { + return err + } + return wfc.sy.WaitForScanning(ctx, dev, false) +} + +func (wfc *waitForCompletion) isCompleted() bool { + if wfc.localCompletion.NeedBytes > 0 { + return false + } + if wfc.localCompletion.GlobalBytes != wfc.remoteCompletion.GlobalBytes { + return false + } + + if wfc.localCompletion.NeedDeletes > 0 { + wfc.needDeletesRetries++ + if wfc.needDeletesRetries < 50 { + log.Info("synced completed, but need deletes, retrying...") + return false + } + } + if !wfc.sy.IsAllOverwritten() { + log.Info("synced completed, but overwrites not sent, retrying...") + return false + } + return true +} diff --git a/pkg/syncthing/completion_test.go b/pkg/syncthing/completion_test.go new file mode 100644 index 000000000000..3cdd5583c8a0 --- /dev/null +++ b/pkg/syncthing/completion_test.go @@ -0,0 +1,260 @@ +// Copyright 2020 The Okteto Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package syncthing + +import ( + "testing" +) + +func Test_needsDatabaseReset(t *testing.T) { + tests := []struct { + name string + wfc *waitForCompletion + previousLocalGlobalBytes int64 + previousRemoteGlobalBytes int64 + globalBytesRetries int64 + want bool + }{ + { + name: "global-bytes-ok", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + GlobalBytes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 10, + }, + previousLocalGlobalBytes: 0, + previousRemoteGlobalBytes: 0, + globalBytesRetries: 10, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 10, + globalBytesRetries: 0, + want: false, + }, + { + name: "local-global-bytes-changed", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + GlobalBytes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 20, + }, + previousLocalGlobalBytes: 1, + previousRemoteGlobalBytes: 20, + globalBytesRetries: 10, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 20, + globalBytesRetries: 0, + want: false, + }, + { + name: "remote-global-bytes-changed", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + GlobalBytes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 20, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 2, + globalBytesRetries: 10, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 20, + globalBytesRetries: 0, + want: false, + }, + { + name: "increment-global-bytes-retries", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + GlobalBytes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 20, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 20, + globalBytesRetries: 10, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 20, + globalBytesRetries: 11, + want: false, + }, + { + name: "reset", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + GlobalBytes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 20, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 20, + globalBytesRetries: 360, + }, + previousLocalGlobalBytes: 10, + previousRemoteGlobalBytes: 20, + globalBytesRetries: 361, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := tt.wfc.needsDatabaseReset() + if result != tt.want { + t.Errorf("test '%s' wrong completed: %t vs %t", tt.name, result, tt.want) + } + if tt.wfc.previousLocalGlobalBytes != tt.previousLocalGlobalBytes { + t.Errorf("test '%s' wrong previousLocalGlobalBytes: %d vs %d", tt.name, tt.wfc.previousLocalGlobalBytes, tt.previousLocalGlobalBytes) + } + if tt.wfc.previousRemoteGlobalBytes != tt.previousRemoteGlobalBytes { + t.Errorf("test '%s' wrong previousRemoteGlobalBytes: %d vs %d", tt.name, tt.wfc.previousRemoteGlobalBytes, tt.previousRemoteGlobalBytes) + } + if tt.wfc.globalBytesRetries != tt.globalBytesRetries { + t.Errorf("test '%s' wrong globalBytesRetries: %d vs %d", tt.name, tt.wfc.globalBytesRetries, tt.globalBytesRetries) + } + }) + } +} + +func Test_isCompleted(t *testing.T) { + tests := []struct { + name string + wfc *waitForCompletion + needDeletesRetries int64 + want bool + }{ + { + name: "need-bytes", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + NeedBytes: 10, + }, + }, + needDeletesRetries: 0, + want: false, + }, + { + name: "not-matching-global-bytes", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + NeedBytes: 0, + GlobalBytes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 20, + }, + }, + needDeletesRetries: 0, + want: false, + }, + { + name: "need-deletes", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + NeedBytes: 0, + GlobalBytes: 10, + NeedDeletes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 10, + }, + }, + needDeletesRetries: 1, + want: false, + }, + { + name: "completed-retried-need-deletes", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + NeedBytes: 0, + GlobalBytes: 10, + NeedDeletes: 10, + }, + remoteCompletion: &Completion{ + GlobalBytes: 10, + }, + needDeletesRetries: 50, + sy: &Syncthing{ + Folders: []*Folder{}, + }, + }, + needDeletesRetries: 51, + want: true, + }, + { + name: "not-overwritten", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + NeedBytes: 0, + GlobalBytes: 10, + NeedDeletes: 0, + }, + remoteCompletion: &Completion{ + GlobalBytes: 10, + }, + sy: &Syncthing{ + Folders: []*Folder{ + { + Overwritten: false, + }, + }, + }, + }, + needDeletesRetries: 0, + want: false, + }, + { + name: "completed", + wfc: &waitForCompletion{ + localCompletion: &Completion{ + NeedBytes: 0, + GlobalBytes: 10, + NeedDeletes: 0, + }, + remoteCompletion: &Completion{ + GlobalBytes: 10, + }, + sy: &Syncthing{ + Folders: []*Folder{ + { + Overwritten: true, + }, + }, + }, + }, + needDeletesRetries: 0, + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + completed := tt.wfc.isCompleted() + if completed != tt.want { + t.Errorf("test '%s' wrong completed: %t vs %t", tt.name, completed, tt.want) + } + if tt.wfc.needDeletesRetries != tt.needDeletesRetries { + t.Errorf("test '%s' wrong needDeletesRetries: %d vs %d", tt.name, tt.wfc.needDeletesRetries, tt.needDeletesRetries) + } + }) + } +} diff --git a/pkg/syncthing/syncthing.go b/pkg/syncthing/syncthing.go index c3f14f761896..339addea1fd1 100644 --- a/pkg/syncthing/syncthing.go +++ b/pkg/syncthing/syncthing.go @@ -30,7 +30,6 @@ import ( "text/template" "time" - "github.com/okteto/okteto/pkg/analytics" "github.com/okteto/okteto/pkg/config" "github.com/okteto/okteto/pkg/errors" "github.com/okteto/okteto/pkg/log" @@ -53,9 +52,10 @@ const ( configFile = "config.xml" logFile = "syncthing.log" - // DefaultRemoteDeviceID remote syncthing ID + // DefaultRemoteDeviceID remote syncthing device ID DefaultRemoteDeviceID = "ATOPHFJ-VPVLDFY-QVZDCF2-OQQ7IOW-OG4DIXF-OA7RWU3-ZYA4S22-SI4XVAU" - localDeviceID = "ABKAVQF-RUO4CYO-FSC2VIP-VRX4QDA-TQQRN2J-MRDXJUC-FXNWP6N-S6ZSAAR" + // LocalDeviceID local syncthing device ID + LocalDeviceID = "ABKAVQF-RUO4CYO-FSC2VIP-VRX4QDA-TQQRN2J-MRDXJUC-FXNWP6N-S6ZSAAR" // DefaultFileWatcherDelay how much to wait before starting a sync after a file change DefaultFileWatcherDelay = 5 @@ -117,16 +117,6 @@ type Status struct { PullErrors int64 `json:"pullErrors"` } -// Completion represents the completion of a syncthing folder. -type Completion struct { - Completion float64 `json:"completion"` - GlobalBytes int64 `json:"globalBytes"` - NeedBytes int64 `json:"needBytes"` - GlobalItems int64 `json:"globalItems"` - NeedItems int64 `json:"needItems"` - NeedDeletes int64 `json:"needDeletes"` -} - // FolderErrors represents folder errors in syncthing. type FolderErrors struct { Data DataFolderErrors `json:"data"` @@ -308,20 +298,19 @@ func (s *Syncthing) WaitForPing(ctx context.Context, local bool) error { log.Infof("waiting for syncthing local=%t to be ready", local) for i := 0; ; i++ { - if s.Ping(ctx, local) { - return nil - } - if i%5 == 0 { - log.Infof("syncthing local=%t is not ready yet", local) - } - - if time.Now().After(timeout) { - return fmt.Errorf("syncthing local=%t didn't respond after %s", local, to.String()) - } - select { case <-ticker.C: - continue + if s.Ping(ctx, local) { + return nil + } + if i%5 == 0 { + log.Infof("syncthing local=%t is not ready yet", local) + } + + if time.Now().After(timeout) { + return fmt.Errorf("syncthing local=%t didn't respond after %s", local, to.String()) + } + case <-ctx.Done(): log.Infof("syncthing.WaitForPing cancelled local=%t", local) return ctx.Err() @@ -331,7 +320,7 @@ func (s *Syncthing) WaitForPing(ctx context.Context, local bool) error { //Ping checks if syncthing is available func (s *Syncthing) Ping(ctx context.Context, local bool) bool { - _, err := s.APICall(ctx, "rest/system/ping", "GET", 200, nil, local, nil, false, 1) + _, err := s.APICall(ctx, "rest/system/ping", "GET", 200, nil, local, nil, false, 0) if err == nil { return true } @@ -397,15 +386,7 @@ func (s *Syncthing) ResetDatabase(ctx context.Context, dev *model.Dev) error { if err := s.WaitForPing(ctx, false); err != nil { return err } - if err := s.WaitForPing(ctx, true); err != nil { - return err - } - - if err := s.WaitForScanning(ctx, dev, true); err != nil { - return err - } - - return s.WaitForScanning(ctx, dev, false) + return s.WaitForPing(ctx, true) } func (s *Syncthing) resetDatabase(ctx context.Context, dev *model.Dev, local bool) error { @@ -442,12 +423,9 @@ func (s *Syncthing) Overwrite(ctx context.Context, dev *model.Dev) error { return nil } -//IsAllIgnoredAndOverwritten checks if all .stignore files and overwrite operations has been completed -func (s *Syncthing) IsAllIgnoredAndOverwritten() bool { +//IsAllOverwritten checks if all overwrite operations has been completed +func (s *Syncthing) IsAllOverwritten() bool { for _, folder := range s.Folders { - if !folder.SentStIgnore { - return false - } if !folder.Overwritten { return false } @@ -502,113 +480,6 @@ func (s *Syncthing) waitForFolderScanning(ctx context.Context, folder *Folder, l } } -// WaitForCompletion waits for the remote to be totally synched -func (s *Syncthing) WaitForCompletion(ctx context.Context, dev *model.Dev, reporter chan float64) error { - defer close(reporter) - ticker := time.NewTicker(250 * time.Millisecond) - retries := 0 - needDeletesRetries := 0 - globalBytesRetries := 0 - var previousGlobalBytes int64 = 0 - for { - select { - case <-ticker.C: - retries++ - - s.SendStignoreFile(ctx) - if err := s.Overwrite(ctx, dev); err != nil { - if err != errors.ErrBusySyncthing { - return err - } - } - - localCompletion, remoteCompletion, err := s.getLocalAndRemoteCompletion(ctx) - if err != nil { - if err == errors.ErrBusySyncthing { - continue - } - return err - } - - if localCompletion.GlobalBytes != remoteCompletion.GlobalBytes { - log.Infof("local globalBytes %d, remote global bytes %d", localCompletion.GlobalBytes, remoteCompletion.GlobalBytes) - if remoteCompletion.GlobalBytes != previousGlobalBytes { - previousGlobalBytes = remoteCompletion.GlobalBytes - globalBytesRetries = 0 - continue - } - globalBytesRetries++ - if globalBytesRetries >= 120 { - continue - } - log.Infof("globalBytesRetries %d, resetting syncthing database", globalBytesRetries) - if err := s.ResetDatabase(ctx, dev); err != nil { - log.Infof("error resetting syncthing database: %s", err.Error()) - analytics.TrackResetDatabase(false) - continue - } - analytics.TrackResetDatabase(true) - globalBytesRetries = 0 - continue - } - - progress := (float64(remoteCompletion.GlobalBytes-remoteCompletion.NeedBytes) / float64(remoteCompletion.GlobalBytes)) * 100 - log.Infof("syncthing folder: globalBytes %d, needBytes %d, globalItems %d, needItems %d, needDeletes %d", - remoteCompletion.GlobalBytes, - remoteCompletion.NeedBytes, - remoteCompletion.GlobalItems, - remoteCompletion.NeedItems, - remoteCompletion.NeedDeletes, - ) - reporter <- progress - - if remoteCompletion.GlobalBytes == 0 { - if s.IsAllIgnoredAndOverwritten() { - return nil - } - log.Info("synced completed, but stignores and overwrites not sent") - return nil - } - - if remoteCompletion.NeedBytes == 0 { - if remoteCompletion.NeedDeletes > 0 { - needDeletesRetries++ - if needDeletesRetries < 50 { - continue - } - } - if s.IsAllIgnoredAndOverwritten() { - return nil - } - log.Info("synced completed, but stignores and overwrites not sent") - return nil - } - - if retries%50 == 0 { - if err := s.IsHealthy(ctx, false, 3); err != nil { - return err - } - } - - case <-ctx.Done(): - log.Info("call to syncthing.WaitForCompletion canceled") - return ctx.Err() - } - } -} - -func (s *Syncthing) getLocalAndRemoteCompletion(ctx context.Context) (*Completion, *Completion, error) { - localCompletion, err := s.GetCompletion(ctx, true, DefaultRemoteDeviceID) - if err != nil { - return nil, nil, err - } - remoteCompletion, err := s.GetCompletion(ctx, false, DefaultRemoteDeviceID) - if err != nil { - return nil, nil, err - } - return localCompletion, remoteCompletion, nil -} - // GetCompletion returns the syncthing completion func (s *Syncthing) GetCompletion(ctx context.Context, local bool, device string) (*Completion, error) { params := map[string]string{"device": device} @@ -629,23 +500,6 @@ func (s *Syncthing) GetCompletion(ctx context.Context, local bool, device string return completion, nil } -// GetCompletionProgress returns the syncthing completion progress -func (s *Syncthing) GetCompletionProgress(ctx context.Context, local bool) (float64, error) { - device := DefaultRemoteDeviceID - if local { - device = localDeviceID - } - completion, err := s.GetCompletion(ctx, local, device) - if err != nil { - return 0, err - } - if completion.GlobalBytes == 0 { - return 100, nil - } - progress := (float64(completion.GlobalBytes-completion.NeedBytes) / float64(completion.GlobalBytes)) * 100 - return progress, nil -} - // IsHealthy returns the syncthing error or nil func (s *Syncthing) IsHealthy(ctx context.Context, local bool, max int) error { for _, folder := range s.Folders { @@ -661,9 +515,9 @@ func (s *Syncthing) IsHealthy(ctx context.Context, local bool, max int) error { continue } - err = s.GetFolderErrors(ctx, folder, false) - log.Infof("syncthing error in folder '%s' local=%t: %s", folder.RemotePath, local, err) folder.Retries++ + err = s.GetFolderErrors(ctx, folder, false) + log.Infof("syncthing error in folder '%s' local=%t retry %d: %s", folder.RemotePath, local, folder.Retries, err) if folder.Retries <= max { continue }