Skip to content

Commit

Permalink
Refactor get.go
Browse files Browse the repository at this point in the history
Signed-off-by: Romain Keramitas <r.keramitas@gmail.com>
  • Loading branch information
r0mainK committed Oct 9, 2019
1 parent 5c827d4 commit ab08bf6
Showing 1 changed file with 19 additions and 44 deletions.
63 changes: 19 additions & 44 deletions PublicGitArchive/pga/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ package cmd

import (
"context"
"encoding/csv"
"fmt"
"io"
"io/ioutil"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"

"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/src-d/datasets/PublicGitArchive/pga/pga"
pb "gopkg.in/cheggaaa/pb.v1"
)

const rootURL = "http://pga.sourced.tech"

// getCmd represents the get command
var getCmd = &cobra.Command{
Use: "get",
Expand All @@ -25,22 +24,21 @@ var getCmd = &cobra.Command{
Alternatively, a list of .siva filenames can be passed through standard input.`,
RunE: func(cmd *cobra.Command, args []string) error {
dataset, err := handleDatasetArg(cmd.Use, cmd.Flags())
if err != nil {
return err
}
ctx := setupContext()

source := urlFS("http://pga.sourced.tech/")

source := urlFS(rootURL)
dest, err := FileSystemFromFlags(cmd.Flags())
if err != nil {
return err
}

maxDownloads, err := cmd.Flags().GetInt("jobs")
if err != nil {
return err
}

var filenames = map[string]struct{}{}

stdin, err := cmd.Flags().GetBool("stdin")
if err != nil {
return err
Expand All @@ -60,42 +58,34 @@ Alternatively, a list of .siva filenames can be passed through standard input.`,
filenames[filename] = struct{}{}
}
} else {
f, err := getIndex(ctx)
f, err := getIndex(ctx, dataset.Name())
if err != nil {
return fmt.Errorf("could not open index file: %v", err)
}
defer f.Close()

index, err := pga.IndexFromCSV(f)
r := csv.NewReader(f)
err = dataset.ReadHeader(r)
if err != nil {
return err
}

filter, err := filterFromFlags(cmd.Flags())
if err != nil {
return err
}
index = pga.WithFilter(index, filter)

for {
r, err := index.Next()
if err == io.EOF {
break
} else if err != nil {
return err
}

for _, f := range r.Filenames {
filenames[f] = struct{}{}
addFiles := func(r pga.Repository) error {
for _, filename := range r.GetFilenames() {
filenames[filename] = struct{}{}
}
return nil
}
dataset.ForEach(ctx, r, filter, addFiles)
}

return downloadFilenames(ctx, dest, source, filenames, maxDownloads)
return downloadFilenames(ctx, dest, source, dataset.Name(), filenames, maxDownloads)
},
}

func downloadFilenames(ctx context.Context, dest, source FileSystem,
func downloadFilenames(ctx context.Context, dest, source FileSystem, datasetName string,
filenames map[string]struct{}, maxDownloads int) error {

tokens := make(chan bool, maxDownloads)
Expand All @@ -105,7 +95,7 @@ func downloadFilenames(ctx context.Context, dest, source FileSystem,

done := make(chan error)
for filename := range filenames {
filename := filepath.Join("siva", pgaVersion, filename[:2], filename)
filename := filepath.Join(datasetName, pgaVersion, filename[:2], filename)
go func() {
select {
case <-tokens:
Expand Down Expand Up @@ -140,21 +130,6 @@ func downloadFilenames(ctx context.Context, dest, source FileSystem,
return finalErr
}

func setupContext() context.Context {
ctx, cancel := context.WithCancel(context.Background())
var term = make(chan os.Signal)
go func() {
select {
case <-term:
logrus.Warningf("signal received, stopping...")
cancel()
}
}()
signal.Notify(term, syscall.SIGTERM, os.Interrupt)

return ctx
}

func init() {
RootCmd.AddCommand(getCmd)
flags := getCmd.Flags()
Expand Down

0 comments on commit ab08bf6

Please sign in to comment.