/
txn.go
79 lines (71 loc) · 1.88 KB
/
txn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package godb
// Tx represents a transaction on the database. This transaction can either be
// read-only or read/write. Read-only transactions can be used for retrieving
// values for keys and iterating through keys and values. Read/write
// transactions can set and delete keys.
//
// All transactions must be committed or rolled-back when done.
type Tx struct {
db *DB // the underlying database.
writable bool // when false mutable operations fail.
committed records
}
// lock locks the database based on the transaction type.
func (tx *Tx) lock() {
if tx.writable {
tx.db.mu.Lock()
} else {
tx.db.mu.RLock()
}
}
// unlock unlocks the database based on the transaction type.
func (tx *Tx) unlock() {
if tx.writable {
tx.db.mu.Unlock()
} else {
tx.db.mu.RUnlock()
}
}
func (tx *Tx) Commit() error {
if tx.db == nil {
return ErrTxClosed
} else if !tx.writable {
return ErrTxNotWritable
}
var err error
if (len(tx.committed) > 0) && tx.writable {
// If this operation fails then the write did failed and we must
// rollback.
err = tx.db.writeBatch(tx.committed)
if err != nil {
tx.rollback()
}
}
// apply all commands
err = tx.buildIndex(tx.committed)
// Unlock the database and allow for another writable transaction.
tx.unlock()
// Clear the db field to disable this transaction from future use.
tx.db = nil
return err
}
// rollback handles the underlying rollback logic.
// Intended to be called from Commit() and Rollback().
func (tx *Tx) rollback() {
tx.committed = nil
}
func (tx *Tx) buildIndex(recs []*record) error {
for _, rec := range recs {
tx.db.idx.Insert(rec.key, newIndex(rec.seg, rec.offset))
}
return nil
}
func (tx *Tx) addRecord(rec *record) {
tx.committed = append(tx.committed, rec)
}
// Set saves a key-value pair.
func (tx *Tx) Put(key, value []byte) error {
e := newRecord(key, value, putted)
tx.addRecord(e)
return nil
}