Merge pull request #672 from pachyderm/env_in_transform
Env in transform
jdoliner committed Jul 27, 2016
2 parents 94701b8 + 37d9e53 commit 94f5390
Showing 8 changed files with 296 additions and 219 deletions.
105 changes: 18 additions & 87 deletions assets.go

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions doc/pipeline_spec.md
@@ -13,6 +13,13 @@ This document discusses each of the fields present in a pipeline specification.
"image": string,
"cmd": [ string ],
"stdin": [ string ]
"env": {
"foo": "bar"
},
"secrets": [ {
"name": "secret_name",
"mountPath": "/path/in/container"
} ]
},
"parallelism": int,
"inputs": [
@@ -45,6 +52,10 @@ This document discusses each of the fields present in a pipeline specification.

`transform.stdin` is an array of lines that are sent to your command on stdin. Lines need not end in newline characters.

`transform.env` is a map of key-value pairs that will be injected into the container as environment variables.

`transform.secrets` is an array of secrets. Each entry references a Kubernetes secret by name and specifies the path at which that secret should be mounted inside the container. Secrets are useful for injecting sensitive data, such as credentials, into the container. Read more about Kubernetes secrets [here](http://kubernetes.io/docs/user-guide/secrets/).
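
As a concrete illustration, below is a minimal sketch of setting these fields through the Go client, modeled on the `TestPipelineEnv` test added in this commit. The pipeline name, secret name, and input repo are illustrative, the secret is assumed to already exist in Kubernetes, and the `*client.APIClient` parameter stands in for a connected Pachyderm client (obtained in the test via a helper):

```go
package example

import (
	"golang.org/x/net/context"

	"github.com/pachyderm/pachyderm/src/client"
	pfsclient "github.com/pachyderm/pachyderm/src/client/pfs"
	ppsclient "github.com/pachyderm/pachyderm/src/client/pps"
)

// createEnvPipeline creates a pipeline whose transform reads an injected
// environment variable ($bar) and a Kubernetes secret mounted at /var/secret.
func createEnvPipeline(c *client.APIClient) error {
	_, err := c.PpsAPIClient.CreatePipeline(
		context.Background(),
		&ppsclient.CreatePipelineRequest{
			Pipeline: client.NewPipeline("env-example"),
			Transform: &ppsclient.Transform{
				Cmd: []string{"sh"},
				Stdin: []string{
					"cat /var/secret/foo > /pfs/out/foo", // read the mounted secret
					"echo $bar > /pfs/out/bar",           // read the injected env var
				},
				Env: map[string]string{"bar": "bar"},
				Secrets: []*ppsclient.Secret{{
					Name:      "my-secret", // name of an existing Kubernetes secret
					MountPath: "/var/secret",
				}},
			},
			Parallelism: 1,
			Inputs: []*ppsclient.PipelineInput{{
				Repo: &pfsclient.Repo{Name: "data"},
			}},
		},
	)
	return err
}
```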

### Parallelism

`parallelism` is how many copies of your container should run in parallel. If you'd like Pachyderm to automatically scale the parallelism based on available cluster resources, you can set this to 0.
242 changes: 138 additions & 104 deletions src/client/pps/pps.pb.go

Large diffs are not rendered by default.

14 changes: 11 additions & 3 deletions src/client/pps/pps.proto
@@ -9,12 +9,20 @@ package pachyderm.pps;

option go_package = "pps";

message Secret {
// Name must be the name of the secret in kubernetes.
string name = 1;
string mount_path = 2;
}

message Transform {
string image = 1;
repeated string cmd = 2;
repeated string stdin = 3;
repeated int64 accept_return_code = 4;
bool debug = 5;
map<string, string> env = 3;
repeated Secret secrets = 4;
repeated string stdin = 5;
repeated int64 accept_return_code = 6;
bool debug = 7;
}

message Job {
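As a quick illustration of the generated Go API for these messages, the new fields are populated as below (a sketch that mirrors the exported examples in `src/server/pps/example/example.go` later in this diff; all values are placeholders):

```go
package example

import ppsclient "github.com/pachyderm/pachyderm/src/client/pps"

// exampleTransform populates the new Env and Secrets fields of the generated
// Transform type; values mirror src/server/pps/example/example.go.
func exampleTransform() *ppsclient.Transform {
	return &ppsclient.Transform{
		Cmd: []string{"cmd", "args..."},
		Env: map[string]string{"foo": "bar"}, // injected into the job container's environment
		Secrets: []*ppsclient.Secret{{
			Name:      "secret_name",        // name of an existing Kubernetes secret
			MountPath: "/path/in/container", // where the secret is mounted in the container
		}},
		Stdin:            []string{"echo $foo"},
		AcceptReturnCode: []int64{1},
	}
}
```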
67 changes: 66 additions & 1 deletion src/server/pachyderm_test.go
@@ -2189,6 +2189,71 @@ func TestPipelineInfoDestroyedIfRepoCreationFails(t *testing.T) {
require.Matches(t, "not found", err.Error())
}

func TestPipelineEnv(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
}
t.Parallel()

// make a secret to reference
k := getKubeClient(t)
secretName := uniqueString("test-secret")
_, err := k.Secrets(api.NamespaceDefault).Create(
&api.Secret{
ObjectMeta: api.ObjectMeta{
Name: secretName,
},
Data: map[string][]byte{
"foo": []byte("foo\n"),
},
},
)
require.NoError(t, err)
c := getPachClient(t)
// create repos
dataRepo := uniqueString("TestPipelineEnv_data")
require.NoError(t, c.CreateRepo(dataRepo))
// create pipeline
pipelineName := uniqueString("pipeline")
_, err = c.PpsAPIClient.CreatePipeline(
context.Background(),
&ppsclient.CreatePipelineRequest{
Pipeline: client.NewPipeline(pipelineName),
Transform: &ppsclient.Transform{
Cmd: []string{"sh"},
Stdin: []string{
"ls /var/secret",
"cat /var/secret/foo > /pfs/out/foo",
"echo $bar> /pfs/out/bar",
},
Env: map[string]string{"bar": "bar"},
Secrets: []*ppsclient.Secret{
{
Name: secretName,
MountPath: "/var/secret",
},
},
},
Parallelism: 1,
Inputs: []*ppsclient.PipelineInput{{Repo: &pfsclient.Repo{Name: dataRepo}}},
})
require.NoError(t, err)
// Do first commit to repo
commit, err := c.StartCommit(dataRepo, "", "")
require.NoError(t, err)
_, err = c.PutFile(dataRepo, commit.ID, "file", strings.NewReader("foo\n"))
require.NoError(t, err)
require.NoError(t, c.FinishCommit(dataRepo, commit.ID))
commitInfos, err := c.FlushCommit([]*pfsclient.Commit{commit}, nil)
require.Equal(t, 1, len(commitInfos))
var buffer bytes.Buffer
require.NoError(t, c.GetFile(pipelineName, commitInfos[0].Commit.ID, "foo", 0, 0, "", false, nil, &buffer))
require.Equal(t, "foo\n", buffer.String())
buffer = bytes.Buffer{}
require.NoError(t, c.GetFile(pipelineName, commitInfos[0].Commit.ID, "bar", 0, 0, "", false, nil, &buffer))
require.Equal(t, "bar\n", buffer.String())
}

func TestFlushNonExistantCommit(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration tests in short mode")
@@ -2293,7 +2358,7 @@ func getKubeClient(t *testing.T) *kube.Client {
}

func uniqueString(prefix string) string {
return prefix + "_" + uuid.NewWithoutDashes()[0:12]
return prefix + uuid.NewWithoutDashes()[0:12]
}

func pachdRc(t *testing.T) *api.ReplicationController {
4 changes: 2 additions & 2 deletions src/server/pps/cmds/cmds.go
@@ -70,12 +70,12 @@ The increase the throughput of a job increase the Shard paremeter.
}),
}

exampleCreateJobRequest, err := marshaller.MarshalToString(example.CreateJobRequest())
exampleCreateJobRequest, err := marshaller.MarshalToString(example.CreateJobRequest)
if err != nil {
return nil, err
}

exampleRunPipelineSpec, err := marshaller.MarshalToString(example.RunPipelineSpec())
exampleRunPipelineSpec, err := marshaller.MarshalToString(example.RunPipelineSpec)
if err != nil {
return nil, err
}
44 changes: 22 additions & 22 deletions src/server/pps/example/example.go
@@ -6,13 +6,22 @@ import (
ppsclient "github.com/pachyderm/pachyderm/src/client/pps"
)

// CreateJobRequest creates an example job request.
func CreateJobRequest() *ppsclient.CreateJobRequest {
return &ppsclient.CreateJobRequest{
Transform: &ppsclient.Transform{
Cmd: []string{"cmd", "args..."},
AcceptReturnCode: []int64{1},
},
var (
// Secret example
Secret = &ppsclient.Secret{
Name: "secret_name",
MountPath: "/path/in/container",
}
// Transform example
Transform = &ppsclient.Transform{
Cmd: []string{"cmd", "args..."},
AcceptReturnCode: []int64{1},
Env: map[string]string{"foo": "bar"},
Secrets: []*ppsclient.Secret{Secret},
}
// CreateJobRequest example
CreateJobRequest = &ppsclient.CreateJobRequest{
Transform: Transform,
Parallelism: 1,
Inputs: []*ppsclient.JobInput{
{
@@ -27,18 +36,12 @@ func CreateJobRequest() *ppsclient.CreateJobRequest {
ID: "a951ca06cfda4377b8ffaa050d1074df",
},
}
}

// CreatePipelineRequest creates an example pipeline request.
func CreatePipelineRequest() *ppsclient.CreatePipelineRequest {
return &ppsclient.CreatePipelineRequest{
// CreatePipelineRequest example
CreatePipelineRequest = &ppsclient.CreatePipelineRequest{
Pipeline: &ppsclient.Pipeline{
Name: "name",
},
Transform: &ppsclient.Transform{
Cmd: []string{"cmd", "args..."},
AcceptReturnCode: []int64{1},
},
Transform: Transform,
Parallelism: 1,
Inputs: []*ppsclient.PipelineInput{
{
@@ -47,11 +50,8 @@ func CreatePipelineRequest() *ppsclient.CreatePipelineRequest {
},
},
}
}

// RunPipelineSpec creates an example run pipeline request.
func RunPipelineSpec() *ppsclient.CreateJobRequest {
return &ppsclient.CreateJobRequest{
// RunPipelineSpec example
RunPipelineSpec = &ppsclient.CreateJobRequest{
Inputs: []*ppsclient.JobInput{
{
Commit: &pfs.Commit{
@@ -63,4 +63,4 @@ func RunPipelineSpec() *ppsclient.CreateJobRequest {
},
Parallelism: 3,
}
}
)
28 changes: 28 additions & 0 deletions src/server/pps/server/api_server.go
@@ -1364,6 +1364,32 @@ func job(jobInfo *persist.JobInfo) *batch.Job {
},
)
}
for name, value := range jobInfo.Transform.Env {
jobEnv = append(
jobEnv,
api.EnvVar{
Name: name,
Value: value,
},
)
}

var volumes []api.Volume
var volumeMounts []api.VolumeMount
for _, secret := range jobInfo.Transform.Secrets {
volumes = append(volumes, api.Volume{
Name: secret.Name,
VolumeSource: api.VolumeSource{
Secret: &api.SecretVolumeSource{
SecretName: secret.Name,
},
},
})
volumeMounts = append(volumeMounts, api.VolumeMount{
Name: secret.Name,
MountPath: secret.MountPath,
})
}

return &batch.Job{
TypeMeta: unversioned.TypeMeta{
Expand Down Expand Up @@ -1397,9 +1423,11 @@ func job(jobInfo *persist.JobInfo) *batch.Job {
},
ImagePullPolicy: "IfNotPresent",
Env: jobEnv,
VolumeMounts: volumeMounts,
},
},
RestartPolicy: "Never",
Volumes: volumes,
},
},
},
