From ee1e7b52df4a996da6b81ef0bc162117d48fbe43 Mon Sep 17 00:00:00 2001 From: CMGS Date: Tue, 19 Jul 2022 10:27:02 +0800 Subject: [PATCH] use pools to replace SentryGo --- cluster/calcium/build.go | 2 +- cluster/calcium/execute.go | 4 +- cluster/calcium/helper.go | 185 ----------------------------------- cluster/calcium/hook.go | 2 +- cluster/calcium/lambda.go | 4 +- cluster/calcium/log.go | 2 +- cluster/calcium/stream.go | 194 +++++++++++++++++++++++++++++++++++++ 7 files changed, 201 insertions(+), 192 deletions(-) create mode 100644 cluster/calcium/stream.go diff --git a/cluster/calcium/build.go b/cluster/calcium/build.go index a58f6d1d3..fb14b5ba1 100644 --- a/cluster/calcium/build.go +++ b/cluster/calcium/build.go @@ -174,7 +174,7 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod continue } - for message := range processBuildImageStream(ctx, rc) { + for message := range c.processBuildImageStream(ctx, rc) { ch <- message } diff --git a/cluster/calcium/execute.go b/cluster/calcium/execute.go index 33e67bf34..1cb666a5a 100644 --- a/cluster/calcium/execute.go +++ b/cluster/calcium/execute.go @@ -50,13 +50,13 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo splitFunc, split := bufio.ScanLines, byte('\n') if opts.OpenStdin { - processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error { + c.processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error { return workload.Engine.ExecResize(ctx, execID, height, width) }) splitFunc, split = bufio.ScanBytes, byte(0) } - for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) { + for m := range c.processStdStream(ctx, stdout, stderr, splitFunc, split) { ch <- &types.AttachWorkloadMessage{WorkloadID: opts.WorkloadID, Data: m.Data, StdStreamType: m.StdStreamType} } diff --git a/cluster/calcium/helper.go b/cluster/calcium/helper.go index 5655527fd..04024de5b 100644 --- a/cluster/calcium/helper.go +++ b/cluster/calcium/helper.go @@ -1,15 +1,6 @@ package calcium import ( - "bufio" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "sync" - - "github.com/projecteru2/core/engine" - enginetypes "github.com/projecteru2/core/engine/types" "github.com/projecteru2/core/log" "github.com/projecteru2/core/types" "github.com/projecteru2/core/utils" @@ -18,44 +9,6 @@ import ( "golang.org/x/net/context" ) -var winchCommand = []byte{0x80} // 128, non-ASCII -var escapeCommand = []byte{0x1d} // 29, ^] - -type window struct { - Height uint `json:"Row"` - Width uint `json:"Col"` -} - -func execuateInside(ctx context.Context, client engine.API, ID, cmd, user string, env []string, privileged bool) ([]byte, error) { - cmds := utils.MakeCommandLineArgs(cmd) - execConfig := &enginetypes.ExecConfig{ - User: user, - Cmd: cmds, - Privileged: privileged, - Env: env, - AttachStderr: true, - AttachStdout: true, - } - b := []byte{} - execID, stdout, stderr, _, err := client.Execute(ctx, ID, execConfig) - if err != nil { - return nil, errors.WithStack(err) - } - - for m := range processStdStream(ctx, stdout, stderr, bufio.ScanLines, byte('\n')) { - b = append(b, m.Data...) - } - - exitCode, err := client.ExecExitCode(ctx, ID, execID) - if err != nil { - return b, errors.WithStack(err) - } - if exitCode != 0 { - return b, errors.WithStack(fmt.Errorf("%s", b)) - } - return b, nil -} - func distributionInspect(ctx context.Context, node *types.Node, image string, digests []string) bool { remoteDigest, err := node.Engine.ImageRemoteDigest(ctx, image) if err != nil { @@ -105,141 +58,3 @@ func pullImage(ctx context.Context, node *types.Node, image string) error { log.Infof(ctx, "[pullImage] Done pulling image %s", image) return nil } - -func processVirtualizationInStream( - ctx context.Context, - inStream io.WriteCloser, - inCh <-chan []byte, - resizeFunc func(height, width uint) error, -) <-chan struct{} { // nolint - specialPrefixCallback := map[string]func([]byte){ - string(winchCommand): func(body []byte) { - w := &window{} - if err := json.Unmarshal(body, w); err != nil { - log.Errorf(ctx, "[processVirtualizationInStream] invalid winch command: %q", body) - return - } - if err := resizeFunc(w.Height, w.Width); err != nil { - log.Errorf(ctx, "[processVirtualizationInStream] resize window error: %v", err) - return - } - }, - - string(escapeCommand): func(_ []byte) { - inStream.Close() - }, - } - return rawProcessVirtualizationInStream(ctx, inStream, inCh, specialPrefixCallback) -} - -func rawProcessVirtualizationInStream( - ctx context.Context, - inStream io.WriteCloser, - inCh <-chan []byte, - specialPrefixCallback map[string]func([]byte), -) <-chan struct{} { - done := make(chan struct{}) - utils.SentryGo(func() { - defer close(done) - defer inStream.Close() - - for cmd := range inCh { - if len(cmd) == 0 { - continue - } - if f, ok := specialPrefixCallback[string(cmd[:1])]; ok { - f(cmd[1:]) - continue - } - if _, err := inStream.Write(cmd); err != nil { - log.Errorf(ctx, "[rawProcessVirtualizationInStream] failed to write virtual input stream: %v", err) - continue - } - } - }) - - return done -} - -func processVirtualizationOutStream( - ctx context.Context, - outStream io.ReadCloser, - splitFunc bufio.SplitFunc, - split byte, - -) <-chan []byte { - outCh := make(chan []byte) - utils.SentryGo(func() { - defer close(outCh) - if outStream == nil { - return - } - defer outStream.Close() - scanner := bufio.NewScanner(outStream) - scanner.Split(splitFunc) - for scanner.Scan() { - bs := scanner.Bytes() - if split != 0 { - bs = append(bs, split) - } - outCh <- bs - } - if err := scanner.Err(); err != nil { - log.Warnf(ctx, "[processVirtualizationOutStream] failed to read output from output stream: %v", err) - } - }) - return outCh -} - -func processBuildImageStream(ctx context.Context, reader io.ReadCloser) chan *types.BuildImageMessage { - ch := make(chan *types.BuildImageMessage) - utils.SentryGo(func() { - defer close(ch) - defer utils.EnsureReaderClosed(ctx, reader) - decoder := json.NewDecoder(reader) - for { - message := &types.BuildImageMessage{} - err := decoder.Decode(message) - if err != nil { - if err != io.EOF { - malformed, _ := ioutil.ReadAll(decoder.Buffered()) // TODO err check - log.Errorf(ctx, "[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed)) - message.Error = err.Error() - ch <- message - } - break - } - ch <- message - } - }) - return ch -} - -func processStdStream(ctx context.Context, stdout, stderr io.ReadCloser, splitFunc bufio.SplitFunc, split byte) chan types.StdStreamMessage { - ch := make(chan types.StdStreamMessage) - - wg := sync.WaitGroup{} - - wg.Add(1) - utils.SentryGo(func() { - defer wg.Done() - for data := range processVirtualizationOutStream(ctx, stdout, splitFunc, split) { - ch <- types.StdStreamMessage{Data: data, StdStreamType: types.Stdout} - } - }) - - wg.Add(1) - utils.SentryGo(func() { - defer wg.Done() - for data := range processVirtualizationOutStream(ctx, stderr, splitFunc, split) { - ch <- types.StdStreamMessage{Data: data, StdStreamType: types.Stderr} - } - }) - - utils.SentryGo(func() { - defer close(ch) - wg.Wait() - }) - - return ch -} diff --git a/cluster/calcium/hook.go b/cluster/calcium/hook.go index 418ed89e4..8ef59642b 100644 --- a/cluster/calcium/hook.go +++ b/cluster/calcium/hook.go @@ -18,7 +18,7 @@ func (c *Calcium) doHook( ) ([]*bytes.Buffer, error) { outputs := []*bytes.Buffer{} for _, cmd := range cmds { - output, err := execuateInside(ctx, engine, ID, cmd, user, env, privileged) + output, err := c.execuateInside(ctx, engine, ID, cmd, user, env, privileged) if err != nil { // 执行 hook 的过程中,如果 cmdForce 为真并且不忽略 hook 就输出错误 outputs = append(outputs, bytes.NewBufferString(err.Error())) diff --git a/cluster/calcium/lambda.go b/cluster/calcium/lambda.go index b63b881f1..3a0f2dd45 100644 --- a/cluster/calcium/lambda.go +++ b/cluster/calcium/lambda.go @@ -138,14 +138,14 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC } } - processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error { + c.processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error { return workload.Engine.VirtualizationResize(ctx, message.WorkloadID, height, width) }) splitFunc, split = bufio.ScanBytes, byte(0) } - for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) { + for m := range c.processStdStream(ctx, stdout, stderr, splitFunc, split) { runMsgCh <- &types.AttachWorkloadMessage{ WorkloadID: message.WorkloadID, Data: m.Data, diff --git a/cluster/calcium/log.go b/cluster/calcium/log.go index 032f3175a..7c20511d9 100644 --- a/cluster/calcium/log.go +++ b/cluster/calcium/log.go @@ -35,7 +35,7 @@ func (c *Calcium) LogStream(ctx context.Context, opts *types.LogStreamOptions) ( return } - for m := range processStdStream(ctx, stdout, stderr, bufio.ScanLines, byte('\n')) { + for m := range c.processStdStream(ctx, stdout, stderr, bufio.ScanLines, byte('\n')) { ch <- &types.LogStreamMessage{ID: opts.ID, Data: m.Data, StdStreamType: m.StdStreamType} } }) diff --git a/cluster/calcium/stream.go b/cluster/calcium/stream.go new file mode 100644 index 000000000..fed151541 --- /dev/null +++ b/cluster/calcium/stream.go @@ -0,0 +1,194 @@ +package calcium + +import ( + "bufio" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "sync" + + "github.com/pkg/errors" + "github.com/projecteru2/core/engine" + enginetypes "github.com/projecteru2/core/engine/types" + "github.com/projecteru2/core/log" + "github.com/projecteru2/core/types" + "github.com/projecteru2/core/utils" +) + +var winchCommand = []byte{0x80} // 128, non-ASCII +var escapeCommand = []byte{0x1d} // 29, ^] + +type window struct { + Height uint `json:"Row"` + Width uint `json:"Col"` +} + +func (c *Calcium) execuateInside(ctx context.Context, client engine.API, ID, cmd, user string, env []string, privileged bool) ([]byte, error) { + cmds := utils.MakeCommandLineArgs(cmd) + execConfig := &enginetypes.ExecConfig{ + User: user, + Cmd: cmds, + Privileged: privileged, + Env: env, + AttachStderr: true, + AttachStdout: true, + } + b := []byte{} + execID, stdout, stderr, _, err := client.Execute(ctx, ID, execConfig) + if err != nil { + return nil, errors.WithStack(err) + } + + for m := range c.processStdStream(ctx, stdout, stderr, bufio.ScanLines, byte('\n')) { + b = append(b, m.Data...) + } + + exitCode, err := client.ExecExitCode(ctx, ID, execID) + if err != nil { + return b, errors.WithStack(err) + } + if exitCode != 0 { + return b, errors.WithStack(fmt.Errorf("%s", b)) + } + return b, nil +} + +func (c *Calcium) processVirtualizationInStream( + ctx context.Context, + inStream io.WriteCloser, + inCh <-chan []byte, + resizeFunc func(height, width uint) error, +) <-chan struct{} { // nolint + specialPrefixCallback := map[string]func([]byte){ + string(winchCommand): func(body []byte) { + w := &window{} + if err := json.Unmarshal(body, w); err != nil { + log.Errorf(ctx, "[processVirtualizationInStream] invalid winch command: %q", body) + return + } + if err := resizeFunc(w.Height, w.Width); err != nil { + log.Errorf(ctx, "[processVirtualizationInStream] resize window error: %v", err) + return + } + }, + + string(escapeCommand): func(_ []byte) { + inStream.Close() + }, + } + return c.rawProcessVirtualizationInStream(ctx, inStream, inCh, specialPrefixCallback) +} + +func (c *Calcium) rawProcessVirtualizationInStream( + ctx context.Context, + inStream io.WriteCloser, + inCh <-chan []byte, + specialPrefixCallback map[string]func([]byte), +) <-chan struct{} { + done := make(chan struct{}) + _ = c.pool.Invoke(func() { + defer close(done) + defer inStream.Close() + + for cmd := range inCh { + if len(cmd) == 0 { + continue + } + if f, ok := specialPrefixCallback[string(cmd[:1])]; ok { + f(cmd[1:]) + continue + } + if _, err := inStream.Write(cmd); err != nil { + log.Errorf(ctx, "[rawProcessVirtualizationInStream] failed to write virtual input stream: %v", err) + continue + } + } + }) + + return done +} + +func (c *Calcium) processVirtualizationOutStream( + ctx context.Context, + outStream io.ReadCloser, + splitFunc bufio.SplitFunc, + split byte, + +) <-chan []byte { + outCh := make(chan []byte) + _ = c.pool.Invoke(func() { + defer close(outCh) + if outStream == nil { + return + } + defer outStream.Close() + scanner := bufio.NewScanner(outStream) + scanner.Split(splitFunc) + for scanner.Scan() { + bs := scanner.Bytes() + if split != 0 { + bs = append(bs, split) + } + outCh <- bs + } + if err := scanner.Err(); err != nil { + log.Warnf(ctx, "[processVirtualizationOutStream] failed to read output from output stream: %v", err) + } + }) + return outCh +} + +func (c *Calcium) processBuildImageStream(ctx context.Context, reader io.ReadCloser) chan *types.BuildImageMessage { + ch := make(chan *types.BuildImageMessage) + _ = c.pool.Invoke(func() { + defer close(ch) + defer utils.EnsureReaderClosed(ctx, reader) + decoder := json.NewDecoder(reader) + for { + message := &types.BuildImageMessage{} + err := decoder.Decode(message) + if err != nil { + if err != io.EOF { + malformed, _ := ioutil.ReadAll(decoder.Buffered()) // TODO err check + log.Errorf(ctx, "[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed)) + message.Error = err.Error() + ch <- message + } + break + } + ch <- message + } + }) + return ch +} + +func (c *Calcium) processStdStream(ctx context.Context, stdout, stderr io.ReadCloser, splitFunc bufio.SplitFunc, split byte) chan types.StdStreamMessage { + ch := make(chan types.StdStreamMessage) + + wg := sync.WaitGroup{} + + wg.Add(1) + _ = c.pool.Invoke(func() { + defer wg.Done() + for data := range c.processVirtualizationOutStream(ctx, stdout, splitFunc, split) { + ch <- types.StdStreamMessage{Data: data, StdStreamType: types.Stdout} + } + }) + + wg.Add(1) + _ = c.pool.Invoke(func() { + defer wg.Done() + for data := range c.processVirtualizationOutStream(ctx, stderr, splitFunc, split) { + ch <- types.StdStreamMessage{Data: data, StdStreamType: types.Stderr} + } + }) + + _ = c.pool.Invoke(func() { + defer close(ch) + wg.Wait() + }) + + return ch +}