Skip to content
This repository has been archived by the owner on Feb 20, 2020. It is now read-only.

Commit

Permalink
Bug 1428422 - use updated taskcluster-client-go in generic-worker
Browse files Browse the repository at this point in the history
  • Loading branch information
petemoore committed Jan 28, 2019
1 parent d135c0f commit 9cd6dd8
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 46 deletions.
11 changes: 6 additions & 5 deletions aws.go
Expand Up @@ -195,11 +195,13 @@ func updateConfigWithAmazonSettings(c *gwconfig.Config) error {
c.ProvisionerID = userData.ProvisionerID
c.Region = userData.Region
c.ProvisionerBaseURL = userData.ProvisionerBaseURL
c.RootURL = userData.TaskclusterRootURL

awsprov := tcawsprovisioner.AwsProvisioner{
Authenticate: false,
BaseURL: userData.ProvisionerBaseURL,
}
// We need an AWS Provisioner client for fetching taskcluster credentials.
// Ensure auth is disabled in client, since we don't have credentials yet.
awsprov := c.AWSProvisioner()
awsprov.Authenticate = false
awsprov.Credentials = nil

secToken, getErr := awsprov.GetSecret(userData.SecurityToken)
// remove secrets even if we couldn't retrieve them!
Expand All @@ -214,7 +216,6 @@ func updateConfigWithAmazonSettings(c *gwconfig.Config) error {
c.AccessToken = secToken.Credentials.AccessToken
c.Certificate = secToken.Credentials.Certificate
c.ClientID = secToken.Credentials.ClientID
c.RootURL = userData.TaskclusterRootURL
c.WorkerGroup = userData.Region
c.WorkerType = userData.WorkerType

Expand Down
2 changes: 1 addition & 1 deletion aws_helper_test.go
Expand Up @@ -93,7 +93,7 @@ func (m *MockAWSProvisionedEnvironment) Setup(t *testing.T) func() {
"instanceType": "p3.teenyweeny",
"spotBid": 3.5,
"price": 3.02,
"taskclusterRootUrl": "http://localhost:13243",
"taskclusterRootUrl": os.Getenv("TASKCLUSTER_ROOT_URL"), // don't use tcclient.RootURLFromEnvVars() since we don't want ClientID of CI
"launchSpecGenerated": time.Now(),
"lastModified": time.Now().Add(time.Minute * -30),
"provisionerBaseUrl": "http://localhost:13243/provisioner",
Expand Down
50 changes: 50 additions & 0 deletions gwconfig/config.go
Expand Up @@ -9,6 +9,11 @@ import (
"runtime"

"github.com/taskcluster/generic-worker/fileutil"
tcclient "github.com/taskcluster/taskcluster-client-go"
"github.com/taskcluster/taskcluster-client-go/tcauth"
"github.com/taskcluster/taskcluster-client-go/tcawsprovisioner"
"github.com/taskcluster/taskcluster-client-go/tcpurgecache"
"github.com/taskcluster/taskcluster-client-go/tcqueue"
)

type (
Expand Down Expand Up @@ -127,3 +132,48 @@ func (c *Config) Validate() error {
func (err MissingConfigError) Error() string {
return "Config setting \"" + err.Setting + "\" has not been defined"
}

func (c *Config) Credentials() *tcclient.Credentials {
return &tcclient.Credentials{
AccessToken: c.AccessToken,
ClientID: c.ClientID,
Certificate: c.Certificate,
}
}

func (c *Config) Auth() *tcauth.Auth {
auth := tcauth.New(c.Credentials(), c.RootURL)
// If authBaseURL provided, it should take precedence over rootURL
if c.AuthBaseURL != "" {
auth.BaseURL = c.AuthBaseURL
}
return auth
}

func (c *Config) Queue() *tcqueue.Queue {
queue := tcqueue.New(c.Credentials(), c.RootURL)
// If queueBaseURL provided, it should take precedence over rootURL
if c.QueueBaseURL != "" {
queue.BaseURL = c.QueueBaseURL
}
return queue
}

func (c *Config) AWSProvisioner() *tcawsprovisioner.AwsProvisioner {
awsProvisioner := tcawsprovisioner.New(c.Credentials())
awsProvisioner.BaseURL = tcclient.BaseURL(c.RootURL, "aws-provisioner", "v1")
// If provisionerBaseURL provided, it should take precedence over rootURL
if c.ProvisionerBaseURL != "" {
awsProvisioner.BaseURL = c.ProvisionerBaseURL
}
return awsProvisioner
}

func (c *Config) PurgeCache() *tcpurgecache.PurgeCache {
purgeCache := tcpurgecache.New(c.Credentials(), c.RootURL)
// If purgeCacheBaseURL provided, it should take precedence over rootURL
if c.PurgeCacheBaseURL != "" {
purgeCache.BaseURL = c.PurgeCacheBaseURL
}
return purgeCache
}
12 changes: 6 additions & 6 deletions helper_test.go
Expand Up @@ -23,8 +23,6 @@ import (
"github.com/taskcluster/httpbackoff"
"github.com/taskcluster/slugid-go/slugid"
tcclient "github.com/taskcluster/taskcluster-client-go"
"github.com/taskcluster/taskcluster-client-go/tcauth"
"github.com/taskcluster/taskcluster-client-go/tcpurgecache"
"github.com/taskcluster/taskcluster-client-go/tcqueue"
)

Expand Down Expand Up @@ -98,7 +96,7 @@ func setup(t *testing.T) (teardown func()) {
testDir := filepath.Join(testdataDir, t.Name())
config = &gwconfig.Config{
AccessToken: os.Getenv("TASKCLUSTER_ACCESS_TOKEN"),
AuthBaseURL: tcauth.DefaultBaseURL,
AuthBaseURL: "",
AvailabilityZone: "outer-space",
// Need common caches directory across tests, since files
// directory-caches.json and file-caches.json are not per-test.
Expand Down Expand Up @@ -126,8 +124,8 @@ func setup(t *testing.T) (teardown func()) {
ProvisionerBaseURL: "",
ProvisionerID: "test-provisioner",
PublicIP: net.ParseIP("12.34.56.78"),
PurgeCacheBaseURL: tcpurgecache.DefaultBaseURL,
QueueBaseURL: tcqueue.DefaultBaseURL,
PurgeCacheBaseURL: "",
QueueBaseURL: "",
Region: "test-worker-group",
RootURL: os.Getenv("TASKCLUSTER_ROOT_URL"),
// should be enough for tests, and travis-ci.org CI environments don't
Expand Down Expand Up @@ -178,7 +176,9 @@ func NewQueue(t *testing.T) *tcqueue.Queue {
os.Getenv("TASKCLUSTER_ROOT_URL") == "" {
t.Skip("Skipping test since TASKCLUSTER_{CLIENT_ID,ACCESS_TOKEN,ROOT_URL} env vars not set")
}
return tcqueue.NewFromEnv()
// BaseURL shouldn't be proxy otherwise requests will use CI clientId
// rather than env var TASKCLUSTER_CLIENT_ID
return tcqueue.New(tcclient.CredentialsFromEnvVars(), os.Getenv("TASKCLUSTER_ROOT_URL"))
}

func scheduleTask(t *testing.T, td *tcqueue.TaskDefinitionRequest, payload GenericWorkerPayload) (taskID string) {
Expand Down
28 changes: 11 additions & 17 deletions main.go
Expand Up @@ -23,9 +23,6 @@ import (
"github.com/taskcluster/generic-worker/process"
"github.com/taskcluster/taskcluster-base-go/scopes"
tcclient "github.com/taskcluster/taskcluster-client-go"
"github.com/taskcluster/taskcluster-client-go/tcauth"
"github.com/taskcluster/taskcluster-client-go/tcawsprovisioner"
"github.com/taskcluster/taskcluster-client-go/tcpurgecache"
"github.com/taskcluster/taskcluster-client-go/tcqueue"
"github.com/xeipuuv/gojsonschema"
)
Expand Down Expand Up @@ -474,7 +471,7 @@ func loadConfig(filename string, queryUserData bool) (*gwconfig.Config, error) {

// first assign defaults
c := &gwconfig.Config{
AuthBaseURL: tcauth.DefaultBaseURL,
AuthBaseURL: "",
CachesDir: "caches",
CheckForNewDeploymentEverySecs: 1800,
CleanUpTaskDirs: true,
Expand All @@ -487,8 +484,8 @@ func loadConfig(filename string, queryUserData bool) (*gwconfig.Config, error) {
NumberOfTasksToRun: 0,
ProvisionerBaseURL: "",
ProvisionerID: "test-provisioner",
PurgeCacheBaseURL: tcpurgecache.DefaultBaseURL,
QueueBaseURL: tcqueue.DefaultBaseURL,
PurgeCacheBaseURL: "",
QueueBaseURL: "",
RequiredDiskSpaceMegabytes: 10240,
RootURL: "",
RunAfterUserCreation: "",
Expand Down Expand Up @@ -618,16 +615,9 @@ func RunWorker() (exitCode ExitCode) {
log.Printf("OH NO!!!\n\n%#v", err)
panic(err)
}
creds := &tcclient.Credentials{
ClientID: config.ClientID,
AccessToken: config.AccessToken,
Certificate: config.Certificate,
}
// Queue is the object we will use for accessing queue api
queue = tcqueue.New(creds)
queue.BaseURL = config.QueueBaseURL
provisioner = tcawsprovisioner.New(creds)
provisioner.BaseURL = config.ProvisionerBaseURL
queue = config.Queue()
provisioner = config.AWSProvisioner()

err = initialiseFeatures()
if err != nil {
Expand Down Expand Up @@ -787,8 +777,12 @@ func ClaimWork() *TaskRun {
AccessToken: taskResponse.Credentials.AccessToken,
Certificate: taskResponse.Credentials.Certificate,
},
config.RootURL,
)
taskQueue.BaseURL = config.QueueBaseURL
// if queueBaseURL is configured, this takes precedence over rootURL
if config.QueueBaseURL != "" {
taskQueue.BaseURL = config.QueueBaseURL
}
task := &TaskRun{
TaskID: taskResponse.Status.TaskID,
RunID: uint(taskResponse.RunID),
Expand Down Expand Up @@ -1166,7 +1160,7 @@ func (task *TaskRun) Run() (err *ExecutionErrors) {
log.Printf("Creating task feature %v...", feature.Name())
taskFeature := feature.NewTaskFeature(task)
requiredScopes := taskFeature.RequiredScopes()
scopesSatisfied, scopeValidationErr := scopes.Given(task.Definition.Scopes).Satisfies(requiredScopes, tcauth.New(nil))
scopesSatisfied, scopeValidationErr := scopes.Given(task.Definition.Scopes).Satisfies(requiredScopes, config.Auth())
if scopeValidationErr != nil {
// presumably we couldn't expand assume:* scopes due to auth
// service unavailability
Expand Down
5 changes: 2 additions & 3 deletions mounts.go
Expand Up @@ -151,8 +151,7 @@ func (cm *CacheMap) LoadFromFile(stateFile string, cacheDir string) {
func (feature *MountsFeature) Initialise() error {
fileCaches.LoadFromFile("file-caches.json", config.CachesDir)
directoryCaches.LoadFromFile("directory-caches.json", config.DownloadsDir)
pc = tcpurgecache.New(nil)
pc.BaseURL = config.PurgeCacheBaseURL
pc = config.PurgeCache()
return nil
}

Expand Down Expand Up @@ -843,7 +842,7 @@ func (bc *Base64Content) Download(task *TaskRun) (file string, sha256 string, er
}

func (bc *Base64Content) String() string {
return "Base64 (" + bc.Base64 + ")"
return "Base64 (" + bc.Base64 + ")"
}

func (bc *Base64Content) UniqueKey() string {
Expand Down
11 changes: 1 addition & 10 deletions sentry.go
Expand Up @@ -6,23 +6,14 @@ import (
"strconv"

raven "github.com/getsentry/raven-go"
tcclient "github.com/taskcluster/taskcluster-client-go"
"github.com/taskcluster/taskcluster-client-go/tcauth"
)

func ReportCrashToSentry(r interface{}) {
if config.SentryProject == "" {
log.Println("No sentry project defined, not reporting to sentry")
return
}
Auth := tcauth.New(
&tcclient.Credentials{
ClientID: config.ClientID,
AccessToken: config.AccessToken,
Certificate: config.Certificate,
},
)
Auth.BaseURL = config.AuthBaseURL
Auth := config.Auth()
res, err := Auth.SentryDSN(config.SentryProject)
if err != nil {
log.Printf("WARNING: Could not get sentry DSN: %v", err)
Expand Down
6 changes: 3 additions & 3 deletions taskstatus.go
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

"github.com/taskcluster/taskcluster-client-go"
tcclient "github.com/taskcluster/taskcluster-client-go"
"github.com/taskcluster/taskcluster-client-go/tcqueue"
)

Expand Down Expand Up @@ -167,11 +167,11 @@ func (tsm *TaskStatusManager) reclaim() error {

task.TaskReclaimResponse = *tcrsp
task.queueMux.Lock()
task.Queue = tcqueue.New(&tcclient.Credentials{
task.Queue.Credentials = &tcclient.Credentials{
ClientID: tcrsp.Credentials.ClientID,
AccessToken: tcrsp.Credentials.AccessToken,
Certificate: tcrsp.Credentials.Certificate,
})
}
task.queueMux.Unlock()
tsm.status = tcrsp.Status
tsm.takenUntil = tcrsp.TakenUntil
Expand Down
3 changes: 2 additions & 1 deletion testdata/resolvetask.go
Expand Up @@ -4,11 +4,12 @@ import (
"log"
"os"

tcclient "github.com/taskcluster/taskcluster-client-go"
"github.com/taskcluster/taskcluster-client-go/tcqueue"
)

func main() {
queue := tcqueue.NewFromEnv()
queue := tcqueue.New(tcclient.CredentialsFromEnvVars(), os.Getenv("TASKCLUSTER_ROOT_URL"))
taskID := os.Getenv("TASK_ID")
_, err := queue.CancelTask(taskID)
if err != nil {
Expand Down

0 comments on commit 9cd6dd8

Please sign in to comment.