Skip to content
This repository has been archived by the owner on Feb 20, 2020. It is now read-only.

Commit

Permalink
Merge pull request #9 from taskcluster/bug1279019
Browse files Browse the repository at this point in the history
Bug 1279019: add artifact public/logs/worker_type_metadata.json to each task
  • Loading branch information
petemoore committed Jul 1, 2016
2 parents 2d2ad30 + a2d0576 commit 866f0a2
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 70 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Expand Up @@ -28,7 +28,7 @@ install:

script:
- "go install -v ./..."
- "if test $GIMME_OS.$GIMME_ARCH = linux.amd64; then ${GOPATH}/bin/gotestcover -v -coverprofile=coverage.report ./...; go tool cover -func=coverage.report; fi"
- "if test $GIMME_OS.$GIMME_ARCH = linux.amd64; then ${GOPATH}/bin/gotestcover -v -coverprofile=coverage.report ./... && go tool cover -func=coverage.report; fi"

after_script:
- "$HOME/gopath/bin/goveralls -coverprofile=coverage.report -service=travis-ci"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -222,7 +222,7 @@ go test -v ./...

1. Bump the version number in `main.go` [here](https://github.com/taskcluster/generic-worker/blob/d1e48692122dd3e295defda1e61acc8509ad7e23/main.go#L58).
2. Commit the change, e.g. `git add main.go; git commit -m "Bumped version number"`.
3. Tag the repo, e.g. `git tag v2.0.0`
3. Tag the repo, e.g. `git tag v2.1.0`
4. Push to github taskcluster repo master branch, e.g. `git push; git push --tags`
5. Wait for binary releases to magically appear [here](https://github.com/taskcluster/generic-worker/releases) (travis will push them if tests pass).

Expand Down
36 changes: 33 additions & 3 deletions artifacts_test.go
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"os"
"path/filepath"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -263,6 +264,26 @@ func TestUpload(t *testing.T) {
LiveLogSecret: "xyz",
PublicIP: net.ParseIP("127.0.0.1"),
Subdomain: "taskcluster-worker.net",
WorkerTypeMetadata: map[string]interface{}{
"aws": map[string]string{
"ami-id": "test-ami",
"availability-zone": "test-aws-zone",
"instance-id": "test-instance-id",
"instance-type": "test-instance-type",
"public-ipv4": "test-IP",
},
"generic-worker": map[string]string{
"go-arch": runtime.GOARCH,
"go-os": runtime.GOOS,
"go-version": runtime.Version(),
"release": "test-release-url",
"version": version,
},
"machine-setup": map[string]string{
"maintainer": "pmoore@mozilla.com",
"script": "test-script-url",
},
},
}

// get the worker started
Expand All @@ -286,11 +307,11 @@ func TestUpload(t *testing.T) {
case *queueevents.ArtifactCreatedMessage:
a := message.(*queueevents.ArtifactCreatedMessage)
artifactCreatedMessages[a.Artifact.Name] = a
// Finish after 4 artifacts have been created. Note: the second
// Finish after 5 artifacts have been created. Note: the second
// publish of the livelog artifact (for redirecting to the
// underlying file rather than the livelog stream) doesn't
// cause a new pulse message, hence this is 4 not 5.
if len(artifactCreatedMessages) == 4 {
// cause a new pulse message, hence this is 5 not 6.
if len(artifactCreatedMessages) == 5 {
// killWorkerChan <- true
// pulseConn.AMQPConn.Close()
artifactsCreatedChan <- true
Expand Down Expand Up @@ -414,6 +435,15 @@ func TestUpload(t *testing.T) {
}
}

// Check worker type metadata is there
if a := artifactCreatedMessages["public/logs/worker_type_metadata.json"]; a != nil {
if a.Artifact.ContentType != "text/plain; charset=utf-8" {
t.Errorf("Artifact 'public/logs/worker_type_metadata.json' should have mime type 'text/plain; charset=utf-8' but has '%s'", a.Artifact.ContentType)
}
} else {
t.Error("Artifact 'public/logs/worker_type_metadata.json' not created")
}

// now check content was uploaded to Amazon, and is correct
for artifact, content := range expectedArtifacts {
url, err := myQueue.GetLatestArtifact_SignedURL(taskId, artifact, 10*time.Minute)
Expand Down
41 changes: 20 additions & 21 deletions aws.go
Expand Up @@ -12,6 +12,7 @@ import (
"net"
"os"
"path/filepath"
"strings"
"time"

"github.com/taskcluster/httpbackoff"
Expand All @@ -31,7 +32,7 @@ func queryUserData() (*UserData, error) {
return userData, err
}

func queryMetaData(url, name string) (string, error) {
func queryMetaData(url string) (string, error) {
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html#instancedata-data-retrieval
// call http://169.254.169.254/latest/meta-data/instance-id with httpbackoff
resp, _, err := httpbackoff.Get(url)
Expand All @@ -40,18 +41,9 @@ func queryMetaData(url, name string) (string, error) {
}
defer resp.Body.Close()
content, err := ioutil.ReadAll(resp.Body)
fmt.Println(name + ": " + string(content))
return string(content), err
}

func queryInstanceName() (string, error) {
return queryMetaData("http://169.254.169.254/latest/meta-data/instance-id", "Instance name")
}

func queryPublicIP() (string, error) {
return queryMetaData("http://169.254.169.254/latest/meta-data/public-ipv4", "Public IP")
}

type UserData struct {
Data interface{} `json:"data"`
Capacity int `json:"capacity"`
Expand Down Expand Up @@ -179,14 +171,6 @@ func (c *Config) updateConfigWithAmazonSettings() error {
if err != nil {
return err
}
instanceName, err := queryInstanceName()
if err != nil {
return err
}
publicIP, err := queryPublicIP()
if err != nil {
return err
}
c.ProvisionerId = userData.ProvisionerId
awsprov := awsprovisioner.AwsProvisioner{
Authenticate: false,
Expand All @@ -205,17 +189,32 @@ func (c *Config) updateConfigWithAmazonSettings() error {
c.ClientId = secToken.Credentials.ClientID
c.Certificate = secToken.Credentials.Certificate
c.WorkerGroup = userData.Region
c.WorkerId = instanceName
c.PublicIP = net.ParseIP(publicIP)
c.WorkerType = userData.WorkerType

awsMetadata := map[string]interface{}{}
for _, url := range []string{
"http://169.254.169.254/latest/meta-data/ami-id",
"http://169.254.169.254/latest/meta-data/instance-id",
"http://169.254.169.254/latest/meta-data/instance-type",
"http://169.254.169.254/latest/meta-data/public-ipv4",
"http://169.254.169.254/latest/meta-data/placement/availability-zone",
} {
key := url[strings.LastIndex(url, "/")+1:]
// if we get an error, be ok with it, it isn't sooooo important
value, _ := queryMetaData(url)
awsMetadata[key] = value
}
c.WorkerTypeMetadata["aws"] = awsMetadata
c.WorkerId = awsMetadata["instance-id"].(string)
c.PublicIP = net.ParseIP(awsMetadata["public-ipv4"].(string))
secrets := new(Secrets)
json.Unmarshal(secToken.Data, secrets)
if err != nil {
return err
}

// Now overlay existing config with values in secrets
json.Unmarshal(secrets.GenericWorker.Config, c)
err = c.mergeInJSON([]byte(secrets.GenericWorker.Config))
if err != nil {
return err
}
Expand Down
21 changes: 21 additions & 0 deletions config_test.go
Expand Up @@ -3,6 +3,7 @@ package main
import (
"encoding/json"
"net"
"runtime"
"testing"
)

Expand Down Expand Up @@ -83,3 +84,23 @@ func TestMissingConfigFile(t *testing.T) {
t.Fatalf("Was expecting an error of type MissingConfigError but received error of type %T", err)
}
}

func TestWorkerTypeMetadata(t *testing.T) {
const file = "test/config/worker-type-metadata.json"
config, err := loadConfig(file, false)
if err != nil {
t.Fatalf("Config should pass validation, but get:\n%s", err)
}
// loadConfig function specifies a value, let's check we can override it in the config file
if config.WorkerTypeMetadata["generic-worker"].(map[string]interface{})["go-os"] != "fakeos" {
t.Fatalf("Was expecting key 'go-os' from file worker-type-metadata.json to override default value\n%#v", config)
}
// go-version not specified in config file, but should be set in loadConfig, let's check it is
if config.WorkerTypeMetadata["generic-worker"].(map[string]interface{})["go-version"] != runtime.Version() {
t.Fatalf("Was expecting key 'go-version' to be set to go version in worker type metadata\n%#v", config)
}
// machine-setup is not set in loadConfig, but is set in config file, let's check we have it
if config.WorkerTypeMetadata["machine-setup"].(map[string]interface{})["script"] != "https://raw.githubusercontent.com/taskcluster/generic-worker/2d2ad3000787f2c893299e693ea3f59287127f5c/worker_types/win2012r2/userdata" {
t.Fatalf("Was expecting machine-setup to be set properly\n%#v", config)
}
}
73 changes: 55 additions & 18 deletions main.go
@@ -1,7 +1,6 @@
package main

import (
"bytes"
"encoding/base64"
"encoding/json"
"encoding/xml"
Expand All @@ -15,6 +14,7 @@ import (
"os"
"path/filepath"
"reflect"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -54,7 +54,7 @@ var (
config *Config
configFile string

version = "generic-worker 2.0.0"
version = "2.1.0"
usage = `
generic-worker
generic-worker is a taskcluster worker that can run on any platform that supports go (golang).
Expand Down Expand Up @@ -176,6 +176,14 @@ and reports back results to the queue.
An integer, >= 0. A value of 0 means "do not shut
the computer down" - i.e. continue running
indefinitely.
workerTypeMetaData This arbitrary json blob will be uploaded as an
artifact called worker_type_metadata.json with each
task. Providing information here, such as a URL to
the code/config used to set up the worker type will
mean that people running tasks on the worker type
will have more information about how it was set up
(for example what has been installed on the
machine).
Here is an syntactically valid example configuration file:
Expand All @@ -202,7 +210,7 @@ and reports back results to the queue.

// Entry point into the generic worker...
func main() {
arguments, err := docopt.Parse(usage, nil, true, version, false, true)
arguments, err := docopt.Parse(usage, nil, true, "generic-worker "+version, false, true)
if err != nil {
fmt.Println("Error parsing command line arguments!")
panic(err)
Expand Down Expand Up @@ -258,15 +266,23 @@ func loadConfig(filename string, queryUserData bool) (*Config, error) {
UsersDir: "C:\\Users",
CleanUpTaskDirs: true,
IdleShutdownTimeoutSecs: 0,
WorkerTypeMetadata: map[string]interface{}{
"generic-worker": map[string]string{
"go-arch": runtime.GOARCH,
"go-os": runtime.GOOS,
"go-version": runtime.Version(),
"release": "https://github.com/taskcluster/generic-worker/releases/tag/v" + version,
"version": version,
},
},
}
// try to open config file...
configFile, err := os.Open(filename)
// only overlay values if config file exists

configFileBytes, err := ioutil.ReadFile(filename)
// only overlay values if config file exists and could be read
if err == nil {
defer configFile.Close()
err = json.NewDecoder(configFile).Decode(&c)
err = c.mergeInJSON(configFileBytes)
if err != nil {
return c, err
return nil, err
}
}

Expand Down Expand Up @@ -872,6 +888,18 @@ func (task *TaskRun) run() error {
var finalReason string
var finalError error = nil

err := task.preTaskActions()

if err != nil {
log.Printf("%#v", err)
if finalError == nil {
log.Println("TASK EXCEPTION when running pre-task actions")
finalTaskStatus = Errored
finalReason = "worker-shutdown" // internal error (could not post public/logs/worker_type_metadata.json artifact)
finalError = err
}
}

for i, _ := range task.Payload.Command {
err := task.ExecuteCommand(i)
if err != nil {
Expand All @@ -883,7 +911,7 @@ func (task *TaskRun) run() error {
}
}

err := task.postTaskActions()
err = task.postTaskActions()

if err != nil {
log.Printf("%#v", err)
Expand Down Expand Up @@ -942,6 +970,22 @@ func (task *TaskRun) run() error {
return finalError
}

func writeToFileAsJSON(obj interface{}, filename string) error {
jsonBytes, err := json.MarshalIndent(obj, "", " ")
if err != nil {
return err
}
return ioutil.WriteFile(filename, append(jsonBytes, '\n'), 0644)
}

func (task *TaskRun) preTaskActions() error {
err := writeToFileAsJSON(config.WorkerTypeMetadata, filepath.Join(TaskUser.HomeDir, "public", "logs", "worker_type_metadata.json"))
if err != nil {
return err
}
return task.uploadLog("public/logs/worker_type_metadata.json")
}

func (task *TaskRun) postTaskActions() error {
completeLogFile, err := os.Create(filepath.Join(TaskUser.HomeDir, "public", "logs", "live_backing.log"))
if err != nil {
Expand Down Expand Up @@ -978,14 +1022,7 @@ func (task *TaskRun) postTaskActions() error {
func (c *Config) persist(file string) error {
fmt.Println("Worker ID: " + config.WorkerId)
fmt.Println("Creating file " + file + "...")
jsonBytes, err := json.Marshal(c)
if err != nil {
return err
}
var out bytes.Buffer
json.Indent(&out, jsonBytes, "", " ")
out.WriteRune('\n')
return ioutil.WriteFile(file, out.Bytes(), 0644)
return writeToFileAsJSON(c, file)
}

func convertNilToEmptyString(val interface{}) string {
Expand Down
39 changes: 39 additions & 0 deletions mergeconfig.go
@@ -0,0 +1,39 @@
package main

import (
"encoding/json"

"github.com/peterbourgon/mergemap"
)

func (c *Config) mergeInJSON(data []byte) error {
// This is all HORRIBLE
// but it seems about the only reasonable way to properly merge
// the json schemas such that json objects are recursively merged.
// Steps: convert c to json and then back to a go type, so that
// it is a map[string]interface{} and not a Config type. Get
// the json bytes also into a map[string]interface{} so that
// the two map[string]interface{} objects can be merged. Finally
// convert the merge result to json again so that it can be
// marshaled back into the original Config type... Yuck!
m1 := new(map[string]interface{})
m2 := new(map[string]interface{})
m1bytes, err := json.Marshal(c)
if err != nil {
return err
}
err = json.Unmarshal(m1bytes, m1)
if err != nil {
return err
}
err = json.Unmarshal(data, m2)
if err != nil {
return err
}
merged := mergemap.Merge(*m1, *m2)
mergedBytes, err := json.Marshal(merged)
if err != nil {
return err
}
return json.Unmarshal(mergedBytes, c)
}

0 comments on commit 866f0a2

Please sign in to comment.