/
dynatomic.go
136 lines (118 loc) · 3.11 KB
/
dynatomic.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Package dynatomic provides a convenient wrapper API around
// using DynamoDB as a highly available, concurrent, and performant
// asynchronous atomic counter.
//
// Basic usage:
//
//	// Initialize the dynatomic background goroutine with a batch size of 100,
//	// a wait time of a second, an AWS config and a function that will
//	// notify the user of internal errors
//	d := New(100, time.Second, config, errHandler)
//	d.RowChan <- &types.Row{...}
//	d.RowChan <- &types.Row{...}
//	d.RowChan <- &types.Row{...}
//	...
//	d.Done()
//
// Dynamo will update accordingly.
// For example, if you write the rows:
//
//	Table: MyTable, Key: A, Range: A, Incr: 5
//	Table: MyTable, Key: A, Range: A, Incr: 5
//	Table: MyTable, Key: A, Range: A, Incr: 5
//	Table: MyTable, Key: A, Range: A, Incr: 5
//
// Then MyTable Key A, Range A will now show a value of 20.
package dynatomic
import (
	"reflect"
	"strconv"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/tylfin/dynatomic/pkg/dynamo"
	"github.com/tylfin/dynatomic/pkg/types"
)
// Package-level indirections for the dynamo helpers used by run and write.
// NOTE(review): presumably these exist so tests can substitute fakes — confirm.
// Within this package the identifier new shadows Go's predeclared new function.
var (
// new is called by run with the Dynatomic's Config; its result is stored in svc.
new = dynamo.New
// insert is called by write with svc and one aggregated row.
insert = dynamo.Insert
)
// Dynatomic holds the state needed to collect rows received on RowChan
// and write them in bulk, asynchronously, from a background goroutine.
type Dynatomic struct {
// done is closed by Done; batch watches it to flush what it has and stop.
done chan bool
// svc is set in run from the package-level new variable (dynamo.New by
// default) and handed to insert on every write.
svc *dynamodb.DynamoDB
// RowChan is the channel callers send rows on; New creates it unbuffered.
RowChan chan *types.Row
// Config is passed to new when the background goroutine starts.
Config *aws.Config
// ErrHandler is called with a short location tag and the error whenever
// an internal step (client creation, increment parsing, insert) fails.
ErrHandler func(loc string, err error)
// BatchSize bounds how many receive-or-timeout iterations batch performs
// before writing the rows it has collected.
BatchSize int
// WaitTime is how long a single iteration of batch waits for a row
// before timing out.
WaitTime time.Duration
}
// New builds a Dynatomic from the supplied settings, starts its background
// goroutine, and returns it. Rows sent on the returned value's RowChan are
// picked up by that goroutine.
//
// batchSize and waitTime are stored as BatchSize and WaitTime, config is
// handed to the DynamoDB client constructor when the goroutine starts, and
// errHandler is invoked with a location tag and the error on internal failures.
func New(batchSize int, waitTime time.Duration, config *aws.Config, errHandler func(loc string, err error)) *Dynatomic {
	d := &Dynatomic{
		done:       make(chan bool),
		RowChan:    make(chan *types.Row),
		Config:     config,
		ErrHandler: errHandler,
		BatchSize:  batchSize,
		WaitTime:   waitTime,
	}

	go d.run()

	return d
}
// Done signals the background goroutine to write whatever rows it has
// collected so far and then exit. It does so by closing the done channel
// and returns immediately; it does not wait for that final write.
//
// Calling Done again after the channel is closed is a no-op.
// NOTE(review): the closed-check and the close are not performed atomically,
// so two goroutines calling Done at the same moment could both reach close.
func (d *Dynatomic) Done() {
	select {
	case <-d.done:
		// Already closed: nothing left to signal.
	default:
		close(d.done)
	}
}
// run is the body of the background goroutine started by New. It creates
// the DynamoDB client from Config and then processes batches until batch
// reports that it is time to stop.
//
// If the client cannot be created, the error is reported through ErrHandler
// under "run.dynamo.New", Done is called, and the goroutine exits without
// processing any rows.
func (d *Dynatomic) run() {
	var err error
	if d.svc, err = new(d.Config); err != nil {
		d.ErrHandler("run.dynamo.New", err)
		d.Done()
		return
	}

	for {
		if stop := d.batch(); stop {
			return
		}
	}
}
// batch collects rows from RowChan, grouped by table name, and writes them.
// It returns true when the background goroutine should stop and false when
// another batch should be started.
//
// A batch runs for at most BatchSize iterations. Each iteration ends when a
// row arrives, when WaitTime elapses, or when a stop is signalled. A timeout
// consumes an iteration just like a row does, so an idle batch returns after
// roughly BatchSize * WaitTime.
//
// The batch is flushed and true is returned when either the done channel is
// closed (see Done) or RowChan itself has been closed by the producer.
func (d *Dynatomic) batch() bool {
	// A non-positive BatchSize would skip the loop entirely, turning run into
	// a busy loop that never looks at done. Always perform at least one
	// iteration so a stop request can be observed.
	size := d.BatchSize
	if size < 1 {
		size = 1
	}

	group := make(map[string][]*types.Row)
	for i := 0; i < size; i++ {
		select {
		case <-d.done:
			d.write(group)
			return true
		case row, ok := <-d.RowChan:
			if !ok {
				// RowChan was closed: no further rows can arrive. Without
				// this check the receive yields a nil row and the
				// dereference below panics in the background goroutine.
				d.write(group)
				return true
			}
			table := *row.Schema.TableName
			group[table] = append(group[table], row)
		case <-time.After(d.WaitTime):
			// Nothing arrived in time; this iteration is spent.
		}
	}

	d.write(group)
	return false
}
// write aggregates the collected rows and issues one insert per distinct
// (table, hash key, range key) combination, carrying the sum of that key's
// increments.
//
// group maps a table name to the rows received for that table. Within each
// table, rows are matched on HashValue and RangeValue, so increments aimed
// at different items are never merged into one another.
//
// Rows whose increment is missing or is not a valid integer are reported
// through ErrHandler under "write.atoi" and skipped. A key for which no row
// parsed produces no insert. Insert failures are reported under
// "write.insert" and do not stop the remaining writes.
func (d *Dynatomic) write(group map[string][]*types.Row) {
	for _, rows := range group {
		// targets[i] is the row that will be written for the i-th distinct
		// key seen in this table; sums[i] is its accumulated increment.
		var (
			targets []*types.Row
			sums    []int
		)

		for _, row := range rows {
			// A nil row or nil Incr is treated as an empty increment so that
			// it fails parsing and is reported instead of panicking.
			raw := ""
			if row != nil && row.Incr != nil {
				raw = *row.Incr
			}
			v, err := strconv.Atoi(raw)
			if err != nil {
				d.ErrHandler("write.atoi", err)
				continue
			}

			// Find the accumulator for this row's key. DeepEqual compares
			// the key contents rather than pointer identity. The linear scan
			// is bounded by the number of distinct keys in one batch.
			idx := -1
			for i, t := range targets {
				if reflect.DeepEqual(t.HashValue, row.HashValue) &&
					reflect.DeepEqual(t.RangeValue, row.RangeValue) {
					idx = i
					break
				}
			}
			if idx < 0 {
				targets = append(targets, &types.Row{
					Schema:     row.Schema,
					HashValue:  row.HashValue,
					RangeValue: row.RangeValue,
				})
				sums = append(sums, 0)
				idx = len(targets) - 1
			}
			sums[idx] += v
		}

		for i, target := range targets {
			total := strconv.Itoa(sums[i])
			target.Incr = &total
			if _, err := insert(d.svc, target); err != nil {
				d.ErrHandler("write.insert", err)
			}
		}
	}
}