-
Notifications
You must be signed in to change notification settings - Fork 6
/
influx.go
162 lines (140 loc) · 4.37 KB
/
influx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package LocalDb
import (
"fmt"
"log"
"strings"
)
func InfluxBatchNumbLinss(batch string) int {
return strings.Count(batch, "\n")
}
func (d *SqliteLocalDb) InfluxBacklogAdd(client, batch string) error {
if compressedBatch, err := compress(batch); err != nil {
return fmt.Errorf("cannot compress batch: %s", err)
} else if _, err := d.db.Exec(
"INSERT INTO influxBacklog (created, client, numbLines, compressedBatch) VALUES(datetime('now'), ?, ?, ?);",
client,
InfluxBatchNumbLinss(batch),
compressedBatch,
); err != nil {
return fmt.Errorf("cannot insert into influxBacklog: %s", err)
}
return nil
}
func (d *SqliteLocalDb) InfluxAggregateBacklog(client string, batchSize uint) error {
// fetch newest up to 100 rows that sum up to no more batchSize number of lines
rows, err := d.db.Query(`
SELECT id, numbLines, compressedBatch
FROM influxBacklog
WHERE client = ? AND id >= (
SELECT MIN(f.id)
FROM (
SELECT id, numbLines, SUM(numbLines) OVER (ORDER BY id DESC) AS cum
FROM influxBacklog
WHERE client = ?
ORDER BY id DESC
LIMIT 255
) f
WHERE f.cum < ?
GROUP BY NULL
HAVING COUNT() > 1
);`,
client, client, batchSize,
)
if err != nil {
return fmt.Errorf(" error during query: %s", err)
}
defer rows.Close()
// aggregate all rows by decompressing,
var ids []int
var batches []string
for rows.Next() {
var id, numbLines int
var compressedBatch []byte
if err := rows.Scan(&id, &numbLines, &compressedBatch); err != nil {
return fmt.Errorf("error during scan: %s", err)
}
ids = append(ids, id)
if batch, err := uncompress(compressedBatch); err != nil {
return fmt.Errorf("error during uncompress: %s", err)
} else {
batches = append(batches, batch)
}
}
if len(ids) < 1 {
// nothing to aggregte, free up unused disk space instead
if d.vacuumNeeded {
if _, err := d.db.Exec("VACUUM"); err != nil {
log.Printf("localDb: error during VACUUM: %s", err)
}
d.vacuumNeeded = false
}
return nil
}
// compute new batch
aggregatedBatch := strings.Join(batches, "")
// insert aggregated batch
if err := d.InfluxBacklogAdd(client, aggregatedBatch); err != nil {
return fmt.Errorf("error during add: %s", err)
} else {
// delete old ids that have been aggregated into new batch
for _, id := range ids {
if err := d.InfluxBacklogDelete(id); err != nil {
log.Printf("localDb[%s]: aggregateBacklog: error during delete: %s", client, err)
}
}
}
log.Printf("localDb[%s]: aggregateBacklog: aggragted %d entries into one", client, len(ids))
return nil
}
func (d *SqliteLocalDb) InfluxBacklogSize(client string) (numbBatches, numbLines uint, err error) {
row := d.db.QueryRow(
"SELECT COUNT(*), IFNULL(SUM(numbLines), 0) FROM influxBacklog WHERE client = ?",
client,
)
if e := row.Scan(&numbBatches, &numbLines); e != nil {
err = fmt.Errorf("cannot select from influxBacklog: %s", e)
}
return
}
func (d *SqliteLocalDb) InfluxBacklogGet(client string) (id int, batch string, err error) {
row := d.db.QueryRow(
"SELECT id, numbLines, compressedBatch FROM influxBacklog WHERE client = ? ORDER BY id ASC LIMIT 1",
client,
)
var numbLines int
var compressedBatch []byte
if e := row.Scan(&id, &numbLines, &compressedBatch); e != nil {
err = fmt.Errorf("cannot select from influxBacklog: %s", e)
} else {
batch, err = uncompress(compressedBatch)
if err != nil {
err = fmt.Errorf("cannot uncompress: %s", err)
}
}
if count := InfluxBatchNumbLinss(batch); count != numbLines {
log.Fatalf("numbLines does not match for id=%d, %d != %d", id, count, numbLines)
}
return
}
func (d *SqliteLocalDb) InfluxBacklogDelete(id int) error {
if _, err := d.db.Exec("DELETE FROM influxBacklog WHERE id = ?", id); err != nil {
return fmt.Errorf("cannot delete from influxBacklog: %s", err)
}
d.vacuumNeeded = true
return nil
}
func (d *DisabledLocalDb) InfluxBacklogAdd(client, batch string) error {
return fmt.Errorf("disabled")
}
func (d *DisabledLocalDb) InfluxBacklogSize(client string) (numbBatches, numbLines uint, err error) {
return 0, 0, fmt.Errorf("disabled")
}
func (d *DisabledLocalDb) InfluxBacklogGet(client string) (id int, batch string, err error) {
return 0, "", fmt.Errorf("disabled")
}
func (d *DisabledLocalDb) InfluxBacklogDelete(id int) error {
return fmt.Errorf("disabled")
}
func (d *DisabledLocalDb) InfluxAggregateBacklog(client string, batchSize uint) error {
return nil
}