This repository has been archived by the owner on Mar 1, 2024. It is now read-only.
forked from alpacahq/marketstore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cache.go
52 lines (44 loc) · 1.66 KB
/
cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package executor
import (
"sync/atomic"
"time"
"github.com/alpacahq/marketstore/utils/io"
)
/*
NOTE: Access to the TransactionPipe structures is single-threaded, with the exception of the
command channel (writeChannel). The cache contents are modified only by dequeuing commands
from that channel in the single cache thread.
*/
// WriteChannelCommandDepth is the buffer capacity (one million entries) used
// when allocating the TransactionPipe channels.
const WriteChannelCommandDepth = 1000 * 1000
// WriteCommand is a single write request; pointers to it are what the
// TransactionPipe write channel carries.
type WriteCommand struct {
	// RecordType is the record type of the data being written.
	RecordType io.EnumRecordType
	// WALKeyPath is the key path recorded for this write.
	// NOTE(review): presumably identifies the target file in the WAL — confirm.
	WALKeyPath string
	// Offset is a position associated with the write.
	// NOTE(review): presumably a byte offset into the target file — confirm.
	Offset int64
	// Index is a record index associated with the write.
	// NOTE(review): exact meaning is not visible here — confirm against the writer.
	Index int64
	// Data is the raw payload to write.
	Data []byte
}
// TransactionPipe holds the state of the current pending Transaction Group:
// its ID and the channels through which write commands and flush requests
// are submitted. The group is written to the WAL when flush() is called.
type TransactionPipe struct {
	// tgID is the current transaction group ID. It is read and updated with
	// sync/atomic operations.
	tgID int64
	// writeChannel queues the pending write commands.
	writeChannel chan *WriteCommand
	// flushChannel queues flush requests; each request is itself a channel.
	// NOTE(review): presumably the request channel is used to signal that the
	// flush completed — confirm against the flush implementation.
	flushChannel chan chan struct{}
}
// NewTransactionPipe builds a transaction pipe through which all write
// transactions are channelled to the WAL and primary writers.
//
// Both channels are buffered to WriteChannelCommandDepth entries so that all
// conceivable concurrent writers can enqueue without blocking. A fresh
// transaction group ID is assigned before the pipe is returned.
func NewTransactionPipe() *TransactionPipe {
	pipe := &TransactionPipe{
		writeChannel: make(chan *WriteCommand, WriteChannelCommandDepth),
		flushChannel: make(chan chan struct{}, WriteChannelCommandDepth),
	}
	pipe.NewTGID()
	return pipe
}
// NewTGID advances the transaction group ID and returns the new value.
//
// The new ID is the current Unix epoch time in nanoseconds. If that reading
// is not greater than the current ID (the wall clock stepped backwards, or
// two calls observed the same clock value), the ID is bumped by one instead,
// so the sequence is strictly increasing and never repeats.
//
// The update is a compare-and-swap loop: tgID is only ever read atomically,
// and a caller that loses a race simply retries against the fresh value
// rather than applying a delta computed from a stale one.
func (tgc *TransactionPipe) NewTGID() int64 {
	for {
		prev := atomic.LoadInt64(&tgc.tgID)
		next := time.Now().UTC().UnixNano()
		if next <= prev {
			// Clock did not advance past the last ID; keep the ID monotonic.
			next = prev + 1
		}
		if atomic.CompareAndSwapInt64(&tgc.tgID, prev, next) {
			return next
		}
	}
}
// TGID reports the most recently assigned transaction group ID.
// The value is loaded atomically.
func (tgc *TransactionPipe) TGID() int64 {
	current := atomic.LoadInt64(&tgc.tgID)
	return current
}