Skip to content
This repository has been archived by the owner on Mar 5, 2024. It is now read-only.

Commit

Permalink
partial
Browse files Browse the repository at this point in the history
  • Loading branch information
colinmarc committed Feb 1, 2016
1 parent d449bb8 commit 3ffc85f
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 20 deletions.
13 changes: 5 additions & 8 deletions dataset.go
Expand Up @@ -153,11 +153,6 @@ func (ds *dataset) build(be backend.Backend, storagePath string) error {
return ErrNoValidPartitions
}

// Create the partitions directory, so we can publish ephemeral
if ds.peers != nil {

}

err := ds.buildLocalPartitions(be, storagePath)
if err != nil {
return err
Expand All @@ -172,7 +167,9 @@ func (ds *dataset) build(be backend.Backend, storagePath string) error {

// Advertise all the partitions we have locally.
// TODO: very short possible race, since aren't actually listening over HTTP.
// need some way to accept proxied requests, but then wait?
// We shouldn't advertise until we're listening on HTTP, and then we should
// block proxied requests until we see all partitions ready (possibly just refuse outside connections?)
// (in multiplexed world, we can just 404 if not a proxied request)
ds.advertisePartitions()

updates := ds.zkWatcher.watchChildren(partitionPath)
Expand All @@ -199,7 +196,7 @@ func (ds *dataset) build(be backend.Backend, storagePath string) error {
return nil
}

// TODO: parallelize
// TODO: parallelize multiple files at once

func (ds *dataset) buildLocalPartitions(be backend.Backend, storagePath string) error {
files, err := be.ListFiles(ds.version)
Expand Down Expand Up @@ -273,7 +270,7 @@ func (ds *dataset) advertisePartitions() {
}
}

// TODO: cleanup?
// TODO: cleanup zk state. not everything is ephemeral, and we may be long-running w/ multiple versions

func (ds *dataset) close() error {
return ds.blockStore.Close()
Expand Down
15 changes: 11 additions & 4 deletions sequins.go
Expand Up @@ -90,7 +90,7 @@ func (s *sequins) initDistributed() error {

routableAddress := net.JoinHostPort(hostname, port)
peers := watchPeers(zkWatcher, routableAddress)
peers.waitToConverge(3 * time.Second) // TODO configurable
peers.waitToConverge(10 * time.Second) // TODO configurable

s.zkWatcher = zkWatcher
s.peers = peers
Expand All @@ -102,14 +102,21 @@ func (s *sequins) start() error {
// cause requests that start processing after this runs to 500
// However, this may not be a problem, since you have to shift traffic to
// another instance before shutting down anyway, otherwise you'd have downtime
defer func() {
s.dataset.replace(nil).close()
}()
defer s.shutdown()

log.Println("Listening on", s.options.address)
return http.ListenAndServe(s.options.address, s)
}

func (s *sequins) shutdown() {
// Swallow errors here.
s.dataset.replace(nil).close()
zk := s.zkWatcher
if zk != nil{
zk.close()
}
}

func (s *sequins) reloadLatest() error {
err := s.refresh()
if err != nil {
Expand Down
50 changes: 42 additions & 8 deletions zk_watcher.go
Expand Up @@ -7,6 +7,7 @@ import (
"strings"
"sync"
"time"
"errors"

"github.com/samuel/go-zookeeper/zk"
)
Expand All @@ -28,6 +29,7 @@ type zkWatcher struct {
conn *zk.Conn
errs chan error
shutdown chan bool
shutdownOnce sync.Once

hooksLock sync.Mutex
ephemeralNodes []string
Expand Down Expand Up @@ -55,14 +57,38 @@ func connectZookeeper(zkServers []string, prefix string) (*zkWatcher, error) {

func (w *zkWatcher) reconnect() error {
log.Println("Connecting to zookeeper at", strings.Join(w.zkServers, ","))
conn, _, err := zk.Connect(w.zkServers, 100*time.Millisecond)
conn, events, err := zk.Connect(w.zkServers, 100*time.Millisecond)
if err != nil {
return err
}

// Don't log anything ever.
conn.SetLogger(nullLogger{})
w.conn = conn

// TODO: recreate permanent paths? What if zookeeper dies and loses data?
// TODO: clear data on setup? or just hope that it's uniquely namespaced enough
err = w.createPath("")
if err != nil {
return err
}

log.Println("Successfully connected to zookeeper!")

go func() {
for {
ev := <-events
log.Println("Got event:", ev)
if ev.Err != nil {
sendErr(w.errs, ev.Err)
return
} else if ev.State == zk.StateDisconnected {
sendErr(w.errs, errors.New("zk disconnected"))
return
}
}
}()

return nil
}

Expand Down Expand Up @@ -90,11 +116,6 @@ func (w *zkWatcher) run() {
time.Sleep(time.Second)
w.reconnect()
}

conn := w.conn
if conn != nil {
conn.Close()
}
}

func (w *zkWatcher) createEphemeral(node string) {
Expand Down Expand Up @@ -146,7 +167,13 @@ func (w *zkWatcher) hookWatchChildren(node string, updates chan []string) {

// createPath creates a node and all its parents permanently.
func (w *zkWatcher) createPath(node string) error {
return w.createAll(path.Join(w.prefix, node))
node = path.Join(w.prefix, node)
err := w.createAll(node)
if err != nil {
return fmt.Errorf("create %s: %s", node, err)
} else {
return err
}
}

func (w *zkWatcher) createAll(fullNode string) error {
Expand All @@ -160,12 +187,19 @@ func (w *zkWatcher) createAll(fullNode string) error {

_, err := w.conn.Create(path.Clean(fullNode), nil, 0, defaultACL)
if err != nil && err != zk.ErrNodeExists {
return fmt.Errorf("create %s: %s", fullNode, err)
return err
}

return nil
}

func (w *zkWatcher) close() {
w.shutdownOnce.Do(func() {
w.shutdown <- true
w.conn.Close()
})
}

// sendErr sends the error over the channel, or discards it if the error is full.
func sendErr(errs chan error, err error) {
log.Println("Zookeeper error:", err)
Expand Down
31 changes: 31 additions & 0 deletions zk_watcher_test.go
@@ -0,0 +1,31 @@
package main

type zkLogWriter struct {
t *testing.T
}

func (lw zkLogWriter) Write(b []byte) (int, error) {
lw.t.Logf("[ZK] %s", string(b))
return len(b), nil
}

func connectZookeeperTest(t *testing.T, ts *zk.TestCluster) (*zkWatcher, *zk.TestCluster) {
ts, err := StartTestCluster(3, nil, logWriter{t})
if err != nil {
t.Fatal(err)
}

hosts := make([]string, len(ts.Servers))
for i, srv := range ts.Servers {
hosts[i] = fmt.Sprintf("127.0.0.1:%d", srv.Port)
}

zkWatcher, err := connectZookeeper(hosts, "test-sequins")

return zkWatcher, ts
}

func TestZKWatcher(t *testing.T) {


}

0 comments on commit 3ffc85f

Please sign in to comment.