Skip to content

Commit

Permalink
[memdb] update tinylog commit code
Browse files Browse the repository at this point in the history
  • Loading branch information
unit-adm committed Dec 31, 2020
1 parent 3bf075d commit 7d0aefe
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 211 deletions.
37 changes: 34 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,40 @@ Entries are written to memdb and becomes immediately queryable. The memdb entrie

To efficiently compact and store data, the unitdb engine groups entries sequence by topic key, and then orders those sequences by time and each block keep offset of previous block in reverse time order. Index block offset is calculated from entry sequence in the time block. Data is read from data block using index entry information and the it un-compresses the data on read (if encryption flag was set then it un-encrypts the data on read).

<p align="left">
<img src="docs/assets/img/about.png" />
</p>
```
Time-block Write-ahead log
+---------+---------+-----------+-...-+---------------+ +--------------+--------------+--------------+-...-+-----------------+
| TinyLog | TinyLog | TinyLog | | TinyLog | ------->| TimeID|block | TimeID|block | TimeID|block | | TimeID|block |
+---------+---------+-----------+-...-+---------------+ +--------------+--------------+--------------+-...-+-----------------+
|
|
|
v
Topic trie Window block
+----------+ +------+ +------+ +--------+ Window-offset +-----------------------------+-----------------------------+-...-+-----------------------------+
| Contract | -----> | Hash | -----> | Hash | ---> | Offset | -----> | Topic hash|sequence...|next | Topic hash|sequence...|next | | Topic hash|sequence...|next |
+----------+ +------+ | +------+ +--------+ +-----------------------------+-----------------------------+-...-+-----------------------------+
| Contract | --- | +------+ +------+ +--------+ +-----------------------------+-----------------------------+-...-+-----------------------------+
+----------+ ---> | Hash | ---> | Hash | ---> | Offset | ---> | Topic hash|sequence...|next | Topic hash|sequence...|next | | Topic hash|sequence...|next |
| Contract | -- | +------+ +------+ +--------+ +-----------------------------+-----------------------------+-...-+-----------------------------+
+----------+ | | +------+ +------+ +--------+ +-----------------------------+-----------------------------+-...-+-----------------------------+
| ---> | Hash | ---> | Hash | ---> | Offset | ---> | Topic hash|sequence...|next | Topic hash|sequence...|next | | Topic hash|sequence...|next |
| +------+ +------+ +--------+ +-----------------------------+-----------------------------+-...-+-----------------------------+
| +------+ +--------+ |
---> | Hash | ---> | Offset | | Index block-offset = (sequence-1)/(block-size)
+------+ +--------+ |
Index block v
+-------------------+-------------------+-------------------+-...-+-------------------+
| Offset|value size | Offset|value size | Offset|value size | | Offset|value size |
+-------------------+-------------------+-------------------+-...-+-------------------+
|
| Data block-offset
Data block v
+------------+------------+------------+------------+-...-+------------+
| Topic|data | Topic|data | Topic|data | Topic|data | | Topic|data |
+------------+------------+------------+------------+-...-+------------+
```

