Skip to content

Commit

Permalink
Add commit log
Browse files Browse the repository at this point in the history
  • Loading branch information
kelindar committed Dec 22, 2021
1 parent 17a5451 commit 78b7be4
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 62 deletions.
6 changes: 3 additions & 3 deletions collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type Collection struct {
fill bitmap.Bitmap // The fill-list
opts Options // The options configured
codec codec // The compression codec
writer commit.Writer // The commit writer
logger commit.Logger // The commit writer
pk *columnKey // The primary key column
cancel context.CancelFunc // The cancellation function for the context
}

// Options represents the options for a collection.
type Options struct {
Capacity int // The initial capacity when creating columns
Writer commit.Writer // The writer for the commit log (optional)
Writer commit.Logger // The writer for the commit log (optional)
Vacuum time.Duration // The interval at which the vacuum of expired entries will be done
}

Expand Down Expand Up @@ -76,7 +76,7 @@ func NewCollection(opts ...Options) *Collection {
opts: options,
slock: new(smutex.SMutex128),
fill: make(bitmap.Bitmap, 0, options.Capacity>>6),
writer: options.Writer,
logger: options.Writer,
codec: newCodec(&options),
cancel: cancel,
}
Expand Down
6 changes: 3 additions & 3 deletions commit/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func TestPutBitmap(t *testing.T) {
assert.Equal(t, Insert, r.Type)
}

