Skip to content

Commit

Permalink
Fix Hanging Notebooks (#175)
Browse files Browse the repository at this point in the history
Fixes #173

* Propagate context through all long-running operations.
* Ensure context is cancelled when either of the 2 primary go-routines
exit (for syncing or port-forwarding).
* Move tar-balling to after notebook file check (I found that I would
accidentally execute `k notebook -d .` on the root of the repo and it
would take a long time to tar and then fail after it cant find the
notebook.yaml file).
* Add some context-related logging.

Fixes #174

* Ignore syncing of all dot-files. 

Misc

* Add exponential backoff to port-forward retries (currently max 3
retries).
  • Loading branch information
nstogner committed Aug 9, 2023
1 parent cce660b commit bfa2e99
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 50 deletions.
12 changes: 4 additions & 8 deletions containertools/cmd/nbwatch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"
"os"
"path/filepath"
"strings"

"k8s.io/klog/v2"

Expand Down Expand Up @@ -58,14 +59,9 @@ func watchLoop(w *fsnotify.Watcher) {
i++
path := e.Name

//path, err := filepath.EvalSymlinks(e.Name)
//if err != nil {
// klog.Error(err)
// continue
//}

switch filepath.Base(path) {
case ".git", ".gitignore", ".gitmodules", ".gitattributes", ".ipynb_checkpoints":
// Covers ".git", ".gitignore", ".gitmodules", ".gitattributes", ".ipynb_checkpoints"
// and also temporary files that Jupyter writes on save like: ".~hello.py"
if strings.HasPrefix(filepath.Base(path), ".") {
continue
}

Expand Down
8 changes: 6 additions & 2 deletions docs/development.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@ TODO: Automate the cleanup of PVs... Don't forget to manually clean them up for
You can test out the latest kubectl plugin by building from source directly:

```sh
go build ./kubectl/cmd/notebook && sudo mv notebook /usr/local/bin/kubectl-notebook
go build ./kubectl/cmd/applybuild && sudo mv applybuild /usr/local/bin/kubectl-applybuild
go build ./kubectl/cmd/notebook &&
mv notebook /usr/local/bin/kubectl-notebook ||
sudo mv notebook /usr/local/bin/kubectl-notebook
go build ./kubectl/cmd/applybuild &&
mv applybuild /usr/local/bin/kubectl-applybuild ||
sudo mv applybuild /usr/local/bin/kubectl-applybuild
```

The `kubectl notebook` command depends on container-tools for live-syncing. The plugin will try
Expand Down
22 changes: 16 additions & 6 deletions kubectl/internal/client/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (c *Client) SyncFilesFromNotebook(ctx context.Context, nb *apiv1.Notebook)
return fmt.Errorf("getting node arch: %w", err)
}

if err := getContainerTools(toolsPath, targetOS); err != nil {
if err := getContainerTools(ctx, toolsPath, targetOS); err != nil {
return fmt.Errorf("getting container-tools: %w", err)
}

Expand Down Expand Up @@ -79,11 +79,12 @@ func (c *Client) SyncFilesFromNotebook(ctx context.Context, nb *apiv1.Notebook)
var event NBWatchEvent
if err := json.Unmarshal(eventLine, &event); err != nil {
klog.Errorf("Failed to unmarshal nbevent: %w", err)
continue
}

relPath, err := filepath.Rel("/content/src", event.Path)
if err != nil {
klog.Errorf("Failed to determining relative path: %w", err)
klog.Errorf("Failed to determine relative path: %w", err)
continue
}

Expand All @@ -95,10 +96,12 @@ func (c *Client) SyncFilesFromNotebook(ctx context.Context, nb *apiv1.Notebook)
// NOTE: A long-running port-forward might be more performant here.
if err := cp.FromPod(ctx, event.Path, localPath, podRef, containerName); err != nil {
klog.Errorf("Sync: failed to copy: %w", err)
continue
}
} else if event.Op == "REMOVE" || event.Op == "RENAME" {
if err := os.Remove(localPath); err != nil {
klog.Errorf("Sync: failed to remove: %w", err)
continue
}
}
}
Expand All @@ -110,11 +113,13 @@ func (c *Client) SyncFilesFromNotebook(ctx context.Context, nb *apiv1.Notebook)
}()

if err := c.exec(ctx, podRef, "/tmp/nbwatch", nil, w, os.Stderr); err != nil {
w.Close()
return fmt.Errorf("exec: nbwatch: %w", err)
}

klog.V(2).Info("Waiting for file sync loop to finish...")
wg.Wait()
klog.V(2).Info("Done waiting for file sync loop to finish.")

return nil
}
Expand Down Expand Up @@ -164,7 +169,7 @@ type NBWatchEvent struct {
Op string `json:"op"`
}