## Projects Using Unitdb
Below is a list of projects that use unitdb.
Expand Down
Binary file removed docs/assets/img/about.png
Binary file not shown.
2 changes: 1 addition & 1 deletion memdb/db_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (

logDir = "logs"

nPoolSize = 10
nPoolSize = 27

nBlocks = 27

Expand Down
262 changes: 55 additions & 207 deletions memdb/tinylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package memdb
import (
"fmt"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -54,7 +53,6 @@ func (l *_TinyLog) timeID() _TimeID {
}

func (b *_TinyLog) abort() {
// atomic.StoreUint32(&b.entryCount, 0)
close(b.doneChan)
}

Expand Down Expand Up @@ -85,18 +83,14 @@ type (
logCount int
}
_LogPool struct {
db *DB
opts *_LogOptions
tinyLog *_TinyLog
writeQueue chan *_TinyLog
logQueue chan *_TinyLog
waitingQueue _Queue
stop chan struct{}
stopOnce sync.Once
stopped int32
waiting int32
wait bool
stopWg sync.WaitGroup
db *DB
opts *_LogOptions
tinyLog *_TinyLog
writeQueue chan *_TinyLog
logQueue chan *_TinyLog
stop chan struct{}
stopOnce sync.Once
stopWg sync.WaitGroup
}
)

Expand Down Expand Up @@ -137,8 +131,9 @@ func (db *DB) newLogPool(opts *_LogOptions) {
pool := &_LogPool{
db: db,
opts: opts,
tinyLog: &_TinyLog{},
writeQueue: make(chan *_TinyLog, 1),
logQueue: make(chan *_TinyLog),
logQueue: make(chan *_TinyLog, opts.poolCapacity),
stop: make(chan struct{}),
}

Expand All @@ -147,10 +142,14 @@ func (db *DB) newLogPool(opts *_LogOptions) {
// start the write loop
go pool.writeLoop(opts.writeInterval)

// start the commit loop
pool.stopWg.Add(1)
go pool.commitLoop()

// start the dispacther
for i := 0; i < opts.logCount; i++ {
pool.stopWg.Add(1)
go pool.dispatch(opts.poolCapacity, opts.timeout)
go pool.dispatch(opts.timeout)
}

db.internal.logPool = pool
Expand All @@ -164,8 +163,6 @@ func (p *_LogPool) size() int {
// stop tells dispatcher to exit, and wether or not complete queued jobs.
func (p *_LogPool) close(wait bool) {
p.stopOnce.Do(func() {
atomic.StoreInt32(&p.stopped, 1)
p.wait = wait
// Close write queue and wait for currently running jobs to finish.
close(p.stop)
})
Expand All @@ -177,16 +174,6 @@ func (p *_LogPool) closeWait() {
p.close(true)
}

// stopped returns true if worker pool has been stopped.
func (p *_LogPool) isClosed() bool {
return atomic.LoadInt32(&p.stopped) != 0
}

// waitQueueSize returns count of jobs in waitingQueue.
func (p *_LogPool) waitQueueSize() int {
return int(atomic.LoadInt32(&p.waiting))
}

// write enqueues a log to write.
func (p *_LogPool) write(tinyLog *_TinyLog) {
if tinyLog != nil {
Expand Down Expand Up @@ -242,199 +229,60 @@ func (p *_LogPool) writeLoop(interval time.Duration) {
}

// dispatch handles tiny log commit for the jobs in queue.
func (p *_LogPool) dispatch(cap int, timeOutInterval time.Duration) {
defer p.stopWg.Done()
timeout := time.NewTimer(timeOutInterval)
var logCount int
var idle bool
Loop:
func (p *_LogPool) dispatch(timeout time.Duration) {
LOOP:
for {
// As long as jobs are in waiting queue, incoming
// job are put into the waiting queueand jobs to run are taken from waiting queue.
if p.waitingQueue.len() != 0 {
if !p.processWaitingQueue() {
break Loop
}
continue
}

select {
case tinyLog, ok := <-p.writeQueue:
// Get a buffer from the queue
if !ok {
break Loop
close(p.logQueue)
p.stopWg.Done()
return
}

// return tinyLog to the pool
select {
case p.logQueue <- tinyLog:
default:
if logCount < cap {
go p.commit(tinyLog, p.logQueue)
logCount++
} else {
// Enqueue job to be executed later.
p.waitingQueue.push(tinyLog)
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.len()))
}
}
idle = false
case <-timeout.C:
if idle && logCount > 0 {
if p.killIdleJob() {
logCount--
}
// pool is full, let GC handle the buffer
goto WAIT
}
idle = true
timeout.Reset(timeOutInterval)
}
}

// If instructed to wait, then run jobs that are already in queue.
if p.wait {
p.runQueuedJobs()
}

// Stop all remaining jobs as it become ready.
for logCount > 0 {
p.logQueue <- nil
logCount--
}
timeout.Stop()
WAIT:
// Wait for a while
time.Sleep(timeout)
goto LOOP
}

// commit run initial tiny log commit, then start tiny log waiting for more.
func (p *_LogPool) commit(tinyLog *_TinyLog, queue chan *_TinyLog) {
if err := p.db.tinyCommit(tinyLog); err != nil {
fmt.Println("workerPool.tinyCommit: error ", err)
}

go p.tinyCommit(queue)
}

// tinyCommit commits log and stops when it receive a nil log.
func (p *_LogPool) tinyCommit(queue chan *_TinyLog) {
for tinyLog := range queue {
if tinyLog == nil {
return
}

if err := p.db.tinyCommit(tinyLog); err != nil {
fmt.Println("workerPool.tinyCommit: error ", err)
// commitLoop commits the tiny log to the WAL.
func (p *_LogPool) commitLoop() {
for {
select {
case <-p.stop:
// run queued jobs from the log queue and
// process it until queue is empty.
for {
select {
case tinyLog, ok := <-p.logQueue:
if !ok {
p.stopWg.Done()
return
}
if err := p.db.tinyCommit(tinyLog); err != nil {
fmt.Println("logPool.tinyCommit: error ", err)
}
default:
}
}
case tinyLog := <-p.logQueue:
if tinyLog != nil {
if err := p.db.tinyCommit(tinyLog); err != nil {
fmt.Println("logPool.tinyCommit: error ", err)
}
}
}
}
}

// processWaiting queue puts new jobs onto the waiting queue,
// removes jobs from the waiting queue. Returns false if workerPool is stopped.
func (p *_LogPool) processWaitingQueue() bool {
select {
case p.logQueue <- p.waitingQueue.front():
p.waitingQueue.pop()
}
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.len()))
return true
}

func (p *_LogPool) killIdleJob() bool {
select {
case p.logQueue <- nil:
return true
default:
return false
}
}

// runQueuedJobs removes each job from the waiting queue and
// process it until queue is empty.
func (p *_LogPool) runQueuedJobs() {
for p.waitingQueue.len() != 0 {
p.logQueue <- p.waitingQueue.pop()
atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.len()))
}
}

type _Queue struct {
buf []*_TinyLog
head int
tail int
count int
}

// len returns the number of elements currently stored in the queue.
func (q *_Queue) len() int {
return q.count
}

// push appends an element to the back of the queue.
func (q *_Queue) push(elem *_TinyLog) {
q.grow()

q.buf[q.tail] = elem
// calculate new tail position.
q.tail = (q.tail + 1) & (len(q.buf) - 1) // bitwise modulus
q.count++
}

// pop removes and return an element from front of the queue.
func (q *_Queue) pop() *_TinyLog {
if q.count <= 0 {
panic("Queue: pop called on empty queue")
}
elem := q.buf[q.head]
q.buf[q.head] = nil
// Calculate new head position.
q.head = (q.head + 1) & (len(q.buf) - 1) // bitwise modulus
q.count--
q.shrink()

return elem
}

// front returns an element from front of the queue.
func (q *_Queue) front() *_TinyLog {
if q.count <= 0 {
panic("Queue: pop called on empty queue")
}
return q.buf[q.head]
}

// at returns element at index i in the queue without removing element from the queue.
// at(0) refers to first element and is same as front(). at(len()0-1) refers to the last element.
func (q *_Queue) at(i int) *_TinyLog {
if i < 0 || i > q.count {
panic("Queue: at called with index out of range")
}
// bitwise modulus
return q.buf[(q.head+i)&(len(q.buf)-1)]
}

// grow resizes the queue to fit exactly twice its current content.
func (q *_Queue) grow() {
if len(q.buf) == 0 {
q.buf = make([]*_TinyLog, nPoolSize)
return
}
if q.count == len(q.buf) {
q.resize()
}
}

// shrink resizes the queue down if bugger if 1/4 full.
func (q *_Queue) shrink() {
if len(q.buf) > nPoolSize && (q.count<<2) == len(q.buf) {
q.resize()
}
}

// resize resizes the queue to fit exactly twice its current content.
func (q *_Queue) resize() {
newBuf := make([]*_TinyLog, q.count<<1)
if q.tail > q.head {
copy(newBuf, q.buf[q.head:q.tail])
} else {
n := copy(newBuf, q.buf[q.head:])
copy(newBuf[n:], q.buf[:q.tail])
}

q.head = 0
q.tail = q.count
q.buf = newBuf
}

0 comments on commit 7d0aefe

Please sign in to comment.