Skip to content

Commit

Permalink
Add additional step to transform build json using an external filter …
Browse files Browse the repository at this point in the history
…script
  • Loading branch information
Albert Manya committed Jul 13, 2017
1 parent f911690 commit e404634
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 0 deletions.
1 change: 1 addition & 0 deletions cli.go
Expand Up @@ -171,6 +171,7 @@ func (i *CLI) Setup() (bool, error) {
MaxLogLength: i.Config.MaxLogLength,
ScriptUploadTimeout: i.Config.ScriptUploadTimeout,
StartupTimeout: i.Config.StartupTimeout,
PayloadFilterScript: i.Config.FilterJobJsonScript,
}

pool := NewProcessorPool(ppc, i.BackendProvider, i.BuildScriptGenerator, i.CancellationBroadcaster)
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Expand Up @@ -166,6 +166,9 @@ var (
NewConfigDef("BuildCacheS3SecretAccessKey", &cli.StringFlag{}),

// non-config and special case flags
NewConfigDef("FilterJobJsonScript", &cli.StringFlag{
Usage: "External script which will be called to filter the json to be sent to the build script generator",
}),
NewConfigDef("SkipShutdownOnLogTimeout", &cli.BoolFlag{
Usage: "Special-case mode to aid with debugging timed out jobs",
}),
Expand Down Expand Up @@ -344,6 +347,8 @@ type Config struct {
BuildCacheS3AccessKeyID string `config:"build-cache-s3-access-key-id"`
BuildCacheS3SecretAccessKey string `config:"build-cache-s3-secret-access-key"`

FilterJobJsonScript string `config:"filter-job-json-script"`

ProviderConfig *ProviderConfig
}

Expand Down
6 changes: 6 additions & 0 deletions processor.go
Expand Up @@ -24,6 +24,7 @@ type Processor struct {
maxLogLength int
scriptUploadTimeout time.Duration
startupTimeout time.Duration
payloadFilterScript string

ctx gocontext.Context
buildJobsChan <-chan Job
Expand Down Expand Up @@ -56,6 +57,7 @@ type ProcessorConfig struct {
MaxLogLength int
ScriptUploadTimeout time.Duration
StartupTimeout time.Duration
PayloadFilterScript string
}

// NewProcessor creates a new processor that will run the build jobs on the
Expand Down Expand Up @@ -87,6 +89,7 @@ func NewProcessor(ctx gocontext.Context, hostname string, queue JobQueue,
scriptUploadTimeout: config.ScriptUploadTimeout,
startupTimeout: config.StartupTimeout,
maxLogLength: config.MaxLogLength,
payloadFilterScript: config.PayloadFilterScript,

ctx: ctx,
buildJobsChan: buildJobsChan,
Expand Down Expand Up @@ -194,6 +197,9 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
&stepSubscribeCancellation{
cancellationBroadcaster: p.cancellationBroadcaster,
},
&stepTransformBuildJSON{
payloadFilterScript: p.payloadFilterScript,
},
&stepGenerateScript{
generator: p.generator,
},
Expand Down
6 changes: 6 additions & 0 deletions processor_pool.go
Expand Up @@ -25,6 +25,8 @@ type ProcessorPool struct {
HardTimeout, InitialSleep, LogTimeout, ScriptUploadTimeout, StartupTimeout time.Duration
MaxLogLength int

PayloadFilterScript string

SkipShutdownOnLogTimeout bool

queue JobQueue
Expand All @@ -41,6 +43,8 @@ type ProcessorPoolConfig struct {

HardTimeout, InitialSleep, LogTimeout, ScriptUploadTimeout, StartupTimeout time.Duration
MaxLogLength int

PayloadFilterScript string
}

// NewProcessorPool creates a new processor pool using the given arguments.
Expand All @@ -62,6 +66,7 @@ func NewProcessorPool(ppc *ProcessorPoolConfig,
Provider: provider,
Generator: generator,
CancellationBroadcaster: cancellationBroadcaster,
PayloadFilterScript: ppc.PayloadFilterScript,
}
}

Expand Down Expand Up @@ -185,6 +190,7 @@ func (p *ProcessorPool) runProcessor(queue JobQueue) error {
MaxLogLength: p.MaxLogLength,
ScriptUploadTimeout: p.ScriptUploadTimeout,
StartupTimeout: p.StartupTimeout,
PayloadFilterScript: p.PayloadFilterScript,
})

if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions processor_test.go
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

simplejson "github.com/bitly/go-simplejson"
"github.com/pborman/uuid"
"github.com/travis-ci/worker/backend"
"github.com/travis-ci/worker/config"
Expand Down Expand Up @@ -43,6 +44,7 @@ func TestProcessor(t *testing.T) {
ScriptUploadTimeout: 3 * time.Second,
StartupTimeout: 4 * time.Second,
MaxLogLength: 4500000,
PayloadFilterScript: "filter.py",
})
if err != nil {
t.Error(err)
Expand All @@ -54,7 +56,10 @@ func TestProcessor(t *testing.T) {
doneChan <- struct{}{}
}()

rawPayload, _ := simplejson.NewJson([]byte("{}"))

job := &fakeJob{
rawPayload: rawPayload,
payload: &JobPayload{
Type: "job:test",
Job: JobJobPayload{
Expand Down
68 changes: 68 additions & 0 deletions step_transform_build_json.go
@@ -0,0 +1,68 @@
package worker

import (
"bytes"
"fmt"
"os/exec"
"strings"

"github.com/mitchellh/multistep"
"github.com/travis-ci/worker/context"
gocontext "golang.org/x/net/context"
)

type stepTransformBuildJSON struct {
payloadFilterScript string
}

type EnvVar struct {
Name string
Public bool
Value string
}

func (s *stepTransformBuildJSON) Run(state multistep.StateBag) multistep.StepAction {
buildJob := state.Get("buildJob").(Job)
ctx := state.Get("ctx").(gocontext.Context)

if s.payloadFilterScript == "" {
context.LoggerFromContext(ctx).Info("skipping json transformation, no filter script defined")
return multistep.ActionContinue
}

context.LoggerFromContext(ctx).Info(fmt.Sprintf("calling filter script: %s", s.payloadFilterScript))

payload := buildJob.RawPayload()

cmd := exec.Command(s.payloadFilterScript)
rawJson, err := payload.MarshalJSON()
if err != nil {
context.LoggerFromContext(ctx).Info(fmt.Sprintf("failed to marshal json: %v", err))
return multistep.ActionContinue
}

cmd.Stdin = strings.NewReader(string(rawJson))

var out bytes.Buffer
cmd.Stdout = &out

err = cmd.Run()
if err != nil {
context.LoggerFromContext(ctx).Info(fmt.Sprintf("failed to execute filter script: %v", err))
return multistep.ActionContinue
}

err = payload.UnmarshalJSON(out.Bytes())
if err != nil {
context.LoggerFromContext(ctx).Info(fmt.Sprintf("failed to unmarshal json: %v", err))
return multistep.ActionContinue
}

context.LoggerFromContext(ctx).Info("replaced the build json")

return multistep.ActionContinue
}

func (s *stepTransformBuildJSON) Cleanup(multistep.StateBag) {
// Nothing to clean up
}
57 changes: 57 additions & 0 deletions step_transform_build_json_test.go
@@ -0,0 +1,57 @@
package worker

import (
"testing"

gocontext "golang.org/x/net/context"

"github.com/bitly/go-simplejson"
"github.com/mitchellh/multistep"
"github.com/stretchr/testify/assert"
"github.com/travis-ci/worker/backend"
"github.com/travis-ci/worker/config"
)

func setupStepTransformBuildJSON(cfg *config.ProviderConfig) (*stepTransformBuildJSON, multistep.StateBag) {
s := &stepTransformBuildJSON{}

bp, _ := backend.NewBackendProvider("fake", cfg)

ctx := gocontext.TODO()
instance, _ := bp.Start(ctx, nil)

rawPayload, _ := simplejson.NewJson([]byte("{}"))

job := &fakeJob{
rawPayload: rawPayload,
}

state := &multistep.BasicStateBag{}
state.Put("ctx", ctx)
state.Put("buildJob", job)
state.Put("instance", instance)

return s, state
}

func TestStepTransformBuildJSON_Run(t *testing.T) {

cfg := config.ProviderConfigFromMap(map[string]string{
"FILTER_JOB_JSON_SCRIPT": "",
})

s, state := setupStepTransformBuildJSON(cfg)
action := s.Run(state)
assert.Equal(t, multistep.ActionContinue, action)
}

func TestStepTransformBuildJSON_RunWithScriptConfigured(t *testing.T) {

cfg := config.ProviderConfigFromMap(map[string]string{
"FILTER_JOB_JSON_SCRIPT": "/usr/local/bin/filter.py",
})

s, state := setupStepTransformBuildJSON(cfg)
action := s.Run(state)
assert.Equal(t, multistep.ActionContinue, action)
}

0 comments on commit e404634

Please sign in to comment.