Skip to content

Commit

Permalink
PlexRun refactored : now RunIO and lives in ipwl package (#644)
Browse files Browse the repository at this point in the history
  • Loading branch information
acashmoney committed Sep 12, 2023
1 parent 71edc8f commit 7838c44
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 70 deletions.
2 changes: 1 addition & 1 deletion cmd/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ var initCmd = &cobra.Command{
}

if autoRun {
_, _, err := PlexRun(cid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotationsForAutoRun)
_, _, err := ipwl.RunIO(cid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotationsForAutoRun)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
Expand Down
65 changes: 1 addition & 64 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package cmd
import (
"fmt"
"os"
"path"
"path/filepath"

"github.com/google/uuid"
"github.com/labdao/plex/internal/ipfs"
"github.com/labdao/plex/internal/ipwl"
"github.com/spf13/cobra"
)
Expand All @@ -30,73 +26,14 @@ var runCmd = &cobra.Command{
dry := true
upgradePlexVersion(dry)

_, _, err := PlexRun(ioJsonCid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotations)
_, _, err := ipwl.RunIO(ioJsonCid, outputDir, verbose, showAnimation, maxTime, concurrency, *annotations)
if err != nil {
fmt.Println("Error:", err)
os.Exit(1)
}
},
}

func PlexRun(ioJsonCid, outputDir string, verbose, showAnimation bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
// Create plex working directory
id := uuid.New()
var cwd string
if outputDir != "" {
absPath, err := filepath.Abs(outputDir)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
cwd = absPath
} else {
cwd, err = os.Getwd()
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
cwd = path.Join(cwd, "jobs")
}
workDirPath := path.Join(cwd, id.String())
err = os.MkdirAll(workDirPath, 0755)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
fmt.Println("Created working directory: ", workDirPath)

ioJsonPath = path.Join(workDirPath, "io.json")
err = ipfs.DownloadFileContents(ioJsonCid, ioJsonPath)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
fmt.Println("Initialized IO file at: ", ioJsonPath)

userID, err := ipwl.ExtractUserIDFromIOJson(ioJsonPath)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}

if userID != "" && !ipwl.ContainsUserIdAnnotation(annotations) {
annotations = append(annotations, fmt.Sprintf("userId=%s", userID))
}

if maxTime > 60 {
fmt.Println("Error: maxTime cannot exceed 60 minutes")
os.Exit(1)
}

retry := false
fmt.Println("Processing IO Entries")
ipwl.ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, maxTime, concurrency, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}

// The Python SDK string matches here so make sure to change in both places
fmt.Println("Completed IO JSON CID:", completedIoJsonCid)
return completedIoJsonCid, ioJsonPath, err
}

func init() {
runCmd.Flags().StringVarP(&ioJsonCid, "ioJsonCid", "i", "", "IPFS CID of IO JSON")
runCmd.Flags().StringVarP(&outputDir, "outputDir", "o", "", "Output directory")
Expand Down
2 changes: 1 addition & 1 deletion cmd/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var upgradeCmd = &cobra.Command{
}

const (
CurrentPlexVersion = "v0.10.3"
CurrentPlexVersion = "v0.10.4"
ReleaseURL = "https://api.github.com/repos/labdao/plex/releases/latest"
ToolsURL = "https://api.github.com/repos/labdao/plex/contents/tools?ref=main"
)
Expand Down
41 changes: 38 additions & 3 deletions gateway/handlers/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"

Expand Down Expand Up @@ -77,9 +78,43 @@ func InitJobHandler(db *gorm.DB) http.HandlerFunc {
}
}