func getContainerTools(dir, targetOS string) error {
func getContainerTools(ctx context.Context, dir, targetOS string) error {
// Check to see if tools need to be downloaded.
versionPath := filepath.Join(dir, "version.txt")
exists, err := fileExists(versionPath)
Expand Down Expand Up @@ -195,7 +200,7 @@ func getContainerTools(dir, targetOS string) error {
if err := os.MkdirAll(archDir, 0755); err != nil {
return fmt.Errorf("recreating directory: %w", err)
}
if err := getContainerToolsRelease(archDir, targetOS, arch); err != nil {
if err := getContainerToolsRelease(ctx, archDir, targetOS, arch); err != nil {
return fmt.Errorf("getting container-tools: %w", err)
}
}
Expand All @@ -207,10 +212,15 @@ func getContainerTools(dir, targetOS string) error {
return nil
}

func getContainerToolsRelease(dir, targetOS, targetArch string) error {
func getContainerToolsRelease(ctx context.Context, dir, targetOS, targetArch string) error {
releaseURL := fmt.Sprintf("https://github.com/substratusai/substratus/releases/download/v%s/container-tools-%s-%s.tar.gz", Version, targetOS, targetArch)
klog.V(1).Infof("Downloading: %s", releaseURL)
resp, err := http.Get(releaseURL)

req, err := http.NewRequestWithContext(ctx, "GET", releaseURL, nil)
if err != nil {
return fmt.Errorf("new request: %w", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("downloading release: %w", err)
}
Expand Down
11 changes: 8 additions & 3 deletions kubectl/internal/client/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type Tarball struct {
MD5Checksum string
}

func PrepareImageTarball(buildPath string) (*Tarball, error) {
func PrepareImageTarball(ctx context.Context, buildPath string) (*Tarball, error) {
exists, err := fileExists(filepath.Join(buildPath, "Dockerfile"))
if err != nil {
return nil, fmt.Errorf("checking if Dockerfile exists: %w", err)
Expand All @@ -52,7 +52,7 @@ func PrepareImageTarball(buildPath string) (*Tarball, error) {
}

tarPath := filepath.Join(tmpDir, "/archive.tar.gz")
err = tarGz(buildPath, tarPath)
err = tarGz(ctx, buildPath, tarPath)
if err != nil {
return nil, fmt.Errorf("failed to create a tar.gz of the directory: %w", err)
}
Expand Down Expand Up @@ -208,7 +208,7 @@ func calculateMD5(path string) (string, error) {
return fmt.Sprintf("%x", hash.Sum(nil)), nil
}

func tarGz(src, dst string) error {
func tarGz(ctx context.Context, src, dst string) error {
tarFile, err := os.Create(dst)
if err != nil {
return fmt.Errorf("failed to create tarFile: %w", err)
Expand All @@ -223,6 +223,11 @@ func tarGz(src, dst string) error {

// TODO(bjb): #121 read .dockerignore if it exists, exclude those files
return filepath.Walk(src, func(path string, info os.FileInfo, err error) error {
klog.V(4).Infof("Tarring: %v", path)
if err := ctx.Err(); err != nil {
return err
}

if err != nil {
return fmt.Errorf("failed to walk the tempdir path: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion kubectl/internal/commands/applybuild.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func ApplyBuild() *cobra.Command {
}
cfg.build = args[0]

tarball, err := client.PrepareImageTarball(cfg.build)
tarball, err := client.PrepareImageTarball(ctx, cfg.build)
if err != nil {
return fmt.Errorf("preparing tarball: %w", err)
}
Expand Down
7 changes: 6 additions & 1 deletion kubectl/internal/commands/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ type mockClient struct {

func (c *mockClient) PortForwardNotebook(ctx context.Context, verbose bool, nb *apiv1.Notebook, ready chan struct{}) error {
log.Println("mockClient.PortForwardNotebook called")
ready <- struct{}{}
select {
case ready <- struct{}{}:
fmt.Println("sent ready")
default:
fmt.Println("no ready sent")
}
ctx.Done()
return fmt.Errorf("mock PortForwardNotebook exiting because of ctx.Done()")
}
Expand Down
64 changes: 36 additions & 28 deletions kubectl/internal/commands/notebook.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
"math"
"os"
"os/signal"
"path/filepath"
Expand Down Expand Up @@ -48,7 +49,11 @@ func Notebook() *cobra.Command {
RunE: func(cmd *cobra.Command, args []string) error {
client.Version = Version

ctx, cancel := context.WithCancel(cmd.Context())
ctx, ctxCancel := context.WithCancel(cmd.Context())
cancel := func() {
klog.V(1).Info("Context cancelled")
ctxCancel()
}
defer cancel()

// The -v flag is managed by klog, so we need to check it manually.
Expand Down Expand Up @@ -82,22 +87,6 @@ func Notebook() *cobra.Command {

spin := spinner.New(spinner.CharSets[9], 100*time.Millisecond)

var tarball *client.Tarball
if cfg.build != "" {
spin.Suffix = " Preparing tarball..."
spin.Start()

var err error
tarball, err = client.PrepareImageTarball(cfg.build)
if err != nil {
return fmt.Errorf("preparing tarball: %w", err)
}
defer os.Remove(tarball.TempDir)

spin.Stop()
fmt.Fprintln(NotebookStdout, "Tarball prepared.")
}

restConfig, err := clientcmd.BuildConfigFromFlags("", cfg.kubeconfig)
if err != nil {
return fmt.Errorf("rest config: %w", err)
Expand Down Expand Up @@ -149,7 +138,21 @@ func Notebook() *cobra.Command {
}
nb.Spec.Suspend = ptr.To(false)

var tarball *client.Tarball
if cfg.build != "" {
spin.Suffix = " Preparing tarball..."
spin.Start()

var err error
tarball, err = client.PrepareImageTarball(ctx, cfg.build)
if err != nil {
return fmt.Errorf("preparing tarball: %w", err)
}
defer os.Remove(tarball.TempDir)

spin.Stop()
fmt.Fprintln(NotebookStdout, "Tarball prepared.")

if err := client.ClearImage(nb); err != nil {
return fmt.Errorf("clearing image in spec: %w", err)
}
Expand All @@ -172,11 +175,13 @@ func Notebook() *cobra.Command {
// Suspend notebook.
spin.Suffix = " Suspending notebook..."
spin.Start()
if _, err := notebooks.Patch(nb.Namespace, nb.Name, types.MergePatchType, []byte(`{"spec": {"suspend": true} }`), &metav1.PatchOptions{}); err != nil {
_, err := notebooks.Patch(nb.Namespace, nb.Name, types.MergePatchType, []byte(`{"spec": {"suspend": true} }`), &metav1.PatchOptions{})
spin.Stop()
if err != nil {
klog.Errorf("Error suspending notebook: %v", err)
} else {
fmt.Fprintln(NotebookStdout, "Notebook suspended.")
}
spin.Stop()
fmt.Fprintln(NotebookStdout, "Notebook suspended.")
}
}
defer cleanup()
Expand Down Expand Up @@ -213,13 +218,13 @@ func Notebook() *cobra.Command {
defer func() {
wg.Done()
klog.V(2).Info("Syncing files from notebook: Done.")

// Stop other goroutines.
cancel()
}()
if err := c.SyncFilesFromNotebook(ctx, nb); err != nil {
if !errors.Is(err, context.Canceled) {
klog.Errorf("Error syncing files from notebook: %v", err)
}
cancel()
}
}()
}
Expand All @@ -230,10 +235,12 @@ func Notebook() *cobra.Command {
defer func() {
wg.Done()
klog.V(2).Info("Port-forwarding: Done.")
// Stop other goroutines.
cancel()
}()

first := true
for {
const maxRetries = 3
for i := 0; i < maxRetries; i++ {
portFwdCtx, cancelPortFwd := context.WithCancel(ctx)
defer cancelPortFwd() // Avoid a context leak
runtime.ErrorHandlers = []func(err error){
Expand All @@ -248,15 +255,14 @@ func Notebook() *cobra.Command {
// so we only use the outer ready channel once. On restart of the portForward,
// we use a new channel.
var ready chan struct{}
if first {
if i == 0 {
ready = serveReady
} else {
ready = make(chan struct{})
}

if err := c.PortForwardNotebook(portFwdCtx, verbose, nb, ready); err != nil {
klog.Errorf("Port-forward returned an error: %v", err)
return
}

// Check if the command's context is cancelled, if so,
Expand All @@ -267,9 +273,11 @@ func Notebook() *cobra.Command {
}

cancelPortFwd() // Avoid a build up of contexts before returning.
klog.V(1).Info("Restarting port forward")
first = false
backoff := time.Duration(math.Pow(2, float64(i))) * time.Second
klog.V(1).Infof("Restarting port forward (index = %v), after backoff: %s", i, backoff)
time.Sleep(backoff)
}
klog.V(1).Info("Done trying to port-forward")
}()

spin.Suffix = " Waiting for connection to be ready to serve..."
Expand Down
2 changes: 1 addition & 1 deletion kubectl/internal/commands/notebook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestNotebook(t *testing.T) {
"--build", "./test-notebook",
"--kubeconfig", kubectlKubeconfigPath,
"--no-open-browser",
//"-v=9",
"-v=4",
})
cmd.SetContext(ctx)
var wg sync.WaitGroup
Expand Down

0 comments on commit bfa2e99

Please sign in to comment.