Skip to content
This repository has been archived by the owner on Aug 15, 2022. It is now read-only.

Commit

Permalink
Merge pull request #10306 from koding/machine_mount_client_supervised
Browse files Browse the repository at this point in the history
kd/machine: Add supervised client.
  • Loading branch information
rjeczalik committed Jan 19, 2017
2 parents e177bd7 + 8436495 commit 2443f1c
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 2 deletions.
121 changes: 121 additions & 0 deletions go/src/koding/klient/machine/client/supervised.go
@@ -0,0 +1,121 @@
package client

import (
"context"
"time"

"koding/klient/machine/index"
)

// DynamicClientFunc is an adapter that allows to dynamically provide clients.
type DynamicClientFunc func() (Client, error)

// Supervised is a decorator type for Clients which will wait for valid client
// if the current client is Disconnected. This type is meant to handle temporary
// network issues or cases when underlying client is not set up yet. Eg. when
// program started but haven't made connection to remote machine yet.
type Supervised struct {
dcf DynamicClientFunc
timeout time.Duration
}

// NewSupervised creates a new Supervised client instance.
func NewSupervised(dcf DynamicClientFunc, timeout time.Duration) *Supervised {
return &Supervised{
timeout: timeout,
dcf: dcf,
}
}

// CurrentUser calls registered Client's CurrentUser method and returns its
// result if it's not produced by Disconnected client. If it is, this function
// will wait until valid client is available or timeout is reached.
func (s *Supervised) CurrentUser() (user string, err error) {
fn := func(c Client) error {
user, err = c.CurrentUser()
return err
}

err = s.call(fn)
return
}

// SSHAddKeys calls registered Client's SSHAddKeys method and returns its result
// if it's not produced by Disconnected client. If it is, this function will
// wait until valid client is available or timeout is reached.
func (s *Supervised) SSHAddKeys(username string, keys ...string) (err error) {
fn := func(c Client) error {
return c.SSHAddKeys(username, keys...)
}

return s.call(fn)
}

// MountHeadIndex calls registered Client's MountHeadIndex method and returns
// its result if it's not produced by Disconnected client. If it is, this
// function will wait until valid client is available or timeout is reached.
func (s *Supervised) MountHeadIndex(path string) (absPath string, count int, diskSize int64, err error) {
fn := func(c Client) error {
absPath, count, diskSize, err = c.MountHeadIndex(path)
return err
}

err = s.call(fn)
return
}

// MountGetIndex calls registered Client's MountGetIndex method and returns its
// result if it's not produced by Disconnected client. If it is, this function
// will wait until valid client is available or timeout is reached.
func (s *Supervised) MountGetIndex(path string) (idx *index.Index, err error) {
fn := func(c Client) error {
idx, err = c.MountGetIndex(path)
return err
}

err = s.call(fn)
return
}

// Context calls registered Client's Context method and returns its result. If
// there is an error during client retrieving, this function will return
// canceled context.
func (s *Supervised) Context() context.Context {
c, err := s.dcf()
if err != nil {
ctx, cancel := context.WithCancel(context.Background())
cancel()
return ctx
}

return c.Context()
}

func (s *Supervised) call(f func(Client) error) error {
c, err := s.dcf()
if err != nil {
return err
}

ctx := c.Context()
if err = f(c); err != ErrDisconnected {
return err
}

// Wait for new client.
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()

if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded {
// Client is still disconnected. Return it as is.
return ErrDisconnected
}

// Previous context was canceled. This means that the client changed.
c, err = s.dcf()
if err != nil {
return err
}

return f(c)
}
67 changes: 67 additions & 0 deletions go/src/koding/klient/machine/client/supervised_test.go
@@ -0,0 +1,67 @@
package client_test

import (
"testing"
"time"

"koding/klient/machine/client"
"koding/klient/machine/client/testutil"
)

func TestSupervisedWait(t *testing.T) {
serv := &testutil.Server{}

dc, err := client.NewDynamic(testutil.DynamicOpts(serv, testutil.NewBuilder(nil)))
if err != nil {
t.Fatalf("want err = nil; got %v", err)
}
defer dc.Close()

dcf := func() (client.Client, error) { return dc.Client(), nil }

// Server is off, so the timeout should be reached.
const timeout = 50 * time.Millisecond
spv := client.NewSupervised(dcf, timeout)
startC, hitC := make(chan struct{}), make(chan time.Time)

go func() {
<-startC
_, err = spv.CurrentUser()
hitC <- time.Now()
}()

now := time.Now()
close(startC)
select {
case called := <-hitC:
if tret := called.Sub(now); tret < timeout {
t.Fatalf("want return at least after %v; got %v", timeout, tret)
}
if err != client.ErrDisconnected {
t.Fatalf("want err = %v; got %v", client.ErrDisconnected, err)
}
case <-time.After(time.Second):
t.Fatalf("test timed out after 1s")
}

// Server starts to be responsive while Supervisor waits for it.
spv = client.NewSupervised(dcf, 2*time.Second)
startC, hitC = make(chan struct{}), make(chan time.Time)

go func() {
<-startC
_, err = spv.CurrentUser()
hitC <- time.Now()
}()

close(startC)
serv.TurnOn()
select {
case <-hitC:
if err == client.ErrDisconnected {
t.Fatal("want err != client.ErrDisconnected")
}
case <-time.After(time.Second):
t.Fatalf("test timed out after 1s")
}
}
35 changes: 33 additions & 2 deletions go/src/koding/klient/machine/client/testutil/client.go
Expand Up @@ -3,12 +3,14 @@ package testutil
import (
"context"
"fmt"
"os/user"
"sync"
"sync/atomic"
"time"

"koding/klient/machine"
"koding/klient/machine/client"
"koding/klient/machine/index"
)

// Builder uses Server logic to build test clients.
Expand Down Expand Up @@ -81,8 +83,6 @@ func (n *Builder) BuildsCount() int {
// Client satisfies machine.Client interface. It mimics real client and should
// be used for testing purposes.
type Client struct {
client.Disconnected

mu sync.Mutex
ctx context.Context
}
Expand All @@ -94,6 +94,37 @@ func NewClient() *Client {
}
}

// CurrentUser returns the current user of local machine.
func (c *Client) CurrentUser() (string, error) {
u, err := user.Current()
if err != nil {
return "", err
}

return u.Username, nil
}

// SSHAddKeys is a no-op method and always returns nil.
func (c *Client) SSHAddKeys(_ string, _ ...string) error {
return nil
}

// MountHeadIndex gets basic info about the index generated from local path.
func (c *Client) MountHeadIndex(path string) (string, int, int64, error) {
idx, err := c.MountGetIndex(path)
if err != nil {
return "", 0, 0, err
}

return path, idx.Count(-1), idx.DiskSize(-1), nil
}

// MountGetIndex creates an index from provided local path. Generated index is
// not cached.
func (c *Client) MountGetIndex(path string) (*index.Index, error) {
return index.NewIndexFiles(path)
}

// SetContext sets provided context to test client.
func (c *Client) SetContext(ctx context.Context) {
c.mu.Lock()
Expand Down

0 comments on commit 2443f1c

Please sign in to comment.