Skip to content

Commit

Permalink
naming style changes and linter error fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
unit-adm committed Jul 18, 2020
1 parent 9658b42 commit 33b49ad
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 126 deletions.
4 changes: 2 additions & 2 deletions batch.go
Expand Up @@ -51,7 +51,7 @@ type (

// Batch is a write batch.
Batch struct {
batchId uid.LID
batchID uid.LID
opts *BatchOptions
managed bool
grouped bool
Expand Down Expand Up @@ -151,7 +151,7 @@ func (b *Batch) DeleteEntry(e *Entry) error {
case b.opts.Immutable:
return errImmutable
case len(e.ID) == 0:
return errMsgIdEmpty
return errMsgIDEmpty
case len(e.Topic) == 0:
return errTopicEmpty
case len(e.Topic) > maxTopicLength:
Expand Down
2 changes: 1 addition & 1 deletion batchdb.go
Expand Up @@ -68,7 +68,7 @@ func (db *DB) batch() *Batch {
opts := DefaultBatchOptions
opts.Immutable = db.flags.Immutable
opts.Encryption = db.encryption == 1
b := &Batch{opts: opts, batchId: uid.NewLID(), db: db}
b := &Batch{opts: opts, batchID: uid.NewLID(), db: db}
b.buffer = db.bufPool.Get()

return b
Expand Down
3 changes: 1 addition & 2 deletions cmd/sample/main.go
Expand Up @@ -18,8 +18,7 @@ func main() {
}
defer db.Close()

// Use DB.SetEntry() method to bulk store messages as topic is parsed on first request and subsequent requests skips parsing.

// Use Entry.WithPayload() method to bulk store messages as topic is parsed on first request and subsequent requests skips parsing.
topic := []byte("teams.alpha.ch1.u1")
entry := &unitdb.Entry{Topic: topic}
for j := 0; j < 50; j++ {
Expand Down
10 changes: 5 additions & 5 deletions db.go
Expand Up @@ -329,7 +329,7 @@ func (db *DB) Get(q *Query) (items [][]byte, err error) {
}
e, err := db.readEntry(we.topicHash, we.seq)
if err != nil {
if err == errMsgIdDeleted {
if err == errMsgIDDeleted {
invalidCount++
return nil
}
Expand All @@ -341,13 +341,13 @@ func (db *DB) Get(q *Query) (items [][]byte, err error) {
logger.Error().Err(err).Str("context", "data.readMessage")
return err
}
msgId := message.ID(id)
if !msgId.EvalPrefix(q.Contract, q.cutoff) {
msgID := message.ID(id)
if !msgID.EvalPrefix(q.Contract, q.cutoff) {
invalidCount++
return nil
}

if msgId.IsEncrypted() {
if msgID.IsEncrypted() {
val, err = db.mac.Decrypt(nil, val)
if err != nil {
logger.Error().Err(err).Str("context", "mac.decrypt")
Expand Down Expand Up @@ -520,7 +520,7 @@ func (db *DB) DeleteEntry(e *Entry) error {
case db.flags.Immutable:
return errImmutable
case len(e.ID) == 0:
return errMsgIdEmpty
return errMsgIDEmpty
case len(e.Topic) == 0:
return errTopicEmpty
case len(e.Topic) > maxTopicLength:
Expand Down
4 changes: 2 additions & 2 deletions db_internal.go
Expand Up @@ -186,7 +186,7 @@ func (db *DB) readEntry(topicHash uint64, seq uint64) (entry, error) {
e := entry{}
data, err := db.mem.Get(topicHash, cacheKey)
if err != nil {
return entry{}, errMsgIdDeleted
return entry{}, errMsgIDDeleted
}
if data != nil {
e.UnmarshalBinary(data[:entrySize])
Expand Down Expand Up @@ -484,7 +484,7 @@ func (db *DB) isClosed() bool {
// Check read ok status.
func (db *DB) ok() error {
if db.isClosed() {
return errors.New("wal is closed.")
return errors.New("wal is closed")
}
return nil
}
30 changes: 15 additions & 15 deletions db_test.go
Expand Up @@ -65,13 +65,13 @@ func TestSimple(t *testing.T) {

entry := NewEntry(topic).WithContract(contract).WithTTL([]byte("1m"))
for i = 0; i < n; i++ {
messageId := db.NewID()
entry.WithID(messageId)
messageID := db.NewID()
entry.WithID(messageID)
val := []byte(fmt.Sprintf("msg.%2d", i))
if err := db.PutEntry(entry.WithPayload(val)); err != nil {
t.Fatal(err)
}
ids = append(ids, messageId)
ids = append(ids, messageID)
}

verifyMsgsAndClose := func() {
Expand Down Expand Up @@ -115,15 +115,15 @@ func TestSimple(t *testing.T) {
}

for i = 0; i < n; i++ {
messageId := db.NewID()
messageID := db.NewID()
val := []byte(fmt.Sprintf("msg.%2d", i))
if err := db.Put(topic, val); err != nil {
t.Fatal(err)
}
if err := db.PutEntry(NewEntry(topic).WithID(messageId).WithPayload(val)); err != nil {
if err := db.PutEntry(NewEntry(topic).WithID(messageID).WithPayload(val)); err != nil {
t.Fatal(err)
}
ids = append(ids, messageId)
ids = append(ids, messageID)
}
db.tinyCommit()
if err := db.Sync(); err != nil {
Expand Down Expand Up @@ -183,13 +183,13 @@ func TestBatch(t *testing.T) {
// wg.Add(1)
var ids [][]byte
for i = 0; i < n; i++ {
messageId := db.NewID()
messageID := db.NewID()
topic := append(topic, []byte("?ttl=1h")...)
val := []byte(fmt.Sprintf("msg.%2d", i))
if err := b.PutEntry(NewEntry(topic).WithID(messageId).WithPayload(val).WithContract(contract)); err != nil {
if err := b.PutEntry(NewEntry(topic).WithID(messageID).WithPayload(val).WithContract(contract)); err != nil {
t.Fatal(err)
}
ids = append(ids, messageId)
ids = append(ids, messageID)
}
for _, id := range ids {
if err := b.Delete(id, topic); err != nil {
Expand Down Expand Up @@ -338,12 +338,12 @@ func TestLeasing(t *testing.T) {
topic := []byte("unit1.test")
var ids [][]byte
for i = 0; i < n; i++ {
messageId := db.NewID()
messageID := db.NewID()
val := []byte(fmt.Sprintf("msg.%2d", i))
if err := db.PutEntry(NewEntry(topic).WithID(messageId).WithPayload(val)); err != nil {
if err := db.PutEntry(NewEntry(topic).WithID(messageID).WithPayload(val)); err != nil {
t.Fatal(err)
}
ids = append(ids, messageId)
ids = append(ids, messageID)
}
db.tinyCommit()
if err := db.Sync(); err != nil {
Expand All @@ -353,15 +353,15 @@ func TestLeasing(t *testing.T) {
db.Delete(id, topic)
}
for i = 0; i < n; i++ {
messageId := db.NewID()
messageID := db.NewID()
val := []byte(fmt.Sprintf("msg.%2d", i))
if err := db.Put(topic, val); err != nil {
t.Fatal(err)
}
if err := db.PutEntry(NewEntry(topic).WithID(messageId).WithPayload(val)); err != nil {
if err := db.PutEntry(NewEntry(topic).WithID(messageID).WithPayload(val)); err != nil {
t.Fatal(err)
}
ids = append(ids, messageId)
ids = append(ids, messageID)
}
db.tinyCommit()
if err := db.Sync(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions entry.go
Expand Up @@ -39,6 +39,7 @@ type (
val []byte
encryption bool
}
// Entry entry is a message entry structure
Entry struct {
internalEntry
ID []byte // The ID of the message
Expand Down
8 changes: 4 additions & 4 deletions errors.go
Expand Up @@ -22,10 +22,10 @@ import (

var (
errTopicEmpty = errors.New("Topic is empty")
errMsgIdEmpty = errors.New("Message Id is empty")
errMsgIdDeleted = errors.New("Message Id is deleted")
errMsgIdDoesNotExist = errors.New("Message Id does not exist in database")
errMsgIdPrefixMismatch = errors.New("Message Id does not match topic or Contract")
errMsgIDEmpty = errors.New("Message ID is empty")
errMsgIDDeleted = errors.New("Message ID is deleted")
errMsgIDDoesNotExist = errors.New("Message ID does not exist in database")
errMsgIDPrefixMismatch = errors.New("Message ID does not match topic or Contract")
errTtlTooLarge = errors.New("TTL is too large")
errTopicTooLarge = errors.New("Topic is too large")
errMsgExpired = errors.New("Message has expired")
Expand Down
8 changes: 4 additions & 4 deletions iterator.go
Expand Up @@ -138,7 +138,7 @@ func (it *ItemIterator) Next() {
}
e, err := it.db.readEntry(we.topicHash, we.seq)
if err != nil {
if err == errMsgIdDoesNotExist {
if err == errMsgIDDoesNotExist {
logger.Error().Err(err).Str("context", "db.readEntry")
return err
}
Expand All @@ -150,13 +150,13 @@ func (it *ItemIterator) Next() {
logger.Error().Err(err).Str("context", "data.readMessage")
return err
}
msgId := message.ID(id)
if !msgId.EvalPrefix(it.query.Contract, it.query.cutoff) {
msgID := message.ID(id)
if !msgID.EvalPrefix(it.query.Contract, it.query.cutoff) {
it.invalidKeys++
return nil
}

if msgId.IsEncrypted() {
if msgID.IsEncrypted() {
val, err = it.db.mac.Decrypt(nil, val)
if err != nil {
logger.Error().Err(err).Str("context", "mac.Decrypt")
Expand Down
34 changes: 17 additions & 17 deletions memdb/memdb.go
Expand Up @@ -144,15 +144,15 @@ func (db *DB) Close() error {
return nil
}

// getCache returns cache under given blockId
func (db *DB) getCache(blockId uint64) *memCache {
return db.blockCache[db.consistent.FindBlock(blockId)]
// getCache returns cache under given blockID
func (db *DB) getCache(blockID uint64) *memCache {
return db.blockCache[db.consistent.FindBlock(blockID)]
}

// Get gets data for the provided key under a blockId
func (db *DB) Get(blockId uint64, key uint64) ([]byte, error) {
// Get gets data for the provided key under a blockID
func (db *DB) Get(blockID uint64, key uint64) ([]byte, error) {
// Get cache
cache := db.getCache(blockId)
cache := db.getCache(blockID)
cache.RLock()
defer cache.RUnlock()
// Get item from cache.
Expand All @@ -175,10 +175,10 @@ func (db *DB) Get(blockId uint64, key uint64) ([]byte, error) {
return data[4:], nil
}

// Remove sets data offset to -1 for the key under a blockId
func (db *DB) Remove(blockId uint64, key uint64) error {
// Remove sets data offset to -1 for the key under a blockID
func (db *DB) Remove(blockID uint64, key uint64) error {
// Get cache
cache := db.getCache(blockId)
cache := db.getCache(blockID)
cache.RLock()
defer cache.RUnlock()
// Get item from cache.
Expand All @@ -188,10 +188,10 @@ func (db *DB) Remove(blockId uint64, key uint64) error {
return nil
}

// Set sets the value for the given entry for a blockId.
func (db *DB) Set(blockId uint64, key uint64, data []byte) error {
// Set sets the value for the given entry for a blockID.
func (db *DB) Set(blockID uint64, key uint64, data []byte) error {
// Get cache.
cache := db.getCache(blockId)
cache := db.getCache(blockID)
cache.Lock()
defer cache.Unlock()
off, err := cache.data.allocate(uint32(len(data) + 4))
Expand All @@ -211,10 +211,10 @@ func (db *DB) Set(blockId uint64, key uint64, data []byte) error {
return nil
}

// Keys gets all keys from block cache for the provided blockId
func (db *DB) Keys(blockId uint64) []uint64 {
// Keys gets all keys from block cache for the provided blockID
func (db *DB) Keys(blockID uint64) []uint64 {
// Get cache
cache := db.getCache(blockId)
cache := db.getCache(blockID)
cache.RLock()
defer cache.RUnlock()
// Get keys from block cache.
Expand All @@ -226,9 +226,9 @@ func (db *DB) Keys(blockId uint64) []uint64 {
}

// Free free keeps first offset that can be free if memdb exceeds target size.
func (db *DB) Free(blockId uint64, key uint64) error {
func (db *DB) Free(blockID uint64, key uint64) error {
// Get cache
cache := db.getCache(blockId)
cache := db.getCache(blockID)
cache.Lock()
defer cache.Unlock()
if cache.freeOffset > 0 {
Expand Down
8 changes: 4 additions & 4 deletions message/id.go
Expand Up @@ -38,9 +38,9 @@ const (
// Contract generates contract from parts and concatenate contract and first part of the topic
func Contract(parts []Part) uint64 {
if len(parts) == 1 {
return /*uint64(Wildcard)<<32 +*/ uint64(parts[0].Query)
return uint64(parts[0].Hash)
}
return uint64(parts[1].Query)<<32 + uint64(parts[0].Query)
return uint64(parts[1].Hash)<<32 + uint64(parts[0].Hash)
}

// ID represents a message ID encoded at 128bit and lexigraphically sortable
Expand All @@ -54,15 +54,15 @@ func (id *ID) AddContract(contract uint64) {
*id = newid
}

// Contract gets the contract for the id.
// Contract gets the contract for the ID.
func (id ID) Contract() uint64 {
if len(id) < fixed+8 {
return 0
}
return binary.LittleEndian.Uint64(id[fixed:])
}

// NewID generates a new message identifier without containing a prefix. Prefix is set later when arrives.
// NewID generates a new message identifier without containing a prefix. Prefix is set later.
func NewID(seq uint64, encrypted bool) ID {
var eBit int8
if encrypted {
Expand Down

0 comments on commit 33b49ad

Please sign in to comment.