-
Notifications
You must be signed in to change notification settings - Fork 351
/
import.go
97 lines (83 loc) · 2.55 KB
/
import.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package onboard
import (
"context"
"fmt"
"time"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/cmdutils"
"github.com/treeverse/lakefs/pkg/graveler"
"github.com/treeverse/lakefs/pkg/logging"
)
const (
// CommitMsgTemplate is the fmt format string for the commit message written by
// Importer.Import; its single %s verb is filled with the inventory's SourceName().
CommitMsgTemplate = "Import from %s"
// DefaultImportBranchName is not referenced in this part of the file.
// NOTE(review): presumably the branch that inventory imports are written to — confirm against callers.
DefaultImportBranchName = "import-from-inventory"
)
// Importer feeds the entries of an inventory to a RepoActions implementation
// and, for non-dry runs, commits the result. Instances are built by CreateImporter.
type Importer struct {
// inventoryGenerator is the generator taken from Config.InventoryGenerator.
inventoryGenerator block.InventoryGenerator
// inventory is the inventory produced by GenerateInventory in CreateImporter;
// Import iterates over it.
inventory block.Inventory
// CatalogActions applies the import and performs the commit. CreateImporter
// fills it with NewCatalogRepoActions when Config.CatalogActions is nil.
CatalogActions RepoActions
// logger is the logger handed to CreateImporter.
logger logging.Logger
// progress holds the progress trackers collected by Import and returned by Progress.
progress []*cmdutils.Progress
// prefixes is Config.KeyPrefixes (the same slice, not a copy); Import passes it
// to CreateCommitMetadata.
prefixes []string
}
// Config carries the settings consumed by CreateImporter. The whole struct is
// also handed to NewCatalogRepoActions when no CatalogActions is supplied, so
// fields not read directly by CreateImporter may be used there.
type Config struct {
// CommitUsername is not read in this part of the file.
// NOTE(review): presumably the committer recorded on the import commit — confirm.
CommitUsername string
// InventoryURL is passed to InventoryGenerator.GenerateInventory.
InventoryURL string
// RepositoryID is not read directly by CreateImporter.
// NOTE(review): presumably the target repository — confirm in NewCatalogRepoActions.
RepositoryID graveler.RepositoryID
// DefaultBranchID is not read directly by CreateImporter.
// NOTE(review): presumably the repository's default branch — confirm in NewCatalogRepoActions.
DefaultBranchID graveler.BranchID
// InventoryGenerator produces the inventory to import. CreateImporter calls it
// unconditionally, so it must be non-nil.
InventoryGenerator block.InventoryGenerator
// Store is not read directly by CreateImporter.
// NOTE(review): presumably the entry catalog used by the default RepoActions — confirm.
Store EntryCatalog
// CatalogActions is optional; when nil, CreateImporter builds one with
// NewCatalogRepoActions.
CatalogActions RepoActions
// KeyPrefixes is passed to GenerateInventory and kept on the Importer for the
// commit metadata.
KeyPrefixes []string
// BaseCommit is available only for the import-plumbing command. CreateImporter
// passes it to CatalogActions.Init.
BaseCommit graveler.CommitID
}
// Stats is the result of an import run: RepoActions.ApplyImport returns it and
// Importer.Import fills in DryRun and CommitRef before handing it to the caller.
type Stats struct {
// AddedOrChanged is not set in this part of the file.
// NOTE(review): presumably the number of entries added or changed, set by ApplyImport — confirm.
AddedOrChanged int
// DryRun mirrors the dryRun argument given to Importer.Import.
DryRun bool
// CommitRef is the value returned by RepoActions.Commit; Import sets it only
// when the run is not a dry run.
CommitRef string
// PreviousImportDate is not set in this part of the file.
// NOTE(review): presumably the time of an earlier import — confirm where it is populated.
PreviousImportDate time.Time
}
// CreateImporter builds an Importer from config.
//
// It uses config.CatalogActions when one is supplied and otherwise creates one
// with NewCatalogRepoActions. The catalog actions are initialized with
// config.BaseCommit, and the inventory is then generated from
// config.InventoryURL and config.KeyPrefixes.
//
// config and config.InventoryGenerator must be non-nil. On failure no Importer
// is returned and the error wraps the underlying cause (usable with
// errors.Is / errors.As) together with the step that failed.
func CreateImporter(ctx context.Context, logger logging.Logger, config *Config) (importer *Importer, err error) {
	res := &Importer{
		inventoryGenerator: config.InventoryGenerator,
		logger:             logger,
		CatalogActions:     config.CatalogActions,
		prefixes:           config.KeyPrefixes,
	}
	if res.CatalogActions == nil {
		res.CatalogActions = NewCatalogRepoActions(config, logger)
	}
	// Assign to the named result instead of shadowing it with `:=`.
	if err = res.CatalogActions.Init(ctx, config.BaseCommit); err != nil {
		return nil, fmt.Errorf("init catalog actions: %w", err)
	}
	// NOTE(review): the meaning of the literal `true` is defined by
	// block.InventoryGenerator, which is not visible here — confirm.
	res.inventory, err = config.InventoryGenerator.GenerateInventory(ctx, logger, config.InventoryURL, true, config.KeyPrefixes)
	if err != nil {
		// Wrapped like the Init failure above so callers can tell which step failed.
		return nil, fmt.Errorf("generate inventory: %w", err)
	}
	return res, nil
}
// Import hands the whole inventory to the catalog actions and, unless dryRun
// is set, commits the result.
//
// The progress trackers of the inventory iterator and of the catalog actions
// are stored on the importer before the import is applied, so Progress can
// expose them. The returned Stats carry the dryRun flag and, for a real run,
// the reference returned by the commit. Any error from applying the import or
// from committing is returned as-is with nil Stats.
func (s *Importer) Import(ctx context.Context, dryRun bool) (*Stats, error) {
	// The entire inventory is imported; nothing is diffed against an earlier import here.
	var source Iterator = NewInventoryIterator(s.inventory.Iterator())

	trackers := source.Progress()
	trackers = append(trackers, s.CatalogActions.Progress()...)
	s.progress = trackers

	stats, err := s.CatalogActions.ApplyImport(ctx, source, dryRun)
	if err != nil {
		return nil, err
	}
	stats.DryRun = dryRun
	if dryRun {
		// Dry runs stop before anything is committed.
		return stats, nil
	}

	metadata := CreateCommitMetadata(s.inventory, *stats, s.prefixes)
	message := fmt.Sprintf(CommitMsgTemplate, s.inventory.SourceName())
	stats.CommitRef, err = s.CatalogActions.Commit(ctx, message, metadata)
	if err != nil {
		return nil, err
	}
	return stats, nil
}
// Progress returns the progress trackers stored by the most recent call to
// Import. For an Importer built by CreateImporter it is nil until Import runs.
func (s *Importer) Progress() []*cmdutils.Progress {
return s.progress
}