Skip to content

Commit

Permalink
use pools to replace SentryGo
Browse files Browse the repository at this point in the history
  • Loading branch information
CMGS committed Jul 19, 2022
1 parent 8245c6b commit ee1e7b5
Show file tree
Hide file tree
Showing 7 changed files with 201 additions and 192 deletions.
2 changes: 1 addition & 1 deletion cluster/calcium/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (c *Calcium) pushImageAndClean(ctx context.Context, resp io.ReadCloser, nod
continue
}

for message := range processBuildImageStream(ctx, rc) {
for message := range c.processBuildImageStream(ctx, rc) {
ch <- message
}

Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ func (c *Calcium) ExecuteWorkload(ctx context.Context, opts *types.ExecuteWorklo

splitFunc, split := bufio.ScanLines, byte('\n')
if opts.OpenStdin {
processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
c.processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
return workload.Engine.ExecResize(ctx, execID, height, width)
})
splitFunc, split = bufio.ScanBytes, byte(0)
}

for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) {
for m := range c.processStdStream(ctx, stdout, stderr, splitFunc, split) {
ch <- &types.AttachWorkloadMessage{WorkloadID: opts.WorkloadID, Data: m.Data, StdStreamType: m.StdStreamType}
}

Expand Down
185 changes: 0 additions & 185 deletions cluster/calcium/helper.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,6 @@
package calcium

import (
"bufio"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"sync"

"github.com/projecteru2/core/engine"
enginetypes "github.com/projecteru2/core/engine/types"
"github.com/projecteru2/core/log"
"github.com/projecteru2/core/types"
"github.com/projecteru2/core/utils"
Expand All @@ -18,44 +9,6 @@ import (
"golang.org/x/net/context"
)

var winchCommand = []byte{0x80} // 128, non-ASCII
var escapeCommand = []byte{0x1d} // 29, ^]

type window struct {
Height uint `json:"Row"`
Width uint `json:"Col"`
}

func execuateInside(ctx context.Context, client engine.API, ID, cmd, user string, env []string, privileged bool) ([]byte, error) {
cmds := utils.MakeCommandLineArgs(cmd)
execConfig := &enginetypes.ExecConfig{
User: user,
Cmd: cmds,
Privileged: privileged,
Env: env,
AttachStderr: true,
AttachStdout: true,
}
b := []byte{}
execID, stdout, stderr, _, err := client.Execute(ctx, ID, execConfig)
if err != nil {
return nil, errors.WithStack(err)
}

for m := range processStdStream(ctx, stdout, stderr, bufio.ScanLines, byte('\n')) {
b = append(b, m.Data...)
}

exitCode, err := client.ExecExitCode(ctx, ID, execID)
if err != nil {
return b, errors.WithStack(err)
}
if exitCode != 0 {
return b, errors.WithStack(fmt.Errorf("%s", b))
}
return b, nil
}

func distributionInspect(ctx context.Context, node *types.Node, image string, digests []string) bool {
remoteDigest, err := node.Engine.ImageRemoteDigest(ctx, image)
if err != nil {
Expand Down Expand Up @@ -105,141 +58,3 @@ func pullImage(ctx context.Context, node *types.Node, image string) error {
log.Infof(ctx, "[pullImage] Done pulling image %s", image)
return nil
}

func processVirtualizationInStream(
ctx context.Context,
inStream io.WriteCloser,
inCh <-chan []byte,
resizeFunc func(height, width uint) error,
) <-chan struct{} { // nolint
specialPrefixCallback := map[string]func([]byte){
string(winchCommand): func(body []byte) {
w := &window{}
if err := json.Unmarshal(body, w); err != nil {
log.Errorf(ctx, "[processVirtualizationInStream] invalid winch command: %q", body)
return
}
if err := resizeFunc(w.Height, w.Width); err != nil {
log.Errorf(ctx, "[processVirtualizationInStream] resize window error: %v", err)
return
}
},

string(escapeCommand): func(_ []byte) {
inStream.Close()
},
}
return rawProcessVirtualizationInStream(ctx, inStream, inCh, specialPrefixCallback)
}

func rawProcessVirtualizationInStream(
ctx context.Context,
inStream io.WriteCloser,
inCh <-chan []byte,
specialPrefixCallback map[string]func([]byte),
) <-chan struct{} {
done := make(chan struct{})
utils.SentryGo(func() {
defer close(done)
defer inStream.Close()

for cmd := range inCh {
if len(cmd) == 0 {
continue
}
if f, ok := specialPrefixCallback[string(cmd[:1])]; ok {
f(cmd[1:])
continue
}
if _, err := inStream.Write(cmd); err != nil {
log.Errorf(ctx, "[rawProcessVirtualizationInStream] failed to write virtual input stream: %v", err)
continue
}
}
})

return done
}

func processVirtualizationOutStream(
ctx context.Context,
outStream io.ReadCloser,
splitFunc bufio.SplitFunc,
split byte,

) <-chan []byte {
outCh := make(chan []byte)
utils.SentryGo(func() {
defer close(outCh)
if outStream == nil {
return
}
defer outStream.Close()
scanner := bufio.NewScanner(outStream)
scanner.Split(splitFunc)
for scanner.Scan() {
bs := scanner.Bytes()
if split != 0 {
bs = append(bs, split)
}
outCh <- bs
}
if err := scanner.Err(); err != nil {
log.Warnf(ctx, "[processVirtualizationOutStream] failed to read output from output stream: %v", err)
}
})
return outCh
}

func processBuildImageStream(ctx context.Context, reader io.ReadCloser) chan *types.BuildImageMessage {
ch := make(chan *types.BuildImageMessage)
utils.SentryGo(func() {
defer close(ch)
defer utils.EnsureReaderClosed(ctx, reader)
decoder := json.NewDecoder(reader)
for {
message := &types.BuildImageMessage{}
err := decoder.Decode(message)
if err != nil {
if err != io.EOF {
malformed, _ := ioutil.ReadAll(decoder.Buffered()) // TODO err check
log.Errorf(ctx, "[processBuildImageStream] Decode image message failed %v, buffered: %s", err, string(malformed))
message.Error = err.Error()
ch <- message
}
break
}
ch <- message
}
})
return ch
}

func processStdStream(ctx context.Context, stdout, stderr io.ReadCloser, splitFunc bufio.SplitFunc, split byte) chan types.StdStreamMessage {
ch := make(chan types.StdStreamMessage)

wg := sync.WaitGroup{}

wg.Add(1)
utils.SentryGo(func() {
defer wg.Done()
for data := range processVirtualizationOutStream(ctx, stdout, splitFunc, split) {
ch <- types.StdStreamMessage{Data: data, StdStreamType: types.Stdout}
}
})

wg.Add(1)
utils.SentryGo(func() {
defer wg.Done()
for data := range processVirtualizationOutStream(ctx, stderr, splitFunc, split) {
ch <- types.StdStreamMessage{Data: data, StdStreamType: types.Stderr}
}
})

utils.SentryGo(func() {
defer close(ch)
wg.Wait()
})

return ch
}
2 changes: 1 addition & 1 deletion cluster/calcium/hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (c *Calcium) doHook(
) ([]*bytes.Buffer, error) {
outputs := []*bytes.Buffer{}
for _, cmd := range cmds {
output, err := execuateInside(ctx, engine, ID, cmd, user, env, privileged)
output, err := c.execuateInside(ctx, engine, ID, cmd, user, env, privileged)
if err != nil {
// 执行 hook 的过程中,如果 cmdForce 为真并且不忽略 hook 就输出错误
outputs = append(outputs, bytes.NewBufferString(err.Error()))
Expand Down
4 changes: 2 additions & 2 deletions cluster/calcium/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ func (c *Calcium) RunAndWait(ctx context.Context, opts *types.DeployOptions, inC
}
}

processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
c.processVirtualizationInStream(ctx, inStream, inCh, func(height, width uint) error {
return workload.Engine.VirtualizationResize(ctx, message.WorkloadID, height, width)
})

splitFunc, split = bufio.ScanBytes, byte(0)
}

for m := range processStdStream(ctx, stdout, stderr, splitFunc, split) {
for m := range c.processStdStream(ctx, stdout, stderr, splitFunc, split) {
runMsgCh <- &types.AttachWorkloadMessage{
WorkloadID: message.WorkloadID,
Data: m.Data,
Expand Down
2 changes: 1 addition & 1 deletion cluster/calcium/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (c *Calcium) LogStream(ctx context.Context, opts *types.LogStreamOptions) (
return
}

for m := range processStdStream(ctx, stdout, stderr, bufio.ScanLines, byte('\n')) {
for m := range c.processStdStream(ctx, stdout, stderr, bufio.ScanLines, byte('\n')) {
ch <- &types.LogStreamMessage{ID: opts.ID, Data: m.Data, StdStreamType: m.StdStreamType}
}
})
Expand Down

0 comments on commit ee1e7b5

Please sign in to comment.