/
one_four_to_one_five.go
83 lines (70 loc) · 2.31 KB
/
one_four_to_one_five.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package migration
import (
"context"
"fmt"
"path"
"migration/onefoureight/db/pfs"
"migration/onefoureight/db/pps"
"github.com/gogo/protobuf/proto"
"github.com/pachyderm/pachyderm/src/client"
etcd "github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
)
// Key-space suffixes that oneFourToOneFive joins onto the PFS or PPS etcd
// prefix to locate each collection of objects to migrate.
const (
	// Collections joined onto pfsPrefix.
	reposPrefix    = "/repos"
	commitsPrefix  = "/commits"
	branchesPrefix = "/branches"

	// Collections joined onto ppsPrefix.
	pipelinesPrefix = "/pipelines"
	jobsPrefix      = "/jobs"
)
// oneFourToOneFive re-encodes the objects stored in etcd under the PFS and
// PPS prefixes: each value is parsed as a text-format protobuf and written
// back under the same key in binary protobuf encoding.
//
// etcdAddress is the etcd host; port 2379 is appended. pfsPrefix and
// ppsPrefix are the etcd key prefixes under which the repo/commit/branch and
// pipeline/job collections live, respectively.
//
// The migration is best-effort: a failure to list a collection, or to
// decode, encode, or store an individual object, is logged and skipped. The
// only error returned is a failure to construct the etcd client.
func oneFourToOneFive(etcdAddress, pfsPrefix, ppsPrefix string) error {
	etcdClient, err := etcd.New(etcd.Config{
		Endpoints:   []string{fmt.Sprintf("%s:2379", etcdAddress)},
		DialOptions: client.EtcdDialOptions(),
	})
	if err != nil {
		return fmt.Errorf("error constructing etcdClient: %v", err)
	}
	// Release the client's connection and background goroutines once the
	// migration is done; previously the client was never closed.
	defer func() {
		if err := etcdClient.Close(); err != nil {
			log.Errorf("error closing etcdClient: %v", err)
		}
	}()

	// This function migrates objects under a specific prefix. template is
	// reused as the decode target for every object under the prefix; this
	// relies on proto.UnmarshalText resetting the message before parsing.
	migrate := func(prefix string, template proto.Message) {
		// We want to sort the objects by oldest-to-newest order,
		// so we preserve their timestamp ordering as we update them.
		resp, err := etcdClient.Get(context.Background(), prefix, etcd.WithPrefix(), etcd.WithSort(etcd.SortByModRevision, etcd.SortAscend))
		if err != nil {
			log.Errorf("error getting %v: %v", prefix, err)
			return
		}
		for _, kv := range resp.Kvs {
			key := string(kv.Key)
			// Decode the existing text-format value...
			if err := proto.UnmarshalText(string(kv.Value), template); err != nil {
				log.Errorf("error unmarshalling object %v: %v", key, err)
				continue
			}
			// ...and re-encode it in binary form.
			bytes, err := proto.Marshal(template)
			if err != nil {
				log.Errorf("error marshalling object %v: %v", key, err)
				continue
			}
			// Overwrite the value in place under the same key.
			if _, err := etcdClient.Put(context.Background(), key, string(bytes)); err != nil {
				log.Errorf("error putting object %v: %v", key, err)
				continue
			}
		}
	}

	var repoInfo pfs.RepoInfo
	migrate(path.Join(pfsPrefix, reposPrefix), &repoInfo)
	log.Infof("finished migrating repos")

	var commitInfo pfs.CommitInfo
	migrate(path.Join(pfsPrefix, commitsPrefix), &commitInfo)
	log.Infof("finished migrating commits")

	// Values under the branches prefix are decoded as pfs.Commit.
	var head pfs.Commit
	migrate(path.Join(pfsPrefix, branchesPrefix), &head)
	log.Infof("finished migrating branches")

	var pipelineInfo pps.PipelineInfo
	migrate(path.Join(ppsPrefix, pipelinesPrefix), &pipelineInfo)
	log.Infof("finished migrating pipelines")

	var jobInfo pps.JobInfo
	migrate(path.Join(ppsPrefix, jobsPrefix), &jobInfo)
	log.Infof("finished migrating jobs")

	return nil
}