diff --git a/go/src/koding/klient/machine/client/supervised.go b/go/src/koding/klient/machine/client/supervised.go new file mode 100644 index 00000000000..d0b69cc071a --- /dev/null +++ b/go/src/koding/klient/machine/client/supervised.go @@ -0,0 +1,121 @@ +package client + +import ( + "context" + "time" + + "koding/klient/machine/index" +) + +// DynamicClientFunc is an adapter that allows to dynamically provide clients. +type DynamicClientFunc func() (Client, error) + +// Supervised is a decorator type for Clients which will wait for valid client +// if the current client is Disconnected. This type is meant to handle temporary +// network issues or cases when underlying client is not set up yet. Eg. when +// program started but haven't made connection to remote machine yet. +type Supervised struct { + dcf DynamicClientFunc + timeout time.Duration +} + +// NewSupervised creates a new Supervised client instance. +func NewSupervised(dcf DynamicClientFunc, timeout time.Duration) *Supervised { + return &Supervised{ + timeout: timeout, + dcf: dcf, + } +} + +// CurrentUser calls registered Client's CurrentUser method and returns its +// result if it's not produced by Disconnected client. If it is, this function +// will wait until valid client is available or timeout is reached. +func (s *Supervised) CurrentUser() (user string, err error) { + fn := func(c Client) error { + user, err = c.CurrentUser() + return err + } + + err = s.call(fn) + return +} + +// SSHAddKeys calls registered Client's SSHAddKeys method and returns its result +// if it's not produced by Disconnected client. If it is, this function will +// wait until valid client is available or timeout is reached. +func (s *Supervised) SSHAddKeys(username string, keys ...string) (err error) { + fn := func(c Client) error { + return c.SSHAddKeys(username, keys...) + } + + return s.call(fn) +} + +// MountHeadIndex calls registered Client's MountHeadIndex method and returns +// its result if it's not produced by Disconnected client. If it is, this +// function will wait until valid client is available or timeout is reached. +func (s *Supervised) MountHeadIndex(path string) (absPath string, count int, diskSize int64, err error) { + fn := func(c Client) error { + absPath, count, diskSize, err = c.MountHeadIndex(path) + return err + } + + err = s.call(fn) + return +} + +// MountGetIndex calls registered Client's MountGetIndex method and returns its +// result if it's not produced by Disconnected client. If it is, this function +// will wait until valid client is available or timeout is reached. +func (s *Supervised) MountGetIndex(path string) (idx *index.Index, err error) { + fn := func(c Client) error { + idx, err = c.MountGetIndex(path) + return err + } + + err = s.call(fn) + return +} + +// Context calls registered Client's Context method and returns its result. If +// there is an error during client retrieving, this function will return +// canceled context. +func (s *Supervised) Context() context.Context { + c, err := s.dcf() + if err != nil { + ctx, cancel := context.WithCancel(context.Background()) + cancel() + return ctx + } + + return c.Context() +} + +func (s *Supervised) call(f func(Client) error) error { + c, err := s.dcf() + if err != nil { + return err + } + + ctx := c.Context() + if err = f(c); err != ErrDisconnected { + return err + } + + // Wait for new client. + ctx, cancel := context.WithTimeout(ctx, s.timeout) + defer cancel() + + if <-ctx.Done(); ctx.Err() == context.DeadlineExceeded { + // Client is still disconnected. Return it as is. + return ErrDisconnected + } + + // Previous context was canceled. This means that the client changed. + c, err = s.dcf() + if err != nil { + return err + } + + return f(c) +} diff --git a/go/src/koding/klient/machine/client/supervised_test.go b/go/src/koding/klient/machine/client/supervised_test.go new file mode 100644 index 00000000000..a49d84c1f28 --- /dev/null +++ b/go/src/koding/klient/machine/client/supervised_test.go @@ -0,0 +1,67 @@ +package client_test + +import ( + "testing" + "time" + + "koding/klient/machine/client" + "koding/klient/machine/client/testutil" +) + +func TestSupervisedWait(t *testing.T) { + serv := &testutil.Server{} + + dc, err := client.NewDynamic(testutil.DynamicOpts(serv, testutil.NewBuilder(nil))) + if err != nil { + t.Fatalf("want err = nil; got %v", err) + } + defer dc.Close() + + dcf := func() (client.Client, error) { return dc.Client(), nil } + + // Server is off, so the timeout should be reached. + const timeout = 50 * time.Millisecond + spv := client.NewSupervised(dcf, timeout) + startC, hitC := make(chan struct{}), make(chan time.Time) + + go func() { + <-startC + _, err = spv.CurrentUser() + hitC <- time.Now() + }() + + now := time.Now() + close(startC) + select { + case called := <-hitC: + if tret := called.Sub(now); tret < timeout { + t.Fatalf("want return at least after %v; got %v", timeout, tret) + } + if err != client.ErrDisconnected { + t.Fatalf("want err = %v; got %v", client.ErrDisconnected, err) + } + case <-time.After(time.Second): + t.Fatalf("test timed out after 1s") + } + + // Server starts to be responsive while Supervisor waits for it. + spv = client.NewSupervised(dcf, 2*time.Second) + startC, hitC = make(chan struct{}), make(chan time.Time) + + go func() { + <-startC + _, err = spv.CurrentUser() + hitC <- time.Now() + }() + + close(startC) + serv.TurnOn() + select { + case <-hitC: + if err == client.ErrDisconnected { + t.Fatal("want err != client.ErrDisconnected") + } + case <-time.After(time.Second): + t.Fatalf("test timed out after 1s") + } +} diff --git a/go/src/koding/klient/machine/client/testutil/client.go b/go/src/koding/klient/machine/client/testutil/client.go index af90a2fb9d3..d4e4f96889b 100644 --- a/go/src/koding/klient/machine/client/testutil/client.go +++ b/go/src/koding/klient/machine/client/testutil/client.go @@ -3,12 +3,14 @@ package testutil import ( "context" "fmt" + "os/user" "sync" "sync/atomic" "time" "koding/klient/machine" "koding/klient/machine/client" + "koding/klient/machine/index" ) // Builder uses Server logic to build test clients. @@ -81,8 +83,6 @@ func (n *Builder) BuildsCount() int { // Client satisfies machine.Client interface. It mimics real client and should // be used for testing purposes. type Client struct { - client.Disconnected - mu sync.Mutex ctx context.Context } @@ -94,6 +94,37 @@ func NewClient() *Client { } } +// CurrentUser returns the current user of local machine. +func (c *Client) CurrentUser() (string, error) { + u, err := user.Current() + if err != nil { + return "", err + } + + return u.Username, nil +} + +// SSHAddKeys is a no-op method and always returns nil. +func (c *Client) SSHAddKeys(_ string, _ ...string) error { + return nil +} + +// MountHeadIndex gets basic info about the index generated from local path. +func (c *Client) MountHeadIndex(path string) (string, int, int64, error) { + idx, err := c.MountGetIndex(path) + if err != nil { + return "", 0, 0, err + } + + return path, idx.Count(-1), idx.DiskSize(-1), nil +} + +// MountGetIndex creates an index from provided local path. Generated index is +// not cached. +func (c *Client) MountGetIndex(path string) (*index.Index, error) { + return index.NewIndexFiles(path) +} + // SetContext sets provided context to test client. func (c *Client) SetContext(ctx context.Context) { c.mu.Lock()