Skip to content

Commit

Permalink
feat: support in-memory storage
Browse files Browse the repository at this point in the history
  • Loading branch information
shaj13 committed Feb 6, 2023
1 parent 9e9ba0d commit 5d95c93
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 11 deletions.
102 changes: 102 additions & 0 deletions internal/storage/inmemory/inmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package inmemory

import (
"bytes"
"errors"
"fmt"
"io"
"sync"

"github.com/shaj13/raft/internal/raftpb"
"github.com/shaj13/raft/internal/storage"
etcdraftpb "go.etcd.io/etcd/raft/v3/raftpb"
)

// New creates an in-memory storage backed by an bytes.Buffer.
func New() storage.Storage {
return &inmemory{
buf: bytes.NewBuffer(nil),
}
}

type inmemory struct {
mu sync.Mutex
state raftpb.SnapshotState
buf *bytes.Buffer
}

func (mem *inmemory) Writer(uint64, uint64) (io.WriteCloser, error) {
mem.mu.Lock()
mem.buf.Reset()

return &inmemoryWriter{
Buffer: mem.buf,
fn: mem.mu.Unlock, // Unlock memory on close.
}, nil
}

func (mem *inmemory) Write(s *storage.Snapshot) error {
mem.mu.Lock()
defer mem.mu.Unlock()

mem.buf.Reset()
_, err := io.Copy(mem.buf, s.Data)
return err
}

func (mem *inmemory) Reader(term uint64, index uint64) (io.ReadCloser, error) {
s, err := mem.Read(term, index)
if err != nil {
return nil, err
}

return s.Data, nil
}

func (mem *inmemory) Read(_ uint64, index uint64) (*storage.Snapshot, error) {
mem.mu.Lock()
defer mem.mu.Unlock()

if mem.state.Raw.Metadata.Index != index {
return nil, fmt.Errorf("raft: snapshot %d not found", index)
}

buf := bytes.NewBuffer(nil)
buf.Write(mem.buf.Bytes())

return &storage.Snapshot{
SnapshotState: mem.state,
Data: io.NopCloser(buf),
}, nil
}

func (mem *inmemory) SaveSnapshot(snap etcdraftpb.Snapshot) error {
mem.mu.Lock()
defer mem.mu.Unlock()

mem.state.Raw = snap
return nil
}

func (mem *inmemory) Boot(meta []byte) ([]byte, etcdraftpb.HardState, []etcdraftpb.Entry, *storage.Snapshot, error) {
return meta, etcdraftpb.HardState{}, nil, nil, nil
}

func (mem *inmemory) ReadFrom(string) (*storage.Snapshot, error) {
return nil, errors.New("raft: in-memory snapshotter operation not supported")
}

func (mem *inmemory) Snapshotter() storage.Snapshotter { return mem }
func (mem *inmemory) SaveEntries(etcdraftpb.HardState, []etcdraftpb.Entry) (err error) { return }
func (mem *inmemory) Exist() (ok bool) { return }
func (mem *inmemory) Close() (err error) { return }

type inmemoryWriter struct {
*bytes.Buffer
fn func()
}

func (w *inmemoryWriter) Close() error {
w.fn()
return nil
}
7 changes: 3 additions & 4 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/shaj13/raft/internal/raftengine"
"github.com/shaj13/raft/internal/raftpb"
"github.com/shaj13/raft/internal/storage"
"github.com/shaj13/raft/internal/storage/disk"
"github.com/shaj13/raft/internal/transport"
etransport "github.com/shaj13/raft/transport"
etcdraftpb "go.etcd.io/etcd/raft/v3/raftpb"
Expand All @@ -36,10 +35,10 @@ func NewNode(fsm StateMachine, proto etransport.Proto, opts ...Option) *Node {

newHandler, dialer := transport.Proto(proto).Get()
ctrl := new(controller)
// TODO(shaj): find another mechanism for DI.
cfg := newConfig(opts...)
cfg.fsm = fsm
cfg.controller = ctrl
cfg.storage = disk.New(cfg)
cfg.dial = dialer(cfg)
cfg.pool = membership.New(cfg)
cfg.engine = raftengine.New(cfg)
Expand Down Expand Up @@ -143,8 +142,8 @@ func (ng *NodeGroup) Create(groupID uint64, fsm StateMachine, opts ...Option) *N
// after the removal, the actual node will become idle,
// it must coordinate with node shutdown explicitly.
//
// nodeGroup.Remove(12)
// node.Shutdown(ctx)
// nodeGroup.Remove(12)
// node.Shutdown(ctx)
func (ng *NodeGroup) Remove(groupID uint64) {
ng.router.remove(groupID)
}
Expand Down
32 changes: 25 additions & 7 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/shaj13/raft/internal/raftengine"
"github.com/shaj13/raft/internal/raftpb"
"github.com/shaj13/raft/internal/storage"
"github.com/shaj13/raft/internal/storage/disk"
"github.com/shaj13/raft/internal/storage/inmemory"
"github.com/shaj13/raft/internal/transport"
"github.com/shaj13/raft/raftlog"
"go.etcd.io/etcd/raft/v3"
Expand Down Expand Up @@ -82,6 +84,13 @@ func (fn optionFunc) apply(c *config) {
fn(c)
}

// TODO(shaj): add docs and make it expermintal.
func WithInMemoryStoage() Option {
return optionFunc(func(c *config) {
c.storage = inmemory.New()
})
}

// WithLinearizableReadSafe guarantees the linearizability of the read request by
// communicating with the quorum. It is the default and suggested option.
func WithLinearizableReadSafe() Option {
Expand Down Expand Up @@ -173,7 +182,9 @@ func WithElectionTick(tick int) Option {
}

// WithHeartbeatTick is the number of node tick (WithTickInterval) invocations that
// must pass between heartbeats. That is, a leader sends heartbeat messages to
//
// must pass between heartbeats. That is, a leader sends heartbeat messages to
//
// maintain its leadership every HeartbeatTick ticks.
//
// Default Value: 1.
Expand Down Expand Up @@ -364,11 +375,11 @@ func WithRestart() StartOption {
// WithMembers and WithInitCluster must be applied to all cluster nodes when they are composed,
// Otherwise, the quorum will be lost and the cluster become unavailable.
//
// Node A:
// n.Start(WithInitCluster(), WithMembers(<node A>, <node B>))
// Node A:
// n.Start(WithInitCluster(), WithMembers(<node A>, <node B>))
//
// Node B:
// n.Start(WithInitCluster(), WithMembers(<node B>, <node A>))
// Node B:
// n.Start(WithInitCluster(), WithMembers(<node B>, <node A>))
//
// Note: first member will be assigned to the current node.
func WithMembers(membs ...RawMember) StartOption {
Expand All @@ -387,11 +398,10 @@ func WithAddress(addr string) StartOption {

// WithFallback can be used if other options do not succeed.
//
// WithFallback(
// WithFallback(
// WithJoin(),
// WithRestart,
// )
//
func WithFallback(opts ...StartOption) StartOption {
return startOptionFunc(func(c *startConfig) {
// create new startConfig annd apply all opts,
Expand Down Expand Up @@ -536,6 +546,14 @@ func newConfig(opts ...Option) *config {
pipelining: false,
}

// Workaround: to bind storage passed on cfg.
opts = append(opts, optionFunc(func(c *config) {
if c.storage != nil {
return
}
c.storage = disk.New(c)
}))

for _, opt := range opts {
opt.apply(c)
}
Expand Down

0 comments on commit 5d95c93

Please sign in to comment.