Skip to content
This repository has been archived by the owner on Feb 27, 2020. It is now read-only.

Commit

Permalink
Task Context should contain a reference to a taskcluster Queue Client
Browse files Browse the repository at this point in the history
  • Loading branch information
gregarndt committed Mar 21, 2016
1 parent a56a068 commit a8ddf4f
Show file tree
Hide file tree
Showing 15 changed files with 442 additions and 235 deletions.
30 changes: 30 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,17 @@ type (
ExpirationOffset int `json:"expirationOffset"`
} `json:"queueService"`

// Properties related to the TaskCluster Platform
Taskcluster struct {

// Properties defining interaction with the TaskCluster queue
Queue struct {

// Base URL for TaskCluster queue
URL string `json:"url"`
} `json:"queue"`
} `json:"taskcluster"`

// A logical group for this worker to belong to, such as an AWS region.
WorkerGroup string `json:"workerGroup"`

Expand Down Expand Up @@ -120,6 +131,25 @@ var ConfigSchema = func() runtime.CompositeSchema {
"title": "QueueService",
"type": "object"
},
"taskcluster": {
"description": "Properties related to the TaskCluster Platform",
"properties": {
"queue": {
"description": "Properties defining interaction with the TaskCluster queue",
"properties": {
"url": {
"description": "Base URL for TaskCluster queue",
"title": "BaseUrl",
"type": "string"
}
},
"title": "TaskclusterQueueProperties",
"type": "object"
}
},
"title": "TaskclusterProperties",
"type": "object"
},
"workerGroup": {
"description": "A logical group for this worker to belong to, such as an AWS region.",
"title": "WorkerGroup",
Expand Down
14 changes: 14 additions & 0 deletions config/global-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,20 @@ properties:
title: "PollingInterval"
description: "The amount of time to wait between task polling iterations"
type: "integer"
taskcluster:
title: TaskclusterProperties
description: Properties related to the TaskCluster Platform
type: object
properties:
queue:
title: TaskclusterQueueProperties
description: Properties defining interaction with the TaskCluster queue
type: object
properties:
url:
title: BaseUrl
description: Base URL for TaskCluster queue
type: string
credentials:
title: Credentials
description: |-
Expand Down
3 changes: 2 additions & 1 deletion engines/enginetest/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"sync"

"github.com/taskcluster/taskcluster-client-go/queue"
"github.com/taskcluster/taskcluster-worker/engines"
"github.com/taskcluster/taskcluster-worker/engines/extpoints"
"github.com/taskcluster/taskcluster-worker/runtime"
Expand Down Expand Up @@ -78,7 +79,7 @@ func (p *EngineProvider) ensureEngine() {
}

func (p *EngineProvider) newTestTaskContext() (*runtime.TaskContext, *runtime.TaskContextController) {
ctx, control, err := runtime.NewTaskContext(p.environment.TemporaryStorage.NewFilePath())
ctx, control, err := runtime.NewTaskContext(p.environment.TemporaryStorage.NewFilePath(), &queue.TaskClaimResponse{})
nilOrPanic(err, "Failed to create new TaskContext")
return ctx, control
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/plugintest/plugintest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"regexp"
rt "runtime"

"github.com/taskcluster/taskcluster-client-go/queue"
"github.com/taskcluster/taskcluster-worker/engines"
"github.com/taskcluster/taskcluster-worker/engines/extpoints"
// Ensure we load the mock engine
Expand Down Expand Up @@ -69,7 +70,7 @@ func (c Case) Test() {
Log: runtimeEnvironment.Log.WithField("engine", "mock"),
// TODO: Add engine config
})
context, controller, err := runtime.NewTaskContext(runtimeEnvironment.TemporaryStorage.NewFilePath())
context, controller, err := runtime.NewTaskContext(runtimeEnvironment.TemporaryStorage.NewFilePath(), &queue.TaskClaimResponse{})
sandboxBuilder, err := engine.NewSandboxBuilder(engines.SandboxOptions{
TaskContext: context,
Payload: parseEnginePayload(engine, c.Payload),
Expand Down
1 change: 1 addition & 0 deletions runtime/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ type Environment struct {
Log *logrus.Logger
Sentry *raven.Client
WebHookServer webhookserver.WebHookServer
QueueBaseURL string
}
21 changes: 20 additions & 1 deletion worker/mock_test.go → runtime/queue.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,30 @@
package worker
package runtime

import (
"github.com/stretchr/testify/mock"
"github.com/taskcluster/taskcluster-client-go/queue"
"github.com/taskcluster/taskcluster-client-go/tcclient"
)

// QueueClient is an interface to the Queue client provided by the
// taskcluster-client-go package. Passing around an interface allows the
// queue client to be mocked
type QueueClient interface {
ReportCompleted(string, string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error)
ReportException(string, string, *queue.TaskExceptionRequest) (*queue.TaskStatusResponse, *tcclient.CallSummary, error)
ReportFailed(string, string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error)
ClaimTask(string, string, *queue.TaskClaimRequest) (*queue.TaskClaimResponse, *tcclient.CallSummary, error)
ReclaimTask(string, string) (*queue.TaskReclaimResponse, *tcclient.CallSummary, error)
PollTaskUrls(string, string) (*queue.PollTaskUrlsResponse, *tcclient.CallSummary, error)
CancelTask(string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error)
}

// MockQueue is a mocked TaskCluster queue client. Calls to methods exposed by the queue
// client will be recorded for assertion later and will respond with the data
// that was specified during creation of the mocked object.
//
// For more information about each of these client methods, consult the
// taskcluster-clieng-go/queue package
type MockQueue struct {
mock.Mock
}
Expand Down
34 changes: 32 additions & 2 deletions runtime/taskcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"sync"

"github.com/taskcluster/taskcluster-client-go/queue"
"github.com/taskcluster/taskcluster-worker/runtime/webhookserver"

"gopkg.in/djherbis/stream.v1"
Expand Down Expand Up @@ -39,8 +40,14 @@ const (
)

// The TaskInfo struct exposes generic properties from a task definition.
//
// Note, do not be tempted to add task definition or status here in its entirety
// as it can encourage undesired behaviors. Instead only the data necessary
// should be exposed and nothing more. One such anti-pattern could be for a
// plugin to look at task.extra instead of adding data to task.payload.
type TaskInfo struct {
// TODO: Add fields and getters to get them
TaskID string
RunID int
}

// The TaskContext exposes generic properties and functionality related to a
Expand All @@ -55,6 +62,7 @@ type TaskContext struct {
webHookSet *webhookserver.WebHookSet
logStream *stream.Stream
mu sync.RWMutex
queue QueueClient
status TaskStatus
cancelled bool
}
Expand All @@ -68,13 +76,17 @@ type TaskContextController struct {
}

// NewTaskContext creates a TaskContext and associated TaskContextController
func NewTaskContext(tempLogFile string) (*TaskContext, *TaskContextController, error) {
func NewTaskContext(tempLogFile string, claim *queue.TaskClaimResponse) (*TaskContext, *TaskContextController, error) {
logStream, err := stream.New(tempLogFile)
if err != nil {
return nil, nil, err
}
ctx := &TaskContext{
logStream: logStream,
TaskInfo: TaskInfo{
TaskID: claim.Status.TaskID,
RunID: claim.RunID,
},
}
return ctx, &TaskContextController{ctx}, nil
}
Expand All @@ -89,6 +101,24 @@ func (c *TaskContextController) Dispose() error {
return c.logStream.Remove()
}

// GetQueueClient will return a client for the TaskCluster Queue. This client
// is useful for plugins that require interactions with the queue, such as creating
// artifacts.
func (c *TaskContext) GetQueueClient() QueueClient {
c.mu.RLock()
defer c.mu.RUnlock()
return c.queue
}

// CreateQueueClient will create a client for the TaskCluster Queue. This client
// can then be used by others that have access to the task context and require
// interaction with the queue.
func (c *TaskContext) SetQueueClient(client QueueClient) {
c.mu.Lock()
c.queue = client
c.mu.Unlock()
}

// Abort sets the status to aborted
func (c *TaskContext) Abort() {
// TODO (garndt): add abort/cancel channels for plugins to listen on
Expand Down
5 changes: 3 additions & 2 deletions runtime/taskcontext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/taskcluster/slugid-go/slugid"
"github.com/taskcluster/taskcluster-client-go/queue"
)

func nilOrPanic(err error, a ...interface{}) {
Expand All @@ -21,7 +22,7 @@ func nilOrPanic(err error, a ...interface{}) {
func TestTaskContextLogging(t *testing.T) {
t.Parallel()
path := filepath.Join(os.TempDir(), slugid.V4())
context, control, err := NewTaskContext(path)
context, control, err := NewTaskContext(path, &queue.TaskClaimResponse{})
nilOrPanic(err, "Failed to create context")

context.Log("Hello World")
Expand All @@ -44,7 +45,7 @@ func TestTaskContextLogging(t *testing.T) {
func TestTaskContextConcurrentLogging(t *testing.T) {
t.Parallel()
path := filepath.Join(os.TempDir(), slugid.V4())
context, control, err := NewTaskContext(path)
context, control, err := NewTaskContext(path, &queue.TaskClaimResponse{})
nilOrPanic(err, "Failed to create context")

wg := sync.WaitGroup{}
Expand Down
19 changes: 2 additions & 17 deletions worker/queueservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import (

logrus "github.com/Sirupsen/logrus"
"github.com/taskcluster/httpbackoff"
tcqueue "github.com/taskcluster/taskcluster-client-go/queue"
"github.com/taskcluster/taskcluster-client-go/tcclient"
"github.com/taskcluster/taskcluster-worker/runtime"
"github.com/taskcluster/taskcluster-worker/runtime/atomics"
)

Expand Down Expand Up @@ -51,21 +51,6 @@ type (
signedDeleteURL string
}

// TaskclusterQueue is an interface to the Queue client provided by the
// taskcluster-client-go package. Passing around an interface allows the
// queue client to be mocked
// TODO (garndt): move out of the worker package to something more
// appropriate like the task context or runtime.
queueClient interface {
ReportCompleted(string, string) (*tcqueue.TaskStatusResponse, *tcclient.CallSummary, error)
ReportException(string, string, *tcqueue.TaskExceptionRequest) (*tcqueue.TaskStatusResponse, *tcclient.CallSummary, error)
ReportFailed(string, string) (*tcqueue.TaskStatusResponse, *tcclient.CallSummary, error)
ClaimTask(string, string, *tcqueue.TaskClaimRequest) (*tcqueue.TaskClaimResponse, *tcclient.CallSummary, error)
ReclaimTask(string, string) (*tcqueue.TaskReclaimResponse, *tcclient.CallSummary, error)
PollTaskUrls(string, string) (*tcqueue.PollTaskUrlsResponse, *tcclient.CallSummary, error)
CancelTask(string) (*tcqueue.TaskStatusResponse, *tcclient.CallSummary, error)
}

// QueueService is an interface describing the methods responsible for claiming
// work from Azure queues.
QueueService interface {
Expand All @@ -82,7 +67,7 @@ type (
queues []messageQueue
expires tcclient.Time
expirationOffset int
client queueClient
client runtime.QueueClient
provisionerID string
workerType string
workerID string
Expand Down
10 changes: 5 additions & 5 deletions worker/queueservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ var WorkerID = fmt.Sprintf("dummy-worker-%s", slugid.Nice())

func TestRetrievePollTaskUrls(t *testing.T) {
logger, _ := runtime.CreateLogger("")
mockedQueue := &MockQueue{}
mockedQueue := &runtime.MockQueue{}
service := queueService{
client: mockedQueue,
provisionerID: ProvisionerID,
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestRetrievePollTaskUrls(t *testing.T) {

func TestRetrievePollTaskUrlsErrorCaught(t *testing.T) {
logger, _ := runtime.CreateLogger("")
mockedQueue := &MockQueue{}
mockedQueue := &runtime.MockQueue{}
service := queueService{
client: mockedQueue,
provisionerID: ProvisionerID,
Expand Down Expand Up @@ -564,7 +564,7 @@ func TestClaimTask(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(handler))
defer s.Close()

mockedQueue := &MockQueue{}
mockedQueue := &runtime.MockQueue{}
mockedQueue.On(
"ClaimTask",
"abc",
Expand Down Expand Up @@ -631,7 +631,7 @@ func TestClaimTaskError(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(handler))
defer s.Close()

mockedQueue := &MockQueue{}
mockedQueue := &runtime.MockQueue{}
mockedQueue.On(
"ClaimTask",
"abc",
Expand Down Expand Up @@ -675,7 +675,7 @@ func TestClaimTasks(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(handler))
defer s.Close()

mockedQueue := &MockQueue{}
mockedQueue := &runtime.MockQueue{}
mockedQueue.On(
"ClaimTask",
"abc",
Expand Down
Loading

0 comments on commit a8ddf4f

Please sign in to comment.