Skip to content

Commit

Permalink
optimize DB block sync
Browse files Browse the repository at this point in the history
  • Loading branch information
unit-adm committed Jul 28, 2020
1 parent d7cbf3d commit 80e54ac
Show file tree
Hide file tree
Showing 15 changed files with 197 additions and 112 deletions.
4 changes: 2 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type (
// It is safe to modify the contents of the argument after Put returns but not
// before.
func (b *Batch) Put(topic, payload []byte) error {
return b.PutEntry(NewEntry(topic).WithPayload(payload).WithContract(b.opts.Contract))
return b.PutEntry(NewEntry(topic, payload).WithContract(b.opts.Contract))
}

// PutEntry appends entries to a bacth for given topic->key/value pair.
Expand Down Expand Up @@ -120,7 +120,7 @@ func (b *Batch) PutEntry(e *Entry) error {
// It is safe to modify the contents of the argument after Delete returns but
// not before.
func (b *Batch) Delete(id, topic []byte) error {
return b.DeleteEntry(NewEntry(topic).WithID(id))
return b.DeleteEntry(NewEntry(topic, nil).WithID(id))
}

// DeleteEntry appends entry for deletion to a batch for given key.
Expand Down
26 changes: 17 additions & 9 deletions block_writer.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2020 Saffat Technologies, Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package unitdb

import (
Expand All @@ -8,8 +24,7 @@ import (
)

type blockWriter struct {
upperSeq uint64
blocks map[int32]block // map[blockIdx]block
blocks map[int32]block // map[blockIdx]block

*file
buffer *bpool.Buffer
Expand Down Expand Up @@ -85,9 +100,6 @@ func (bw *blockWriter) append(s slot, blockIdx int32) (exists bool, err error) {
if entryIdx == -1 {
return true, nil
}
if bw.upperSeq < s.seq {
bw.upperSeq = s.seq
}

if b.leased {
bw.leasing[s.seq] = struct{}{}
Expand Down Expand Up @@ -177,10 +189,6 @@ func (bw *blockWriter) write() error {
return nil
}

func (bw *blockWriter) UpperSeq() uint64 {
return bw.upperSeq
}

func blockRange(idx []int32) ([][]int32, error) {
if len(idx) == 0 {
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func benchmark4(dir string, numKeys int, minKS int, maxKS int, minVS int, maxVS
for contract := range keys {
for _, k := range keys[contract] {
topic := append(k, []byte("?ttl=1m")...)
b.PutEntry(unitdb.NewEntry(topic).WithPayload(vals[i]).WithContract(contract))
b.PutEntry(unitdb.NewEntry(topic, vals[i]).WithContract(contract))
}
}
return b.Write()
Expand Down
12 changes: 6 additions & 6 deletions cmd/sample/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func main() {

// Writing to multiple topics in a batch
err = db.Batch(func(b *unitdb.Batch, completed <-chan struct{}) error {
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u2")).WithPayload([]byte("msg for team alpha channel1 receiver2")))
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u3")).WithPayload([]byte("msg for team alpha channel1 receiver3")))
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u2"), []byte("msg for team alpha channel1 receiver2")))
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u3"), []byte("msg for team alpha channel1 receiver3")))
err := b.Write()
return err
})
Expand Down Expand Up @@ -100,10 +100,10 @@ func main() {
opts := unitdb.DefaultBatchOptions
opts.Contract = contract
b.SetOptions(opts)
b.PutEntry(unitdb.NewEntry([]byte("teams.*.ch1")).WithPayload([]byte("msg for any team channel1")))
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.*")).WithPayload([]byte("msg for team alpha all channels")))
b.PutEntry(unitdb.NewEntry([]byte("teams...")).WithPayload([]byte("msg for all teams and all channels")))
b.PutEntry(unitdb.NewEntry([]byte("...")).WithPayload([]byte("msg broadcast to all receivers of all teams all channels")))
b.PutEntry(unitdb.NewEntry([]byte("teams.*.ch1"), []byte("msg for any team channel1")))
b.PutEntry(unitdb.NewEntry([]byte("teams.alpha.*"), []byte("msg for team alpha all channels")))
b.PutEntry(unitdb.NewEntry([]byte("teams..."), []byte("msg for all teams and all channels")))
b.PutEntry(unitdb.NewEntry([]byte("..."), []byte("msg broadcast to all receivers of all teams all channels")))
return b.Write()
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ func main() {

// delete message
messageId := db.NewID()
entry := unitdb.NewEntry([]byte("teams.alpha.ch1.u1")).WithID(messageId).WithPayload([]byte("msg for team alpha channel1 receiver1"))
entry := unitdb.NewEntry([]byte("teams.alpha.ch1.u1"), []byte("msg for team alpha channel1 receiver1")).WithID(messageId)
db.PutEntry(entry)

err = db.DeleteEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u1")).WithID(messageId))
err = db.DeleteEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u1"), nil).WithID(messageId))
if err != nil {
log.Fatal(err)
return
Expand All @@ -78,7 +78,7 @@ func main() {
// Topic isolation using contract
contract, err := db.NewContract()

db.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u1")).WithPayload([]byte("msg for team alpha channel1 receiver1")).WithContract(contract))
db.PutEntry(unitdb.NewEntry([]byte("teams.alpha.ch1.u1"), []byte("msg for team alpha channel1 receiver1")).WithContract(contract))

// Get message for team alpha channel1 receiver1 with new contract
if msgs, err := db.Get(unitdb.NewQuery([]byte("teams.alpha.ch1.u1?last=1h")).WithContract(contract).WithLimit(10)); err == nil {
Expand Down
16 changes: 16 additions & 0 deletions data_writer.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2020 Saffat Technologies, Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package unitdb

import "github.com/unit-io/bpool"
Expand Down
20 changes: 10 additions & 10 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func Open(path string, opts *Options, flgs ...Flags) (*DB, error) {
logger.Error().Err(err).Str("context", "db.loadTrie")
// return nil, err
}
db.syncHandle = syncHandle{DB: db, internal: internal{}}
db.syncHandle = syncHandle{internal: internal{DB: db}}

logOpts := wal.Options{Path: path + logPostfix, TargetSize: opts.LogSize, BufferSize: opts.BufferSize}
wal, needLogRecovery, err := wal.New(logOpts)
Expand Down Expand Up @@ -247,9 +247,15 @@ func Open(path string, opts *Options, flgs ...Flags) (*DB, error) {
// Close closes the DB.
func (db *DB) Close() error {
if err := db.close(); err != nil {
db.close()
return err
}

//close bufferpool
db.bufPool.Done()

// close memdb
db.mem.Close()

if err := db.writeHeader(true); err != nil {
return err
}
Expand All @@ -269,12 +275,6 @@ func (db *DB) Close() error {
return err
}

//close bufferpool
db.bufPool.Done()

// close memdb
db.mem.Close()

var err error
if db.closer != nil {
if err1 := db.closer.Close(); err == nil {
Expand Down Expand Up @@ -426,7 +426,7 @@ func (db *DB) NewID() []byte {
// It is safe to modify the contents of the argument after Put returns but not
// before.
func (db *DB) Put(topic, payload []byte) error {
return db.PutEntry(NewEntry(topic).WithPayload(payload))
return db.PutEntry(NewEntry(topic, payload))
}

// PutEntry puts entry into the DB, if Contract is not specified then it uses master Contract.
Expand Down Expand Up @@ -491,7 +491,7 @@ func (db *DB) PutEntry(e *Entry) error {
// It is safe to modify the contents of the argument after Delete returns but not
// before.
func (db *DB) Delete(id, topic []byte) error {
return db.DeleteEntry(NewEntry(topic).WithID(id))
return db.DeleteEntry(NewEntry(topic, nil).WithID(id))
}

// DeleteEntry deletes an entry from DB. you must provide an ID to delete an entry.
Expand Down
24 changes: 12 additions & 12 deletions db_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ const (
)

type dbInfo struct {
encryption int8
sequence uint64
logSequence uint64
count uint64
blockIdx int32
windowIdx int32
cacheID uint64
encryption int8
sequence uint64
logSeq uint64
count uint64
blockIdx int32
windowIdx int32
cacheID uint64
}

func (db *DB) writeHeader(writeFreeList bool) error {
Expand Down Expand Up @@ -393,7 +393,7 @@ func (db *DB) tinyCommit() error {
}

db.setLogSeq(db.seq())
if err := <-logWriter.SignalInitWrite(db.logSeq()); err != nil {
if err := <-logWriter.SignalInitWrite(db.logSequence()); err != nil {
return err
}
db.meter.Puts.Inc(int64(db.tinyBatch.count()))
Expand Down Expand Up @@ -431,7 +431,7 @@ func (db *DB) commit(l int, buf *bpool.Buffer) error {
}

db.setLogSeq(db.seq())
return <-logWriter.SignalInitWrite(db.logSeq())
return <-logWriter.SignalInitWrite(db.logSequence())
}

// delete deletes the given key from the DB.
Expand Down Expand Up @@ -476,12 +476,12 @@ func (db *DB) nextSeq() uint64 {
}

// LogSeq current log sequence of the DB.
func (db *DB) logSeq() uint64 {
return atomic.LoadUint64(&db.logSequence)
func (db *DB) logSequence() uint64 {
return atomic.LoadUint64(&db.logSeq)
}

func (db *DB) setLogSeq(seq uint64) error {
atomic.StoreUint64(&db.logSequence, seq)
atomic.StoreUint64(&db.logSeq, seq)
return nil
}

Expand Down

0 comments on commit 80e54ac

Please sign in to comment.