-
Notifications
You must be signed in to change notification settings - Fork 6
/
batch_processing.go
102 lines (88 loc) · 2.28 KB
/
batch_processing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package zipcode
// SendLookups is a high-level convenience function that leverages an internal reusable Batch
// to send all lookups provided in serial, blocking fashion.
func (c *Client) SendLookups(lookups ...*Lookup) error {
input := make(chan *Lookup)
go loadAll(input, lookups)
return c.SendFromChannel(input, nil)
}
func loadAll(stream chan *Lookup, lookups []*Lookup) {
load(stream, lookups)
close(stream)
}
func load(stream chan *Lookup, lookups []*Lookup) {
for _, lookup := range lookups {
stream <- lookup
}
}
// SendFromChannel is a high-level convenience function that leverages an
// internal reusable Batch to send everything received from the provided
// input channel in serial, blocking fashion. The same lookups are sent
// on the output channel as they are processed. Whereas it is the caller's
// job to close the input channel when all inputs have been sent, the output
// channel will be closed for the caller.
func (c *Client) SendFromChannel(input, output chan *Lookup) error {
processor := newBatchProcessor(input, output, c)
processor.ProcessAll()
return processor.Error()
}
type batchProcessor struct {
input chan *Lookup
output chan *Lookup
client *Client
batch *Batch
err error
}
func newBatchProcessor(input, output chan *Lookup, client *Client) *batchProcessor {
return &batchProcessor{
input: input,
output: output,
client: client,
batch: NewBatch(),
}
}
func (c *batchProcessor) ProcessAll() {
c.readLookups()
c.sendLastBatch()
c.cleanup()
}
func (c *batchProcessor) readLookups() {
for lookup := range c.input {
c.processLookup(lookup)
if c.errorOccurred() {
break
}
}
}
func (c *batchProcessor) processLookup(lookup *Lookup) {
c.batch.Append(lookup)
if c.batch.IsFull() {
c.sendBatch()
}
}
func (c *batchProcessor) sendBatch() {
c.err = c.client.SendBatch(c.batch)
c.routeOutput()
c.batch.Clear()
}
func (c *batchProcessor) routeOutput() {
if c.output != nil {
load(c.output, c.batch.Records())
}
}
func (c *batchProcessor) sendLastBatch() {
if !c.errorOccurred() && !c.batch.isEmpty() {
c.sendBatch()
}
}
func (c *batchProcessor) errorOccurred() bool {
return c.err != nil
}
func (c *batchProcessor) cleanup() {
if c.output != nil {
close(c.output)
}
}
func (c *batchProcessor) Error() error {
return c.err
}