Skip to content

Commit

Permalink
kungfu-notify-start (#302)
Browse files Browse the repository at this point in the history
  • Loading branch information
lgarithm committed Jun 29, 2020
1 parent 7fca5b4 commit e70b620
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 8 deletions.
2 changes: 1 addition & 1 deletion srcs/go/kungfu/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (p *Peer) ResizeClusterFromURL() (bool, bool, error) {
}

func (p *Peer) getClusterConfig(url string) (*plan.Cluster, error) {
f, err := p.openURL(url)
f, err := utils.OpenURL(url, &p.httpClient, fmt.Sprintf("KungFu Peer: %s", p.self))
if err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions srcs/go/kungfu/peer/urlclient.go → srcs/go/utils/urlclient.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package peer
package utils

import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
)

func (p *Peer) openHTTP(client *http.Client, url string) (io.ReadCloser, error) {
func openHTTP(client *http.Client, url string, userAgent string) (io.ReadCloser, error) {
if client == nil {
client = http.DefaultClient
}
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
req.Header.Set("User-Agent", fmt.Sprintf("KungFu Peer: %s", p.self))
req.Header.Set("User-Agent", userAgent)
resp, err := client.Do(req)
if err != nil {
return nil, err
Expand All @@ -29,16 +31,17 @@ func (p *Peer) openHTTP(client *http.Client, url string) (io.ReadCloser, error)
var parseURL = url.Parse
var errUnsupportedURL = errors.New("unsupported URL")

func (p *Peer) openURL(url string) (io.ReadCloser, error) {
// OpenURL opens a file or URL as io.ReadCloser.
func OpenURL(url string, client *http.Client, userAgent string) (io.ReadCloser, error) {
u, err := parseURL(url)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http":
return p.openHTTP(&p.httpClient, url)
return openHTTP(client, url, userAgent)
case "https":
return p.openHTTP(&p.httpClient, url)
return openHTTP(client, url, userAgent)
case "file":
// ignore u.Host
return os.Open(u.Path)
Expand Down
70 changes: 70 additions & 0 deletions tests/go/cmd/kungfu-notify-start/kungfu-notify-start.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package main

import (
"context"
"encoding/json"
"errors"
"flag"
"net/http"

"github.com/lsds/KungFu/srcs/go/kungfu/config"
"github.com/lsds/KungFu/srcs/go/kungfu/execution"
"github.com/lsds/KungFu/srcs/go/kungfu/runner"
"github.com/lsds/KungFu/srcs/go/log"
"github.com/lsds/KungFu/srcs/go/plan"
"github.com/lsds/KungFu/srcs/go/rchannel/client"
"github.com/lsds/KungFu/srcs/go/rchannel/connection"
"github.com/lsds/KungFu/srcs/go/utils"
)

var errWaitPeerFailed = errors.New("wait peer failed")

var (
configServer = flag.String("config-server", "", "config server URL")
)

func main() {
flag.Parse()
cluster, err := getClusterConfig(*configServer)
if err != nil {
utils.ExitErr(err)
}
notifyStart(*cluster)
}

func getClusterConfig(url string) (*plan.Cluster, error) {
f, err := utils.OpenURL(url, http.DefaultClient, utils.ProgName())
if err != nil {
return nil, err
}
defer f.Close()
var cluster plan.Cluster
if err = json.NewDecoder(f).Decode(&cluster); err != nil {
return nil, err
}
return &cluster, nil
}

func notifyStart(cluster plan.Cluster) {
stage := runner.Stage{
Version: 0,
Cluster: cluster,
}
var self plan.PeerID
client := client.New(self, config.UseUnixSock)
var notify execution.PeerFunc = func(ctrl plan.PeerID) error {
ctx, cancel := context.WithTimeout(context.TODO(), config.WaitRunnerTimeout)
defer cancel()
n, ok := client.Wait(ctx, ctrl)
if !ok {
return errWaitPeerFailed
}
if n > 0 {
log.Warnf("%s is up after pinged %d times", ctrl, n+1)
}
return client.Send(ctrl.WithName("update"), stage.Encode(), connection.ConnControl, 0)
}
if err := notify.Par(cluster.Runners); err != nil {
utils.ExitErr(err)
}
}

0 comments on commit e70b620

Please sign in to comment.