Skip to content
This repository has been archived by the owner on May 9, 2020. It is now read-only.

Commit

Permalink
Merge pull request #49 from gregarndt/cached_folders
Browse files Browse the repository at this point in the history
Cached folders
  • Loading branch information
gregarndt committed Oct 9, 2014
2 parents 6e8e061 + 63555b6 commit 2005a30
Show file tree
Hide file tree
Showing 15 changed files with 908 additions and 46 deletions.
21 changes: 18 additions & 3 deletions bin/worker.js
Expand Up @@ -6,12 +6,13 @@ var loadConfig = require('taskcluster-base/config');
var createLogger = require('../lib/log');
var debug = require('debug')('docker-worker:bin:worker');

var SDC = require('statsd-client')
var SDC = require('statsd-client');
var Runtime = require('../lib/runtime');
var TaskListener = require('../lib/task_listener');
var ShutdownManager = require('../lib/shutdown_manager');
var Stats = require('../lib/stat');
var GarbageCollector = require('../lib/gc');
var VolumeCache = require('../lib/volume_cache');

// Available target configurations.
var allowedHosts = ['aws', 'test'];
Expand Down Expand Up @@ -83,7 +84,7 @@ co(function *() {

// Load all base configuration that is on disk / environment variables /
// flags.
var config = yield workerConf.load.bind(workerConf)
var config = yield workerConf.load.bind(workerConf);

// Use a target specific configuration helper if available.
var host;
Expand Down Expand Up @@ -158,10 +159,24 @@ co(function *() {
var gcConfig = config.garbageCollection;
gcConfig.capacity = config.capacity,
gcConfig.docker = config.docker;
gcConfig.log = config.log
gcConfig.log = config.log;

config.gc = new GarbageCollector(gcConfig);

config.volumeCache = new VolumeCache({
rootCachePath: config.cache.volumeCachePath,
log: config.log,
stats: config.stats
});

config.gc.on('gc:container:removed', function (container) {
container.caches.forEach(co(function* (cacheKey) {
yield config.volumeCache.release(cacheKey);
}));
});

config.gc.addManager(config.volumeCache);

var runtime = new Runtime(config);

// Build the listener and connect to the queue.
Expand Down
4 changes: 4 additions & 0 deletions config/defaults.js
Expand Up @@ -20,6 +20,10 @@ module.exports = {
dockerVolume: '/mnt'
},

cache: {
volumeCachePath: '/mnt/var/cache/docker-worker'
},

logging: {
liveLogChunkInterval: 5000, // 5 seconds
// Added to the current date to make up the expiry time for logs. This is
Expand Down
4 changes: 4 additions & 0 deletions config/test.js
Expand Up @@ -11,5 +11,9 @@ module.exports = {
// Expires one hour from now so test logs don't live too long...
liveLogExpires: 3600,
bulkLogExpires: 3600
},

cache: {
volumeCachePath: process.env.DOCKER_WORKER_CACHE_DIR || '/var/cache'
}
};
1 change: 1 addition & 0 deletions deploy/template/etc/default/docker-aufs
Expand Up @@ -20,5 +20,6 @@ mount /dev/instance_storage/all /mnt

mkdir -p /mnt/var/lib/docker
mkdir -p /mnt/docker-tmp
mkdir -p /mnt/var/cache/docker-worker
export TMPDIR="/mnt/docker-tmp"
export DOCKER_OPTS=" -g /mnt/var/lib/docker -s aufs"
1 change: 1 addition & 0 deletions deploy/template/etc/default/docker-btrfs
Expand Up @@ -15,6 +15,7 @@ mount $(echo $devices | cut -d ' ' -f 1) /mnt

mkdir -p /mnt/var/lib/docker
mkdir -p /mnt/docker-tmp
mkdir -p /mnt/var/cache/docker-worker
export TMPDIR="/mnt/docker-tmp"
export DOCKER_OPTS=" -g /mnt/var/lib/docker -s btrfs"

51 changes: 38 additions & 13 deletions lib/gc.js
Expand Up @@ -70,12 +70,17 @@ function GarbageCollector(config) {
this.markedImages = {};
this.retries = 5;
this.scheduleSweep(this.interval);
this.managers = [];
EventEmitter.call(this);
}

GarbageCollector.prototype = {
__proto__: EventEmitter.prototype,

addManager: function(manager) {
this.managers.push(manager);
},

markImage: function(image) {
var parsedImage = parseImage(image);
var repository = parsedImage.repository;
Expand All @@ -97,42 +102,58 @@ GarbageCollector.prototype = {
}.bind(this));
},

removeContainer: function (containerId) {
this.markedContainers[containerId] = this.retries;
removeContainer: function (containerId, volumeCaches) {
this.markedContainers[containerId] = {
retries: this.retries,
caches: volumeCaches || []
};
this.emit('gc:container:marked', containerId);
},

removeContainers: function* () {
for (var containerId in this.markedContainers) {
// If a container can't be removed after 5 tries, more tries won't help
if (this.markedContainers[containerId] !== 0) {
if (this.markedContainers[containerId].retries !== 0) {
var c = this.docker.getContainer(containerId);
var caches = this.markedContainers[containerId].caches

try {
// Even running containers should be removed otherwise shouldn't have
// been marked for removal.
yield c.remove({force: true});
delete this.markedContainers[containerId];
this.emit('gc:container:removed', containerId);
this.log('container removed', {container: containerId});

this.emit('gc:container:removed', {
id: containerId,
caches: caches
});
this.log('container removed', {
container: containerId,
caches: caches
});
} catch(e) {
var message = e;
if (e.reason === 'no such container') {
message = 'No such container. Will remove from marked ' +
'containers list.';
delete this.markedContainers[containerId];
delete this.markedContainers[containerId];

message = 'No such container. Will remove from marked ' +
'containers list.';
this.emit('gc:container:removed', {
id: containerId,
caches: caches
});
} else {
this.markedContainers[containerId] -= 1;
this.markedContainers[containerId].retries -= 1;
}

this.emit('gc:error', {message: message, container: containerId});
this.emit('gc:container:error', {message: message, container: containerId});
this.log('container removal error.',
{container: containerId, err: message});
}
} else {
delete this.markedContainers[containerId];
this.ignoredContainers.push(containerId);
this.emit('gc:error',
this.emit('gc:container:error',
{message: 'Retry limit exceeded', container: containerId});
this.log('container removal error',
{container: containerId, err: 'Retry limit exceeded'});
Expand Down Expand Up @@ -207,17 +228,21 @@ GarbageCollector.prototype = {
(this.capacity - this.taskListener.pending),
this.log);
if (exceedsThreshold) {
this.emit('gc:warning',
this.emit('gc:diskspace:warning',
{message: 'Diskspace threshold reached. ' +
'Removing all non-running images.'
});
} else {
this.emit('gc:info',
this.emit('gc:diskspace:info',
{message: 'Diskspace threshold not reached. ' +
'Removing only expired images.'
});
}
yield this.removeUnusedImages(exceedsThreshold);

for (var i = 0; i < this.managers.length; i++) {
yield this.managers[i].clear(exceedsThreshold);
}
}

this.log('garbage collection finished');
Expand Down
72 changes: 64 additions & 8 deletions lib/task.js
Expand Up @@ -71,6 +71,38 @@ function buildStateHandlers(task) {
return new States(handlers);
}

/**
Create a list of cached volumes that will be mounted within the docker container.
@param {object} volume cache
@param {object} volumes to mount in the container
*/
function* buildVolumeBindings(taskVolumeBindings, volumeCache, taskScopes) {
var neededScopes = [];

for (var volumeName in taskVolumeBindings) {
neededScopes.push('docker-worker:cache:' + volumeName);
}

if (!scopeMatch(taskScopes, neededScopes)) {
throw new Error(
'Insufficient scopes to attach "' + volumeName + '" as a cached ' +
'volume. Try adding ' + neededScopes + ' to the .scopes array.'
);
}

var bindings = [];
var caches = [];

for (var volumeName in taskVolumeBindings) {
var cacheInstance = yield volumeCache.get(volumeName);
var binding = cacheInstance.path + ':' + taskVolumeBindings[volumeName];
bindings.push(binding);
caches.push(cacheInstance.key);
}
return [caches, bindings];
}

function Task(runtime, runId, task, status) {
this.runId = runId;
this.task = task;
Expand All @@ -89,7 +121,7 @@ Task.prototype = {
@param {Array[dockerode.Container]} [links] list of dockerode containers.
*/
dockerConfig: function(links) {
dockerConfig: function* (links) {
var config = this.task.payload;
var env = config.env || {};

Expand All @@ -112,14 +144,21 @@ Task.prototype = {
StdinOnce: false,
Env: taskEnvToDockerEnv(env)
}
}
};

if (links) {
procConfig.start.Links = links.map(function(link) {
return link.name + ':' + link.alias;
});
}

if (this.task.payload.cache) {
var bindings = yield buildVolumeBindings(this.task.payload.cache,
this.runtime.volumeCache, this.task.scopes);
this.volumeCaches = bindings[0];
procConfig.start.Binds = bindings[1];
}

return procConfig;
},

Expand Down Expand Up @@ -174,7 +213,7 @@ Task.prototype = {
*/
scheduleReclaim: function* (claim) {
// Figure out when to issue the next claim...
var takenUntil = (new Date(claim.takenUntil) - new Date())
var takenUntil = (new Date(claim.takenUntil) - new Date());
var nextClaim = takenUntil / this.runtime.task.reclaimDivisor;

// This is tricky ensure we have logs...
Expand Down Expand Up @@ -325,10 +364,6 @@ Task.prototype = {
var links =
yield* stats.timeGen('tasks.time.states.linked', this.states.link(this));

var dockerProc = this.dockerProcess = new DockerProc(
this.runtime.docker, this.dockerConfig(links)
);

// Hooks prior to running the task.
yield stats.timeGen('tasks.time.states.created', this.states.created(this));

Expand Down Expand Up @@ -384,7 +419,28 @@ Task.prototype = {
}
gc.markImage(this.task.payload.image);

try {
var dockerConfig = yield this.dockerConfig(links);
} catch (e) {
this.stream.write(this.fmtLog('Docker configuration could not be ' +
'created. This may indicate an authentication error when validating ' +
'scopes necessary for using caches. \n Error %s', e));

yield this.stream.end.bind(this.stream, this.logFooter(
false, // unsuccessful task
-1, // negative exit code indicates infrastructure errors usually.
taskStart, // duration details...
new Date()
));

yield stats.timeGen(
'tasks.time.states.docker_configuration.killed', this.states.killed(this)
);
return false;
}

var dockerProc = this.dockerProcess = new DockerProc(
this.runtime.docker, dockerConfig);
// Now that we know the stream is ready pipe data into it...
dockerProc.stdout.pipe(this.stream, {
end: false
Expand Down Expand Up @@ -440,7 +496,7 @@ Task.prototype = {
yield this.stream.end.bind(this.stream);

// Garbage collect containers
gc.removeContainer(dockerProc.container.id);
gc.removeContainer(dockerProc.container.id, this.volumeCaches);
yield stats.timeGen('tasks.time.states.killed', this.states.killed(this));

// If the results validation failed we consider this task failure.
Expand Down
1 change: 1 addition & 0 deletions lib/task_listener.js
Expand Up @@ -90,6 +90,7 @@ TaskListener.prototype = {
taskId: content.status.taskId,
runId: content.runId,
message: e.toString(),
stack: e.stack,
err: e
});
} else {
Expand Down

0 comments on commit 2005a30

Please sign in to comment.