Skip to content

Commit

Permalink
make concurrency always equal task list (#670)
Browse files Browse the repository at this point in the history
  • Loading branch information
thetechnocrat-dev committed Sep 29, 2023
1 parent c35e3f0 commit 27d75e3
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 25 deletions.
2 changes: 1 addition & 1 deletion cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var initCmd = &cobra.Command{
}

if autoRun {
_, _, err := ipwl.RunIO(cid, outputDir, selector, verbose, showAnimation, maxTime, concurrency, *annotationsForAutoRun)
_, _, err := ipwl.RunIO(cid, outputDir, selector, verbose, showAnimation, maxTime, *annotationsForAutoRun)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
Expand Down
7 changes: 3 additions & 4 deletions cmd/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,18 @@ var resumeCmd = &cobra.Command{
dry := true
upgradePlexVersion(dry)

_, err := Resume(ioJsonPath, outputDir, selector, verbose, showAnimation, retry, maxTime, concurrency, *annotationsForResume)
_, err := Resume(ioJsonPath, outputDir, selector, verbose, showAnimation, retry, maxTime, *annotationsForResume)
if err != nil {
fmt.Println("Error:", err)
}
},
}

func Resume(ioJsonFilePath, outputDir, selector string, verbose, showAnimation, retry bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid string, err error) {
func Resume(ioJsonFilePath, outputDir, selector string, verbose, showAnimation, retry bool, maxTime int, annotations []string) (completedIoJsonCid string, err error) {
fmt.Println("Continuing to process IO JSON file at: ", ioJsonPath)
fmt.Println("Processing IO Entries")
workDirPath := filepath.Dir(ioJsonFilePath)
ipwl.ProcessIOList(workDirPath, ioJsonPath, selector, retry, verbose, showAnimation, maxTime, concurrency, annotations)
ipwl.ProcessIOList(workDirPath, ioJsonPath, selector, retry, verbose, showAnimation, maxTime, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
Expand All @@ -51,7 +51,6 @@ func init() {
resumeCmd.Flags().StringVarP(&outputDir, "outputDir", "o", "", "Output directory")
resumeCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Enable verbose output")
resumeCmd.Flags().BoolVarP(&showAnimation, "showAnimation", "", true, "Show job processing animation")
resumeCmd.Flags().IntVarP(&concurrency, "concurrency", "c", 1, "Number of concurrent operations")
annotationsForResume = resumeCmd.Flags().StringArrayP("annotations", "a", []string{}, "Annotations to add to Bacalhau job")
resumeCmd.Flags().BoolVarP(&retry, "retry", "", true, "Retry failed jobs")

Expand Down
2 changes: 1 addition & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var runCmd = &cobra.Command{
dry := true
upgradePlexVersion(dry)

_, _, err := ipwl.RunIO(ioJsonCid, outputDir, selector, verbose, showAnimation, maxTime, concurrency, *annotations)
_, _, err := ipwl.RunIO(ioJsonCid, outputDir, selector, verbose, showAnimation, maxTime, *annotations)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
Expand Down
2 changes: 0 additions & 2 deletions docs/docs/reference/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def plex_run(
output_dir="",
verbose=False,
show_animation=False,
concurrency="1",
annotations=[],
plex_path="plex"
)
Expand All @@ -78,7 +77,6 @@ io_json_cid, io_json_local_filepath = plex_run(
* `output_dir` *str*, *optional* - output directory; plex default creates a jobs directory
* `verbose` *bool*, *optional* - verbose mode with more detailed logs
* `show_animation` *bool*, *optional* - emote animation during job runs
* `concurrency` *str*, *optional* - concurrency for processing jobs
* `annotations` *List[str]*, *optional* - list of annotations for jobs; mostly used for usage metrics
* `plex_path` *str*, *optional* - path pointing to plex binary

Expand Down
19 changes: 6 additions & 13 deletions internal/ipwl/ipwl.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

var errOutputPathEmpty = errors.New("output file path is empty, still waiting")

func RunIO(ioJsonCid, outputDir, selector string, verbose, showAnimation bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
func RunIO(ioJsonCid, outputDir, selector string, verbose, showAnimation bool, maxTime int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
id := uuid.New()
var cwd string
if outputDir != "" {
Expand Down Expand Up @@ -65,7 +65,7 @@ func RunIO(ioJsonCid, outputDir, selector string, verbose, showAnimation bool, m

retry := false
fmt.Println("Processing IO Entries")
ProcessIOList(workDirPath, ioJsonPath, selector, retry, verbose, showAnimation, maxTime, concurrency, annotations)
ProcessIOList(workDirPath, ioJsonPath, selector, retry, verbose, showAnimation, maxTime, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
Expand All @@ -76,10 +76,7 @@ func RunIO(ioJsonCid, outputDir, selector string, verbose, showAnimation bool, m
return completedIoJsonCid, ioJsonPath, nil
}

func ProcessIOList(jobDir, ioJsonPath, selector string, retry, verbose, showAnimation bool, maxTime, maxConcurrency int, annotations []string) {
// Use a buffered channel as a semaphore to limit the number of concurrent tasks
semaphore := make(chan struct{}, maxConcurrency)

func ProcessIOList(jobDir, ioJsonPath, selector string, retry, verbose, showAnimation bool, maxTime int, annotations []string) {
// Mutex to synchronize file access
var fileMutex sync.Mutex

Expand Down Expand Up @@ -112,9 +109,6 @@ func ProcessIOList(jobDir, ioJsonPath, selector string, retry, verbose, showAnim
go func(index int, entry IO) {
defer wg.Done()

// Acquire the semaphore
semaphore <- struct{}{}

fmt.Printf("Starting to process IO entry %d \n", index)

// add retry and resume check
Expand All @@ -127,16 +121,15 @@ func ProcessIOList(jobDir, ioJsonPath, selector string, retry, verbose, showAnim
} else {
fmt.Printf("Success processing IO entry %d \n", index)
}

// Release the semaphore
<-semaphore
}(i, ioEntry)
}

// Wait for all goroutines to finish
wg.Wait()

// Wait before re-checking chain dependecies
// Wait before re-checking chain dependencies
// Todo: switch to event driven checking so go-routine
// can stop once bacalhau job is created instead of completed
time.Sleep(500 * time.Millisecond)
}
}
Expand Down
5 changes: 1 addition & 4 deletions python/src/plex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def plex_upload(file_path: str, wrap_file=True, plex_path="plex"):
return file_cid


def plex_run(io_json_cid: str, output_dir="", verbose=False, show_animation=False, max_time="60" ,concurrency="1", annotations=None, plex_path="plex"):
def plex_run(io_json_cid: str, output_dir="", verbose=False, show_animation=False, max_time="60" , annotations=None, plex_path="plex"):
cwd = os.getcwd()
plex_work_dir = os.environ.get("PLEX_WORK_DIR", os.path.dirname(cwd))
cmd = [plex_path, "run", "-i", io_json_cid]
Expand All @@ -135,9 +135,6 @@ def plex_run(io_json_cid: str, output_dir="", verbose=False, show_animation=Fals
if max_time:
cmd.append(f"-m={max_time}")

if concurrency:
cmd.append(f"--concurrency={concurrency}")

if annotations is None:
annotations = []

Expand Down

0 comments on commit 27d75e3

Please sign in to comment.