Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sync registry contents via ssh #4106

Merged
merged 1 commit into from
Oct 17, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
178 changes: 79 additions & 99 deletions pkg/filesystem/registry/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@ package registry
import (
"context"
"fmt"
"io"
"path/filepath"
"strings"
stdsync "sync"
"time"

"github.com/containers/common/pkg/auth"
"github.com/containers/image/v5/copy"
"github.com/containers/image/v5/types"
"golang.org/x/sync/errgroup"
Expand All @@ -48,6 +45,11 @@ const (
defaultTemporaryPort = "5050"
)

const (
httpMode int = iota
sshMode
)

type impl struct {
pathResolver constants.PathResolver
execer exec.Interface
Expand Down Expand Up @@ -77,50 +79,58 @@ func (s *impl) Sync(ctx context.Context, hosts ...string) error {
logger.Debug("running temporary registry on host %s", host)
if err := s.execer.CmdAsyncWithContext(ctx, host, getRegistryServeCommand(s.pathResolver, defaultTemporaryPort)); err != nil {
// ignore expected signal killed error when context cancel
if !strings.Contains(err.Error(), "signal: killed") {
if !strings.Contains(err.Error(), "signal: killed") && !strings.Contains(err.Error(), "context canceled") {
logger.Error(err)
}
}
}(cmdCtx, hosts[i])
}

var endpoints []string
probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
eg, _ := errgroup.WithContext(probeCtx)
mutex := &stdsync.Mutex{}
for i := range hosts {
host := hosts[i]
eg.Go(func() error {
ep := sync.ParseRegistryAddress(trimPortStr(host), defaultTemporaryPort)
if err := httputils.WaitUntilEndpointAlive(probeCtx, "http://"+ep); err != nil {
return err
}
mutex.Lock()
endpoints = append(endpoints, ep)
mutex.Unlock()
return nil
})
}
var syncFn func(context.Context, string) error
if err := eg.Wait(); err != nil {
logger.Warn("cannot connect to remote temporary registry: %v, fallback using ssh mode instead", err)
syncFn = syncViaSSH(s, hosts)
} else {
syncFn = syncViaHTTP(endpoints)
type syncOption struct {
target string
typ int
}

outerEg, ctx := errgroup.WithContext(ctx)
for i := range s.mounts {
registryDir := filepath.Join(s.mounts[i].MountPoint, constants.RegistryDirName)
if !file.IsDir(registryDir) {
continue
syncOptionChan := make(chan *syncOption, len(hosts))
go func() {
for i := range hosts {
go func(target string) {
probeCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
ep := sync.ParseRegistryAddress(trimPortStr(target), defaultTemporaryPort)
if err := httputils.WaitUntilEndpointAlive(probeCtx, "http://"+ep); err != nil {
logger.Warn("cannot connect to remote temporary registry %s: %v, fallback using ssh mode instead", ep, err)
syncOptionChan <- &syncOption{target: target, typ: sshMode}
} else {
syncOptionChan <- &syncOption{target: ep, typ: httpMode}
}
}(hosts[i])
}
}()

eg, _ := errgroup.WithContext(ctx)
for i := 0; i < len(hosts); i++ {
opt, ok := <-syncOptionChan
if !ok {
break
}
for j := range s.mounts {
registryDir := filepath.Join(s.mounts[j].MountPoint, constants.RegistryDirName)
if !file.IsDir(registryDir) {
continue
}
eg.Go(func() (err error) {
switch opt.typ {
case httpMode:
err = syncViaHTTP(ctx, opt.target, registryDir)
case sshMode:
err = syncViaSSH(ctx, s, opt.target, registryDir)
}
return
})
}
outerEg.Go(func() error {
return syncFn(ctx, registryDir)
})
}
return outerEg.Wait()
return eg.Wait()
}

func trimPortStr(s string) string {
Expand All @@ -136,76 +146,46 @@ func getRegistryServeCommand(pathResolver constants.PathResolver, port string) s
)
}

//lint:ignore U1000 Ignore unused function temporarily for debugging
func loginRegistry(ctx context.Context, sys *types.SystemContext, username, password, registry string) error {
return auth.Login(ctx, sys, &auth.LoginOptions{
Username: username,
Password: password,
Stdout: io.Discard,
}, []string{registry})
func syncViaSSH(_ context.Context, s *impl, target string, localDir string) error {
return ssh.CopyDir(s.execer, target, localDir, s.pathResolver.RootFSPath(), nil)
}

func syncViaSSH(s *impl, targets []string) func(context.Context, string) error {
return func(ctx context.Context, localDir string) error {
eg, _ := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
return ssh.CopyDir(s.execer, target, localDir, s.pathResolver.RootFSPath(), constants.IsRegistryDir)
})
}
return eg.Wait()
}
}

func syncViaHTTP(targets []string) func(context.Context, string) error {
func syncViaHTTP(ctx context.Context, target string, localDir string) error {
sys := &types.SystemContext{
DockerInsecureSkipTLSVerify: types.OptionalBoolTrue,
}
return func(ctx context.Context, localDir string) error {
config, err := handler.NewConfig(localDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)

eg, inner := errgroup.WithContext(ctx)
for i := range targets {
target := targets[i]
eg.Go(func() error {
src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(inner, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: target,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(inner, opts); err == nil {
return nil
}
if !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
})
}
err = eg.Wait()
go func() {
// for notifying shutdown http Server
errCh <- err
}()
config, err := handler.NewConfig(localDir, 0)
if err != nil {
return err
}
config.Log.AccessLog.Disabled = true
errCh := handler.Run(ctx, config)
defer func() {
// for notifying shutdown http Server
errCh <- nil
}()

src := sync.ParseRegistryAddress(localhost, config.HTTP.Addr)
probeCtx, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
if err = httputils.WaitUntilEndpointAlive(probeCtx, "http://"+src); err != nil {
return err
}
opts := &sync.Options{
SystemContext: sys,
Source: src,
Target: target,
SelectionOptions: []copy.ImageListSelection{
copy.CopyAllImages, copy.CopySystemImage,
},
OmitError: true,
}

if err = sync.ToRegistry(ctx, opts); err != nil && !strings.Contains(err.Error(), "manifest unknown") {
return err
}
return nil
}

func New(pathResolver constants.PathResolver, execer exec.Interface, mounts []v2.MountImage) filesystem.RegistrySyncer {
Expand Down