-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.go
160 lines (132 loc) · 3.88 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package baradb
import (
"sync"
"sync/atomic"
"github.com/saint-yellow/baradb/data"
"github.com/saint-yellow/baradb/index"
)
// nonTranNo This is not a transaction serial number
const nonTranNo uint64 = 0
// tranFinishedKey This is a key that indicates a transaction is finished
var tranFinishedKey = []byte("transaction-finished")
// WriteBatch A transaction that batch writes data and makes sure atomicity
type WriteBatch struct {
mu *sync.RWMutex // Lock
db *DB // DB engine
options WriteBatchOptions // options for batch writing
pendingWrites map[string]*data.LogRecord // data (log records) pending to be writen
}
// NewWriteBatch initializes a write batch in the DB engine
func (db *DB) NewWriteBatch(options WriteBatchOptions) *WriteBatch {
if db.options.IndexType == index.BPtree && !db.tranNoFileExists && !db.isFirstLaunch {
panic("Can not use a write batch since the tran-no file does not exist")
}
wb := &WriteBatch{
mu: new(sync.RWMutex),
db: db,
options: options,
pendingWrites: make(map[string]*data.LogRecord),
}
return wb
}
// Put writes data
func (wb *WriteBatch) Put(key, value []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
wb.mu.Lock()
defer wb.mu.Unlock()
lr := &data.LogRecord{
Key: key,
Value: value,
Type: data.NormalLogRecord,
}
// Temporarily store a log record which is pending to be written
wb.pendingWrites[string(key)] = lr
return nil
}
// Delete deletes data
func (wb *WriteBatch) Delete(key []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
// Try to find the corresponding position in the index
// If the position is not found, then delete the corresponding log record in this transaction
lrp := wb.db.index.Get(key)
if lrp == nil {
if wb.pendingWrites[string(key)] != nil {
delete(wb.pendingWrites, string(key))
}
return nil
}
lr := &data.LogRecord{
Key: key,
Type: data.DeletedLogRecord,
}
// Temporarily store a log record which is pending to be written
wb.pendingWrites[string(key)] = lr
return nil
}
// Commit commits the transaction, writes the pending data to the disk and updates the in-memory index
func (wb *WriteBatch) Commit() error {
wb.mu.Lock()
defer wb.mu.Unlock()
// No pending data
if len(wb.pendingWrites) == 0 {
return nil
}
// To much pending data
if len(wb.pendingWrites) > wb.options.MaxBatchNumber {
return ErrExceedMaxBatchNumber
}
// Lock the DB to make sure the serialization of transaction Commit
wb.db.mu.Lock()
defer wb.db.mu.Unlock()
// Get the latest serial number of this transaction
transNo := atomic.AddUint64(&wb.db.tranNo, 1)
// Write pending data to a data file
positions := make(map[string]*data.LogRecordPosition)
for _, lr := range wb.pendingWrites {
lrp, err := wb.db.appendLogRecord(&data.LogRecord{
Key: data.EncodeKey(lr.Key, transNo),
Value: lr.Value,
Type: lr.Type,
}, false)
if err != nil {
return err
}
positions[string(lr.Key)] = lrp
}
// Add a log record that means this transaction is finished
_, err := wb.db.appendLogRecord(&data.LogRecord{
Key: data.EncodeKey(tranFinishedKey, transNo),
Type: data.TransactionFinishedLogRecord,
}, false)
if err != nil {
return err
}
// Sync the written data to the active data file
if wb.options.SyncWrites && wb.db.activeFile != nil {
err := wb.db.activeFile.Sync()
if err != nil {
return err
}
}
// Update in-memory index
for _, lr := range wb.pendingWrites {
lrp := positions[string(lr.Key)]
var oldLRP *data.LogRecordPosition
switch lr.Type {
case data.DeletedLogRecord:
oldLRP, _ = wb.db.index.Delete(lr.Key)
case data.NormalLogRecord:
oldLRP = wb.db.index.Put(lr.Key, lrp)
}
if oldLRP != nil {
wb.db.reclaimSize += int64(oldLRP.Size)
}
}
// Clear the panding data
wb.pendingWrites = make(map[string]*data.LogRecord)
return nil
}