func RunJobHandler() {
// log that this function is being hit
fmt.Print("RunJobHandler hit")
func RunJobHandler(db *gorm.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if err := utils.CheckRequestMethod(r, http.MethodPost); err != nil {
utils.SendJSONError(w, err.Error(), http.StatusBadRequest)
return
}

var requestData struct {
IoJsonCid string `json:"ioJsonCid"`
Annotations []string `json:"annotations"`
}

if err := utils.ReadRequestBody(r, &requestData); err != nil {
http.Error(w, "Error parsing request body", http.StatusBadRequest)
return
}

tempDir, err := ioutil.TempDir("", "job")
if err != nil {
http.Error(w, fmt.Sprintf("Error creating temporary directory: %v", err), http.StatusInternalServerError)
return
}
defer os.RemoveAll(tempDir)

completedIoJsonCid, ioJsonPath, err := ipwl.RunIO(requestData.IoJsonCid, requestData.OutputDir, false, false, 60, 1, requestData.Annotations)
if err != nil {
http.Error(w, fmt.Sprintf("Error running job: %v", err), http.StatusInternalServerError)
return
}

responseData := map[string]string{
"completedIoJsonCid": completedIoJsonCid,
"ioJsonPath": ioJsonPath,
}

utils.SendJSONResponse(w, responseData)
}
}

func GetJobHandler(db *gorm.DB) http.HandlerFunc {
Expand Down
2 changes: 1 addition & 1 deletion gateway/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewServer(db *gorm.DB) *mux.Router {
router.HandleFunc("/init-job", handlers.InitJobHandler(db)).Methods("POST")
router.HandleFunc("/get-jobs", handlers.GetJobsHandler(db)).Methods("GET")
router.HandleFunc("/get-jobs/{cid}", handlers.GetJobHandler(db)).Methods("GET")
// router.HandleFunc("/run-job", handlers.RunJobHandler(db)).Methods("POST")
router.HandleFunc("/run-job", handlers.RunJobHandler(db)).Methods("POST")

return router
}
59 changes: 59 additions & 0 deletions internal/ipwl/ipwl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,75 @@ import (
"io"
"io/ioutil"
"os"
"path"
"path/filepath"
"sync"
"time"

"github.com/google/uuid"
"github.com/labdao/plex/internal/bacalhau"
"github.com/labdao/plex/internal/ipfs"
)

var errOutputPathEmpty = errors.New("output file path is empty, still waiting")

func RunIO(ioJsonCid, outputDir string, verbose, showAnimation bool, maxTime, concurrency int, annotations []string) (completedIoJsonCid, ioJsonPath string, err error) {
id := uuid.New()
var cwd string
if outputDir != "" {
absPath, err := filepath.Abs(outputDir)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
cwd = absPath
} else {
cwd, err = os.Getwd()
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
cwd = path.Join(cwd, "jobs")
}
workDirPath := path.Join(cwd, id.String())
err = os.MkdirAll(workDirPath, 0755)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
fmt.Println("Created working directory:", workDirPath)

ioJsonPath = path.Join(workDirPath, "io.json")
err = ipfs.DownloadFileContents(ioJsonCid, ioJsonPath)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}
fmt.Println("Initialized IO file at:", ioJsonPath)

userID, err := ExtractUserIDFromIOJson(ioJsonPath)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}

if userID != "" && !ContainsUserIdAnnotation(annotations) {
annotations = append(annotations, fmt.Sprintf("userId=%s", userID))
}

if maxTime > 60 {
fmt.Println("Error: maxTime cannot exceed 60 minutes")
os.Exit(1)
}

retry := false
fmt.Println("Processing IO Entries")
ProcessIOList(workDirPath, ioJsonPath, retry, verbose, showAnimation, maxTime, concurrency, annotations)
fmt.Printf("Finished processing, results written to %s\n", ioJsonPath)
completedIoJsonCid, err = ipfs.PinFile(ioJsonPath)
if err != nil {
return completedIoJsonCid, ioJsonPath, err
}

fmt.Println("Completed IO JSON CID:", completedIoJsonCid)
return completedIoJsonCid, ioJsonPath, nil
}

func ProcessIOList(jobDir, ioJsonPath string, retry, verbose, showAnimation bool, maxTime, maxConcurrency int, annotations []string) {
// Use a buffered channel as a semaphore to limit the number of concurrent tasks
semaphore := make(chan struct{}, maxConcurrency)
Expand Down

0 comments on commit 7838c44

Please sign in to comment.