/
load.go
145 lines (126 loc) · 4.01 KB
/
load.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package lib
import (
"context"
"errors"
"fmt"
"github.com/qri-io/dataset"
"github.com/qri-io/qri/base"
"github.com/qri-io/qri/base/dsfs"
"github.com/qri-io/qri/dsref"
qerr "github.com/qri-io/qri/errors"
"github.com/qri-io/qri/fsi"
)
type datasetLoader struct {
inst *Instance
userOwner string
source string
useFSI bool
}
func newDatasetLoader(inst *Instance, userOwner, source string, useFSI bool) dsref.Loader {
return &datasetLoader{
inst: inst,
userOwner: userOwner,
source: source,
useFSI: useFSI,
}
}
// LoadDataset fetches, dereferences and opens a dataset from a reference
// implements the dsfs.Loader interface
// this function expects the passed in reference is fully resolved
func (d *datasetLoader) loadRefFromLocation(ctx context.Context, ref dsref.Ref, location string) (*dataset.Dataset, error) {
if location == "" {
return d.loadLocalDataset(ctx, ref)
}
msg := fmt.Sprintf("pulling %s from %s ...\n", ref.Human(), location)
if d.inst.streams.Out != nil {
d.inst.streams.Out.Write([]byte(msg))
}
// TODO (b5) - it'd be nice to us the returned dataset here, skipping the
// loadLocalDataset call entirely. For that to work dsfs.LoadDataset &
// inst.loadLocalDataset would have to behave exactly the same, and currently
// they don't
if _, err := d.inst.remoteClient.PullDataset(ctx, &ref, location); err != nil {
return nil, err
}
return d.loadLocalDataset(ctx, ref)
}
func (d *datasetLoader) loadLocalDataset(ctx context.Context, ref dsref.Ref) (*dataset.Dataset, error) {
var (
ds *dataset.Dataset
err error
)
if fsi.IsFSIPath(ref.Path) {
// Has an FSI Path, load from working directory
if ds, err = fsi.ReadDir(fsi.FilesystemPathToLocal(ref.Path)); err != nil {
return nil, err
}
// Assign the FSI path to the dataset so callers know where it was loaded from
ds.Path = ref.Path
} else {
// Load from dsfs
if ds, err = dsfs.LoadDataset(ctx, d.inst.qfs, ref.Path); err != nil {
return nil, err
}
}
// Set transient info on the returned dataset
ds.Name = ref.Name
ds.Peername = ref.Username
// TODO(dustmop): When dscache / dscollect is in use, enable this since resolved
// references should always have it set
// ds.ID = ref.InitID
if err = base.OpenDataset(ctx, d.inst.repo.Filesystem(), ds); err != nil {
log.Debugf("Get dataset, base.OpenDataset failed, error: %s", err)
return nil, err
}
return ds, nil
}
// LoadDataset loads a dataset by resolving where it is available according to
// the source being used, and loading it from there
func (d *datasetLoader) LoadDataset(ctx context.Context, refstr string) (*dataset.Dataset, error) {
if d == nil {
return nil, fmt.Errorf("no datasetLoader")
}
if d.inst == nil {
return nil, fmt.Errorf("no instance")
}
ref, err := dsref.Parse(refstr)
if err != nil {
return nil, fmt.Errorf("%q is not a valid dataset reference: %w", refstr, err)
}
if ref.Username == "me" {
if d.userOwner == "" {
msg := fmt.Sprintf(`Can't use the "me" keyword to refer to a dataset in this context.
Replace "me" with your username for the reference:
%s`, refstr)
return nil, qerr.New(fmt.Errorf("invalid contextual reference"), msg)
}
ref.Username = d.userOwner
}
resolver, err := d.inst.resolverForSource(d.source)
if err != nil {
return nil, err
}
// Whether the reference came with an explicit version
pathGiven := ref.Path != ""
// Resolve the reference
location, err := resolver.ResolveRef(ctx, &ref)
if err != nil {
if errors.Is(err, dsref.ErrRefNotFound) {
return nil, qerr.New(err, fmt.Sprintf("reference %q not found", refstr))
}
return nil, err
}
// If no version was given, and FSI is enabled for the loader, look
// up if the dataset has a version on disk.
if !pathGiven && d.useFSI {
err = d.inst.fsi.ResolvedPath(&ref)
if err == fsi.ErrNoLink {
err = nil
}
}
if ref.Path == "" {
err = qerr.New(dsref.ErrNoHistory, fmt.Sprintf("can't load dataset %q, it has no saved versions", ref.Human()))
return nil, err
}
return d.loadRefFromLocation(ctx, ref, location)
}