Skip to content

Commit

Permalink
Refactor wait for completion logic (#1373)
Browse files Browse the repository at this point in the history
Signed-off-by: Pablo Chico de Guzman <pchico83@gmail.com>
  • Loading branch information
pchico83 committed Apr 4, 2021
1 parent da3c8bd commit 46e81f0
Show file tree
Hide file tree
Showing 6 changed files with 513 additions and 194 deletions.
13 changes: 4 additions & 9 deletions cmd/up/syncthing.go
Expand Up @@ -105,7 +105,6 @@ func (up *upContext) startSyncthing(ctx context.Context) error {
if err := up.Sy.ResetDatabase(ctx, up.Dev); err != nil {
return err
}

up.resetSyncthing = false
}

Expand Down Expand Up @@ -152,15 +151,11 @@ func (up *upContext) synchronizeFiles(ctx context.Context) error {

reporter := make(chan float64)
go func() {
var previous float64
for c := range reporter {
if c > previous {
value := int64(c)
if value > 0 && value < 100 {
spinner.Stop()
progressBar.SetCurrent(value)
previous = c
}
value := int64(c)
if value > 0 && value < 100 {
spinner.Stop()
progressBar.SetCurrent(value)
}
}
quit <- true
Expand Down
20 changes: 18 additions & 2 deletions pkg/cmd/status/run.go
Expand Up @@ -27,12 +27,12 @@ import (

//Run runs the "okteto status" sequence
func Run(ctx context.Context, dev *model.Dev, sy *syncthing.Syncthing) (float64, error) {
progressLocal, err := sy.GetCompletionProgress(ctx, true)
progressLocal, err := getCompletionProgress(ctx, sy, true)
if err != nil {
log.Infof("error accessing local syncthing status: %s", err)
return 0, err
}
progressRemote, err := sy.GetCompletionProgress(ctx, false)
progressRemote, err := getCompletionProgress(ctx, sy, false)
if err != nil {
log.Infof("error accessing remote syncthing status: %s", err)
return 0, err
Expand All @@ -41,6 +41,22 @@ func Run(ctx context.Context, dev *model.Dev, sy *syncthing.Syncthing) (float64,
return computeProgress(progressLocal, progressRemote), nil
}

func getCompletionProgress(ctx context.Context, s *syncthing.Syncthing, local bool) (float64, error) {
device := syncthing.DefaultRemoteDeviceID
if local {
device = syncthing.LocalDeviceID
}
completion, err := s.GetCompletion(ctx, local, device)
if err != nil {
return 0, err
}
if completion.GlobalBytes == 0 {
return 100, nil
}
progress := (float64(completion.GlobalBytes-completion.NeedBytes) / float64(completion.GlobalBytes)) * 100
return progress, nil
}

func computeProgress(local, remote float64) float64 {
if local == 100 && remote == 100 {
return 100
Expand Down
39 changes: 22 additions & 17 deletions pkg/syncthing/api.go
Expand Up @@ -38,31 +38,37 @@ func (akt *addAPIKeyTransport) RoundTrip(req *http.Request) (*http.Response, err
//NewAPIClient returns a new syncthing api client configured to call the syncthing api
func NewAPIClient() *http.Client {
return &http.Client{
Timeout: 30 * time.Second,
Timeout: 10 * time.Second,
Transport: &addAPIKeyTransport{http.DefaultTransport},
}
}

// APICall calls the syncthing API and returns the parsed json or an error
func (s *Syncthing) APICall(ctx context.Context, url, method string, code int, params map[string]string, local bool, body []byte, readBody bool, maxRetries int) ([]byte, error) {
retries := 0
ticker := time.NewTicker(200 * time.Millisecond)
for {
result, err := s.callWithRetry(ctx, url, method, code, params, local, body, readBody)
if err == nil {
return result, nil
select {
case <-ticker.C:
result, err := s.callWithRetry(ctx, url, method, code, params, local, body, readBody)
if err == nil {
return result, nil
}

if retries >= maxRetries {
return nil, err
}
retries++

if strings.Contains(err.Error(), "connection refused") {
log.Infof("syncthing is not ready, retrying local=%t", local)
} else {
log.Infof("retrying syncthing call[%s] local=%t: %s", url, local, err.Error())
}
case <-ctx.Done():
log.Infof("call to syncthing.APICall %s canceled", url)
return nil, ctx.Err()
}
if retries == maxRetries {
return nil, err
}

if strings.Contains(err.Error(), "connection refused") {
log.Infof("syncthing is not ready, retrying local=%t", local)
} else {
log.Infof("retrying syncthing call[%s] local=%t: %s", url, local, err.Error())
}

time.Sleep(200 * time.Millisecond)
retries++
}
}

Expand All @@ -73,7 +79,6 @@ func (s *Syncthing) callWithRetry(ctx context.Context, url, method string, code
s.Client.Timeout = 3 * time.Second
} else {
urlPath = path.Join(s.RemoteGUIAddress, url)
s.Client.Timeout = 25 * time.Second
if url == "rest/db/ignores" || url == "rest/system/ping" {
s.Client.Timeout = 5 * time.Second
}
Expand Down
189 changes: 189 additions & 0 deletions pkg/syncthing/completion.go
@@ -0,0 +1,189 @@
// Copyright 2020 The Okteto Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package syncthing

import (
"context"
"fmt"
"time"

"github.com/okteto/okteto/pkg/analytics"
"github.com/okteto/okteto/pkg/errors"
"github.com/okteto/okteto/pkg/log"
"github.com/okteto/okteto/pkg/model"
)

// Completion represents the completion of a syncthing folder.
type Completion struct {
Completion float64 `json:"completion"`
GlobalBytes int64 `json:"globalBytes"`
NeedBytes int64 `json:"needBytes"`
GlobalItems int64 `json:"globalItems"`
NeedItems int64 `json:"needItems"`
NeedDeletes int64 `json:"needDeletes"`
}

//waitForCompletion represents a wait for completion iteration
type waitForCompletion struct {
localCompletion *Completion
remoteCompletion *Completion
previousLocalGlobalBytes int64
previousRemoteGlobalBytes int64
globalBytesRetries int64
needDeletesRetries int64
retries int64
progress float64
sy *Syncthing
hasBeenReset bool
}

// WaitForCompletion waits for the remote to be totally synched
func (s *Syncthing) WaitForCompletion(ctx context.Context, dev *model.Dev, reporter chan float64) error {
defer close(reporter)
ticker := time.NewTicker(250 * time.Millisecond)
wfc := &waitForCompletion{sy: s}
for {
select {
case <-ticker.C:
wfc.retries++
if wfc.retries%50 == 0 {
log.Info("checking syncthing for error....")
if err := s.IsHealthy(ctx, false, 3); err != nil {
return err
}
}

if err := s.Overwrite(ctx, dev); err != nil {
if err != errors.ErrBusySyncthing {
return err
}
}
if err := wfc.computeProgress(ctx); err != nil {
if err != errors.ErrBusySyncthing {
return err
}
continue
}

reporter <- wfc.progress

if wfc.needsDatabaseReset() {
err := wfc.resetDatabase(ctx, dev)
analytics.TrackResetDatabase(err == nil)
continue
}

if wfc.isCompleted() {
return nil
}

case <-ctx.Done():
log.Info("call to syncthing.WaitForCompletion canceled")
return ctx.Err()
}
}
}

func (wfc *waitForCompletion) computeProgress(ctx context.Context) error {
localCompletion, err := wfc.sy.GetCompletion(ctx, true, DefaultRemoteDeviceID)
if err != nil {
return err
}
wfc.localCompletion = localCompletion
log.Infof("syncthing status in local: globalBytes %d, needBytes %d, globalItems %d, needItems %d, needDeletes %d", localCompletion.GlobalBytes, localCompletion.NeedBytes, localCompletion.GlobalItems, localCompletion.NeedItems, localCompletion.NeedDeletes)

remoteCompletion, err := wfc.sy.GetCompletion(ctx, false, DefaultRemoteDeviceID)
if err != nil {
return err
}
wfc.remoteCompletion = remoteCompletion
log.Infof("syncthing status in remote: globalBytes %d, needBytes %d, globalItems %d, needItems %d, needDeletes %d",
remoteCompletion.GlobalBytes,
remoteCompletion.NeedBytes,
remoteCompletion.GlobalItems,
remoteCompletion.NeedItems,
remoteCompletion.NeedDeletes,
)
if localCompletion.GlobalBytes == 0 {
wfc.progress = 100
} else {
wfc.progress = (float64(localCompletion.GlobalBytes-localCompletion.NeedBytes) / float64(localCompletion.GlobalBytes)) * 100
}
return nil
}

func (wfc *waitForCompletion) needsDatabaseReset() bool {
if wfc.localCompletion.GlobalBytes == wfc.remoteCompletion.GlobalBytes {
wfc.globalBytesRetries = 0
wfc.previousLocalGlobalBytes = wfc.localCompletion.GlobalBytes
wfc.previousRemoteGlobalBytes = wfc.remoteCompletion.GlobalBytes
return false
}
log.Infof("local globalBytes %d, remote global bytes %d", wfc.localCompletion.GlobalBytes, wfc.remoteCompletion.GlobalBytes)
if wfc.localCompletion.GlobalBytes != wfc.previousLocalGlobalBytes {
log.Infof("local globalBytes has changed %d vs %d", wfc.localCompletion.GlobalBytes, wfc.previousLocalGlobalBytes)
wfc.previousLocalGlobalBytes = wfc.localCompletion.GlobalBytes
wfc.previousRemoteGlobalBytes = wfc.remoteCompletion.GlobalBytes
wfc.globalBytesRetries = 0
return false
}
if wfc.remoteCompletion.GlobalBytes != wfc.previousRemoteGlobalBytes {
log.Infof("remote globalBytes has changed %d vs %d", wfc.remoteCompletion.GlobalBytes, wfc.previousRemoteGlobalBytes)
wfc.previousLocalGlobalBytes = wfc.localCompletion.GlobalBytes
wfc.previousRemoteGlobalBytes = wfc.remoteCompletion.GlobalBytes
wfc.globalBytesRetries = 0
return false
}
wfc.globalBytesRetries++
log.Infof("globalBytesRetries %d", wfc.globalBytesRetries)
return wfc.globalBytesRetries > 360 // 90 seconds
}

func (wfc *waitForCompletion) resetDatabase(ctx context.Context, dev *model.Dev) error {
if wfc.hasBeenReset {
return fmt.Errorf("inconsistent syncthing state")
}
log.Info("resetting syncthing database in wait for completion loop...")
if err := wfc.sy.ResetDatabase(ctx, dev); err != nil {
return err
}
wfc.hasBeenReset = true
wfc.globalBytesRetries = 0
if err := wfc.sy.WaitForScanning(ctx, dev, true); err != nil {
return err
}
return wfc.sy.WaitForScanning(ctx, dev, false)
}

func (wfc *waitForCompletion) isCompleted() bool {
if wfc.localCompletion.NeedBytes > 0 {
return false
}
if wfc.localCompletion.GlobalBytes != wfc.remoteCompletion.GlobalBytes {
return false
}

if wfc.localCompletion.NeedDeletes > 0 {
wfc.needDeletesRetries++
if wfc.needDeletesRetries < 50 {
log.Info("synced completed, but need deletes, retrying...")
return false
}
}
if !wfc.sy.IsAllOverwritten() {
log.Info("synced completed, but overwrites not sent, retrying...")
return false
}
return true
}

0 comments on commit 46e81f0

Please sign in to comment.