-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
153 lines (131 loc) · 3.15 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package main
import (
"context"
"database/sql"
"fmt"
"time"
)
// DBSliceSize is the size of a slice of data.
//
// It was selected by trial and error to find the best performing buffer size.
const DBSliceSize = 1 << 16
// DB is the database access layer of our application.
type DB struct {
db *sql.DB
}
// NewDB instanciates a [DB].
func NewDB(path string) (*DB, error) {
db, err := sql.Open("sqlite3", path)
if err != nil {
return nil, fmt.Errorf("open %v: %v", path, err)
}
return &DB{
db: db,
}, nil
}
// Close closes allocated ressources.
func (db *DB) Close() error {
if err := db.db.Close(); err != nil {
return err
}
return nil
}
var listPagesQuery = `SELECT id, updated_at, title, text FROM pages LIMIT ?`
// Page stores information on a Wiki page.
type Page struct {
ID int64
UpdatedAt time.Time
Title string
Text string
}
// ListPages lists all pages.
func (db *DB) ListPages(ctx context.Context, limit int) ([]Page, error) {
rows, err := db.db.QueryContext(ctx, listPagesQuery, softLimit(limit))
if err != nil {
return nil, fmt.Errorf("query: %v", err)
}
defer rows.Close()
var pages []Page
for rows.Next() {
var p Page
err := rows.Scan(&p.ID, &p.UpdatedAt, &p.Title, &p.Text)
if err != nil {
return nil, fmt.Errorf("scan: %v", err)
}
pages = append(pages, p)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("next: %v", err)
}
return pages, nil
}
// StreamPages streams pages from the database.
func (db *DB) StreamPages(ctx context.Context, limit int) func(func(Page, error) bool) {
return func(yield func(Page, error) bool) {
var zero Page
rows, err := db.db.QueryContext(ctx, listPagesQuery, softLimit(limit))
if err != nil {
yield(zero, fmt.Errorf("query: %v", err))
return
}
defer rows.Close()
for rows.Next() {
var p Page
err := rows.Scan(&p.ID, &p.UpdatedAt, &p.Title, &p.Text)
if err != nil {
yield(zero, fmt.Errorf("scan: %v", err))
return
}
if !yield(p, err) {
return
}
}
if err := rows.Err(); err != nil {
yield(zero, fmt.Errorf("next: %v", err))
return
}
}
}
// StreamPageSlice streams pages from the database into slices.
func (db *DB) StreamPageSlice(ctx context.Context, limit int) func(func([]Page, error) bool) {
return func(yield func([]Page, error) bool) {
rows, err := db.db.QueryContext(ctx, listPagesQuery, softLimit(limit))
if err != nil {
yield(nil, fmt.Errorf("query: %v", err))
return
}
defer rows.Close()
pages := make([]Page, 0, DBSliceSize)
for rows.Next() {
var p Page
err := rows.Scan(&p.ID, &p.UpdatedAt, &p.Title, &p.Text)
if err != nil {
yield(nil, fmt.Errorf("scan: %v", err))
return
}
pages = append(pages, p)
if len(pages) < DBSliceSize {
continue
}
if !yield(pages, err) {
return
}
pages = pages[:0]
}
if err := rows.Err(); err != nil {
yield(nil, fmt.Errorf("next: %v", err))
return
}
if len(pages) > 0 {
yield(pages, nil)
}
}
}
// softLimit changes the zero value of limit into -1, allowing SQLite to return
// the full dataset if the limit is unset.
func softLimit(limit int) int {
if limit < 1 {
return -1
}
return limit
}