Skip to content

Commit

Permalink
Add changes to improve go report
Browse files Browse the repository at this point in the history
  • Loading branch information
maorfr committed Nov 15, 2018
1 parent 111ac03 commit aafe713
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 61 deletions.
105 changes: 44 additions & 61 deletions pkg/skbn/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,12 @@ func GetListOfFilesFromK8s(iClient interface{}, path, findType, findName string)
attempt++

output, stderr, err := Exec(client, namespace, podName, containerName, command, nil)
if len(stderr) != 0 {
if attempt == attempts {
return nil, fmt.Errorf("STDERR: " + (string)(stderr))
}
utils.Sleep(attempt)
shouldContinue, err := checkerr(stderr, attempt, attempts, err)
if shouldContinue {
continue
}
if err != nil {
if attempt == attempts {
return nil, err
}
utils.Sleep(attempt)
continue
return nil, err
}

lines := strings.Split((string)(output), "\n")
Expand Down Expand Up @@ -119,18 +112,14 @@ func DownloadFromK8s(iClient interface{}, path string) ([]byte, error) {
attempt++

stdout, stderr, err := Exec(client, namespace, podName, containerName, command, nil)
if attempt == attempts {
if len(stderr) != 0 {
return stdout, fmt.Errorf("STDERR: " + (string)(stderr))
}
if err != nil {
return stdout, err
}
shouldContinue, err := checkerr(stderr, attempt, attempts, err)
if shouldContinue {
continue
}
if err == nil {
return stdout, nil
if err != nil {
return nil, err
}
utils.Sleep(attempt)
return stdout, nil
}

return nil, nil
Expand All @@ -156,72 +145,40 @@ func UploadToK8s(iClient interface{}, toPath, fromPath string, buffer []byte) er
dir, _ := filepath.Split(pathToCopy)
command := []string{"mkdir", "-p", dir}
_, stderr, err := Exec(client, namespace, podName, containerName, command, nil)

if len(stderr) != 0 {
if attempt == attempts {
return fmt.Errorf("STDERR: " + (string)(stderr))
}
utils.Sleep(attempt)
shouldContinue, err := checkerr(stderr, attempt, attempts, err)
if shouldContinue {
continue
}
if err != nil {
if attempt == attempts {
return err
}
utils.Sleep(attempt)
continue
return err
}

command = []string{"touch", pathToCopy}
_, stderr, err = Exec(client, namespace, podName, containerName, command, nil)

if len(stderr) != 0 {
if attempt == attempts {
return fmt.Errorf("STDERR: " + (string)(stderr))
}
utils.Sleep(attempt)
shouldContinue, err = checkerr(stderr, attempt, attempts, err)
if shouldContinue {
continue
}
if err != nil {
if attempt == attempts {
return err
}
utils.Sleep(attempt)
continue
return err
}

command = []string{"cp", "/dev/stdin", pathToCopy}
stdin := bytes.NewReader(buffer)
_, stderr, err = Exec(client, namespace, podName, containerName, command, readerWrapper{stdin})

if len(stderr) != 0 {
if attempt == attempts {
return fmt.Errorf("STDERR: " + (string)(stderr))
}
utils.Sleep(attempt)
shouldContinue, err = checkerr(stderr, attempt, attempts, err)
if shouldContinue {
continue
}
if err != nil {
if attempt == attempts {
return err
}
utils.Sleep(attempt)
continue
return err
}
return nil
}

return nil
}

type readerWrapper struct {
reader io.Reader
}

func (r readerWrapper) Read(p []byte) (int, error) {
return r.reader.Read(p)
}

// Exec executes a command in a given container
func Exec(client K8sClient, namespace, podName, containerName string, command []string, stdin io.Reader) ([]byte, []byte, error) {
clientset, config := client.ClientSet, client.Config
Expand Down Expand Up @@ -284,3 +241,29 @@ func initK8sVariables(split []string) (string, string, string, string) {
func getAbsPath(path ...string) string {
return filepath.Join("/", filepath.Join(path...))
}

func checkerr(stderr []byte, attempt, attempts int, err error) (bool, error) {
if len(stderr) != 0 {
if attempt == attempts {
return false, fmt.Errorf("STDERR: " + (string)(stderr))
}
utils.Sleep(attempt)
return true, nil
}
if err != nil {
if attempt == attempts {
return false, err
}
utils.Sleep(attempt)
return true, nil
}
return false, nil
}

type readerWrapper struct {
reader io.Reader
}

func (r readerWrapper) Read(p []byte) (int, error) {
return r.reader.Read(p)
}
5 changes: 5 additions & 0 deletions pkg/utils/bwg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"sync"
)

// BoundedWaitGroup implements a sized WaitGroup
type BoundedWaitGroup struct {
wg sync.WaitGroup
ch chan struct{}
}

// NewBoundedWaitGroup initializes a new BoundedWaitGroup
func NewBoundedWaitGroup(cap int) BoundedWaitGroup {
return BoundedWaitGroup{ch: make(chan struct{}, cap)}
}

// Add performs a WaitGroup Add of a specified delta
func (bwg *BoundedWaitGroup) Add(delta int) {
for i := 0; i > delta; i-- {
<-bwg.ch
Expand All @@ -23,10 +26,12 @@ func (bwg *BoundedWaitGroup) Add(delta int) {
bwg.wg.Add(delta)
}

// Done performs a WaitGroup Add of -1
func (bwg *BoundedWaitGroup) Done() {
bwg.Add(-1)
}

// Wait performs a WaitGroup Wait
func (bwg *BoundedWaitGroup) Wait() {
bwg.wg.Wait()
}

0 comments on commit aafe713

Please sign in to comment.