Skip to content

Commit

Permalink
feat(docker): Redis key migration (#227)
Browse files Browse the repository at this point in the history
* fix(docker): Scaffold v2 docker registry cache key

* feat(docker): Adding typed keys for docker registry

* chore(*): Convert migrator to use keyfactory, tests
  • Loading branch information
robzienert committed Mar 12, 2018
1 parent 08ff512 commit a05b86b
Show file tree
Hide file tree
Showing 11 changed files with 642 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ class IgorConfigurationProperties {
static class RedisProperties {
String connection = "redis://localhost:6379"
int timeout = 2000

@Canonical
static class DockerV1KeyMigration {
int ttlDays = 30
int batchSize = 100
}

@NestedConfigurationProperty
DockerV1KeyMigration dockerV1KeyMigration = new DockerV1KeyMigration()
}

@NestedConfigurationProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,21 @@ class DockerMonitor extends CommonPollingMonitor<ImageDelta, DockerPollingDelta>
private final DockerRegistryCache cache
private final DockerRegistryAccounts dockerRegistryAccounts
private final Optional<EchoService> echoService
private final Optional<DockerRegistryCacheV2KeysMigration> keysMigration;

@Autowired
DockerMonitor(IgorConfigurationProperties properties,
Registry registry,
Optional<DiscoveryClient> discoveryClient,
DockerRegistryCache cache,
DockerRegistryAccounts dockerRegistryAccounts,
Optional<EchoService> echoService) {
Optional<EchoService> echoService,
Optional<DockerRegistryCacheV2KeysMigration> keysMigration) {
super(properties, registry, discoveryClient)
this.cache = cache
this.dockerRegistryAccounts = dockerRegistryAccounts
this.echoService = echoService
this.keysMigration = keysMigration;
}

@Override
Expand All @@ -62,6 +65,11 @@ class DockerMonitor extends CommonPollingMonitor<ImageDelta, DockerPollingDelta>

@Override
void poll() {
if (keysMigration.isPresent() && keysMigration.get().running) {
log.warn("Skipping poll cycle: Keys migration is in progress")
return
}

dockerRegistryAccounts.updateAccounts()
dockerRegistryAccounts.accounts.forEach({ account ->
internalPoll(new PollContext((String) account.name, account))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import java.util.List;
import java.util.Map;

import static java.lang.String.format;

@Service
public class DockerRegistryCache {

private final static String ID = "dockerRegistry";
final static String ID = "dockerRegistry";

// docker-digest must conform to hash:hashvalue. The string "~" explicitly avoids this to act as an "empty" placeholder.
private final static String EMPTY_DIGEST = "~";
Expand All @@ -44,12 +46,12 @@ public DockerRegistryCache(RedisClientDelegate redisClientDelegate,

public List<String> getImages(String account) {
return redisClientDelegate.withMultiClient(c -> {
return new ArrayList<>(c.keys(prefix() + ":" + ID + ":" + account + "*"));
return new ArrayList<>(c.keys(makeIndexPattern(prefix(), account)));
});
}

public String getLastDigest(String account, String registry, String repository, String tag) {
String key = makeKey(account, registry, repository, tag);
String key = new DockerRegistryV2Key(prefix(), ID, account, registry, tag).toString();
return redisClientDelegate.withCommandsClient(c -> {
Map<String, String> res = c.hgetAll(key);
if (res.get("digest").equals(EMPTY_DIGEST)) {
Expand All @@ -60,21 +62,15 @@ public String getLastDigest(String account, String registry, String repository,
}

public void setLastDigest(String account, String registry, String repository, String tag, String digest) {
String key = makeKey(account, registry, repository, tag);
String key = new DockerRegistryV2Key(prefix(), ID, account, registry, tag).toString();
String d = digest == null ? EMPTY_DIGEST : digest;
redisClientDelegate.withCommandsClient(c -> {
c.hset(key, "digest", d);
});
}

public void remove(String imageId) {
redisClientDelegate.withCommandsClient(c -> {
c.del(imageId);
});
}

private String makeKey(String account, String registry, String repository, String tag) {
return prefix() + ":" + ID + ":" + account + ":" + registry + ":" + repository + ":" + tag;
static String makeIndexPattern(String prefix, String account) {
return format("%s:%s:v2:%s:*", prefix, ID, account);
}

private String prefix() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.igor.docker;

import com.google.common.collect.Iterables;
import com.netflix.dyno.connectionpool.CursorBasedResult;
import com.netflix.dyno.jedis.DynoJedisClient;
import com.netflix.spinnaker.igor.IgorConfigurationProperties;
import com.netflix.spinnaker.kork.dynomite.DynomiteClientDelegate;
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.MultiKeyCommands;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import rx.Scheduler;
import rx.schedulers.Schedulers;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.lang.String.format;

/**
* Migrates docker registry cache keys from v1 to v2. This migrator is backwards-incompatible as the key format
* is destructive (data is being removed from the key). When this migrator is run, the old keys will be copied
* to the new key format, and the old keys TTL'ed.
*/
@Component
@ConditionalOnProperty(value = "redis.dockerV1KeyMigration.enabled", matchIfMissing = true)
public class DockerRegistryCacheV2KeysMigration {

private final static Logger log = LoggerFactory.getLogger(DockerRegistryCacheV2KeysMigration.class);

private final RedisClientDelegate redis;
private final IgorConfigurationProperties properties;
private final Scheduler scheduler;
private final DockerRegistryKeyFactory keyFactory;

private final AtomicBoolean running = new AtomicBoolean();

@Autowired
public DockerRegistryCacheV2KeysMigration(RedisClientDelegate redis,
DockerRegistryKeyFactory keyFactory,
IgorConfigurationProperties properties) {
this(redis, keyFactory, properties, Schedulers.io());
}

public DockerRegistryCacheV2KeysMigration(RedisClientDelegate redis,
DockerRegistryKeyFactory keyFactory,
IgorConfigurationProperties properties,
Scheduler scheduler) {
this.redis = redis;
this.keyFactory = keyFactory;
this.properties = properties;
this.scheduler = scheduler;
}

public boolean isRunning() {
return running.get();
}

@PostConstruct
void run() {
running.set(true);
try {
scheduler.createWorker().schedule(this::migrate);
} finally {
running.set(false);
}
}

void migrate() {
log.debug("Starting migration");

long startTime = System.currentTimeMillis();
List<DockerRegistryV1Key> oldKeys = redis.withMultiClient(this::getV1Keys);
log.info("Migrating {} v1 keys", oldKeys.size());

int batchSize = properties.getRedis().getDockerV1KeyMigration().getBatchSize();
for (List<DockerRegistryV1Key> oldKeyBatch : Iterables.partition(oldKeys, batchSize)) {
// For each key: Check if old exists, if so, copy to new key, set ttl on old key, remove ttl on new key
migrateBatch(oldKeyBatch);
}

log.info("Migrated {} v1 keys in {}ms", oldKeys, System.currentTimeMillis() - startTime);
}

private void migrateBatch(List<DockerRegistryV1Key> oldKeys) {
int expireSeconds = (int) Duration.ofDays(properties.getRedis().getDockerV1KeyMigration().getTtlDays()).getSeconds();
redis.withCommandsClient(c -> {
for (DockerRegistryV1Key oldKey : oldKeys) {
String newKey = keyFactory.convert(oldKey).toString();
if (c.exists(newKey)) {
// Nothing to do here, just move on with life
continue;
}

// Copy contents of v1 to v2
String v1Key = oldKey.toString();
Map<String, String> value = c.hgetAll(v1Key);
c.hmset(newKey, value);
c.expire(v1Key, expireSeconds);
}
});
}

/**
* Dynomite does not yet support the `scan` interface method; instead exposed as `dyno_scan`, so we have two
* different methods that are the same thing until this is fixed.
* TODO rz - switch to common `scan` command
*/
private List<DockerRegistryV1Key> getV1Keys(MultiKeyCommands client) {
if (redis instanceof DynomiteClientDelegate) {
return v1Keys((DynoJedisClient) client);
}
return v1Keys((Jedis) client);
}

private Function<List<String>, List<DockerRegistryV1Key>> oldKeysCallback =
(keys) -> keys.stream().map(this::readV1Key).filter(Objects::nonNull).collect(Collectors.toList());

/**
* Dynomite-compat v1keys
*/
private List<DockerRegistryV1Key> v1Keys(DynoJedisClient dyno) {
List<DockerRegistryV1Key> keys = new ArrayList<>();

String pattern = oldIndexPattern();
CursorBasedResult<String> result;
do {
result = dyno.dyno_scan(pattern);
keys.addAll(oldKeysCallback.apply(result.getResult()));
} while (!result.isComplete());

return keys;
}

/**
* Redis-compat v1keys
*/
private List<DockerRegistryV1Key> v1Keys(Jedis jedis) {
List<DockerRegistryV1Key> keys = new ArrayList<>();

ScanParams params = new ScanParams().match(oldIndexPattern()).count(1000);
String cursor = ScanParams.SCAN_POINTER_START;

ScanResult<String> result;
do {
result = jedis.scan(cursor, params);
keys.addAll(oldKeysCallback.apply(result.getResult()));
} while (!result.getStringCursor().equals("0"));

return keys;
}

private DockerRegistryV1Key readV1Key(String key) {
try {
return keyFactory.parseV1Key(key, true);
} catch (DockerRegistryKeyFormatException e) {
return null;
}
}

private String oldIndexPattern() {
return format("%s:%s:*", prefix(), DockerRegistryCache.ID);
}

private String prefix() {
return properties.getSpinnaker().getJedis().getPrefix();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.igor.docker;

import com.google.common.base.Splitter;
import com.google.common.base.VerifyException;
import com.netflix.spinnaker.igor.IgorConfigurationProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.List;

import static com.google.common.base.Verify.verify;

@Service
public class DockerRegistryKeyFactory {
private final static String ID = "dockerRegistry";
private final static Splitter SPLITTER = Splitter.on(':');

private final IgorConfigurationProperties igorConfigurationProperties;

@Autowired
public DockerRegistryKeyFactory(IgorConfigurationProperties igorConfigurationProperties) {
this.igorConfigurationProperties = igorConfigurationProperties;
}

DockerRegistryV1Key parseV1Key(String keyStr) throws DockerRegistryKeyFormatException {
return parseV1Key(keyStr, true);
}

DockerRegistryV1Key parseV1Key(String keyStr, boolean includeRepository) throws DockerRegistryKeyFormatException {
List<String> splits = SPLITTER.splitToList(keyStr);
try {
String prefix = splits.get(0);
verify(prefix.equals(prefix()), "Expected prefix '%s', found '%s'", prefix(), prefix);

String id = splits.get(1);
verify(ID.equals(id), "Expected ID '%s', found '%s'", ID, id);

String account = splits.get(2);
verify(!account.isEmpty(), "Empty account string");

String registry = splits.get(3);
verify(!registry.isEmpty(), "Empty registry string");

// the repository URL (typically without "http://"
// it may contain ':' (e.g. port number), so it may be split across multiple tokens
String repository = null;
if (includeRepository) {
List<String> repoSplits = splits.subList(4, splits.size() - 1);
repository = String.join(":", repoSplits);
}

String tag = splits.get(splits.size() - 1);
verify(!tag.isEmpty(), "Empty registry string");

return new DockerRegistryV1Key(prefix, id, account, registry, repository, tag);
} catch(IndexOutOfBoundsException | VerifyException e) {
throw new DockerRegistryKeyFormatException(String.format("Could not parse '%s' as a v1 key", keyStr), e);
}

}

DockerRegistryV1Key parseV2Key(String keyStr) throws DockerRegistryKeyFormatException {
throw new UnsupportedOperationException("parseV2Key not implemented yet");
}

DockerRegistryV2Key convert(DockerRegistryV1Key oldKey) {
return new DockerRegistryV2Key(
oldKey.getPrefix(),
oldKey.getId(),
oldKey.getAccount(),
oldKey.getRegistry(),
oldKey.getTag()
);
}

private String prefix() {
return igorConfigurationProperties.getSpinnaker().getJedis().getPrefix();
}
}
Loading

0 comments on commit a05b86b

Please sign in to comment.