Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions cmd/bundle/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (
)

type syncFlags struct {
interval time.Duration
full bool
watch bool
output flags.Output
dryRun bool
interval time.Duration
full bool
watch bool
output flags.Output
dryRun bool
concurrency int
retryTimeout time.Duration
}

func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle) (*sync.SyncOptions, error) {
Expand All @@ -48,6 +50,8 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, b *bundle.Bundle)
opts.Full = f.full
opts.PollInterval = f.interval
opts.DryRun = f.dryRun
opts.Concurrency = f.concurrency
opts.RetryTimeout = f.retryTimeout
return opts, nil
}

Expand All @@ -74,8 +78,17 @@ Use 'databricks bundle deploy' for full resource deployment.`,
cmd.Flags().BoolVar(&f.watch, "watch", false, "watch local file system for changes")
cmd.Flags().Var(&f.output, "output", "type of the output format")
cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes")
cmd.Flags().IntVar(&f.concurrency, "concurrency", 5, "maximum number of concurrent in-flight requests during sync")
cmd.Flags().DurationVar(&f.retryTimeout, "retry-timeout", sync.DefaultRetryTimeout, "per-call deadline for retrying transient gateway errors (HTTP 502/503/504)")

cmd.RunE = func(cmd *cobra.Command, args []string) error {
if f.concurrency < 1 {
return fmt.Errorf("--concurrency must be a positive integer, got %d", f.concurrency)
}
if f.retryTimeout < 0 {
return fmt.Errorf("--retry-timeout must be non-negative, got %s", f.retryTimeout)
}

b, err := utils.ProcessBundle(cmd, utils.ProcessOptions{})
if err != nil {
return err
Expand Down
33 changes: 24 additions & 9 deletions cmd/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,17 @@ import (

type syncFlags struct {
// project files polling interval
interval time.Duration
full bool
watch bool
output flags.Output
exclude []string
include []string
dryRun bool
excludeFrom string
includeFrom string
interval time.Duration
full bool
watch bool
output flags.Output
exclude []string
include []string
dryRun bool
excludeFrom string
includeFrom string
concurrency int
retryTimeout time.Duration
}

func readPatternsFile(filePath string) ([]string, error) {
Expand Down Expand Up @@ -89,6 +91,8 @@ func (f *syncFlags) syncOptionsFromBundle(cmd *cobra.Command, args []string, b *
opts.Include = append(opts.Include, f.include...)
opts.Include = append(opts.Include, includePatterns...)
opts.DryRun = f.dryRun
opts.Concurrency = f.concurrency
opts.RetryTimeout = f.retryTimeout
return opts, nil
}

Expand Down Expand Up @@ -163,6 +167,8 @@ func (f *syncFlags) syncOptionsFromArgs(cmd *cobra.Command, args []string) (*syn

OutputHandler: outputHandler,
DryRun: f.dryRun,
Concurrency: f.concurrency,
RetryTimeout: f.retryTimeout,
}
return &opts, nil
}
Expand All @@ -187,6 +193,8 @@ func New() *cobra.Command {
cmd.Flags().StringVar(&f.excludeFrom, "exclude-from", "", "file containing patterns to exclude from sync (one pattern per line)")
cmd.Flags().StringVar(&f.includeFrom, "include-from", "", "file containing patterns to include to sync (one pattern per line)")
cmd.Flags().BoolVar(&f.dryRun, "dry-run", false, "simulate sync execution without making actual changes")
cmd.Flags().IntVar(&f.concurrency, "concurrency", 5, "maximum number of concurrent in-flight requests during sync")
cmd.Flags().DurationVar(&f.retryTimeout, "retry-timeout", sync.DefaultRetryTimeout, "per-call deadline for retrying transient gateway errors (HTTP 502/503/504)")

// Wrapper for [root.MustWorkspaceClient] that disables loading authentication configuration from a bundle.
mustWorkspaceClient := func(cmd *cobra.Command, args []string) error {
Expand All @@ -196,6 +204,13 @@ func New() *cobra.Command {

cmd.PreRunE = mustWorkspaceClient
cmd.RunE = func(cmd *cobra.Command, args []string) error {
if f.concurrency < 1 {
return fmt.Errorf("--concurrency must be a positive integer, got %d", f.concurrency)
}
if f.retryTimeout < 0 {
return fmt.Errorf("--retry-timeout must be non-negative, got %s", f.retryTimeout)
}

var opts *sync.SyncOptions
var err error

Expand Down
50 changes: 50 additions & 0 deletions libs/sync/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package sync

import (
"context"
"errors"
"net/http"
"time"

"github.com/databricks/cli/libs/log"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/databricks/databricks-sdk-go/retries"
)

// DefaultRetryTimeout bounds how long the sync layer keeps retrying transient
// gateway errors per filer call. The SDK only retries 429 and 504
// (httpclient/errors.go DefaultErrorRetriable); 502 and 503 land here.
// Note that isTransientGatewayError also classifies 504 as retriable, so a
// 504 that reaches the sync layer is retried here as well.
const DefaultRetryTimeout = 30 * time.Second

// isTransientGatewayError reports whether err is (or wraps) an
// *apierr.APIError whose status code is 502 Bad Gateway, 503 Service
// Unavailable, or 504 Gateway Timeout. Any other error, including nil,
// yields false.
func isTransientGatewayError(err error) bool {
	var apiError *apierr.APIError
	if errors.As(err, &apiError) {
		code := apiError.StatusCode
		return code == http.StatusBadGateway ||
			code == http.StatusServiceUnavailable ||
			code == http.StatusGatewayTimeout
	}
	return false
}

// retryOnTransient invokes fn and, as long as it keeps failing with a
// transient gateway error (HTTP 502/503/504 as classified by
// isTransientGatewayError), calls it again via retries.Poll until timeout
// elapses. Backoff and jitter are provided by retries.Poll.
//
// A non-positive timeout disables retrying: fn is called exactly once and its
// result is returned as-is. Errors that are not transient stop the retry loop
// immediately. label only appears in the warning logged before each retry.
func retryOnTransient(ctx context.Context, timeout time.Duration, label string, fn func() error) error {
	if timeout <= 0 {
		return fn()
	}

	// attempt adapts fn to the callback shape retries.Poll expects; there is
	// no result value to carry, hence the empty struct.
	attempt := func() (*struct{}, *retries.Err) {
		callErr := fn()
		switch {
		case callErr == nil:
			return nil, nil
		case isTransientGatewayError(callErr):
			log.Warnf(ctx, "sync %s: retrying after transient error: %s", label, callErr)
			return nil, retries.Continue(callErr)
		default:
			// Anything else is not worth retrying at this layer.
			return nil, retries.Halt(callErr)
		}
	}

	_, pollErr := retries.Poll(ctx, timeout, attempt)
	return pollErr
}
86 changes: 86 additions & 0 deletions libs/sync/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package sync

import (
"context"
"errors"
"net/http"
"sync/atomic"
"testing"
"time"

"github.com/databricks/databricks-sdk-go/apierr"
"github.com/stretchr/testify/require"
)

// apiErr returns an *apierr.APIError with the given HTTP status code and the
// standard status text for that code as its message.
func apiErr(status int) error {
	apiError := new(apierr.APIError)
	apiError.StatusCode = status
	apiError.Message = http.StatusText(status)
	return apiError
}

// TestIsTransientGatewayError checks that only API errors with status
// 502/503/504 are classified as transient, and that nil, other API statuses,
// and non-API errors are not.
func TestIsTransientGatewayError(t *testing.T) {
	type testCase struct {
		err  error
		want bool
	}
	table := []testCase{
		{err: nil, want: false},
		{err: apiErr(http.StatusBadGateway), want: true},
		{err: apiErr(http.StatusServiceUnavailable), want: true},
		{err: apiErr(http.StatusGatewayTimeout), want: true},
		{err: apiErr(http.StatusInternalServerError), want: false},
		{err: apiErr(http.StatusTooManyRequests), want: false},
		{err: apiErr(http.StatusNotFound), want: false},
		{err: errors.New("not an api error"), want: false},
	}
	for _, tc := range table {
		got := isTransientGatewayError(tc.err)
		require.Equal(t, tc.want, got, "%v", tc.err)
	}
}

func TestRetryOnTransient(t *testing.T) {
t.Run("succeeds after retries", func(t *testing.T) {
var calls atomic.Int32
err := retryOnTransient(t.Context(), 30*time.Second, "test", func() error {
if calls.Add(1) < 3 {
return apiErr(http.StatusBadGateway)
}
return nil
})
require.NoError(t, err)
require.Equal(t, int32(3), calls.Load())
})

t.Run("does not retry non-transient", func(t *testing.T) {
var calls atomic.Int32
err := retryOnTransient(t.Context(), 30*time.Second, "test", func() error {
calls.Add(1)
return apiErr(http.StatusNotFound)
})
require.Error(t, err)
require.Equal(t, int32(1), calls.Load())
})

t.Run("zero timeout disables retries", func(t *testing.T) {
var calls atomic.Int32
err := retryOnTransient(t.Context(), 0, "test", func() error {
calls.Add(1)
return apiErr(http.StatusBadGateway)
})
require.Error(t, err)
require.Equal(t, int32(1), calls.Load())
})

t.Run("times out on persistent transient error", func(t *testing.T) {
var calls atomic.Int32
err := retryOnTransient(t.Context(), 100*time.Millisecond, "test", func() error {
calls.Add(1)
return apiErr(http.StatusBadGateway)
})
require.Error(t, err)
require.GreaterOrEqual(t, calls.Load(), int32(1))
})

t.Run("honors context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(t.Context())
cancel()
err := retryOnTransient(ctx, 30*time.Second, "test", func() error {
return apiErr(http.StatusBadGateway)
})
require.Error(t, err)
})
}
18 changes: 17 additions & 1 deletion libs/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ type SyncOptions struct {
OutputHandler OutputHandler

DryRun bool

// Concurrency is the maximum number of in-flight filer requests during sync.
// Defaults to MaxRequestsInFlight when zero.
Concurrency int

// RetryTimeout bounds how long each filer call may keep retrying transient
// gateway errors (HTTP 502/503/504). Defaults to DefaultRetryTimeout when
// zero; a negative value disables sync-layer retries.
RetryTimeout time.Duration
}

type Sync struct {
Expand Down Expand Up @@ -96,6 +105,13 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
return nil, errors.New("failed to resolve host for snapshot")
}

if opts.Concurrency == 0 {
opts.Concurrency = MaxRequestsInFlight
}
if opts.RetryTimeout == 0 {
opts.RetryTimeout = DefaultRetryTimeout
}

// For full sync, we start with an empty snapshot.
// For incremental sync, we try to load an existing snapshot to start from.
var snapshot *Snapshot
Expand All @@ -119,7 +135,7 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) {
var notifier EventNotifier
outputWaitGroup := &stdsync.WaitGroup{}
if opts.OutputHandler != nil {
ch := make(chan Event, MaxRequestsInFlight)
ch := make(chan Event, opts.Concurrency)
notifier = &ChannelNotifier{ch}
outputWaitGroup.Go(func() {
opts.OutputHandler(ctx, ch)
Expand Down
Loading