-
Notifications
You must be signed in to change notification settings - Fork 13
/
differ.go
223 lines (204 loc) · 4.6 KB
/
differ.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package diff
import (
"context"
"github.com/google/go-cmp/cmp"
)
// Document is a generic document retrieved from Elasticsearch.
type Document struct {
ID string `json:"_id,omitempty"`
Source map[string]interface{} `json:"_source,omitempty"`
}
// Mode describes the outcome of comparing two documents.
type Mode int
const (
// Unchanged means that a document has not been changed between
// source and destination index.
Unchanged Mode = iota
// Created means that a document has been added to the destination
// that didn't exist in the source index.
Created
// Updated means that a document has been found both in the source
// and destination index, but its contents (_source) has changed.
Updated
// Deleted means that a document has been found in the source index
// but it doesn't exist in the destination index.
Deleted
)
// Mode returns a string represenation for a mode.
func (m Mode) String() string {
switch m {
case Unchanged:
return "Unchanged"
case Created:
return "Created"
case Updated:
return "Updated"
case Deleted:
return "Deleted"
default:
return "<unspecified>"
}
}
// Diff is the outcome of comparing two documents in source and
// destination index.
type Diff struct {
Mode Mode
Src *Document
Dst *Document
}
// Differ compares the documents in the source index to those in
// the destination index. It returns the outcomes via a Diff structure,
// one by one.
func Differ(
ctx context.Context,
srcCh <-chan *Document,
dstCh <-chan *Document,
) (<-chan Diff, <-chan error) {
diffCh := make(chan Diff)
errCh := make(chan error)
go func() {
defer func() {
close(diffCh)
close(errCh)
}()
// Both src and dst are nil => no diffs
if srcCh == nil && dstCh == nil {
return
}
// No src => return all from dst as Created
if srcCh == nil && dstCh != nil {
for {
select {
case doc, ok := <-dstCh:
if !ok {
return
}
diffCh <- Diff{Mode: Created, Dst: doc}
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
}
// No dst => return all from src as Deleted
if srcCh != nil && dstCh == nil {
for {
select {
case doc, ok := <-srcCh:
if !ok {
return
}
diffCh <- Diff{Mode: Deleted, Src: doc}
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
}
// Read first document from both channels
srcDoc, dstDoc := <-srcCh, <-dstCh
// Main loop
for {
// Stop early because context might be canceled.
select {
default:
case <-ctx.Done():
errCh <- ctx.Err()
return
}
// No more documents from the channels => done
if srcDoc == nil && dstDoc == nil {
break
}
// No more from dst => everything in src has to be deleted
if srcDoc != nil && dstDoc == nil {
diffCh <- Diff{Mode: Deleted, Src: srcDoc}
for {
select {
case doc, ok := <-dstCh:
if !ok {
return
}
diffCh <- Diff{Mode: Deleted, Src: doc}
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
}
// No more from src => everything in dst has to be created
if srcDoc == nil && dstDoc != nil {
diffCh <- Diff{Mode: Created, Dst: dstDoc}
for {
select {
case doc, ok := <-dstCh:
if !ok {
return
}
diffCh <- Diff{Mode: Created, Dst: doc}
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
}
// We have two to compare
if srcDoc.ID > dstDoc.ID {
diffCh <- Diff{Mode: Created, Dst: dstDoc}
dstDoc = nil
var stop bool
for !stop {
select {
case doc, ok := <-dstCh:
if !ok {
stop = true
break
}
dstDoc = doc
if srcDoc.ID <= dstDoc.ID {
stop = true
break
}
diffCh <- Diff{Mode: Created, Dst: dstDoc}
dstDoc = nil
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
} else if srcDoc.ID < dstDoc.ID {
diffCh <- Diff{Mode: Deleted, Src: srcDoc}
srcDoc = nil
var stop bool
for !stop {
select {
case doc, ok := <-srcCh:
if !ok {
stop = true
break
}
srcDoc = doc
if srcDoc.ID >= dstDoc.ID {
stop = true
break
}
diffCh <- Diff{Mode: Deleted, Src: srcDoc}
srcDoc = nil
case <-ctx.Done():
errCh <- ctx.Err()
return
}
}
} else {
// srcDoc.ID == dstDoc.ID
if cmp.Equal(srcDoc.Source, dstDoc.Source) {
diffCh <- Diff{Mode: Unchanged, Src: srcDoc, Dst: dstDoc}
} else {
diffCh <- Diff{Mode: Updated, Src: srcDoc, Dst: dstDoc}
}
srcDoc, dstDoc = <-srcCh, <-dstCh
}
}
}()
return diffCh, errCh
}