Skip to content
Permalink
Browse files
feat(build): speeding up with runtime caching for meta images
- docker registry
  - ability to reuse cached tags from previous requests
  - updating cached tags when a tag is added/removed
- stages storage: ability to work with cached meta image data
- build/cleanup: using caching when working with meta images

Signed-off-by: Alexey Igrychev <alexey.igrychev@flant.com>
  • Loading branch information
alexey-igrychev committed Mar 17, 2022
1 parent 8533997 commit 7ea0a4cd92427567b34734e036c66821b37d2a14
@@ -258,7 +258,17 @@ func (phase *BuildPhase) AfterImageStages(ctx context.Context, img *Image) error

func (phase *BuildPhase) addManagedImage(ctx context.Context, img *Image) error {
if phase.ShouldAddManagedImageRecord {
if err := phase.Conveyor.StorageManager.GetStagesStorage().AddManagedImage(ctx, phase.Conveyor.projectName(), img.GetName()); err != nil {
stagesStorage := phase.Conveyor.StorageManager.GetStagesStorage()
exist, err := stagesStorage.IsManagedImageExist(ctx, phase.Conveyor.projectName(), img.GetName(), storage.WithCache())
if err != nil {
return fmt.Errorf("unable to check existence of managed image: %w", err)
}

if exist {
return nil
}

if err := stagesStorage.AddManagedImage(ctx, phase.Conveyor.projectName(), img.GetName()); err != nil {
return fmt.Errorf("unable to add image %q to the managed images of project %q: %s", img.GetName(), phase.Conveyor.projectName(), err)
}
}
@@ -286,13 +296,14 @@ func (phase *BuildPhase) publishImageMetadata(ctx context.Context, img *Image) e
}

for _, commit := range commits {
exists, err := phase.Conveyor.StorageManager.GetStagesStorage().IsImageMetadataExist(ctx, phase.Conveyor.projectName(), img.GetName(), commit, img.GetStageID())
stagesStorage := phase.Conveyor.StorageManager.GetStagesStorage()
exist, err := stagesStorage.IsImageMetadataExist(ctx, phase.Conveyor.projectName(), img.GetName(), commit, img.GetStageID(), storage.WithCache())
if err != nil {
return fmt.Errorf("unable to get image %s metadata by commit %s and stage ID %s: %s", img.GetName(), commit, img.GetStageID(), err)
}

if !exists {
if err := phase.Conveyor.StorageManager.GetStagesStorage().PutImageMetadata(ctx, phase.Conveyor.projectName(), img.GetName(), commit, img.GetStageID()); err != nil {
if !exist {
if err := stagesStorage.PutImageMetadata(ctx, phase.Conveyor.projectName(), img.GetName(), commit, img.GetStageID()); err != nil {
return fmt.Errorf("unable to put image %s metadata by commit %s and stage ID %s: %s", img.GetName(), commit, img.GetStageID(), err)
}
}
@@ -701,7 +701,7 @@ FilterOutFinalStages:
}

func (m *cleanupManager) initImportsMetadata(ctx context.Context, stageDescriptionList []*image.StageDescription) error {
importMetadataIDs, err := m.StorageManager.GetStagesStorage().GetImportMetadataIDs(ctx, m.ProjectName)
importMetadataIDs, err := m.StorageManager.GetStagesStorage().GetImportMetadataIDs(ctx, m.ProjectName, storage.WithCache())
if err != nil {
return err
}
@@ -49,7 +49,7 @@ func (m *purgeManager) run(ctx context.Context) error {
}

if err := logboek.Context(ctx).Default().LogProcess("Deleting imports metadata").DoError(func() error {
importMetadataIDs, err := m.StorageManager.GetStagesStorage().GetImportMetadataIDs(ctx, m.ProjectName)
importMetadataIDs, err := m.StorageManager.GetStagesStorage().GetImportMetadataIDs(ctx, m.ProjectName, storage.WithCache())
if err != nil {
return err
}
@@ -60,7 +60,7 @@ func (m *purgeManager) run(ctx context.Context) error {
}

if err := logboek.Context(ctx).Default().LogProcess("Deleting managed images").DoError(func() error {
managedImages, err := m.StorageManager.GetStagesStorage().GetManagedImages(ctx, m.ProjectName)
managedImages, err := m.StorageManager.GetStagesStorage().GetManagedImages(ctx, m.ProjectName, storage.WithCache())
if err != nil {
return err
}
@@ -75,7 +75,7 @@ func (m *purgeManager) run(ctx context.Context) error {
}

if err := logboek.Context(ctx).Default().LogProcess("Deleting images metadata").DoError(func() error {
_, imageMetadataByImageName, err := m.StorageManager.GetStagesStorage().GetAllAndGroupImageMetadataByImageName(ctx, m.ProjectName, []string{})
_, imageMetadataByImageName, err := m.StorageManager.GetStagesStorage().GetAllAndGroupImageMetadataByImageName(ctx, m.ProjectName, []string{}, storage.WithCache())
if err != nil {
return err
}
@@ -95,7 +95,7 @@ type GitRepo interface {
}

func (m *Manager) InitImagesMetadata(ctx context.Context, storageManager manager.StorageManagerInterface, localGit GitRepo, projectName string, imageNameList []string) error {
imageMetadataByImageName, imageMetadataByNotManagedImageName, err := storageManager.GetStagesStorage().GetAllAndGroupImageMetadataByImageName(ctx, projectName, imageNameList)
imageMetadataByImageName, imageMetadataByNotManagedImageName, err := storageManager.GetStagesStorage().GetAllAndGroupImageMetadataByImageName(ctx, projectName, imageNameList, storage.WithCache())
if err != nil {
return err
}
@@ -140,7 +140,7 @@ func (m *Manager) InitCustomTagsMetadata(ctx context.Context, storageManager man
}

func GetCustomTagsMetadata(ctx context.Context, storageManager manager.StorageManagerInterface) (stageIDCustomTagList map[string][]string, err error) {
stageCustomTagMetadataIDs, err := storageManager.GetStagesStorage().GetStageCustomTagMetadataIDs(ctx)
stageCustomTagMetadataIDs, err := storageManager.GetStagesStorage().GetStageCustomTagMetadataIDs(ctx, storage.WithCache())
if err != nil {
return nil, fmt.Errorf("unable to get stage custom tag metadata IDs: %s", err)
}
@@ -39,7 +39,7 @@ func newAPI(options apiOptions) *api {
}
}

func (api *api) Tags(ctx context.Context, reference string) ([]string, error) {
func (api *api) Tags(ctx context.Context, reference string, _ ...Option) ([]string, error) {
return api.tags(ctx, reference)
}

@@ -134,7 +134,7 @@ func (api *api) GetRepoImage(_ context.Context, reference string) (*image.Info,
}
}

referenceParts, err := api.ParseReferenceParts(reference)
referenceParts, err := api.parseReferenceParts(reference)
if err != nil {
return nil, fmt.Errorf("unable to parse reference %q: %s", reference, err)
}
@@ -349,7 +349,7 @@ type referenceParts struct {
digest string
}

func (api *api) ParseReferenceParts(reference string) (referenceParts, error) {
func (api *api) parseReferenceParts(reference string) (referenceParts, error) {
// validate reference
parsedReference, err := name.ParseReference(reference, api.parseReferenceOptions()...)
if err != nil {
@@ -12,7 +12,7 @@ type ParseReferencePartsEntry struct {
}

var _ = DescribeTable("Api_ParseReferenceParts", func(entry ParseReferencePartsEntry) {
parts, err := (&api{}).ParseReferenceParts(entry.reference)
parts, err := (&api{}).parseReferenceParts(entry.reference)
Ω(err).ShouldNot(HaveOccurred())
Ω(parts).Should(Equal(entry.expectation))
},
@@ -31,7 +31,7 @@ func newDefaultAPIForImplementation(implementation string, options defaultImplem
return d, nil
}

func (r *defaultImplementation) Tags(ctx context.Context, reference string) ([]string, error) {
func (r *defaultImplementation) Tags(ctx context.Context, reference string, _ ...Option) ([]string, error) {
tags, err := r.api.Tags(ctx, reference)

if (IsHarbor404Error(err) || IsHarborNotFoundError(err)) && r.Implementation != HarborImplementationName {
@@ -49,12 +49,8 @@ func (r *defaultImplementation) Tags(ctx context.Context, reference string) ([]s
return tags, err
}

func (r *defaultImplementation) IsRepoImageExists(ctx context.Context, reference string) (bool, error) {
if imgInfo, err := r.TryGetRepoImage(ctx, reference); err != nil {
return false, err
} else {
return imgInfo != nil, nil
}
func (r *defaultImplementation) IsTagExist(_ context.Context, _ string, _ ...Option) (bool, error) {
panic("not implemented")
}

func (r *defaultImplementation) TryGetRepoImage(ctx context.Context, reference string) (*image.Info, error) {
@@ -92,6 +92,16 @@ func (o *DockerRegistryOptions) defaultOptions() defaultImplementationOptions {
}

func NewDockerRegistry(repositoryAddress string, implementation string, options DockerRegistryOptions) (Interface, error) {
dockerRegistry, err := newDockerRegistry(repositoryAddress, implementation, options)
if err != nil {
return nil, err
}

dockerRegistryWithCache := newDockerRegistryWithCache(dockerRegistry)
return dockerRegistryWithCache, nil
}

func newDockerRegistry(repositoryAddress string, implementation string, options DockerRegistryOptions) (Interface, error) {
switch implementation {
case AwsEcrImplementationName:
return newAwsEcr(options.awsEcrOptions())
@@ -0,0 +1,154 @@
package docker_registry

import (
"context"
"fmt"
"strings"
"sync"

v1 "github.com/google/go-containerregistry/pkg/v1"

"github.com/werf/werf/pkg/image"
"github.com/werf/werf/pkg/util"
)

type DockerRegistryWithCache struct {
Interface
cachedTagsMap *sync.Map
cachedTagsMutexMap *sync.Map
}

func newDockerRegistryWithCache(dockerRegistry Interface) *DockerRegistryWithCache {
return &DockerRegistryWithCache{
Interface: dockerRegistry,
cachedTagsMap: &sync.Map{},
cachedTagsMutexMap: &sync.Map{},
}
}

func (r *DockerRegistryWithCache) Tags(ctx context.Context, reference string, opts ...Option) ([]string, error) {
o := makeOptions(opts...)
return r.withCachedTags(reference, func(cachedTags []string, isExist bool) ([]string, error) {
if isExist && o.cachedTags {
return cachedTags, nil
}

return r.Interface.Tags(ctx, reference, opts...)
})
}

func (r *DockerRegistryWithCache) IsTagExist(ctx context.Context, reference string, opts ...Option) (bool, error) {
referenceParts, err := r.parseReferenceParts(reference)
if err != nil {
return false, err
}

referenceTag := referenceParts.tag
if referenceTag == "" {
panic(fmt.Sprintf("unexpected reference %q: tag required", reference))
}

repositoryAddress := strings.Join([]string{referenceParts.registry, referenceParts.repository}, "/")
tags, err := r.Tags(ctx, repositoryAddress, opts...)
if err != nil {
return false, err
}

for _, tag := range tags {
if referenceTag == tag {
return true, nil
}
}

return false, nil
}

func (r *DockerRegistryWithCache) TagRepoImage(ctx context.Context, repoImage *image.Info, tag string) error {
defer r.mustAddTagToCachedTags(repoImage.Name)
return r.Interface.TagRepoImage(ctx, repoImage, tag)
}

func (r *DockerRegistryWithCache) PushImage(ctx context.Context, reference string, opts *PushImageOptions) error {
defer r.mustAddTagToCachedTags(reference)
return r.Interface.PushImage(ctx, reference, opts)
}

func (r *DockerRegistryWithCache) MutateAndPushImage(ctx context.Context, sourceReference, destinationReference string, mutateConfigFunc func(v1.Config) (v1.Config, error)) error {
defer r.mustAddTagToCachedTags(destinationReference)
return r.Interface.MutateAndPushImage(ctx, sourceReference, destinationReference, mutateConfigFunc)
}

func (r *DockerRegistryWithCache) DeleteRepoImage(ctx context.Context, repoImage *image.Info) error {
defer r.mustDeleteTagFromCachedTags(repoImage.Name)
return r.Interface.DeleteRepoImage(ctx, repoImage)
}

func (r *DockerRegistryWithCache) mustAddTagToCachedTags(reference string) {
_, err := r.withCachedTags(reference, func(tags []string, isExist bool) ([]string, error) {
referenceParts, err := r.parseReferenceParts(reference)
if err != nil {
return nil, fmt.Errorf("unable to parse reference parts %q: %w", reference, err)
}

if !isExist {
return nil, nil
}

tags = append(tags, referenceParts.tag)
return tags, nil
})
if err != nil {
panic(fmt.Sprintf("unexpected err: %s", err))
}
}

func (r *DockerRegistryWithCache) mustDeleteTagFromCachedTags(reference string) {
_, err := r.withCachedTags(reference, func(tags []string, isExist bool) ([]string, error) {
referenceParts, err := r.parseReferenceParts(reference)
if err != nil {
return nil, fmt.Errorf("unable to parse reference parts %q: %w", reference, err)
}

if !isExist {
return nil, nil
}

tags = util.ExcludeFromStringArray(tags, referenceParts.tag)
return tags, nil
})
if err != nil {
panic(fmt.Sprintf("unexpected err: %s", err))
}
}

func (r *DockerRegistryWithCache) withCachedTags(reference string, f func([]string, bool) ([]string, error)) ([]string, error) {
cachedTagsID := r.mustGetCachedTagsID(reference)

mutex := util.MapLoadOrCreateMutex(r.cachedTagsMutexMap, cachedTagsID)
mutex.Lock()
defer mutex.Unlock()

value, isExist := r.cachedTagsMap.Load(cachedTagsID)
var tags []string
if isExist {
tags = value.([]string)
}

newTags, err := f(tags, isExist)
if err != nil {
return nil, err
}

r.cachedTagsMap.Store(cachedTagsID, newTags)
return newTags, nil
}

func (r *DockerRegistryWithCache) mustGetCachedTagsID(reference string) string {
referenceParts, err := r.parseReferenceParts(reference)
if err != nil {
panic(fmt.Sprintf("unexpected reference %q: %s", reference, err))
}

repositoryAddress := strings.Join([]string{referenceParts.registry, referenceParts.repository}, "/")
return repositoryAddress
}
@@ -94,7 +94,7 @@ func (api *genericApi) GetRepoImage(ctx context.Context, reference string) (*ima
func (api *genericApi) mirrorReferenceList(reference string) ([]string, error) {
var referenceList []string

referenceParts, err := api.commonApi.ParseReferenceParts(reference)
referenceParts, err := api.commonApi.parseReferenceParts(reference)
if err != nil {
return nil, err
}
@@ -61,7 +61,7 @@ func newHarbor(options harborOptions) (*harbor, error) {
return harbor, nil
}

func (r *harbor) Tags(ctx context.Context, reference string) ([]string, error) {
func (r *harbor) Tags(ctx context.Context, reference string, _ ...Option) ([]string, error) {
tags, err := r.defaultImplementation.Tags(ctx, reference)
if err != nil {
if IsHarborNotFoundError(err) {
@@ -73,14 +73,6 @@ func (r *harbor) Tags(ctx context.Context, reference string) ([]string, error) {
return tags, nil
}

func (r *harbor) IsRepoImageExists(ctx context.Context, reference string) (bool, error) {
if imgInfo, err := r.TryGetRepoImage(ctx, reference); err != nil {
return false, err
} else {
return imgInfo != nil, nil
}
}

func (r *harbor) TryGetRepoImage(ctx context.Context, reference string) (*image.Info, error) {
res, err := r.api.TryGetRepoImage(ctx, reference)
if err != nil {
@@ -11,16 +11,17 @@ import (
type Interface interface {
CreateRepo(ctx context.Context, reference string) error
DeleteRepo(ctx context.Context, reference string) error
Tags(ctx context.Context, reference string) ([]string, error)
Tags(ctx context.Context, reference string, opts ...Option) ([]string, error)
IsTagExist(ctx context.Context, reference string, opts ...Option) (bool, error)
TagRepoImage(ctx context.Context, repoImage *image.Info, tag string) error
GetRepoImage(ctx context.Context, reference string) (*image.Info, error)
TryGetRepoImage(ctx context.Context, reference string) (*image.Info, error)
IsRepoImageExists(ctx context.Context, reference string) (bool, error)
DeleteRepoImage(ctx context.Context, repoImage *image.Info) error
PushImage(ctx context.Context, reference string, opts *PushImageOptions) error
MutateAndPushImage(ctx context.Context, sourceReference, destinationReference string, mutateConfigFunc func(v1.Config) (v1.Config, error)) error

String() string

parseReferenceParts(reference string) (referenceParts, error)
}

type ApiInterface interface {

0 comments on commit 7ea0a4c

Please sign in to comment.