feat(build): force use the only one sync server (#6512)
Signed-off-by: Yaroslav Pershin <62902094+iapershin@users.noreply.github.com>
Signed-off-by: Aleksei Igrychev <aleksei.igrychev@palark.com>
Co-authored-by: Aleksei Igrychev <aleksei.igrychev@palark.com>
iapershin and alexey-igrychev authored Jan 14, 2025
1 parent 538c7cc commit 6673a31
Showing 7 changed files with 493 additions and 120 deletions.
2 changes: 1 addition & 1 deletion cmd/werf/common/storage.go
@@ -50,7 +50,7 @@ func NewStorageManagerWithOptions(ctx context.Context, c *NewStorageManagerConfi
return nil, fmt.Errorf("error get synchronization: %w", err)
}

storageLockManager, err := GetStorageLockManager(ctx, synchronization)
storageLockManager, err := synchronization.GetStorageLockManager(ctx)
if err != nil {
return nil, fmt.Errorf("error get storage lock manager: %w", err)
}
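For context, a minimal caller-side sketch of the new flow (a hypothetical helper, not part of this diff): a command now resolves a Synchronization backend from --synchronization and asks that backend for its lock manager, instead of passing a SynchronizationParams struct into a free GetStorageLockManager function.

// Hypothetical helper in cmd/werf/common, shown only to illustrate the new
// call chain introduced by this commit; error wording mirrors storage.go.
func newStorageLockManager(ctx context.Context, cmdData *CmdData, projectName string, stagesStorage storage.StagesStorage) (lock_manager.Interface, error) {
	// Resolve the synchronization backend (:local, kubernetes:// or http[s]://)
	// from --synchronization and the stages storage address.
	synchronization, err := GetSynchronization(ctx, cmdData, projectName, stagesStorage)
	if err != nil {
		return nil, fmt.Errorf("error get synchronization: %w", err)
	}

	// The backend builds its own lock manager (generic, kubernetes or http).
	return synchronization.GetStorageLockManager(ctx)
}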
173 changes: 54 additions & 119 deletions cmd/werf/common/synchronization.go
@@ -7,18 +7,24 @@ import (
"strings"

"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/werf/kubedog/pkg/kube"
"github.com/werf/logboek"
"github.com/werf/werf/v2/pkg/storage"
"github.com/werf/werf/v2/pkg/storage/synchronization/lock_manager"
"github.com/werf/werf/v2/pkg/storage/synchronization/server"
"github.com/werf/werf/v2/pkg/werf"
"github.com/werf/werf/v2/pkg/werf/global_warnings"
)

const (
syncProtocolKube = "kubernetes://"
syncProtocolHttp = "http://"
syncProtocolHttps = "https://"
)

type Synchronization interface {
// GetStorageLockManager returns lock manager interface based on synchronization server type
GetStorageLockManager(ctx context.Context) (lock_manager.Interface, error)
}

func SetupSynchronization(cmdData *CmdData, cmd *cobra.Command) {
cmdData.Synchronization = new(string)

@@ -34,21 +40,6 @@ Default:
The same address should be specified for all werf processes that work with a single repo. :local address allows execution of werf processes from a single host only`, server.DefaultAddress))
}

type SynchronizationType string

const (
LocalSynchronization SynchronizationType = "LocalSynchronization"
KubernetesSynchronization SynchronizationType = "KubernetesSynchronization"
HttpSynchronization SynchronizationType = "HttpSynchronization"
)

type SynchronizationParams struct {
ClientID string
Address string
SynchronizationType SynchronizationType
KubeParams *lock_manager.KubernetesParams
}

func checkSynchronizationKubernetesParamsForWarnings(cmdData *CmdData) {
if *cmdData.Synchronization != "" {
return
@@ -100,111 +91,55 @@ func checkSynchronizationKubernetesParamsForWarnings(cmdData *CmdData) {
}
}

func GetSynchronization(
ctx context.Context,
cmdData *CmdData,
projectName string,
stagesStorage storage.StagesStorage,
) (*SynchronizationParams, error) {
getKubeParamsFunc := func(
address string,
commonKubeInitializer *OndemandKubeInitializer,
) (*SynchronizationParams, error) {
res := &SynchronizationParams{}
res.SynchronizationType = KubernetesSynchronization
res.Address = address

if params, err := lock_manager.ParseKubernetesParams(res.Address); err != nil {
return nil, fmt.Errorf("unable to parse synchronization address %s: %w", res.Address, err)
} else {
res.KubeParams = params
}

if res.KubeParams.ConfigPath == "" {
res.KubeParams.ConfigPath = commonKubeInitializer.KubeConfig
}
if res.KubeParams.ConfigContext == "" {
res.KubeParams.ConfigContext = commonKubeInitializer.KubeContext
}
if res.KubeParams.ConfigDataBase64 == "" {
res.KubeParams.ConfigDataBase64 = commonKubeInitializer.KubeConfigBase64
}
if res.KubeParams.ConfigPathMergeList == nil {
res.KubeParams.ConfigPathMergeList = commonKubeInitializer.KubeConfigPathMergeList
}

return res, nil
// GetSynchronization determines the type of synchronization server
func GetSynchronization(ctx context.Context, cmdData *CmdData, projectName string, stagesStorage storage.StagesStorage) (Synchronization, error) {
params := lock_manager.SynchronizationParams{
ProjectName: projectName,
ServerAddress: *cmdData.Synchronization,
StagesStorage: stagesStorage,
}

getHttpParamsFunc := func(
serverAddress string,
stagesStorage storage.StagesStorage,
) (*SynchronizationParams, error) {
var clientID string
var err error
if err := logboek.Info().LogProcess(fmt.Sprintf("Getting client id for the http synchronization server")).
DoError(func() error {
clientID, err = lock_manager.GetHttpClientID(ctx, projectName, serverAddress, stagesStorage)
if err != nil {
return fmt.Errorf("unable to get client id for the http synchronization server: %w", err)
}

logboek.Info().LogF("Using clientID %q for http synchronization server at address %s\n", clientID, serverAddress)

return err
}); err != nil {
return nil, err
}

return &SynchronizationParams{Address: serverAddress, ClientID: clientID, SynchronizationType: HttpSynchronization}, nil
}

switch {
case *cmdData.Synchronization == "":
if stagesStorage.Address() == storage.LocalStorageAddress {
return &SynchronizationParams{SynchronizationType: LocalSynchronization, Address: storage.LocalStorageAddress}, nil
}

return getHttpParamsFunc(server.DefaultAddress, stagesStorage)
case *cmdData.Synchronization == storage.LocalStorageAddress:
return &SynchronizationParams{Address: *cmdData.Synchronization, SynchronizationType: LocalSynchronization}, nil
case strings.HasPrefix(*cmdData.Synchronization, "kubernetes://"):
if params.ServerAddress == "" {
return initDefault(ctx, params)
} else if protocolIsLocal(params.ServerAddress) {
return lock_manager.NewLocalSynchronization(ctx, params)
} else if protocolIsKube(params.ServerAddress) {
checkSynchronizationKubernetesParamsForWarnings(cmdData)
return getKubeParamsFunc(*cmdData.Synchronization, GetOndemandKubeInitializer())
case strings.HasPrefix(*cmdData.Synchronization, "http://") || strings.HasPrefix(*cmdData.Synchronization, "https://"):
return getHttpParamsFunc(*cmdData.Synchronization, stagesStorage)
default:
return initKube(ctx, params)
} else if protocolIsHttpOrHttps(params.ServerAddress) {
return lock_manager.NewHttpSynchronization(ctx, params)
} else {
return nil, fmt.Errorf("only --synchronization=%s or --synchronization=kubernetes://NAMESPACE or --synchronization=http[s]://HOST:PORT/CLIENT_ID is supported, got %q", storage.LocalStorageAddress, *cmdData.Synchronization)
}
}

func GetStorageLockManager(
ctx context.Context,
synchronization *SynchronizationParams,
) (lock_manager.Interface, error) {
switch synchronization.SynchronizationType {
case LocalSynchronization:
return lock_manager.NewGeneric(werf.GetHostLocker()), nil
case KubernetesSynchronization:
if config, err := kube.GetKubeConfig(kube.KubeConfigOptions{
ConfigPath: synchronization.KubeParams.ConfigPath,
ConfigDataBase64: synchronization.KubeParams.ConfigDataBase64,
ConfigPathMergeList: synchronization.KubeParams.ConfigPathMergeList,
Context: synchronization.KubeParams.ConfigContext,
}); err != nil {
return nil, fmt.Errorf("unable to load synchronization kube config %q (context %q): %w", synchronization.KubeParams.ConfigPath, synchronization.KubeParams.ConfigContext, err)
} else if dynamicClient, err := dynamic.NewForConfig(config.Config); err != nil {
return nil, fmt.Errorf("unable to create synchronization kubernetes dynamic client: %w", err)
} else if client, err := kubernetes.NewForConfig(config.Config); err != nil {
return nil, fmt.Errorf("unable to create synchronization kubernetes client: %w", err)
} else {
return lock_manager.NewKubernetes(synchronization.KubeParams.Namespace, client, dynamicClient, func(projectName string) string {
return fmt.Sprintf("werf-%s", projectName)
}), nil
}
case HttpSynchronization:
return lock_manager.NewHttp(ctx, synchronization.Address, synchronization.ClientID)
default:
panic(fmt.Sprintf("unsupported synchronization type %q", synchronization.SynchronizationType))
func protocolIsKube(address string) bool {
return strings.HasPrefix(address, syncProtocolKube)
}

func protocolIsHttpOrHttps(address string) bool {
return strings.HasPrefix(address, syncProtocolHttp) || strings.HasPrefix(address, syncProtocolHttps)
}

func protocolIsLocal(address string) bool {
return address == storage.LocalStorageAddress
}

func initDefault(ctx context.Context, params lock_manager.SynchronizationParams) (Synchronization, error) {
if params.StagesStorage.Address() == storage.LocalStorageAddress {
return lock_manager.NewLocalSynchronization(ctx, params)
}
params.ServerAddress = server.DefaultAddress
return lock_manager.NewHttpSynchronization(ctx, params)
}

func initKube(ctx context.Context, params lock_manager.SynchronizationParams) (Synchronization, error) {
ondemandKubeInitializer := GetOndemandKubeInitializer()
params.CommonKubeInitializer = &lock_manager.SynchronizationKubeParams{
KubeContext: ondemandKubeInitializer.KubeContext,
KubeConfig: ondemandKubeInitializer.KubeConfig,
KubeConfigBase64: ondemandKubeInitializer.KubeConfigBase64,
KubeConfigPathMergeList: ondemandKubeInitializer.KubeConfigPathMergeList,
}
return lock_manager.NewKubernetesSynchronization(ctx, params)
}
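The actual "force a single sync server" enforcement lives in the lock_manager package (NewHttpSynchronization and friends), which this diff does not show. As a rough, hypothetical sketch of the idea enabled by the new StagesStorage methods below: the first run records the server address in the repo, and later runs refuse to proceed with a different address.

// Hypothetical sketch only; the real logic sits in
// pkg/storage/synchronization/lock_manager and may differ in details.
func ensureSingleSyncServer(ctx context.Context, projectName, serverAddress string, stagesStorage storage.StagesStorage) error {
	records, err := stagesStorage.GetSyncServerRecords(ctx, projectName)
	if err != nil {
		return fmt.Errorf("unable to get sync server records: %w", err)
	}

	if len(records) == 0 {
		// First run against this repo: remember which server is used.
		return stagesStorage.PostSyncServerRecord(ctx, projectName, &storage.SyncServerRecord{
			Server:            serverAddress,
			TimestampMillisec: time.Now().UnixMilli(),
		})
	}

	// Subsequent runs must use the server already recorded in the repo.
	if records[0].Server != serverAddress {
		return fmt.Errorf("synchronization server %q differs from previously used %q", serverAddress, records[0].Server)
	}

	return nil
}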
8 changes: 8 additions & 0 deletions pkg/storage/local_stages_storage.go
@@ -377,3 +377,11 @@ func makeLocalImportMetadataName(projectName, importSourceID string) string {
}, ":",
)
}

func (storage *LocalStagesStorage) GetSyncServerRecords(ctx context.Context, projectName string, opts ...Option) ([]*SyncServerRecord, error) {
panic("not implemented")
}

func (storage *LocalStagesStorage) PostSyncServerRecord(ctx context.Context, projectName string, rec *SyncServerRecord) error {
panic("not implemented")
}
70 changes: 70 additions & 0 deletions pkg/storage/repo_stages_storage.go
@@ -38,6 +38,12 @@ const (
RepoClientIDRecord_ImageTagPrefix = "client-id-"
RepoClientIDRecord_ImageNameFormat = "%s:client-id-%s-%d"

RepoSyncServerRecord_ImageTagPrefix = "sync-server"
RepoSyncServerRecord_ImageNameFormat = "%s:sync-server"

RepoSyncServerRecord_LabelAddress = "syncserver"
RepoSyncServerRecord_LabelTimestamp = "syncservertimestamp"

UnexpectedTagFormatErrorPrefix = "unexpected tag format"
)

@@ -918,3 +924,67 @@ func (storage *RepoStagesStorage) CopyFromStorage(ctx context.Context, src Stage
func (storage *RepoStagesStorage) FilterStageDescSetAndProcessRelatedData(_ context.Context, stageDescSet image.StageDescSet, _ FilterStagesAndProcessRelatedDataOptions) (image.StageDescSet, error) {
return stageDescSet, nil
}

// GetSyncServerRecords gets sync server address from repo
func (storage *RepoStagesStorage) GetSyncServerRecords(ctx context.Context, projectName string, opts ...Option) ([]*SyncServerRecord, error) {
logboek.Context(ctx).Debug().LogF("-- RepoStagesStorage.GetSyncServerRecords for project %s\n", projectName)

o := makeOptions(opts...)
tags, err := storage.DockerRegistry.Tags(ctx, storage.RepoAddress, o.dockerRegistryOptions...)
if err != nil {
return nil, fmt.Errorf("unable to get repo %s tags: %w", storage.RepoAddress, err)
}

var res []*SyncServerRecord
for _, tag := range tags {
if !strings.HasPrefix(tag, RepoSyncServerRecord_ImageTagPrefix) {
continue
}

img, err := storage.DockerRegistry.GetRepoImage(ctx, fmt.Sprintf("%s:%s", storage.RepoAddress, tag))
if err != nil {
return nil, err
}

if _, ok := img.Labels[RepoSyncServerRecord_LabelAddress]; !ok {
continue
}

timestampMillisec, err := strconv.ParseInt(img.Labels[RepoSyncServerRecord_LabelTimestamp], 10, 64)
if err != nil {
continue
}

rec := &SyncServerRecord{Server: img.Labels[RepoSyncServerRecord_LabelAddress], TimestampMillisec: timestampMillisec}
res = append(res, rec)

logboek.Context(ctx).Debug().LogF("-- RepoStagesStorage.GetSyncServerRecords got clientID record: %s\n", rec)
}

return res, nil
}

// PostSyncServerRecord posts sync server address to repo
func (storage *RepoStagesStorage) PostSyncServerRecord(ctx context.Context, projectName string, rec *SyncServerRecord) error {
logboek.Context(ctx).Debug().LogF("-- RepoStagesStorage.PostSyncServer %s for project %s\n", rec.Server, projectName)

fullImageName := fmt.Sprintf(RepoSyncServerRecord_ImageNameFormat, storage.RepoAddress)

logboek.Context(ctx).Debug().LogF("-- RepoStagesStorage.PostSyncServer full image name: %s\n", fullImageName)

opts := &docker_registry.PushImageOptions{
Labels: map[string]string{
image.WerfLabel: projectName,
RepoSyncServerRecord_LabelAddress: rec.Server,
RepoSyncServerRecord_LabelTimestamp: fmt.Sprint(rec.TimestampMillisec),
},
}

if err := storage.DockerRegistry.PushImage(ctx, fullImageName, opts); err != nil {
return fmt.Errorf("unable to push image %s: %w", fullImageName, err)
}

logboek.Context(ctx).Info().LogF("Posted new synchronization server %q for project %s\n", rec.Server, projectName)

return nil
}
7 changes: 7 additions & 0 deletions pkg/storage/stages_storage.go
@@ -78,6 +78,8 @@ type StagesStorage interface {

GetClientIDRecords(ctx context.Context, projectName string, opts ...Option) ([]*ClientIDRecord, error)
PostClientIDRecord(ctx context.Context, projectName string, rec *ClientIDRecord) error
GetSyncServerRecords(ctx context.Context, projectName string, opts ...Option) ([]*SyncServerRecord, error)
PostSyncServerRecord(ctx context.Context, projectName string, rec *SyncServerRecord) error
PostMultiplatformImage(ctx context.Context, projectName, tag string, allPlatformsImages []*image.Info, platforms []string) error
FilterStageDescSetAndProcessRelatedData(ctx context.Context, stageDescSet image.StageDescSet, options FilterStagesAndProcessRelatedDataOptions) (image.StageDescSet, error)

@@ -101,3 +103,8 @@ type ImageMetadata struct {
type CopyFromStorageOptions struct {
IsMultiplatformImage bool
}

type SyncServerRecord struct {
Server string
TimestampMillisec int64
}
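As a small illustration of the new record type (values are hypothetical; the address is only an example and is not taken from this diff):

// Hypothetical example: a record pinning the repo to one synchronization
// server; the timestamp is stored as milliseconds since the Unix epoch.
rec := &storage.SyncServerRecord{
	Server:            "https://synchronization.werf.io", // assumed example address
	TimestampMillisec: time.Now().UnixMilli(),
}
_ = rec // would be passed to StagesStorage.PostSyncServerRecord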