From b64d152b31a923bf8ce0f2b3b70909ec5aba5627 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Thu, 2 Oct 2014 11:47:43 -0500 Subject: [PATCH 01/15] Volume caching --- bin/worker.js | 12 ++ config/defaults.js | 4 + config/test.js | 4 + deploy/template/etc/default/docker-aufs | 1 + deploy/template/etc/default/docker-btrfs | 1 + lib/gc.js | 34 +++-- lib/task.js | 31 ++++- lib/task_listener.js | 1 + lib/volume_cache.js | 95 +++++++++++++ package.json | 3 +- test/garbage_collection_test.js | 2 +- .../container_volume_caching_test.js | 44 ++++++ test/integration/volume_caching.js | 3 + test/volume_cache_test.js | 128 ++++++++++++++++++ 14 files changed, 349 insertions(+), 14 deletions(-) create mode 100644 lib/volume_cache.js create mode 100644 test/integration/container_volume_caching_test.js create mode 100644 test/integration/volume_caching.js create mode 100644 test/volume_cache_test.js diff --git a/bin/worker.js b/bin/worker.js index eefd150f..67fcf996 100644 --- a/bin/worker.js +++ b/bin/worker.js @@ -12,6 +12,7 @@ var TaskListener = require('../lib/task_listener'); var ShutdownManager = require('../lib/shutdown_manager'); var Stats = require('../lib/stat'); var GarbageCollector = require('../lib/gc'); +var VolumeCache = require('../lib/volume_cache'); // Available target configurations. var allowedHosts = ['aws', 'test']; @@ -162,6 +163,17 @@ co(function *() { config.gc = new GarbageCollector(gcConfig); + config.volumeCache = new VolumeCache({ + rootCachePath: config.cache.volumeCachePath, + log: config.log + }); + + config.gc.on('gc:container:removed', function (container) { + container.caches.forEach(function (cacheKey) { + config.volumeCache.release(cacheKey); + }); + }); + var runtime = new Runtime(config); // Build the listener and connect to the queue. diff --git a/config/defaults.js b/config/defaults.js index 7058029b..ddb8ff7f 100644 --- a/config/defaults.js +++ b/config/defaults.js @@ -18,6 +18,10 @@ module.exports = { dockerVolume: '/mnt' }, + cache: { + volumeCachePath: '/mnt/var/cache/docker-worker' + }, + logging: { liveLogChunkInterval: 5000, // 5 seconds // Added to the current date to make up the expiry time for logs. This is diff --git a/config/test.js b/config/test.js index 2772d29f..75d79c59 100644 --- a/config/test.js +++ b/config/test.js @@ -11,5 +11,9 @@ module.exports = { // Expires one hour from now so test logs don't live too long... liveLogExpires: 3600, bulkLogExpires: 3600 + }, + + cache: { + volumeCachePath: process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache' } }; diff --git a/deploy/template/etc/default/docker-aufs b/deploy/template/etc/default/docker-aufs index 82b90c42..d790b1be 100644 --- a/deploy/template/etc/default/docker-aufs +++ b/deploy/template/etc/default/docker-aufs @@ -20,5 +20,6 @@ mount /dev/instance_storage/all /mnt mkdir -p /mnt/var/lib/docker mkdir -p /mnt/docker-tmp +mkdir -p /mnt/var/cache/docker-worker export TMPDIR="/mnt/docker-tmp" export DOCKER_OPTS=" -g /mnt/var/lib/docker -s aufs" diff --git a/deploy/template/etc/default/docker-btrfs b/deploy/template/etc/default/docker-btrfs index 568e76ce..826e6781 100644 --- a/deploy/template/etc/default/docker-btrfs +++ b/deploy/template/etc/default/docker-btrfs @@ -15,6 +15,7 @@ mount $(echo $devices | cut -d ' ' -f 1) /mnt mkdir -p /mnt/var/lib/docker mkdir -p /mnt/docker-tmp +mkdir -p /mnt/var/cache/docker-worker export TMPDIR="/mnt/docker-tmp" export DOCKER_OPTS=" -g /mnt/var/lib/docker -s btrfs" diff --git a/lib/gc.js b/lib/gc.js index 55a5aaeb..b7d507f4 100644 --- a/lib/gc.js +++ b/lib/gc.js @@ -97,32 +97,48 @@ GarbageCollector.prototype = { }.bind(this)); }, - removeContainer: function (containerId) { - this.markedContainers[containerId] = this.retries; + removeContainer: function (containerId, volumeCaches) { + this.markedContainers[containerId] = { + retries: this.retries, + caches: volumeCaches || [] + }; this.emit('gc:container:marked', containerId); }, removeContainers: function* () { for (var containerId in this.markedContainers) { // If a container can't be removed after 5 tries, more tries won't help - if (this.markedContainers[containerId] !== 0) { + if (this.markedContainers[containerId].retries !== 0) { var c = this.docker.getContainer(containerId); + var caches = this.markedContainers[containerId].caches try { // Even running containers should be removed otherwise shouldn't have // been marked for removal. yield c.remove({force: true}); delete this.markedContainers[containerId]; - this.emit('gc:container:removed', containerId); - this.log('container removed', {container: containerId}); + + this.emit('gc:container:removed', { + id: containerId, + caches: caches + }); + this.log('container removed', { + container: containerId, + caches: caches + }); } catch(e) { var message = e; if (e.reason === 'no such container') { - message = 'No such container. Will remove from marked ' + - 'containers list.'; - delete this.markedContainers[containerId]; + delete this.markedContainers[containerId]; + + message = 'No such container. Will remove from marked ' + + 'containers list.'; + this.emit('gc:container:removed', { + id: containerId, + caches: caches + }); } else { - this.markedContainers[containerId] -= 1; + this.markedContainers[containerId].retries -= 1; } this.emit('gc:error', {message: message, container: containerId}); diff --git a/lib/task.js b/lib/task.js index ca622147..a6dbe6f9 100644 --- a/lib/task.js +++ b/lib/task.js @@ -71,6 +71,25 @@ function buildStateHandlers(task) { return new States(handlers); } +/** +Create a list of cached volumes that will be mounted within the docker container. + +@param {object} volume cache +@param {object} volumes to mount in the container + */ +function buildVolumeBindings(taskVolumeBindings, volumeCache) { + var bindings = []; + var caches = []; + for (var volumeName in taskVolumeBindings) { + var cacheInstance = volumeCache.get(volumeName); + var binding = cacheInstance.path + ':' + taskVolumeBindings[volumeName]; + bindings.push(binding); + caches.push(cacheInstance.key); + } + return [caches, bindings]; + +} + function Task(runtime, runId, task, status) { this.runId = runId; this.task = task; @@ -112,7 +131,7 @@ Task.prototype = { StdinOnce: false, Env: taskEnvToDockerEnv(env) } - } + }; if (links) { procConfig.start.Links = links.map(function(link) { @@ -120,6 +139,12 @@ Task.prototype = { }); } + if (this.task.payload.cache) { + var bindings = buildVolumeBindings(this.task.payload.cache, + this.runtime.volumeCache); + this.volumeCaches = bindings[0]; + procConfig.start.Binds = bindings[1]; + } return procConfig; }, @@ -174,7 +199,7 @@ Task.prototype = { */ scheduleReclaim: function* (claim) { // Figure out when to issue the next claim... - var takenUntil = (new Date(claim.takenUntil) - new Date()) + var takenUntil = (new Date(claim.takenUntil) - new Date()); var nextClaim = takenUntil / this.runtime.task.reclaimDivisor; // This is tricky ensure we have logs... @@ -440,7 +465,7 @@ Task.prototype = { yield this.stream.end.bind(this.stream); // Garbage collect containers - gc.removeContainer(dockerProc.container.id); + gc.removeContainer(dockerProc.container.id, this.volumeCaches); yield stats.timeGen('tasks.time.states.killed', this.states.killed(this)); // If the results validation failed we consider this task failure. diff --git a/lib/task_listener.js b/lib/task_listener.js index 2f577625..de3265ae 100644 --- a/lib/task_listener.js +++ b/lib/task_listener.js @@ -90,6 +90,7 @@ TaskListener.prototype = { taskId: content.status.taskId, runId: content.runId, message: e.toString(), + stack: e.stack, err: e }); } else { diff --git a/lib/volume_cache.js b/lib/volume_cache.js new file mode 100644 index 00000000..38863075 --- /dev/null +++ b/lib/volume_cache.js @@ -0,0 +1,95 @@ +var path = require('path'); +var mkdirp = require('mkdirp'); +var fs = require('fs'); + + +function VolumeCache(config) { + this.rootCachePath = config.rootCachePath; + this.log = config.log; + this.cache = {}; +} + +VolumeCache.prototype = { + createCacheVolume: function(cacheName) { + var cachePath = path.join(this.rootCachePath, cacheName); + this.cache[cacheName] = {}; + + if(!fs.existsSync(cachePath)) { + mkdirp.sync(cachePath); + var cacheDetails = {cacheName: cacheName, cachPath: cachePath}; + this.log('created cached volume', cacheDetails); + } + }, + + add: function(cacheName, instancePath) { + var instanceId = Date.now().toString(); + if (!instancePath) { + var cachePath = path.join(this.rootCachePath, cacheName); + instancePath = path.join(cachePath, instanceId); + } + // TODO if the cache can't be created? error task? + if (!fs.existsSync(instancePath)) { + mkdirp.sync(instancePath); + } + this.cache[cacheName][instanceId] = { + path: instancePath, + mounted: false + }; + + var instance = {key: cacheName + '::' + instanceId, path: instancePath}; + return instance; + }, + + get: function (cacheName) { + var instanceId; + + if (!this.cache[cacheName]) { + this.createCacheVolume(cacheName); + } else { + var instanceIds = Object.keys(this.cache[cacheName]).sort().reverse(); + for (var i = 0; i < instanceIds.length; i++) { + var id = instanceIds[i]; + if (!this.cache[cacheName][id].mounted) { + instanceId = id; + this.cache[cacheName][id].mounted = true; + break; + } + } + } + + var instance; + var log_message = '' + + if (!instanceId) { + log_message = 'cache miss'; + instance = this.add(cacheName); + this.set(instance.key, {mounted: true}); + } else { + log_message = 'cache hit'; + instance = {key: cacheName + '::' + instanceId, + path: this.cache[cacheName][instanceId].path + }; + } + this.log(log_message, instance); + return instance; + }, + + release: function(cacheKey) { + var cacheName = cacheKey.split('::')[0]; + var instanceId = cacheKey.split('::')[1]; + var oldPath = this.cache[cacheName][instanceId].path; + delete this.cache[cacheName][instanceId]; + this.add(cacheName, oldPath); + this.log("released cached volume", {key: cacheKey, path: oldPath}); + }, + + set: function(cacheKey, value) { + var cacheName = cacheKey.split('::')[0]; + var instanceId = cacheKey.split('::')[1]; + for (var key in value) { + this.cache[cacheName][instanceId][key] = value[key]; + } + } +}; + +module.exports = VolumeCache; diff --git a/package.json b/package.json index 89bbdf40..9eb482ee 100644 --- a/package.json +++ b/package.json @@ -39,8 +39,10 @@ "jayschema": "^0.2.7", "middleware-object-hooks": "0.0.3", "mime": "^1.2.11", + "mkdirp": "^0.5.0", "promise": "^5.0.0", "proxy": "^0.2.3", + "rimraf": "^2.2.8", "readdirrsync": "0.0.3", "statsd-client": "0.0.15", "superagent": "^0.18.0", @@ -65,7 +67,6 @@ "json-templater": "^1.0.1", "koa": "^0.8.1", "koa-router": "^3.1.4", - "mkdirp": "^0.5.0", "mocha": "^1.20.1", "slugid": "^1.0.1", "split2": "^0.1.2", diff --git a/test/garbage_collection_test.js b/test/garbage_collection_test.js index b4690936..32278f6b 100644 --- a/test/garbage_collection_test.js +++ b/test/garbage_collection_test.js @@ -113,7 +113,7 @@ suite('garbage collection tests', function () { var container = yield docker.createContainer({Image: IMAGE}); gc.removeContainer(container.id); - gc.markedContainers[container.id] = 0; + gc.markedContainers[container.id].retries = 0; var error = yield waitForEvent(gc, 'gc:error'); assert.ok(error.container === container.id); diff --git a/test/integration/container_volume_caching_test.js b/test/integration/container_volume_caching_test.js new file mode 100644 index 00000000..f782744c --- /dev/null +++ b/test/integration/container_volume_caching_test.js @@ -0,0 +1,44 @@ +suite('container volume cache tests', function () { + var co = require('co'); + var cmd = require('./helper/cmd'); + var fs = require('fs'); + var rmrf = require('rimraf'); + var path = require('path'); + var testworker = require('../post_task'); + + var cacheDir = process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache'; + + test('mount cached folder in docker worker', co(function* () { + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + var fullCacheDir = path.join(cacheDir, cacheName); + + var task = { + payload: { + image: 'taskcluster/test-ubuntu', + command: cmd( + 'echo "foo" > /tmp-obj-dir/foo.txt' + ), + features: { + // No need to actually issue live logging... + localLiveLog: false + }, + cache: {}, + maxRunTime: 5 * 60 + } + }; + + task.payload.cache[cacheName] = '/tmp-obj-dir'; + + var result = yield testworker(task); + + // Get task specific results + assert.ok(result.run.success, 'task was successful'); + + var objDir = fs.readdirSync(fullCacheDir); + assert.ok(fs.existsSync(path.join(fullCacheDir, objDir[0], 'foo.txt'))); + + if (fs.existsSync(fullCacheDir)) { + rmrf.sync(fullCacheDir); + } + })); +}); diff --git a/test/integration/volume_caching.js b/test/integration/volume_caching.js new file mode 100644 index 00000000..39df0fe1 --- /dev/null +++ b/test/integration/volume_caching.js @@ -0,0 +1,3 @@ +suite('caching volumes tests'), function () { + var testworker = require('../post_task'); +}); diff --git a/test/volume_cache_test.js b/test/volume_cache_test.js new file mode 100644 index 00000000..5c657fcc --- /dev/null +++ b/test/volume_cache_test.js @@ -0,0 +1,128 @@ +suite('volume cache test', function () { + var VolumeCache = require('../lib/volume_cache'); + var createLogger = require('../lib/log'); + var docker = require('../lib/docker')(); + var waitForEvent = require('../lib/wait_for_event'); + var fs = require('fs'); + var path = require('path'); + var mkdirp = require('mkdirp'); + var rmrf = require('rimraf'); + var co = require('co'); + var pullImage = require('../lib/pull_image_to_stream'); + var cmd = require('./integration/helper/cmd'); + + // Location on the machine running the test where the cache will live + var localCacheDir = process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache'; + + var log = createLogger({ + source: 'top', + provisionerId: 'test_provisioner', + workerId: 'test_worker', + workerGroup: 'test_worker_group', + workerType: 'test_worker_type' + }); + + var IMAGE = 'taskcluster/test-ubuntu'; + + setup(co(function* () { + yield pullImage(docker, IMAGE, process.stdout); + })); + + test('cache directories created', function () { + var cache = new VolumeCache({ + cachePath: localCacheDir, + log: log + }); + + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + var fullPath = path.join(localCacheDir, cacheName); + + if (fs.existsSync(fullPath)) { + rmrf.sync(fullPath); + } + + var instance1 = cache.get(cacheName); + var instance2 = cache.get(cacheName); + var instance3 = cache.get(cacheName); + + assert.ok(fs.existsSync(instance1.path)); + assert.ok(fs.existsSync(instance2.path)); + assert.ok(fs.existsSync(instance3.path)); + assert.ok(instance1.key !== instance2.key); + assert.ok(instance2.key !== instance3.key); + assert.ok(instance1.path !== instance2.path); + assert.ok(instance2.path !== instance3.path); + + // Release clame on cached volume + cache.release(instance2.key); + + // Should reclaim cache directory path created by instance2 + var instance4 = cache.get(cacheName); + + assert.ok(instance2.key !== instance4.key); + assert.ok(instance2.path === instance4.path); + + if(fs.existsSync(fullPath)) { + rmrf.sync(fullPath); + } + }); + + test('cache directory mounted in container', co(function* () { + // Test is currently setup using container volumes exposed via samba using + // boot2docker + + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + // Location on the docker VM that the cache will exists and is expose via + // samba + var hostCacheDir = '/docker_test_data'; + + var cache = new VolumeCache({ + cachePath: localCacheDir, + log: log + }); + + var cacheInstance = cache.get(cacheName); + var localCachePath = path.join(localCacheDir, cacheName); + + if (fs.existsSync(localCachePath)) { + rmrf.sync(localCachePath); + } + + var c = cmd( + 'echo "foo" > /docker_cache/tmp-obj-dir/blah.txt' + ); + + var createConfig = { + Image: IMAGE, + Cmd: c, + AttachStdin:false, + AttachStdout:true, + AttachStderr:true, + Tty: true + }; + var hostObjPath = path.join( + hostCacheDir, + cacheName, + cacheInstance.key.split('::')[1] + ); + var create = yield docker.createContainer(createConfig); + + container = docker.getContainer(create.id); + var stream = yield container.attach({stream: true, stdout: true, stderr: true}); + stream.pipe(process.stdout); + + var binds = hostObjPath + ':/docker_cache/tmp-obj-dir/'; + + var startConfig = { + Binds: binds, + }; + + yield container.start(startConfig); + + assert.ok(fs.existsSync(path.join(cacheInstance.path, 'blah.txt'))); + + if (fs.existsSync(localCachePath)) { + rmrf.sync(localCachePath); + } + })); +}); From a170e6d70497ed73cd6ac7dd72bbdff6fc72d620 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Mon, 6 Oct 2014 20:25:22 -0500 Subject: [PATCH 02/15] Added stats --- bin/worker.js | 9 +++++---- lib/volume_cache.js | 14 +++++++++----- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/bin/worker.js b/bin/worker.js index 67fcf996..df0996ea 100644 --- a/bin/worker.js +++ b/bin/worker.js @@ -6,7 +6,7 @@ var loadConfig = require('taskcluster-base/config'); var createLogger = require('../lib/log'); var debug = require('debug')('docker-worker:bin:worker'); -var SDC = require('statsd-client') +var SDC = require('statsd-client'); var Runtime = require('../lib/runtime'); var TaskListener = require('../lib/task_listener'); var ShutdownManager = require('../lib/shutdown_manager'); @@ -84,7 +84,7 @@ co(function *() { // Load all base configuration that is on disk / environment variables / // flags. - var config = yield workerConf.load.bind(workerConf) + var config = yield workerConf.load.bind(workerConf); // Use a target specific configuration helper if available. var host; @@ -159,13 +159,14 @@ co(function *() { var gcConfig = config.garbageCollection; gcConfig.capacity = config.capacity, gcConfig.docker = config.docker; - gcConfig.log = config.log + gcConfig.log = config.log; config.gc = new GarbageCollector(gcConfig); config.volumeCache = new VolumeCache({ rootCachePath: config.cache.volumeCachePath, - log: config.log + log: config.log, + stats: config.stats }); config.gc.on('gc:container:removed', function (container) { diff --git a/lib/volume_cache.js b/lib/volume_cache.js index 38863075..51a2176c 100644 --- a/lib/volume_cache.js +++ b/lib/volume_cache.js @@ -7,6 +7,7 @@ function VolumeCache(config) { this.rootCachePath = config.rootCachePath; this.log = config.log; this.cache = {}; + this.stats = config.stats; } VolumeCache.prototype = { @@ -17,7 +18,8 @@ VolumeCache.prototype = { if(!fs.existsSync(cachePath)) { mkdirp.sync(cachePath); var cacheDetails = {cacheName: cacheName, cachPath: cachePath}; - this.log('created cached volume', cacheDetails); + this.stats.increment('cache.volume.created'); + this.log('cache volume created', cacheDetails); } }, @@ -58,17 +60,19 @@ VolumeCache.prototype = { } var instance; - var log_message = '' + var log_message = ''; if (!instanceId) { - log_message = 'cache miss'; + log_message = 'cache volume miss'; instance = this.add(cacheName); this.set(instance.key, {mounted: true}); + this.stats.increment('cache.volume.miss'); } else { - log_message = 'cache hit'; + log_message = 'cache volume hit'; instance = {key: cacheName + '::' + instanceId, path: this.cache[cacheName][instanceId].path }; + this.stats.increment('cache.volume.hit'); } this.log(log_message, instance); return instance; @@ -80,7 +84,7 @@ VolumeCache.prototype = { var oldPath = this.cache[cacheName][instanceId].path; delete this.cache[cacheName][instanceId]; this.add(cacheName, oldPath); - this.log("released cached volume", {key: cacheKey, path: oldPath}); + this.log("cache volume release", {key: cacheKey, path: oldPath}); }, set: function(cacheKey, value) { From ce68866de7a31bf6f5b325ba688fa9eec6c659d0 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Mon, 6 Oct 2014 21:02:05 -0500 Subject: [PATCH 03/15] Added schema definition for cache --- app.box | Bin 0 -> 350 bytes schemas/payload.js | 4 ++++ 2 files changed, 4 insertions(+) create mode 100644 app.box diff --git a/app.box b/app.box new file mode 100644 index 0000000000000000000000000000000000000000..120d9c9c4cb6c9b6517af46d22ca7ea5e152d580 GIT binary patch literal 350 zcmV-k0ipgMiwFP!32ul0|Ll`rYw|D@#(Qf%#UbyF!fQ!uHQ=Y%#n|nL?Xk<&CX%EU znfvZL+U^fB*g)J6`h$>@@SKwzlIKp9PMNUQ6-JjrBtk5*EE0rV%#X83yY+#&NTMXo zl59aEk))?EjX+%NMxSVWP>!y0Zk@iD_xVa-|KRokA-5)=2_9Bh*%n%K2pea&y1)Wf zI|!t?QTY=(=zk1stPfMt Date: Mon, 6 Oct 2014 21:19:29 -0500 Subject: [PATCH 04/15] Added documentation to the volume cache manager --- lib/volume_cache.js | 47 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/lib/volume_cache.js b/lib/volume_cache.js index 51a2176c..e48e7800 100644 --- a/lib/volume_cache.js +++ b/lib/volume_cache.js @@ -2,7 +2,13 @@ var path = require('path'); var mkdirp = require('mkdirp'); var fs = require('fs'); +/** +Cache manager for volumes that can be reused between containers. Cached volumes +will be indexed based on timestamps and reused in the order of most recently used. +@constructor +@param {Object} configuration settings for the volume cache manager +*/ function VolumeCache(config) { this.rootCachePath = config.rootCachePath; this.log = config.log; @@ -11,6 +17,12 @@ function VolumeCache(config) { } VolumeCache.prototype = { + /** + Begin tracking the particular volume cache and create the necessary + local directories. + + @param {String} Name of the cached volume. + */ createCacheVolume: function(cacheName) { var cachePath = path.join(this.rootCachePath, cacheName); this.cache[cacheName] = {}; @@ -23,25 +35,43 @@ VolumeCache.prototype = { } }, + /** + Add a cached volume along with an optional instancePath. Cached volume will + be marked as not mounted until otherwise specified. + + @param {String} Name of the cached volume. + @param {String} Option path for the cached volume. + @return {Object} Cached volume instance that is not mounted. + */ add: function(cacheName, instancePath) { var instanceId = Date.now().toString(); if (!instancePath) { var cachePath = path.join(this.rootCachePath, cacheName); instancePath = path.join(cachePath, instanceId); } - // TODO if the cache can't be created? error task? + if (!fs.existsSync(instancePath)) { mkdirp.sync(instancePath); } + this.cache[cacheName][instanceId] = { path: instancePath, mounted: false }; + // Create a cache key that can be used by consumers of the cache in the + // forma of :: var instance = {key: cacheName + '::' + instanceId, path: instancePath}; return instance; }, + /** + Get the instance for the particular cached volume. If no instance that is not + mounted exists, a new one will be created. + + @param {String} Name of the cached volume. + @return {Object} Cached volume instance. + */ get: function (cacheName) { var instanceId; @@ -78,15 +108,30 @@ VolumeCache.prototype = { return instance; }, + /** + Release the claim on a cached volume. Cached volume should only be released + once a container has been completed removed. Local cached volume will remain + on the filesystem to be used by the next container/task. + + @param {String} Cache key int he format of :: + */ release: function(cacheKey) { var cacheName = cacheKey.split('::')[0]; var instanceId = cacheKey.split('::')[1]; var oldPath = this.cache[cacheName][instanceId].path; + // Remove the old cached volume and add a new unmounted one with an updated + // timestamp/id delete this.cache[cacheName][instanceId]; this.add(cacheName, oldPath); this.log("cache volume release", {key: cacheKey, path: oldPath}); }, + /** + Set a property for a cached volume. + + @param {String} Cache key int he format of :: + @param {Object} Key name and value for the property to be set. + */ set: function(cacheKey, value) { var cacheName = cacheKey.split('::')[0]; var instanceId = cacheKey.split('::')[1]; From 624f9b6ffc466360c36e9c3f9888c88ea28b3853 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Mon, 6 Oct 2014 21:35:02 -0500 Subject: [PATCH 05/15] Fixed stats for tests --- test/volume_cache_test.js | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/test/volume_cache_test.js b/test/volume_cache_test.js index 5c657fcc..b20b2d11 100644 --- a/test/volume_cache_test.js +++ b/test/volume_cache_test.js @@ -22,6 +22,10 @@ suite('volume cache test', function () { workerType: 'test_worker_type' }); + var stats = { + increment: function(stat) { return; } + } + var IMAGE = 'taskcluster/test-ubuntu'; setup(co(function* () { @@ -30,8 +34,9 @@ suite('volume cache test', function () { test('cache directories created', function () { var cache = new VolumeCache({ - cachePath: localCacheDir, - log: log + rootCachePath: localCacheDir, + log: log, + stats: stats }); var cacheName = 'tmp-obj-dir-' + Date.now().toString(); @@ -77,8 +82,9 @@ suite('volume cache test', function () { var hostCacheDir = '/docker_test_data'; var cache = new VolumeCache({ - cachePath: localCacheDir, - log: log + rootCachePath: localCacheDir, + log: log, + stats: stats }); var cacheInstance = cache.get(cacheName); From abf43381b749ec47a57d3e4072906a360d5dc6e0 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Mon, 6 Oct 2014 21:39:29 -0500 Subject: [PATCH 06/15] removed unecessary test file --- test/integration/volume_caching.js | 3 --- 1 file changed, 3 deletions(-) delete mode 100644 test/integration/volume_caching.js diff --git a/test/integration/volume_caching.js b/test/integration/volume_caching.js deleted file mode 100644 index 39df0fe1..00000000 --- a/test/integration/volume_caching.js +++ /dev/null @@ -1,3 +0,0 @@ -suite('caching volumes tests'), function () { - var testworker = require('../post_task'); -}); From a9de64b0f5ba7bc95daa229f63cc3cc8969f1ae8 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Mon, 6 Oct 2014 21:48:49 -0500 Subject: [PATCH 07/15] Fixed payload schema --- app.box | Bin 350 -> 0 bytes schemas/payload.js | 3 ++- 2 files changed, 2 insertions(+), 1 deletion(-) delete mode 100644 app.box diff --git a/app.box b/app.box deleted file mode 100644 index 120d9c9c4cb6c9b6517af46d22ca7ea5e152d580..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 350 zcmV-k0ipgMiwFP!32ul0|Ll`rYw|D@#(Qf%#UbyF!fQ!uHQ=Y%#n|nL?Xk<&CX%EU znfvZL+U^fB*g)J6`h$>@@SKwzlIKp9PMNUQ6-JjrBtk5*EE0rV%#X83yY+#&NTMXo zl59aEk))?EjX+%NMxSVWP>!y0Zk@iD_xVa-|KRokA-5)=2_9Bh*%n%K2pea&y1)Wf zI|!t?QTY=(=zk1stPfMt Date: Tue, 7 Oct 2014 10:53:58 -0500 Subject: [PATCH 08/15] Added scope checking for volume caches --- lib/task.js | 42 ++++++-- .../container_volume_caching_test.js | 99 ++++++++++++++++++- 2 files changed, 133 insertions(+), 8 deletions(-) diff --git a/lib/task.js b/lib/task.js index a6dbe6f9..d59586a6 100644 --- a/lib/task.js +++ b/lib/task.js @@ -77,9 +77,23 @@ Create a list of cached volumes that will be mounted within the docker container @param {object} volume cache @param {object} volumes to mount in the container */ -function buildVolumeBindings(taskVolumeBindings, volumeCache) { +function buildVolumeBindings(taskVolumeBindings, volumeCache, taskScopes) { + var neededScopes = []; + + for (var volumeName in taskVolumeBindings) { + neededScopes.push('docker-worker:cache:' + volumeName); + } + + if (!scopeMatch(taskScopes, neededScopes)) { + throw new Error( + 'Insufficient scopes to attach "' + volumeName + '" as a cached ' + + 'volume. Try adding ' + neededScopes + ' to the .scopes array.' + ); + } + var bindings = []; var caches = []; + for (var volumeName in taskVolumeBindings) { var cacheInstance = volumeCache.get(volumeName); var binding = cacheInstance.path + ':' + taskVolumeBindings[volumeName]; @@ -141,7 +155,7 @@ Task.prototype = { if (this.task.payload.cache) { var bindings = buildVolumeBindings(this.task.payload.cache, - this.runtime.volumeCache); + this.runtime.volumeCache, this.task.scopes); this.volumeCaches = bindings[0]; procConfig.start.Binds = bindings[1]; } @@ -350,10 +364,6 @@ Task.prototype = { var links = yield* stats.timeGen('tasks.time.states.linked', this.states.link(this)); - var dockerProc = this.dockerProcess = new DockerProc( - this.runtime.docker, this.dockerConfig(links) - ); - // Hooks prior to running the task. yield stats.timeGen('tasks.time.states.created', this.states.created(this)); @@ -409,6 +419,26 @@ Task.prototype = { } gc.markImage(this.task.payload.image); + try { + var dockerProc = this.dockerProcess = new DockerProc( + this.runtime.docker, this.dockerConfig(links)); + } catch (e) { + this.stream.write(this.fmtLog('Docker configuration could not be ' + + 'created. This may indicate an authentication error when validating ' + + 'scopes necessary for using caches. \n Error %s', e)); + + yield this.stream.end.bind(this.stream, this.logFooter( + false, // unsuccessful task + -1, // negative exit code indicates infrastructure errors usually. + taskStart, // duration details... + new Date() + )); + + yield stats.timeGen( + 'tasks.time.states.docker_configuration.killed', this.states.killed(this) + ); + return false; + } // Now that we know the stream is ready pipe data into it... dockerProc.stdout.pipe(this.stream, { diff --git a/test/integration/container_volume_caching_test.js b/test/integration/container_volume_caching_test.js index f782744c..8d15aa36 100644 --- a/test/integration/container_volume_caching_test.js +++ b/test/integration/container_volume_caching_test.js @@ -8,8 +8,9 @@ suite('container volume cache tests', function () { var cacheDir = process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache'; - test('mount cached folder in docker worker', co(function* () { + test('mount cached volume in docker worker', co(function* () { var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + var neededScope = 'docker-worker:cache:' + cacheName; var fullCacheDir = path.join(cacheDir, cacheName); var task = { @@ -24,7 +25,8 @@ suite('container volume cache tests', function () { }, cache: {}, maxRunTime: 5 * 60 - } + }, + scopes: [neededScope] }; task.payload.cache[cacheName] = '/tmp-obj-dir'; @@ -41,4 +43,97 @@ suite('container volume cache tests', function () { rmrf.sync(fullCacheDir); } })); + + test('mount multiple cached volumes in docker worker', co(function* () { + var cacheName1 = 'tmp-obj-dir-' + Date.now().toString(); + var cacheName2 = 'tmp-obj-dir-' + (Date.now()+1).toString(); + + var neededScopes = [] + neededScopes.push('docker-worker:cache:' + cacheName1); + neededScopes.push('docker-worker:cache:' + cacheName2); + + var fullCache1Dir = path.join(cacheDir, cacheName1); + var fullCache2Dir = path.join(cacheDir, cacheName2); + + var task = { + payload: { + image: 'taskcluster/test-ubuntu', + command: cmd( + 'echo "foo" > /tmp-obj-dir1/foo.txt', + 'echo "bar" > /tmp-obj-dir2/bar.txt' + ), + features: { + // No need to actually issue live logging... + localLiveLog: false + }, + cache: {}, + maxRunTime: 5 * 60 + }, + scopes: neededScopes + }; + + task.payload.cache[cacheName1] = '/tmp-obj-dir1'; + task.payload.cache[cacheName2] = '/tmp-obj-dir2'; + + var result = yield testworker(task); + + // Get task specific results + assert.ok(result.run.success, 'task was successful'); + + var objDir = fs.readdirSync(fullCache1Dir); + assert.ok(fs.existsSync(path.join(fullCache1Dir, objDir[0], 'foo.txt'))); + + if (fs.existsSync(fullCache1Dir)) { + rmrf.sync(fullCache1Dir); + } + + objDir = fs.readdirSync(fullCache2Dir); + assert.ok(fs.existsSync(path.join(fullCache2Dir, objDir[0], 'bar.txt'))); + + if (fs.existsSync(fullCache2Dir)) { + rmrf.sync(fullCache2Dir); + } + })); + + test('task unsuccesful when insufficient cache scope is provided', + co(function* () { + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + var neededScope = 'docker-worker:cache:1' + cacheName; + var fullCacheDir = path.join(cacheDir, cacheName); + + var task = { + payload: { + image: 'taskcluster/test-ubuntu', + command: cmd( + 'echo "foo" > /tmp-obj-dir/foo.txt' + ), + features: { + // No need to actually issue live logging... + localLiveLog: false + }, + cache: {}, + maxRunTime: 5 * 60 + }, + scopes: [neededScope] + }; + + task.payload.cache[cacheName] = '/tmp-obj-dir'; + + var result = yield testworker(task); + + // Get task specific results + assert.ok(!result.run.success, + 'Task completed successfully when it should not have.'); + + var dirExists = fs.existsSync(fullCacheDir); + if (dirExists) { + rmrf.sync(fullCacheDir); + } + + assert.ok(!dirExists, + 'Volume cache created cached volume directory when it should not ' + + 'have.' + ); + }) + ); }); From c56b1484997dfbd333a6622e2617b26a7a673438 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Tue, 7 Oct 2014 15:11:20 -0500 Subject: [PATCH 09/15] Added GC for volume caches --- bin/worker.js | 2 + lib/gc.js | 17 +++++-- lib/volume_cache.js | 82 ++++++++++++++++++++++++--------- test/garbage_collection_test.js | 67 +++++++++++++++++++-------- 4 files changed, 123 insertions(+), 45 deletions(-) diff --git a/bin/worker.js b/bin/worker.js index df0996ea..6186e40e 100644 --- a/bin/worker.js +++ b/bin/worker.js @@ -175,6 +175,8 @@ co(function *() { }); }); + config.gc.addManager(config.volumeCache); + var runtime = new Runtime(config); // Build the listener and connect to the queue. diff --git a/lib/gc.js b/lib/gc.js index b7d507f4..04dc3c49 100644 --- a/lib/gc.js +++ b/lib/gc.js @@ -70,12 +70,17 @@ function GarbageCollector(config) { this.markedImages = {}; this.retries = 5; this.scheduleSweep(this.interval); + this.managers = []; EventEmitter.call(this); } GarbageCollector.prototype = { __proto__: EventEmitter.prototype, + addManager: function(manager) { + this.managers.push(manager); + }, + markImage: function(image) { var parsedImage = parseImage(image); var repository = parsedImage.repository; @@ -141,14 +146,14 @@ GarbageCollector.prototype = { this.markedContainers[containerId].retries -= 1; } - this.emit('gc:error', {message: message, container: containerId}); + this.emit('gc:container:error', {message: message, container: containerId}); this.log('container removal error.', {container: containerId, err: message}); } } else { delete this.markedContainers[containerId]; this.ignoredContainers.push(containerId); - this.emit('gc:error', + this.emit('gc:container:error', {message: 'Retry limit exceeded', container: containerId}); this.log('container removal error', {container: containerId, err: 'Retry limit exceeded'}); @@ -223,17 +228,21 @@ GarbageCollector.prototype = { (this.capacity - this.taskListener.pending), this.log); if (exceedsThreshold) { - this.emit('gc:warning', + this.emit('gc:diskspace:warning', {message: 'Diskspace threshold reached. ' + 'Removing all non-running images.' }); } else { - this.emit('gc:info', + this.emit('gc:diskspace:info', {message: 'Diskspace threshold not reached. ' + 'Removing only expired images.' }); } yield this.removeUnusedImages(exceedsThreshold); + + for (var i = 0; i < this.managers.length; i++) { + yield this.managers[i].clear(exceedsThreshold); + } } this.log('garbage collection finished'); diff --git a/lib/volume_cache.js b/lib/volume_cache.js index e48e7800..3e5324ac 100644 --- a/lib/volume_cache.js +++ b/lib/volume_cache.js @@ -1,6 +1,20 @@ var path = require('path'); var mkdirp = require('mkdirp'); var fs = require('fs'); +var rmrf = require('rimraf'); +var Promise = require('promise'); + +function removeDir(directory) { + return new Promise(function(accept, reject) { + rmrf(directory, function (error) { + if (error) { + reject(error); + } else { + accept(); + } + }); + }); +} /** Cache manager for volumes that can be reused between containers. Cached volumes @@ -17,24 +31,6 @@ function VolumeCache(config) { } VolumeCache.prototype = { - /** - Begin tracking the particular volume cache and create the necessary - local directories. - - @param {String} Name of the cached volume. - */ - createCacheVolume: function(cacheName) { - var cachePath = path.join(this.rootCachePath, cacheName); - this.cache[cacheName] = {}; - - if(!fs.existsSync(cachePath)) { - mkdirp.sync(cachePath); - var cacheDetails = {cacheName: cacheName, cachPath: cachePath}; - this.stats.increment('cache.volume.created'); - this.log('cache volume created', cacheDetails); - } - }, - /** Add a cached volume along with an optional instancePath. Cached volume will be marked as not mounted until otherwise specified. @@ -65,6 +61,48 @@ VolumeCache.prototype = { return instance; }, + /** + Remove any unmounted volumes when diskspace threshold is reached. This will + be called at each garbage collection interval. + + @param {Boolean} Disksapce threshold reached + */ + clear: function* (exceedsDiskspaceThreshold) { + if (exceedsDiskspaceThreshold) { + for (var cacheName in this.cache) { + for (var instance in this.cache[cacheName]) { + if (!this.cache[cacheName][instance].mounted) { + var cacheKey = cacheName + '::' + instance; + var instancePath = this.cache[cacheName][instance].path; + yield removeDir(instancePath); + delete this.cache[cacheName][instance]; + this.stats.increment('cache.volume.removed'); + this.log('cache volume removed', + {key: cacheKey, path: instancePath}); + } + } + } + } + }, + + /** + Begin tracking the particular volume cache and create the necessary + local directories. + + @param {String} Name of the cached volume. + */ + createCacheVolume: function(cacheName) { + var cachePath = path.join(this.rootCachePath, cacheName); + this.cache[cacheName] = {}; + + if(!fs.existsSync(cachePath)) { + mkdirp.sync(cachePath); + var cacheDetails = {cacheName: cacheName, cachPath: cachePath}; + this.stats.increment('cache.volume.created'); + this.log('cache volume created', cacheDetails); + } + }, + /** Get the instance for the particular cached volume. If no instance that is not mounted exists, a new one will be created. @@ -90,21 +128,21 @@ VolumeCache.prototype = { } var instance; - var log_message = ''; + var logMessage = ''; if (!instanceId) { - log_message = 'cache volume miss'; + logMessage = 'cache volume miss'; instance = this.add(cacheName); this.set(instance.key, {mounted: true}); this.stats.increment('cache.volume.miss'); } else { - log_message = 'cache volume hit'; + logMessage = 'cache volume hit'; instance = {key: cacheName + '::' + instanceId, path: this.cache[cacheName][instanceId].path }; this.stats.increment('cache.volume.hit'); } - this.log(log_message, instance); + this.log(logMessage, instance); return instance; }, diff --git a/test/garbage_collection_test.js b/test/garbage_collection_test.js index 32278f6b..6c8d3b48 100644 --- a/test/garbage_collection_test.js +++ b/test/garbage_collection_test.js @@ -1,14 +1,17 @@ suite('garbage collection tests', function () { var co = require('co'); + var fs = require('fs'); var createLogger = require('../lib/log'); var docker = require('../lib/docker')(); var dockerUtils = require('dockerode-process/utils'); var pullImage = require('../lib/pull_image_to_stream'); var GarbageCollector = require('../lib/gc'); - var IMAGE = 'taskcluster/test-ubuntu'; + var VolumeCache = require('../lib/volume_cache'); var streams = require('stream'); var waitForEvent = require('../lib/wait_for_event'); + var IMAGE = 'taskcluster/test-ubuntu'; + var log = createLogger({ source: 'top', // top level logger details... provisionerId: 'test_provisioner', @@ -29,20 +32,7 @@ suite('garbage collection tests', function () { } setup(co(function* () { - yield new Promise(function(accept, reject) { - // pull the image (or use on in the cache and output status in stdout) - var pullStream = - dockerUtils.pullImageIfMissing(docker, IMAGE); - - // pipe the pull stream into stdout but don't end - pullStream.pipe(process.stdout, { end: false }); - - pullStream.once('error', reject); - pullStream.once('end', function() { - pullStream.removeListener('error', reject); - accept(); - }.bind(this)); - }.bind(this)); + yield pullImage(docker, IMAGE, process.stdout); })); test('remove container', co(function* () { @@ -115,7 +105,7 @@ suite('garbage collection tests', function () { gc.removeContainer(container.id); gc.markedContainers[container.id].retries = 0; - var error = yield waitForEvent(gc, 'gc:error'); + var error = yield waitForEvent(gc, 'gc:container:error'); assert.ok(error.container === container.id); assert.ok(error.message === 'Retry limit exceeded', 'Error message does not match \'Retry limit exceeded\''); @@ -151,7 +141,7 @@ suite('garbage collection tests', function () { gc.sweep(); - var error = yield waitForEvent(gc, 'gc:error'); + var error = yield waitForEvent(gc, 'gc:container:error'); var errorMessage = 'No such container. Will remove from marked ' + 'containers list.'; assert.ok(error.container === container.id); @@ -263,7 +253,7 @@ suite('garbage collection tests', function () { gc.markImage(imageName); gc.sweep(); - var infoMessage = yield waitForEvent(gc, 'gc:info'); + var infoMessage = yield waitForEvent(gc, 'gc:diskspace:info'); var msg = 'Diskspace threshold not reached. Removing only expired images.'; assert.ok(msg === infoMessage.message); @@ -301,7 +291,7 @@ suite('garbage collection tests', function () { gc.markImage(imageName); gc.sweep(); - var warningMessage = yield waitForEvent(gc, 'gc:warning'); + var warningMessage = yield waitForEvent(gc, 'gc:diskspace:warning'); var msg = 'Diskspace threshold reached. Removing all non-running images.'; assert.ok(msg === warningMessage.message); @@ -348,4 +338,43 @@ suite('garbage collection tests', function () { yield waitForEvent(gc, 'gc:sweep:stop'); clearTimeout(gc.sweepTimeoutId); })); + + test('clear volume cache when diskspace threshold reached', co(function* () { + var gc = new GarbageCollector({ + capacity: 2, + log: log, + docker: docker, + dockerVolume: '/', + interval: 2 * 1000, + taskListener: {pending: 1}, + diskspaceThreshold: 500000 * 100000000, + imageExpiration: 5 + }); + + clearTimeout(gc.sweepTimeoutId); + + var localCacheDir = process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache'; + var stats = { + increment: function(stat) { return; } + } + + var cache = new VolumeCache({ + rootCachePath: localCacheDir, + log: log, + stats: stats + }); + + gc.addManager(cache); + + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + + var instance1 = cache.get(cacheName); + var instance2 = cache.get(cacheName); + cache.set(instance2.key, {mounted: false}); + gc.sweep(); + yield waitForEvent(gc, 'gc:sweep:stop'); + clearTimeout(gc.sweepTimeoutId); + assert.ok(fs.existsSync(instance1.path)); + assert.ok(!fs.existsSync(instance2.path)); + })); }); From 56af873bc723cf94f6caf10f2339cc6784b45cf9 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Tue, 7 Oct 2014 15:14:25 -0500 Subject: [PATCH 10/15] Fixed indentation --- test/integration/container_volume_caching_test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/container_volume_caching_test.js b/test/integration/container_volume_caching_test.js index 8d15aa36..417e44f3 100644 --- a/test/integration/container_volume_caching_test.js +++ b/test/integration/container_volume_caching_test.js @@ -69,7 +69,7 @@ suite('container volume cache tests', function () { cache: {}, maxRunTime: 5 * 60 }, - scopes: neededScopes + scopes: neededScopes }; task.payload.cache[cacheName1] = '/tmp-obj-dir1'; From 1c47a25fe6d1f96a264630e71f0ed3b1b53655d8 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Tue, 7 Oct 2014 15:53:11 -0500 Subject: [PATCH 11/15] Adjusted stat names for cache hit/miss --- lib/volume_cache.js | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/lib/volume_cache.js b/lib/volume_cache.js index 3e5324ac..b34a2207 100644 --- a/lib/volume_cache.js +++ b/lib/volume_cache.js @@ -74,9 +74,9 @@ VolumeCache.prototype = { if (!this.cache[cacheName][instance].mounted) { var cacheKey = cacheName + '::' + instance; var instancePath = this.cache[cacheName][instance].path; - yield removeDir(instancePath); + var statName = 'cache.volume.' + cacheKey + '.removed'; + yield this.stats.timeGen(statName, removeDir(instancePath)); delete this.cache[cacheName][instance]; - this.stats.increment('cache.volume.removed'); this.log('cache volume removed', {key: cacheKey, path: instancePath}); } @@ -98,7 +98,8 @@ VolumeCache.prototype = { if(!fs.existsSync(cachePath)) { mkdirp.sync(cachePath); var cacheDetails = {cacheName: cacheName, cachPath: cachePath}; - this.stats.increment('cache.volume.created'); + var statName = 'cache.volume.' + cacheName + '.created'; + this.stats.increment(statName); this.log('cache volume created', cacheDetails); } }, @@ -134,13 +135,15 @@ VolumeCache.prototype = { logMessage = 'cache volume miss'; instance = this.add(cacheName); this.set(instance.key, {mounted: true}); - this.stats.increment('cache.volume.miss'); + var statName = 'cache.volume.' + instance.key + 'miss'; + this.stats.increment(statName); } else { logMessage = 'cache volume hit'; instance = {key: cacheName + '::' + instanceId, path: this.cache[cacheName][instanceId].path }; - this.stats.increment('cache.volume.hit'); + var statName = 'cache.volume.' + instance.key + 'hit'; + this.stats.increment(statName); } this.log(logMessage, instance); return instance; From de32d33d6beea589b5bf7260c67d1fa884d70e89 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Tue, 7 Oct 2014 17:07:19 -0500 Subject: [PATCH 12/15] Added generator support for volume cache --- bin/worker.js | 6 +-- lib/task.js | 14 +++---- lib/volume_cache.js | 39 ++++++++++++------- package.json | 1 + test/garbage_collection_test.js | 7 ++-- .../container_volume_caching_test.js | 2 +- test/volume_cache_test.js | 19 ++++----- 7 files changed, 51 insertions(+), 37 deletions(-) diff --git a/bin/worker.js b/bin/worker.js index 6186e40e..d134a7c5 100644 --- a/bin/worker.js +++ b/bin/worker.js @@ -170,9 +170,9 @@ co(function *() { }); config.gc.on('gc:container:removed', function (container) { - container.caches.forEach(function (cacheKey) { - config.volumeCache.release(cacheKey); - }); + container.caches.forEach(co(function* (cacheKey) { + yield config.volumeCache.release(cacheKey); + })); }); config.gc.addManager(config.volumeCache); diff --git a/lib/task.js b/lib/task.js index d59586a6..91eef109 100644 --- a/lib/task.js +++ b/lib/task.js @@ -77,7 +77,7 @@ Create a list of cached volumes that will be mounted within the docker container @param {object} volume cache @param {object} volumes to mount in the container */ -function buildVolumeBindings(taskVolumeBindings, volumeCache, taskScopes) { +function* buildVolumeBindings(taskVolumeBindings, volumeCache, taskScopes) { var neededScopes = []; for (var volumeName in taskVolumeBindings) { @@ -95,13 +95,12 @@ function buildVolumeBindings(taskVolumeBindings, volumeCache, taskScopes) { var caches = []; for (var volumeName in taskVolumeBindings) { - var cacheInstance = volumeCache.get(volumeName); + var cacheInstance = yield volumeCache.get(volumeName); var binding = cacheInstance.path + ':' + taskVolumeBindings[volumeName]; bindings.push(binding); caches.push(cacheInstance.key); } return [caches, bindings]; - } function Task(runtime, runId, task, status) { @@ -122,7 +121,7 @@ Task.prototype = { @param {Array[dockerode.Container]} [links] list of dockerode containers. */ - dockerConfig: function(links) { + dockerConfig: function* (links) { var config = this.task.payload; var env = config.env || {}; @@ -154,7 +153,7 @@ Task.prototype = { } if (this.task.payload.cache) { - var bindings = buildVolumeBindings(this.task.payload.cache, + var bindings = yield buildVolumeBindings(this.task.payload.cache, this.runtime.volumeCache, this.task.scopes); this.volumeCaches = bindings[0]; procConfig.start.Binds = bindings[1]; @@ -420,8 +419,7 @@ Task.prototype = { gc.markImage(this.task.payload.image); try { - var dockerProc = this.dockerProcess = new DockerProc( - this.runtime.docker, this.dockerConfig(links)); + var dockerConfig = yield this.dockerConfig(links); } catch (e) { this.stream.write(this.fmtLog('Docker configuration could not be ' + 'created. This may indicate an authentication error when validating ' + @@ -440,6 +438,8 @@ Task.prototype = { return false; } + var dockerProc = this.dockerProcess = new DockerProc( + this.runtime.docker, dockerConfig); // Now that we know the stream is ready pipe data into it... dockerProc.stdout.pipe(this.stream, { end: false diff --git a/lib/volume_cache.js b/lib/volume_cache.js index b34a2207..ce7b0e9a 100644 --- a/lib/volume_cache.js +++ b/lib/volume_cache.js @@ -1,6 +1,6 @@ var path = require('path'); var mkdirp = require('mkdirp'); -var fs = require('fs'); +var fs = require('co-fs'); var rmrf = require('rimraf'); var Promise = require('promise'); @@ -16,6 +16,18 @@ function removeDir(directory) { }); } +function makeDir(directory) { + return new Promise(function(accept, reject) { + mkdirp(directory, function (error) { + if (error) { + reject(error); + } else { + accept(); + } + }); + }); +} + /** Cache manager for volumes that can be reused between containers. Cached volumes will be indexed based on timestamps and reused in the order of most recently used. @@ -39,17 +51,16 @@ VolumeCache.prototype = { @param {String} Option path for the cached volume. @return {Object} Cached volume instance that is not mounted. */ - add: function(cacheName, instancePath) { + add: function* (cacheName, instancePath) { var instanceId = Date.now().toString(); if (!instancePath) { var cachePath = path.join(this.rootCachePath, cacheName); instancePath = path.join(cachePath, instanceId); } - if (!fs.existsSync(instancePath)) { - mkdirp.sync(instancePath); + if (!(yield fs.exists(instancePath))) { + yield makeDir(instancePath); } - this.cache[cacheName][instanceId] = { path: instancePath, mounted: false @@ -91,12 +102,12 @@ VolumeCache.prototype = { @param {String} Name of the cached volume. */ - createCacheVolume: function(cacheName) { + createCacheVolume: function* (cacheName) { var cachePath = path.join(this.rootCachePath, cacheName); this.cache[cacheName] = {}; - if(!fs.existsSync(cachePath)) { - mkdirp.sync(cachePath); + if(!(yield fs.exists(cachePath))) { + yield makeDir(cachePath); var cacheDetails = {cacheName: cacheName, cachPath: cachePath}; var statName = 'cache.volume.' + cacheName + '.created'; this.stats.increment(statName); @@ -111,11 +122,11 @@ VolumeCache.prototype = { @param {String} Name of the cached volume. @return {Object} Cached volume instance. */ - get: function (cacheName) { + get: function* (cacheName) { var instanceId; if (!this.cache[cacheName]) { - this.createCacheVolume(cacheName); + yield this.createCacheVolume(cacheName); } else { var instanceIds = Object.keys(this.cache[cacheName]).sort().reverse(); for (var i = 0; i < instanceIds.length; i++) { @@ -133,7 +144,7 @@ VolumeCache.prototype = { if (!instanceId) { logMessage = 'cache volume miss'; - instance = this.add(cacheName); + instance = yield this.add(cacheName); this.set(instance.key, {mounted: true}); var statName = 'cache.volume.' + instance.key + 'miss'; this.stats.increment(statName); @@ -156,14 +167,14 @@ VolumeCache.prototype = { @param {String} Cache key int he format of :: */ - release: function(cacheKey) { + release: function* (cacheKey) { var cacheName = cacheKey.split('::')[0]; var instanceId = cacheKey.split('::')[1]; var oldPath = this.cache[cacheName][instanceId].path; // Remove the old cached volume and add a new unmounted one with an updated // timestamp/id delete this.cache[cacheName][instanceId]; - this.add(cacheName, oldPath); + yield this.add(cacheName, oldPath); this.log("cache volume release", {key: cacheKey, path: oldPath}); }, @@ -173,7 +184,7 @@ VolumeCache.prototype = { @param {String} Cache key int he format of :: @param {Object} Key name and value for the property to be set. */ - set: function(cacheKey, value) { + set: function (cacheKey, value) { var cacheName = cacheKey.split('::')[0]; var instanceId = cacheKey.split('::')[1]; for (var key in value) { diff --git a/package.json b/package.json index 9eb482ee..8a352876 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "azure-storage": "^0.3.0", "co": "^3.1.0", "co-event": "^0.1.0", + "co-fs": "^1.2.0", "co-promise": "^1.0.0", "commander": "^2.2.0", "debug": "^1.0.2", diff --git a/test/garbage_collection_test.js b/test/garbage_collection_test.js index 6c8d3b48..232bcb16 100644 --- a/test/garbage_collection_test.js +++ b/test/garbage_collection_test.js @@ -355,7 +355,8 @@ suite('garbage collection tests', function () { var localCacheDir = process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache'; var stats = { - increment: function(stat) { return; } + increment: function(stat) { return; }, + timeGen: function* (stat, fn) { yield fn; } } var cache = new VolumeCache({ @@ -368,8 +369,8 @@ suite('garbage collection tests', function () { var cacheName = 'tmp-obj-dir-' + Date.now().toString(); - var instance1 = cache.get(cacheName); - var instance2 = cache.get(cacheName); + var instance1 = yield cache.get(cacheName); + var instance2 = yield cache.get(cacheName); cache.set(instance2.key, {mounted: false}); gc.sweep(); yield waitForEvent(gc, 'gc:sweep:stop'); diff --git a/test/integration/container_volume_caching_test.js b/test/integration/container_volume_caching_test.js index 417e44f3..8b05f8b5 100644 --- a/test/integration/container_volume_caching_test.js +++ b/test/integration/container_volume_caching_test.js @@ -48,7 +48,7 @@ suite('container volume cache tests', function () { var cacheName1 = 'tmp-obj-dir-' + Date.now().toString(); var cacheName2 = 'tmp-obj-dir-' + (Date.now()+1).toString(); - var neededScopes = [] + var neededScopes = []; neededScopes.push('docker-worker:cache:' + cacheName1); neededScopes.push('docker-worker:cache:' + cacheName2); diff --git a/test/volume_cache_test.js b/test/volume_cache_test.js index b20b2d11..d753f520 100644 --- a/test/volume_cache_test.js +++ b/test/volume_cache_test.js @@ -24,7 +24,7 @@ suite('volume cache test', function () { var stats = { increment: function(stat) { return; } - } + }; var IMAGE = 'taskcluster/test-ubuntu'; @@ -32,7 +32,7 @@ suite('volume cache test', function () { yield pullImage(docker, IMAGE, process.stdout); })); - test('cache directories created', function () { + test('cache directories created', co(function* () { var cache = new VolumeCache({ rootCachePath: localCacheDir, log: log, @@ -46,9 +46,9 @@ suite('volume cache test', function () { rmrf.sync(fullPath); } - var instance1 = cache.get(cacheName); - var instance2 = cache.get(cacheName); - var instance3 = cache.get(cacheName); + var instance1 = yield cache.get(cacheName); + var instance2 = yield cache.get(cacheName); + var instance3 = yield cache.get(cacheName); assert.ok(fs.existsSync(instance1.path)); assert.ok(fs.existsSync(instance2.path)); @@ -59,10 +59,10 @@ suite('volume cache test', function () { assert.ok(instance2.path !== instance3.path); // Release clame on cached volume - cache.release(instance2.key); + yield cache.release(instance2.key); // Should reclaim cache directory path created by instance2 - var instance4 = cache.get(cacheName); + var instance4 = yield cache.get(cacheName); assert.ok(instance2.key !== instance4.key); assert.ok(instance2.path === instance4.path); @@ -70,7 +70,7 @@ suite('volume cache test', function () { if(fs.existsSync(fullPath)) { rmrf.sync(fullPath); } - }); + })); test('cache directory mounted in container', co(function* () { // Test is currently setup using container volumes exposed via samba using @@ -87,13 +87,14 @@ suite('volume cache test', function () { stats: stats }); - var cacheInstance = cache.get(cacheName); var localCachePath = path.join(localCacheDir, cacheName); if (fs.existsSync(localCachePath)) { rmrf.sync(localCachePath); } + var cacheInstance = yield cache.get(cacheName); + var c = cmd( 'echo "foo" > /docker_cache/tmp-obj-dir/blah.txt' ); From dce5f6aa1cd6496889abc1498cc19a13e9a499d3 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Wed, 8 Oct 2014 11:30:36 -0500 Subject: [PATCH 13/15] Cache names cannot contain custom delimiter. --- lib/volume_cache.js | 54 +++++++++------- test/garbage_collection_test.js | 4 ++ .../container_volume_caching_test.js | 63 +++++++++++++++++-- test/volume_cache_test.js | 31 +++++++++ 4 files changed, 125 insertions(+), 27 deletions(-) diff --git a/lib/volume_cache.js b/lib/volume_cache.js index ce7b0e9a..5920fc92 100644 --- a/lib/volume_cache.js +++ b/lib/volume_cache.js @@ -4,13 +4,15 @@ var fs = require('co-fs'); var rmrf = require('rimraf'); var Promise = require('promise'); +var KEY_DELIMITER = '::'; + function removeDir(directory) { return new Promise(function(accept, reject) { rmrf(directory, function (error) { if (error) { - reject(error); + return reject(error); } else { - accept(); + return accept(); } }); }); @@ -20,9 +22,9 @@ function makeDir(directory) { return new Promise(function(accept, reject) { mkdirp(directory, function (error) { if (error) { - reject(error); + return reject(error); } else { - accept(); + return accept(); } }); }); @@ -68,7 +70,7 @@ VolumeCache.prototype = { // Create a cache key that can be used by consumers of the cache in the // forma of :: - var instance = {key: cacheName + '::' + instanceId, path: instancePath}; + var instance = {key: cacheName + KEY_DELIMITER + instanceId, path: instancePath}; return instance; }, @@ -79,18 +81,21 @@ VolumeCache.prototype = { @param {Boolean} Disksapce threshold reached */ clear: function* (exceedsDiskspaceThreshold) { - if (exceedsDiskspaceThreshold) { - for (var cacheName in this.cache) { - for (var instance in this.cache[cacheName]) { - if (!this.cache[cacheName][instance].mounted) { - var cacheKey = cacheName + '::' + instance; - var instancePath = this.cache[cacheName][instance].path; - var statName = 'cache.volume.' + cacheKey + '.removed'; - yield this.stats.timeGen(statName, removeDir(instancePath)); - delete this.cache[cacheName][instance]; - this.log('cache volume removed', - {key: cacheKey, path: instancePath}); - } + if (!exceedsDiskspaceThreshold) { + return; + } + for (var cacheName in this.cache) { + for (var instance in this.cache[cacheName]) { + if (this.cache[cacheName][instance].mounted) { + continue; + } else { + var cacheKey = cacheName + KEY_DELIMITER + instance; + var instancePath = this.cache[cacheName][instance].path; + var statName = 'cache.volume.' + cacheKey + '.removed'; + yield this.stats.timeGen(statName, removeDir(instancePath)); + delete this.cache[cacheName][instance]; + this.log('cache volume removed', + {key: cacheKey, path: instancePath}); } } } @@ -123,6 +128,11 @@ VolumeCache.prototype = { @return {Object} Cached volume instance. */ get: function* (cacheName) { + if (cacheName.indexOf(KEY_DELIMITER) !== -1) { + throw new Error('Invalid key name was provided. Ensure that the cache ' + + 'name does not contain "' + KEY_DELIMITER + '".'); + } + var instanceId; if (!this.cache[cacheName]) { @@ -150,7 +160,7 @@ VolumeCache.prototype = { this.stats.increment(statName); } else { logMessage = 'cache volume hit'; - instance = {key: cacheName + '::' + instanceId, + instance = {key: cacheName + KEY_DELIMITER + instanceId, path: this.cache[cacheName][instanceId].path }; var statName = 'cache.volume.' + instance.key + 'hit'; @@ -168,8 +178,8 @@ VolumeCache.prototype = { @param {String} Cache key int he format of :: */ release: function* (cacheKey) { - var cacheName = cacheKey.split('::')[0]; - var instanceId = cacheKey.split('::')[1]; + var cacheName = cacheKey.split(KEY_DELIMITER)[0]; + var instanceId = cacheKey.split(KEY_DELIMITER)[1]; var oldPath = this.cache[cacheName][instanceId].path; // Remove the old cached volume and add a new unmounted one with an updated // timestamp/id @@ -185,8 +195,8 @@ VolumeCache.prototype = { @param {Object} Key name and value for the property to be set. */ set: function (cacheKey, value) { - var cacheName = cacheKey.split('::')[0]; - var instanceId = cacheKey.split('::')[1]; + var cacheName = cacheKey.split(KEY_DELIMITER)[0]; + var instanceId = cacheKey.split(KEY_DELIMITER)[1]; for (var key in value) { this.cache[cacheName][instanceId][key] = value[key]; } diff --git a/test/garbage_collection_test.js b/test/garbage_collection_test.js index 232bcb16..ae55831d 100644 --- a/test/garbage_collection_test.js +++ b/test/garbage_collection_test.js @@ -372,9 +372,13 @@ suite('garbage collection tests', function () { var instance1 = yield cache.get(cacheName); var instance2 = yield cache.get(cacheName); cache.set(instance2.key, {mounted: false}); + gc.sweep(); + yield waitForEvent(gc, 'gc:sweep:stop'); + clearTimeout(gc.sweepTimeoutId); + assert.ok(fs.existsSync(instance1.path)); assert.ok(!fs.existsSync(instance2.path)); })); diff --git a/test/integration/container_volume_caching_test.js b/test/integration/container_volume_caching_test.js index 8b05f8b5..808bf4c8 100644 --- a/test/integration/container_volume_caching_test.js +++ b/test/integration/container_volume_caching_test.js @@ -26,7 +26,7 @@ suite('container volume cache tests', function () { cache: {}, maxRunTime: 5 * 60 }, - scopes: [neededScope] + scopes: [neededScope] }; task.payload.cache[cacheName] = '/tmp-obj-dir'; @@ -109,12 +109,59 @@ suite('container volume cache tests', function () { ), features: { // No need to actually issue live logging... - localLiveLog: false + localLiveLog: true }, cache: {}, maxRunTime: 5 * 60 }, - scopes: [neededScope] + scopes: [neededScope] + }; + + task.payload.cache[cacheName] = '/tmp-obj-dir'; + + var result = yield testworker(task); + + // Get task specific results + assert.ok(!result.run.success, + 'Task completed successfully when it should not have.'); + + var expectedError = 'Insufficient scopes to attach "' + cacheName + '"'; + assert.ok(result.log.indexOf(expectedError) !== -1, + 'Insufficient scopes error message did not appear in the log' + ); + + var dirExists = fs.existsSync(fullCacheDir); + if (dirExists) { + rmrf.sync(fullCacheDir); + } + + assert.ok(!dirExists, + 'Volume cache created cached volume directory when it should not ' + + 'have.' + ); + }) + ); + + test('task unsuccesful when invalid cache name is requested', + co(function* () { + var cacheName = 'tmp-obj-dir::-' + Date.now().toString(); + var neededScope = 'docker-worker:cache:' + cacheName; + var fullCacheDir = path.join(cacheDir, cacheName); + + var task = { + payload: { + image: 'taskcluster/test-ubuntu', + command: cmd( + 'echo "foo" > /tmp-obj-dir/foo.txt' + ), + features: { + // No need to actually issue live logging... + localLiveLog: true + }, + cache: {}, + maxRunTime: 5 * 60 + }, + scopes: [neededScope] }; task.payload.cache[cacheName] = '/tmp-obj-dir'; @@ -125,14 +172,20 @@ suite('container volume cache tests', function () { assert.ok(!result.run.success, 'Task completed successfully when it should not have.'); + var expectedError = 'Error: Invalid key name was provided'; + assert.ok(result.log.indexOf(expectedError) !== -1, + 'Invalid key name message did not appear in the logs' + ); + var dirExists = fs.existsSync(fullCacheDir); + if (dirExists) { rmrf.sync(fullCacheDir); } assert.ok(!dirExists, - 'Volume cache created cached volume directory when it should not ' + - 'have.' + 'Volume cache created cached volume directory when it should not ' + + 'have.' ); }) ); diff --git a/test/volume_cache_test.js b/test/volume_cache_test.js index d753f520..890747eb 100644 --- a/test/volume_cache_test.js +++ b/test/volume_cache_test.js @@ -132,4 +132,35 @@ suite('volume cache test', function () { rmrf.sync(localCachePath); } })); + + test('invalid cache name is rejected', co(function* () { + var cacheName = 'tmp-obj::dir-' + Date.now().toString(); + + var hostCacheDir = '/docker_test_data'; + var localCachePath = path.join(localCacheDir, cacheName); + + if (fs.existsSync(localCachePath)) { + rmrf.sync(localCachePath); + } + + var cache = new VolumeCache({ + rootCachePath: localCacheDir, + log: log, + stats: stats + }); + + + assert.throws(cache.get(cacheName), Error); + + var dirExists = fs.existsSync(localCachePath); + + if (dirExists) { + rmrf.sync(localCachePath) + } + + assert.ok(!dirExists, + 'Volume cache created cached volume directory when it should not ' + + 'have.' + ); + })); }); From c63a62bfd5e3b640ef652ede8643451cdff743a0 Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Wed, 8 Oct 2014 17:08:38 -0500 Subject: [PATCH 14/15] Unique ids will be used for cache key names --- lib/task.js | 1 + lib/volume_cache.js | 65 +++++++++++-------- test/dockerworker.js | 3 +- .../container_volume_caching_test.js | 3 +- test/volume_cache_test.js | 65 +++++++++++++++---- 5 files changed, 94 insertions(+), 43 deletions(-) diff --git a/lib/task.js b/lib/task.js index 91eef109..dc61a9cb 100644 --- a/lib/task.js +++ b/lib/task.js @@ -158,6 +158,7 @@ Task.prototype = { this.volumeCaches = bindings[0]; procConfig.start.Binds = bindings[1]; } + return procConfig; }, diff --git a/lib/volume_cache.js b/lib/volume_cache.js index 5920fc92..240af8ad 100644 --- a/lib/volume_cache.js +++ b/lib/volume_cache.js @@ -1,19 +1,17 @@ var path = require('path'); -var mkdirp = require('mkdirp'); +var Promise = require('promise'); var fs = require('co-fs'); +var mkdirp = require('mkdirp'); var rmrf = require('rimraf'); -var Promise = require('promise'); +var uuid = require('uuid'); var KEY_DELIMITER = '::'; function removeDir(directory) { return new Promise(function(accept, reject) { rmrf(directory, function (error) { - if (error) { - return reject(error); - } else { - return accept(); - } + if (error) return reject(error); + accept(error); }); }); } @@ -21,15 +19,24 @@ function removeDir(directory) { function makeDir(directory) { return new Promise(function(accept, reject) { mkdirp(directory, function (error) { - if (error) { - return reject(error); - } else { - return accept(); - } + if (error) return reject(error); + accept(error); }); }); } +function sortInstanceIds(cache) { + var instanceIds = Object.keys(cache); + var sorted = instanceIds.sort(function (a, b) { + if (cache[a].lastUsed < cache[b].lastUsed) return 1; + if (cache[a].lastUsed > cache[b].lastUsed) return -1; + + return 0; + }); + + return sorted; +} + /** Cache manager for volumes that can be reused between containers. Cached volumes will be indexed based on timestamps and reused in the order of most recently used. @@ -54,7 +61,8 @@ VolumeCache.prototype = { @return {Object} Cached volume instance that is not mounted. */ add: function* (cacheName, instancePath) { - var instanceId = Date.now().toString(); + var instanceId = uuid.v4() + if (!instancePath) { var cachePath = path.join(this.rootCachePath, cacheName); instancePath = path.join(cachePath, instanceId); @@ -63,14 +71,21 @@ VolumeCache.prototype = { if (!(yield fs.exists(instancePath))) { yield makeDir(instancePath); } + + var lastUsed = Date.now(); + this.cache[cacheName][instanceId] = { path: instancePath, - mounted: false + mounted: false, + lastUsed: lastUsed }; // Create a cache key that can be used by consumers of the cache in the // forma of :: - var instance = {key: cacheName + KEY_DELIMITER + instanceId, path: instancePath}; + var instance = {key: cacheName + KEY_DELIMITER + instanceId, + path: instancePath, + lastUsed: lastUsed + }; return instance; }, @@ -138,12 +153,13 @@ VolumeCache.prototype = { if (!this.cache[cacheName]) { yield this.createCacheVolume(cacheName); } else { - var instanceIds = Object.keys(this.cache[cacheName]).sort().reverse(); + var instanceIds = sortInstanceIds(this.cache[cacheName]); for (var i = 0; i < instanceIds.length; i++) { var id = instanceIds[i]; if (!this.cache[cacheName][id].mounted) { instanceId = id; this.cache[cacheName][id].mounted = true; + this.cache[cacheName][id].lastUsed = Date.now(); break; } } @@ -156,14 +172,15 @@ VolumeCache.prototype = { logMessage = 'cache volume miss'; instance = yield this.add(cacheName); this.set(instance.key, {mounted: true}); - var statName = 'cache.volume.' + instance.key + 'miss'; + var statName = 'cache.volume.' + instance.key + '.miss'; this.stats.increment(statName); } else { logMessage = 'cache volume hit'; instance = {key: cacheName + KEY_DELIMITER + instanceId, - path: this.cache[cacheName][instanceId].path + path: this.cache[cacheName][instanceId].path, + lastUsed: this.cache[cacheName][instanceId].lastUsed }; - var statName = 'cache.volume.' + instance.key + 'hit'; + var statName = 'cache.volume.' + instance.key + '.hit'; this.stats.increment(statName); } this.log(logMessage, instance); @@ -178,14 +195,8 @@ VolumeCache.prototype = { @param {String} Cache key int he format of :: */ release: function* (cacheKey) { - var cacheName = cacheKey.split(KEY_DELIMITER)[0]; - var instanceId = cacheKey.split(KEY_DELIMITER)[1]; - var oldPath = this.cache[cacheName][instanceId].path; - // Remove the old cached volume and add a new unmounted one with an updated - // timestamp/id - delete this.cache[cacheName][instanceId]; - yield this.add(cacheName, oldPath); - this.log("cache volume release", {key: cacheKey, path: oldPath}); + this.set(cacheKey, {mounted: false, lastUsed: Date.now()}) + this.log("cache volume release", {key: cacheKey}); }, /** diff --git a/test/dockerworker.js b/test/dockerworker.js index 05bd7de4..2a08214e 100644 --- a/test/dockerworker.js +++ b/test/dockerworker.js @@ -25,7 +25,8 @@ var COPIED_ENV = [ 'AZURE_STORAGE_ACCOUNT', 'AZURE_STORAGE_ACCESS_KEY', 'TASKCLUSTER_CLIENT_ID', - 'TASKCLUSTER_ACCESS_TOKEN' + 'TASKCLUSTER_ACCESS_TOKEN', + 'DOCKER_WORKER_CACHE_DIR' ]; function eventPromise(listener, event) { diff --git a/test/integration/container_volume_caching_test.js b/test/integration/container_volume_caching_test.js index 808bf4c8..08c5f8e5 100644 --- a/test/integration/container_volume_caching_test.js +++ b/test/integration/container_volume_caching_test.js @@ -17,7 +17,8 @@ suite('container volume cache tests', function () { payload: { image: 'taskcluster/test-ubuntu', command: cmd( - 'echo "foo" > /tmp-obj-dir/foo.txt' + 'echo "foo" > /tmp-obj-dir/foo.txt', + 'ls /tmp-obj-dir' ), features: { // No need to actually issue live logging... diff --git a/test/volume_cache_test.js b/test/volume_cache_test.js index 890747eb..6a921fa1 100644 --- a/test/volume_cache_test.js +++ b/test/volume_cache_test.js @@ -1,5 +1,6 @@ suite('volume cache test', function () { var VolumeCache = require('../lib/volume_cache'); + var GarbageCollector = require('../lib/gc'); var createLogger = require('../lib/log'); var docker = require('../lib/docker')(); var waitForEvent = require('../lib/wait_for_event'); @@ -64,7 +65,7 @@ suite('volume cache test', function () { // Should reclaim cache directory path created by instance2 var instance4 = yield cache.get(cacheName); - assert.ok(instance2.key !== instance4.key); + assert.ok(instance2.key === instance4.key); assert.ok(instance2.path === instance4.path); if(fs.existsSync(fullPath)) { @@ -72,14 +73,40 @@ suite('volume cache test', function () { } })); - test('cache directory mounted in container', co(function* () { - // Test is currently setup using container volumes exposed via samba using - // boot2docker + test('most recently used unmounted cache instance is used', co(function* () { + var cache = new VolumeCache({ + rootCachePath: localCacheDir, + log: log, + stats: stats + }); var cacheName = 'tmp-obj-dir-' + Date.now().toString(); - // Location on the docker VM that the cache will exists and is expose via - // samba - var hostCacheDir = '/docker_test_data'; + var fullPath = path.join(localCacheDir, cacheName); + + var instance1 = yield cache.get(cacheName); + var instance2 = yield cache.get(cacheName); + var instance3 = yield cache.get(cacheName); + var instance4 = yield cache.get(cacheName); + + // Release claim on cached volume + yield cache.release(instance4.key); + yield cache.release(instance2.key); + + // Should reclaim cache directory path created by instance2 + var instance5 = yield cache.get(cacheName); + + assert.ok(instance5.key === instance2.key); + assert.ok(instance5.path === instance2.path); + assert.ok(instance5.lastUsed > instance2.lastUsed); + + if(fs.existsSync(fullPath)) { + rmrf.sync(fullPath); + } + })); + + + test('cache directory mounted in container', co(function* () { + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); var cache = new VolumeCache({ rootCachePath: localCacheDir, @@ -87,6 +114,16 @@ suite('volume cache test', function () { stats: stats }); + var gc = new GarbageCollector({ + capacity: 1, + log: log, + docker: docker, + interval: 2 * 1000, + taskListener: {pending: 1} + }); + + clearTimeout(gc.sweepTimeoutId); + var localCachePath = path.join(localCacheDir, cacheName); if (fs.existsSync(localCachePath)) { @@ -107,24 +144,25 @@ suite('volume cache test', function () { AttachStderr:true, Tty: true }; - var hostObjPath = path.join( - hostCacheDir, - cacheName, - cacheInstance.key.split('::')[1] - ); + var create = yield docker.createContainer(createConfig); container = docker.getContainer(create.id); var stream = yield container.attach({stream: true, stdout: true, stderr: true}); stream.pipe(process.stdout); - var binds = hostObjPath + ':/docker_cache/tmp-obj-dir/'; + var binds = cacheInstance.path + ':/docker_cache/tmp-obj-dir/'; var startConfig = { Binds: binds, }; yield container.start(startConfig); + gc.removeContainer(create.id); + gc.sweep(); + removedContainerId = yield waitForEvent(gc, 'gc:container:removed'); + + console.log(cacheInstance); assert.ok(fs.existsSync(path.join(cacheInstance.path, 'blah.txt'))); @@ -136,7 +174,6 @@ suite('volume cache test', function () { test('invalid cache name is rejected', co(function* () { var cacheName = 'tmp-obj::dir-' + Date.now().toString(); - var hostCacheDir = '/docker_test_data'; var localCachePath = path.join(localCacheDir, cacheName); if (fs.existsSync(localCachePath)) { From 63555b62ebf3ec19ef0575d91d55ff118e721d7f Mon Sep 17 00:00:00 2001 From: Greg Arndt Date: Wed, 8 Oct 2014 21:09:03 -0500 Subject: [PATCH 15/15] Added tests for using the cache between tasks --- .../container_volume_caching_test.js | 104 +++++++++++++++++- test/volume_cache_test.js | 2 - 2 files changed, 103 insertions(+), 3 deletions(-) diff --git a/test/integration/container_volume_caching_test.js b/test/integration/container_volume_caching_test.js index 08c5f8e5..63aee7b7 100644 --- a/test/integration/container_volume_caching_test.js +++ b/test/integration/container_volume_caching_test.js @@ -1,10 +1,13 @@ -suite('container volume cache tests', function () { +suite('volume cache tests', function () { + var settings = require('../settings'); var co = require('co'); var cmd = require('./helper/cmd'); var fs = require('fs'); var rmrf = require('rimraf'); var path = require('path'); var testworker = require('../post_task'); + var DockerWorker = require('../dockerworker'); + var TestWorker = require('../testworker'); var cacheDir = process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache'; @@ -45,6 +48,105 @@ suite('container volume cache tests', function () { } })); + test('mounted cached volumes are not reused between tasks', co(function* () { + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + var neededScope = 'docker-worker:cache:' + cacheName; + + settings.configure({ + capacity: 2, + }); + + worker = new TestWorker(DockerWorker); + yield worker.launch() + + var tasks = []; + + for (var i = 0; i < 2; i++) { + var fileName = 'file' + i.toString() + '.txt'; + var task = { + payload: { + image: 'taskcluster/test-ubuntu', + command: cmd( + 'echo "foo" > /tmp-obj-dir/' + fileName, + 'sleep 10', + 'ls -lah /tmp-obj-dir' + ), + features: { + // No need to actually issue live logging... + localLiveLog: true + }, + cache: {}, + maxRunTime: 60 * 60 + }, + scopes: [neededScope] + }; + task.payload.cache[cacheName] = '/tmp-obj-dir'; + + tasks.push(worker.postToQueue(task)); + } + + var results = yield tasks; + assert.ok(results.length === 2); + assert.ok(results[0].log.indexOf('file0.txt') !== -1); + assert.ok(results[0].log.indexOf('file1.txt') === -1); + assert.ok(results[1].log.indexOf('file1.txt') !== -1); + assert.ok(results[1].log.indexOf('file0.txt') === -1); + + yield worker.terminate(); + })); + + test('cached volumes can be reused between tasks', co(function* () { + var cacheName = 'tmp-obj-dir-' + Date.now().toString(); + var fullCacheDir = path.join(cacheDir, cacheName); + var neededScope = 'docker-worker:cache:' + cacheName; + + settings.configure({ + capacity: 2, + garbageCollection: { + imageExpiration: 2 * 60 * 60 * 1000, + interval: 500, + diskspaceThreshold: 10 * 1000000000, + dockerVolume: '/mnt' + }, + }); + + worker = new TestWorker(DockerWorker); + yield worker.launch() + + var task = { + payload: { + image: 'taskcluster/test-ubuntu', + command: cmd( + 'echo "This is a shared file." > /tmp-obj-dir/foo.txt' + ), + features: { + // No need to actually issue live logging... + localLiveLog: false + }, + cache: {}, + maxRunTime: 5 * 60 + }, + scopes: [neededScope] + }; + + task.payload.cache[cacheName] = '/tmp-obj-dir'; + + var result1 = yield worker.postToQueue(task); + + task.payload.command = cmd('cat /tmp-obj-dir/foo.txt'); + task.payload.features.localLiveLog = true; + + var result2 = yield worker.postToQueue(task); + assert.ok(result2.run.success, 'task was successful'); + assert.ok(result2.log.indexOf('This is a shared file') !== -1); + + yield worker.terminate(); + + if (fs.existsSync(fullCacheDir)) { + rmrf.sync(fullCacheDir); + } + })); + test('mount multiple cached volumes in docker worker', co(function* () { var cacheName1 = 'tmp-obj-dir-' + Date.now().toString(); var cacheName2 = 'tmp-obj-dir-' + (Date.now()+1).toString(); diff --git a/test/volume_cache_test.js b/test/volume_cache_test.js index 6a921fa1..51d887f4 100644 --- a/test/volume_cache_test.js +++ b/test/volume_cache_test.js @@ -162,8 +162,6 @@ suite('volume cache test', function () { gc.sweep(); removedContainerId = yield waitForEvent(gc, 'gc:container:removed'); - console.log(cacheInstance); - assert.ok(fs.existsSync(path.join(cacheInstance.path, 'blah.txt'))); if (fs.existsSync(localCachePath)) {