func TestWriteTo(t *testing.T) {
func TestBufferWriteTo(t *testing.T) {
input := NewBuffer(0)
input.Column = "test"
input.PutInt16(Put, 10, 100)
Expand All @@ -266,7 +266,7 @@ func TestWriteTo(t *testing.T) {
assert.Equal(t, input, output)
}

func TestWriteToFailures(t *testing.T) {
func TestBufferWriteToFailures(t *testing.T) {
buf := NewBuffer(0)
buf.Column = "test"
buf.PutInt16(Put, 10, 100)
Expand All @@ -279,7 +279,7 @@ func TestWriteToFailures(t *testing.T) {
}
}

func TestReadFromFailures(t *testing.T) {
func TestBufferReadFromFailures(t *testing.T) {
input := NewBuffer(0)
input.Column = "test"
input.PutInt16(Put, 10, 100)
Expand Down
47 changes: 0 additions & 47 deletions commit/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@ package commit

import (
"io"
"sync"
"sync/atomic"
"time"

"github.com/kelindar/bitmap"
"github.com/kelindar/iostream"
"github.com/klauspost/compress/s2"
)

// --------------------------- ID ----------------------------
Expand Down Expand Up @@ -74,11 +72,6 @@ func min(v1, v2 int32) int32 {

// --------------------------- Commit ----------------------------

// Writer represents a contract that a commit writer must implement: a single
// Write method that receives a commit and returns an error if the commit
// could not be written.
type Writer interface {
Write(commit Commit) error
}

// Commit represents an individual transaction commit. If multiple chunks are committed
// in the same transaction, it would result in multiple commits per transaction.
type Commit struct {
Expand Down Expand Up @@ -213,43 +206,3 @@ func (c *Commit) ReadFrom(src io.Reader) (int64, error) {

return r.Offset(), nil
}

// --------------------------- Channel ----------------------------

var _ Writer = new(Channel)

// Channel represents an implementation of a commit writer that simply sends each
// commit into the channel.
type Channel chan Commit

// Write clones the commit and sends the clone into the channel. The returned
// error is always nil.
func (w *Channel) Write(commit Commit) error {
*w <- commit.Clone()
return nil
}

// --------------------------- File ----------------------------

// Recorder represents a commit recorder that can be used to write
// the changes to the collection during a snapshot.
type Recorder struct {
lock sync.Mutex // Held for the duration of each Write call
writer *iostream.Writer // The stream the commits are encoded into
}

// NewRecorder creates a recorder that encodes commits into dst. The destination
// is wrapped in an s2 writer, which in turn is wrapped in an iostream writer.
func NewRecorder(dst io.Writer) *Recorder {
return &Recorder{
writer: iostream.NewWriter(s2.NewWriter(dst)),
}
}

// Write encodes the commit into the underlying stream while holding the
// recorder's lock. It returns the encoding error, if any; otherwise it returns
// the result of closing the writer.
func (w *Recorder) Write(commit Commit) error {
w.lock.Lock()
defer w.lock.Unlock()

if _, err := commit.WriteTo(w.writer); err != nil {
return err
}

// NOTE(review): the writer is closed after every commit — confirm that a
// second Write on the same Recorder still succeeds.
return w.writer.Close()
}
6 changes: 5 additions & 1 deletion commit/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestCommitClone(t *testing.T) {

func TestWriterChannel(t *testing.T) {
w := make(Channel, 1)
w.Write(Commit{
w.Append(Commit{
Chunk: 123,
})

Expand Down Expand Up @@ -213,3 +213,7 @@ func (w *limitWriter) Write(p []byte) (int, error) {
}
return len(p), nil
}

// Read never produces any data: it always reports zero bytes read and io.EOF.
func (w *limitWriter) Read(p []byte) (int, error) {
return 0, io.EOF
}
86 changes: 86 additions & 0 deletions commit/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package commit

import (
"io"
"sync"

"github.com/kelindar/iostream"
"github.com/klauspost/compress/s2"
)

// Logger is the contract every commit logger must satisfy: a single Append
// method that receives a commit and reports whether recording it failed.
type Logger interface {
	Append(commit Commit) error
}

// Compile-time checks that both implementations satisfy Logger.
var (
	_ Logger = (*Channel)(nil)
	_ Logger = (*Log)(nil)
)

// --------------------------- Channel ----------------------------

// Channel is a commit logger backed by a Go channel: every appended commit is
// sent into the channel for a receiver to pick up.
type Channel chan Commit

// Append sends a clone of the commit into the channel. The send blocks until
// the channel can accept the value; the returned error is always nil.
func (c *Channel) Append(commit Commit) error {
	cloned := commit.Clone()
	*c <- cloned
	return nil
}

// --------------------------- Log ----------------------------

// Log represents a commit log that can be used to write the changes to the collection
// during a snapshot. It also supports reading a commit log back.
type Log struct {
lock sync.Mutex // Held by both Append and Range for their whole duration
writer *iostream.Writer // Stream that Append encodes commits into
reader *iostream.Reader // Stream that Range decodes commits from
}

// Open wraps the given file (or stream) in a commit log that can be both
// appended to and read back. Outgoing data passes through an s2 writer and
// incoming data through an s2 reader, both layered over the same file.
// NOTE(review): reader and writer share one io.ReadWriter; for a seekable file
// they presumably share a single offset — confirm against callers.
func Open(file io.ReadWriter) *Log {
	compressed := iostream.NewWriter(s2.NewWriter(file))
	decompressed := iostream.NewReader(s2.NewReader(file))
	return &Log{
		writer: compressed,
		reader: decompressed,
	}
}

// Append encodes the commit into the log's writer and then flushes it, all
// while holding the log's lock. It returns the first error encountered, either
// from encoding the commit or from the flush.
func (l *Log) Append(commit Commit) (err error) {
	l.lock.Lock()
	defer l.lock.Unlock()

	// Encode first; skip the flush if the commit could not be written.
	if _, err = commit.WriteTo(l.writer); err != nil {
		return err
	}
	return l.writer.Flush()
}

// Range decodes commits from the log one by one and passes each of them to fn.
// Iteration ends with a nil error once the reader reports io.EOF, and ends
// early with the corresponding error if decoding fails or fn returns an error.
//
// The log's lock is held for the entire iteration, including while fn runs, so
// fn must not call Append or Range on the same log.
func (l *Log) Range(fn func(Commit) error) error {
	l.lock.Lock()
	defer l.lock.Unlock()

	for {
		// Decode the next commit from the stream
		var next Commit
		if _, err := next.ReadFrom(l.reader); err != nil {
			if err == io.EOF {
				return nil // clean end of the log
			}
			return err
		}

		// Hand the decoded commit over to the caller
		if err := fn(next); err != nil {
			return err
		}
	}
}
97 changes: 97 additions & 0 deletions commit/log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright (c) Roman Atachiants and contributors. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for details.

package commit

import (
"bytes"
"fmt"
"io"
"testing"

"github.com/stretchr/testify/assert"
)

// --------------------------- Commit Encoding ----------------------------

func TestCommitWriteToFailures(t *testing.T) {
for size := 0; size < 30; size++ {
output := &limitWriter{Limit: size}
commit := newCommit(1)
_, err := commit.WriteTo(output)
assert.Error(t, err)
}
}

// TestCommitReadFromFailures encodes a commit once and then tries to decode
// every truncated prefix of the encoding, expecting each attempt to fail.
func TestCommitReadFromFailures(t *testing.T) {
	var encoded bytes.Buffer
	src := newCommit(1)
	n, err := src.WriteTo(&encoded)
	assert.NoError(t, err)

	raw := encoded.Bytes()
	for size := 0; size < int(n)-1; size++ {
		var decoded Commit
		_, err := decoded.ReadFrom(bytes.NewReader(raw[:size]))
		assert.Error(t, err)
	}
}

// newCommit builds a test commit with the given ID, chunk zero and two
// interleaved update buffers named "a" and "b".
func newCommit(id int) Commit {
	updates := []*Buffer{
		newInterleaved("a"),
		newInterleaved("b"),
	}
	return Commit{ID: uint64(id), Chunk: 0, Updates: updates}
}

// --------------------------- Log Operations ----------------------------

// TestLogAppendRange appends two commits to a log and checks that Range
// yields their IDs in the order they were appended.
func TestLogAppendRange(t *testing.T) {
	var stream bytes.Buffer
	clog := Open(&stream)
	for _, id := range []int{1, 2} {
		assert.NoError(t, clog.Append(newCommit(id)))
	}

	var ids []uint64
	err := clog.Range(func(c Commit) error {
		ids = append(ids, c.ID)
		return nil
	})
	assert.NoError(t, err)
	assert.Equal(t, []uint64{1, 2}, ids)
}

// TestLogRangeFailures writes two commits, then replays every truncated prefix
// of the resulting stream and checks that fewer than two commits come back.
func TestLogRangeFailures(t *testing.T) {
	var full bytes.Buffer
	logger := Open(&full)
	assert.NoError(t, logger.Append(newCommit(1)))
	assert.NoError(t, logger.Append(newCommit(2)))

	n := full.Len()
	for size := 0; size < n-1; size++ {
		truncated := Open(bytes.NewBuffer(full.Bytes()[:size]))

		// The error from Range is intentionally ignored here; only the number
		// of commits delivered to the callback is checked.
		seen := 0
		_ = truncated.Range(func(Commit) error {
			seen++
			return nil
		})
		assert.Less(t, seen, 2, fmt.Sprintf("size=%v", size))
	}
}

// TestLogRangeStopOnError checks that an error returned by the callback is
// surfaced by Range.
func TestLogRangeStopOnError(t *testing.T) {
	var stream bytes.Buffer
	logger := Open(&stream)
	assert.NoError(t, logger.Append(newCommit(1)))
	assert.NoError(t, logger.Append(newCommit(2)))

	failing := func(Commit) error { return io.ErrClosedPipe }
	assert.Error(t, logger.Range(failing))
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.17
require (
github.com/kelindar/bitmap v1.1.4
github.com/kelindar/intmap v1.1.0
github.com/kelindar/iostream v1.2.1
github.com/kelindar/iostream v1.3.0
github.com/kelindar/smutex v1.0.0
github.com/klauspost/compress v1.13.6
github.com/stretchr/testify v1.7.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ github.com/kelindar/bitmap v1.1.4 h1:rNwZ6RMRhrE3Um0QqBwFoJAcAzYi/4M7XGDCZYS6TOU
github.com/kelindar/bitmap v1.1.4/go.mod h1:shAFyS8BOif+pvJ05GqxnCM0SdohHQjKvDetqI/9z6M=
github.com/kelindar/intmap v1.1.0 h1:S+YEDvw5FQus5UJDEG+xsLp8il3BTYqBMkkuVVZPMH8=
github.com/kelindar/intmap v1.1.0/go.mod h1:tDanawPWq1B0HC+X3W8Z6IKNrJqxjruy6CdyTlf6Nic=
github.com/kelindar/iostream v1.2.1 h1:4hfRJHFiRIzkh00EKNgPWj11nS62v9ZA8Intx5htij8=
github.com/kelindar/iostream v1.2.1/go.mod h1:MkjMuVb6zGdPQVdwLnFRO0xOTOdDvBWTztFmjRDQkXk=
github.com/kelindar/iostream v1.3.0 h1:Bz2qQabipZlF1XCk64bnxsGLete+iHtayGPeWVpbwbo=
github.com/kelindar/iostream v1.3.0/go.mod h1:MkjMuVb6zGdPQVdwLnFRO0xOTOdDvBWTztFmjRDQkXk=
github.com/kelindar/smutex v1.0.0 h1:+LIZYwPz+v3IWPOse764fNaVQGMVxKV6mbD6OWjQV3o=
github.com/kelindar/smutex v1.0.0/go.mod h1:nMbCZeAHWCsY9Kt4JqX7ETd+NJeR6Swy9im+Th+qUZQ=
github.com/kelindar/xxrand v1.0.1 h1:TG9Ix5h3ulBXVWwRUF8ePXl65FjIj48CzsgZw0nHvfY=
Expand Down
2 changes: 1 addition & 1 deletion snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ type noopWriter struct {
}

// Append only counts the commit; nothing is cloned or written anywhere
func (w *noopWriter) Write(commit commit.Commit) error {
func (w *noopWriter) Append(commit commit.Commit) error {
atomic.AddUint64(&w.commits, 1)
return nil
}
Expand Down
8 changes: 4 additions & 4 deletions txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newTxnPool() *txnPool {
func (p *txnPool) acquire(owner *Collection) *Txn {
txn := p.txns.Get().(*Txn)
txn.owner = owner
txn.writer = owner.writer
txn.logger = owner.logger
txn.index.Grow(uint32(owner.opts.Capacity))
owner.fill.Clone(&txn.index)
return txn
Expand Down Expand Up @@ -83,7 +83,7 @@ type Txn struct {
dirty bitmap.Bitmap // The dirty chunks
updates []*commit.Buffer // The update buffers
columns []columnCache // The column mapping
writer commit.Writer // The optional commit writer
logger commit.Logger // The optional commit logger
reader *commit.Reader // The commit reader to re-use
}

Expand Down Expand Up @@ -447,8 +447,8 @@ func (txn *Txn) commit() {
updated := txn.commitUpdates(chunk)

// Append the committed chunk to the logger (if any)
if (changedRows || updated) && txn.writer != nil {
txn.writer.Write(commit.Commit{
if (changedRows || updated) && txn.logger != nil {
txn.logger.Append(commit.Commit{
ID: id,
Chunk: chunk,
Updates: txn.updates,
Expand Down

0 comments on commit 78b7be4

Please sign in to comment.