Skip to content

Commit

Permalink
add api to append messages
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 23, 2016
1 parent b7d1367 commit cf4038f
Show file tree
Hide file tree
Showing 14 changed files with 246 additions and 196 deletions.
15 changes: 0 additions & 15 deletions cluster/broker.go

This file was deleted.

44 changes: 0 additions & 44 deletions cluster/controller.go

This file was deleted.

17 changes: 0 additions & 17 deletions cluster/partition.go

This file was deleted.

43 changes: 43 additions & 0 deletions cluster/topic_partition.go
@@ -0,0 +1,43 @@
package cluster

import (
"fmt"
"path"

"github.com/travisjeffery/jocko/commitlog"
)

type TopicPartition struct {
Topic string `json:"topic"`
Partition int `json:"partition"`

// broker ids
Replicas []string `json:"replicas"`
Leader string `json:"leader"`
PreferredLeader string `json:"preferred_leader"`

CommitLog *commitlog.CommitLog `json:"-"`
}

func (p TopicPartition) String() string {
return fmt.Sprintf("%s-%d", p.Topic, p.Partition)
}

// OpenCommitLog opens a commit log for the partition at the path.
func (partition *TopicPartition) OpenCommitLog(logDir string) error {
var err error
partition.CommitLog, err = commitlog.New(commitlog.Options{
Path: path.Join(logDir, partition.String()),
SegmentBytes: 1024,
})
if err != nil {
return err
}
if err = partition.CommitLog.Init(); err != nil {
return err
}
if err = partition.CommitLog.Open(); err != nil {
return err
}
return nil
}
44 changes: 31 additions & 13 deletions commitlog/commitlog.go
Expand Up @@ -6,15 +6,28 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"
)

type Message struct {
Offset int64 `json:"offset"`
Value []byte `json:"value"`
Timestamp time.Time `json:"timestamp"`
}

type MessageSet struct {
Offset int64 `json:"offset"`
MessageSize int32 `json:"message_size"`
Payload []byte `json:"payload"`
}

type CommitLog struct {
Options
name string
mu sync.RWMutex
segments []*segment
segments []*Segment
vActiveSegment atomic.Value
}

Expand All @@ -41,15 +54,15 @@ func New(opts Options) (*CommitLog, error) {
return l, nil
}

