Skip to content
This repository was archived by the owner on Feb 27, 2020. It is now read-only.

Commit 6510a6c

Browse files
committed
Rebased
1 parent 36cf89e commit 6510a6c

26 files changed

+1539
-418
lines changed

config/config.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@ type (
2727
// The Client ID for the client. Not very helpful, am I?
2828
//
2929
// Syntax: ^[A-Za-z0-9@/:._-]+$
30-
ClientID string `json:"clientId"`
30+
ClientID string `json:"clientID"`
3131
} `json:"credentials"`
3232

33+
// The amount of time to wait between task polling iterations
34+
PollingInterval int `json:"pollingInterval"`
35+
3336
// The provisioner (if any) that is responsible for spawning instances of this worker. Typically `aws-provisioner-v1`.
34-
ProvisionerID string `json:"provisionerId"`
37+
ProvisionerID string `json:"provisionerID"`
3538

3639
// Configuration relating to the polling of the TaskCluster Queue.
3740
QueueService struct {
@@ -44,7 +47,10 @@ type (
4447
WorkerGroup string `json:"workerGroup"`
4548

4649
// A unique name that can be used to identify which worker instance this is (such as AWS instance id).
47-
WorkerID string `json:"workerId"`
50+
WorkerID string `json:"workerID"`
51+
52+
// Type of worker pool the worker belongs to.
53+
WorkerType string `json:"workerType"`
4854
}
4955
)
5056

