Skip to content

Commit

Permalink
Improve error handlers in the up command (#245)
Browse files Browse the repository at this point in the history
* Remove extra logic from exec

* Improve error handlers in the up command

* add debug log for restoring
  • Loading branch information
pchico83 authored and rberrelleza committed Jun 6, 2019
1 parent 4f59195 commit 765ffe9
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 135 deletions.
47 changes: 9 additions & 38 deletions cmd/exec.go
Expand Up @@ -4,13 +4,11 @@ import (
"context"
"errors"
"fmt"
"net/http"
"os"

"github.com/okteto/okteto/pkg/config"
"github.com/okteto/okteto/pkg/k8s/exec"
"github.com/okteto/okteto/pkg/k8s/pods"
"github.com/okteto/okteto/pkg/log"
"github.com/okteto/okteto/pkg/model"

k8Client "github.com/okteto/okteto/pkg/k8s/client"
Expand All @@ -21,10 +19,7 @@ import (
//Exec executes a command on the CND container
func Exec() *cobra.Command {
var devPath string
var pod string
var container string
var namespace string
var port int

cmd := &cobra.Command{
Use: "exec COMMAND",
Expand All @@ -34,18 +29,6 @@ func Exec() *cobra.Command {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

if port != 0 {
go func() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
log.Debug("canceling process due to a request")
cancel()
})

log.Error(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
panic("webserver stopped handling requests")
}()
}

if _, err := os.Stat(devPath); os.IsNotExist(err) {
return fmt.Errorf("'%s' does not exist", devPath)
}
Expand All @@ -57,7 +40,7 @@ func Exec() *cobra.Command {
if namespace != "" {
dev.Namespace = namespace
}
err = executeExec(ctx, pod, container, dev, args)
err = executeExec(ctx, dev, args)
return err
},
Args: func(cmd *cobra.Command, args []string) error {
Expand All @@ -70,17 +53,11 @@ func Exec() *cobra.Command {

cmd.Flags().StringVarP(&devPath, "file", "f", config.ManifestFileName(), "path to the manifest file")
cmd.Flags().StringVarP(&namespace, "namespace", "n", "", "namespace where the exec command is executed")
cmd.Flags().StringVarP(&pod, "pod", "p", "", "pod where it is executed")
cmd.Flags().MarkHidden("pod")
cmd.Flags().StringVarP(&container, "container", "c", "", "container where it is executed")
cmd.Flags().MarkHidden("container")
cmd.Flags().IntVar(&port, "port", 0, "port to listen to signals")
cmd.Flags().MarkHidden("port")

return cmd
}

func executeExec(ctx context.Context, pod, container string, dev *model.Dev, args []string) error {
func executeExec(ctx context.Context, dev *model.Dev, args []string) error {
client, cfg, namespace, err := k8Client.GetLocal()
if err != nil {
return err
Expand All @@ -90,20 +67,14 @@ func executeExec(ctx context.Context, pod, container string, dev *model.Dev, arg
dev.Namespace = namespace
}

if len(pod) == 0 {
p, err := pods.GetDevPod(ctx, dev, client)
if err != nil {
return err
}

pod = p.Name
if len(dev.Container) == 0 {
dev.Container = p.Spec.Containers[0].Name
}
p, err := pods.GetDevPod(ctx, dev, client)
if err != nil {
return err
}

if len(container) > 0 {
dev.Container = container
if len(dev.Container) == 0 {
dev.Container = p.Spec.Containers[0].Name
}
return exec.Exec(ctx, client, cfg, dev.Namespace, pod, dev.Container, true, os.Stdin, os.Stdout, os.Stderr, args)

return exec.Exec(ctx, client, cfg, dev.Namespace, p.Name, dev.Container, true, os.Stdin, os.Stdout, os.Stderr, args)
}
156 changes: 63 additions & 93 deletions cmd/up.go
Expand Up @@ -3,18 +3,18 @@ package cmd
import (
"context"
"fmt"
"net/http"
"os"
"os/exec"
"os/signal"
"sync"
"time"

"github.com/docker/docker/pkg/term"
"github.com/okteto/okteto/pkg/analytics"
"github.com/okteto/okteto/pkg/config"
"github.com/okteto/okteto/pkg/errors"
k8Client "github.com/okteto/okteto/pkg/k8s/client"
"github.com/okteto/okteto/pkg/k8s/deployments"
"github.com/okteto/okteto/pkg/k8s/exec"
"github.com/okteto/okteto/pkg/k8s/pods"
"github.com/okteto/okteto/pkg/k8s/secrets"
"github.com/okteto/okteto/pkg/k8s/services"
Expand Down Expand Up @@ -99,12 +99,9 @@ func Up() *cobra.Command {
//RunUp starts the up sequence
func RunUp(dev *model.Dev) error {
up := &UpContext{
WG: &sync.WaitGroup{},
Dev: dev,
Disconnect: make(chan struct{}, 1),
Running: make(chan error, 1),
Exit: make(chan error, 1),
ErrChan: make(chan error, 1),
WG: &sync.WaitGroup{},
Dev: dev,
Exit: make(chan error, 1),
}

defer up.shutdown()
Expand All @@ -128,19 +125,26 @@ func RunUp(dev *model.Dev) error {

// Activate activates the dev environment
func (up *UpContext) Activate() {
up.WG.Add(1)
defer up.WG.Done()
var prevError error
attach := false
retry := false
inFd, _ := term.GetFdInfo(os.Stdin)
state, err := term.SaveState(inFd)
if err != nil {
up.Exit <- err
return
}

for {
up.Context, up.Cancel = context.WithCancel(context.Background())
err := up.devMode(attach)
up.Disconnect = make(chan struct{}, 1)
up.Running = make(chan error, 1)
up.ErrChan = make(chan error, 1)

err := up.devMode(retry)
if err != nil {
up.Exit <- err
return
}
attach = true
retry = true

fmt.Println(" ✓ Okteto Environment activated")

Expand All @@ -152,67 +156,39 @@ func (up *UpContext) Activate() {
up.Exit <- err
return
}

fmt.Println(" ✓ Files synchronized")

progress = newProgressBar("Finalizing configuration...")
progress.start()
err = up.forceLocalSyncState()
progress.stop()
if err != nil {
up.Exit <- err
return
}

switch prevError {
case errors.ErrLostConnection:
log.Green("Reconnected to your cluster.")
}

printDisplayContext("Your Okteto Environment is ready", up.Dev.Namespace, up.Dev.Name, up.Dev.Forward)
cmd, port := up.buildExecCommand()
if err := cmd.Start(); err != nil {
log.Infof("Failed to execute okteto exec: %s", err)
up.Exit <- err
return
}

log.Debugf("started new okteto exec")

go func() {
up.WG.Add(1)
defer up.WG.Done()
up.Running <- cmd.Wait()
up.Running <- up.runCommand()
return
}()

execEndpoint := fmt.Sprintf("http://127.0.0.1:%d", port)
prevError = up.WaitUntilExitOrInterrupt(execEndpoint)
if prevError != nil && (prevError == errors.ErrLostConnection ||
prevError == errors.ErrCommandFailed && !up.Sy.IsConnected()) {
log.Yellow("\nConnection lost to your Okteto Environment, reconnecting...")
fmt.Println()
up.shutdown()
continue
prevError := up.WaitUntilExitOrInterrupt()
if err := term.RestoreTerminal(inFd, state); err != nil {
log.Debugf("failed to restore terminal: %s", err)
}

up.Exit <- nil
if prevError != nil {
if prevError == errors.ErrLostConnection || (prevError == errors.ErrCommandFailed && !up.Sy.IsConnected()) {
log.Yellow("\nConnection lost to your Okteto Environment, reconnecting...\n")
up.shutdown()
continue
}
}

up.Exit <- prevError
return
}
}

// WaitUntilExitOrInterrupt blocks execution until a stop signal is sent or a disconnect event or an error
func (up *UpContext) WaitUntilExitOrInterrupt(endpoint string) error {
func (up *UpContext) WaitUntilExitOrInterrupt() error {
for {
select {
case <-up.Context.Done():
log.Debug("context is done, sending interrupt to process")
if _, err := http.Get(endpoint); err != nil {
log.Infof("failed to communicate to exec: %s", err)
}
return nil

case err := <-up.Running:
fmt.Println()
if err != nil {
log.Infof("Command execution error: %s\n", err)
return errors.ErrCommandFailed
Expand All @@ -221,11 +197,8 @@ func (up *UpContext) WaitUntilExitOrInterrupt(endpoint string) error {

case err := <-up.ErrChan:
log.Yellow(err.Error())

case <-up.Disconnect:
log.Debug("disconnected, sending interrupt to process")
if _, err := http.Get(endpoint); err != nil {
log.Infof("failed to communicate to exec: %s", err)
}
return errors.ErrLostConnection
}
}
Expand Down Expand Up @@ -334,10 +307,6 @@ func (up *UpContext) startSync() error {
return err
}

return nil
}

func (up *UpContext) forceLocalSyncState() error {
if err := up.Sy.OverrideChanges(up.Context, up.WG, up.Dev); err != nil {
return err
}
Expand All @@ -354,6 +323,35 @@ func (up *UpContext) forceLocalSyncState() error {
return up.Sy.Restart(up.Context, up.WG)
}

func (up *UpContext) runCommand() error {
exec.Exec(
up.Context,
up.Client,
up.RestConfig,
up.Dev.Namespace,
up.Pod,
up.Dev.Container,
true,
os.Stdin,
os.Stdout,
os.Stderr,
[]string{"sh", "-c", "trap '' TERM && kill -- -1 && sleep 0.1 & kill -s KILL -- -1 >/dev/null 2>&1"},
)
return exec.Exec(
up.Context,
up.Client,
up.RestConfig,
up.Dev.Namespace,
up.Pod,
up.Dev.Container,
true,
os.Stdin,
os.Stdout,
os.Stderr,
up.Dev.Command,
)
}

// Shutdown runs the cancellation sequence. It will wait for all tasks to finish for up to 500 milliseconds
func (up *UpContext) shutdown() {
log.Debugf("cancelling context")
Expand Down Expand Up @@ -400,31 +398,3 @@ func printDisplayContext(message, namespace, name string, ports []model.Forward)
}
fmt.Println()
}

func (up *UpContext) buildExecCommand() (*exec.Cmd, int) {
port, err := model.GetAvailablePort()
if err != nil {
log.Infof("couldn't access the network: %s", err)
port = 15000
}

args := []string{
"exec",
"--pod",
up.Pod,
"--container",
up.Container,
"--port",
fmt.Sprintf("%d", port),
"-n",
up.Dev.Namespace,
"--",
}
args = append(args, up.Dev.Command...)

cmd := exec.Command(config.GetBinaryFullPath(), args...)
cmd.Stdin = os.Stdin
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
return cmd, port
}
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -11,7 +11,7 @@ require (
github.com/chai2010/gettext-go v0.0.0-20170215093142-bf70f2a70fb1 // indirect
github.com/cheggaaa/pb v1.0.27
github.com/denisbrodbeck/machineid v1.0.1
github.com/docker/docker v0.0.0-20180612054059-a9fbbdc8dd87 // indirect
github.com/docker/docker v0.0.0-20180612054059-a9fbbdc8dd87
github.com/docker/spdystream v0.0.0-20170912183627-bc6354cbbc29 // indirect
github.com/dukex/mixpanel v0.0.0-20180925151559-f8d5594f958e
github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Expand Up @@ -331,6 +331,7 @@ k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30 h1:TRb4wNWoBVrH9plmkp2q86
k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc=
k8s.io/kubernetes v1.13.4 h1:gQqFv/pH8hlbznLXQUsi8s5zqYnv0slmUDl/yVA0EWc=
k8s.io/kubernetes v1.13.4/go.mod h1:ocZa8+6APFNC2tX1DZASIbocyYT5jHzqFVsY5aoB7Jk=
k8s.io/kubernetes v1.14.2 h1:Gdq2hPpttbaJBoClIanCE6WSu4IZReA54yhkZtvPUOo=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 h1:8r+l4bNWjRlsFYlQJnKJ2p7s1YQPj4XyXiJVqDHRx7c=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
Expand Down
1 change: 1 addition & 0 deletions pkg/k8s/deployments/translate.go
Expand Up @@ -118,6 +118,7 @@ func translate(d *appsv1.Deployment, dev *model.Dev) (*appsv1.Deployment, *apiv1
d.Spec.Replicas = &devReplicas

devContainer := getDevContainer(d, dev.Container)
dev.Container = devContainer.Name
if devContainer == nil {
return nil, nil, fmt.Errorf("container/%s doesn't exist in deployment/%s", dev.Container, d.Name)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/syncthing/monitor.go
Expand Up @@ -21,16 +21,20 @@ func (s *Syncthing) IsConnected() bool {
func (s *Syncthing) Monitor(ctx context.Context, wg *sync.WaitGroup, disconnect chan struct{}) {
wg.Add(1)
defer wg.Done()
ticker := time.NewTicker(5 * time.Second)
ticker := time.NewTicker(3 * time.Second)
connected := true
for {
select {
case <-ticker.C:
if !s.IsConnected() {
if !s.IsConnected() {
if s.IsConnected() {
connected = true
} else {
if !connected {
log.Debug("not connected to syncthing, sending disconnect signal")
disconnect <- struct{}{}
return
}
connected = false
}
case <-ctx.Done():
return
Expand Down

0 comments on commit 765ffe9

Please sign in to comment.