Skip to content
Permalink
Browse files

biglog: createdTS and optimistic RO lookup bug fixes / segment headers (

  • Loading branch information...
bictorman committed Oct 27, 2018
1 parent caac997 commit 5d4656baa8a7b670ce34a442fc8a663c453f2400
@@ -1,21 +1,22 @@
language: go

go:
- 1.7
- 1.9
- 1.11
- tip


before_install:
- go get github.com/kardianos/govendor
- go get golang.org/x/tools/cmd/goimports
- go get github.com/golang/lint/golint
- go get golang.org/x/lint/golint
- go get github.com/kisielk/errcheck
- go get github.com/mattn/goveralls
- go get github.com/opennota/check/cmd/structcheck
- go get github.com/opennota/check/cmd/varcheck
- go get github.com/jgautheron/goconst/cmd/goconst
- go get honnef.co/go/simple/cmd/gosimple
- go get honnef.co/go/staticcheck/cmd/staticcheck
- go get honnef.co/go/tools/cmd/gosimple
- go get honnef.co/go/tools/cmd/staticcheck
- go get github.com/onsi/ginkgo/ginkgo
- go get github.com/onsi/gomega

@@ -3,8 +3,8 @@
# NetLog
A lightweight, HTTP-centric, log-based (Kafka-style) message queue.

## Work-in-progress!
This is still experimental software potentially full of bugs.
## Alpha software
This is still early software and potentially buggy.
To peek at the internals start with [BigLog](https://github.com/ninibe/netlog/tree/master/biglog).

### Roadmap
@@ -136,7 +136,7 @@ func Open(dirPath string) (*BigLog, error) {
bl.hotSeg.Store(hotSeg)

// sort by index file name, should reflect base offset
sort.Sort(sort.StringSlice(indexes))
sort.Strings(indexes)
var seg *segment
for _, index := range indexes {
seg, err = loadSegment(filepath.Join(dirPath, index))
@@ -154,8 +154,8 @@ func Open(dirPath string) (*BigLog, error) {
}

err = bl.setHotSeg(hotSeg)
bl.watchers.Store(make(watcherMap, 0))
bl.readers.Store(make(readerMap, 0))
bl.watchers.Store(make(watcherMap))
bl.readers.Store(make(readerMap))
go bl.notify()

return bl, err
@@ -150,6 +150,10 @@ func TestNewBigLog(t *testing.T) {
}

sc, err := biglog.NewScanner(bl, 0, biglog.UseBuffer(make([]byte, 10)))
if err != nil {
t.Error(err)
}

for i := 0; i < 10; i++ {
if !sc.Scan() {
if i != 6 {
@@ -4,11 +4,10 @@ import (
"fmt"
"io"
"io/ioutil"
"log"
"os"
"time"

"log"

"github.com/ninibe/netlog/biglog"
)

@@ -92,6 +91,8 @@ func step3() {

// Read the entries stored
entries, err := ir.ReadEntries(10)
panicOn(err)

println("Entries:")
for _, entry := range entries {
fmt.Printf("%+v\n", entry)
@@ -77,7 +77,7 @@ func (r *Reader) Read(b []byte) (n int, err error) {
}

r.setSegment(seg)
r.dFO = 0
r.dFO = headerSize
sumn += n
}

@@ -51,7 +51,7 @@ func TestScanner(t *testing.T) {
}

if sc.Offset() != offset {
t.Fatalf("Bad offset Actual: %d\n Expected: %d\n Prev: %d \n",
t.Errorf("Bad offset Actual: %d\n Expected: %d\n Prev: %d \n",
sc.Offset(),
offset,
prevOffset,
@@ -64,7 +64,7 @@ func TestScanner(t *testing.T) {
fmt.Printf("%d => %s \n \n", k, data[k])
defer wg.Done()

t.Fatalf("fatal payload read i=%d k=%d offset=%d delta=%d error:\n"+
t.Errorf("fatal payload read i=%d k=%d offset=%d delta=%d error:\n"+
" Actual: % x\n Expected: % x\n Prev: % x \n prevOffset=%d prevDelta=%d \n",
i,

@@ -147,6 +147,12 @@ func loadSegment(indexPath string) (*segment, error) {
return nil, ErrLoadSegment
}

sh, err := readSegHeader(dataFile)
if err != nil {
Logger.Printf("error: '%s' %s", dataPath, err)
return nil, ErrLoadSegment
}

var readers int32
seg := &segment{
readers: &readers,
@@ -155,6 +161,7 @@ func loadSegment(indexPath string) (*segment, error) {
indexPath: indexPath,
dataFile: dataFile,
dataPath: dataPath,
createdTS: sh.createdTS(),
writer: dataFile,
notify: make(chan struct{}, 1),
}
@@ -381,6 +388,10 @@ func (s *segment) Lookup(RO uint32) (l *lookupRes, err error) {
// highest possible entry in the index for this relative offset.
// it should be an exact match if there was no batching.
maxIFO := (RO - 1) * iw
// if the highest possible entry is bigger than the index itself bail.
if len(s.index) < int(maxIFO) {
return s.searchRO(RO)
}
maxRO, TS, dFO := readEntry(s.index[maxIFO:])

// found it!
@@ -519,12 +530,7 @@ func (s *segment) healthCheckPartialWrite() error {

/// prepare new data file
tmpDataPath := s.dataPath + ".temp"
err = createSegData(tmpDataPath)
if err != nil {
return err
}

tmpDataFile, err := os.OpenFile(tmpDataPath, os.O_RDWR|os.O_APPEND, 0666)
tmpDataFile, err := os.OpenFile(tmpDataPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
if err != nil {
return err
}
@@ -569,7 +575,7 @@ func createSegIndex(path string, maxIndexEntries int) error {
}()

init := make([]byte, maxIndexEntries*iw)
writeEntry(init, 1, 0)
writeEntry(init, 1, headerSize)
_, err = f.Write(init)

return err
@@ -582,6 +588,12 @@ func createSegData(path string) error {
return err
}

sh := newSegHeader()
err = sh.write(f)
if err != nil {
return err
}

return f.Close()
}

@@ -0,0 +1,54 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package biglog

import (
"io"
"time"
)

const (
headerVersionPos = 0 // headerVersionPos uint8
headerLengthPos = 1 // headerLengthPos uint8
headerCreatedPos = 2 // headerCreatedPos uint32
headerSize = headerCreatedPos + 4 + 10 // 4 bytes of CreatedTS + 10 bytes reserved
)

func readSegHeader(r io.Reader) (segHeader, error) {
buf := make([]byte, headerSize)
_, err := r.Read(buf)
return segHeader(buf), err
}

func newSegHeader() segHeader {
buf := make([]byte, headerSize)
buf[headerVersionPos] = 1
buf[headerLengthPos] = headerSize
enc.PutUint32(buf[headerCreatedPos:headerCreatedPos+4], uint32(time.Now().Unix()))
return segHeader(buf)
}

type segHeader []byte

func (sh *segHeader) ver() uint8 {
return sh.bytes()[headerVersionPos]
}

func (sh *segHeader) len() uint8 {
return sh.bytes()[headerLengthPos]
}

func (sh *segHeader) createdTS() uint32 {
return enc.Uint32(sh.bytes()[headerCreatedPos : headerCreatedPos+4])
}

func (sh *segHeader) bytes() []byte {
return []byte(*sh)
}

func (sh *segHeader) write(w io.Writer) error {
_, err := w.Write(sh.bytes())
return err
}
@@ -37,7 +37,7 @@ func TestCreateSegment(t *testing.T) {
mustWrite("fifth", 1)

buf := make([]byte, 1000)
_, err = seg.ReadAt(buf, 0)
_, err = seg.ReadAt(buf, headerSize)
if err != io.EOF {
t.Error(err)
}
@@ -133,13 +133,19 @@ func TestHealthCheckPartialWrite(t *testing.T) {
_, err = seg.WriteN([]byte("data"), 1)
panicOn(err)

// internal direct write method to file
_, err = seg.write([]byte("bypassing the index update"))
panicOn(err)

_, err = seg.dataFile.Seek(0, 0)
panicOn(err)

sh, err := readSegHeader(seg.dataFile)
panicOn(err)

data, err := ioutil.ReadAll(seg.dataFile)
panicOn(err)

if string(data) != "sometestdatabypassing the index update" {
t.Fatalf("can not test HealthCheckPartialWrite, data: %s", data)
}
@@ -149,10 +155,12 @@ func TestHealthCheckPartialWrite(t *testing.T) {
t.Fatal(err)
}

_, err = seg.dataFile.Seek(0, 0)
_, err = seg.dataFile.Seek(int64(sh.len()), 0)
panicOn(err)

data, err = ioutil.ReadAll(seg.dataFile)
panicOn(err)

if string(data) != "sometestdata" {
t.Errorf("data file not corrected from partial write, data: %s", data)
}
@@ -5,10 +5,9 @@
package biglog

import (
"sync/atomic"
"testing"
"time"

"sync/atomic"
)

func TestNewWatcher(t *testing.T) {
@@ -10,11 +10,10 @@ import (
"net/http"

"comail.io/go/colog"
"golang.org/x/net/http2"

"github.com/ninibe/bigduration"
"github.com/ninibe/netlog"
"github.com/ninibe/netlog/transport"
"golang.org/x/net/http2"
)

var (
@@ -5,17 +5,15 @@ import (
"io/ioutil"
"net/http"
"os"
"testing"
"time"

"golang.org/x/net/http2"

"github.com/ninibe/bigduration"
"github.com/ninibe/netlog"
"github.com/ninibe/netlog/transport"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"testing"
"golang.org/x/net/http2"
)

func TestIntegration(t *testing.T) {
@@ -5,11 +5,11 @@
package netlog

import (
"context"
"hash/crc32"
"strconv"

"github.com/ninibe/netlog/biglog"
"golang.org/x/net/context"
)

// IntegrityErrorType is the category of possible errors in the data.
@@ -1,9 +1,8 @@
package netlog

import (
"context"
"testing"

"golang.org/x/net/context"
)

func TestCheckMessageIntegrity(t *testing.T) {
@@ -63,7 +63,7 @@ func MessageSet(msgs []Message, comp CompressionType) Message {
w = gzip.NewWriter(buf)

case CompressionSnappy:
w = snappy.NewWriter(buf)
w = snappy.NewBufferedWriter(buf)

default:
panic("invalid compression type")
@@ -90,7 +90,7 @@ func MessageSet(msgs []Message, comp CompressionType) Message {
// Message the unit of data storage.
type Message []byte

// CompVer returns the first byte which reflects both compression a format version.
// CompVer returns the first byte which reflects both compression and format version.
func (m *Message) CompVer() uint8 {
return m.Bytes()[compverPos]
}
@@ -13,7 +13,6 @@ import (
"time"

"github.com/ninibe/bigduration"

"github.com/ninibe/netlog/biglog"
)

@@ -5,6 +5,7 @@
package netlog

import (
"context"
"encoding/binary"
"fmt"
"io"
@@ -16,10 +17,8 @@ import (
"strings"
"time"

"github.com/ninibe/bigduration"
"golang.org/x/net/context"

"github.com/comail/go-uuid/uuid"
"github.com/ninibe/bigduration"
"github.com/ninibe/netlog/biglog"
)

Oops, something went wrong.

0 comments on commit 5d4656b

Please sign in to comment.
You can’t perform that action at this time.