Skip to content

Commit

Permalink
add store
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Oct 19, 2016
1 parent f0c4e52 commit 4b0aa1d
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 81 deletions.
40 changes: 40 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"fmt"
"os"
"time"

gracefully "github.com/tj/go-gracefully"
"github.com/travisjeffery/jocko/server"
"github.com/travisjeffery/jocko/store"
kingpin "gopkg.in/alecthomas/kingpin.v2"
)

var (
logDir = kingpin.Flag("logdir", "A comma separated list of directories under which to store log files").Default("/tmp/jocko").String()
httpAddr = kingpin.Flag("httpaddr", "HTTP Address to listen on").String()
raftDir = kingpin.Flag("raftdir", "Directory for raft to store data").String()
raftAddr = kingpin.Flag("raftaddr", "Address for Raft to bind on").String()
)

func main() {
kingpin.Parse()

store := store.New(*raftDir, *raftAddr)
if err := store.Open(); err != nil {
fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err)
os.Exit(1)
}

server := server.New(*httpAddr, store)
if err := server.Start(); err != nil {
fmt.Fprintf(os.Stderr, "Error starting server: %s\n", err)
os.Exit(1)
}

gracefully.Timeout = 10 * time.Second
gracefully.Shutdown()

server.Close()
}
77 changes: 6 additions & 71 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,23 @@ package server

import (
"encoding/json"
"io"
"log"
"net"
"net/http"
"strings"
)

type Store interface {
// Get returns the value for the given key.
Get(key string) (string, error)
Get(key []byte) ([]byte, error)

// Set sets the value for the given key, via distributed consensus.
Set(key, value string) error
Set(key, value []byte) error

// Delete removes the given key, via distributed consensus.
Delete(key string) error
Delete(key []byte) error

// Join joins the node, reachable at addr, to the cluster.
Join(addr string) error
Join(addr []byte) error
}

