Skip to content
This repository has been archived by the owner on Feb 27, 2020. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:taskcluster/taskcluster-worker in…
Browse files Browse the repository at this point in the history
…to http-server

Conflicts:
	runtime/taskcontext.go
  • Loading branch information
jonasfj committed Mar 8, 2016
2 parents f5ecfd7 + 6510a6c commit f8ebef0
Show file tree
Hide file tree
Showing 26 changed files with 1,580 additions and 458 deletions.
32 changes: 24 additions & 8 deletions config/config.go
Expand Up @@ -27,11 +27,14 @@ type (
// The Client ID for the client. Not very helpful, am I?
//
// Syntax: ^[A-Za-z0-9@/:._-]+$
ClientId string `json:"clientId"`
ClientID string `json:"clientID"`
} `json:"credentials"`

// The amount of time to wait between task polling iterations
PollingInterval int `json:"pollingInterval"`

// The provisioner (if any) that is responsible for spawning instances of this worker. Typically `aws-provisioner-v1`.
ProvisionerId string `json:"provisionerId"`
ProvisionerID string `json:"provisionerID"`

// Configuration relating to the polling of the TaskCluster Queue.
QueueService struct {
Expand All @@ -44,7 +47,10 @@ type (
WorkerGroup string `json:"workerGroup"`

// A unique name that can be used to identify which worker instance this is (such as AWS instance id).
WorkerId string `json:"workerId"`
WorkerID string `json:"workerID"`

// Type of worker pool the worker belongs to.
WorkerType string `json:"workerType"`
}
)

