Skip to content
Permalink
Browse files

netlog: make TopicScanner an interface

  • Loading branch information...
bictorman committed May 14, 2016
1 parent 1a6871d commit 17068ec0e838381ef467e859f95a29ed08c8f129
Showing with 59 additions and 49 deletions.
  1. +9 −10 topic.go
  2. +39 −28 topicscanner.go
  3. +10 −10 topicscanner_atomicmap.go
  4. +1 −1 transport/http_transport.go
@@ -12,7 +12,6 @@ import (
"strings"
"time"

"github.com/comail/go-uuid/uuid"
"github.com/ninibe/bigduration"
"golang.org/x/net/context"

@@ -24,7 +23,7 @@ const settingsFile = "settings.json"
var enc = binary.BigEndian

//go:generate atomicmapper -pointer -type Topic
//go:generate atomicmapper -pointer -type TopicScanner
//go:generate atomicmapper -type TopicScanner

// Topic is a log of linear messages.
type Topic struct {
@@ -256,34 +255,34 @@ func (t *Topic) Payload(offset int64) ([]byte, error) {
}

// CreateScanner creates a new scanner starting at offset `from`.
func (t *Topic) CreateScanner(from int64) (ts *TopicScanner, err error) {
func (t *Topic) CreateScanner(from int64) (ts TopicScanner, err error) {
defer func() {
if err != nil {
log.Printf("warn: failed to create scanner %s:%d err: %s", t.Name(), from, err)
}
}()

log.Printf("info: creating scanner from offset %d", from)
ts, err = NewTopicScanner(t.bl, from)
ts, err = NewBLTopicScanner(t.bl, from)
if ts == nil || err != nil {
return ts, ExtErr(err)
}

ts.ID = uuid.New()
t.scanners.Set(ts.ID, ts)
// register scanner in this topic
t.scanners.Set(ts.Info().ID, ts)

log.Printf("info: created scanner %s from %s:%d", ts.ID, t.Name(), from)
log.Printf("info: created scanner from %s:%d", t.Name(), from)
return ts, nil
}

// Scanner returns an existing scanner for the topic given and ID
// or ErrScannerNotFound if it doesn't exists.
func (t *Topic) Scanner(ID string) (bs *TopicScanner, err error) {
bs, ok := t.scanners.Get(ID)
func (t *Topic) Scanner(ID string) (ts TopicScanner, err error) {
ts, ok := t.scanners.Get(ID)
if !ok {
return nil, ErrScannerNotFound
}
return bs, nil
return ts, nil
}

// DeleteScanner removes the scanner from the topic
@@ -7,6 +7,7 @@ package netlog
import (
"sync"

"github.com/comail/go-uuid/uuid"
"golang.org/x/net/context"

"github.com/ninibe/netlog/biglog"
@@ -15,43 +16,50 @@ import (
// TopicScanner reads one by one over the messages in a topic
// blocking until new data is available for a period of time.
// TopicScanners are thread-safe.
type TopicScanner struct {
ID string
type TopicScanner interface {
Scan(ctx context.Context) (m Message, offset int64, err error)
Info() TScannerInfo
Close() error
}

// BLTopicScanner implements TopicScanner reading from BigLog.
type BLTopicScanner struct {
mu sync.RWMutex
ID string
start int64
offset int64
last int64
messages []Message

sc *biglog.Scanner
wc *biglog.Watcher
}

// NewTopicScanner returns a new topic scanner on a given BigLog from a given 'from' offset.
func NewTopicScanner(bl *biglog.BigLog, from int64) (ts *TopicScanner, err error) {
// NewBLTopicScanner returns a new topic scanner on a given BigLog from a given 'from' offset.
func NewBLTopicScanner(bl *biglog.BigLog, from int64) (bts *BLTopicScanner, err error) {
if bl == nil {
return nil, biglog.ErrInvalid
}

sc, err := biglog.NewScanner(bl, from)

ts = &TopicScanner{
start: from,
offset: -1,
sc: sc,
wc: biglog.NewWatcher(bl),
bts = &BLTopicScanner{
ID: uuid.New(),
start: from,
last: -1,
sc: sc,
wc: biglog.NewWatcher(bl),
}

// auto-scan forward if embedded offset
if err == biglog.ErrEmbeddedOffset {
err = ts.scanForward(from)
err = bts.scanForward(from)
}

return ts, err
return bts, err
}

// scans in a loop until the next offset is the target offset
func (ts *TopicScanner) scanForward(target int64) (err error) {
func (ts *BLTopicScanner) scanForward(target int64) (err error) {
ctx := context.Background()
var offset int64
for {
@@ -72,7 +80,7 @@ func (ts *TopicScanner) scanForward(target int64) (err error) {
// Scan advances the Scanner to the next message, returning the message and the offset.
// Scan will block when it reaches EOF until there is more data available,
// the user must provide a context to cancel the request when it needs to stop waiting.
func (ts *TopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error) {
func (ts *BLTopicScanner) Scan(ctx context.Context) (m Message, offset int64, err error) {
ts.mu.Lock()
defer ts.mu.Unlock()

@@ -81,9 +89,9 @@ func (ts *TopicScanner) Scan(ctx context.Context) (m Message, offset int64, err
// from a set return one of those
if len(ts.messages) > 0 {
m = ts.messages[0]
offset = ts.offset
offset = ts.last

ts.offset++
ts.last++
ts.messages = ts.messages[1:]

return m, offset, nil
@@ -92,11 +100,11 @@ func (ts *TopicScanner) Scan(ctx context.Context) (m Message, offset int64, err
// scan a new entry
ok := ts.scan(ctx)
if ok {
ts.offset = ts.sc.Offset()
ts.last = ts.sc.Offset()

// if it's got only one message return it
if ts.sc.ODelta() == 1 {
return Message(ts.sc.Bytes()), ts.offset, nil
return Message(ts.sc.Bytes()), ts.last, nil
}

// unpack message-set into buffer
@@ -115,7 +123,7 @@ func (ts *TopicScanner) Scan(ctx context.Context) (m Message, offset int64, err
return nil, -1, ErrEndOfTopic
}

func (ts *TopicScanner) scan(ctx context.Context) bool {
func (ts *BLTopicScanner) scan(ctx context.Context) bool {
for {
ok := ts.sc.Scan()

@@ -140,24 +148,24 @@ func (ts *TopicScanner) scan(ctx context.Context) bool {
}
}

// Offset returns the offset of the last scanned message.
func (ts *TopicScanner) Offset() int64 {
// Last returns the offset of the last scanned message.
func (ts *BLTopicScanner) Last() int64 {
ts.mu.RLock()
defer ts.mu.RUnlock()

return ts.offset
return ts.last
}

// Start returns the offset of the first scanned message.
func (ts *TopicScanner) Start() int64 {
func (ts *BLTopicScanner) Start() int64 {
ts.mu.RLock()
defer ts.mu.RUnlock()

return ts.start
}

// Close implements io.Closer and releases the TopicScanner resources.
func (ts *TopicScanner) Close() error {
func (ts *BLTopicScanner) Close() error {
if err := ts.sc.Close(); err != nil {
return err
}
@@ -167,17 +175,20 @@ func (ts *TopicScanner) Close() error {

// TScannerInfo holds the scanner's offset information
type TScannerInfo struct {
Start int64 `json:"start"`
Last int64 `json:"last"`
ID string `json:"id"`
Start int64 `json:"start"`
Last int64 `json:"last"`
Persist bool `json:"persistent"`
}

// Info returns a TScannerInfo struct with the scanner's
// original starting offset and the last scanned one
func (ts *TopicScanner) Info() TScannerInfo {
func (ts *BLTopicScanner) Info() TScannerInfo {
ts.mu.RLock()
defer ts.mu.RUnlock()
return TScannerInfo{
ID: ts.ID,
Start: ts.start,
Last: ts.offset,
Last: ts.last,
}
}
@@ -1,5 +1,5 @@
// generated file - DO NOT EDIT
// command: atomicmapper -pointer -type TopicScanner
// command: atomicmapper -type TopicScanner

package netlog

@@ -8,13 +8,13 @@ import (
"sync/atomic"
)

// TopicScannerAtomicMap is a copy-on-write thread-safe map of pointers to TopicScanner
// TopicScannerAtomicMap is a copy-on-write thread-safe map of TopicScanner
type TopicScannerAtomicMap struct {
mu sync.Mutex
val atomic.Value
}

type _TopicScannerMap map[string]*TopicScanner
type _TopicScannerMap map[string]TopicScanner

// NewTopicScannerAtomicMap returns a new initialized TopicScannerAtomicMap
func NewTopicScannerAtomicMap() *TopicScannerAtomicMap {
@@ -23,16 +23,16 @@ func NewTopicScannerAtomicMap() *TopicScannerAtomicMap {
return am
}

// Get returns a pointer to TopicScanner for a given key
func (am *TopicScannerAtomicMap) Get(key string) (value *TopicScanner, ok bool) {
// Get returns a TopicScanner for a given key
func (am *TopicScannerAtomicMap) Get(key string) (value TopicScanner, ok bool) {
value, ok = am.val.Load().(_TopicScannerMap)[key]
return value, ok
}

// GetAll returns the underlying map of pointers to TopicScanner
// GetAll returns the underlying map of TopicScanner
// this map must NOT be modified, to change the map safely use the Set and Delete
// functions and Get the value again
func (am *TopicScannerAtomicMap) GetAll() map[string]*TopicScanner {
func (am *TopicScannerAtomicMap) GetAll() map[string]TopicScanner {
return am.val.Load().(_TopicScannerMap)
}

@@ -41,8 +41,8 @@ func (am *TopicScannerAtomicMap) Len() int {
return len(am.val.Load().(_TopicScannerMap))
}

// Set inserts in the map a pointer to TopicScanner under a given key
func (am *TopicScannerAtomicMap) Set(key string, value *TopicScanner) {
// Set inserts in the map a TopicScanner under a given key
func (am *TopicScannerAtomicMap) Set(key string, value TopicScanner) {
am.mu.Lock()
defer am.mu.Unlock()

@@ -57,7 +57,7 @@ func (am *TopicScannerAtomicMap) Set(key string, value *TopicScanner) {
return
}

// Delete removes the pointer to TopicScanner under key from the map
// Delete removes the TopicScanner under key from the map
func (am *TopicScannerAtomicMap) Delete(key string) {
am.mu.Lock()
defer am.mu.Unlock()
@@ -265,7 +265,7 @@ func (ht *HTTPTransport) handleCreateScanner(w http.ResponseWriter, r *http.Requ
}

w.WriteHeader(http.StatusCreated)
JSONResponse(w, IDMsg{ID: ts.ID})
JSONResponse(w, ts.Info())
}

func (ht *HTTPTransport) handleDeleteScanner(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {

0 comments on commit 17068ec

Please sign in to comment.
You can’t perform that action at this time.