/
sync.go
118 lines (103 loc) · 2.93 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
This file supports interactive syncing between data instances. It is different
from ingestion syncs that can more effectively batch changes.
*/
package labelarray
import (
"sync"
"time"
"github.com/janelia-flyem/dvid/datastore"
"github.com/janelia-flyem/dvid/datatype/common/downres"
"github.com/janelia-flyem/dvid/datatype/common/labels"
"github.com/janelia-flyem/dvid/dvid"
)
const (
numMutateHandlers = 16 // goroutines used to process mutations on blocks
numLabelHandlers = 256 // goroutines used to do get/put tx on label indices
)
// IngestedBlock is the unit of delta for a IngestBlockEvent.
type IngestedBlock struct {
MutID uint64
BCoord dvid.IZYXString
Data *labels.Block
}
// MutatedBlock tracks previous and updated block data.
// It is the unit of delta for a MutateBlockEvent.
type MutatedBlock struct {
MutID uint64
BCoord dvid.IZYXString
Prev *labels.Block
Data *labels.Block
}
type procMsg struct {
v dvid.VersionID
op interface{}
}
type downresOp struct {
mutID uint64
bcoord dvid.IZYXString // the high-res block coord corresponding to the Block
block *labels.Block
}
type mergeOp struct {
labels.MergeOp
mutID uint64
bcoord dvid.IZYXString
downresMut *downres.Mutation
}
type splitOp struct {
labels.SplitOp
mutID uint64
bcoord dvid.IZYXString
deleteBlkCh chan dvid.IZYXString
downresMut *downres.Mutation
}
// InitDataHandlers launches goroutines to handle each labelarray instance's syncs.
func (d *Data) InitDataHandlers() error {
if d.mutateCh[0] != nil {
return nil
}
// Start N goroutines to process mutations for each block that will be consistently
// assigned to one of the N goroutines.
for i := 0; i < numMutateHandlers; i++ {
d.mutateCh[i] = make(chan procMsg, 10)
go d.mutateBlock(d.mutateCh[i])
}
dvid.Infof("Launched mutation handlers for data %q...\n", d.DataName())
return nil
}
func (d *Data) queuedSize() int {
var queued int
for i := 0; i < numMutateHandlers; i++ {
queued += len(d.mutateCh[i])
}
return queued
}
// Shutdown terminates blocks until syncs are done then terminates background goroutines processing data.
func (d *Data) Shutdown(wg *sync.WaitGroup) {
var elapsed int
for {
queued := d.queuedSize()
if queued > 0 {
if elapsed >= datastore.DataShutdownTime {
dvid.Infof("Timed out after %d seconds waiting for data %q mutations: %d still to be processed", elapsed, d.DataName(), queued)
break
}
dvid.Infof("After %d seconds, data %q has %d mutations in queue pending.", elapsed, d.DataName(), queued)
time.Sleep(1 * time.Second)
elapsed++
} else {
break
}
}
for i := 0; i < numMutateHandlers; i++ {
close(d.mutateCh[i])
}
if indexCache != nil {
var hitrate float64
if metaAttempts > 0 {
hitrate = (float64(metaHits) / float64(metaAttempts)) * 100.0
}
dvid.Infof("Cache for data %q: got %d meta cache hits on %d attempts (%5.2f)\n", d.DataName(), metaHits, metaAttempts, hitrate)
}
wg.Done()
}