Expand Down Expand Up @@ -76,7 +82,7 @@ var ConfigSchema = func() runtime.CompositeSchema {
"title": "Certificate",
"type": "string"
},
"clientId": {
"clientID": {
"description": "The Client ID for the client. Not very helpful, am I?",
"pattern": "^[A-Za-z0-9@/:._-]+$",
"title": "ClientId",
Expand All @@ -89,9 +95,14 @@ var ConfigSchema = func() runtime.CompositeSchema {
"title": "Credentials",
"type": "object"
},
"provisionerId": {
"pollingInterval": {
"description": "The amount of time to wait between task polling iterations",
"title": "PollingInterval",
"type": "integer"
},
"provisionerID": {
"description": "The provisioner (if any) that is responsible for spawning instances of this worker. Typically `+"`"+`aws-provisioner-v1`+"`"+`.",
"title": "ProvisionerId",
"title": "ProvisionerID",
"type": "string"
},
"queueService": {
Expand All @@ -114,9 +125,14 @@ var ConfigSchema = func() runtime.CompositeSchema {
"title": "WorkerGroup",
"type": "string"
},
"workerId": {
"workerID": {
"description": "A unique name that can be used to identify which worker instance this is (such as AWS instance id).",
"title": "WorkerId",
"title": "WorkerID",
"type": "string"
},
"workerType": {
"description": "Type of worker pool the worker belongs to.",
"title": "WorkerType",
"type": "string"
}
},
Expand Down
19 changes: 14 additions & 5 deletions config/global-config.yml
Expand Up @@ -5,14 +5,18 @@ description: |-
to any engine or plugin.
type: object
properties:
pollingInterval:
title: "PollingInterval"
description: "The amount of time to wait between task polling iterations"
type: "integer"
credentials:
title: Credentials
description: |-
The set of credentials that should be used by the worker when
authenticating against taskcluster endpoints.
type: object
properties:
clientId:
clientID:
title: ClientId
description: |-
The Client ID for the client. Not very helpful, am I?
Expand All @@ -31,8 +35,8 @@ properties:
type: string
required:
- clientId, accessToken, certificate
provisionerId:
title: ProvisionerId
provisionerID:
title: ProvisionerID
description: |-
The provisioner (if any) that is responsible for spawning instances of this worker. Typically `aws-provisioner-v1`.
type: string
Expand All @@ -41,8 +45,13 @@ properties:
description: |-
A logical group for this worker to belong to, such as an AWS region.
type: string
workerId:
title: WorkerId
workerType:
title: WorkerType
description: |-
Type of worker pool the worker belongs to.
type: string
workerID:
title: WorkerID
description: |-
A unique name that can be used to identify which worker instance this is (such as AWS instance id).
type: string
Expand Down
23 changes: 23 additions & 0 deletions engines/errors.go
Expand Up @@ -79,6 +79,7 @@ type MalformedPayloadError struct {
message string
}

// Error returns the error message and adheres to the Error interface
func (e MalformedPayloadError) Error() string {
return e.message
}
Expand All @@ -92,3 +93,25 @@ func (e MalformedPayloadError) Error() string {
func NewMalformedPayloadError(a ...interface{}) MalformedPayloadError {
return MalformedPayloadError{message: fmt.Sprint(a...)}
}

// InternalError are errors that could not be completed because of issues related to the
// host. These issues could include issues with the engine, host resources, and worker
// configuration.
type InternalError struct {
message string
}

// Error returns the error message and adheres to the Error interface
func (e InternalError) Error() string {
return e.message
}

// NewInternalError creates an InternalError object, please
// make sure to include a detailed description of the error, preferably using
// multiple lines and with examples.
//
// These will be printed in the logs and end-users will rely on them to debug
// their tasks.
func NewInternalError(message string) InternalError {
return InternalError{message: message}
}
3 changes: 3 additions & 0 deletions engines/extpoints/extpoints.go
Expand Up @@ -55,6 +55,7 @@ func UnregisterExtension(name string) []string {
return ifaces
}


// Base extension point

type extensionPoint struct {
Expand Down Expand Up @@ -175,3 +176,5 @@ func (ep *engineProviderExt) Names() []string {
}
return names
}


2 changes: 0 additions & 2 deletions engines/mock/mockengine.go
Expand Up @@ -5,7 +5,6 @@
package mockengine

import (
"fmt"
"net/http"

"github.com/Sirupsen/logrus"
Expand All @@ -27,7 +26,6 @@ func init() {
}

func (e engineProvider) NewEngine(options extpoints.EngineOptions) (engines.Engine, error) {
fmt.Println(options.Log)
return engine{Log: options.Log}, nil
}

Expand Down
33 changes: 23 additions & 10 deletions engines/mock/mocksandbox.go
Expand Up @@ -2,6 +2,7 @@ package mockengine

import (
"bytes"
"errors"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -36,13 +37,15 @@ type sandbox struct {
proxies map[string]http.Handler
files map[string][]byte
result bool
done chan struct{}
}

///////////////////////////// Implementation of SandboxBuilder interface

func (s *sandbox) StartSandbox() (engines.Sandbox, error) {
s.Lock()
defer s.Unlock()
s.done = make(chan struct{})
return s, nil
}

Expand Down Expand Up @@ -170,17 +173,27 @@ var functions = map[string]func(*sandbox, string) bool{

func (s *sandbox) WaitForResult() (engines.ResultSet, error) {
// No need to lock access to payload, as it can't be mutated at this point
time.Sleep(time.Duration(s.payload.Delay) * time.Millisecond)
// No need to lock access mounts and proxies either
f := functions[s.payload.Function]
if f == nil {
return nil, engines.NewMalformedPayloadError("Unknown function")
select {
case <-s.done:
s.result = false
return s, errors.New("Task execution has been aborted")
case <-time.After(time.Duration(s.payload.Delay) * time.Millisecond):
// No need to lock access mounts and proxies either
f := functions[s.payload.Function]
if f == nil {
return nil, engines.NewMalformedPayloadError("Unknown function")
}
result := f(s, s.payload.Argument)
s.Lock()
defer s.Unlock()
s.result = result
return s, nil
}
result := f(s, s.payload.Argument)
s.Lock()
defer s.Unlock()
s.result = result
return s, nil
}

func (s *sandbox) Abort() error {
close(s.done)
return nil
}

func (s *sandbox) ExtractFile(path string) (io.ReadCloser, error) {
Expand Down
49 changes: 36 additions & 13 deletions main.go
Expand Up @@ -7,12 +7,15 @@ package main
import (
"fmt"
"os"
"path/filepath"

"github.com/Sirupsen/logrus"
"github.com/docopt/docopt-go"
"github.com/taskcluster/slugid-go/slugid"
"github.com/taskcluster/taskcluster-worker/config"
"github.com/taskcluster/taskcluster-worker/engines/extpoints"
"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/taskmgr"
"github.com/taskcluster/taskcluster-worker/worker"
)

const version = "taskcluster-worker 0.0.1"
Expand Down Expand Up @@ -61,10 +64,15 @@ func main() {
logger.Fatalf("Must supply a valid engine. Supported Engines %v", engineNames)
}

runtimeEnvironment := runtime.Environment{Log: logger}
tempPath := filepath.Join(os.TempDir(), slugid.V4())
tempStorage, err := runtime.NewTemporaryStorage(tempPath)
runtimeEnvironment := &runtime.Environment{
Log: logger,
TemporaryStorage: tempStorage,
}

engine, err := engineProvider.NewEngine(extpoints.EngineOptions{
Environment: &runtimeEnvironment,
Environment: runtimeEnvironment,
Log: logger.WithField("engine", engineName),
})
if err != nil {
Expand All @@ -76,25 +84,40 @@ func main() {
Credentials: struct {
AccessToken string `json:"accessToken"`
Certificate string `json:"certificate"`
ClientId string `json:"clientId"`
ClientID string `json:"clientID"`
}{
AccessToken: "123",
Certificate: "",
ClientId: "abc",
AccessToken: os.Getenv("TASKCLUSTER_ACCESS_TOKEN"),
Certificate: os.Getenv("TASKCLUSTER_CERTIFICATE"),
ClientID: os.Getenv("TASKCLUSTER_CLIENT_ID"),
},
Capacity: 5,
ProvisionerId: "tasckluster-worker-provisioner",
WorkerGroup: "taskcluster-worker-test-worker-group",
WorkerId: "taskcluster-worker-test-worker",
ProvisionerID: "dummy-test-provisioner",
WorkerGroup: "dummy-test-group",
WorkerType: "dummy-test-type",
WorkerID: "dummy-test-worker",
QueueService: struct {
ExpirationOffset int `json:"expirationOffset"`
}{
ExpirationOffset: 300,
},
PollingInterval: 10,
}

taskManager := taskmgr.New(config, &engine, logger.WithField("component", "Task Manager"))
l := logger.WithFields(logrus.Fields{
"workerID": config.WorkerID,
"workerType": config.WorkerType,
"workerGroup": config.WorkerGroup,
"provisionerID": config.ProvisionerID,
})

w, err := worker.New(config, engine, runtimeEnvironment, l)
if err != nil {
logger.Fatalf("Could not create worker. %s", err)
}

runtimeEnvironment.Log.Debugf("Created taskManager %+v", taskManager)
runtimeEnvironment.Log.Info("Worker started up")
err = w.Start()
if err != nil {
os.Stderr.WriteString(err.Error())
os.Exit(1)
}
}
3 changes: 3 additions & 0 deletions plugins/extpoints/extpoints.go
Expand Up @@ -55,6 +55,7 @@ func UnregisterExtension(name string) []string {
return ifaces
}


// Base extension point

type extensionPoint struct {
Expand Down Expand Up @@ -175,3 +176,5 @@ func (ep *pluginProviderExt) Names() []string {
}
return names
}


6 changes: 3 additions & 3 deletions plugins/extpoints/interfaces.go
Expand Up @@ -13,9 +13,9 @@ import (
// We wrap all arguments so that we can add additional properties without
// breaking source compatibility with older plugins.
type PluginOptions struct {
environment *runtime.Environment
engine *engines.Engine
log *logrus.Entry
Environment *runtime.Environment
Engine *engines.Engine
Log *logrus.Entry
}

// The PluginProvider interface must be implemented and registered by anyone
Expand Down
8 changes: 4 additions & 4 deletions plugins/extpoints/manager.go
Expand Up @@ -69,9 +69,9 @@ func NewPluginManager(pluginsToLoad []string, options PluginOptions) (plugins.Pl
return nil, errors.New("Missing plugin")
}
plugin, err := pluginProvider.NewPlugin(PluginOptions{
environment: options.environment,
engine: options.engine,
log: options.log.WithField("plugin", p),
Environment: options.Environment,
Engine: options.Engine,
Log: options.Log.WithField("plugin", p),
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -227,7 +227,7 @@ func (m *taskPluginManager) Finished(s bool) error {
return waitForErrors(errors, len(m.taskPlugins))
}

func (m *taskPluginManager) Exception(r plugins.ExceptionReason) error {
func (m *taskPluginManager) Exception(r runtime.ExceptionReason) error {
// Sanity check that no two methods on plugin is running in parallel, this way
// plugins don't have to be thread-safe, and we ensure nothing is called after
// Dispose() has been called.
Expand Down

0 comments on commit f8ebef0

Please sign in to comment.