@@ -76,7 +82,7 @@ var ConfigSchema = func() runtime.CompositeSchema {
7682
"title": "Certificate",
7783
"type": "string"
7884
},
79-
"clientId": {
85+
"clientID": {
8086
"description": "The Client ID for the client. Not very helpful, am I?",
8187
"pattern": "^[A-Za-z0-9@/:._-]+$",
8288
"title": "ClientId",
@@ -89,9 +95,14 @@ var ConfigSchema = func() runtime.CompositeSchema {
8995
"title": "Credentials",
9096
"type": "object"
9197
},
92-
"provisionerId": {
98+
"pollingInterval": {
99+
"description": "The amount of time to wait between task polling iterations",
100+
"title": "PollingInterval",
101+
"type": "integer"
102+
},
103+
"provisionerID": {
93104
"description": "The provisioner (if any) that is responsible for spawning instances of this worker. Typically `+"`"+`aws-provisioner-v1`+"`"+`.",
94-
"title": "ProvisionerId",
105+
"title": "ProvisionerID",
95106
"type": "string"
96107
},
97108
"queueService": {
@@ -114,9 +125,14 @@ var ConfigSchema = func() runtime.CompositeSchema {
114125
"title": "WorkerGroup",
115126
"type": "string"
116127
},
117-
"workerId": {
128+
"workerID": {
118129
"description": "A unique name that can be used to identify which worker instance this is (such as AWS instance id).",
119-
"title": "WorkerId",
130+
"title": "WorkerID",
131+
"type": "string"
132+
},
133+
"workerType": {
134+
"description": "Type of worker pool the worker belongs to.",
135+
"title": "WorkerType",
120136
"type": "string"
121137
}
122138
},

config/global-config.yml

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,18 @@ description: |-
55
to any engine or plugin.
66
type: object
77
properties:
8+
pollingInterval:
9+
title: "PollingInterval"
10+
description: "The amount of time to wait between task polling iterations"
11+
type: "integer"
812
credentials:
913
title: Credentials
1014
description: |-
1115
The set of credentials that should be used by the worker when
1216
authenticating against taskcluster endpoints.
1317
type: object
1418
properties:
15-
clientId:
19+
clientID:
1620
title: ClientId
1721
description: |-
1822
The Client ID for the client. Not very helpful, am I?
@@ -31,8 +35,8 @@ properties:
3135
type: string
3236
required:
3337
- clientId, accessToken, certificate
34-
provisionerId:
35-
title: ProvisionerId
38+
provisionerID:
39+
title: ProvisionerID
3640
description: |-
3741
The provisioner (if any) that is responsible for spawning instances of this worker. Typically `aws-provisioner-v1`.
3842
type: string
@@ -41,8 +45,13 @@ properties:
4145
description: |-
4246
A logical group for this worker to belong to, such as an AWS region.
4347
type: string
44-
workerId:
45-
title: WorkerId
48+
workerType:
49+
title: WorkerType
50+
description: |-
51+
Type of worker pool the worker belongs to.
52+
type: string
53+
workerID:
54+
title: WorkerID
4655
description: |-
4756
A unique name that can be used to identify which worker instance this is (such as AWS instance id).
4857
type: string

engines/errors.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ type MalformedPayloadError struct {
7979
message string
8080
}
8181

82+
// Error returns the error message and adheres to the Error interface
8283
func (e MalformedPayloadError) Error() string {
8384
return e.message
8485
}
@@ -92,3 +93,25 @@ func (e MalformedPayloadError) Error() string {
9293
func NewMalformedPayloadError(a ...interface{}) MalformedPayloadError {
9394
return MalformedPayloadError{message: fmt.Sprint(a...)}
9495
}
96+
97+
// InternalError are errors that could not be completed because of issues related to the
98+
// host. These issues could include issues with the engine, host resources, and worker
99+
// configuration.
100+
type InternalError struct {
101+
message string
102+
}
103+
104+
// Error returns the error message and adheres to the Error interface
105+
func (e InternalError) Error() string {
106+
return e.message
107+
}
108+
109+
// NewInternalError creates an InternalError object, please
110+
// make sure to include a detailed description of the error, preferably using
111+
// multiple lines and with examples.
112+
//
113+
// These will be printed in the logs and end-users will rely on them to debug
114+
// their tasks.
115+
func NewInternalError(message string) InternalError {
116+
return InternalError{message: message}
117+
}

engines/extpoints/extpoints.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func UnregisterExtension(name string) []string {
5555
return ifaces
5656
}
5757

58+
5859
// Base extension point
5960

6061
type extensionPoint struct {
@@ -175,3 +176,5 @@ func (ep *engineProviderExt) Names() []string {
175176
}
176177
return names
177178
}
179+
180+

engines/mock/mockengine.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package mockengine
66

77
import (
8-
"fmt"
98
"net/http"
109

1110
"github.com/Sirupsen/logrus"
@@ -27,7 +26,6 @@ func init() {
2726
}
2827

2928
func (e engineProvider) NewEngine(options extpoints.EngineOptions) (engines.Engine, error) {
30-
fmt.Println(options.Log)
3129
return engine{Log: options.Log}, nil
3230
}
3331

engines/mock/mocksandbox.go

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package mockengine
22

33
import (
44
"bytes"
5+
"errors"
56
"io"
67
"io/ioutil"
78
"net/http"
@@ -36,13 +37,15 @@ type sandbox struct {
3637
proxies map[string]http.Handler
3738
files map[string][]byte
3839
result bool
40+
done chan struct{}
3941
}
4042

4143
///////////////////////////// Implementation of SandboxBuilder interface
4244

4345
func (s *sandbox) StartSandbox() (engines.Sandbox, error) {
4446
s.Lock()
4547
defer s.Unlock()
48+
s.done = make(chan struct{})
4649
return s, nil
4750
}
4851

@@ -170,17 +173,27 @@ var functions = map[string]func(*sandbox, string) bool{
170173

171174
func (s *sandbox) WaitForResult() (engines.ResultSet, error) {
172175
// No need to lock access to payload, as it can't be mutated at this point
173-
time.Sleep(time.Duration(s.payload.Delay) * time.Millisecond)
174-
// No need to lock access mounts and proxies either
175-
f := functions[s.payload.Function]
176-
if f == nil {
177-
return nil, engines.NewMalformedPayloadError("Unknown function")
176+
select {
177+
case <-s.done:
178+
s.result = false
179+
return s, errors.New("Task execution has been aborted")
180+
case <-time.After(time.Duration(s.payload.Delay) * time.Millisecond):
181+
// No need to lock access mounts and proxies either
182+
f := functions[s.payload.Function]
183+
if f == nil {
184+
return nil, engines.NewMalformedPayloadError("Unknown function")
185+
}
186+
result := f(s, s.payload.Argument)
187+
s.Lock()
188+
defer s.Unlock()
189+
s.result = result
190+
return s, nil
178191
}
179-
result := f(s, s.payload.Argument)
180-
s.Lock()
181-
defer s.Unlock()
182-
s.result = result
183-
return s, nil
192+
}
193+
194+
func (s *sandbox) Abort() error {
195+
close(s.done)
196+
return nil
184197
}
185198

186199
func (s *sandbox) ExtractFile(path string) (io.ReadCloser, error) {

main.go

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ package main
77
import (
88
"fmt"
99
"os"
10+
"path/filepath"
1011

12+
"github.com/Sirupsen/logrus"
1113
"github.com/docopt/docopt-go"
14+
"github.com/taskcluster/slugid-go/slugid"
1215
"github.com/taskcluster/taskcluster-worker/config"
1316
"github.com/taskcluster/taskcluster-worker/engines/extpoints"
1417
"github.com/taskcluster/taskcluster-worker/runtime"
15-
"github.com/taskcluster/taskcluster-worker/taskmgr"
18+
"github.com/taskcluster/taskcluster-worker/worker"
1619
)
1720

1821
const version = "taskcluster-worker 0.0.1"
@@ -61,10 +64,15 @@ func main() {
6164
logger.Fatalf("Must supply a valid engine. Supported Engines %v", engineNames)
6265
}
6366

64-
runtimeEnvironment := runtime.Environment{Log: logger}
67+
tempPath := filepath.Join(os.TempDir(), slugid.V4())
68+
tempStorage, err := runtime.NewTemporaryStorage(tempPath)
69+
runtimeEnvironment := &runtime.Environment{
70+
Log: logger,
71+
TemporaryStorage: tempStorage,
72+
}
6573

6674
engine, err := engineProvider.NewEngine(extpoints.EngineOptions{
67-
Environment: &runtimeEnvironment,
75+
Environment: runtimeEnvironment,
6876
Log: logger.WithField("engine", engineName),
6977
})
7078
if err != nil {
@@ -76,25 +84,40 @@ func main() {
7684
Credentials: struct {
7785
AccessToken string `json:"accessToken"`
7886
Certificate string `json:"certificate"`
79-
ClientID string `json:"clientId"`
87+
ClientID string `json:"clientID"`
8088
}{
81-
AccessToken: "123",
82-
Certificate: "",
83-
ClientID: "abc",
89+
AccessToken: os.Getenv("TASKCLUSTER_ACCESS_TOKEN"),
90+
Certificate: os.Getenv("TASKCLUSTER_CERTIFICATE"),
91+
ClientID: os.Getenv("TASKCLUSTER_CLIENT_ID"),
8492
},
8593
Capacity: 5,
86-
ProvisionerID: "tasckluster-worker-provisioner",
87-
WorkerGroup: "taskcluster-worker-test-worker-group",
88-
WorkerID: "taskcluster-worker-test-worker",
94+
ProvisionerID: "dummy-test-provisioner",
95+
WorkerGroup: "dummy-test-group",
96+
WorkerType: "dummy-test-type",
97+
WorkerID: "dummy-test-worker",
8998
QueueService: struct {
9099
ExpirationOffset int `json:"expirationOffset"`
91100
}{
92101
ExpirationOffset: 300,
93102
},
103+
PollingInterval: 10,
94104
}
95105

96-
taskManager := taskmgr.New(config, &engine, logger.WithField("component", "Task Manager"))
106+
l := logger.WithFields(logrus.Fields{
107+
"workerID": config.WorkerID,
108+
"workerType": config.WorkerType,
109+
"workerGroup": config.WorkerGroup,
110+
"provisionerID": config.ProvisionerID,
111+
})
112+
113+
w, err := worker.New(config, engine, runtimeEnvironment, l)
114+
if err != nil {
115+
logger.Fatalf("Could not create worker. %s", err)
116+
}
97117

98-
runtimeEnvironment.Log.Debugf("Created taskManager %+v", taskManager)
99-
runtimeEnvironment.Log.Info("Worker started up")
118+
err = w.Start()
119+
if err != nil {
120+
os.Stderr.WriteString(err.Error())
121+
os.Exit(1)
122+
}
100123
}

plugins/extpoints/extpoints.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ func UnregisterExtension(name string) []string {
5555
return ifaces
5656
}
5757

58+
5859
// Base extension point
5960

6061
type extensionPoint struct {
@@ -175,3 +176,5 @@ func (ep *pluginProviderExt) Names() []string {
175176
}
176177
return names
177178
}
179+
180+

plugins/extpoints/interfaces.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ import (
1313
// We wrap all arguments so that we can add additional properties without
1414
// breaking source compatibility with older plugins.
1515
type PluginOptions struct {
16-
environment *runtime.Environment
17-
engine *engines.Engine
18-
log *logrus.Entry
16+
Environment *runtime.Environment
17+
Engine *engines.Engine
18+
Log *logrus.Entry
1919
}
2020

2121
// The PluginProvider interface must be implemented and registered by anyone

plugins/extpoints/manager.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ func NewPluginManager(pluginsToLoad []string, options PluginOptions) (plugins.Pl
6969
return nil, errors.New("Missing plugin")
7070
}
7171
plugin, err := pluginProvider.NewPlugin(PluginOptions{
72-
environment: options.environment,
73-
engine: options.engine,
74-
log: options.log.WithField("plugin", p),
72+
Environment: options.Environment,
73+
Engine: options.Engine,
74+
Log: options.Log.WithField("plugin", p),
7575
})
7676
if err != nil {
7777
return nil, err
@@ -227,7 +227,7 @@ func (m *taskPluginManager) Finished(s bool) error {
227227
return waitForErrors(errors, len(m.taskPlugins))
228228
}
229229

230-
func (m *taskPluginManager) Exception(r plugins.ExceptionReason) error {
230+
func (m *taskPluginManager) Exception(r runtime.ExceptionReason) error {
231231
// Sanity check that no two methods on plugin is running in parallel, this way
232232
// plugins don't have to be thread-safe, and we ensure nothing is called after
233233
// Dispose() has been called.

0 commit comments

Comments
 (0)