Skip to content

Commit

Permalink
perf(docker): Use parallel streams for caching docker images
Browse files Browse the repository at this point in the history
  • Loading branch information
spinnakerbot authored and ezimanyi committed Aug 26, 2019
1 parent ad713a7 commit 7a485ae
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.netflix.spinnaker.clouddriver.docker.registry.cache

import com.netflix.spinnaker.cats.cache.DefaultCacheData
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap

class DefaultCacheDataBuilder {
String id = ''
Expand All @@ -28,7 +30,7 @@ class DefaultCacheDataBuilder {
new DefaultCacheData(id, ttlSeconds, attributes, relationships)
}

public static Map<String, DefaultCacheDataBuilder> defaultCacheDataBuilderMap() {
return [:].withDefault { String id -> new DefaultCacheDataBuilder(id: id) }
public static ConcurrentMap<String, DefaultCacheDataBuilder> defaultCacheDataBuilderMap() {
return new ConcurrentHashMap<String, DefaultCacheDataBuilder>()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import com.netflix.spinnaker.clouddriver.docker.registry.security.DockerRegistry
import groovy.util.logging.Slf4j
import retrofit.RetrofitError

import java.util.concurrent.ConcurrentMap
import java.util.concurrent.TimeUnit

import static java.util.Collections.unmodifiableSet
Expand Down Expand Up @@ -128,11 +129,11 @@ class DockerRegistryImageCachingAgent implements CachingAgent, AccountAware, Age
private CacheResult buildCacheResult(Map<String, Set<String>> tagMap) {
log.info("Describing items in ${agentType}")

Map<String, DefaultCacheDataBuilder> cachedTags = DefaultCacheDataBuilder.defaultCacheDataBuilderMap()
Map<String, DefaultCacheDataBuilder> cachedIds = DefaultCacheDataBuilder.defaultCacheDataBuilderMap()
ConcurrentMap<String, DefaultCacheDataBuilder> cachedTags = DefaultCacheDataBuilder.defaultCacheDataBuilderMap()
ConcurrentMap<String, DefaultCacheDataBuilder> cachedIds = DefaultCacheDataBuilder.defaultCacheDataBuilderMap()

tagMap.forEach { repository, tags ->
tags.forEach { tag ->
tags.parallelStream().forEach { tag ->
if (!tag) {
log.warn("Empty tag encountered for $accountName/$repository, not caching")
return
Expand All @@ -158,23 +159,28 @@ class DockerRegistryImageCachingAgent implements CachingAgent, AccountAware, Age
}
}
}
try {
creationDate = credentials.client.getCreationDate(repository, tag)
} catch (Exception e) {
log.warn("Unable to fetch tag creation date, reason: {} (tag: {}, repository: {})", e.message, tag, repository)
}

cachedTags[tagKey].with {
attributes.name = "${repository}:${tag}".toString()
attributes.account = accountName
attributes.digest = digest
attributes.date = creationDate
if (credentials.sortTagsByDate) {
try {
creationDate = credentials.client.getCreationDate(repository, tag)
} catch (Exception e) {
log.warn("Unable to fetch tag creation date, reason: {} (tag: {}, repository: {})", e.message, tag, repository)
}
}

cachedIds[imageIdKey].with {
attributes.tagKey = tagKey
attributes.account = accountName
}
def tagData = new DefaultCacheDataBuilder()
tagData.setId(tagKey)
tagData.attributes.put("name", "${repository}:${tag}".toString())
tagData.attributes.put("account", accountName)
tagData.attributes.put("digest", digest)
tagData.attributes.put("date", creationDate)
cachedTags.put(tagKey, tagData)

def idData = new DefaultCacheDataBuilder()
idData.setId(imageIdKey)
idData.attributes.put("tagKey", tagKey)
idData.attributes.put("account", accountName)
cachedIds.put(imageIdKey, idData)
}

null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ class DockerRegistryCredentials {
return trackDigests
}

/**
 * Accessor for the {@code sortTagsByDate} value held by these credentials.
 *
 * @return the current {@code sortTagsByDate} value
 */
boolean getSortTagsByDate() {
  sortTagsByDate
}

/**
 * Accessor for the {@code skip} list held by these credentials.
 *
 * @return the current {@code skip} value
 */
List<String> getSkip() {
  skip
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.clouddriver.docker.registry.provider.agent

import com.netflix.spinnaker.cats.agent.CacheResult
import com.netflix.spinnaker.clouddriver.docker.registry.DockerRegistryCloudProvider
import com.netflix.spinnaker.clouddriver.docker.registry.api.v2.client.DockerRegistryClient
import com.netflix.spinnaker.clouddriver.docker.registry.api.v2.client.DockerRegistryTags
import com.netflix.spinnaker.clouddriver.docker.registry.security.DockerRegistryCredentials
import retrofit.RetrofitError
import spock.lang.Specification

import java.time.Instant

/**
 * Spock specification for {@link DockerRegistryImageCachingAgent#loadData}.
 *
 * Each feature stubs the mocked {@link DockerRegistryCredentials} and
 * {@link DockerRegistryClient}, runs {@code agent.loadData(null)}, and inspects the
 * "taggedImage" and "imageId" cache groups of the returned {@link CacheResult}.
 *
 * Results are sorted by id before being compared (see {@link #sortCacheResult}) so the
 * assertions do not depend on the order in which the agent emits entries.
 */
class DockerRegistryImageCachingAgentTest extends Specification {

  // Building blocks of the expected cache keys; see buildTaggedImageCacheKey / buildImageIdCacheKey.
  final KEY_PREFIX = "dockerRegistry"
  final ACCOUNT_NAME = "test-docker"
  final REGISTRY_NAME = "test-registry"
  final CACHE_GROUP_TAGGED_IMAGE = "taggedImage"
  final CACHE_GROUP_IMAGE_ID = "imageId"

  DockerRegistryImageCachingAgent agent
  def credentials = Mock(DockerRegistryCredentials)
  def provider = Mock(DockerRegistryCloudProvider)
  def client = Mock(DockerRegistryClient)

  /** Wires the mocked client into the mocked credentials and creates the agent under test. */
  def setup() {
    credentials.client >> client
    // NOTE(review): the numeric arguments (0, 1, 1) are positional constructor parameters whose
    // meaning is not visible from this file -- confirm against the agent's constructor.
    agent = new DockerRegistryImageCachingAgent(provider, ACCOUNT_NAME, credentials, 0, 1, 1, REGISTRY_NAME)
  }

  def "tags loaded from docker registry should be cached"() {
    given:
    credentials.repositories >> ["repo-1", "repo-2"]
    client.getTags("repo-1") >> new DockerRegistryTags().tap {
      name = "repo-1"
      tags = ["tag-1-1"]
    }
    client.getTags("repo-2") >> new DockerRegistryTags().tap {
      name = "repo-2"
      tags = ["tag-2-1", "tag-2-2"]
    }
    // Expected (repository, tag) pairs, listed in the order the sorted cache ids will appear.
    def repoTagSequence = [
      ["repo-1", "tag-1-1"],
      ["repo-2", "tag-2-1"],
      ["repo-2", "tag-2-2"],
    ]

    when:
    def cacheResult = agent.loadData(null)

    then:
    sortCacheResult(cacheResult)
    def cacheResultImageIds = cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID)
    // Without this size check the loop below would pass vacuously on an empty or short result.
    cacheResultImageIds.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultImageIds.size(); i++) {
      assert cacheResultImageIds[i].id == buildImageIdCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("tagKey") == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("account") == ACCOUNT_NAME
    }
    def cacheResultTaggedImages = cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE)
    cacheResultTaggedImages.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultTaggedImages.size(); i++) {
      assert cacheResultTaggedImages[i].id == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultTaggedImages[i].attributes.get("name") == "${repoTagSequence[i][0]}:${repoTagSequence[i][1]}"
      assert cacheResultTaggedImages[i].attributes.get("account") == ACCOUNT_NAME
      assert cacheResultTaggedImages[i].attributes.get("digest") == null
      assert cacheResultTaggedImages[i].attributes.get("date") == null
    }
  }

  def "cached tags should include creation date"() {
    given:
    credentials.sortTagsByDate >> true
    credentials.repositories >> ["repo-1"]
    client.getTags("repo-1") >> new DockerRegistryTags().tap {
      name="repo-1"
      tags=["tag-1", "tag-2"]
    }
    def repoTagSequence = [
      ["repo-1", "tag-1"],
      ["repo-1", "tag-2"],
    ]
    // Creation dates are stubbed so that the i-th sorted entry carries Instant.ofEpochSecond(i).
    client.getCreationDate("repo-1", "tag-1") >> Instant.ofEpochSecond(0)
    client.getCreationDate("repo-1", "tag-2") >> Instant.ofEpochSecond(1)

    when:
    def cacheResult = agent.loadData(null)

    then:
    sortCacheResult(cacheResult)
    def cacheResultImageIds = cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID)
    // Without this size check the loop below would pass vacuously on an empty or short result.
    cacheResultImageIds.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultImageIds.size(); i++) {
      assert cacheResultImageIds[i].id == buildImageIdCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("tagKey") == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("account") == ACCOUNT_NAME
    }
    def cacheResultTaggedImages = cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE)
    cacheResultTaggedImages.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultTaggedImages.size(); i++) {
      assert cacheResultTaggedImages[i].id == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultTaggedImages[i].attributes.get("name") == "${repoTagSequence[i][0]}:${repoTagSequence[i][1]}"
      assert cacheResultTaggedImages[i].attributes.get("account") == ACCOUNT_NAME
      assert cacheResultTaggedImages[i].attributes.get("digest") == null
      assert cacheResultTaggedImages[i].attributes.get("date") == Instant.ofEpochSecond(i)
    }
  }

  def "cached tags should include digest"() {
    given:
    credentials.trackDigests >> true
    credentials.repositories >> ["repo-1"]
    client.getTags("repo-1") >> new DockerRegistryTags().tap {
      name="repo-1"
      tags=["tag-1", "tag-2"]
    }
    def repoTagSequence = [
      ["repo-1", "tag-1"],
      ["repo-1", "tag-2"],
    ]
    // Digests are stubbed as "<repo>_<tag>" so the expected value can be derived per entry.
    client.getDigest("repo-1", "tag-1") >> "repo-1_tag-1"
    client.getDigest("repo-1", "tag-2") >> "repo-1_tag-2"

    when:
    def cacheResult = agent.loadData(null)

    then:
    sortCacheResult(cacheResult)
    def cacheResultImageIds = cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID)
    // Without this size check the loop below would pass vacuously on an empty or short result.
    cacheResultImageIds.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultImageIds.size(); i++) {
      assert cacheResultImageIds[i].id == buildImageIdCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("tagKey") == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("account") == ACCOUNT_NAME
    }
    def cacheResultTaggedImages = cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE)
    cacheResultTaggedImages.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultTaggedImages.size(); i++) {
      assert cacheResultTaggedImages[i].id == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultTaggedImages[i].attributes.get("name") == "${repoTagSequence[i][0]}:${repoTagSequence[i][1]}"
      assert cacheResultTaggedImages[i].attributes.get("account") == ACCOUNT_NAME
      assert cacheResultTaggedImages[i].attributes.get("digest") == "${repoTagSequence[i][0]}_${repoTagSequence[i][1]}"
      assert cacheResultTaggedImages[i].attributes.get("date") == null
    }
  }

  def "error loading tags returns empty result"() {
    given:
    credentials.repositories >> ["repo-1"]
    client.getTags("repo-1") >> {
      throw new IOException()
    }

    when:
    def cacheResult = agent.loadData(null)

    then:
    cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID).size() == 0
    cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE).size() == 0
  }

  def "error loading tag date should set to null date attribute"() {
    given:
    credentials.sortTagsByDate >> true
    credentials.repositories >> ["repo-1"]
    client.getTags("repo-1") >> new DockerRegistryTags().tap {
      name="repo-1"
      tags=["tag-1", "tag-2"]
    }
    def repoTagSequence = [
      ["repo-1", "tag-1"],
      ["repo-1", "tag-2"],
    ]
    // Only the first tag's date lookup fails; the second one succeeds.
    client.getCreationDate("repo-1", "tag-1") >> {
      throw RetrofitError.httpError("", null, null, null)
    }
    client.getCreationDate("repo-1", "tag-2") >> Instant.EPOCH

    when:
    def cacheResult = agent.loadData(null)

    then:
    sortCacheResult(cacheResult)
    def cacheResultImageIds = cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID)
    // Both tags must still be cached; without the size check a dropped tag would go unnoticed.
    cacheResultImageIds.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultImageIds.size(); i++) {
      assert cacheResultImageIds[i].id == buildImageIdCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("tagKey") == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultImageIds[i].attributes.get("account") == ACCOUNT_NAME
    }
    def cacheResultTaggedImages = cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE)
    cacheResultTaggedImages.size() == repoTagSequence.size()
    for (int i = 0; i < cacheResultTaggedImages.size(); i++) {
      assert cacheResultTaggedImages[i].id == buildTaggedImageCacheKey(repoTagSequence[i][0], repoTagSequence[i][1])
      assert cacheResultTaggedImages[i].attributes.get("name") == "${repoTagSequence[i][0]}:${repoTagSequence[i][1]}"
      assert cacheResultTaggedImages[i].attributes.get("account") == ACCOUNT_NAME
      assert cacheResultTaggedImages[i].attributes.get("digest") == null
      // Index 0 is tag-1, whose date lookup threw; index 1 is tag-2.
      assert cacheResultTaggedImages[i].attributes.get("date") == (i == 0 ? null : Instant.EPOCH)
    }
  }

  def "error loading tag digest should not cache that tag"() {
    given:
    credentials.trackDigests >> true
    credentials.repositories >> ["repo-1"]
    client.getTags("repo-1") >> new DockerRegistryTags().tap {
      name="repo-1"
      tags=["tag-1", "tag-2"]
    }
    client.getDigest("repo-1", "tag-1") >> {
      throw new IOException()
    }
    client.getDigest("repo-1", "tag-2") >> "repo-1_tag-2"

    when:
    def cacheResult = agent.loadData(null)

    then:
    sortCacheResult(cacheResult)
    def cacheResultImageIds = cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID)
    cacheResultImageIds.size() == 1
    cacheResultImageIds[0].id == buildImageIdCacheKey("repo-1", "tag-2")
    cacheResultImageIds[0].attributes.get("tagKey") == buildTaggedImageCacheKey("repo-1", "tag-2")
    cacheResultImageIds[0].attributes.get("account") == ACCOUNT_NAME
    def cacheResultTaggedImages = cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE)
    cacheResultTaggedImages.size() == 1
    cacheResultTaggedImages[0].id == buildTaggedImageCacheKey("repo-1", "tag-2")
    cacheResultTaggedImages[0].attributes.get("name") == "repo-1:tag-2"
    cacheResultTaggedImages[0].attributes.get("account") == ACCOUNT_NAME
    cacheResultTaggedImages[0].attributes.get("digest") == "repo-1_tag-2"
    cacheResultTaggedImages[0].attributes.get("date") == null
  }

  def "empty tags should not be cached"() {
    given:
    credentials.repositories >> ["repo-1"]
    client.getTags("repo-1") >> new DockerRegistryTags().tap {
      name="repo-1"
      tags=["tag-1", ""]
    }

    when:
    def cacheResult = agent.loadData(null)

    then:
    sortCacheResult(cacheResult)
    def cacheResultImageIds = cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID)
    cacheResultImageIds.size() == 1
    cacheResultImageIds[0].id == buildImageIdCacheKey("repo-1", "tag-1")
    cacheResultImageIds[0].attributes.get("tagKey") == buildTaggedImageCacheKey("repo-1", "tag-1")
    cacheResultImageIds[0].attributes.get("account") == ACCOUNT_NAME
    def cacheResultTaggedImages = cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE)
    cacheResultTaggedImages.size() == 1
    cacheResultTaggedImages[0].id == buildTaggedImageCacheKey("repo-1", "tag-1")
    cacheResultTaggedImages[0].attributes.get("name") == "repo-1:tag-1"
    cacheResultTaggedImages[0].attributes.get("account") == ACCOUNT_NAME
    cacheResultTaggedImages[0].attributes.get("digest") == null
    cacheResultTaggedImages[0].attributes.get("date") == null
  }


  /** Builds the expected "taggedImage" cache id: prefix:taggedImage:account:repo:tag. */
  private String buildTaggedImageCacheKey(repo, tag) {
    "${KEY_PREFIX}:${CACHE_GROUP_TAGGED_IMAGE}:${ACCOUNT_NAME}:${repo}:${tag}"
  }

  /** Builds the expected "imageId" cache id: prefix:imageId:registry/repo:tag. */
  private String buildImageIdCacheKey(repo, tag) {
    "${KEY_PREFIX}:${CACHE_GROUP_IMAGE_ID}:${REGISTRY_NAME}/${repo}:${tag}"
  }

  /**
   * Sorts both cache groups of the given result by entry id so that index-based
   * assertions are independent of the order in which the agent produced the entries.
   *
   * NOTE(review): this relies on Groovy's {@code sort(Closure)} sorting in place, which it
   * only does when the underlying collection is a {@code List} -- confirm the agent returns lists.
   */
  private void sortCacheResult(CacheResult cacheResult) {
    cacheResult.cacheResults.get(CACHE_GROUP_TAGGED_IMAGE).sort {
      it.id
    }
    cacheResult.cacheResults.get(CACHE_GROUP_IMAGE_ID).sort {
      it.id
    }
  }
}

0 comments on commit 7a485ae

Please sign in to comment.