actual proper fix for db statement issue, added skipExists flag and the ability to upload a single blob
lyoshenka committed Aug 8, 2018
1 parent 0e0b2aa commit 3855d5c
Showing 3 changed files with 66 additions and 60 deletions.
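In short: the upload command now takes a generic PATH (a directory or a single blob file) instead of a directory only, a new --skipExistsCheck flag skips the database existence lookup, and db.go's prepared-statement helper is replaced with a direct tx.Exec. Going by the cobra definition below, a hypothetical invocation (paths here are placeholders) would be `upload /path/to/blobs --skipExistsCheck` for a whole directory, or `upload /path/to/blobs/<blob hash>` for one blob.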
1 change: 1 addition & 0 deletions cmd/root.go
@@ -41,6 +41,7 @@ var rootCmd = &cobra.Command{
debugLogger.SetLevel(logrus.DebugLevel)

if util.InSlice(verboseAll, verbose) {
+ logrus.SetLevel(logrus.DebugLevel)
verbose = []string{verboseDHT, verboseNodeFinder}
}

95 changes: 58 additions & 37 deletions cmd/upload.go
@@ -5,20 +5,23 @@ import (
"io/ioutil"
"os"
"os/signal"
"path"
"sync"
"syscall"
"time"

"github.com/lbryio/lbry.go/stop"
"github.com/lbryio/reflector.go/db"
"github.com/lbryio/reflector.go/peer"
"github.com/lbryio/reflector.go/store"

"github.com/lbryio/lbry.go/stop"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)

- var workers int
+ var uploadWorkers int
+ var uploadSkipExistsCheck bool

const (
sdInc = 1
@@ -27,24 +30,25 @@ const (
)

type uploaderParams struct {
- workerWG *sync.WaitGroup
- counterWG *sync.WaitGroup
- stopper *stop.Group
- filenameChan chan string
- countChan chan int
- sdCount int
- blobCount int
- errCount int
+ workerWG *sync.WaitGroup
+ counterWG *sync.WaitGroup
+ stopper *stop.Group
+ pathChan chan string
+ countChan chan int
+ sdCount int
+ blobCount int
+ errCount int
}

func init() {
var cmd = &cobra.Command{
Use: "upload DIR",
Use: "upload PATH",
Short: "Upload blobs to S3",
Args: cobra.ExactArgs(1),
Run: uploadCmd,
}
- cmd.PersistentFlags().IntVar(&workers, "workers", 1, "How many worker threads to run at once")
+ cmd.PersistentFlags().IntVar(&uploadWorkers, "workers", 1, "How many worker threads to run at once")
+ cmd.PersistentFlags().BoolVar(&uploadSkipExistsCheck, "skipExistsCheck", false, "Dont check if blobs exist before uploading")
rootCmd.AddCommand(cmd)
}

Expand All @@ -55,49 +59,57 @@ func uploadCmd(cmd *cobra.Command, args []string) {
checkErr(err)

params := uploaderParams{
- workerWG: &sync.WaitGroup{},
- counterWG: &sync.WaitGroup{},
- filenameChan: make(chan string),
- countChan: make(chan int),
- stopper: stop.New()}
+ workerWG: &sync.WaitGroup{},
+ counterWG: &sync.WaitGroup{},
+ pathChan: make(chan string),
+ countChan: make(chan int),
+ stopper: stop.New()}

setInterrupt(params.stopper)

- filenames, err := getFileNames(args[0])
+ paths, err := getPaths(args[0])
checkErr(err)

- totalCount := len(filenames)
+ totalCount := len(paths)

+ hashes := make([]string, len(paths))
+ for i, p := range paths {
+ hashes[i] = path.Base(p)
+ }

log.Println("checking for existing blobs")

- exists, err := db.HasBlobs(filenames)
- checkErr(err)
+ exists := make(map[string]bool)
+ if !uploadSkipExistsCheck {
+ exists, err = db.HasBlobs(hashes)
+ checkErr(err)
+ }
existsCount := len(exists)

log.Printf("%d new blobs to upload", totalCount-existsCount)

- startUploadWorkers(&params, args[0])
+ startUploadWorkers(&params)
params.counterWG.Add(1)
go func() {
defer params.counterWG.Done()
runCountReceiver(&params, startTime, totalCount, existsCount)
}()

Upload:
- for _, filename := range filenames {
- if exists[filename] {
+ for _, f := range paths {
+ if exists[path.Base(f)] {
continue
}

select {
- case params.filenameChan <- filename:
+ case params.pathChan <- f:
case <-params.stopper.Ch():
log.Warnln("Caught interrupt, quitting at first opportunity...")
break Upload
}
}

- close(params.filenameChan)
+ close(params.pathChan)
params.workerWG.Wait()
close(params.countChan)
params.counterWG.Wait()
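The shutdown sequence above is ordering-sensitive: closing pathChan lets the workers drain remaining paths and exit, workerWG.Wait() then guarantees nothing will send on countChan again, and only after that is countChan closed so the counter goroutine can finish. A minimal runnable sketch of the same pattern, with a plain done channel standing in for stop.Group (an assumption for illustration only; the real code uses github.com/lbryio/lbry.go/stop):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan string)
	counts := make(chan int)
	done := make(chan struct{}) // stands in for stopper.Ch()
	var workerWG, counterWG sync.WaitGroup

	// Workers: receive jobs until the channel is closed, report one count each.
	for i := 0; i < 3; i++ {
		workerWG.Add(1)
		go func() {
			defer workerWG.Done()
			for range jobs { // the real code uploads the blob here
				select {
				case counts <- 1:
				case <-done:
					return
				}
			}
		}()
	}

	// Counter: tallies until counts is closed.
	counterWG.Add(1)
	go func() {
		defer counterWG.Done()
		total := 0
		for n := range counts {
			total += n
		}
		fmt.Println("uploaded:", total)
	}()

	// Producer: an interrupt would close(done) and stop the sends early.
	for _, j := range []string{"blob1", "blob2", "blob3"} {
		select {
		case jobs <- j:
		case <-done:
		}
	}

	close(jobs)      // no more work: workers drain and exit
	workerWG.Wait()  // after this, nothing can send on counts
	close(counts)    // now safe to close; counter loop ends
	counterWG.Wait() // counter has printed its total
}
```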
@@ -134,8 +146,8 @@ func setInterrupt(stopper *stop.Group) {
}()
}

- func startUploadWorkers(params *uploaderParams, dir string) {
- for i := 0; i < workers; i++ {
+ func startUploadWorkers(params *uploaderParams) {
+ for i := 0; i < uploadWorkers; i++ {
params.workerWG.Add(1)
go func(i int) {
defer params.workerWG.Done()
@@ -144,27 +156,27 @@ func startUploadWorkers(params *uploaderParams, dir string) {
}(i)

blobStore := newBlobStore()
- launchFileUploader(params, blobStore, dir, i)
+ launchFileUploader(params, blobStore, i)
}(i)
}
}

- func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, dir string, worker int) {
+ func launchFileUploader(params *uploaderParams, blobStore *store.DBBackedS3Store, worker int) {
for {
select {
case <-params.stopper.Ch():
return
- case filename, ok := <-params.filenameChan:
+ case filepath, ok := <-params.pathChan:
if !ok {
return
}

- blob, err := ioutil.ReadFile(dir + "/" + filename)
+ blob, err := ioutil.ReadFile(filepath)
checkErr(err)

hash := peer.GetBlobHash(blob)
- if hash != filename {
- log.Errorf("worker %d: filename does not match hash (%s != %s), skipping", worker, filename, hash)
+ if hash != path.Base(filepath) {
+ log.Errorf("worker %d: file name does not match hash (%s != %s), skipping", worker, filepath, hash)
select {
case params.countChan <- errInc:
case <-params.stopper.Ch():
@@ -221,8 +233,17 @@ func runCountReceiver(params *uploaderParams, startTime time.Time, totalCount int, existsCount int) {
}
}

- func getFileNames(dir string) ([]string, error) {
- f, err := os.Open(dir)
+ func getPaths(path string) ([]string, error) {
+ info, err := os.Stat(path)
+ if err != nil {
+ return nil, err
+ }
+
+ if info.Mode().IsRegular() {
+ return []string{path}, nil
+ }
+
+ f, err := os.Open(path)
if err != nil {
return nil, err
}
@@ -239,7 +260,7 @@
var filenames []string
for _, file := range files {
if !file.IsDir() {
- filenames = append(filenames, file.Name())
+ filenames = append(filenames, path+"/"+file.Name())
}
}

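Pieced together across the hunks above, the new getPaths reads roughly as follows. Only the lines visible in this diff are verbatim; the directory-listing middle sits in a collapsed region, so the f.Readdir call below is an assumption based on the surviving os.Open line. Blob files are named by their hash, which is why the worker can later compare path.Base(filepath) against the computed blob hash:

```go
package sketch

import "os"

// Illustrative reconstruction of getPaths; the f.Readdir section is assumed,
// everything else mirrors the diff.
func getPaths(path string) ([]string, error) {
	info, err := os.Stat(path)
	if err != nil {
		return nil, err
	}

	// A single regular file is returned as-is: the commit's new
	// "upload a single blob" ability.
	if info.Mode().IsRegular() {
		return []string{path}, nil
	}

	// Otherwise treat the argument as a directory and list it.
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	files, err := f.Readdir(-1) // assumption: the collapsed lines do this
	f.Close()
	if err != nil {
		return nil, err
	}

	var filenames []string
	for _, file := range files {
		if !file.IsDir() {
			// Full paths now, not bare names, so a worker can read
			// the file directly wherever it lives.
			filenames = append(filenames, path+"/"+file.Name())
		}
	}

	return filenames, nil
}
```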
30 changes: 7 additions & 23 deletions db/db.go
@@ -72,7 +72,7 @@ func addBlob(tx *sql.Tx, hash string, length int, isStored bool) error {
return errors.Err("length must be positive")
}

- err := execPrepared(tx,
+ err := execTx(tx,
"INSERT INTO blob_ (hash, is_stored, length) VALUES (?,?,?) ON DUPLICATE KEY UPDATE is_stored = (is_stored or VALUES(is_stored))",
[]interface{}{hash, isStored, length},
)
@@ -200,7 +200,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
}

// insert stream
- err = execPrepared(tx,
+ err = execTx(tx,
"INSERT IGNORE INTO stream (hash, sd_hash) VALUES (?,?)",
[]interface{}{sdBlob.StreamHash, sdHash},
)
@@ -220,7 +220,7 @@ func (s *SQL) AddSDBlob(sdHash string, sdBlobLength int, sdBlob SdBlob) error {
return err
}

- err = execPrepared(tx,
+ err = execTx(tx,
"INSERT IGNORE INTO stream_blob (stream_hash, blob_hash, num) VALUES (?,?,?)",
[]interface{}{sdBlob.StreamHash, contentBlob.BlobHash, contentBlob.BlobNum},
)
@@ -264,13 +264,8 @@ func (s *SQL) GetStoredHashesInRange(ctx context.Context, start, end bits.Bitmap
return
}

- //query := "SELECT hash FROM blob_ WHERE hash >= ? AND hash <= ? AND is_stored = 1"
- //args := []interface{}{start.Hex(), end.Hex()}
- query := "SELECT hash FROM blob_ WHERE hash IN (?,?) AND is_stored = 1"
- args := []interface{}{
- "6363e3ed8d32156aebbbe8c0dd077e7029c7cdaec58e08271aa35baa4250ec531129cb4f7a9ac9b7285dbb7ba375ab11",
- "89c5c3f9794b0b24a03406e3b74361edb9ae70828e4c133512fc75db0a2d312673cdd4e30eed37892a46692d2fe439f3",
- }
+ query := "SELECT hash FROM blob_ WHERE hash >= ? AND hash <= ? AND is_stored = 1"
+ args := []interface{}{start.Hex(), end.Hex()}

logQuery(query, args...)

@@ -352,20 +347,9 @@ func closeRows(rows *sql.Rows) {
}
}

- func execPrepared(tx *sql.Tx, query string, args []interface{}) error {
+ func execTx(tx *sql.Tx, query string, args []interface{}) error {
logQuery(query, args...)

- stmt, err := tx.Prepare(query)
- if err != nil {
- return errors.Err(err)
- }
-
- _, err = stmt.Exec(args...)
- if err != nil {
- return errors.Err(err)
- }
-
- err = stmt.Close()
+ _, err := tx.Exec(query, args...)
return errors.Err(err)
}
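For context on the "db statement issue" named in the commit message: tx.Prepare creates a prepared statement (server-side, with a MySQL driver), and the old helper only reached stmt.Close() when Exec succeeded, so every failed Exec leaked a statement; the happy path also paid a prepare round trip on every call. tx.Exec manages the statement lifecycle internally. A sketch of both shapes under stock database/sql semantics (function names here are illustrative, not from the repo):

```go
package sketch

import "database/sql"

// Old shape: manual statement lifecycle. The original returned early when
// stmt.Exec failed, skipping stmt.Close(); defer is the usual fix if a
// prepared statement is actually wanted. (Error wrapping omitted.)
func execPreparedFixed(tx *sql.Tx, query string, args []interface{}) error {
	stmt, err := tx.Prepare(query)
	if err != nil {
		return err
	}
	defer stmt.Close() // runs on every return path, success or error
	_, err = stmt.Exec(args...)
	return err
}

// New shape, as adopted by this commit: prepare, execute, and clean up
// in one call, with nothing left to leak.
func execTxDirect(tx *sql.Tx, query string, args []interface{}) error {
	_, err := tx.Exec(query, args...)
	return err
}
```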
