Skip to content

Commit

Permalink
646 lab 604 custom job selector label (#655)
Browse files (browse the repository at this point in the history)
  • Loading branch information
thetechnocrat-dev committed Sep 19, 2023
1 parent dceca82 commit 10656ed
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 248 deletions.
3 changes: 2 additions & 1 deletion cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var initCmd = &cobra.Command{
}

if autoRun {
_, _, err := ipwl.RunIO(cid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotationsForAutoRun)
_, _, err := ipwl.RunIO(cid, outputDir, selector, verbose, showAnimation, maxTime, concurrency, *annotationsForAutoRun)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
Expand All @@ -85,6 +85,7 @@ func init() {
initCmd.Flags().StringVarP(&inputs, "inputs", "i", "{}", "Inputs in JSON format")
initCmd.Flags().StringVarP(&scatteringMethod, "scatteringMethod", "", "{}", "Inputs in JSON format")
initCmd.Flags().BoolVarP(&autoRun, "autoRun", "", false, "Auto submit the IO to plex run")
initCmd.Flags().StringVarP(&selector, "selector", "s", "", "Bacalhau Selector (label query) to filter nodes on which this job can be executed, supports '=', '==', and '!='.(e.g. -s key1=value1,key2=value2). Matching objects must satisfy all of the specified label constraints")
annotationsForAutoRun = initCmd.Flags().StringArrayP("annotations", "a", []string{}, "Annotations to add to Bacalhau job")

rootCmd.AddCommand(initCmd)
Expand Down
6 changes: 3 additions & 3 deletions cmd/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ var resumeCmd = &cobra.Command{
dry := true
upgradePlexVersion(dry)

_, err := Resume(ioJsonPath, outputDir, verbose, showAnimation, retry, maxTime, concurrency, *annotationsForResume)
_, err := Resume(ioJsonPath, outputDir, selector, verbose, showAnimation, retry, maxTime, concurrency, *annotationsForResume)
if err != nil {
fmt.Println("Error:", err)
}
},
}

func Resume(ioJsonFilePath, outputDir string, verbose, showAnimation, retry bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid string, err error) {
func Resume(ioJsonFilePath, outputDir, selector string, verbose, showAnimation, retry bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid string, err error) {
fmt.Println("Continuing to process IO JSON file at: ", ioJsonPath)
fmt.Println("Processing IO Entries")
workDirPath := filepath.Dir(ioJsonFilePath)
ipwl.ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, maxTime, concurrency, annotations)
ipwl.ProcessIOList(workDirPath, ioJsonPath, selector, retry, verbose, showAnimation, maxTime, concurrency, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
Expand Down
4 changes: 3 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
var (
ioJsonCid string
outputDir string
selector string
verbose bool
showAnimation bool
maxTime int
Expand All @@ -26,7 +27,7 @@ var runCmd = &cobra.Command{
dry := true
upgradePlexVersion(dry)

_, _, err := ipwl.RunIO(ioJsonCid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotations)
_, _, err := ipwl.RunIO(ioJsonCid, outputDir, selector, verbose, showAnimation, maxTime, concurrency, *annotations)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
Expand All @@ -37,6 +38,7 @@ var runCmd = &cobra.Command{
func init() {
runCmd.Flags().StringVarP(&ioJsonCid, "ioJsonCid", "i", "", "IPFS CID of IO JSON")
runCmd.Flags().StringVarP(&outputDir, "outputDir", "o", "", "Output directory")
runCmd.Flags().StringVarP(&selector, "selector", "s", "", "Bacalhau Selector (label query) to filter nodes on which this job can be executed, supports '=', '==', and '!='.(e.g. -s key1=value1,key2=value2). Matching objects must satisfy all of the specified label constraints")
runCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose output")
runCmd.Flags().BoolVarP(&showAnimation, "showAnimation", "", true, "Show job processing animation")
runCmd.Flags().IntVarP(&maxTime, "maxTime", "m", 60, "Maximum time (min) to run a job")
Expand Down
130 changes: 72 additions & 58 deletions go.mod

Large diffs are not rendered by default.

347 changes: 183 additions & 164 deletions go.sum

Large diffs are not rendered by default.

32 changes: 18 additions & 14 deletions internal/bacalhau/bacalhau.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (

"github.com/bacalhau-project/bacalhau/pkg/downloader"
"github.com/bacalhau-project/bacalhau/pkg/downloader/util"
node "github.com/bacalhau-project/bacalhau/pkg/job"
"github.com/bacalhau-project/bacalhau/pkg/model"
"github.com/bacalhau-project/bacalhau/pkg/requester/publicapi"
"github.com/bacalhau-project/bacalhau/pkg/system"
"k8s.io/apimachinery/pkg/selection"
)

func GetBacalhauApiHost() string {
Expand All @@ -27,7 +27,7 @@ func GetBacalhauApiHost() string {
}
}

func CreateBacalhauJob(cid, container, cmd string, maxTime, memory int, gpu, network bool, annotations []string) (job *model.Job, err error) {
func CreateBacalhauJob(cid, container, cmd, selector string, maxTime, memory int, gpu, network bool, annotations []string) (job *model.Job, err error) {
job, err = model.NewJobWithSaneProductionDefaults()
if err != nil {
return nil, err
Expand All @@ -37,18 +37,19 @@ func CreateBacalhauJob(cid, container, cmd string, maxTime, memory int, gpu, net
job.Spec.Publisher = model.PublisherIpfs
job.Spec.Docker.Entrypoint = []string{"/bin/bash", "-c", cmd}
job.Spec.Annotations = annotations
job.Spec.Timeout = float64(maxTime) * 60
job.Spec.Timeout = float64(maxTime * 60)

// had problems getting selector to work in bacalhau v0.28
var selectorLabel string
plexEnv, _ := os.LookupEnv("PLEX_ENV")
if plexEnv == "stage" {
selectorLabel = "labdaostage"
} else {
selectorLabel = "labdao"
if selector == "" && plexEnv == "stage" {
selector = "owner=labdaostage"
} else if selector == "" && plexEnv == "prod" {
selector = "owner=labdao"
}
selector := model.LabelSelectorRequirement{Key: "owner", Operator: selection.Equals, Values: []string{selectorLabel}}
job.Spec.NodeSelectors = []model.LabelSelectorRequirement{selector}
nodeSelectorRequirements, err := node.ParseNodeSelector(selector)
if err != nil {
return nil, err
}
job.Spec.NodeSelectors = nodeSelectorRequirements

if memory > 0 {
job.Spec.Resources.Memory = fmt.Sprintf("%dgb", memory)
Expand All @@ -66,8 +67,8 @@ func CreateBacalhauJob(cid, container, cmd string, maxTime, memory int, gpu, net

func CreateBacalhauClient() *publicapi.RequesterAPIClient {
system.InitConfig()
apiPort := uint16(1234)
apiHost := GetBacalhauApiHost()
apiPort := uint16(1234)
client := publicapi.NewRequesterAPIClient(apiHost, apiPort)
return client
}
Expand Down Expand Up @@ -124,9 +125,12 @@ func GetBacalhauJobResults(submittedJob *model.Job, showAnimation bool, maxTime
}

func DownloadBacalhauResults(dir string, submittedJob *model.Job, results []model.PublishedResult) error {
downloadSettings := util.NewDownloadSettings()
downloadSettings.OutputDir = dir
cm := system.NewCleanupManager()
downloadSettings := &model.DownloaderSettings{
Timeout: model.DefaultIPFSTimeout,
OutputDir: dir,
}
downloadSettings.OutputDir = dir
downloaderProvider := util.NewStandardDownloaders(cm, downloadSettings)
err := downloader.DownloadResults(context.Background(), results, downloaderProvider, downloadSettings)
return err
Expand Down
3 changes: 2 additions & 1 deletion internal/bacalhau/bacalhau_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ func TestCreateBacalhauJob(t *testing.T) {
memory := "12gb"
gpu := "1"
networkFlag := true
job, err := CreateBacalhauJob(cid, container, cmd, 60, 12, true, networkFlag, []string{})
selector := ""
job, err := CreateBacalhauJob(cid, container, cmd, selector, 60, 12, true, networkFlag, []string{})
if err != nil {
t.Fatalf(fmt.Sprint(err))
}
Expand Down
12 changes: 6 additions & 6 deletions internal/ipwl/ipwl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

var errOutputPathEmpty = errors.New("output file path is empty, still waiting")

func RunIO(ioJsonCid, outputDir string, verbose, showAnimation bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
func RunIO(ioJsonCid, outputDir, selector string, verbose, showAnimation bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
id := uuid.New()
var cwd string
if outputDir != "" {
Expand Down Expand Up @@ -64,7 +64,7 @@ func RunIO(ioJsonCid, outputDir string, verbose, showAnimation bool, maxTime, co

retry := false
fmt.Println("Processing IO Entries")
ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, maxTime, concurrency, annotations)
ProcessIOList(workDirPath, ioJsonPath, selector, retry, verbose, showAnimation, maxTime, concurrency, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
Expand All @@ -75,7 +75,7 @@ func RunIO(ioJsonCid, outputDir string, verbose, showAnimation bool, maxTime, co
return completedIoJsonCid, ioJsonPath, nil
}

func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool, maxTime, maxConcurrency int, annotations []string) {
func ProcessIOList(jobDir, ioJsonPath, selector string, retry, verbose, showAnimation bool, maxTime, maxConcurrency int, annotations []string) {
// Use a buffered channel as a semaphore to limit the number of concurrent tasks
semaphore := make(chan struct{}, maxConcurrency)

Expand Down Expand Up @@ -117,7 +117,7 @@ func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool
fmt.Printf("Starting to process IO entry %d \n", index)

// add retry and resume check
err := processIOTask(entry, index, maxTime, jobDir, ioJsonPath, retry, verbose, showAnimation, annotations, &fileMutex)
err := processIOTask(entry, index, maxTime, jobDir, ioJsonPath, selector, retry, verbose, showAnimation, annotations, &fileMutex)
if errors.Is(err, errOutputPathEmpty) {
fmt.Printf("Waiting to process IO entry %d \n", index)
} else if err != nil {
Expand All @@ -140,7 +140,7 @@ func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool
}
}

func processIOTask(ioEntry IO, index, maxTime int, jobDir, ioJsonPath string, retry, verbose, showAnimation bool, annotations []string, fileMutex *sync.Mutex) error {
func processIOTask(ioEntry IO, index, maxTime int, jobDir, ioJsonPath, selector string, retry, verbose, showAnimation bool, annotations []string, fileMutex *sync.Mutex) error {
fileMutex.Lock()
ioGraph, err := ReadIOList(ioJsonPath)
fileMutex.Unlock()
Expand Down Expand Up @@ -216,7 +216,7 @@ func processIOTask(ioEntry IO, index, maxTime int, jobDir, ioJsonPath string, re
memory = *toolConfig.MemoryGB
}

bacalhauJob, err := bacalhau.CreateBacalhauJob(cid, toolConfig.DockerPull, cmd, maxTime, memory, toolConfig.GpuBool, toolConfig.NetworkBool, annotations)
bacalhauJob, err := bacalhau.CreateBacalhauJob(cid, toolConfig.DockerPull, cmd, selector, maxTime, memory, toolConfig.GpuBool, toolConfig.NetworkBool, annotations)
if err != nil {
updateIOWithError(ioJsonPath, index, err, fileMutex)
return fmt.Errorf("error creating Bacalhau job: %w", err)
Expand Down

0 comments on commit 10656ed

Please sign in to comment.