Skip to content

Commit

Permalink
Basic discovery application
Browse files Browse the repository at this point in the history
  • Loading branch information
inercia committed Jul 6, 2015
1 parent 5559f1d commit cf224fd
Show file tree
Hide file tree
Showing 5 changed files with 462 additions and 4 deletions.
16 changes: 12 additions & 4 deletions Makefile
Expand Up @@ -11,24 +11,27 @@ WEAVE_VERSION=git-$(shell git rev-parse --short=12 HEAD)

WEAVER_EXE=prog/weaver/weaver
WEAVEDNS_EXE=prog/weavedns/weavedns
WEAVEDISCOVERY_EXE=prog/weavediscovery/weavediscovery
WEAVEPROXY_EXE=prog/weaveproxy/weaveproxy
SIGPROXY_EXE=prog/sigproxy/sigproxy
WEAVEWAIT_EXE=prog/weavewait/weavewait
NETCHECK_EXE=prog/netcheck/netcheck

EXES=$(WEAVER_EXE) $(WEAVEDNS_EXE) $(SIGPROXY_EXE) $(WEAVEPROXY_EXE) $(WEAVEWAIT_EXE) $(NETCHECK_EXE)
EXES=$(WEAVER_EXE) $(WEAVEDNS_EXE) $(WEAVEDISCOVERY_EXE) $(SIGPROXY_EXE) $(WEAVEPROXY_EXE) $(WEAVEWAIT_EXE) $(NETCHECK_EXE)

WEAVER_UPTODATE=.weaver.uptodate
WEAVEDNS_UPTODATE=.weavedns.uptodate
WEAVEDISCOVERY_UPTODATE=.weavediscovery.uptodate
WEAVEEXEC_UPTODATE=.weaveexec.uptodate

IMAGES_UPTODATE=$(WEAVER_UPTODATE) $(WEAVEDNS_UPTODATE) $(WEAVEEXEC_UPTODATE)
IMAGES_UPTODATE=$(WEAVER_UPTODATE) $(WEAVEDNS_UPTODATE) $(WEAVEDISCOVERY_UPTODATE) $(WEAVEEXEC_UPTODATE)

WEAVER_IMAGE=$(DOCKERHUB_USER)/weave
WEAVEDNS_IMAGE=$(DOCKERHUB_USER)/weavedns
WEAVEDISCOVERY_IMAGE=$(DOCKERHUB_USER)/weavediscovery
WEAVEEXEC_IMAGE=$(DOCKERHUB_USER)/weaveexec

IMAGES=$(WEAVER_IMAGE) $(WEAVEDNS_IMAGE) $(WEAVEEXEC_IMAGE)
IMAGES=$(WEAVER_IMAGE) $(WEAVEDNS_IMAGE) $(WEAVEDISCOVERY_IMAGE) $(WEAVEEXEC_IMAGE)

WEAVE_EXPORT=weave.tar

Expand All @@ -43,7 +46,7 @@ travis: $(EXES)
update:
go get -u -f -v -tags -netgo $(addprefix ./,$(dir $(EXES)))

