-
Notifications
You must be signed in to change notification settings - Fork 0
/
mirror.go
118 lines (105 loc) · 3.02 KB
/
mirror.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// Copyright 2016 Hcnet Development Foundation and contributors. Licensed
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
package historyarchive
import (
"fmt"
"log"
"sync"
"sync/atomic"
"github.com/shantanu-hashcash/go/support/errors"
)
// Mirror mirrors an archive, it assumes that the source and destination have the same checkpoint ledger frequency
func Mirror(src *Archive, dst *Archive, opts *CommandOptions) error {
rootHAS, e := src.GetRootHAS()
if e != nil {
return e
}
opts.Range = opts.Range.clamp(rootHAS.Range(), src.checkpointManager)
log.Printf("copying range %s\n", opts.Range)
// Make a bucket-fetch map that shows which buckets are
// already-being-fetched
bucketFetch := make(map[Hash]bool)
var bucketFetchMutex sync.Mutex
var errs uint32
tick := makeTicker(func(ticks uint) {
bucketFetchMutex.Lock()
sz := opts.Range.SizeInCheckPoints(src.checkpointManager)
log.Printf("Copied %d/%d checkpoints (%f%%), %d buckets",
ticks, sz,
100.0*float64(ticks)/float64(sz),
len(bucketFetch))
bucketFetchMutex.Unlock()
})
var wg sync.WaitGroup
checkpoints := opts.Range.GenerateCheckpoints(src.checkpointManager)
wg.Add(opts.Concurrency)
for i := 0; i < opts.Concurrency; i++ {
go func() {
for {
ix, ok := <-checkpoints
if !ok {
break
}
has, err := src.GetCheckpointHAS(ix)
if err != nil {
atomic.AddUint32(&errs, noteError(err))
continue
}
buckets, err := has.Buckets()
if err != nil {
panic(errors.Wrap(err, "error getting buckets"))
}
for _, bucket := range buckets {
alreadyFetching := false
bucketFetchMutex.Lock()
_, alreadyFetching = bucketFetch[bucket]
if !alreadyFetching {
bucketFetch[bucket] = true
}
bucketFetchMutex.Unlock()
if !alreadyFetching {
pth := BucketPath(bucket)
err = copyPath(src, dst, pth, opts)
atomic.AddUint32(&errs, noteError(err))
}
}
for _, cat := range Categories() {
if opts.SkipOptional && !categoryRequired(cat) {
continue
}
pth := CategoryCheckpointPath(cat, ix)
err = copyPath(src, dst, pth, opts)
if err != nil && !categoryRequired(cat) {
continue
}
atomic.AddUint32(&errs, noteError(err))
}
tick <- true
}
wg.Done()
}()
}
wg.Wait()
log.Printf("copied %d checkpoints, %d buckets, range %s",
opts.Range.SizeInCheckPoints(src.checkpointManager), len(bucketFetch), opts.Range)
close(tick)
if rootHAS.CurrentLedger == opts.Range.High {
log.Printf("updating destination archive current-ledger pointer to 0x%8.8x",
rootHAS.CurrentLedger)
e = dst.PutRootHAS(rootHAS, opts)
errs += noteError(e)
} else {
dstHAS, e := dst.GetRootHAS()
if e != nil {
errs += noteError(e)
} else {
log.Printf("leaving destination archive current-ledger pointer at 0x%8.8x",
dstHAS.CurrentLedger)
}
}
if errs != 0 {
return fmt.Errorf("%d errors while mirroring", errs)
}
return nil
}