-
Notifications
You must be signed in to change notification settings - Fork 568
/
cmds.go
2214 lines (2093 loc) · 85.4 KB
/
cmds.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package cmds
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"reflect"
"strings"
"time"
prompt "github.com/c-bata/go-prompt"
"github.com/fatih/color"
docker "github.com/fsouza/go-dockerclient"
"github.com/itchyny/gojq"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/pachyderm/pachyderm/v2/src/constants"
pachdclient "github.com/pachyderm/pachyderm/v2/src/internal/client"
"github.com/pachyderm/pachyderm/v2/src/internal/cmdutil"
"github.com/pachyderm/pachyderm/v2/src/internal/config"
"github.com/pachyderm/pachyderm/v2/src/internal/errors"
"github.com/pachyderm/pachyderm/v2/src/internal/errutil"
"github.com/pachyderm/pachyderm/v2/src/internal/grpcutil"
"github.com/pachyderm/pachyderm/v2/src/internal/log"
"github.com/pachyderm/pachyderm/v2/src/internal/pachctl"
"github.com/pachyderm/pachyderm/v2/src/internal/pachtmpl"
"github.com/pachyderm/pachyderm/v2/src/internal/pager"
"github.com/pachyderm/pachyderm/v2/src/internal/pctx"
"github.com/pachyderm/pachyderm/v2/src/internal/ppsutil"
"github.com/pachyderm/pachyderm/v2/src/internal/tabwriter"
"github.com/pachyderm/pachyderm/v2/src/internal/tracing/extended"
"github.com/pachyderm/pachyderm/v2/src/internal/uuid"
"github.com/pachyderm/pachyderm/v2/src/pfs"
"github.com/pachyderm/pachyderm/v2/src/pps"
"github.com/pachyderm/pachyderm/v2/src/server/cmd/pachctl/shell"
"github.com/pachyderm/pachyderm/v2/src/server/pps/pretty"
txncmds "github.com/pachyderm/pachyderm/v2/src/server/transaction/cmds"
workerserver "github.com/pachyderm/pachyderm/v2/src/server/worker/server"
workerapi "github.com/pachyderm/pachyderm/v2/src/worker"
)
const (
	// Plural aliases used when registering command aliases below
	// (e.g. `pachctl list jobs` as well as `pachctl list job`),
	// purely for user convenience.
	datums    = "datums"
	jobs      = "jobs"
	pipelines = "pipelines"
	secrets   = "secrets"
)
// Cmds returns a slice containing pps commands.
func Cmds(mainCtx context.Context, pachCtx *config.Context, pachctlCfg *pachctl.Config) []*cobra.Command {
var commands []*cobra.Command
var raw bool
var output string
outputFlags := cmdutil.OutputFlags(&raw, &output)
var fullTimestamps bool
timestampFlags := cmdutil.TimestampFlags(&fullTimestamps)
var noPager bool
pagerFlags := cmdutil.PagerFlags(&noPager)
jobDocs := &cobra.Command{
Short: "Docs for jobs.",
Long: "Jobs are the basic units of computation in Pachyderm and are created by pipelines. \n \n" +
"When created, a job runs a containerized workload over a set of finished input commits; once completed, they write the output to a commit in the pipeline's output repo. " +
"Jobs can have multiple datums, each processed independently, with the results merged together at the end. \n \n" +
"If a job fails, the output commit will not be populated with data.",
}
commands = append(commands, cmdutil.CreateDocsAliases(jobDocs, "job", " job$", jobs))
var project = pachCtx.Project
inspectJob := &cobra.Command{
Use: "{{alias}} <pipeline>@<job>",
Short: "Return info about a job.",
Long: "This command returns detailed info about a job, including processing stats, inputs, and transformation configuration (the image and commands used). \n \n" +
"If you pass in a job set ID (without the `pipeline@`), it will defer you to using the `pachctl list job <id>` command. See examples for proper use. \n \n" +
"\t- To specify the project where the parent pipeline lives, use the `--project` flag \n" +
"\t- To specify the output should be raw JSON or YAML, use the `--raw` flag along with `--output`",
Example: "\t- {{alias}} foo@e0f68a2fcda7458880c9e2e2dae9e678 \n" +
"\t- {{alias}} foo@e0f68a2fcda7458880c9e2e2dae9e678 --project bar \n" +
"\t- {{alias}} foo@e0f68a2fcda7458880c9e2e2dae9e678 --project bar --raw --output yaml \n",
Run: cmdutil.RunFixedArgs(1, func(args []string) error {
job, err := cmdutil.ParseJob(project, args[0])
if err != nil && uuid.IsUUIDWithoutDashes(args[0]) {
return errors.New(`Use "list job <id>" to see jobs with a given ID across different pipelines`)
} else if err != nil {
return err
}
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
jobInfo, err := client.InspectJob(job.Pipeline.Project.GetName(), job.Pipeline.Name, job.Id, true)
if err != nil {
return errors.Wrap(err, "error from InspectJob")
}
if raw {
return errors.EnsureStack(cmdutil.Encoder(output, os.Stdout).EncodeProto(jobInfo))
} else if output != "" {
return errors.New("cannot set --output (-o) without --raw")
}
pji := &pretty.PrintableJobInfo{
JobInfo: jobInfo,
FullTimestamps: fullTimestamps,
}
return pretty.PrintDetailedJobInfo(os.Stdout, pji)
}),
}
inspectJob.Flags().AddFlagSet(outputFlags)
inspectJob.Flags().AddFlagSet(timestampFlags)
inspectJob.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing the parent pipeline for this job.")
shell.RegisterCompletionFunc(inspectJob, shell.JobCompletion)
commands = append(commands, cmdutil.CreateAliases(inspectJob, "inspect job", jobs))
// writeJobInfos renders a list of JobInfos to out, honoring the
// --raw/--output, --full-timestamps, and --no-pager flags captured from
// the enclosing Cmds scope. In raw mode each JobInfo is proto-encoded
// with the selected encoder; otherwise a paged, tab-aligned table is
// printed. Setting --output without --raw is rejected.
writeJobInfos := func(out io.Writer, jobInfos []*pps.JobInfo) error {
	if raw {
		e := cmdutil.Encoder(output, out)
		for _, jobInfo := range jobInfos {
			if err := e.EncodeProto(jobInfo); err != nil {
				return errors.EnsureStack(err)
			}
		}
		return nil
	} else if output != "" {
		return errors.New("cannot set --output (-o) without --raw")
	}
	// Human-readable path: page the tabular output unless --no-pager is set.
	return pager.Page(noPager, out, func(w io.Writer) error {
		writer := tabwriter.NewWriter(w, pretty.JobHeader)
		for _, jobInfo := range jobInfos {
			pretty.PrintJobInfo(writer, jobInfo, fullTimestamps)
		}
		return writer.Flush()
	})
}
waitJob := &cobra.Command{
Use: "{{alias}} <job>|<pipeline>@<job>",
Short: "Wait for a job to finish then return info about the job.",
Long: "This command waits for a job to finish then return info about the job.",
Example: "\t- {{alias}} e0f68a2fcda7458880c9e2e2dae9e678 \n" +
"\t- {{alias}} foo@e0f68a2fcda7458880c9e2e2dae9e678 \n" +
"\t- {{alias}} foo@e0f68a2fcda7458880c9e2e2dae9e678 --project bar \n" +
"\t- {{alias}} foo@e0f68a2fcda7458880c9e2e2dae9e678 --project bar --raw --output yaml \n",
Run: cmdutil.RunFixedArgs(1, func(args []string) error {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
var jobInfos []*pps.JobInfo
if uuid.IsUUIDWithoutDashes(args[0]) {
jobInfos, err = client.WaitJobSetAll(args[0], true)
if err != nil {
return err
}
} else {
job, err := cmdutil.ParseJob(project, args[0])
if err != nil {
return err
}
jobInfo, err := client.WaitJob(job.Pipeline.Project.GetName(), job.Pipeline.Name, job.Id, true)
if err != nil {
return errors.Wrap(err, "error from InspectJob")
}
jobInfos = []*pps.JobInfo{jobInfo}
}
return writeJobInfos(os.Stdout, jobInfos)
}),
}
waitJob.Flags().AddFlagSet(outputFlags)
waitJob.Flags().AddFlagSet(timestampFlags)
waitJob.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing the parent pipeline for this job.")
shell.RegisterCompletionFunc(waitJob, shell.JobCompletion)
commands = append(commands, cmdutil.CreateAliases(waitJob, "wait job", jobs))
var pipelineName string
var allProjects bool
var inputCommitStrs []string
var history string
var stateStrs []string
var expand bool
listJob := &cobra.Command{
Use: "{{alias}} [<job-id>]",
Short: "Return info about jobs.",
Long: "This command returns info about a list of jobs. You can pass in the command with or without a job ID. \n \n" +
"Without an ID, this command returns a global list of top-level job sets which contain their own sub-jobs; " +
"With an ID, it returns a list of sub-jobs within the specified job set. \n \n" +
"\t- To return a list of sub-jobs across all job sets, use the `--expand` flag without passing an ID \n" +
"\t- To return only the sub-jobs from the most recent version of a pipeline, use the `--pipeline` flag \n" +
"\t- To return all sub-jobs from all versions of a pipeline, use the `--history` flag \n" +
"\t- To return all sub-jobs whose input commits include data from a particular repo branch/commit, use the `--input` flag \n" +
"\t- To turn only sub-jobs with a particular state, use the `--state` flag; options: CREATED, STARTING, UNRUNNABLE, RUNNING, EGRESS, FINISHING, FAILURE, KILLED, SUCCESS",
Example: "\t- {{alias}} \n" +
"\t- {{alias}} --state starting \n" +
"\t- {{alias}} --pipeline foo \n" +
"\t- {{alias}} --expand \n" +
"\t- {{alias}} --expand --pipeline foo \n" +
"\t- {{alias}} --expand --pipeline foo --state failure --state unrunnable \n" +
"\t- {{alias}} 5f93d03b65fa421996185e53f7f8b1e4 \n" +
"\t- {{alias}} 5f93d03b65fa421996185e53f7f8b1e4 --state running\n" +
"\t- {{alias}} --input foo-repo@staging \n" +
"\t- {{alias}} --input foo-repo@5f93d03b65fa421996185e53f7f8b1e4 \n" +
"\t- {{alias}} --pipeline foo --input bar-repo@staging \n" +
"\t- {{alias}} --pipeline foo --input bar-repo@5f93d03b65fa421996185e53f7f8b1e4 \n",
Run: cmdutil.RunBoundedArgs(0, 1, func(args []string) error {
commits, err := cmdutil.ParseCommits(project, inputCommitStrs)
if err != nil {
return err
}
historyCount, err := cmdutil.ParseHistory(history)
if err != nil {
return errors.Wrapf(err, "error parsing history flag")
}
var filter string
if len(stateStrs) > 0 {
filter, err = ParseJobStates(stateStrs)
if err != nil {
return errors.Wrap(err, "error parsing state")
}
}
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
if !raw && output != "" {
return errors.New("cannot set --output (-o) without --raw")
}
// To list jobs for all projects, user must be explicit about it.
// The --project filter takes precedence over everything else.
// By default use pfs.DefaultProjectName
projectsFilter := []*pfs.Project{{Name: project}}
if allProjects {
projectsFilter = nil
}
if len(args) == 0 {
if pipelineName == "" && !expand {
// We are listing jobs
if len(stateStrs) != 0 {
return errors.Errorf("cannot specify '--state' when listing all jobs")
} else if len(inputCommitStrs) != 0 {
return errors.Errorf("cannot specify '--input' when listing all jobs")
} else if history != "none" {
return errors.Errorf("cannot specify '--history' when listing all jobs")
}
req := &pps.ListJobSetRequest{Projects: projectsFilter}
listJobSetClient, err := client.PpsAPIClient.ListJobSet(client.Ctx(), req)
if err != nil {
return grpcutil.ScrubGRPC(err)
}
if raw {
e := cmdutil.Encoder(output, os.Stdout)
return grpcutil.ForEach[*pps.JobSetInfo](listJobSetClient, func(jobSetInfo *pps.JobSetInfo) error {
return errors.EnsureStack(e.EncodeProto(jobSetInfo))
})
}
return pager.Page(noPager, os.Stdout, func(w io.Writer) error {
writer := tabwriter.NewWriter(w, pretty.JobSetHeader)
if err := grpcutil.ForEach[*pps.JobSetInfo](listJobSetClient, func(jobSetInfo *pps.JobSetInfo) error {
pretty.PrintJobSetInfo(writer, jobSetInfo, fullTimestamps)
return nil
}); err != nil {
return err
}
return writer.Flush()
})
} else {
// We are listing all sub-jobs, possibly restricted to a single pipeline
var pipeline *pps.Pipeline
if pipelineName != "" {
pipeline = &pps.Pipeline{Name: pipelineName, Project: &pfs.Project{Name: project}}
}
req := &pps.ListJobRequest{
Projects: projectsFilter,
Pipeline: pipeline,
InputCommit: commits,
History: historyCount,
Details: true,
JqFilter: filter,
}
ctx, cf := pctx.WithCancel(client.Ctx())
defer cf()
ljClient, err := client.PpsAPIClient.ListJob(ctx, req)
if err != nil {
return grpcutil.ScrubGRPC(err)
}
if raw {
e := cmdutil.Encoder(output, os.Stdout)
return listJobFilterF(ctx, ljClient, func(ji *pps.JobInfo) error {
return errors.EnsureStack(e.EncodeProto(ji))
})
}
return pager.Page(noPager, os.Stdout, func(w io.Writer) error {
writer := tabwriter.NewWriter(w, pretty.JobHeader)
if err := listJobFilterF(ctx, ljClient, func(ji *pps.JobInfo) error {
pretty.PrintJobInfo(writer, ji, fullTimestamps)
return nil
}); err != nil {
return err
}
return writer.Flush()
})
}
} else {
// We are listing sub-jobs of a specific job
if len(stateStrs) != 0 {
return errors.Errorf("cannot specify '--state' when listing sub-jobs")
} else if len(inputCommitStrs) != 0 {
return errors.Errorf("cannot specify '--input' when listing sub-jobs")
} else if history != "none" {
return errors.Errorf("cannot specify '--history' when listing sub-jobs")
} else if pipelineName != "" {
return errors.Errorf("cannot specify '--pipeline' when listing sub-jobs")
}
var jobInfos []*pps.JobInfo
jobInfos, err = client.InspectJobSet(args[0], false)
if err != nil {
return errors.Wrap(err, "error from InspectJobSet")
}
return writeJobInfos(os.Stdout, jobInfos)
}
}),
}
listJob.Flags().StringVarP(&pipelineName, "pipeline", "p", "", "Specify results should only return jobs created by a given pipeline.")
listJob.Flags().BoolVarP(&allProjects, "all-projects", "A", false, "Specify results should return jobs from all projects.")
listJob.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing the parent pipeline for returned jobs.")
listJob.MarkFlagCustom("pipeline", "__pachctl_get_pipeline")
listJob.Flags().StringSliceVarP(&inputCommitStrs, "input", "i", []string{}, "Specify results should only return jobs with a specific set of input commits; format: <repo>@<branch-or-commit>")
listJob.MarkFlagCustom("input", "__pachctl_get_repo_commit")
listJob.Flags().BoolVarP(&expand, "expand", "x", false, "Specify results return as one line for each sub-job and include more columns; not needed if ID is passed.")
listJob.Flags().AddFlagSet(outputFlags)
listJob.Flags().AddFlagSet(timestampFlags)
listJob.Flags().AddFlagSet(pagerFlags)
listJob.Flags().StringVar(&history, "history", "none", "Specify results returned include jobs from historical versions of pipelines.")
listJob.Flags().StringArrayVar(&stateStrs, "state", []string{}, "Specify results return only sub-jobs with the specified state; can be repeated to include multiple states.")
shell.RegisterCompletionFunc(listJob,
func(flag, text string, maxCompletions int64) ([]prompt.Suggest, shell.CacheFunc) {
if flag == "-p" || flag == "--pipeline" {
cs, cf := shell.PipelineCompletion(flag, text, maxCompletions)
return cs, shell.AndCacheFunc(cf, shell.SameFlag(flag))
}
return shell.JobSetCompletion(flag, text, maxCompletions)
})
commands = append(commands, cmdutil.CreateAliases(listJob, "list job", jobs))
deleteJob := &cobra.Command{
Use: "{{alias}} <pipeline>@<job>",
Short: "Delete a job.",
Long: "This command deletes a job.",
Example: "\t- {{alias}} 5f93d03b65fa421996185e53f7f8b1e4 \n" +
"\t- {{alias}} 5f93d03b65fa421996185e53f7f8b1e4 --project foo",
Run: cmdutil.RunFixedArgs(1, func(args []string) error {
job, err := cmdutil.ParseJob(project, args[0])
if err != nil {
return err
}
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
if err := client.DeleteJob(job.Pipeline.Project.GetName(), job.Pipeline.Name, job.Id); err != nil {
return errors.Wrap(err, "error from DeleteJob")
}
return nil
}),
}
deleteJob.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing the parent pipeline for this job.")
shell.RegisterCompletionFunc(deleteJob, shell.JobCompletion)
commands = append(commands, cmdutil.CreateAliases(deleteJob, "delete job", jobs))
stopJob := &cobra.Command{
Use: "{{alias}} <pipeline>@<job>",
Short: "Stop a job.",
Long: "This command stops a job immediately." +
"\t- To specify the project where the parent pipeline lives, use the `--project` flag \n",
Run: cmdutil.RunFixedArgs(1, func(args []string) error {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
if uuid.IsUUIDWithoutDashes(args[0]) {
// Stop each subjob in a transaction
jobInfos, err := client.InspectJobSet(args[0], false)
if err != nil {
return err
}
if _, err := client.RunBatchInTransaction(func(tb *pachdclient.TransactionBuilder) error {
for _, jobInfo := range jobInfos {
if err := tb.StopJob(jobInfo.Job.Pipeline.Project.Name, jobInfo.Job.Pipeline.Name, jobInfo.Job.Id); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
} else {
job, err := cmdutil.ParseJob(project, args[0])
if err != nil {
return err
}
if err := client.StopJob(job.Pipeline.Project.Name, job.Pipeline.Name, job.Id); err != nil {
return errors.Wrap(err, "error from StopProjectJob")
}
}
return nil
}),
}
stopJob.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing the parent pipeline for the job.")
shell.RegisterCompletionFunc(stopJob, shell.JobCompletion)
commands = append(commands, cmdutil.CreateAliases(stopJob, "stop job", jobs))
datumDocs := &cobra.Command{
Short: "Docs for datums.",
Long: "Datums are the smallest independent unit of processing for a Job. " +
"Datums are defined by applying a glob pattern in the pipeline spec to the file paths in an input repo, and they can include any number of files and directories. \n \n" +
"Datums within a job are processed independently -- and sometimes distributed across separate workers (see `datum_set_spec` and `parallelism_spec` options for pipeline specification).\n \n" +
"A separate execution of user code will be run for each datum unless datum batching is utilized.",
}
commands = append(commands, cmdutil.CreateDocsAliases(datumDocs, "datum", " datum$", datums))
restartDatum := &cobra.Command{
Use: "{{alias}} <pipeline>@<job> <datum-path1>,<datum-path2>,...",
Short: "Restart a stuck datum during a currently running job.",
Long: "This command restarts a stuck datum during a currently running job; it does not solve failed datums. \n \n" +
"You can configure a job to skip failed datums via the transform.err_cmd setting of your pipeline spec. \n \n" +
"\t- To specify the project where the parent pipeline lives, use the `--project` flag \n",
Example: "\t- {{alias}} foo@5f93d03b65fa421996185e53f7f8b1e4 /logs/logs.txt \n" +
"\t- {{alias}} foo@5f93d03b65fa421996185e53f7f8b1e4 /logs/logs-a.txt, /logs/logs-b.txt \n" +
"\t- {{alias}} foo@5f93d03b65fa421996185e53f7f8b1e4 /logs/logs-a.txt, /logs/logs-b.txt --project bar ",
Run: cmdutil.RunFixedArgs(2, func(args []string) error {
job, err := cmdutil.ParseJob(project, args[0])
if err != nil {
return err
}
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
datumFilter := strings.Split(args[1], ",")
for i := 0; i < len(datumFilter); {
if len(datumFilter[i]) == 0 {
if i+1 < len(datumFilter) {
copy(datumFilter[i:], datumFilter[i+1:])
}
datumFilter = datumFilter[:len(datumFilter)-1]
} else {
i++
}
}
return client.RestartDatum(job.Pipeline.Project.GetName(), job.Pipeline.Name, job.Id, datumFilter)
}),
}
restartDatum.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing parent pipeline for the datum's job")
commands = append(commands, cmdutil.CreateAliases(restartDatum, "restart datum", datums))
var pipelineInputPath string
listDatum := &cobra.Command{
Use: "{{alias}} <pipeline>@<job>",
Short: "Return the datums in a job.",
Long: "This command returns the datums in a job. \n \n" +
"\t- To pass in a JSON pipeline spec instead of `pipeline@job`, use the `--file` flag \n " +
"\t- To specify the project where the parent pipeline lives, use the `--project` flag \n",
Example: "\t- {{alias}} foo@5f93d03b65fa421996185e53f7f8b1e4 \n" +
"\t- {{alias}} foo@5f93d03b65fa421996185e53f7f8b1e4 --project bar \n" +
"\t- {{alias}} --file pipeline.json",
Run: cmdutil.RunBoundedArgs(0, 1, func(args []string) (retErr error) {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
var printF func(*pps.DatumInfo) error
if !raw {
if output != "" {
return errors.New("cannot set --output (-o) without --raw")
}
writer := tabwriter.NewWriter(os.Stdout, pretty.DatumHeader)
printF = func(di *pps.DatumInfo) error {
pretty.PrintDatumInfo(writer, di)
return nil
}
defer func() {
if err := writer.Flush(); retErr == nil {
retErr = err
}
}()
} else {
e := cmdutil.Encoder(output, os.Stdout)
printF = func(di *pps.DatumInfo) error {
return errors.EnsureStack(e.EncodeProto(di))
}
}
if pipelineInputPath != "" && len(args) == 1 {
return errors.Errorf("can't specify both a job and a pipeline spec")
} else if pipelineInputPath != "" {
r, err := fileIndicatorToReadCloser(pipelineInputPath)
if err != nil {
return err
}
defer r.Close()
specReader := ppsutil.NewSpecReader(r)
spec, err := specReader.Next()
if err != nil {
return err
}
var request pps.CreatePipelineRequest
if err := protojson.Unmarshal([]byte(spec), &request); err != nil {
return errors.Wrap(err, "could not unmarshal CreatePipelineRequest")
}
if err := pps.VisitInput(request.Input, func(i *pps.Input) error {
if i.Pfs != nil && i.Pfs.Project == "" {
i.Pfs.Project = project
}
return nil
}); err != nil {
return err
}
return client.ListDatumInput(request.Input, printF)
} else if len(args) == 1 {
job, err := cmdutil.ParseJob(project, args[0])
if err != nil {
return err
}
return client.ListDatum(job.Pipeline.Project.GetName(), job.Pipeline.Name, job.Id, printF)
} else {
return errors.Errorf("must specify either a job or a pipeline spec")
}
}),
}
listDatum.Flags().StringVarP(&pipelineInputPath, "file", "f", "", "Set the JSON file containing the pipeline to list datums from; the pipeline need not exist.")
listDatum.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing parent pipeline for the job.")
listDatum.Flags().AddFlagSet(outputFlags)
shell.RegisterCompletionFunc(listDatum, shell.JobCompletion)
commands = append(commands, cmdutil.CreateAliases(listDatum, "list datum", datums))
var since string
kubeEvents := &cobra.Command{
Use: "{{alias}}",
Short: "Return the kubernetes events.",
Long: "This command returns the kubernetes events. \n" +
"\t- To return results starting from a certain amount of time before now, use the `--since` flag \n" +
"\t- To return the raw events, use the `--raw` flag \n",
Example: "\t- {{alias}} --raw \n" +
"\t- {{alias}} --since 100s \n" +
"\t- {{alias}} --raw --since 1h \n",
Run: cmdutil.RunFixedArgs(0, func(args []string) error {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
since, err := time.ParseDuration(since)
if err != nil {
return errors.Wrapf(err, "parse since(%q)", since)
}
request := pps.LokiRequest{
Since: durationpb.New(since),
}
kubeEventsClient, err := client.PpsAPIClient.GetKubeEvents(client.Ctx(), &request)
if err != nil {
return grpcutil.ScrubGRPC(err)
}
writer := tabwriter.NewWriter(os.Stdout, pretty.KubeEventsHeader)
if err := grpcutil.ForEach[*pps.LokiLogMessage](kubeEventsClient, func(msg *pps.LokiLogMessage) error {
if raw {
fmt.Println(msg.Message)
} else {
pretty.PrintKubeEvent(writer, msg.Message)
}
return nil
}); err != nil {
return err
}
return writer.Flush()
}),
}
kubeEvents.Flags().BoolVar(&raw, "raw", false, "Specify results should return log messages verbatim from server.")
kubeEvents.Flags().StringVar(&since, "since", "0", "Specify results should return log messages more recent than \"since\".")
commands = append(commands, cmdutil.CreateAlias(kubeEvents, "kube-events"))
queryLoki := &cobra.Command{
Use: "{{alias}} <query>",
Short: "Query the loki logs.",
Long: "This command queries the loki logs.",
Example: "\t- {{alias}} <query> --since 100s \n" +
"\t- {{alias}} <query> --since 1h",
Run: cmdutil.RunFixedArgs(1, func(args []string) error {
query := args[0]
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
since, err := time.ParseDuration(since)
if err != nil {
return errors.Wrapf(err, "parse since(%q)", since)
}
request := pps.LokiRequest{
Query: query,
Since: durationpb.New(since),
}
lokiClient, err := client.PpsAPIClient.QueryLoki(client.Ctx(), &request)
if err != nil {
return grpcutil.ScrubGRPC(err)
}
if err := grpcutil.ForEach[*pps.LokiLogMessage](lokiClient, func(log *pps.LokiLogMessage) error {
fmt.Println(log.Message)
return nil
}); err != nil {
return err
}
return nil
}),
}
queryLoki.Flags().StringVar(&since, "since", "0", "Specify results should return log messages more recent than \"since\".")
commands = append(commands, cmdutil.CreateAlias(queryLoki, "loki"))
inspectDatum := &cobra.Command{
Use: "{{alias}} <pipeline>@<job> <datum>",
Short: "Display detailed info about a single datum.",
Long: "This command displays detailed info about a single datum; requires the pipeline to have stats enabled.",
Example: "\t- {{alias}} foo@5f93d03b65fa421996185e53f7f8b1e4 7f3cd988429894000bdad549dfe2d09b5ca7bfc5083b79fec0e6bda3db8cc705 \n" +
"\t- {{alias}} foo@5f93d03b65fa421996185e53f7f8b1e4 7f3cd988429894000bdad549dfe2d09b5ca7bfc5083b79fec0e6bda3db8cc705 --project foo",
Run: cmdutil.RunFixedArgs(2, func(args []string) error {
job, err := cmdutil.ParseJob(project, args[0])
if err != nil {
return err
}
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
datumInfo, err := client.InspectDatum(job.Pipeline.Project.GetName(), job.Pipeline.Name, job.Id, args[1])
if err != nil {
return err
}
if raw {
return errors.EnsureStack(cmdutil.Encoder(output, os.Stdout).EncodeProto(datumInfo))
} else if output != "" {
return errors.New("cannot set --output (-o) without --raw")
}
pretty.PrintDetailedDatumInfo(os.Stdout, datumInfo)
return nil
}),
}
inspectDatum.Flags().StringVar(&project, "project", project, "Project containing the job")
inspectDatum.Flags().AddFlagSet(outputFlags)
commands = append(commands, cmdutil.CreateAliases(inspectDatum, "inspect datum", datums))
var (
jobStr string
datumID string
commaInputs string // comma-separated list of input files of interest
master bool
worker bool
follow bool
tail int64
)
// prettyLogsPrinter helps to print the logs recieved in different colours
prettyLogsPrinter := func(message string) {
informationArray := strings.Split(message, " ")
if len(informationArray) > 1 {
debugString := informationArray[1]
debugLevel := strings.ToLower(debugString)
var debugLevelColoredString string
if debugLevel == "info" {
debugLevelColoredString = color.New(color.FgGreen).Sprint(debugString)
} else if debugLevel == "warning" {
debugLevelColoredString = color.New(color.FgYellow).Sprint(debugString)
} else if debugLevel == "error" {
debugLevelColoredString = color.New(color.FgRed).Sprint(debugString)
} else {
debugLevelColoredString = debugString
}
informationArray[1] = debugLevelColoredString
coloredMessage := strings.Join(informationArray, " ")
fmt.Println(coloredMessage)
} else {
fmt.Println(message)
}
}
getLogs := &cobra.Command{
Use: "{{alias}} [--pipeline=<pipeline>|--job=<pipeline>@<job>] [--datum=<datum>]",
Short: "Return logs from a job.",
Long: "This command returns logs from a job. \n" +
"\t- To filter your logs by pipeline, use the `--pipeline` flag \n" +
"\t- To filter your logs by job, use the `--job` flag \n" +
"\t- To filter your logs by datum, use the `--datum` flag \n" +
"\t- To filter your logs by the master process, use the `--master` flag with the `--pipeline` flag \n" +
"\t- To filter your logs by the worker process, use the `--worker` flag \n" +
"\t- To follow the logs as more are created, use the `--follow` flag \n" +
"\t- To set the number of lines to return, use the `--tail` flag \n" +
"\t- To return results starting from a certain amount of time before now, use the `--since` flag \n",
Example: "\t- {{alias}} --pipeline foo \n" +
"\t- {{alias}} --job foo@5f93d03b65fa421996185e53f7f8b1e4 \n" +
"\t- {{alias}} --job foo@5f93d03b65fa421996185e53f7f8b1e4 --tail 10 \n" +
"\t- {{alias}} --job foo@5f93d03b65fa421996185e53f7f8b1e4 --follow \n" +
"\t- {{alias}} --job foo@5f93d03b65fa421996185e53f7f8b1e4 --datum 7f3c[...] \n" +
"\t- {{alias}} --pipeline foo --datum 7f3c[...] --master \n" +
"\t- {{alias}} --pipeline foo --datum 7f3c[...] --worker \n" +
"\t- {{alias}} --pipeline foo --datum 7f3c[...] --master --tail 10 \n" +
"\t- {{alias}} --pipeline foo --datum 7f3c[...] --worker --follow \n",
Run: cmdutil.RunFixedArgsCmd(0, func(cmd *cobra.Command, args []string) error {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return errors.Wrapf(err, "error connecting to pachd")
}
defer client.Close()
// Break up comma-separated input paths, and filter out empty entries
data := strings.Split(commaInputs, ",")
for i := 0; i < len(data); {
if len(data[i]) == 0 {
if i+1 < len(data) {
copy(data[i:], data[i+1:])
}
data = data[:len(data)-1]
} else {
i++
}
}
since, err := time.ParseDuration(since)
if err != nil {
return errors.Wrapf(err, "error parsing since (%q)", since)
}
if tail != 0 {
return errors.Errorf("tail has been deprecated and removed from Pachyderm, use --since instead")
}
if pipelineName != "" && jobStr != "" {
return errors.Errorf("only one of pipeline or job should be specified")
}
var jobID string
if jobStr != "" {
job, err := cmdutil.ParseJob(project, jobStr)
if err != nil {
return err
}
pipelineName = job.Pipeline.Name
jobID = job.Id
}
// Issue RPC
if !cmd.Flags().Changed("since") {
since = 0
}
iter := client.GetLogs(project, pipelineName, jobID, data, datumID, master, follow, since)
for iter.Next() {
if raw {
fmt.Println(protojson.Format(iter.Message()))
} else if iter.Message().User && !master && !worker {
prettyLogsPrinter(iter.Message().Message)
} else if iter.Message().Master && master {
prettyLogsPrinter(iter.Message().Message)
} else if !iter.Message().User && !iter.Message().Master && worker {
prettyLogsPrinter(iter.Message().Message)
} else if pipelineName == "" && jobID == "" {
prettyLogsPrinter(iter.Message().Message)
}
}
return iter.Err()
}),
}
	// Register getLogs flags and shell completions, then expose it as `pachctl logs`.
	getLogs.Flags().StringVarP(&pipelineName, "pipeline", "p", "", "Specify results should only return logs for a given pipeline.")
	getLogs.MarkFlagCustom("pipeline", "__pachctl_get_pipeline")
	getLogs.Flags().StringVarP(&jobStr, "job", "j", "", "Specify results should only return logs for a given job ID.")
	getLogs.MarkFlagCustom("job", "__pachctl_get_job")
	getLogs.Flags().StringVar(&datumID, "datum", "", "Specify results should only return logs for a given datum ID.")
	getLogs.Flags().StringVar(&commaInputs, "inputs", "", "Filter for log lines generated while processing these files (accepts PFS paths or file hashes)")
	getLogs.Flags().BoolVar(&master, "master", false, "Specify results should only return logs from the master process; --pipeline must be set.")
	getLogs.Flags().BoolVar(&worker, "worker", false, "Specify results should only return logs from the worker process.")
	getLogs.Flags().BoolVar(&raw, "raw", false, "Specify results should only return log messages verbatim from server.")
	getLogs.Flags().BoolVarP(&follow, "follow", "f", false, "Follow logs as more are created.")
	// --tail is kept only so old invocations get a clear deprecation error
	// from the Run handler instead of an "unknown flag" failure.
	getLogs.Flags().Int64VarP(&tail, "tail", "t", 0, "Set the number of lines to return of the most recent logs.")
	getLogs.Flags().StringVar(&since, "since", "24h", "Specify results should return log messages more recent than \"since\".")
	getLogs.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing parent pipeline for the job.")
	// Interactive-shell completion: suggest pipelines for -p/--pipeline and
	// jobs for -j/--job; cache suggestions only while the same flag is active.
	shell.RegisterCompletionFunc(getLogs,
		func(flag, text string, maxCompletions int64) ([]prompt.Suggest, shell.CacheFunc) {
			if flag == "--pipeline" || flag == "-p" {
				cs, cf := shell.PipelineCompletion(flag, text, maxCompletions)
				return cs, shell.AndCacheFunc(cf, shell.SameFlag(flag))
			}
			if flag == "--job" || flag == "-j" {
				cs, cf := shell.JobCompletion(flag, text, maxCompletions)
				return cs, shell.AndCacheFunc(cf, shell.SameFlag(flag))
			}
			return nil, shell.SameFlag(flag)
		})
	commands = append(commands, cmdutil.CreateAlias(getLogs, "logs"))
	// pipelineDocs is a docs-only command (no Run): it anchors the help text
	// shown for the "pipeline" noun and its aliases.
	pipelineDocs := &cobra.Command{
		Short: "Docs for pipelines.",
		Long: "Pipelines are a powerful abstraction for automating jobs. They take a set of repos and branches as inputs and write to a single output repo of the same name. \n" +
			"Pipelines then subscribe to commits on those repos and launch a job to process each incoming commit. All jobs created by a pipeline will create commits in the pipeline's output repo. ",
	}
	commands = append(commands, cmdutil.CreateDocsAliases(pipelineDocs, "pipeline", " pipeline$", pipelines))
var pushImages bool
var registry string
var username string
var pipelinePath string
var jsonnetPath string
var jsonnetArgs []string
var dryRun bool
createPipeline := &cobra.Command{
Short: "Create a new pipeline.",
Long: "This command creates a new pipeline from a pipeline specification. \n \n" +
"You can create a pipeline using a JSON/YAML file or a jsonnet template file -- via either a local filepath or URL. Multiple pipelines can be created from one file." +
"For details on the format, see https://docs.pachyderm.com/latest/reference/pipeline_spec/. \n \n" +
"\t- To create a pipeline from a JSON/YAML file, use the `--file` flag \n" +
"\t- To create a pipeline from a jsonnet template file, use the `--jsonnet` flag; you can optionally pay multiple arguments separately using `--arg` \n" +
"\t- To push your local images to docker registry, use the `--push-images` and `--username` flags \n" +
"\t- To push your local images to custom registry, use the `--push-images`, `--registry`, and `--username` flags \n",
Example: "\t {{alias}} -file regression.json \n" +
"\t {{alias}} -file foo.json --project bar \n" +
"\t {{alias}} -file foo.json --push-images --username lbliii \n" +
"\t {{alias}} --jsonnet /templates/foo.jsonnet --arg myimage=bar --arg src=image \n",
Run: cmdutil.RunFixedArgs(0, func(args []string) (retErr error) {
return pipelineHelper(mainCtx, pachctlCfg, false, pushImages, registry, username, project, pipelinePath, jsonnetPath, jsonnetArgs, false, dryRun, output, raw)
}),
}
createPipeline.Flags().StringVarP(&pipelinePath, "file", "f", "", "Provide a JSON/YAML file (url or filepath) for one or more pipelines. \"-\" reads from stdin (the default behavior). Exactly one of --file and --jsonnet must be set.")
createPipeline.Flags().StringVar(&jsonnetPath, "jsonnet", "", "Provide a Jsonnet template file (url or filepath) for one or more pipelines. \"-\" reads from stdin. Exactly one of --file and --jsonnet must be set. Jsonnet templates must contain a top-level function; strings can be passed to this function with --arg (below)")
createPipeline.Flags().StringArrayVar(&jsonnetArgs, "arg", nil, "Provide a top-level argument in the form of 'param=value' passed to the Jsonnet template; requires --jsonnet. For multiple args, --arg may be set more than once.")
createPipeline.Flags().BoolVarP(&pushImages, "push-images", "p", false, "Specify that the local docker images should be pushed into the registry (docker by default).")
createPipeline.Flags().StringVarP(®istry, "registry", "r", "index.docker.io", "Specify an alternative registry to push images to.")
createPipeline.Flags().StringVarP(&username, "username", "u", "", "Specify the username to push images as.")
createPipeline.Flags().StringVar(&project, "project", project, "Specify the project (by name) in which to create the pipeline.")
createPipeline.Flags().BoolVar(&dryRun, "dry-run", false, "If true, pipeline will not actually be created.")
createPipeline.Flags().AddFlagSet(outputFlags)
commands = append(commands, cmdutil.CreateAliases(createPipeline, "create pipeline", pipelines))
var reprocess bool
updatePipeline := &cobra.Command{
Short: "Update an existing Pachyderm pipeline.",
Long: "This command updates a Pachyderm pipeline with a new pipeline specification. For details on the format, see https://docs.pachyderm.com/latest/reference/pipeline-spec/ \n \n" +
"\t- To update a pipeline from a JSON/YAML file, use the `--file` flag \n" +
"\t- To update a pipeline from a jsonnet template file, use the `--jsonnet` flag. You can optionally pay multiple arguments separately using `--arg` \n" +
"\t- To reprocess all data in the pipeline, use the `--reprocess` flag \n" +
"\t- To push your local images to docker registry, use the `--push-images` and `--username` flags \n" +
"\t- To push your local images to custom registry, use the `--push-images`, `--registry`, and `--username` flags \n",
Example: "\t {{alias}} -file regression.json \n" +
"\t {{alias}} -file foo.json --project bar \n" +
"\t {{alias}} -file foo.json --push-images --username lbliii \n" +
"\t {{alias}} --jsonnet /templates/foo.jsonnet --arg myimage=bar --arg src=image \n",
Run: cmdutil.RunFixedArgs(0, func(args []string) (retErr error) {
return pipelineHelper(mainCtx, pachctlCfg, reprocess, pushImages, registry, username, project, pipelinePath, jsonnetPath, jsonnetArgs, true, dryRun, output, raw)
}),
}
updatePipeline.Flags().StringVarP(&pipelinePath, "file", "f", "", "Provide a JSON/YAML file (url or filepath) for one or more pipelines. \"-\" reads from stdin (the default behavior). Exactly one of --file and --jsonnet must be set.")
updatePipeline.Flags().StringVar(&jsonnetPath, "jsonnet", "", "Provide a Jsonnet template file (url or filepath) for one or more pipelines. \"-\" reads from stdin. Exactly one of --file and --jsonnet must be set. Jsonnet templates must contain a top-level function; strings can be passed to this function with --arg (below)")
updatePipeline.Flags().StringArrayVar(&jsonnetArgs, "arg", nil, "Provide a top-level argument in the form of 'param=value' passed to the Jsonnet template; requires --jsonnet. For multiple args, --arg may be set more than once.")
updatePipeline.Flags().BoolVarP(&pushImages, "push-images", "p", false, "Specify that the local docker images should be pushed into the registry (docker by default).")
updatePipeline.Flags().StringVarP(®istry, "registry", "r", "index.docker.io", "Specify an alternative registry to push images to.")
updatePipeline.Flags().StringVarP(&username, "username", "u", "", "Specify the username to push images as.")
updatePipeline.Flags().BoolVar(&reprocess, "reprocess", false, "Reprocess all datums that were already processed by previous version of the pipeline.")
updatePipeline.Flags().StringVar(&project, "project", project, "Specify the project (by name) in which to create the pipeline.")
updatePipeline.Flags().BoolVar(&dryRun, "dry-run", false, "If true, pipeline will not actually be updated.")
updatePipeline.Flags().AddFlagSet(outputFlags)
commands = append(commands, cmdutil.CreateAliases(updatePipeline, "update pipeline", pipelines))
runCron := &cobra.Command{
Use: "{{alias}} <pipeline>",
Short: "Run an existing Pachyderm cron pipeline now",
Long: "This command runs an existing Pachyderm cron pipeline immediately.",
Example: "\t- {{alias}} foo \n" +
"\t- {{alias}} foo --project bar \n",
Run: cmdutil.RunMinimumArgs(1, func(args []string) (retErr error) {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
err = client.RunCron(project, args[0])
if err != nil {
return err
}
return nil
}),
}
runCron.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing the cron pipeline.")
commands = append(commands, cmdutil.CreateAlias(runCron, "run cron"))
inspectPipeline := &cobra.Command{
Use: "{{alias}} <pipeline>",
Short: "Return info about a pipeline.",
Long: "This command returns info about a pipeline.",
Example: "\t- {{alias}} foo \n" +
"\t- {{alias}} foo --project bar \n" +
"\t- {{alias}} foo --project bar --raw -o yaml \n",
Run: cmdutil.RunFixedArgs(1, func(args []string) error {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
pipelineInfo, err := client.InspectPipeline(project, args[0], true)
if err != nil {
return err
}
if raw {
return errors.EnsureStack(cmdutil.Encoder(output, os.Stdout).EncodeProto(pipelineInfo))
} else if output != "" {
return errors.New("cannot set --output (-o) without --raw")
}
pi := &pretty.PrintablePipelineInfo{
PipelineInfo: pipelineInfo,
FullTimestamps: fullTimestamps,
}
return pretty.PrintDetailedPipelineInfo(os.Stdout, pi)
}),
}
inspectPipeline.Flags().AddFlagSet(outputFlags)
inspectPipeline.Flags().AddFlagSet(timestampFlags)
inspectPipeline.Flags().StringVar(&project, "project", project, "Specify the project (by name) containing the inspected pipeline.")
commands = append(commands, cmdutil.CreateAliases(inspectPipeline, "inspect pipeline", pipelines))
	// Flag storage for the edit-pipeline command below.
	var editor string       // --editor: editor binary to launch (falls back to $EDITOR)
	var editorArgs []string // extra arguments split out of the editor string
editPipeline := &cobra.Command{
Use: "{{alias}} <pipeline>",
Short: "Edit the manifest for a pipeline in your text editor.",
Long: "This command edits the manifest for a pipeline in your text editor.",
Example: "\t- {{alias}} foo \n" +
"\t- {{alias}} foo --project bar \n" +
"\t- {{alias}} foo --project bar --editor vim \n" +
"\t- {{alias}} foo --project bar --editor vim --output yaml \n" +
"\t- {{alias}} foo --project bar --editor vim --reprocess \n",
Run: cmdutil.RunFixedArgs(1, func(args []string) (retErr error) {
client, err := pachctlCfg.NewOnUserMachine(mainCtx, false)
if err != nil {
return err
}
defer client.Close()
info, _ := client.ClusterInfo()
pipelineInfo, err := client.InspectPipeline(project, args[0], true)
if err != nil {
return err
}
format := output
if format == "" {
format = "json"
}
f, err := os.CreateTemp("", fmt.Sprintf("%v-*.pipeline.%v", pipelineInfo.GetPipeline().GetName(), format))
if err != nil {
return errors.EnsureStack(err)
}
defer func() {
if err := f.Close(); err != nil && retErr == nil {
retErr = err
}
}()
var oldSpec map[string]any
decoder := json.NewDecoder(strings.NewReader(pipelineInfo.UserSpecJson))
if err := decoder.Decode(&oldSpec); err != nil {
return errors.Wrapf(err, "could not decode old user spec %s", pipelineInfo.UserSpecJson)
}
oldSpec[constants.JSONSchemaKey] = info.GetWebResources().GetCreatePipelineRequestJsonSchemaUrl()