Skip to content
Open
2 changes: 2 additions & 0 deletions cmd/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func init() {
flags.BoolVar(&buildConfig.Raw, "raw", true, "turning on this flag will build model artifact layers in raw format")
flags.BoolVar(&buildConfig.Reasoning, "reasoning", false, "turning on this flag will mark this model as reasoning model in the config")
flags.BoolVar(&buildConfig.NoCreationTime, "no-creation-time", false, "turning on this flag will not set createdAt in the config, which will be helpful for repeated builds")
flags.BoolVar(&buildConfig.RetryConfig.NoRetry, "no-retry", false, "Disable retry on transient errors")
flags.DurationVar(&buildConfig.RetryConfig.MaxRetryTime, "retry-max-time", 0, "Max total retry time per file (0 = dynamic based on file size)")

if err := viper.BindPFlags(flags); err != nil {
panic(fmt.Errorf("bind build flags to viper: %w", err))
Expand Down
2 changes: 2 additions & 0 deletions cmd/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func init() {
flags.StringVar(&fetchConfig.Output, "output", "", "specify the directory for fetching the model artifact")
flags.StringSliceVar(&fetchConfig.Patterns, "patterns", []string{}, "specify the patterns for fetching the model artifact")
flags.StringVar(&fetchConfig.DragonflyEndpoint, "dragonfly-endpoint", "", "specify the dragonfly endpoint for the pull operation, which will download and hardlink the blob by dragonfly GRPC service.")
flags.BoolVar(&fetchConfig.RetryConfig.NoRetry, "no-retry", false, "Disable retry on transient errors")
flags.DurationVar(&fetchConfig.RetryConfig.MaxRetryTime, "retry-max-time", 0, "Max total retry time per file (0 = dynamic based on file size)")

if err := viper.BindPFlags(flags); err != nil {
panic(fmt.Errorf("bind fetch flags to viper: %w", err))
Expand Down
2 changes: 2 additions & 0 deletions cmd/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ func init() {
flags.StringVar(&pullConfig.ExtractDir, "extract-dir", "", "specify the extract dir for extracting the model artifact")
flags.BoolVar(&pullConfig.ExtractFromRemote, "extract-from-remote", false, "turning on this flag will pull and extract the data from remote registry and no longer store model artifact locally, so user must specify extract-dir as the output directory")
flags.StringVar(&pullConfig.DragonflyEndpoint, "dragonfly-endpoint", "", "specify the dragonfly endpoint for the pull operation, which will download and hardlink the blob by dragonfly GRPC service, this mode requires extract-from-remote must be true")
flags.BoolVar(&pullConfig.RetryConfig.NoRetry, "no-retry", false, "Disable retry on transient errors")
flags.DurationVar(&pullConfig.RetryConfig.MaxRetryTime, "retry-max-time", 0, "Max total retry time per file (0 = dynamic based on file size)")

if err := viper.BindPFlags(flags); err != nil {
panic(fmt.Errorf("bind pull flags to viper: %w", err))
Expand Down
2 changes: 2 additions & 0 deletions cmd/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func init() {
flags.BoolVar(&pushConfig.Insecure, "insecure", false, "turning on this flag will disable TLS verification")
flags.BoolVar(&pushConfig.Nydusify, "nydusify", false, "[EXPERIMENTAL] nydusify the model artifact")
flags.MarkHidden("nydusify")
flags.BoolVar(&pushConfig.RetryConfig.NoRetry, "no-retry", false, "Disable retry on transient errors")
flags.DurationVar(&pushConfig.RetryConfig.MaxRetryTime, "retry-max-time", 0, "Max total retry time per file (0 = dynamic based on file size)")

if err := viper.BindPFlags(flags); err != nil {
panic(fmt.Errorf("bind push flags to viper: %w", err))
Expand Down
22 changes: 22 additions & 0 deletions internal/pb/pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,28 @@ func (p *ProgressBar) Add(prompt, name string, size int64, reader io.Reader) io.
return reader
}

// Placeholder creates or resets a progress bar entry without a reader.
// It is used during retry backoff to keep a visible bar for the item.
func (p *ProgressBar) Placeholder(name string, prompt string, size int64) {
if disableProgress {
return
}

p.mu.RLock()
existing := p.bars[name]
p.mu.RUnlock()

// If the bar already exists, just reset its message.
if existing != nil {
existing.msg = fmt.Sprintf("%s %s", prompt, name)
existing.Bar.SetCurrent(0)
return
}

// Create a new placeholder bar.
p.Add(prompt, name, size, nil)
Comment on lines +146 to +158

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

There is a data race condition here. The existing.msg field is being modified without a lock, while it can be concurrently read by the progress bar's rendering goroutine. This can lead to unpredictable behavior or crashes.

To fix this, you should use a write lock to protect both the read from the p.bars map and the subsequent write to the msg field. The lock should be released before calling p.Add to avoid deadlocks, as p.Add acquires its own locks.

Suggested change
p.mu.RLock()
existing := p.bars[name]
p.mu.RUnlock()
// If the bar already exists, just reset its message.
if existing != nil {
existing.msg = fmt.Sprintf("%s %s", prompt, name)
existing.Bar.SetCurrent(0)
return
}
// Create a new placeholder bar.
p.Add(prompt, name, size, nil)
p.mu.Lock()
if existing, ok := p.bars[name]; ok {
// If the bar already exists, just reset its message.
existing.msg = fmt.Sprintf("%s %s", prompt, name)
existing.Bar.SetCurrent(0)
p.mu.Unlock()
return
}
p.mu.Unlock()
// Create a new placeholder bar.
p.Add(prompt, name, size, nil)

}

// Get returns the progress bar.
func (p *ProgressBar) Get(name string) *progressBar {
p.mu.RLock()
Expand Down
24 changes: 16 additions & 8 deletions pkg/backend/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"os"
"path/filepath"

retry "github.com/avast/retry-go/v4"
modelspec "github.com/modelpack/model-spec/specs-go/v1"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/modelpack/modctl/pkg/backend/processor"
"github.com/modelpack/modctl/pkg/config"
"github.com/modelpack/modctl/pkg/modelfile"
"github.com/modelpack/modctl/pkg/retrypolicy"
"github.com/modelpack/modctl/pkg/source"
)

Expand Down Expand Up @@ -123,8 +123,8 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri

var configDesc ocispec.Descriptor
// Build the model config.
if err := retry.Do(func() error {
configDesc, err = builder.BuildConfig(ctx, config, hooks.NewHooks(
if err := retrypolicy.Do(ctx, func(rctx context.Context) error {
configDesc, err = builder.BuildConfig(rctx, config, hooks.NewHooks(
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
return pb.Add(internalpb.NormalizePrompt("Building config"), name, size, reader)
}),
Expand All @@ -136,13 +136,17 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
}),
))
return err
}, append(defaultRetryOpts, retry.Context(ctx))...); err != nil {
}, retrypolicy.DoOpts{
FileSize: 0, // config is small
FileName: "config",
Config: &cfg.RetryConfig,
}); err != nil {
return fmt.Errorf("failed to build model config: %w", err)
}

// Build the model manifest.
if err := retry.Do(func() error {
_, err = builder.BuildManifest(ctx, layers, configDesc, manifestAnnotation(modelfile), hooks.NewHooks(
if err := retrypolicy.Do(ctx, func(rctx context.Context) error {
_, err = builder.BuildManifest(rctx, layers, configDesc, manifestAnnotation(modelfile), hooks.NewHooks(
hooks.WithOnStart(func(name string, size int64, reader io.Reader) io.Reader {
return pb.Add(internalpb.NormalizePrompt("Building manifest"), name, size, reader)
}),
Expand All @@ -154,7 +158,11 @@ func (b *backend) Build(ctx context.Context, modelfilePath, workDir, target stri
}),
))
return err
}, append(defaultRetryOpts, retry.Context(ctx))...); err != nil {
}, retrypolicy.DoOpts{
FileSize: 0, // manifest is small
FileName: "manifest",
Config: &cfg.RetryConfig,
}); err != nil {
return fmt.Errorf("failed to build model manifest: %w", err)
}

Expand Down Expand Up @@ -204,7 +212,7 @@ func (b *backend) getProcessors(modelfile modelfile.Modelfile, cfg *config.Build
func (b *backend) process(ctx context.Context, builder build.Builder, workDir string, pb *internalpb.ProgressBar, cfg *config.Build, processors ...processor.Processor) ([]ocispec.Descriptor, error) {
descriptors := []ocispec.Descriptor{}
for _, p := range processors {
descs, err := p.Process(ctx, builder, workDir, processor.WithConcurrency(cfg.Concurrency), processor.WithProgressTracker(pb))
descs, err := p.Process(ctx, builder, workDir, processor.WithConcurrency(cfg.Concurrency), processor.WithProgressTracker(pb), processor.WithRetryConfig(cfg.RetryConfig))
if err != nil {
return nil, err
}
Expand Down
47 changes: 41 additions & 6 deletions pkg/backend/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package backend
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"

"github.com/bmatcuk/doublestar/v4"
legacymodelspec "github.com/dragonflyoss/model-spec/specs-go/v1"
Expand All @@ -31,6 +34,7 @@ import (
internalpb "github.com/modelpack/modctl/internal/pb"
"github.com/modelpack/modctl/pkg/backend/remote"
"github.com/modelpack/modctl/pkg/config"
"github.com/modelpack/modctl/pkg/retrypolicy"
)

// Fetch fetches partial files to the output.
Expand Down Expand Up @@ -101,9 +105,12 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e
pb.Start()
defer pb.Stop()

g, ctx := errgroup.WithContext(ctx)
g := new(errgroup.Group)
g.SetLimit(cfg.Concurrency)

var mu sync.Mutex
var errs []error

logrus.Infof("fetch: fetching %d matched layers", len(layers))
for _, layer := range layers {
g.Go(func() error {
Expand All @@ -113,17 +120,45 @@ func (b *backend) Fetch(ctx context.Context, target string, cfg *config.Fetch) e
default:
}

logrus.Debugf("fetch: processing layer %s", layer.Digest)
if err := pullAndExtractFromRemote(ctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer); err != nil {
return err
var annoFilepath string
if layer.Annotations != nil {
if layer.Annotations[modelspec.AnnotationFilepath] != "" {
annoFilepath = layer.Annotations[modelspec.AnnotationFilepath]
} else {
annoFilepath = layer.Annotations[legacymodelspec.AnnotationFilepath]
}
}
Comment on lines +123 to 130

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic to extract the filepath from annotations is duplicated in pkg/backend/fetch_by_d7y.go and pkg/backend/pull_by_d7y.go. To improve maintainability and reduce code duplication, consider extracting this into a shared helper function within the backend package.

For example, you could define this function:

func getAnnotationFilepath(annotations map[string]string) string {
	if annotations == nil {
		return ""
	}
	if path := annotations[modelspec.AnnotationFilepath]; path != "" {
		return path
	}
	return annotations[legacymodelspec.AnnotationFilepath]
}

And then call it here.

Suggested change
var annoFilepath string
if layer.Annotations != nil {
if layer.Annotations[modelspec.AnnotationFilepath] != "" {
annoFilepath = layer.Annotations[modelspec.AnnotationFilepath]
} else {
annoFilepath = layer.Annotations[legacymodelspec.AnnotationFilepath]
}
}
annoFilepath := getAnnotationFilepath(layer.Annotations)


logrus.Debugf("fetch: successfully processed layer %s", layer.Digest)
logrus.Debugf("fetch: processing layer %s", layer.Digest)
if err := retrypolicy.Do(ctx, func(rctx context.Context) error {
return pullAndExtractFromRemote(rctx, pb, internalpb.NormalizePrompt("Fetching blob"), client, cfg.Output, layer)
}, retrypolicy.DoOpts{
FileSize: layer.Size,
FileName: annoFilepath,
Config: &cfg.RetryConfig,
OnRetry: func(attempt uint, reason string, backoff time.Duration) {
if bar := pb.Get(layer.Digest.String()); bar != nil {
bar.SetRefill(bar.Current())
bar.SetCurrent(0)
bar.EwmaSetCurrent(0, time.Second)
}
},
}); err != nil {
mu.Lock()
errs = append(errs, err)
mu.Unlock()
} else {
logrus.Debugf("fetch: successfully processed layer %s", layer.Digest)
}
return nil
})
}

if err := g.Wait(); err != nil {
_ = g.Wait()
if ctx.Err() != nil {
return fmt.Errorf("fetch cancelled: %w", ctx.Err())
}
if err := errors.Join(errs...); err != nil {
return err
}

Expand Down
49 changes: 41 additions & 8 deletions pkg/backend/fetch_by_d7y.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@ package backend
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"

common "d7y.io/api/v2/pkg/apis/common/v2"
dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
"github.com/avast/retry-go/v4"
"github.com/bmatcuk/doublestar/v4"
legacymodelspec "github.com/dragonflyoss/model-spec/specs-go/v1"
modelspec "github.com/modelpack/model-spec/specs-go/v1"
Expand All @@ -41,6 +43,7 @@ import (
"github.com/modelpack/modctl/pkg/archiver"
"github.com/modelpack/modctl/pkg/backend/remote"
"github.com/modelpack/modctl/pkg/config"
"github.com/modelpack/modctl/pkg/retrypolicy"
)

// fetchByDragonfly fetches partial files via Dragonfly gRPC service based on pattern matching.
Expand Down Expand Up @@ -124,9 +127,12 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf
defer pb.Stop()

// Process layers concurrently.
g, ctx := errgroup.WithContext(ctx)
g := new(errgroup.Group)
g.SetLimit(cfg.Concurrency)

var mu sync.Mutex
var errs []error

logrus.Infof("fetch: fetching %d matched layers via dragonfly", len(layers))
for _, layer := range layers {
g.Go(func() error {
Expand All @@ -138,14 +144,21 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf

logrus.Debugf("fetch: processing layer %s via dragonfly", layer.Digest)
if err := fetchLayerByDragonfly(ctx, pb, dfdaemon.NewDfdaemonDownloadClient(conn), ref, manifest, layer, authToken, cfg); err != nil {
return err
mu.Lock()
errs = append(errs, err)
mu.Unlock()
} else {
logrus.Debugf("fetch: successfully processed layer %s via dragonfly", layer.Digest)
}
logrus.Debugf("fetch: successfully processed layer %s via dragonfly", layer.Digest)
return nil
})
}

if err := g.Wait(); err != nil {
_ = g.Wait()
if ctx.Err() != nil {
return fmt.Errorf("fetch cancelled: %w", ctx.Err())
}
if err := errors.Join(errs...); err != nil {
return err
}

Expand All @@ -155,18 +168,38 @@ func (b *backend) fetchByDragonfly(ctx context.Context, target string, cfg *conf

// fetchLayerByDragonfly handles downloading and extracting a single layer via Dragonfly.
func fetchLayerByDragonfly(ctx context.Context, pb *internalpb.ProgressBar, client dfdaemon.DfdaemonDownloadClient, ref Referencer, manifest ocispec.Manifest, desc ocispec.Descriptor, authToken string, cfg *config.Fetch) error {
err := retry.Do(func() error {
var annoFilepath string
if desc.Annotations != nil {
if desc.Annotations[modelspec.AnnotationFilepath] != "" {
annoFilepath = desc.Annotations[modelspec.AnnotationFilepath]
} else {
annoFilepath = desc.Annotations[legacymodelspec.AnnotationFilepath]
}
}
Comment on lines +171 to +178

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This logic to extract the filepath from annotations is duplicated in pkg/backend/fetch.go and pkg/backend/pull_by_d7y.go. To improve maintainability and reduce code duplication, consider extracting this into a shared helper function within the backend package.

For example, you could define this function:

func getAnnotationFilepath(annotations map[string]string) string {
	if annotations == nil {
		return ""
	}
	if path := annotations[modelspec.AnnotationFilepath]; path != "" {
		return path
	}
	return annotations[legacymodelspec.AnnotationFilepath]
}

And then call it here.

Suggested change
var annoFilepath string
if desc.Annotations != nil {
if desc.Annotations[modelspec.AnnotationFilepath] != "" {
annoFilepath = desc.Annotations[modelspec.AnnotationFilepath]
} else {
annoFilepath = desc.Annotations[legacymodelspec.AnnotationFilepath]
}
}
annoFilepath := getAnnotationFilepath(desc.Annotations)


err := retrypolicy.Do(ctx, func(rctx context.Context) error {
logrus.Debugf("fetch: processing layer %s", desc.Digest)
cfg.Hooks.BeforePullLayer(desc, manifest) // Call before hook
err := downloadAndExtractFetchLayer(ctx, pb, client, ref, desc, authToken, cfg)
err := downloadAndExtractFetchLayer(rctx, pb, client, ref, desc, authToken, cfg)
cfg.Hooks.AfterPullLayer(desc, err) // Call after hook
if err != nil {
err = fmt.Errorf("pull: failed to download and extract layer %s: %w", desc.Digest, err)
logrus.Error(err)
}

return err
}, append(defaultRetryOpts, retry.Context(ctx))...)
}, retrypolicy.DoOpts{
FileSize: desc.Size,
FileName: annoFilepath,
Config: &cfg.RetryConfig,
OnRetry: func(attempt uint, reason string, backoff time.Duration) {
if bar := pb.Get(desc.Digest.String()); bar != nil {
bar.SetRefill(bar.Current())
bar.SetCurrent(0)
bar.EwmaSetCurrent(0, time.Second)
}
},
})

if err != nil {
err = fmt.Errorf("fetch: failed to download and extract layer %s: %w", desc.Digest, err)
Expand Down
Loading
Loading