Skip to content

Commit

Permalink
Consolidate and simplify downloading (#105)
Browse files Browse the repository at this point in the history
* Consolidate and simplify feed downloading

* Automatically find internal prefix for zip files
  • Loading branch information
irees committed Apr 13, 2021
1 parent bc9632b commit 1083e3d
Show file tree
Hide file tree
Showing 18 changed files with 144 additions and 402 deletions.
3 changes: 0 additions & 3 deletions dmfr/cmd/dmfr_cmd.go
Expand Up @@ -27,7 +27,6 @@ func (cmd *Command) Run(args []string) error {
log.Print(" sync")
log.Print(" import")
log.Print(" fetch")
log.Print(" recalculate")
fl.PrintDefaults()
}
fl.Parse(args)
Expand All @@ -50,8 +49,6 @@ func (cmd *Command) Run(args []string) error {
r = &importer.Command{}
case "fetch":
r = &fetch.Command{}
case "recalculate":
r = &fetch.RecalculateCommand{}
default:
return fmt.Errorf("Invalid command: %q", subc)
}
Expand Down
52 changes: 0 additions & 52 deletions dmfr/fetch/auth_adapter.go

This file was deleted.

43 changes: 10 additions & 33 deletions dmfr/fetch/fetch.go
Expand Up @@ -5,14 +5,13 @@ import (
"errors"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"path/filepath"
"strings"
"time"

"github.com/interline-io/transitland-lib/dmfr"
"github.com/interline-io/transitland-lib/internal/download"
"github.com/interline-io/transitland-lib/tl"
"github.com/interline-io/transitland-lib/tlcsv"
"github.com/interline-io/transitland-lib/tldb"
Expand All @@ -27,7 +26,7 @@ type Options struct {
Directory string
S3 string
FetchedAt time.Time
Secrets Secrets
Secrets download.Secrets
}

// Result contains results of a fetch operation.
Expand Down Expand Up @@ -97,6 +96,10 @@ func DatabaseFetch(atx tldb.Adapter, opts Options) (Result, error) {
return fr, nil
}

type canSetAuth interface {
SetAuth(tl.FeedAuthorization, download.Secret)
}

// fetchAndCreateFeedVersion from a URL.
// Returns an error if a serious failure occurs, such as database or filesystem access.
// Sets Result.FetchError if a regular failure occurs, such as a 404.
Expand All @@ -107,14 +110,8 @@ func fetchAndCreateFeedVersion(atx tldb.Adapter, feed tl.Feed, opts Options) (Re
fr.FetchError = errors.New("no url")
return fr, nil
}
// Handle fragments
u, err := url.Parse(opts.FeedURL)
if err != nil {
fr.FetchError = errors.New("cannot parse url")
return fr, nil
}
// Get secret
secret := Secret{}
secret := download.Secret{}
if a, err := opts.Secrets.MatchFeed(opts.FeedID); err == nil {
secret = a
} else if a, err := opts.Secrets.MatchFilename(feed.File); err == nil {
Expand All @@ -123,20 +120,14 @@ func fetchAndCreateFeedVersion(atx tldb.Adapter, feed tl.Feed, opts Options) (Re
fr.FetchError = errors.New("no secret found")
return fr, nil
}
// Check reader type
// Get reader
reader, err := tlcsv.NewReader(opts.FeedURL)
if err != nil {
fr.FetchError = err
return fr, nil
}
// Override the default URLAdapter
if u.Scheme == "http" || u.Scheme == "https" || u.Scheme == "ftp" || u.Scheme == "s3" {
aa := AuthenticatedURLAdapter{}
if err := aa.Download(opts.FeedURL, feed.Authorization, secret); err != nil {
fr.FetchError = err
return fr, nil
}
reader.Adapter = &aa
if v, ok := reader.Adapter.(canSetAuth); ok {
v.SetAuth(feed.Authorization, secret)
}
// Open
if err := reader.Open(); err != nil {
Expand Down Expand Up @@ -257,17 +248,3 @@ func copyFileContents(src, dst string) (err error) {
err = out.Sync()
return
}

// dmfrGetReaderURL helps load a file from an S3 or Directory location
func dmfrGetReaderURL(s3 string, directory string, url string) string {
if s3 != "" {
url = s3 + "/" + url
} else if directory != "" {
url = filepath.Join(directory, url)
}
urlsplit := strings.SplitN(url, "#", 2)
if len(urlsplit) > 1 {
url = url + "#" + urlsplit[1]
}
return url
}
183 changes: 0 additions & 183 deletions dmfr/fetch/refresh_cmd.go

This file was deleted.

27 changes: 16 additions & 11 deletions dmfr/importer/importer.go
Expand Up @@ -227,17 +227,7 @@ func MainImportFeedVersion(adapter tldb.Adapter, opts Options) (Result, error) {
func ImportFeedVersion(atx tldb.Adapter, fv tl.FeedVersion, opts Options) (dmfr.FeedVersionImport, error) {
fvi := dmfr.FeedVersionImport{FeedVersionID: fv.ID}
// Get Reader
url := fv.File
if opts.S3 != "" {
url = opts.S3 + "/" + fv.File
} else if opts.Directory != "" {
url = filepath.Join(opts.Directory, fv.File)
}
urlsplit := strings.SplitN(fv.URL, "#", 2)
if len(urlsplit) > 1 && !strings.HasSuffix(fv.URL, ".zip") {
url = url + "#" + urlsplit[1]
}
reader, err := tlcsv.NewReader(url)
reader, err := tlcsv.NewReader(dmfrGetReaderURL(opts.S3, opts.Directory, fv.File))
if err != nil {
return fvi, err
}
Expand Down Expand Up @@ -280,3 +270,18 @@ func ImportFeedVersion(atx tldb.Adapter, fv tl.FeedVersion, opts Options) (dmfr.
fvi.SkipEntityMarkedCount = counts.SkipEntityMarkedCount
return fvi, nil
}

// dmfrGetReaderURL helps load a file from an S3 or Directory location
func dmfrGetReaderURL(s3 string, directory string, url string) string {
if s3 != "" {
url = s3 + "/" + url
} else if directory != "" {
url = filepath.Join(directory, url)
}
urlsplit := strings.SplitN(url, "#", 2)
if len(urlsplit) > 1 && !strings.HasSuffix(url, ".zip") {
// Add fragment back only if fragment does not end in ".zip"
url = url + "#" + urlsplit[1]
}
return url
}
2 changes: 1 addition & 1 deletion dmfr/importer/importer_test.go
Expand Up @@ -92,7 +92,7 @@ func TestMainImportFeedVersion(t *testing.T) {
}
explog := "file does not exist"
if fvi.ExceptionLog != explog {
t.Errorf("got %s expected %s", fvi.ExceptionLog, explog)
t.Errorf("got '%s' expected '%s'", fvi.ExceptionLog, explog)
}
if fvi.InProgress != false {
t.Errorf("expected in_progress = false")
Expand Down

0 comments on commit 1083e3d

Please sign in to comment.