$(WEAVER_EXE) $(WEAVEDNS_EXE) $(WEAVEPROXY_EXE) $(NETCHECK_EXE): common/*.go common/*/*.go net/*.go
$(WEAVER_EXE) $(WEAVEDNS_EXE) $(WEAVEDISCOVERY_EXE) $(WEAVEPROXY_EXE) $(NETCHECK_EXE): common/*.go common/*/*.go net/*.go
go get -tags netgo ./$(@D)
go build -ldflags "-extldflags \"-static\" -X main.version $(WEAVE_VERSION)" -tags netgo -o $@ ./$(@D)
@strings $@ | grep cgo_stub\\\.go >/dev/null || { \
Expand All @@ -57,6 +60,7 @@ $(WEAVER_EXE) $(WEAVEDNS_EXE) $(WEAVEPROXY_EXE) $(NETCHECK_EXE): common/*.go com

$(WEAVER_EXE): router/*.go ipam/*.go ipam/*/*.go prog/weaver/main.go
$(WEAVEDNS_EXE): nameserver/*.go prog/weavedns/main.go
$(WEAVEDISCOVERY_EXE): prog/weavediscovery/*.go
$(WEAVEPROXY_EXE): proxy/*.go prog/weaveproxy/main.go
$(NETCHECK_EXE): prog/netcheck/netcheck.go

Expand All @@ -77,6 +81,10 @@ $(WEAVEDNS_UPTODATE): prog/weavedns/Dockerfile $(WEAVEDNS_EXE)
$(SUDO) docker build -t $(WEAVEDNS_IMAGE) prog/weavedns
touch $@

$(WEAVEDISCOVERY_UPTODATE): prog/weavediscovery/Dockerfile $(WEAVEDISCOVERY_EXE)
$(SUDO) docker build -t $(WEAVEDISCOVERY_IMAGE) prog/weavediscovery
touch $@

$(WEAVEEXEC_UPTODATE): prog/weaveexec/Dockerfile $(DOCKER_DISTRIB) weave $(SIGPROXY_EXE) $(WEAVEPROXY_EXE) $(WEAVEWAIT_EXE) $(NETCHECK_EXE)
cp weave prog/weaveexec/weave
cp $(SIGPROXY_EXE) prog/weaveexec/sigproxy
Expand Down
6 changes: 6 additions & 0 deletions prog/weavediscovery/Dockerfile
@@ -0,0 +1,6 @@
FROM scratch
MAINTAINER Weaveworks Inc <help@weave.works>
WORKDIR /home/weave
ADD ./weavediscovery /home/weave/
EXPOSE 6789/tcp
ENTRYPOINT ["/home/weave/weavediscovery"]
190 changes: 190 additions & 0 deletions prog/weavediscovery/http.go
@@ -0,0 +1,190 @@
package main

import (
"bytes"
"fmt"
"net"
"net/http"
"net/url"
"strconv"
"time"

"github.com/gorilla/mux"
. "github.com/weaveworks/weave/common"
weavenet "github.com/weaveworks/weave/net"
)

const (
defaultHTTPPort = 6789 // default port for the Discovery API
defaultWeaveURL = "http://127.0.0.1:6784" // default URL for the router API
)

type WeaveClient struct {
url url.URL
}

// Create a new Weave API client
func NewWeaveClient(u string) (*WeaveClient, error) {
fullURL, err := url.Parse(u)
if err != nil {
Log.Error(err)
return nil, err
}
cli := WeaveClient{
url: *fullURL,
}

return &cli, nil
}

// Join a new peer
func (w *WeaveClient) Join(host, port string) {
connectURL := w.url
connectURL.Path = "/connect"

Log.Printf("[client] Notifying about '%s' (API: '%s')", host, connectURL.String())
_, err := http.PostForm(connectURL.String(), url.Values{"peer": {host}})
if err != nil {
Log.Warningf("[client] Could not notify about '%s': %s", host, err)
}
}

// Forget about a peer
func (w *WeaveClient) Forget(host, port string) {
forgetURL := w.url
forgetURL.Path = "/forget"

Log.Printf("[client] Forgetting about '%s' (API: '%s')", host, forgetURL.String())
_, err := http.PostForm(forgetURL.String(), url.Values{"peer": {host}})
if err != nil {
Log.Warningf("[client] Could not forget about '%s': %s", host, err)
}
}

//////////////////////////////////////////////////////////////////////////////////////////

type DiscoveryHTTP struct {
dm *DiscoveryManager
listener net.Listener
}

func NewDiscoveryHTTP(dm *DiscoveryManager, httpIfaceName string, httpPort int, wait int) *DiscoveryHTTP {
var httpIP string
if httpIfaceName == "" {
httpIP = "0.0.0.0"
} else {
Log.Infoln("[http] Waiting for HTTP interface", httpIfaceName, "to come up")
httpIface, err := weavenet.EnsureInterface(httpIfaceName, wait)
if err != nil {
Log.Fatal(err)
}
Log.Infoln("Interface", httpIfaceName, "is up")

addrs, err := httpIface.Addrs()
if err != nil {
Log.Fatal(err)
}

if len(addrs) == 0 {
Log.Fatal("[http] No addresses on HTTP interface")
}

ip, _, err := net.ParseCIDR(addrs[0].String())
if err != nil {
Log.Fatal(err)
}

httpIP = ip.String()

}
httpAddr := net.JoinHostPort(httpIP, strconv.Itoa(httpPort))

httpListener, err := net.Listen("tcp", httpAddr)
if err != nil {
Log.Fatal("[http] Unable to create HTTP listener: ", err)
}
Log.Infoln("[http] HTTP API listening on", httpAddr)

return &DiscoveryHTTP{
dm: dm,
listener: httpListener,
}
}

// Start the HTTP API
func (dh *DiscoveryHTTP) Start() {
httpErrorAndLog := func(w http.ResponseWriter, msg string, status int, logmsg string, logargs ...interface{}) {
http.Error(w, msg, status)
Log.Warningf("[http] "+logmsg, logargs...)
}

go func() {
muxRouter := mux.NewRouter()

// Join a endpoint.
// Parameters provided in the form: "url", "hb", "ttl"
muxRouter.Methods("GET").Path("/join").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reqError := func(msg string, logmsg string, logargs ...interface{}) {
httpErrorAndLog(w, msg, http.StatusBadRequest, logmsg, logargs...)
}

hbParam := r.FormValue("hb")
hb, err := time.ParseDuration(hbParam)
if err != nil {
reqError("Invalid heartbeat", "Invalid heartbeat: %v", err)
return
}
if hb < 1*time.Second {
reqError("Invalid heartbeat", "Heartbeat should be at least one second")
return
}

ttlParam := r.FormValue("ttl")
ttl, err := time.ParseDuration(ttlParam)
if err != nil {
reqError("Invalid TTL", "Invalid TTL: %v", err)
return
}
if ttl <= hb {
reqError("Invalid TTL", "TTL must be strictly superior to the heartbeat value")
return
}

urlParam := r.FormValue("url")
if urlParam == "" {
reqError("Invalid URL", "Invalid URL in request: %s, %s", r.URL, r.Form)
return
}

err = dh.dm.Join(urlParam, hb, ttl)
if err != nil {
Log.Warning(err)
}
})

// Report status
muxRouter.Methods("GET").Path("/status").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "weave Discovery", version)
fmt.Fprint(w, dh.Status())
fmt.Fprint(w, dh.dm.Status())
})

http.Handle("/", muxRouter)
if err := http.Serve(dh.listener, nil); err != nil {
Log.Fatal("[http] Unable to serve http: ", err)
}
}()

}

func (dh *DiscoveryHTTP) Stop() error {
Log.Debugf("[http] Stopping HTTP API")
dh.listener.Close()
return nil
}

func (dh *DiscoveryHTTP) Status() string {
var buf bytes.Buffer
fmt.Fprintln(&buf, "Listen address", dh.listener.Addr())
return buf.String()
}
93 changes: 93 additions & 0 deletions prog/weavediscovery/main.go
@@ -0,0 +1,93 @@
package main

import (
"flag"
"fmt"
"os"
"time"

. "github.com/weaveworks/weave/common"
)

var version = "(unreleased version)"

const (
defaultHeartbeat = "30s"
defaultTTL = "60s"
)

func main() {
var (
justVersion bool
routerURL string
httpIfaceName string
httpPort int
localAddr string
wait int
hb string
ttl string
logLevel string
verbose bool
)

flag.BoolVar(&justVersion, "version", false, "print version and exit")
flag.StringVar(&routerURL, "weave", defaultWeaveURL, "weave API URL")
flag.StringVar(&localAddr, "local", "", "local address announced to other peers")
flag.IntVar(&wait, "wait", -1, "number of seconds to wait for interfaces to come up (0=don't wait, -1=wait forever)")
flag.StringVar(&hb, "hb", defaultHeartbeat, "heartbeat (with units)")
flag.StringVar(&ttl, "ttl", defaultTTL, "TTL (with units)")
flag.StringVar(&httpIfaceName, "http-iface", "", "interface on which to listen for HTTP requests (defaults to empty string which listens on all interfaces)")
flag.IntVar(&httpPort, "http-port", defaultHTTPPort, "port for the Discovery HTTP API")
flag.StringVar(&logLevel, "log-level", "info", "logging level (debug, info, warning, error)")
flag.BoolVar(&verbose, "v", false, "enable verbose mode (debug logging level)")

flag.Parse()

if justVersion {
fmt.Printf("weave Discovery %s\n", version)
os.Exit(0)
}
if verbose {
logLevel = "debug"
}

SetLogLevel(logLevel)
Log.Infof("[main] WeaveDiscovery version %s", version) // first thing in log: the version

if len(localAddr) == 0 {
Log.Fatalf("[main] Local address not provided")
}

weaveCli, err := NewWeaveClient(routerURL)
if err != nil {
Log.Fatalf("[main] Could not initialize the weave router client: %s", err)
}
manager := NewDiscoveryManager(localAddr, weaveCli)
httpServer := NewDiscoveryHTTP(manager, httpIfaceName, httpPort, wait)

manager.Start()
httpServer.Start()

// join any additional endpoint provided in command line, using the heartbeat and TTL arguments
numEps := len(flag.Args())
if numEps > 0 {
Log.Debugf("[main] %d endpoints provided in arguments", numEps)
hbDur, err := time.ParseDuration(hb)
if err != nil {
Log.Fatalf("[main] Could not parse heartbeat '%s': %s", hb, err)
}
ttlDur, err := time.ParseDuration(ttl)
if err != nil {
Log.Fatalf("[main] Could not parse TTL '%s': %s", ttl, err)
}
for _, ep := range flag.Args() {
if err = manager.Join(ep, hbDur, ttlDur); err != nil {
Log.Fatalf("[main] Could not join '%s': %s", ep, err)
}
}
} else {
Log.Debugf("[main] No endpoints provided in arguments")
}

SignalHandlerLoop(httpServer, manager)
}

0 comments on commit cf224fd

Please sign in to comment.