-
-
Notifications
You must be signed in to change notification settings - Fork 56
/
snapshot.go
158 lines (131 loc) · 4.14 KB
/
snapshot.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.
package column
import (
"errors"
"fmt"
"io"
"github.com/kelindar/column/commit"
"github.com/kelindar/iostream"
"github.com/klauspost/compress/s2"
)
var (
errUnexpectedEOF = errors.New("column: unable to restore, unexpected EOF")
)
// --------------------------- Commit Replay ---------------------------
// Replay replays a commit on a collection, applying the changes.
func (c *Collection) Replay(change commit.Commit) error {
return c.Query(func(txn *Txn) error {
txn.dirty.Set(change.Chunk)
for i := range change.Updates {
if !change.Updates[i].IsEmpty() {
txn.updates = append(txn.updates, change.Updates[i])
}
}
return nil
})
}
// --------------------------- Snapshotting ---------------------------
// WriteTo writes collection encoded into binary format into the destination writer until
// there's no more data to write or when an error occurs. The return value n is the number
// of bytes written. Any error encountered during the write is also returned.
func (c *Collection) WriteTo(dst io.Writer) (int64, error) {
c.lock.Lock()
defer c.lock.Unlock()
// Create a writer, encoder and a reusable buffer
encoder := c.codec.EncoderFor(dst)
writer := iostream.NewWriter(c.codec.EncoderFor(dst))
buffer := c.txns.acquirePage(rowColumn)
defer c.txns.releasePage(buffer)
// Write the schema version
if err := writer.WriteUvarint(0x1); err != nil {
return writer.Offset(), err
}
// Write the number of columns
if err := writer.WriteUvarint(uint64(c.cols.Count()) + 1); err != nil {
return writer.Offset(), err
}
// Write the inserts column
buffer.PutBitmap(commit.Insert, c.fill)
if err := writer.WriteSelf(buffer); err != nil {
return writer.Offset(), err
}
// Snapshot each column and write the buffer
if err := c.cols.Range(func(column *column) error {
buffer.Reset(column.name)
column.Snapshot(buffer)
return writer.WriteSelf(buffer)
}); err != nil {
return writer.Offset(), err
}
return writer.Offset(), encoder.Close()
}
// ReadFrom reads a collection from the provided reader source until EOF or error. The
// return value n is the number of bytes read. Any error except EOF encountered during
// the read is also returned.
func (c *Collection) ReadFrom(src io.Reader) (int64, error) {
r := iostream.NewReader(c.codec.DecoderFor(src))
// Read the version and make sure it matches
version, err := r.ReadUvarint()
if err != nil || version != 0x1 {
return r.Offset(), fmt.Errorf("column: unable to restore (version %d) %v", version, err)
}
// Read the number of columns
count, err := r.ReadUvarint()
if err != nil {
return r.Offset(), err
}
// Read each column
err = c.Query(func(txn *Txn) error {
for i := uint64(0); i < count; i++ {
buffer := txn.owner.txns.acquirePage("")
_, err := buffer.ReadFrom(r)
switch {
case err == io.EOF && i < count:
return errUnexpectedEOF
case err != nil:
return err
default:
txn.updates = append(txn.updates, buffer)
buffer.RangeChunks(func(chunk uint32) {
txn.dirty.Set(chunk)
})
}
}
return nil
})
return r.Offset(), err
}
// --------------------------- Compression Codec ----------------------------
// newCodec creates a new compressor for the destination writer
func newCodec(options *Options) codec {
return &s2codec{
w: s2.NewWriter(nil),
r: s2.NewReader(nil),
}
}
type codec interface {
DecoderFor(reader io.Reader) io.Reader
EncoderFor(writer io.Writer) io.WriteCloser
}
type s2codec struct {
w *s2.Writer
r *s2.Reader
}
func (c *s2codec) Read(p []byte) (int, error) {
return c.r.Read(p)
}
func (c *s2codec) Write(p []byte) (int, error) {
return c.w.Write(p)
}
func (c *s2codec) DecoderFor(reader io.Reader) io.Reader {
c.r.Reset(reader)
return c
}
func (c *s2codec) EncoderFor(writer io.Writer) io.WriteCloser {
c.w.Reset(writer)
return c
}
func (c *s2codec) Close() error {
return c.w.Close()
}