func (l *CommitLog) init() error {
func (l *CommitLog) Init() error {
err := os.MkdirAll(l.Path, 0755)
if err != nil {
return errors.Wrap(err, "mkdir failed")
}
return nil
}

func (l *CommitLog) open() error {
func (l *CommitLog) Open() error {
_, err := ioutil.ReadDir(l.Path)
if err != nil {
return errors.Wrap(err, "read dir failed")
Expand All @@ -66,21 +79,26 @@ func (l *CommitLog) open() error {
return nil
}

func (l *CommitLog) deleteAll() error {
func (l *CommitLog) DeleteAll() error {
return os.RemoveAll(l.Path)
}

func (l *CommitLog) Write(p []byte) (n int, err error) {
func (l *CommitLog) Append(m MessageSet) error {
l.mu.Lock()
defer l.mu.Unlock()

if l.checkSplit() {
if err = l.split(); err != nil {
return 0, err
if err := l.split(); err != nil {
return err
}
}

return l.activeSegment().Write(p)
position := l.activeSegment().Position
if _, err := l.activeSegment().Write(m.Payload); err != nil {
return err
}
return l.activeSegment().Index.WriteEntry(Entry{
Offset: m.Offset,
Position: position,
})
}

func (l *CommitLog) Read(p []byte) (n int, err error) {
Expand All @@ -104,9 +122,9 @@ func (l *CommitLog) split() error {
}

func (l *CommitLog) newestOffset() int64 {
return l.activeSegment().NextOffset()
return l.activeSegment().NextOffset
}

func (l *CommitLog) activeSegment() *segment {
return l.vActiveSegment.Load().(*segment)
func (l *CommitLog) activeSegment() *Segment {
return l.vActiveSegment.Load().(*Segment)
}
32 changes: 24 additions & 8 deletions commitlog/commitlog_test.go
Expand Up @@ -8,43 +8,59 @@ import (
"os"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"
)

func TestNewCommitLog(t *testing.T) {
path := filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63()))
fmt.Println(path)
os.RemoveAll(path)
os.MkdirAll(path, 0755)

opts := Options{
Path: path,
SegmentBytes: 6,
}
l, err := New(opts)

// remove old data
l.deleteAll()
assert.NoError(t, l.DeleteAll())

l.init()
l.open()
assert.NoError(t, l.Init())
assert.NoError(t, l.Open())

if err != nil {
t.Fatal(err)
}

_, err = l.Write([]byte("one"))
err = l.Append(MessageSet{
Offset: 0,
Payload: []byte("one"),
})
if err != nil {
t.Error(err)
}

_, err = l.Write([]byte("two"))
err = l.Append(MessageSet{
Offset: 1,
Payload: []byte("two"),
})
if err != nil {
t.Error(err)
}

_, err = l.Write([]byte("three"))
err = l.Append(MessageSet{
Offset: 2,
Payload: []byte("three"),
})
if err != nil {
t.Error(err)
}

_, err = l.Write([]byte("four"))
err = l.Append(MessageSet{
Offset: 3,
Payload: []byte("four"),
})
if err != nil {
t.Error(err)
}
Expand Down
18 changes: 10 additions & 8 deletions commitlog/index.go
Expand Up @@ -27,25 +27,25 @@ type index struct {
offset int64
}

type entry struct {
type Entry struct {
Offset int64
Position int64
}

// relEntry is an entry relative to the base offset
// relEntry is an Entry relative to the base offset
type relEntry struct {
Offset int8
Position int8
}

func newRelEntry(e entry, baseOffset int64) relEntry {
func newRelEntry(e Entry, baseOffset int64) relEntry {
return relEntry{
Offset: int8(e.Offset - baseOffset),
Position: int8(e.Position),
}
}

func (rel relEntry) fill(e *entry, baseOffset int64) {
func (rel relEntry) fill(e *Entry, baseOffset int64) {
e.Offset = baseOffset + int64(rel.Offset)
e.Position = int64(rel.Position)
}
Expand All @@ -68,6 +68,7 @@ func newIndex(opts options) (idx *index, err error) {
idx = &index{
options: opts,
}

idx.file, err = os.OpenFile(opts.path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
return nil, errors.Wrap(err, "open file failed")
Expand All @@ -83,16 +84,17 @@ func newIndex(opts options) (idx *index, err error) {
if err != nil {
return nil, errors.Wrap(err, "file truncate failed")
}
} else {
idx.offset = size
}
idx.offset = size
idx.mmap, err = gommap.Map(idx.file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED)
if err != nil {
return nil, errors.Wrap(err, "mmap file failed")
}
return idx, nil
}

func (idx *index) WriteEntry(entry entry) (err error) {
func (idx *index) WriteEntry(entry Entry) (err error) {
b := new(bytes.Buffer)
relEntry := newRelEntry(entry, idx.baseOffset)
if err = binary.Write(b, binary.BigEndian, relEntry); err != nil {
Expand All @@ -105,7 +107,7 @@ func (idx *index) WriteEntry(entry entry) (err error) {
return nil
}

func (idx *index) ReadEntry(e *entry, offset int64) error {
func (idx *index) ReadEntry(e *Entry, offset int64) error {
p := make([]byte, entryWidth)
copy(p, idx.mmap[offset:offset+entryWidth])
b := bytes.NewReader(p)
Expand All @@ -119,7 +121,7 @@ func (idx *index) ReadEntry(e *entry, offset int64) error {
}

func (idx *index) ReadAt(p []byte, offset int64) (n int, err error) {
n = copy(idx.mmap[idx.offset:idx.offset+entryWidth], []byte("hellohellomanman"))
n = copy(idx.mmap[idx.offset:idx.offset+entryWidth], p)
return n, nil
}

Expand Down
6 changes: 3 additions & 3 deletions commitlog/index_test.go
Expand Up @@ -23,9 +23,9 @@ func TestIndex(t *testing.T) {
if err != nil {
t.Fatal(err)
}
entries := []entry{}
entries := []Entry{}
for i := 0; i < 3; i++ {
entries = append(entries, entry{
entries = append(entries, Entry{
int64(i),
int64(i * 5),
})
Expand All @@ -38,7 +38,7 @@ func TestIndex(t *testing.T) {
if err = idx.Sync(); err != nil {
t.Fatal(err)
}
act := &entry{}
act := &Entry{}
for i, exp := range entries {
if err = idx.ReadEntry(act, int64(i*entryWidth)); err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions commitlog/reader.go
Expand Up @@ -8,8 +8,8 @@ import (
)

type Reader struct {
segment *segment
segments []*segment
segment *Segment
segments []*Segment
idx int
mu sync.Mutex
offset int64
Expand Down

0 comments on commit cf4038f

Please sign in to comment.