Skip to content

Commit

Permalink
kontrol/test: wait for kites after restarting kontrol
Browse files Browse the repository at this point in the history
  • Loading branch information
rjeczalik committed May 18, 2016
1 parent bcce71b commit 3d22cab
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 25 deletions.
2 changes: 0 additions & 2 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ func (k *Kite) handleHeartbeat(r *Request) (interface{}, error) {
for {
select {
case <-done:
// remove the onDisconnect again so it doesn't call close twice
r.Client.onDisconnectHandlers = nil
heartbeat.Stop()
case <-heartbeat.C:
if err := ping.Call(); err != nil {
Expand Down
78 changes: 59 additions & 19 deletions kontrol/kontrol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,24 @@ var (

var interactive = os.Getenv("TEST_INTERACTIVE") == "1"

func startKontrol(pem, pub string, port int) (kon *Kontrol, conf *config.Config) {
conf = config.New()
type Config struct {
Config *config.Config
Private string
Public string
}

func startKontrol(pem, pub string, port int) (*Kontrol, *Config) {
conf := config.New()
conf.Username = "testuser"
conf.KontrolURL = fmt.Sprintf("http://localhost:%d/kite", port)
conf.KontrolKey = pub
conf.KontrolUser = "testuser"
conf.KiteKey = testutil.NewKiteKeyWithKeyPair(pem, pub).Raw
conf.KiteKey = testutil.NewToken("kontrol", pem, pub).Raw
conf.Transport = config.XHRPolling
conf.ReadEnvironmentVariables()

DefaultPort = port
kon = New(conf.Copy(), "0.0.1")
kon := New(conf.Copy(), "1.0.0")

switch os.Getenv("KONTROL_STORAGE") {
case "etcd":
Expand All @@ -59,7 +65,11 @@ func startKontrol(pem, pub string, port int) (kon *Kontrol, conf *config.Config)
go kon.Run()
<-kon.Kite.ServerReadyNotify()

return kon, conf
return kon, &Config{
Config: conf,
Private: pem,
Public: pub,
}
}

type HelloKite struct {
Expand All @@ -70,10 +80,11 @@ type HelloKite struct {
clients map[string]*kite.Client
}

func NewHelloKite(name string, conf *config.Config) (*HelloKite, error) {
func NewHelloKite(name string, conf *Config) (*HelloKite, error) {
k := kite.New(name, "1.0.0")
k.Config = conf.Copy()
k.Config = conf.Config.Copy()
k.Config.Port = 0
k.Config.KiteKey = testutil.NewToken(name, conf.Private, conf.Public).Raw
k.SetLogLevel(kite.DEBUG)

k.HandleFunc("hello", func(r *kite.Request) (interface{}, error) {
Expand Down Expand Up @@ -187,6 +198,40 @@ func Call(kitePairs HelloKites) error {
return nil
}

func WaitTillConnected(conf *Config, timeout time.Duration, hks ...*HelloKite) error {
k := kite.New("WaitTillConnected", "1.0.0")
k.Config = conf.Config.Copy()

t := time.After(timeout)

for {
select {
case <-t:
return fmt.Errorf("timed out waiting for %v after %s", hks, timeout)
default:
kites, err := k.GetKites(&protocol.KontrolQuery{
Version: "1.0.0",
})
if err != nil && err != kite.ErrNoKitesAvailable {
return err
}

notReady := make(map[string]struct{}, len(hks))
for _, hk := range hks {
notReady[hk.Kite.Kite().ID] = struct{}{}
}

for _, kite := range kites {
delete(notReady, kite.Kite.ID)
}

if len(notReady) == 0 {
return nil
}
}
}
}

func pause(args ...interface{}) {
if testing.Verbose() && interactive {
fmt.Println("[PAUSE]", fmt.Sprint(args...), fmt.Sprintf(`("kill -1 %d" to continue)`, os.Getpid()))
Expand All @@ -208,20 +253,14 @@ func TestUpdateKeys(t *testing.T) {
t.Skip("skipping TestUpdateKeys for storage %q: not implemented", storage)
}

pause("starting TestUpdateKeys")

kon, conf := startKontrol(testkeys.Private, testkeys.Public, 5501)

pause("kontrol 1 started")

hk1, err := NewHelloKite("kite1", conf)
if err != nil {
t.Fatalf("error creating kite1: %s", err)
}
defer hk1.Close()

pause("kite 1 started")

hk2, err := NewHelloKite("kite2", conf)
if err != nil {
t.Fatalf("error creating kite1: %s", err)
Expand All @@ -230,8 +269,6 @@ func TestUpdateKeys(t *testing.T) {

hk2.Token = true

pause("kite 2 started")

calls := HelloKites{
hk1: hk2,
hk2: hk1,
Expand All @@ -241,21 +278,25 @@ func TestUpdateKeys(t *testing.T) {
t.Fatal(err)
}

// pause("going to close kontrol 1")

kon.Close()

pause("kontrol 1 closed")
// pause("going to start kontrol 2")

kon, conf = startKontrol(testkeys.PrivateSecond, testkeys.PublicSecond, 5501)

pause("kontrol 2 started")
pause("started kontrol 2")

hk3, err := NewHelloKite("kite3", conf)
if err != nil {
t.Fatalf("error creating kite3: %s", err)
}
defer hk3.Close()

pause("kite 3 started")
if err := WaitTillConnected(conf, 15*time.Second, hk1, hk2, hk3); err != nil {
t.Fatal(err)
}

calls = HelloKites{
hk3: hk1,
Expand Down Expand Up @@ -346,7 +387,6 @@ func TestTokenInvalidation(t *testing.T) {
if token3 != token4 {
t.Error("tokens should be the same")
}

}

func TestMultiple(t *testing.T) {
Expand Down
10 changes: 6 additions & 4 deletions testutil/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@ import (
// kontrol.go) If the host does not have a kite.key file kite.New() panics.
// This is a helper to put a fake key on it's location.
func NewKiteKey() *jwt.Token {
return newKiteKey("", testkeys.Private, testkeys.Public)
return NewToken("", testkeys.Private, testkeys.Public)
}

// NewKiteKeyUsername is like NewKiteKey() but it uses the given username
// instead of using the "testuser" name
func NewKiteKeyUsername(username string) *jwt.Token {
return newKiteKey(username, testkeys.Private, testkeys.Public)
return NewToken(username, testkeys.Private, testkeys.Public)
}

func NewKiteKeyWithKeyPair(private, public string) *jwt.Token {
return newKiteKey("", private, public)
return NewToken("", private, public)
}

func newKiteKey(username, private, public string) *jwt.Token {
// NewToken creates new JWT token for the gien username. It embedds the given
// public key as kontrolKey and signs the token with the private one.
func NewToken(username, private, public string) *jwt.Token {
tknID := uuid.NewV4()

hostname, err := os.Hostname()
Expand Down

0 comments on commit 3d22cab

Please sign in to comment.