type Server struct {
Expand Down Expand Up @@ -69,9 +67,7 @@ func (s *Server) Close() {

// ServeHTTP allows Server to serve HTTP requests.
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/key") {
s.handleKeyRequest(w, r)
} else if r.URL.Path == "/join" {
if r.URL.Path == "/join" {
s.handleJoin(w, r)
} else {
w.WriteHeader(http.StatusNotFound)
Expand All @@ -96,73 +92,12 @@ func (s *Server) handleJoin(w http.ResponseWriter, r *http.Request) {
return
}

if err := s.store.Join(remoteAddr); err != nil {
if err := s.store.Join([]byte(remoteAddr)); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}

func (s *Server) handleKeyRequest(w http.ResponseWriter, r *http.Request) {
getKey := func() string {
parts := strings.Split(r.URL.Path, "/")
if len(parts) != 3 {
return ""
}
return parts[2]
}

switch r.Method {
case "GET":
k := getKey()
if k == "" {
w.WriteHeader(http.StatusBadRequest)
}
v, err := s.store.Get(k)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

b, err := json.Marshal(map[string]string{k: v})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}

io.WriteString(w, string(b))

case "POST":
// Read the value from the POST body.
m := map[string]string{}
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
for k, v := range m {
if err := s.store.Set(k, v); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}

case "DELETE":
k := getKey()
if k == "" {
w.WriteHeader(http.StatusBadRequest)
return
}
if err := s.store.Delete(k); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
s.store.Delete(k)

default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
return
}

// Addr returns the address on which the Server is listening
func (s *Server) Addr() net.Addr {
return s.ln.Addr()
Expand Down
20 changes: 10 additions & 10 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestNewServer(t *testing.T) {
b = doGet(t, s.URL(), "k1")
assert.Equal(t, `{"k1":"v1"}`, string(b))

store.m["k2"] = "v2"
store.m["k2"] = []byte("v2")
b = doGet(t, s.URL(), "k2")
assert.Equal(t, `{"k2":"v2"}`, string(b))

Expand All @@ -46,30 +46,30 @@ func (t *testServer) URL() string {
}

type testStore struct {
m map[string]string
m map[string][]byte
}

func newTestStore() *testStore {
return &testStore{
m: make(map[string]string),
m: make(map[string][]byte),
}
}

func (t *testStore) Get(key string) (string, error) {
return t.m[key], nil
func (t *testStore) Get(key []byte) ([]byte, error) {
return t.m[string(key)], nil
}

func (t *testStore) Set(key, value string) error {
t.m[key] = value
func (t *testStore) Set(key, value []byte) error {
t.m[string(key)] = value
return nil
}

func (t *testStore) Delete(key string) error {
delete(t.m, key)
func (t *testStore) Delete(key []byte) error {
delete(t.m, string(key))
return nil
}

func (t *testStore) Join(addr string) error {
func (t *testStore) Join(addr []byte) error {
return nil
}

Expand Down
136 changes: 136 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package store

import (
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"time"

msgpack "gopkg.in/vmihailenco/msgpack.v2"

"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb"
"github.com/pkg/errors"
)

const (
timeout = 10 * time.Second
)

type command struct {
Op []byte `msgpack:"op"`
Key []byte `msgpack:"key"`
Value []byte `msgpack:"value"`
}

type Store struct {
dataDir string
bindAddr string

mu sync.Mutex

peerStore raft.PeerStore
raft *raft.Raft
store *raftboltdb.BoltStore
}

func New(dataDir, bindAddr string) *Store {
return &Store{
dataDir: dataDir,
bindAddr: bindAddr,
}
}

func (s *Store) Open() error {
conf := raft.DefaultConfig()

conf.EnableSingleNode = true

addr, err := net.ResolveTCPAddr("tcp", s.bindAddr)
if err != nil {
return errors.Wrap(err, "resolve bind addr failed")
}

transport, err := raft.NewTCPTransport(s.bindAddr, addr, 3, timeout, os.Stderr)
if err != nil {
return errors.Wrap(err, "tcp transport failede")
}

s.peerStore = raft.NewJSONPeers(s.dataDir, transport)

snapshots, err := raft.NewFileSnapshotStore(s.dataDir, 2, os.Stderr)
if err != nil {
return fmt.Errorf("file snapshot store: %s", err)
}

boltStore, err := raftboltdb.NewBoltStore(filepath.Join(s.dataDir, "store.db"))
if err != nil {
return errors.Wrap(err, "bolt store failed")
}
s.store = boltStore

raft, err := raft.NewRaft(conf, s, boltStore, boltStore, snapshots, s.peerStore, transport)
if err != nil {
return errors.Wrap(err, "raft failed")
}

s.raft = raft

return nil
}

func (s *Store) Get(key []byte) ([]byte, error) {
// add cache
return s.store.Get(key)
}

func (s *Store) Set(key, value []byte) error {
c := &command{
Op: []byte("set"),
Key: key,
Value: value,
}
b, err := msgpack.Marshal(c)
if err != nil {
return errors.Wrap(err, "msgpack failed")
}
f := s.raft.Apply(b, timeout)
return f.Error()
}

func (s *Store) Delete(key []byte) error {
return nil
}

func (s *Store) Join(addr []byte) error {
f := s.raft.AddPeer(string(addr))
return f.Error()
}

func (s *Store) Apply(l *raft.Log) interface{} {
return nil
}

func (s *Store) applySet(k, v []byte) interface{} {
return nil
}

func (s *Store) Restore(rc io.ReadCloser) error {
return nil
}

type FSMSnapshot struct {
}

func (f *FSMSnapshot) Persist(sink raft.SnapshotSink) error {
return nil
}

func (f *FSMSnapshot) Release() {}

func (s *Store) Snapshot() (raft.FSMSnapshot, error) {
return &FSMSnapshot{}, nil
}
19 changes: 19 additions & 0 deletions store/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package store

import (
"io/ioutil"
"os"
"testing"

"github.com/stretchr/testify/assert"
)

func TestStoreOpen(t *testing.T) {
dataDir, _ := ioutil.TempDir("", "storetest")
defer os.RemoveAll(dataDir)
bindAddr := "127.0.0.1:0"

s := New(dataDir, bindAddr)
assert.NotNil(t, s)
assert.NoError(t, s.Open())
}

0 comments on commit 4b0aa1d

Please sign in to comment.