main.go
//go:generate gw-codegen all-unix-style.yml generated_all-unix-style.go !windows
//go:generate gw-codegen windows.yml generated_windows.go
package main
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"reflect"
"runtime"
"runtime/debug"
"strconv"
"strings"
"time"
docopt "github.com/docopt/docopt-go"
"github.com/taskcluster/generic-worker/gwconfig"
"github.com/taskcluster/generic-worker/process"
"github.com/taskcluster/taskcluster-base-go/scopes"
tcclient "github.com/taskcluster/taskcluster-client-go"
"github.com/taskcluster/taskcluster-client-go/auth"
"github.com/taskcluster/taskcluster-client-go/awsprovisioner"
"github.com/taskcluster/taskcluster-client-go/queue"
"github.com/xeipuuv/gojsonschema"
)
var (
// Whether we are running under the aws provisioner
configureForAws bool
// General platform independent user settings, such as home directory, username...
// Platform specific data should be managed in plat_<platform>.go files
taskContext *TaskContext = &TaskContext{}
// Queue is the object we will use for accessing queue api. See
// https://docs.taskcluster.net/reference/platform/queue/api-docs
Queue *queue.Queue
Provisioner *awsprovisioner.AwsProvisioner
config *gwconfig.Config
configFile string
Features []Feature = []Feature{
&LiveLogFeature{},
&OSGroupsFeature{},
&ChainOfTrustFeature{},
&MountsFeature{},
&SupersedeFeature{},
}
version = "10.4.0"
revision = "" // this is set during build with `-ldflags "-X main.revision=$(git rev-parse HEAD)"`
usage = `
generic-worker
generic-worker is a taskcluster worker that can run on any platform that supports go (golang).
See http://taskcluster.github.io/generic-worker/ for more details. Essentially, the worker is
the taskcluster component that executes tasks. It requests tasks from the taskcluster queue,
and reports back results to the queue.
Usage:
generic-worker run [--config CONFIG-FILE]
[--configure-for-aws]
generic-worker install service [--nssm NSSM-EXE]
[--service-name SERVICE-NAME]
[--config CONFIG-FILE]
generic-worker show-payload-schema
generic-worker new-openpgp-keypair --file PRIVATE-KEY-FILE
generic-worker --help
generic-worker --version
Targets:
run Runs the generic-worker.
show-payload-schema Each taskcluster task defines a payload to be
interpreted by the worker that executes it. This
payload is validated against a json schema baked
into the release. This option outputs the json
schema used in this version of the generic
worker.
install service This will install the generic worker as a
Windows service running under the Local System
account. This is the preferred way to run the
worker under Windows.
new-openpgp-keypair This will generate a fresh, new OpenPGP
compliant private/public key pair. The public
key will be written to stdout and the private
key will be written to the specified file.
Options:
--config CONFIG-FILE Json configuration file to use. See
configuration section below to see what this
file should contain. When calling the install
target, this is the config file that the
installation should use, rather than the
config to use during install.
[default: generic-worker.config]
--configure-for-aws This will create the CONFIG-FILE for an AWS
installation by querying the AWS environment
and setting appropriate values.
--nssm NSSM-EXE The full path to nssm.exe to use for
installing the service.
[default: C:\nssm-2.24\win64\nssm.exe]
--service-name SERVICE-NAME The name that the Windows service should be
installed under. [default: Generic Worker]
--file PRIVATE-KEY-FILE The path to the file to write the private key
to. The parent directory must already exist.
If the file exists it will be overwritten,
otherwise it will be created.
--help Display this help text.
--version The release version of the generic-worker.
Configuring the generic worker:
The configuration file for the generic worker is specified with -c|--config CONFIG-FILE
as described above. Its format is a json dictionary of name/value pairs.
** REQUIRED ** properties
=========================
accessToken Taskcluster access token used by generic worker
to talk to taskcluster queue.
clientId Taskcluster client id used by generic worker to
talk to taskcluster queue.
livelogSecret This should match the secret used by the
stateless dns server; see
https://github.com/taskcluster/stateless-dns-server
publicIP The IP address for clients to be directed to
for serving live logs; see
https://github.com/taskcluster/livelog and
https://github.com/taskcluster/stateless-dns-server
signingKeyLocation The PGP signing key for signing artifacts with.
workerId A name to uniquely identify your worker.
workerType This should match a worker_type managed by the
provisioner you have specified.
** OPTIONAL ** properties
=========================
cachesDir The location where task caches should be stored on
the worker. [default: C:\generic-worker\caches]
certificate Taskcluster certificate, when using temporary
credentials only.
checkForNewDeploymentEverySecs The number of seconds between consecutive calls
to the provisioner, to check if there has been a
new deployment of the current worker type. If a
new deployment is discovered, worker will shut
down. See deploymentId property. [default: 1800]
cleanUpTaskDirs Whether to delete the home directories of the task
users after the task completes. Normally you would
want to do this to avoid filling up disk space,
but for one-off troubleshooting, it can be useful
to (temporarily) leave home directories in place.
Accepted values: true or false. [default: true]
deploymentId If running with --configure-for-aws, then between
tasks, at a chosen maximum frequency (see
checkForNewDeploymentEverySecs property), the
worker will query the provisioner to get the
updated worker type definition. If the deploymentId
in the config of the worker type definition is
different to the worker's current deploymentId, the
worker will shut itself down. See
https://bugzil.la/1298010
disableReboots If true, no system reboot will be initiated by
generic-worker program, but it will still return
with exit code 67 if the system needs rebooting.
This allows custom logic to be executed before
rebooting, by patching run-generic-worker.bat
script to check for exit code 67, perform steps
(such as formatting a hard drive) and then
rebooting in the run-generic-worker.bat script.
[default: false]
downloadsDir The location where resources are downloaded for
populating preloaded caches and readonly mounts.
[default: C:\generic-worker\downloads]
idleTimeoutSecs How many seconds to wait without getting a new
task to perform, before the worker process exits.
An integer, >= 0. A value of 0 means "never reach
the idle state" - i.e. continue running
indefinitely. See also shutdownMachineOnIdle.
[default: 0]
livelogCertificate SSL certificate to be used by livelog for hosting
logs over https. If not set, http will be used.
livelogExecutable Filepath of LiveLog executable to use; see
https://github.com/taskcluster/livelog
[default: livelog]
livelogKey SSL key to be used by livelog for hosting logs
over https. If not set, http will be used.
livelogPUTPort Port number for livelog HTTP PUT requests.
[default: 60022]
livelogGETPort Port number for livelog HTTP GET requests.
[default: 60023]
numberOfTasksToRun If zero, run tasks indefinitely. Otherwise, after
this many tasks, exit. [default: 0]
provisionerId The taskcluster provisioner which is taking care
of provisioning environments with generic-worker
running on them. [default: test-provisioner]
requiredDiskSpaceMegabytes The garbage collector will ensure at least this
number of megabytes of disk space are available
when each task starts. If it cannot free enough
disk space, the worker will shut itself down.
[default: 10240]
runAfterUserCreation A string that, if non-empty, will be treated as a
command to be executed as the newly generated task
user, each time a task user is created. This is a
way to provide generic user initialisation logic
that should apply to all generated users (and thus
all tasks).
runTasksAsCurrentUser If true, users will not be created for tasks, but
the current OS user will be used. Useful if not an
administrator, e.g. when running tests. Should not
be used in production! [default: false]
sentryProject The project name used in https://sentry.io for
reporting worker crashes. Permission to publish
crash reports is granted via the scope
auth:sentry:<sentryProject>. If the taskcluster
client (see clientId property above) does not
possess this scope, no crash reports will be sent.
Similarly, if this property is not specified or
is the empty string, no reports will be sent.
shutdownMachineOnInternalError If true, if the worker encounters an unrecoverable
error (such as not being able to write to a
required file) it will shutdown the host
computer. Note this is generally only desired
for machines running in production, such as on AWS
EC2 spot instances. Use with caution!
[default: false]
shutdownMachineOnIdle If true, when the worker is deemed to have been
idle for enough time (see idleTimeoutSecs) the
worker will issue an OS shutdown command. If false,
the worker process will simply terminate, but the
machine will not be shut down. [default: false]
subdomain Subdomain to use in stateless dns name for live
logs; see
https://github.com/taskcluster/stateless-dns-server
[default: taskcluster-worker.net]
tasksDir The location where task directories should be
created on the worker. [default: ` + defaultTasksDir() + `]
workerGroup Typically this would be an aws region - an
identifier to uniquely identify which pool of
workers this worker logically belongs to.
[default: test-worker-group]
workerTypeMetaData This arbitrary json blob will be included at the
top of each task log. Providing information here,
such as a URL to the code/config used to set up the
worker type will mean that people running tasks on
the worker type will have more information about how
it was set up (for example what has been installed on
the machine).
If an optional config setting is not provided in the json configuration file, the
default will be taken (defaults documented above).
If no value can be determined for a required config setting, the generic-worker will
exit with a failure message.
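For illustration only, a minimal configuration file providing just the
required properties might look like the following (all values are
placeholders and must be replaced with real settings for your environment):
  {
    "accessToken":        "<taskcluster access token for clientId>",
    "clientId":           "<taskcluster client id>",
    "livelogSecret":      "<secret shared with the stateless dns server>",
    "publicIP":           "<public IP address of this worker>",
    "signingKeyLocation": "<path to the OpenPGP signing key file>",
    "workerId":           "<unique name for this worker>",
    "workerType":         "<worker type this worker belongs to>"
  }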
Exit Codes:
0 Tasks completed successfully; no more tasks to run (see config setting
numberOfTasksToRun).
67 A task user has been created, and the generic-worker needs to reboot in order
to log on as the new task user. Note, the reboot happens automatically unless
config setting disableReboots is set to true - in either case this exit code will
be issued.
68 The generic-worker hit its idle timeout limit (see config settings idleTimeoutSecs
and shutdownMachineOnIdle).
69 Worker panic - either a worker bug, or the environment is not suitable for running
a task, e.g. a file cannot be written to the file system, or something else did
not work that was required in order to execute a task. See config setting
shutdownMachineOnInternalError.
70 A new deploymentId has been issued in the AWS worker type configuration, meaning
this worker environment is no longer up-to-date. Typically workers should
terminate.
`
)
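// ExitCode is the exit code that the worker process returns to the operating
// system. The values correspond to the exit codes documented in the usage
// text above.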
type ExitCode int
const (
TASKS_COMPLETE ExitCode = 0
REBOOT_REQUIRED ExitCode = 67
IDLE_TIMEOUT ExitCode = 68
INTERNAL_ERROR ExitCode = 69
NONCURRENT_DEPLOYMENT_ID ExitCode = 70
)
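// persistFeaturesState asks each feature to persist its state, returning the
// first error encountered, if any.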
func persistFeaturesState() (err error) {
for _, feature := range Features {
err := feature.PersistState()
if err != nil {
return err
}
}
return nil
}
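// initialiseFeatures initialises each feature in turn, returning the first
// error encountered, if any.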
func initialiseFeatures() (err error) {
for _, feature := range Features {
err := feature.Initialise()
if err != nil {
return err
}
}
return nil
}
// Entry point into the generic worker...
func main() {
versionName := "generic-worker " + version
if revision != "" {
versionName += " [ revision: https://github.com/taskcluster/generic-worker/commits/" + revision + " ]"
}
arguments, err := docopt.Parse(usage, nil, true, versionName, false, true)
if err != nil {
log.Println("Error parsing command line arguments!")
panic(err)
}
switch {
case arguments["show-payload-schema"]:
fmt.Println(taskPayloadSchema())
case arguments["run"]:
configureForAws = arguments["--configure-for-aws"].(bool)
configFile = arguments["--config"].(string)
config, err = loadConfig(configFile, configureForAws)
// persist before checking for error, so we can see what the problem was...
if config != nil {
config.Persist(configFile)
}
if err != nil {
log.Printf("Error loading configuration from file '%v':\n", configFile)
log.Printf("%v\n", err)
os.Exit(64)
}
exitCode := RunWorker()
log.Printf("Exiting worker with exit code %v", exitCode)
switch exitCode {
case REBOOT_REQUIRED:
if !config.DisableReboots {
immediateReboot()
}
case IDLE_TIMEOUT:
if config.ShutdownMachineOnIdle {
immediateShutdown("generic-worker idle timeout")
}
case INTERNAL_ERROR:
if config.ShutdownMachineOnInternalError {
immediateShutdown("generic-worker internal error")
}
case NONCURRENT_DEPLOYMENT_ID:
immediateShutdown("generic-worker deploymentId is not latest")
}
os.Exit(int(exitCode))
case arguments["install"]:
// platform specific...
err := install(arguments)
if err != nil {
log.Println("Error installing generic worker:")
log.Printf("%#v\n", err)
os.Exit(65)
}
case arguments["new-openpgp-keypair"]:
err := generateOpenPGPKeypair(arguments["--file"].(string))
if err != nil {
log.Println("Error generating OpenPGP keypair for worker:")
log.Printf("%#v\n", err)
os.Exit(66)
}
}
}
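// MissingConfigError is returned by loadConfig when a required config setting
// has not been given a value in the config file.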
type MissingConfigError struct {
Setting string
File string
}
func (err MissingConfigError) Error() string {
return "Config setting \"" + err.Setting + "\" must be defined in file \"" + err.File + "\"."
}
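// loadConfig builds the worker configuration: it starts from built-in
// defaults, optionally overlays settings obtained from the AWS environment
// (when queryUserData is true), merges in the JSON config file if it can be
// read, adds worker type metadata, and finally checks that all required
// settings have been given a value.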
func loadConfig(filename string, queryUserData bool) (*gwconfig.Config, error) {
// TODO: would be better to have a json schema, and also define defaults in
// only one place if possible (defaults also declared in `usage`)
// first assign defaults
c := &gwconfig.Config{
CachesDir: "C:\\generic-worker\\caches",
CheckForNewDeploymentEverySecs: 1800,
CleanUpTaskDirs: true,
DisableReboots: false,
DownloadsDir: "C:\\generic-worker\\downloads",
IdleTimeoutSecs: 0,
LiveLogExecutable: "livelog",
LiveLogPUTPort: 60022,
LiveLogGETPort: 60023,
NumberOfTasksToRun: 0,
SentryProject: "",
ProvisionerID: "test-provisioner",
RefreshUrlsPrematurelySecs: 310,
RequiredDiskSpaceMegabytes: 10240,
RunAfterUserCreation: "",
RunTasksAsCurrentUser: false,
ShutdownMachineOnInternalError: false,
ShutdownMachineOnIdle: false,
Subdomain: "taskcluster-worker.net",
TasksDir: defaultTasksDir(),
WorkerGroup: "test-worker-group",
WorkerTypeMetadata: map[string]interface{}{},
}
// now overlay with data from amazon, if applicable
if queryUserData {
// don't check errors, since maybe secrets are gone, but maybe we had them already from first run...
updateConfigWithAmazonSettings(c)
}
configFileBytes, err := ioutil.ReadFile(filename)
// only overlay values if config file exists and could be read
if err == nil {
err = c.MergeInJSON(configFileBytes)
if err != nil {
return nil, err
}
}
// Add any useful worker config to worker metadata
c.WorkerTypeMetadata["config"] = map[string]interface{}{
"runTasksAsCurrentUser": c.RunTasksAsCurrentUser,
"deploymentId": c.DeploymentID,
}
gwMetadata := map[string]interface{}{
"go-arch": runtime.GOARCH,
"go-os": runtime.GOOS,
"go-version": runtime.Version(),
"release": "https://github.com/taskcluster/generic-worker/releases/tag/v" + version,
"version": version,
}
if revision != "" {
gwMetadata["revision"] = revision
gwMetadata["source"] = "https://github.com/taskcluster/generic-worker/commits/" + revision
}
c.WorkerTypeMetadata["generic-worker"] = gwMetadata
// now check all required values are set
// TODO: could probably do this with reflection to avoid explicitly listing
// all members
fields := []struct {
value interface{}
name string
disallowed interface{}
}{
{value: c.AccessToken, name: "accessToken", disallowed: ""},
{value: c.CachesDir, name: "cachesDir", disallowed: ""},
{value: c.ClientID, name: "clientId", disallowed: ""},
{value: c.DownloadsDir, name: "downloadsDir", disallowed: ""},
{value: c.LiveLogExecutable, name: "livelogExecutable", disallowed: ""},
{value: c.LiveLogGETPort, name: "livelogGETPort", disallowed: 0},
{value: c.LiveLogPUTPort, name: "livelogPUTPort", disallowed: 0},
{value: c.LiveLogSecret, name: "livelogSecret", disallowed: ""},
{value: c.ProvisionerID, name: "provisionerId", disallowed: ""},
{value: c.PublicIP, name: "publicIP", disallowed: net.IP(nil)},
{value: c.RefreshUrlsPrematurelySecs, name: "refreshURLsPrematurelySecs", disallowed: 0},
{value: c.SigningKeyLocation, name: "signingKeyLocation", disallowed: ""},
{value: c.Subdomain, name: "subdomain", disallowed: ""},
{value: c.TasksDir, name: "tasksDir", disallowed: ""},
{value: c.WorkerGroup, name: "workerGroup", disallowed: ""},
{value: c.WorkerID, name: "workerId", disallowed: ""},
{value: c.WorkerType, name: "workerType", disallowed: ""},
}
for _, f := range fields {
if reflect.DeepEqual(f.value, f.disallowed) {
return c, MissingConfigError{Setting: f.name, File: filename}
}
}
// all required config set!
return c, nil
}
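// ReadTasksResolvedFile returns the number of tasks resolved since the worker
// first started, as recorded in tasks-resolved-count.txt. If the file cannot
// be read (for example on first run), 0 is returned.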
func ReadTasksResolvedFile() uint {
b, err := ioutil.ReadFile("tasks-resolved-count.txt")
if err != nil {
return 0
}
i, err := strconv.Atoi(string(b))
if err != nil {
panic(err)
}
return uint(i)
}
// Also called from tests, so avoid panic in this function since this could
// cause tests to silently pass - instead require error handling.
func UpdateTasksResolvedFile(t uint) error {
return ioutil.WriteFile("tasks-resolved-count.txt", []byte(strconv.Itoa(int(t))), 0777)
}
// HandleCrash reports a crash in worker logs and reports the crash to sentry
// if it has valid credentials and a valid sentry project. The argument r is
// the object returned by the recover call, thrown by the panic call that
// caused the worker crash.
func HandleCrash(r interface{}) {
log.Print(string(debug.Stack()))
log.Print(" *********** PANIC occurred! *********** ")
log.Printf("%v", r)
ReportCrashToSentry(r)
}
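// RunWorker is the main worker loop: it sets up the queue and provisioner
// clients, initialises features, and then repeatedly claims and runs tasks
// until a reboot is required, the idle timeout is reached, the configured
// number of tasks has been run, a new deploymentId is detected, or an
// internal error occurs, returning the corresponding ExitCode.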
func RunWorker() (exitCode ExitCode) {
defer func() {
if r := recover(); r != nil {
HandleCrash(r)
exitCode = INTERNAL_ERROR
}
}()
log.Printf("Detected %s platform", runtime.GOOS)
// number of tasks resolved since worker first ran
// stored in a json file, since we may reboot between tasks etc
tasksResolved := ReadTasksResolvedFile()
// use a pointer to the value, to make sure it is resolved at defer-time, not now
defer func(t *uint) {
err := UpdateTasksResolvedFile(*t)
if err != nil {
panic(err)
}
}(&tasksResolved)
err := taskCleanup()
// any errors are fatal
if err != nil {
log.Printf("OH NO!!!\n\n%#v", err)
panic(err)
}
creds := &tcclient.Credentials{
ClientID: config.ClientID,
AccessToken: config.AccessToken,
Certificate: config.Certificate,
}
// Queue is the object we will use for accessing queue api
Queue, err = queue.New(creds)
if err != nil {
log.Print("Invalid taskcluster credentials!!!")
panic(err)
}
Provisioner, err = awsprovisioner.New(creds)
if err != nil {
log.Print("Invalid taskcluster credentials!!!")
panic(err)
}
err = initialiseFeatures()
if err != nil {
panic(err)
}
defer func() {
err := persistFeaturesState()
if err != nil {
log.Printf("Could not persist features: %v", err)
exitCode = INTERNAL_ERROR
}
}()
// loop, claiming and running tasks!
lastActive := time.Now()
// use zero value, to be sure that a check is made before first task runs
lastQueriedProvisioner := time.Time{}
lastReportedNoTasks := time.Now()
reboot := PrepareTaskEnvironment()
if reboot {
return REBOOT_REQUIRED
}
for {
// See https://bugzil.la/1298010 - routinely check if this worker type is
// outdated, and shut down if a new deployment is required.
if configureForAws && time.Now().Sub(lastQueriedProvisioner) > time.Duration(config.CheckForNewDeploymentEverySecs)*time.Second {
lastQueriedProvisioner = time.Now()
if deploymentIDUpdated() {
return NONCURRENT_DEPLOYMENT_ID
}
}
// make sure at least 5 seconds passes between iterations
wait5Seconds := time.NewTimer(time.Second * 5)
taskFound := FindAndRunTask()
if taskFound {
err := taskCleanup()
if err != nil {
log.Printf("Error cleaning up after task!\n%v", err)
}
tasksResolved++
// remainingTasks will be negative if config.NumberOfTasksToRun is not set (i.e. 0)
remainingTasks := int(config.NumberOfTasksToRun - tasksResolved)
remainingTaskCountText := ""
if remainingTasks > 0 {
remainingTaskCountText = fmt.Sprintf(" (will exit after resolving %v more)", remainingTasks)
}
log.Printf("Resolved %v tasks in total so far%v.", tasksResolved, remainingTaskCountText)
if remainingTasks == 0 {
log.Printf("Completed all task(s) (number of tasks to run = %v)", config.NumberOfTasksToRun)
if configureForAws && deploymentIDUpdated() {
return NONCURRENT_DEPLOYMENT_ID
}
return TASKS_COMPLETE
}
lastActive = time.Now()
unsetAutoLogon()
reboot := PrepareTaskEnvironment()
if reboot {
return REBOOT_REQUIRED
}
} else {
idleTime := time.Now().Sub(lastActive)
remainingIdleTimeText := ""
if config.IdleTimeoutSecs > 0 {
remainingIdleTimeText = fmt.Sprintf(" (will exit if no task claimed in %v)", time.Second*time.Duration(config.IdleTimeoutSecs)-idleTime)
if idleTime.Seconds() > float64(config.IdleTimeoutSecs) {
taskCleanup()
log.Printf("Worker idle for idleShutdownTimeoutSecs seconds (%v)", idleTime)
return IDLE_TIMEOUT
}
}
// let's not be over-verbose in logs - has cost implications
// so report only once per minute that no task was claimed, not every second
if time.Now().Sub(lastReportedNoTasks) > 1*time.Minute {
lastReportedNoTasks = time.Now()
// remainingTasks will be negative if config.NumberOfTasksToRun is not set (i.e. 0)
remainingTaskCountText := ""
if config.NumberOfTasksToRun > 0 {
if remainingTasks := int(config.NumberOfTasksToRun - tasksResolved); remainingTasks >= 0 {
remainingTaskCountText = fmt.Sprintf(" %v more tasks to run before exiting.", remainingTasks)
}
}
log.Printf("No task claimed. Idle for %v%v.%v", idleTime, remainingIdleTimeText, remainingTaskCountText)
}
}
// To avoid hammering queue, make sure there is at least 5 seconds
// between consecutive requests. Note we do this even if a task ran,
// since a task could complete in less than that amount of time.
<-wait5Seconds.C
}
}
// FindAndRunTask queries the Taskcluster Queue to find a task to
// run. If it finds one, it handles all the bookkeeping, as well as running the
// task. Returns true if it successfully claimed a task (regardless of whether
// the task ran successfully) otherwise false.
func FindAndRunTask() bool {
taskFound := false
req := &queue.ClaimWorkRequest{
Tasks: 1,
WorkerGroup: config.WorkerGroup,
WorkerID: config.WorkerID,
}
resp, err := Queue.ClaimWork(config.ProvisionerID, config.WorkerType, req)
if err != nil {
log.Printf("Could not claim work. %v", err)
return taskFound
}
for _, taskResponse := range resp.Tasks {
taskFound = true
taskQueue, err := queue.New(
&tcclient.Credentials{
ClientID: taskResponse.Credentials.ClientID,
AccessToken: taskResponse.Credentials.AccessToken,
Certificate: taskResponse.Credentials.Certificate,
},
)
if err != nil {
log.Printf("SERIOUS BUG: invalid credentials from queue for task %v", taskResponse.Status.TaskID)
}
task := &TaskRun{
TaskID: taskResponse.Status.TaskID,
RunID: uint(taskResponse.RunID),
Status: claimed,
Definition: taskResponse.Task,
Queue: taskQueue,
TaskClaimResponse: queue.TaskClaimResponse(taskResponse),
Artifacts: map[string]Artifact{},
featureArtifacts: map[string]string{
livelogBackingName: "log feature",
},
}
task.StatusManager = NewTaskStatusManager(task)
// Now we found a task, run it, and then exit the loop. Work is returned
// by the queue in the order of priority. Higher priority tasks will be claimed
// and returned before lower priority tasks.
log.Print("Task found")
execErr := task.Run()
if execErr.Occurred() {
task.reportPossibleError(execErr)
}
break
}
return taskFound
}
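// reportPossibleError logs err to both the worker log and the task log; it
// does nothing if err is nil.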
func (task *TaskRun) reportPossibleError(err error) {
if err != nil {
log.Printf("ERROR encountered: %v", err)
task.Log(err.Error())
}
}
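// setReclaimTimer schedules a reclaim of the task roughly 3 minutes before
// the current claim's takenUntil expiry, provided that moment is more than 30
// seconds away. After a successful reclaim it re-arms itself; if a reclaim
// fails, the task is killed.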
func (task *TaskRun) setReclaimTimer() {
// Reclaiming Tasks
// ----------------
// When the worker has claimed a task, it's said to have a claim to a given
// `taskId`/`runId`. This claim has an expiration, see the `takenUntil`
// property in the _task status structure_ returned from `queue.claimTask`
// and `queue.reclaimTask`. A worker must call `queue.reclaimTask` before
// the claim denoted in `takenUntil` expires. It's recommended that this
// is attempted a few minutes prior to expiration, to allow for clock drift.
// The first time we need to check the claim response; on subsequent reclaims we check the reclaim response
log.Print("Setting reclaim timer...")
var takenUntil time.Time
if len(task.TaskReclaimResponse.Status.Runs) > 0 {
takenUntil = time.Time(task.TaskReclaimResponse.Status.Runs[task.RunID].TakenUntil)
} else {
takenUntil = time.Time(task.TaskClaimResponse.Status.Runs[task.RunID].TakenUntil)
}
log.Printf("Current claim will expire at %v", takenUntil)
// Attempt to reclaim 3 mins earlier...
reclaimTime := takenUntil.Add(time.Minute * -3)
log.Printf("Reclaiming 3 mins earlier, at %v", reclaimTime)
waitTimeUntilReclaim := reclaimTime.Sub(time.Now())
log.Printf("Time to wait until then is %v", waitTimeUntilReclaim)
// sanity check - only set an alarm if the wait time is > 30s, so we don't hammer the queue
if waitTimeUntilReclaim.Seconds() > 30 {
log.Print("This is more than 30 seconds away - so setting a timer")
task.reclaimTimer = time.AfterFunc(
waitTimeUntilReclaim, func() {
log.Printf("About to reclaim task %v...", task.TaskID)
err := task.StatusManager.Reclaim()
if err == nil {
log.Printf("Successfully reclaimed task %v", task.TaskID)
// only set another reclaim timer if the previous reclaim succeeded
task.setReclaimTimer()
} else {
log.Printf("Encountered exception when reclaiming task %v: %v", task.TaskID, err)
log.Printf("Killing task %v since I cannot reclaim it", task.TaskID)
task.Logf("Killing process since task reclaim resulted in exception: %v", err)
task.kill()
}
},
)
} else {
log.Print("WARNING ******************** This is NOT more than 30 seconds away - so NOT setting a timer")
}
}
func (task *TaskRun) fetchTaskDefinition() {
// Fetch task definition
task.Definition = task.TaskClaimResponse.Task
}
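// validatePayload validates the task payload against the worker's payload
// JSON schema, unmarshals it into task.Payload, and checks that any
// explicitly set artifact expiry lies between the task deadline and the task
// expiry. Violations are returned as malformed-payload errors.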
func (task *TaskRun) validatePayload() *CommandExecutionError {
jsonPayload := task.Definition.Payload
log.Printf("JSON payload: %s", jsonPayload)
schemaLoader := gojsonschema.NewStringLoader(taskPayloadSchema())
docLoader := gojsonschema.NewStringLoader(string(jsonPayload))
result, err := gojsonschema.Validate(schemaLoader, docLoader)
if err != nil {
return MalformedPayloadError(err)
}
if !result.Valid() {
task.Log("TASK FAIL since the task payload is invalid. See errors:")
for _, desc := range result.Errors() {
task.Logf("- %s", desc)
}
// Dealing with Invalid Task Payloads
// ----------------------------------
// If the task payload is malformed or invalid, keep in mind that the
// queue doesn't validate the contents of the `task.payload` property,
// so the worker may resolve the current run by reporting an exception.
// When reporting an exception, using `queue.reportException` the
// worker should give a `reason`. If the worker is unable to execute the
// task specific payload/code/logic, it should report exception with
// the reason `malformed-payload`.
//
// This can also be used if an external resource that is referenced in
// a declarative nature doesn't exist. Generally, it should be used if
// we can be certain that another run of the task will have the same
// result. This differs from `queue.reportFailed` in the sense that we
// report a failure if the task specific code failed.
//
// Most tasks include a lot of declarative steps, such as pulling a
// docker image, creating a cache folder, decrypting encrypted environment
// variables, and setting environment variables. Clearly, if decryption
// of environment variables fails, there is no reason to retry the task.
// Nor can it be said that the task failed, because the error wasn't
// caused by execution of Turing complete code.
//
// If however, we run some executable code referenced in `task.payload`
// and the code crashes or exits non-zero, then the task is said to be
// failed. The difference is whether or not the unexpected behavior
// happened before or after the execution of task specific Turing
// complete code.
return MalformedPayloadError(fmt.Errorf("Validation of payload failed for task %v", task.TaskID))
}
err = json.Unmarshal(jsonPayload, &task.Payload)
if err != nil {
return MalformedPayloadError(err)
}
for _, artifact := range task.Payload.Artifacts {
// The default artifact expiry is task expiry, but is only applied when
// the task artifacts are resolved. We intentionally don't modify
// task.Payload otherwise it no longer reflects the real data defined
// in the task.
if !time.Time(artifact.Expires).IsZero() {
// Don't be too strict: allow 1s discrepancy to account for
// possible timestamp rounding on upstream systems
if time.Time(artifact.Expires).Add(time.Second).Before(time.Time(task.Definition.Deadline)) {
return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires before task deadline (%v is before %v)", artifact.Path, artifact.Expires, task.Definition.Deadline))
}
// Don't be too strict: allow 1s discrepancy to account for
// possible timestamp rounding on upstream systems
if time.Time(artifact.Expires).After(time.Time(task.Definition.Expires).Add(time.Second)) {
return MalformedPayloadError(fmt.Errorf("Malformed payload: artifact '%v' expires after task expiry (%v is after %v)", artifact.Path, artifact.Expires, task.Definition.Expires))
}
}
}
return nil
}
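// CommandExecutionError captures an error that occurred while processing a
// task, together with the task status to resolve and the reason to report to
// the queue.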
type CommandExecutionError struct {
TaskStatus TaskStatus
Cause error
Reason TaskUpdateReason
}
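// executionError wraps err in a CommandExecutionError with the given reason
// and task status; it returns nil if err is nil.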
func executionError(reason TaskUpdateReason, status TaskStatus, err error) *CommandExecutionError {
if err == nil {
return nil
}
return &CommandExecutionError{
Cause: err,
Reason: reason,
TaskStatus: status,
}
}
func ResourceUnavailable(err error) *CommandExecutionError {
return executionError("resource-unavailable", errored, err)
}
func MalformedPayloadError(err error) *CommandExecutionError {
return executionError("malformed-payload", errored, err)
}
func Failure(err error) *CommandExecutionError {
return executionError("", failed, err)
}
func (task *TaskRun) Logf(format string, v ...interface{}) {
task.Log(fmt.Sprintf(format, v...))
}
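// Log writes message to the task log, splitting it into lines and prefixing
// each line with a taskcluster timestamp. If no task log writer has been set
// up yet, the message is written to the worker log instead.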
func (task *TaskRun) Log(message string) {
if task.logWriter != nil {
for _, line := range strings.Split(message, "\n") {
task.logWriter.Write([]byte("[taskcluster " + tcclient.Time(time.Now()).String() + "] " + line + "\n"))
}
} else {
log.Print("Unloggable task log message (no task log writer): " + message)
}
}
func (err *CommandExecutionError) Error() string {
return fmt.Sprintf("%v", err.Cause)
}
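// ExecuteCommand prepares and runs the command at the given index of the task
// payload, logging the outcome. A command failure is returned as a
// CommandExecutionError (task status failed); a crash, or a failure to
// prepare the command, results in a panic.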
func (task *TaskRun) ExecuteCommand(index int) *CommandExecutionError {
task.Logf("Executing command %v: %v", index, task.formatCommand(index))
log.Print("Executing command " + strconv.Itoa(index) + ": " + task.Commands[index].String())
cee := task.prepareCommand(index)
if cee != nil {
panic(cee)
}
result := task.Commands[index].Execute()
task.Logf("%v", result)
switch {
case result.Failed():
return &CommandExecutionError{
Cause: result.FailureCause(),
TaskStatus: failed,
}
case result.Crashed():
panic(result.CrashCause())
}
return nil
}
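// executionErrors accumulates the errors encountered while running a task.
// The task resolution is based on the first error in the list (see resolve).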
type executionErrors []*CommandExecutionError
func (e *executionErrors) add(err *CommandExecutionError) {
if err == nil {
return
}
if *e == nil {
*e = executionErrors{err}
} else {
*e = append(*e, err)
}
}
func (err *executionErrors) Error() string {
if !err.Occurred() {
return ""
}
text := "Task not successful due to following exception(s):\n"
for i, e := range *err {
text += fmt.Sprintf("Exception %v)\n%v\n", i+1, e)
}
return text
}
func (err *executionErrors) Occurred() bool {
return len(*err) > 0
}
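// resolve reports the task resolution to the queue: completed if no errors
// occurred, failed if the first error carries the failed status, and
// otherwise an exception with the first error's reason. Any error from the
// reporting call itself is wrapped as resource-unavailable.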
func (task *TaskRun) resolve(e *executionErrors) *CommandExecutionError {
log.Print("Resolving task...")
if !e.Occurred() {
return ResourceUnavailable(task.StatusManager.ReportCompleted())
}
if (*e)[0].TaskStatus == failed {
return ResourceUnavailable(task.StatusManager.ReportFailed())
}
return ResourceUnavailable(task.StatusManager.ReportException((*e)[0].Reason))
}
func (task *TaskRun) setMaxRunTimer() *time.Timer {
// Terminating the Worker Early
// ----------------------------
// If the worker finds itself having to terminate early, for example a spot
// node that detects pending termination, or a physical machine ordered to
// be provisioned for another purpose, the worker should report an exception
// with the reason `worker-shutdown`. Upon such a report the queue will
// resolve the run as exception and create a new run, if the task has
// additional retries left.
return time.AfterFunc(
task.maxRunTimeDeadline.Sub(time.Now()),
func() {
// ignore any error - we are in the wrong goroutine to handle it properly
task.StatusManager.Abort()
},
)
}
func (task *TaskRun) kill() {
for _, command := range task.Commands {
command.Kill()
}
}
func (task *TaskRun) createLogFile() *os.File {
absLogFile := filepath.Join(taskContext.TaskDir, livelogPath)
logFileHandle, err := os.Create(absLogFile)
if err != nil {
panic(err)
}
task.logWriter = logFileHandle
return logFileHandle
}
func (task *TaskRun) logHeader() {
jsonBytes, err := json.MarshalIndent(config.WorkerTypeMetadata, " ", " ")
if err != nil {
panic(err)
}
task.Log("Worker Type (" + config.WorkerType + ") settings:")
task.Log(" " + string(jsonBytes))
task.Log("Task ID: " + task.TaskID)
task.Log("=== Task Starting ===")
}
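// Run executes the claimed task, collecting every error encountered along the
// way into the returned list; the task resolution is based on the first error
// in that list.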
func (task *TaskRun) Run() (err *executionErrors) {
// err is essentially a list of all errors that occur. We'll base the task
// resolution on the first error that occurs. The err.add(<error-or-nil>)
// call appends an error to the list if it is non-nil, and otherwise does
// nothing.
// Note: since we return the value pointed to by `err`, we can continue
// to manipulate `err` even in defer statements, and this will affect the
// return value of this method.
err = &executionErrors{}