diff --git a/cmd/root.go b/cmd/root.go
index 38ab622..1a60474 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -41,6 +41,7 @@ var rootCmd = &cobra.Command{
 		debugLogger.SetLevel(logrus.DebugLevel)
 
 		if util.InSlice(verboseAll, verbose) {
+			logrus.SetLevel(logrus.DebugLevel)
 			verbose = []string{verboseDHT, verboseNodeFinder}
 		}
 
diff --git a/cmd/upload.go b/cmd/upload.go
index db9c1a0..7506d35 100644
--- a/cmd/upload.go
+++ b/cmd/upload.go
@@ -5,20 +5,23 @@ import (
 	"io/ioutil"
 	"os"
 	"os/signal"
+	"path"
 	"sync"
 	"syscall"
 	"time"
 
-	"github.com/lbryio/lbry.go/stop"
 	"github.com/lbryio/reflector.go/db"
 	"github.com/lbryio/reflector.go/peer"
 	"github.com/lbryio/reflector.go/store"
 
+	"github.com/lbryio/lbry.go/stop"
+
 	log "github.com/sirupsen/logrus"
 	"github.com/spf13/cobra"
 )
 
-var workers int
+var uploadWorkers int
+var uploadSkipExistsCheck bool
 
 const (
 	sdInc   = 1
@@ -27,24 +30,25 @@
 )
 
 type uploaderParams struct {
-	workerWG     *sync.WaitGroup
-	counterWG    *sync.WaitGroup
-	stopper      *stop.Group
-	filenameChan chan string
-	countChan    chan int
-	sdCount      int
-	blobCount    int
-	errCount     int
+	workerWG  *sync.WaitGroup
+	counterWG *sync.WaitGroup
+	stopper   *stop.Group
+	pathChan  chan string
+	countChan chan int
+	sdCount   int
+	blobCount int
+	errCount  int
 }
 
 func init() {
 	var cmd = &cobra.Command{
-		Use:   "upload DIR",
+		Use:   "upload PATH",
 		Short: "Upload blobs to S3",
 		Args:  cobra.ExactArgs(1),
 		Run:   uploadCmd,
 	}
-	cmd.PersistentFlags().IntVar(&workers, "workers", 1, "How many worker threads to run at once")
+	cmd.PersistentFlags().IntVar(&uploadWorkers, "workers", 1, "How many worker threads to run at once")
+	cmd.PersistentFlags().BoolVar(&uploadSkipExistsCheck, "skipExistsCheck", false, "Dont check if blobs exist before uploading")
 	rootCmd.AddCommand(cmd)
 }
 
@@ -55,28 +59,36 @@ func uploadCmd(cmd *cobra.Command, args []string) {
 	checkErr(err)
 
 	params := uploaderParams{
-		workerWG:     &sync.WaitGroup{},
-		counterWG:    &sync.WaitGroup{},
-		filenameChan: make(chan string),
-		countChan:    make(chan int),
-		stopper:      stop.New()}
+		workerWG:  &sync.WaitGroup{},
+		counterWG: &sync.WaitGroup{},
+		pathChan:  make(chan string),
+		countChan: make(chan int),
+		stopper:   stop.New()}
 
 	setInterrupt(params.stopper)
 
-	filenames, err := getFileNames(args[0])
+	paths, err := getPaths(args[0])
 	checkErr(err)
 
-	totalCount := len(filenames)
+	totalCount := len(paths)
+
+	hashes := make([]string, len(paths))
+	for i, p := range paths {
+		hashes[i] = path.Base(p)
+	}
 
 	log.Println("checking for existing blobs")
-	exists, err := db.HasBlobs(filenames)
-	checkErr(err)
+	exists := make(map[string]bool)
+	if !uploadSkipExistsCheck {
+		exists, err = db.HasBlobs(hashes)
+		checkErr(err)
+	}
 	existsCount := len(exists)
 
 	log.Printf("%d new blobs to upload", totalCount-existsCount)
 
-	startUploadWorkers(&params, args[0])
+	startUploadWorkers(&params)
 
 	params.counterWG.Add(1)
 	go func() {
 		defer params.counterWG.Done()
@@ -84,20 +96,20 @@ func uploadCmd(cmd *cobra.Command, args []string) {
 	}()
 
 Upload:
-	for _, filename := range filenames {
-		if exists[filename] {
+	for _, f := range paths {
+		if exists[path.Base(f)] {
 			continue
 		}
 
 		select {
-		case params.filenameChan <- filename:
+		case params.pathChan <- f:
 		case <-params.stopper.Ch():
 			log.Warnln("Caught interrupt, quitting at first opportunity...")
 			break Upload
 		}
 	}
 
-	close(params.filenameChan)
+	close(params.pathChan)
 	params.workerWG.Wait()
 	close(params.countChan)
 	params.counterWG.Wait()
@@ -134,8 +146,8 @@ func setInterrupt(stopper *stop.Group) {
 	}()
 }
 
-func startUploadWorkers(params *uploaderParams, dir string) {
-	for i := 0; i < workers; i++ {
+func startUploadWorkers(params *uploaderParams) {
+	for i := 0; i < uploadWorkers; i++ {
 		params.workerWG.Add(1)
 		go func(i int) {
 			defer params.workerWG.Done()
@@ -144,27 +156,27 @@ func startUploadWorkers(params *uploaderParams, dir string) {
 			}(i)
 
 			blobStore := newBlobStore()
-			launchFileUploader(params, blobStore, dir, i)
+			launchFileUploader(params, blobStore, i)
 		}(i)
 	}
 }
 
-func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, dir string, worker int) {
+func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, worker int) {
 	for {
 		select {
 		case <-params.stopper.Ch():
 			return
-		case filename, ok := <-params.filenameChan:
+		case filepath, ok := <-params.pathChan:
 			if !ok {
 				return
 			}
 
-			blob, err := ioutil.ReadFile(dir + "/" + filename)
+			blob, err := ioutil.ReadFile(filepath)
 			checkErr(err)
 
 			hash := peer.GetBlobHash(blob)
-			if hash != filename {
-				log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", worker, filename, hash)
+			if hash != path.Base(filepath) {
+				log.Errorf("worker %d: file name does not match hash (%s != %s), skipping", worker, filepath, hash)
 				select {
 				case params.countChan <- errInc:
 				case <-params.stopper.Ch():
@@ -221,8 +233,17 @@ func runCountReceiver(params *uploaderParams, startTime time.Time, totalCount in
 	}
 }
 
-func getFileNames(dir string) ([]string, error) {
-	f, err := os.Open(dir)
+func getPaths(path string) ([]string, error) {
+	info, err := os.Stat(path)
+	if err != nil {
+		return nil, err
+	}
+
+	if info.Mode().IsRegular() {
+		return []string{path}, nil
+	}
+
+	f, err := os.Open(path)
 	if err != nil {
 		return nil, err
 	}
@@ -239,7 +260,7 @@ func getFileNames(dir string) ([]string, error) {
 	var filenames []string
 	for _, file := range files {
 		if !file.IsDir() {
-			filenames = append(filenames, file.Name())
+			filenames = append(filenames, path+"/"+file.Name())
 		}
 	}
 
diff --git a/db/db.go b/db/db.go
index d69e0e0..36f1cf7 100644
--- a/db/db.go
+++ b/db/db.go
@@ -72,7 +72,7 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error {
 		return errors.Err("length must be positive")
 	}
 
-	err := execPrepared(tx,
+	err := execTx(tx,
 		"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
 		[]interface{}{hash, isStored, length},
 	)
@@ -200,7 +200,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 	}
 
 	// insert stream
-	err = execPrepared(tx,
+	err = execTx(tx,
 		"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
 		[]interface{}{sdBlob.StreamHash, sdHash},
 	)
@@ -220,7 +220,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
 			return err
 		}
 
-		err = execPrepared(tx,
+		err = execTx(tx,
 			"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
 			[]interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum},
 		)
@@ -264,13 +264,8 @@ func (s *SQL) GetStoredHashesInRange(ctx context.Context, start, end bits.Bitmap
 			return
 		}
 
-		//query := "SELECT hash FROM blob_ WHERE hash >= ? AND hash <= ? AND is_stored = 1"
-		//args := []interface{}{start.Hex(), end.Hex()}
-		query := "SELECT hash FROM blob_ WHERE hash IN (?,?) AND is_stored = 1"
-		args := []interface{}{
-			"6363e3ed8d32156aebbbe8c0dd077e7029c7cdaec58e08271aa35baa4250ec531129cb4f7a9ac9b7285dbb7ba375ab11",
-			"89c5c3f9794b0b24a03406e3b74361edb9ae70828e4c133512fc75db0a2d312673cdd4e30eed37892a46692d2fe439f3",
-		}
+		query := "SELECT hash FROM blob_ WHERE hash >= ? AND hash <= ? AND is_stored = 1"
+		args := []interface{}{start.Hex(), end.Hex()}
 
 		logQuery(query, args...)
 
@@ -352,20 +347,9 @@ func closeRows(rows *sql.Rows) {
 	}
 }
 
-func execPrepared(tx *sql.Tx, query string, args []interface{}) error {
+func execTx(tx *sql.Tx, query string, args []interface{}) error {
 	logQuery(query, args...)
-
-	stmt, err := tx.Prepare(query)
-	if err != nil {
-		return errors.Err(err)
-	}
-
-	_, err = stmt.Exec(args...)
-	if err != nil {
-		return errors.Err(err)
-	}
-
-	err = stmt.Close()
+	_, err := tx.Exec(query, args...)
 	return errors.Err(err)
 }
 