diff --git a/codegen/go-composite-schema/main.go b/codegen/go-composite-schema/main.go index 5b8cfa29..9f924a9c 100644 --- a/codegen/go-composite-schema/main.go +++ b/codegen/go-composite-schema/main.go @@ -140,11 +140,11 @@ func generateFunctions(ymlFile, goType, schemaProperty string, req bool) string log.Fatalf("ERROR: Problem reading from file '%v' - %s", ymlFile, err) } // json is valid YAML, so we can safely convert, even if it is already json - rawJson, err := yaml.YAMLToJSON(data) + rawJSON, err := yaml.YAMLToJSON(data) if err != nil { log.Fatalf("ERROR: Problem converting file '%v' to json format - %s", ymlFile, err) } - rawJson, err = jsontest.FormatJson(rawJson) + rawJSON, err = jsontest.FormatJson(rawJSON) if err != nil { log.Fatalf("ERROR: Problem pretty printing json in '%v' - %s", ymlFile, err) } @@ -152,8 +152,8 @@ func generateFunctions(ymlFile, goType, schemaProperty string, req bool) string result += "\tschema, err := runtime.NewCompositeSchema(\n" result += "\t\t\"" + schemaProperty + "\",\n" result += "\t\t`\n" - // the following strings.Replace function call safely escapes backticks (`) in rawJson - result += strings.Replace(text.Indent(fmt.Sprintf("%v", string(rawJson)), "\t\t")+"\n", "`", "` + \"`\" + `", -1) + // the following strings.Replace function call safely escapes backticks (`) in rawJSON + result += strings.Replace(text.Indent(fmt.Sprintf("%v", string(rawJSON)), "\t\t")+"\n", "`", "` + \"`\" + `", -1) result += "\t\t`,\n" result += fmt.Sprintf("\t\t%t,\n", req) result += "\t\tfunc() interface{} {\n" diff --git a/engines/doc.go b/engines/doc.go index a53384ca..2a54373f 100644 --- a/engines/doc.go +++ b/engines/doc.go @@ -1,4 +1,4 @@ -// Package engine specfies the interfaces that each engine must implement. +// Package engines specfies the interfaces that each engine must implement. // // This is all rooted at the EngineProvider interface specified in the extpoints // package, where implementors should register their engine. diff --git a/engines/docker/docker.go b/engines/docker/docker.go index 63f6d0ac..651f883e 100644 --- a/engines/docker/docker.go +++ b/engines/docker/docker.go @@ -1,4 +1,4 @@ //go:generate go-composite-schema --unexported --required config config-schema.yml generated_configschema.go -// comments +// Package docker is the engine for executing tasks within a docker environment. package docker diff --git a/engines/enginetest/artifacts.go b/engines/enginetest/artifacts.go index b3d779cb..931d5571 100644 --- a/engines/enginetest/artifacts.go +++ b/engines/enginetest/artifacts.go @@ -14,8 +14,7 @@ import ( // The ArtifactTestCase contains information sufficient to test artifact // extration from an engine. type ArtifactTestCase struct { - engineProvider - Engine string + EngineProvider // Text to search for in files Text string // Path of a file containing the Text string above @@ -35,7 +34,7 @@ type ArtifactTestCase struct { // TestExtractTextFile checks that TextFilePath contains Text func (c *ArtifactTestCase) TestExtractTextFile() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) assert(r.buildRunSandbox(), "Task failed to run, payload: ", c.Payload) @@ -52,7 +51,7 @@ func (c *ArtifactTestCase) TestExtractTextFile() { // TestExtractFileNotFound checks that FileNotFoundPath returns // ErrResourceNotFound func (c *ArtifactTestCase) TestExtractFileNotFound() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) assert(r.buildRunSandbox(), "Task failed to run, payload: ", c.Payload) @@ -65,7 +64,7 @@ func (c *ArtifactTestCase) TestExtractFileNotFound() { // TestExtractFolderNotFound checks that FolderNotFoundPath returns // ErrResourceNotFound func (c *ArtifactTestCase) TestExtractFolderNotFound() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) assert(r.buildRunSandbox(), "Task failed to run, payload: ", c.Payload) @@ -82,7 +81,7 @@ func (c *ArtifactTestCase) TestExtractFolderNotFound() { // TestExtractNestedFolderPath checks FolderNotFoundPath contains files // NestedFolderFiles func (c *ArtifactTestCase) TestExtractNestedFolderPath() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) assert(r.buildRunSandbox(), "Task failed to run, payload: ", c.Payload) @@ -132,7 +131,7 @@ func (c *ArtifactTestCase) TestExtractNestedFolderPath() { // TestExtractFolderHandlerInterrupt checks that errors in handler given to // ExtractFolder causes ErrHandlerInterrupt func (c *ArtifactTestCase) TestExtractFolderHandlerInterrupt() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) assert(r.buildRunSandbox(), "Task failed to run, payload: ", c.Payload) @@ -148,7 +147,7 @@ func (c *ArtifactTestCase) TestExtractFolderHandlerInterrupt() { // Test runs all test cases in parallel func (c *ArtifactTestCase) Test() { - c.ensureEngine(c.Engine) + c.ensureEngine() wg := sync.WaitGroup{} wg.Add(5) go func() { c.TestExtractTextFile(); wg.Done() }() diff --git a/engines/enginetest/attachproxy.go b/engines/enginetest/attachproxy.go index ec6c36d5..e10b4341 100644 --- a/engines/enginetest/attachproxy.go +++ b/engines/enginetest/attachproxy.go @@ -10,9 +10,7 @@ import ( // A ProxyTestCase holds information necessary to run tests that an engine // can attach proxies, call them and forward calls correctly type ProxyTestCase struct { - engineProvider - // Name of engine - Engine string + EngineProvider // A valid name for a proxy attachment ProxyName string // A task.payload as accepted by the engine, which will write "Pinging" @@ -25,7 +23,7 @@ type ProxyTestCase struct { // TestPingProxyPayload checks that PingProxyPayload works as defined func (c *ProxyTestCase) TestPingProxyPayload() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.PingProxyPayload) @@ -58,7 +56,7 @@ func (c *ProxyTestCase) TestPingProxyPayload() { // TestPing404IsUnsuccessful checks that 404 returns unsuccessful func (c *ProxyTestCase) TestPing404IsUnsuccessful() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.PingProxyPayload) @@ -88,7 +86,7 @@ func (c *ProxyTestCase) TestPing404IsUnsuccessful() { // TestLiveLogging checks that "Pinging" is readable from log before the task // is finished. func (c *ProxyTestCase) TestLiveLogging() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.PingProxyPayload) @@ -145,7 +143,7 @@ func (c *ProxyTestCase) TestParallelPings() { // Test runs all tests for the ProxyTestCase is parallel func (c *ProxyTestCase) Test() { - c.ensureEngine(c.Engine) + c.ensureEngine() wg := sync.WaitGroup{} wg.Add(4) go func() { c.TestPingProxyPayload(); wg.Done() }() diff --git a/engines/enginetest/attachvolume.go b/engines/enginetest/attachvolume.go index 2cd2d6e1..1754a0c9 100644 --- a/engines/enginetest/attachvolume.go +++ b/engines/enginetest/attachvolume.go @@ -11,9 +11,7 @@ import ( // A VolumeTestCase holds information necessary to run tests that an engine // can create volumes, mount and read/write to volumes. type VolumeTestCase struct { - engineProvider - // Name of engine - Engine string + EngineProvider // A valid mountpoint Mountpoint string // A task.payload as accepted by the engine, which will write something to the @@ -42,7 +40,7 @@ func (c *VolumeTestCase) writeVolume(volume engines.Volume, readOnly bool) bool // Construct SandboxBuilder, Attach volume to sandbox and run it func (c *VolumeTestCase) readVolume(volume engines.Volume, readOnly bool) bool { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.CheckVolumePayload) err := r.sandboxBuilder.AttachVolume(c.Mountpoint, volume, readOnly) @@ -52,7 +50,7 @@ func (c *VolumeTestCase) readVolume(volume engines.Volume, readOnly bool) bool { // TestWriteReadVolume tests that we can write and read from a volume func (c *VolumeTestCase) TestWriteReadVolume() { - c.ensureEngine(c.Engine) + c.ensureEngine() volume, err := c.engine.NewCacheFolder() nilOrPanic(err, "Failed to create a new cache folder") defer evalNilOrPanic(volume.Dispose, "Failed to dispose cache folder") @@ -67,7 +65,7 @@ func (c *VolumeTestCase) TestWriteReadVolume() { // TestReadEmptyVolume tests that read from empty volume doesn't work func (c *VolumeTestCase) TestReadEmptyVolume() { - c.ensureEngine(c.Engine) + c.ensureEngine() volume, err := c.engine.NewCacheFolder() nilOrPanic(err, "Failed to create a new cache folder") defer evalNilOrPanic(volume.Dispose, "Failed to dispose cache folder") @@ -79,7 +77,7 @@ func (c *VolumeTestCase) TestReadEmptyVolume() { // TestWriteToReadOnlyVolume tests that write doesn't work to a read-only volume func (c *VolumeTestCase) TestWriteToReadOnlyVolume() { - c.ensureEngine(c.Engine) + c.ensureEngine() volume, err := c.engine.NewCacheFolder() nilOrPanic(err, "Failed to create a new cache folder") defer evalNilOrPanic(volume.Dispose, "Failed to dispose cache folder") @@ -91,7 +89,7 @@ func (c *VolumeTestCase) TestWriteToReadOnlyVolume() { // TestReadToReadOnlyVolume tests that we can read from a read-only volume func (c *VolumeTestCase) TestReadToReadOnlyVolume() { - c.ensureEngine(c.Engine) + c.ensureEngine() volume, err := c.engine.NewCacheFolder() nilOrPanic(err, "Failed to create a new cache folder") defer evalNilOrPanic(volume.Dispose, "Failed to dispose cache folder") @@ -107,7 +105,7 @@ func (c *VolumeTestCase) TestReadToReadOnlyVolume() { // Test runs all tests on the test case. func (c *VolumeTestCase) Test() { - c.ensureEngine(c.Engine) + c.ensureEngine() wg := sync.WaitGroup{} wg.Add(4) go func() { c.TestWriteReadVolume(); wg.Done() }() diff --git a/engines/enginetest/display.go b/engines/enginetest/display.go index f42b5408..15897068 100644 --- a/engines/enginetest/display.go +++ b/engines/enginetest/display.go @@ -5,7 +5,7 @@ import "github.com/taskcluster/taskcluster-worker/engines" // The DisplayTestCase contains information sufficient to test the interactive // display provided by a Sandbox type DisplayTestCase struct { - Engine string + EngineProvider // List of display that should be returned from Sandbox.ListDisplays(), // They will all be opened to ensure that they are in fact VNC connections. Displays []engines.Display diff --git a/engines/enginetest/environmentvariable.go b/engines/enginetest/environmentvariable.go index e43698d2..d296d93a 100644 --- a/engines/enginetest/environmentvariable.go +++ b/engines/enginetest/environmentvariable.go @@ -9,9 +9,7 @@ import ( // The EnvVarTestCase contains information sufficient to setting an environment // variable. type EnvVarTestCase struct { - engineProvider - // Name of engine - Engine string + EngineProvider // Valid name for an environment variable. VariableName string // Invalid environment variable names. @@ -22,7 +20,7 @@ type EnvVarTestCase struct { // TestPrintVariable checks that variable value can be printed func (c *EnvVarTestCase) TestPrintVariable() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) err := r.sandboxBuilder.SetEnvironmentVariable(c.VariableName, "Hello World") @@ -33,7 +31,7 @@ func (c *EnvVarTestCase) TestPrintVariable() { // TestVariableNameConflict checks that variable name can't conflict func (c *EnvVarTestCase) TestVariableNameConflict() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) err := r.sandboxBuilder.SetEnvironmentVariable(c.VariableName, "Hello World") @@ -46,7 +44,7 @@ func (c *EnvVarTestCase) TestVariableNameConflict() { // TestInvalidVariableNames checks that invalid variables returns correct error func (c *EnvVarTestCase) TestInvalidVariableNames() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) for _, name := range c.InvalidVariableNames { @@ -59,7 +57,7 @@ func (c *EnvVarTestCase) TestInvalidVariableNames() { // Test runs all tests in parallel func (c *EnvVarTestCase) Test() { - c.ensureEngine(c.Engine) + c.ensureEngine() wg := sync.WaitGroup{} wg.Add(3) go func() { c.TestPrintVariable(); wg.Done() }() diff --git a/engines/enginetest/logging.go b/engines/enginetest/logging.go index 2976f71a..3544dba2 100644 --- a/engines/enginetest/logging.go +++ b/engines/enginetest/logging.go @@ -5,9 +5,7 @@ import "sync" // A LoggingTestCase holds information necessary to run tests that an engine // can write things to the log. type LoggingTestCase struct { - engineProvider - // Name of engine - Engine string + EngineProvider // String that we will look for in the log Target string // A task.payload as accepted by the engine, which will Target to the log and @@ -20,7 +18,7 @@ type LoggingTestCase struct { } func (c *LoggingTestCase) grepLogFromPayload(payload string, needle string, success bool) bool { - r := c.NewRun(c.Engine) + r := c.newRun() r.NewSandboxBuilder(payload) s := r.buildRunSandbox() if s != success { @@ -52,7 +50,7 @@ func (c *LoggingTestCase) TestSilentTask() { // Test will run all logging tests func (c *LoggingTestCase) Test() { - c.ensureEngine(c.Engine) + c.ensureEngine() wg := sync.WaitGroup{} wg.Add(3) go func() { c.TestLogTarget(); wg.Done() }() diff --git a/engines/enginetest/shell.go b/engines/enginetest/shell.go index ad3fcc7c..d4fbc3f2 100644 --- a/engines/enginetest/shell.go +++ b/engines/enginetest/shell.go @@ -11,8 +11,7 @@ import ( // The ShellTestCase contains information sufficient to test the interactive // shell provided by a Sandbox type ShellTestCase struct { - engineProvider - Engine string + EngineProvider // Command to pipe to the Shell over stdin Command string // Result to expect from the Shell on stdout when running Command @@ -30,7 +29,7 @@ type ShellTestCase struct { // TestCommand checks we can run Command in the shell func (c *ShellTestCase) TestCommand() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) r.StartSandbox() @@ -72,7 +71,7 @@ func (c *ShellTestCase) TestCommand() { // TestBadCommand checks we can run BadCommand in the shell func (c *ShellTestCase) TestBadCommand() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) r.StartSandbox() @@ -107,7 +106,7 @@ func (c *ShellTestCase) TestBadCommand() { // TestAbortSleepCommand checks we can Abort the sleep command func (c *ShellTestCase) TestAbortSleepCommand() { - r := c.NewRun(c.Engine) + r := c.newRun() defer r.Dispose() r.NewSandboxBuilder(c.Payload) r.StartSandbox() @@ -146,7 +145,7 @@ func (c *ShellTestCase) TestAbortSleepCommand() { // Test runs all tests in parallel func (c *ShellTestCase) Test() { - c.ensureEngine(c.Engine) + c.ensureEngine() wg := sync.WaitGroup{} wg.Add(3) go func() { c.TestCommand(); wg.Done() }() diff --git a/engines/enginetest/testutils.go b/engines/enginetest/testutils.go index 087fcbf8..a52fddc1 100644 --- a/engines/enginetest/testutils.go +++ b/engines/enginetest/testutils.go @@ -36,36 +36,48 @@ func assert(condition bool, a ...interface{}) { } } -// Type can embed so that we can reuse ensure engine -type engineProvider struct { - sync.Mutex +// EngineProvider is a base object used by test case to get an engine. +type EngineProvider struct { + m sync.Mutex engine engines.Engine environment *runtime.Environment + // Name of engine + Engine string + // Engine configuration as JSON + Config string } -func (p *engineProvider) ensureEngine(engineName string) { - p.Lock() - defer p.Unlock() +func (p *EngineProvider) ensureEngine() { + p.m.Lock() + defer p.m.Unlock() if p.engine != nil { return } // Create a runtime environment p.environment = newTestEnvironment() // Find EngineProvider - engineProvider := extpoints.EngineProviders.Lookup(engineName) + engineProvider := extpoints.EngineProviders.Lookup(p.Engine) if engineProvider == nil { - fmtPanic("Couldn't find EngineProvider: ", engineName) + fmtPanic("Couldn't find EngineProvider: ", p.Engine) } + + jsonConfig := map[string]json.RawMessage{} + err := json.Unmarshal([]byte(p.Config), &jsonConfig) + nilOrPanic(err, "Config parsing failed: ", p.Config) + config, err := engineProvider.ConfigSchema().Parse(jsonConfig) + nilOrPanic(err, "Config validation failed: ", p.Config) + // Create Engine instance engine, err := engineProvider.NewEngine(extpoints.EngineOptions{ Environment: p.environment, - Log: p.environment.Log.WithField("engine", engineName), + Log: p.environment.Log.WithField("engine", p.Engine), + Config: config, }) nilOrPanic(err, "Failed to create Engine") p.engine = engine } -func (p *engineProvider) newTestTaskContext() (*runtime.TaskContext, *runtime.TaskContextController) { +func (p *EngineProvider) newTestTaskContext() (*runtime.TaskContext, *runtime.TaskContextController) { ctx, control, err := runtime.NewTaskContext(p.environment.TemporaryStorage.NewFilePath()) nilOrPanic(err, "Failed to create new TaskContext") return ctx, control @@ -126,7 +138,7 @@ func buildRunSandbox(b engines.SandboxBuilder) bool { // run helper type run struct { - provider *engineProvider + provider *EngineProvider context *runtime.TaskContext control *runtime.TaskContextController sandboxBuilder engines.SandboxBuilder @@ -136,8 +148,8 @@ type run struct { closedLog bool } -func (p *engineProvider) NewRun(engine string) *run { - p.ensureEngine(engine) +func (p *EngineProvider) newRun() *run { + p.ensureEngine() r := run{} r.provider = p r.context, r.control = p.newTestTaskContext() diff --git a/engines/extpoints/extpoints.go b/engines/extpoints/extpoints.go index 5c3a6988..56896bbe 100644 --- a/engines/extpoints/extpoints.go +++ b/engines/extpoints/extpoints.go @@ -55,7 +55,6 @@ func UnregisterExtension(name string) []string { return ifaces } - // Base extension point type extensionPoint struct { @@ -176,5 +175,3 @@ func (ep *engineProviderExt) Names() []string { } return names } - - diff --git a/engines/extpoints/interfaces.go b/engines/extpoints/interfaces.go index a355ccec..6a1477b0 100644 --- a/engines/extpoints/interfaces.go +++ b/engines/extpoints/interfaces.go @@ -14,14 +14,13 @@ import ( // We pass all options as a single argument, so that we can add additional // properties without breaking source compatibility. type EngineOptions struct { - //TODO: Add some json.RawMessage or other free-form configuration section - // that is read from the config file... + Environment *runtime.Environment + Log *logrus.Entry + Config interface{} // Note: This is passed by-value for efficiency (and to prohibit nil), if // adding any large fields please consider adding them as pointers. // Note: This is intended to be a simple argument wrapper, do not add methods // to this struct. - Environment *runtime.Environment - Log *logrus.Entry } // EngineProvider is the interface engine implementors must implement and diff --git a/engines/mock/mockengine_test.go b/engines/mock/mockengine_test.go index d7958757..e8b9a5a7 100644 --- a/engines/mock/mockengine_test.go +++ b/engines/mock/mockengine_test.go @@ -6,9 +6,14 @@ import ( "github.com/taskcluster/taskcluster-worker/engines/enginetest" ) +var provider = enginetest.EngineProvider{ + Engine: "mock", + Config: "{}", +} + var volumeTestCase = enginetest.VolumeTestCase{ - Engine: "mock", - Mountpoint: "/mock/volume", + EngineProvider: provider, + Mountpoint: "/mock/volume", WriteVolumePayload: `{ "start": { "delay": 10, @@ -32,8 +37,8 @@ func TestReadToReadOnlyVolume(*t.T) { volumeTestCase.TestReadToReadOnlyVolume() func TestVolumeTestCase(t *t.T) { volumeTestCase.Test() } var loggingTestCase = enginetest.LoggingTestCase{ - Engine: "mock", - Target: "Hello World", + EngineProvider: provider, + Target: "Hello World", TargetPayload: `{ "start": { "delay": 10, @@ -63,8 +68,8 @@ func TestSilentTask(t *t.T) { loggingTestCase.TestSilentTask() } func TestLoggingTestCase(t *t.T) { loggingTestCase.Test() } var proxyTestCase = enginetest.ProxyTestCase{ - Engine: "mock", - ProxyName: "proxy.com", + EngineProvider: provider, + ProxyName: "proxy.com", PingProxyPayload: `{ "start": { "delay": 10, @@ -81,7 +86,7 @@ func TestParallelPings(t *t.T) { proxyTestCase.TestParallelPings() } func TestProxyTestCase(t *t.T) { proxyTestCase.Test() } var envVarTestCase = enginetest.EnvVarTestCase{ - Engine: "mock", + EngineProvider: provider, VariableName: "HELLO_WORLD", InvalidVariableNames: []string{"bad d", "also bad", "can't have space"}, Payload: `{ @@ -99,7 +104,7 @@ func TestInvalidVariableNames(t *t.T) { envVarTestCase.TestInvalidVariableNames( func TestEnvVarTestCase(t *t.T) { envVarTestCase.Test() } var artifactTestCase = enginetest.ArtifactTestCase{ - Engine: "mock", + EngineProvider: provider, Text: "Hello World", TextFilePath: "/folder/a.txt", FileNotFoundPath: "/not-found.txt", @@ -123,12 +128,12 @@ func TestExtractFolderHandlerInterrupt(t *t.T) { artifactTestCase.TestExtractFol func TestArtifactTestCase(t *t.T) { artifactTestCase.Test() } var shellTestCase = enginetest.ShellTestCase{ - Engine: "mock", - Command: "print-hello", - Stdout: "Hello World", - Stderr: "No error!", - BadCommand: "exit-false", - SleepCommand: "sleep", + EngineProvider: provider, + Command: "print-hello", + Stdout: "Hello World", + Stderr: "No error!", + BadCommand: "exit-false", + SleepCommand: "sleep", Payload: `{ "start":{ "delay": 10, diff --git a/engines/winnative/winnative.go b/engines/winnative/winnative.go index b7e45276..011121f0 100644 --- a/engines/winnative/winnative.go +++ b/engines/winnative/winnative.go @@ -1,4 +1,4 @@ //go:generate go-composite-schema --unexported --required config config-schema.yml generated_configschema.go -// Package comments +// Package winnative is responsible for managing the Windows engine. package winnative diff --git a/plugins/extpoints/extpoints.go b/plugins/extpoints/extpoints.go index 9f8f86d5..d476f7a3 100644 --- a/plugins/extpoints/extpoints.go +++ b/plugins/extpoints/extpoints.go @@ -55,7 +55,6 @@ func UnregisterExtension(name string) []string { return ifaces } - // Base extension point type extensionPoint struct { @@ -176,5 +175,3 @@ func (ep *pluginProviderExt) Names() []string { } return names } - - diff --git a/plugins/livelog/livelog.go b/plugins/livelog/livelog.go index 3f5557be..01b5712f 100644 --- a/plugins/livelog/livelog.go +++ b/plugins/livelog/livelog.go @@ -1,4 +1,4 @@ //go:generate go-composite-schema --unexported --required config config-schema.yml generated_configschema.go -// livelog is an awesome package +// Package livelog is an awesome package package livelog diff --git a/plugins/plugin.go b/plugins/plugin.go index b7d3b6ff..cfed67ed 100644 --- a/plugins/plugin.go +++ b/plugins/plugin.go @@ -152,7 +152,7 @@ func (PluginBase) PayloadSchema() (runtime.CompositeSchema, error) { return runtime.NewEmptyCompositeSchema(), nil } -// PluginBase requires no custom config +// ConfigSchema will return a new empty composite schema. PluginBase requires no custom config func (PluginBase) ConfigSchema() runtime.CompositeSchema { return runtime.NewEmptyCompositeSchema() } diff --git a/worker/mock_test.go b/worker/mock_test.go index 5a094be9..5ce1ac81 100644 --- a/worker/mock_test.go +++ b/worker/mock_test.go @@ -10,33 +10,33 @@ type MockQueue struct { mock.Mock } -func (m *MockQueue) ReportCompleted(taskId, runId string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { - args := m.Called(taskId, runId) +func (m *MockQueue) ReportCompleted(taskID, runID string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { + args := m.Called(taskID, runID) return args.Get(0).(*queue.TaskStatusResponse), args.Get(1).(*tcclient.CallSummary), args.Error(2) } -func (m *MockQueue) ReclaimTask(taskId, runId string) (*queue.TaskReclaimResponse, *tcclient.CallSummary, error) { - args := m.Called(taskId, runId) +func (m *MockQueue) ReclaimTask(taskID, runID string) (*queue.TaskReclaimResponse, *tcclient.CallSummary, error) { + args := m.Called(taskID, runID) return args.Get(0).(*queue.TaskReclaimResponse), args.Get(1).(*tcclient.CallSummary), args.Error(2) } -func (m *MockQueue) PollTaskUrls(provisionerId, workerType string) (*queue.PollTaskUrlsResponse, *tcclient.CallSummary, error) { - args := m.Called(provisionerId, workerType) +func (m *MockQueue) PollTaskUrls(provisionerID, workerType string) (*queue.PollTaskUrlsResponse, *tcclient.CallSummary, error) { + args := m.Called(provisionerID, workerType) return args.Get(0).(*queue.PollTaskUrlsResponse), args.Get(1).(*tcclient.CallSummary), args.Error(2) } -func (m *MockQueue) CancelTask(taskId string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { - args := m.Called(taskId) +func (m *MockQueue) CancelTask(taskID string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { + args := m.Called(taskID) return args.Get(0).(*queue.TaskStatusResponse), args.Get(1).(*tcclient.CallSummary), args.Error(2) } -func (m *MockQueue) ClaimTask(taskId, runId string, payload *queue.TaskClaimRequest) (*queue.TaskClaimResponse, *tcclient.CallSummary, error) { - args := m.Called(taskId, runId, payload) +func (m *MockQueue) ClaimTask(taskID, runID string, payload *queue.TaskClaimRequest) (*queue.TaskClaimResponse, *tcclient.CallSummary, error) { + args := m.Called(taskID, runID, payload) return args.Get(0).(*queue.TaskClaimResponse), args.Get(1).(*tcclient.CallSummary), args.Error(2) } -func (m *MockQueue) ReportFailed(taskId, runId string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { - args := m.Called(taskId, runId) +func (m *MockQueue) ReportFailed(taskID, runID string) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { + args := m.Called(taskID, runID) return args.Get(0).(*queue.TaskStatusResponse), args.Get(1).(*tcclient.CallSummary), args.Error(2) } -func (m *MockQueue) ReportException(taskId, runId string, payload *queue.TaskExceptionRequest) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { - args := m.Called(taskId, runId, payload) +func (m *MockQueue) ReportException(taskID, runID string, payload *queue.TaskExceptionRequest) (*queue.TaskStatusResponse, *tcclient.CallSummary, error) { + args := m.Called(taskID, runID, payload) return args.Get(0).(*queue.TaskStatusResponse), args.Get(1).(*tcclient.CallSummary), args.Error(2) } diff --git a/worker/queueservice.go b/worker/queueservice.go index b20f55c5..7ac2fb63 100644 --- a/worker/queueservice.go +++ b/worker/queueservice.go @@ -29,7 +29,7 @@ type ( // Used for modelling the xml we get back from Azure queueMessage struct { - MessageId string `xml:"MessageId"` + MessageID string `xml:"MessageId"` PopReceipt string `xml:"PopReceipt"` DequeueCount int `xml:"DequeueCount"` MessageText string `xml:"MessageText"` @@ -137,7 +137,7 @@ func (q *queueService) claimTask(task *TaskRun) bool { func (q *queueService) deleteFromAzure(deleteURL string) error { // Messages are deleted from the Azure queue with a DELETE request to the // SignedDeleteURL from the Azure queue object returned from - // queue.pollTaskUrls. + // queue.pollTaskURLs. // Also remark that the worker must delete messages if the queue.claimTask // operations fails with a 4xx error. A 400 hundred range error implies @@ -180,7 +180,7 @@ func (q *queueService) deleteFromAzure(deleteURL string) error { } // Retrieves the number of tasks requested from the Azure queues. -func (q *queueService) pollTaskUrl(taskQueue *taskQueue, ntasks int) ([]*TaskRun, error) { +func (q *queueService) pollTaskURL(taskQueue *taskQueue, ntasks int) ([]*TaskRun, error) { taskRuns := []*TaskRun{} var r queueMessagesList // To poll an Azure Queue the worker must do a `GET` request to the @@ -230,8 +230,8 @@ func (q *queueService) pollTaskUrl(taskQueue *taskQueue, ntasks int) ([]*TaskRun // Utility method for replacing a placeholder within a uri with // a string value which first must be uri encoded... - detokeniseUri := func(uri, placeholder, rawValue string) string { - return strings.Replace(uri, placeholder, strings.Replace(url.QueryEscape(rawValue), "+", "%20", -1), -1) + detokeniseURI := func(URI, placeholder, rawValue string) string { + return strings.Replace(URI, placeholder, strings.Replace(url.QueryEscape(rawValue), "+", "%20", -1), -1) } for _, qm := range r.QueueMessages { @@ -244,11 +244,11 @@ func (q *queueService) pollTaskUrl(taskQueue *taskQueue, ntasks int) ([]*TaskRun // SignedDeleteURL. Otherwise, the worker will experience intermittent // failures. - SignedDeleteURL := detokeniseUri( - detokeniseUri( + SignedDeleteURL := detokeniseURI( + detokeniseURI( taskQueue.SignedDeleteURL, "{{messageId}}", - qm.MessageId, + qm.MessageID, ), "{{popReceipt}}", qm.PopReceipt, @@ -258,7 +258,7 @@ func (q *queueService) pollTaskUrl(taskQueue *taskQueue, ntasks int) ([]*TaskRun // that alert the operator if a message has been dequeued a significant // number of times, for example 15 or more. if qm.DequeueCount >= 15 { - q.Log.Warnf("Queue Message with message id %v has been dequeued %v times!", qm.MessageId, qm.DequeueCount) + q.Log.Warnf("Queue Message with message id %v has been dequeued %v times!", qm.MessageID, qm.DequeueCount) err := q.deleteFromAzure(SignedDeleteURL) if err != nil { q.Log.Warnf("Not able to call Azure delete URL %v. %v", SignedDeleteURL, err) @@ -273,11 +273,11 @@ func (q *queueService) pollTaskUrl(taskQueue *taskQueue, ntasks int) ([]*TaskRun // try to delete from Azure, if it fails, nothing we can do about it // not very serious - another worker will try to delete it q.Log.WithField("messageText", qm.MessageText).Errorf("Not able to base64 decode the Message Text in Azure message response.") - q.Log.WithField("messageId", qm.MessageId).Info("Deleting from Azure queue as other workers will have the same problem.") + q.Log.WithField("messageID", qm.MessageID).Info("Deleting from Azure queue as other workers will have the same problem.") err := q.deleteFromAzure(SignedDeleteURL) if err != nil { q.Log.WithFields(logrus.Fields{ - "messageId": qm.MessageId, + "messageID": qm.MessageID, "url": SignedDeleteURL, "error": err, }).Warn("Not able to call Azure delete URL") @@ -358,7 +358,7 @@ func (q *queueService) retrieveTasksFromQueue(ntasks int) ([]*TaskRun, error) { if len(tasks) >= ntasks { return tasks, nil } - taskRuns, err := q.pollTaskUrl(&queue, ntasks-len(tasks)) + taskRuns, err := q.pollTaskURL(&queue, ntasks-len(tasks)) if err != nil { q.Log.Warnf("Could not retrieve tasks from the Azure queue. %s", err) break diff --git a/worker/queueservice_test.go b/worker/queueservice_test.go index 636421f3..1b7a896e 100644 --- a/worker/queueservice_test.go +++ b/worker/queueservice_test.go @@ -144,12 +144,12 @@ func TestPollTaskUrlInvalidXMLResponse(t *testing.T) { }}, } - _, err := service.pollTaskUrl(&service.queues[0], 3) + _, err := service.pollTaskURL(&service.queues[0], 3) assert.NotNil(t, err, "Error should have been returned when invalid xml was parsed") assert.Contains(t, err.Error(), "Not able to xml decode the response from the Azure queue") } -func TestPollTaskUrlEmptyMessageList(t *testing.T) { +func TestPollTaskURLEmptyMessageList(t *testing.T) { var handler = func(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/xml") io.WriteString(w, "") @@ -166,12 +166,12 @@ func TestPollTaskUrlEmptyMessageList(t *testing.T) { }}, } - tasks, err := service.pollTaskUrl(&service.queues[0], 3) + tasks, err := service.pollTaskURL(&service.queues[0], 3) assert.Nil(t, err, "Error should not have been returned when empty message list provided.") assert.Equal(t, 0, len(tasks)) } -func TestPollTaskUrlNonEmptyMessageList(t *testing.T) { +func TestPollTaskURLNonEmptyMessageList(t *testing.T) { // Messages below are arbitrary messages to ensure that they can be // decoded. messages := ` @@ -211,7 +211,7 @@ func TestPollTaskUrlNonEmptyMessageList(t *testing.T) { }}, } - tasks, err := service.pollTaskUrl(&service.queues[0], 3) + tasks, err := service.pollTaskURL(&service.queues[0], 3) assert.Nil(t, err, "Error should not have been returned when empty message list provided.") assert.Equal(t, 2, len(tasks)) // quick sanity check to make sure the messages are different @@ -219,7 +219,7 @@ func TestPollTaskUrlNonEmptyMessageList(t *testing.T) { assert.NotEqual(t, tasks[0].SignedDeleteURL, tasks[1].SignedDeleteURL) } -func TestPollTaskUrlInvalidMessageTextContents(t *testing.T) { +func TestPollTaskURLInvalidMessageTextContents(t *testing.T) { // MessageText is {"abc",0} which is an invalid format when // unmarshalling. messages := ` @@ -251,12 +251,12 @@ func TestPollTaskUrlInvalidMessageTextContents(t *testing.T) { }}, } - tasks, err := service.pollTaskUrl(&service.queues[0], 3) + tasks, err := service.pollTaskURL(&service.queues[0], 3) assert.Nil(t, err, "Error should not have been raised when unmarshalling invalid MessageText") assert.Equal(t, 0, len(tasks)) } -func TestPollTaskUrlInvalidMessageTextEncoding(t *testing.T) { +func TestPollTaskURLInvalidMessageTextEncoding(t *testing.T) { // MessageText is not a valid base64 encoded string messages := ` @@ -296,7 +296,7 @@ func TestPollTaskUrlInvalidMessageTextEncoding(t *testing.T) { }}, } - tasks, err := service.pollTaskUrl(&service.queues[0], 3) + tasks, err := service.pollTaskURL(&service.queues[0], 3) assert.Nil(t, err, "Error should not have been raised when unmarshalling invalid MessageText") assert.Equal(t, 0, len(tasks)) assert.True(t, deleteCalled, "Delete URL not called after attempting to decode messageText") diff --git a/worker/task.go b/worker/task.go index 7a2b225a..0775a4ec 100644 --- a/worker/task.go +++ b/worker/task.go @@ -117,6 +117,8 @@ func (t *TaskRun) Run(pluginManager plugins.Plugin, engine engines.Engine, conte } +// ParsePayload will parse the task payload, which will validate it against the engine +// and plugin schemas. func (t *TaskRun) ParsePayload(pluginManager plugins.Plugin, engine engines.Engine) error { var err error jsonPayload := map[string]json.RawMessage{} @@ -140,6 +142,7 @@ func (t *TaskRun) ParsePayload(pluginManager plugins.Plugin, engine engines.Engi return nil } +// CreateTaskPlugins will create a new task plugin to be used during the task lifecycle. func (t *TaskRun) CreateTaskPlugins(pluginManager plugins.Plugin) error { var err error popts := plugins.TaskPluginOptions{TaskInfo: &runtime.TaskInfo{}, Payload: t.pluginPayload} diff --git a/worker/task_manager.go b/worker/task_manager.go index e0f493b4..23f72e9a 100644 --- a/worker/task_manager.go +++ b/worker/task_manager.go @@ -31,9 +31,9 @@ type Manager struct { pluginOptions *extpoints.PluginOptions log *logrus.Entry queue QueueService - provisionerId string + provisionerID string workerGroup string - workerId string + workerID string tasks map[string]*TaskRun } @@ -65,9 +65,9 @@ func newTaskManager(config *config.Config, engine engines.Engine, environment *r log: log, maxCapacity: config.Capacity, queue: service, - provisionerId: config.ProvisionerID, + provisionerID: config.ProvisionerID, workerGroup: config.WorkerGroup, - workerId: config.WorkerID, + workerID: config.WorkerID, } m.pluginOptions = &extpoints.PluginOptions{ @@ -136,8 +136,8 @@ func (m *Manager) RunningTasks() []string { // CancelTask will cancel a running task. Typically this will be called when a Pulse // message is received to cancel a task. Calling this method will not resolve the task // as it's assumed that this task was already resolved as cancelled by another system/client. -func (m *Manager) CancelTask(taskId string, runId int) { - name := fmt.Sprintf("%s/%d", taskId, runId) +func (m *Manager) CancelTask(taskID string, runID int) { + name := fmt.Sprintf("%s/%d", taskID, runID) m.RLock() defer m.RUnlock() @@ -165,8 +165,8 @@ func (m *Manager) claimWork(ntasks int) { func (m *Manager) run(task *TaskRun) { log := m.log.WithFields(logrus.Fields{ - "taskId": task.TaskID, - "runId": task.RunID, + "taskID": task.TaskID, + "runID": task.RunID, }) task.log = log