Skip to content
This repository has been archived by the owner on Feb 20, 2020. It is now read-only.

Commit

Permalink
Bug 1180187 - Resolve worker-shutdown for spot terminations
Browse files Browse the repository at this point in the history
  • Loading branch information
petemoore committed Mar 14, 2018
1 parent b9e2433 commit 92d7f96
Show file tree
Hide file tree
Showing 8 changed files with 442 additions and 76 deletions.
65 changes: 51 additions & 14 deletions aws.go
Expand Up @@ -6,11 +6,11 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
"os"
"path/filepath"
"strings"
Expand All @@ -21,9 +21,16 @@ import (
"github.com/taskcluster/taskcluster-client-go/awsprovisioner"
)

// EC2MetadataBaseURL is the root of the EC2 instance metadata service.
// It is a variable rather than a constant so that tests can point it at
// a mock server.
var EC2MetadataBaseURL = "http://169.254.169.254/latest"

// Provisioner is the AWS provisioner client used for querying the
// deploymentId.
var Provisioner *awsprovisioner.AwsProvisioner

func queryUserData() (*UserData, error) {
// http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html#instancedata-user-data-retrieval
resp, _, err := httpbackoff.Get("http://169.254.169.254/latest/user-data")
resp, _, err := httpbackoff.Get(EC2MetadataBaseURL + "/user-data")
if err != nil {
return nil, err
}
Expand All @@ -46,15 +53,19 @@ func queryMetaData(url string) (string, error) {
return string(content), err
}

// UserData describes the document that queryUserData returns after requesting
// the EC2 instance user-data endpoint. Each field is bound to the JSON key
// named in its struct tag.
//
// taken from https://github.com/taskcluster/aws-provisioner/blob/5a01a94141c38447968ec75232fd86a86cca366a/src/worker-type.js#L601-L615
type UserData struct {
Data interface{} `json:"data"`
Capacity int `json:"capacity"`
WorkerType string `json:"workerType"`
// Copied into the worker config by updateConfigWithAmazonSettings.
ProvisionerID string `json:"provisionerId"`
// Copied into the worker config by updateConfigWithAmazonSettings.
Region string `json:"region"`
AvailabilityZone string `json:"availabilityZone"`
InstanceType string `json:"instanceType"`
SpotBid float64 `json:"spotBid"`
Price float64 `json:"price"`
LaunchSpecGenerated time.Time `json:"launchSpecGenerated"`
// NOTE(review): the mock user-data document in aws_helper_test.go supplies
// "lastModified" but no "workerModified" - confirm this field is still sent.
WorkerModified time.Time `json:"workerModified"`
LastModified time.Time `json:"lastModified"`
// Base URL given to the awsprovisioner client in updateConfigWithAmazonSettings.
ProvisionerBaseURL string `json:"provisionerBaseUrl"`
// Token passed to GetSecret and RemoveSecret in updateConfigWithAmazonSettings.
SecurityToken string `json:"securityToken"`
}
Expand Down Expand Up @@ -92,6 +103,7 @@ func (f File) ExtractFile() error {
if err != nil {
return err
}
log.Printf("Writing %v to path %v", f.Description, f.Path)
return ioutil.WriteFile(f.Path, data, 0777)
default:
return errors.New("Unsupported encoding " + f.Encoding + " for file secret in worker type definition")
Expand All @@ -105,6 +117,7 @@ func (f File) ExtractZip() error {
if err != nil {
return err
}
log.Printf("Unzipping %v to path %v", f.Description, f.Path)
return Unzip(data, f.Path)
default:
return errors.New("Unsupported encoding " + f.Encoding + " for zip secret in worker type definition")
Expand Down Expand Up @@ -178,15 +191,19 @@ func updateConfigWithAmazonSettings(c *gwconfig.Config) error {
return err
}
c.ProvisionerID = userData.ProvisionerID
c.Region = userData.Region
c.ProvisionerBaseURL = userData.ProvisionerBaseURL

awsprov := awsprovisioner.AwsProvisioner{
Authenticate: false,
BaseURL: userData.ProvisionerBaseURL,
}

secToken, getErr := awsprov.GetSecret(userData.SecurityToken)
// remove secrets even if we couldn't retrieve them!
removeErr := awsprov.RemoveSecret(userData.SecurityToken)
if getErr != nil {
return err
return getErr
}
if removeErr != nil {
return removeErr
Expand All @@ -199,13 +216,13 @@ func updateConfigWithAmazonSettings(c *gwconfig.Config) error {

awsMetadata := map[string]interface{}{}
for _, url := range []string{
"http://169.254.169.254/latest/meta-data/ami-id",
"http://169.254.169.254/latest/meta-data/instance-id",
"http://169.254.169.254/latest/meta-data/instance-type",
"http://169.254.169.254/latest/meta-data/public-ipv4",
"http://169.254.169.254/latest/meta-data/placement/availability-zone",
"http://169.254.169.254/latest/meta-data/public-hostname",
"http://169.254.169.254/latest/meta-data/local-ipv4",
EC2MetadataBaseURL + "/meta-data/ami-id",
EC2MetadataBaseURL + "/meta-data/instance-id",
EC2MetadataBaseURL + "/meta-data/instance-type",
EC2MetadataBaseURL + "/meta-data/public-ipv4",
EC2MetadataBaseURL + "/meta-data/placement/availability-zone",
EC2MetadataBaseURL + "/meta-data/public-hostname",
EC2MetadataBaseURL + "/meta-data/local-ipv4",
} {
key := url[strings.LastIndex(url, "/")+1:]
value, err := queryMetaData(url)
Expand All @@ -220,7 +237,7 @@ func updateConfigWithAmazonSettings(c *gwconfig.Config) error {
c.PrivateIP = net.ParseIP(awsMetadata["local-ipv4"].(string))
c.InstanceID = awsMetadata["instance-id"].(string)
c.InstanceType = awsMetadata["instance-type"].(string)
c.Region = awsMetadata["availability-zone"].(string)
c.AvailabilityZone = awsMetadata["availability-zone"].(string)

secrets := new(Secrets)
err = json.Unmarshal(secToken.Data, secrets)
Expand All @@ -234,8 +251,6 @@ func updateConfigWithAmazonSettings(c *gwconfig.Config) error {
return err
}

fmt.Printf("\n\nConfig\n\n%#v\n\n", c)

// Now put secret files in place...
for _, f := range secrets.Files {
err := f.Extract()
Expand Down Expand Up @@ -276,3 +291,25 @@ func deploymentIDUpdated() bool {
log.Printf("New deploymentId found! %q => %q - therefore shutting down!", config.DeploymentID, c.DeploymentID)
return true
}

// handleWorkerShutdown watches for an imminent EC2 spot termination
// (Bug 1180187). Every 5 seconds it requests
//
//	EC2MetadataBaseURL + "/meta-data/spot/termination-time"
//
// (normally http://169.254.169.254/latest/meta-data/spot/termination-time).
// The first time the endpoint answers with HTTP 200, abort is called once and
// polling ends. Request errors are logged and otherwise ignored; any other
// status code is treated as "no termination scheduled".
//
// The returned function stops the polling and releases the background
// goroutine. It never blocks and may safely be called more than once, also
// after abort has fired.
func handleWorkerShutdown(abort func()) func() {
	const pollInterval = 5 * time.Second

	ticker := time.NewTicker(pollInterval)
	// Ticker.Stop does not close ticker.C, so a separate signal is needed to
	// end the goroutine. The channel is buffered and written with a
	// non-blocking send, which keeps the stop function idempotent without
	// extra synchronisation.
	done := make(chan struct{}, 1)
	// Bound each request so a hung metadata endpoint cannot stall polling.
	client := &http.Client{Timeout: pollInterval}

	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
			}
			if spotTerminationScheduled(client) {
				abort()
				return
			}
		}
	}()

	return func() {
		ticker.Stop()
		select {
		case done <- struct{}{}:
		default:
			// a stop signal is already pending
		}
	}
}

// spotTerminationScheduled performs a single request against the spot
// termination endpoint and reports whether it answered with HTTP 200.
func spotTerminationScheduled(client *http.Client) bool {
	resp, err := client.Get(EC2MetadataBaseURL + "/meta-data/spot/termination-time")
	// intermittent errors calling this endpoint should be ignored, but can be logged
	if err != nil {
		log.Printf("WARNING: error when calling AWS EC2 spot termination endpoint: %v", err)
		return false
	}
	// Close the body on every call (not via a defer in the polling loop), and
	// drain it first so the connection can be reused; a drain error is
	// irrelevant because only the status code matters here.
	defer resp.Body.Close()
	_, _ = io.Copy(ioutil.Discard, resp.Body)
	return resp.StatusCode == 200
}
170 changes: 170 additions & 0 deletions aws_helper_test.go
@@ -0,0 +1,170 @@
package main

import (
"context"
"encoding/json"
"fmt"
"net/http"
"os"
"path/filepath"
"testing"
"time"

"github.com/taskcluster/slugid-go/slugid"
)

// MockAWSProvisionedEnvironment configures the mock provisioner and EC2
// metadata HTTP endpoints that Setup starts for a test.
type MockAWSProvisionedEnvironment struct {
// Returned under the "files" key of the mocked secrets (see Secrets).
SecretFiles []map[string]string
// When true the mocked spot termination-time endpoint writes a body with
// the default status; when false it answers 404.
Terminating bool
// Value placed at workerTypeMetadata / machine-setup / pretend-metadata in
// the mocked worker config (see Secrets).
PretendMetadata string
// Passed to setupEnvironment and used as the directory name under
// testdataDir for the test's config file and tasks directory.
TestName string
}

// WriteJSON encodes resp as indented JSON and writes it to w.
//
// It is called from HTTP handler goroutines, so failures are reported with
// t.Errorf / t.Logf rather than t.Fatalf: FailNow may only be called from the
// goroutine running the test. If resp cannot be encoded, the test is marked
// as failed and a 500 response is sent instead of an empty body.
func WriteJSON(t *testing.T, w http.ResponseWriter, resp interface{}) {
	payload, err := json.MarshalIndent(resp, "", " ")
	if err != nil {
		t.Errorf("Strange - I can't convert %#v to json: %v", resp, err)
		http.Error(w, "cannot encode response as JSON", http.StatusInternalServerError)
		return
	}
	// A failed write usually means the client went away; record it without
	// failing the test.
	if _, err := w.Write(payload); err != nil {
		t.Logf("Could not write JSON response: %v", err)
	}
}

// Setup prepares a test to run as if the worker had been launched by the AWS
// provisioner. It:
//
//   - calls setupEnvironment for m.TestName,
//   - sets configureForAws and points EC2MetadataBaseURL at a local mock,
//   - starts an HTTP server on localhost:13243 that simulates both the
//     provisioner endpoints and the EC2 metadata endpoints,
//   - loads the worker config from testdataDir/<TestName>/generic-worker.config
//     into the package-level config variable.
//
// The returned function undoes all of the above and must be called when the
// test is done. It waits for the mock server goroutine to finish so that the
// goroutine never logs to t after the test has completed. If loading the
// config fails, the same cleanup is run before the test is aborted, so the
// port and the package-level variables are not left in a modified state.
func (m *MockAWSProvisionedEnvironment) Setup(t *testing.T) func() {
	teardown := setupEnvironment(t, m.TestName)
	workerType := slugid.Nice()
	configureForAws = true
	oldEC2MetadataBaseURL := EC2MetadataBaseURL
	EC2MetadataBaseURL = "http://localhost:13243/latest"

	// Create custom *http.ServeMux rather than using http.DefaultServeMux, so
	// registered handler functions won't interfere with future tests that also
	// use http.DefaultServeMux.
	ec2MetadataHandler := http.NewServeMux()
	ec2MetadataHandler.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
		switch req.URL.Path {

		// simulate provisioner endpoints

		case "/provisioner/worker-type/" + workerType:
			resp := map[string]interface{}{
				"secrets": m.Secrets(t),
			}
			WriteJSON(t, w, resp)
		case "/provisioner/secret/12345":
			resp := map[string]interface{}{
				"data": m.Secrets(t),
				"credentials": map[string]string{
					"clientId":    os.Getenv("TASKCLUSTER_CLIENT_ID"),
					"certificate": os.Getenv("TASKCLUSTER_CERTIFICATE"),
					"accessToken": os.Getenv("TASKCLUSTER_ACCESS_TOKEN"),
				},
				"scopes": []string{},
			}
			WriteJSON(t, w, resp)

		// simulate AWS endpoints

		case "/latest/meta-data/ami-id":
			fmt.Fprint(w, "test-ami")
		case "/latest/meta-data/spot/termination-time":
			if m.Terminating {
				fmt.Fprint(w, "time to die")
			} else {
				w.WriteHeader(404)
			}
		case "/latest/meta-data/placement/availability-zone":
			fmt.Fprint(w, "outer-space")
		case "/latest/meta-data/instance-type":
			fmt.Fprint(w, "p3.teenyweeny")
		case "/latest/meta-data/instance-id":
			fmt.Fprint(w, "test-instance-id")
		case "/latest/meta-data/public-hostname":
			fmt.Fprint(w, "MadamaButterfly")
		case "/latest/meta-data/local-ipv4":
			fmt.Fprint(w, "87.65.43.21")
		case "/latest/meta-data/public-ipv4":
			fmt.Fprint(w, "12.34.56.78")
		case "/latest/user-data":
			resp := map[string]interface{}{
				"data":                map[string]interface{}{},
				"capacity":            1,
				"workerType":          workerType,
				"provisionerId":       "test-provisioner",
				"region":              "test-worker-group",
				"availabilityZone":    "neuss-germany",
				"instanceType":        "p3.teenyweeny",
				"spotBid":             3.5,
				"price":               3.02,
				"launchSpecGenerated": time.Now(),
				"lastModified":        time.Now().Add(time.Minute * -30),
				"provisionerBaseUrl":  "http://localhost:13243/provisioner",
				"securityToken":       "12345",
			}
			WriteJSON(t, w, resp)
		default:
			w.WriteHeader(400)
			fmt.Fprintf(w, "Cannot serve URL %v", req.URL)
			// Handlers run on the server's goroutines, where FailNow (and
			// therefore Fatalf) must not be called; Errorf still fails the test.
			t.Errorf("Cannot serve URL %v", req.URL)
		}
	})
	s := &http.Server{
		Addr:           "localhost:13243",
		Handler:        ec2MetadataHandler,
		ReadTimeout:    10 * time.Second,
		WriteTimeout:   10 * time.Second,
		MaxHeaderBytes: 1 << 20,
	}

	// Closed once the server goroutine has finished, so cleanup can wait for it.
	serverStopped := make(chan struct{})
	go func() {
		defer close(serverStopped)
		// ErrServerClosed is the normal result of Shutdown; anything else means
		// the mock endpoints were never (or are no longer) being served.
		if err := s.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			t.Errorf("HTTP server for mock Provisioner and EC2 metadata endpoints failed: %v", err)
			return
		}
		t.Log("HTTP server for mock Provisioner and EC2 metadata endpoints stopped")
	}()

	cleanup := func() {
		// Restore package-level state even if shutting down the server fails
		// (t.Fatalf exits via runtime.Goexit, which still runs deferred calls).
		defer func() {
			EC2MetadataBaseURL = oldEC2MetadataBaseURL
			configureForAws = false
		}()
		teardown()
		err := s.Shutdown(context.Background())
		if err != nil {
			t.Fatalf("Error shutting down http server: %v", err)
		}
		<-serverStopped
	}

	var err error
	config, err = loadConfig(filepath.Join(testdataDir, m.TestName, "generic-worker.config"), true)
	if err != nil {
		// The caller never receives the cleanup function in this case, so
		// release the port and restore globals here.
		cleanup()
		t.Fatalf("Error: %v", err)
	}
	return cleanup
}

// Secrets assembles the secrets document served by the mock provisioner
// endpoints: the configured secret files under "files" and a worker config
// under "generic-worker". The t parameter is not used.
func (m *MockAWSProvisionedEnvironment) Secrets(t *testing.T) interface{} {
	machineSetup := map[string]string{
		"pretend-metadata": m.PretendMetadata,
	}

	workerConfig := map[string]interface{}{
		// The caches and downloads directories are shared by all tests,
		// because directory-caches.json and file-caches.json are not
		// per-test files.
		"cachesDir":                      filepath.Join(cwd, "caches"),
		"downloadsDir":                   filepath.Join(cwd, "downloads"),
		"cleanUpTaskDirs":                false,
		"deploymentId":                   "sdkfjh4zxmnf",
		"disableReboots":                 true,
		"idleTimeoutSecs":                60,
		"livelogSecret":                  "I have to confess, when me and my friends sort of used to run through the fields of wheat, um, the farmers weren't too pleased about that.",
		"numberOfTasksToRun":             1,
		"sentryProject":                  "generic-worker-tests",
		"shutdownMachineOnIdle":          false,
		"shutdownMachineOnInternalError": false,
		"signingKeyLocation":             filepath.Join(testdataDir, "private-opengpg-key"),
		"subdomain":                      "taskcluster-worker.net",
		"tasksDir":                       filepath.Join(testdataDir, m.TestName),
		"workerTypeMetadata": map[string]interface{}{
			"machine-setup": machineSetup,
		},
	}

	secrets := map[string]interface{}{
		"files": m.SecretFiles,
		"generic-worker": map[string]interface{}{
			"config": workerConfig,
		},
	}
	return secrets
}

0 comments on commit 92d7f96

Please sign in to comment.