Skip to content
This repository has been archived by the owner on Feb 27, 2020. It is now read-only.

Commit

Permalink
Merge pull request #237 from taskcluster/artifact-rework
Browse files Browse the repository at this point in the history
Artifact rework
  • Loading branch information
jonasfj committed Apr 24, 2017
2 parents 5cf5c0f + b46a29c commit 78fdfef
Show file tree
Hide file tree
Showing 13 changed files with 496 additions and 174 deletions.
354 changes: 263 additions & 91 deletions plugins/artifacts/artifacts.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions plugins/artifacts/doc.go
@@ -0,0 +1,7 @@
// Package artifacts provides a taskcluster-worker plugin that uploads
// artifacts when sandbox execution has stopped.
package artifacts

import "github.com/taskcluster/taskcluster-worker/runtime/util"

var debug = util.Debug("artifacts")
9 changes: 7 additions & 2 deletions plugins/artifacts/payloadschema.go
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/taskcluster/taskcluster-worker/runtime/util"
)

type payloadType struct {
type payload struct {
Artifacts []artifact `json:"artifacts"`
}
type artifact struct {
Expand All @@ -17,6 +17,11 @@ type artifact struct {
Expires time.Time `json:"expires"`
}

const (
typeFile = "file"
typeDirectory = "directory"
)

var payloadSchema = schematypes.Object{
Properties: schematypes.Properties{
"artifacts": schematypes.Array{
Expand All @@ -35,7 +40,7 @@ var payloadSchema = schematypes.Object{
subdirectories
`),
},
Options: []string{"file", "directory"},
Options: []string{typeFile, typeDirectory},
},
"path": schematypes.String{
MetaData: schematypes.MetaData{
Expand Down
146 changes: 80 additions & 66 deletions plugins/env/env.go
@@ -1,81 +1,119 @@
// This plugins creates the environment variables setup in the payload
// env" section. It does its processing at BuildSandbox call. After the
// "BuildSandox" stage, all environment variables configured in the payload
// are available in the SandboxBuilder.

// Package env provides a taskcluster-worker plugin that injects environment
// variables into the task environments.
//
// This plugin supports per-task environment variables specified in
// task.payload.env, but also globally configured environment variables, which
// can be used to inject information such as instance type.
//
// Finally, this plugin will inject TASK_ID and RUN_ID as environment variables.
package env

import (
"strconv"

schematypes "github.com/taskcluster/go-schematypes"
"github.com/taskcluster/taskcluster-worker/engines"
"github.com/taskcluster/taskcluster-worker/plugins"
"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/util"
)

type plugin struct {
plugins.PluginBase
extraVars map[string]string
}

type payloadType struct {
type payload struct {
Env map[string]string `json:"env"`
}

type config struct {
Extra map[string]string `json:"extra"`
}

var configSchema = schematypes.Object{
Properties: schematypes.Properties{
"extra": schematypes.Map{
MetaData: schematypes.MetaData{
Title: "Extra environment variables",
Description: "This defines extra environment variables to add to the engine.",
},
Values: schematypes.String{},
},
},
type provider struct {
plugins.PluginProviderBase
}

type taskPlugin struct {
plugins.TaskPluginBase
variables map[string]string
}

func init() {
plugins.Register("env", &provider{})
}

var payloadSchema = schematypes.Object{
Properties: schematypes.Properties{
"env": schematypes.Map{
MetaData: schematypes.MetaData{
Title: "Environment Variables",
Description: "Mapping from environment variables to values",
func (p *provider) ConfigSchema() schematypes.Schema {
return schematypes.Object{
Properties: schematypes.Properties{
"extra": schematypes.Map{
MetaData: schematypes.MetaData{
Title: "Extra Environment Variables",
Description: util.Markdown(`
The 'extra' property holds a mapping from variable name to value.
These _extra_ environment variables will be injected into all tasks,
though they can be overwritten on per-task basis using the
'task.payload.env' property.
Notice that these overwrite built-in environment variables
'TASK_ID' and 'RUN_ID' which is also supplied by this plugin.
`),
},
Values: schematypes.String{},
},
Values: schematypes.String{},
},
},
}
}

func (*plugin) PayloadSchema() schematypes.Object {
return payloadSchema
func (p *provider) NewPlugin(options plugins.PluginOptions) (plugins.Plugin, error) {
var c config
schematypes.MustValidateAndMap(p.ConfigSchema(), options.Config, &c)

return &plugin{
extraVars: c.Extra,
}, nil
}

func (pl *plugin) NewTaskPlugin(options plugins.TaskPluginOptions) (plugins.TaskPlugin, error) {
p := payloadType{
// Must explicitly create an empty map, so if payload does not include
// env vars, we'll still have a valid map to read from/write to.
Env: map[string]string{},
func (p *plugin) PayloadSchema() schematypes.Object {
return schematypes.Object{
Properties: schematypes.Properties{
"env": schematypes.Map{
MetaData: schematypes.MetaData{
Title: "Environment Variables",
Description: "Mapping from environment variables to values",
},
Values: schematypes.String{},
},
},
}
schematypes.MustValidateAndMap(payloadSchema, options.Payload, &p)
}

func (p *plugin) NewTaskPlugin(options plugins.TaskPluginOptions) (plugins.TaskPlugin, error) {
var P payload
schematypes.MustValidateAndMap(p.PayloadSchema(), options.Payload, &P)

for k, v := range pl.extraVars {
p.Env[k] = v
env := make(map[string]string)

// Set built-in environment variables
env["TASK_ID"] = options.TaskContext.TaskID
env["RUN_ID"] = strconv.Itoa(options.TaskContext.RunID)

// Set variables as configured globally
for k, v := range p.extraVars {
env[k] = v
}
// Set variables as configured per-task (overwriting globally config vars)
for k, v := range P.Env {
env[k] = v
}

return &taskPlugin{
TaskPluginBase: plugins.TaskPluginBase{},
variables: p.Env,
variables: env,
}, nil
}

type taskPlugin struct {
plugins.TaskPluginBase
variables map[string]string
}

func (p *taskPlugin) BuildSandbox(sandboxBuilder engines.SandboxBuilder) error {
for k, v := range p.variables {
err := sandboxBuilder.SetEnvironmentVariable(k, v)
Expand All @@ -86,9 +124,7 @@ func (p *taskPlugin) BuildSandbox(sandboxBuilder engines.SandboxBuilder) error {
return runtime.NewMalformedPayloadError("Environment variable ", k, " has already been set.")
case engines.ErrFeatureNotSupported:
return runtime.NewMalformedPayloadError(
"Cannot set environment variable ",
k,
". Engine does not support this operation")
"Custom environment variables are not supported in the current configuration of the engine")
case nil:
// break
default:
Expand All @@ -98,25 +134,3 @@ func (p *taskPlugin) BuildSandbox(sandboxBuilder engines.SandboxBuilder) error {

return nil
}

type pluginProvider struct {
plugins.PluginProviderBase
}

func (*pluginProvider) NewPlugin(options plugins.PluginOptions) (plugins.Plugin, error) {
var c config
schematypes.MustValidateAndMap(configSchema, options.Config, &c)

return &plugin{
PluginBase: plugins.PluginBase{},
extraVars: c.Extra,
}, nil
}

func (*pluginProvider) ConfigSchema() schematypes.Schema {
return configSchema
}

func init() {
plugins.Register("env", &pluginProvider{})
}
56 changes: 56 additions & 0 deletions plugins/env/env_test.go
Expand Up @@ -3,6 +3,7 @@ package env
import (
"testing"

"github.com/taskcluster/slugid-go/slugid"
"github.com/taskcluster/taskcluster-worker/plugins/plugintest"
)

Expand Down Expand Up @@ -77,3 +78,58 @@ func TestEnvConfig(*testing.T) {
MatchLog: "env1",
}.Test()
}

func TestEnvOverwrites(*testing.T) {
plugintest.Case{
Payload: `{
"delay": 0,
"function": "print-env-var",
"argument": "ENV1",
"env": {
"ENV1" : "env2"
}
}`,
PluginConfig: `{
"extra": {
"ENV1": "env1"
}
}`,
Plugin: "env",
PluginSuccess: true,
EngineSuccess: true,
MatchLog: "env2",
}.Test()
}

func TestInjectsTaskID(*testing.T) {
TaskID := slugid.Nice()
plugintest.Case{
TaskID: TaskID,
Payload: `{
"delay": 0,
"function": "print-env-var",
"argument": "TASK_ID"
}`,
PluginConfig: `{}`,
Plugin: "env",
PluginSuccess: true,
EngineSuccess: true,
MatchLog: TaskID,
}.Test()
}

func TestInjectsRunID(*testing.T) {
plugintest.Case{
RunID: 7,
Payload: `{
"delay": 0,
"function": "print-env-var",
"argument": "RUN_ID"
}`,
PluginConfig: `{}`,
Plugin: "env",
PluginSuccess: true,
EngineSuccess: true,
MatchLog: "7",
}.Test()
}
22 changes: 14 additions & 8 deletions plugins/watchdog/watchdog_test.go
Expand Up @@ -34,9 +34,12 @@ func TestWatchdog(t *testing.T) {
time.Sleep(500 * time.Millisecond)
assert.False(t, lifeCycle.StoppingNow.IsDone())

// Sleep 1500ms is not okay, when timeout is configured to 1s
time.Sleep(1500 * time.Millisecond)
assert.True(t, lifeCycle.StoppingNow.IsDone())
// Sleeping > 1s is not okay, when timeout is configured to 1s
select {
case <-lifeCycle.StoppingNow.Done():
case <-time.After(5 * time.Second):
assert.Fail(t, "Expected watchdog plugin to initiate graceful shutdown")
}
}

func TestWatchdogRunningIgnored(t *testing.T) {
Expand All @@ -59,14 +62,17 @@ func TestWatchdogRunningIgnored(t *testing.T) {
tp.BuildSandbox(nil)
tp.Started(nil)

// Sleep 1500ms is okay because we're running
time.Sleep(1500 * time.Millisecond)
// Sleep 2s is okay because we're running
time.Sleep(2 * time.Second)
assert.False(t, lifeCycle.StoppingNow.IsDone())

// Stopped, Dispose or Exception should all do...
tp.Dispose()

// Sleep 1500ms is not okay, when timeout is configured to 1s
time.Sleep(1500 * time.Millisecond)
assert.True(t, lifeCycle.StoppingNow.IsDone())
// Sleeping > 1s is not okay, when timeout is configured to 1s
select {
case <-lifeCycle.StoppingNow.Done():
case <-time.After(5 * time.Second):
assert.Fail(t, "Expected watchdog plugin to initiate graceful shutdown")
}
}
5 changes: 5 additions & 0 deletions runtime/errors.go
Expand Up @@ -41,6 +41,11 @@ type MalformedPayloadError struct {
messages []string
}

// Messages returns a list of messages explaining why the error.
func (e MalformedPayloadError) Messages() []string {
return append([]string{}, e.messages...)
}

// Error returns the error message and adheres to the Error interface
func (e MalformedPayloadError) Error() string {
return fmt.Sprintf("malformed-payload error: %s", strings.Join(e.messages, "\n"))
Expand Down
31 changes: 31 additions & 0 deletions runtime/util/parallel.go
Expand Up @@ -37,3 +37,34 @@ func Spawn(N int, fn func(i int)) {
}
wg.Wait()
}

// SpawnWithLimit N go routines running at-most limit in parallel and wait for
// them to return.
//
// This utility is smart when instantiating elements in an array concurrently.
func SpawnWithLimit(N, limit int, fn func(i int)) {
wg := sync.WaitGroup{}
wg.Add(N)
m := sync.Mutex{}
c := sync.NewCond(&m)
for index := 0; index < N; index++ {
go func(i int) {
defer wg.Done()

m.Lock()
defer m.Unlock()

for limit == 0 {
c.Wait()
}
limit--
defer func() {
limit++
c.Signal()
}()

fn(i)
}(index)
}
wg.Wait()
}

0 comments on commit 78fdfef

Please sign in to comment.