Skip to content

Commit

Permalink
[#109]: fix: return stop handler
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian committed Jan 28, 2024
2 parents a80d9de + e8f3ade commit fc71c6d
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 7 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ module github.com/roadrunner-server/sdk/v4

go 1.21

toolchain go1.22rc1
toolchain go1.21.6

require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.5.0
github.com/google/uuid v1.6.0
github.com/prometheus/client_golang v1.18.0
github.com/roadrunner-server/errors v1.3.0
github.com/roadrunner-server/goridge/v3 v3.8.1
Expand All @@ -28,7 +28,7 @@ require (
github.com/prometheus/procfs v0.12.0 // indirect
github.com/tklauser/go-sysconf v0.3.13 // indirect
github.com/tklauser/numcpus v0.7.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/sys v0.16.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -43,8 +43,8 @@ github.com/tklauser/go-sysconf v0.3.13 h1:GBUpcahXSpR2xN01jhkNAbTLRk2Yzgggk8IM08
github.com/tklauser/go-sysconf v0.3.13/go.mod h1:zwleP4Q4OehZHGn4CYZDipCgg9usW5IJePewFCGVEa0=
github.com/tklauser/numcpus v0.7.0 h1:yjuerZP127QG9m5Zh/mSO4wqurYil27tHrqwRoRjpr4=
github.com/tklauser/numcpus v0.7.0/go.mod h1:bb6dMVcj8A42tSE7i32fsIUCbQNllK5iDguyOZRUzAY=
github.com/yusufpapurcu/wmi v1.2.3 h1:E1ctvB7uKFMOJw3fdOW32DwGE9I7t++CRUEMKvFoFiw=
github.com/yusufpapurcu/wmi v1.2.3/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0=
github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down
14 changes: 14 additions & 0 deletions pool/static_pool/workers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"runtime"
"sync"
"sync/atomic"
"unsafe"

"github.com/roadrunner-server/errors"
"github.com/roadrunner-server/goridge/v3/pkg/frame"
Expand All @@ -17,6 +18,11 @@ import (
"go.uber.org/zap"
)

const (
// StopRequest can be sent by worker to indicate that restart is required.
StopRequest = `{"stop":true}`
)

// Pool controls worker creation, destruction and task routing. Pool uses fixed amount of stack.
type Pool struct {
// pool configuration
Expand Down Expand Up @@ -235,6 +241,14 @@ begin:
}
}

// worker want's to be terminated
// unsafe is used to quickly transform []byte to string
if len(rsp.Body) == 0 && unsafe.String(unsafe.SliceData(rsp.Context), len(rsp.Context)) == StopRequest {
w.State().Transition(fsm.StateInvalid)
sp.ww.Release(w)
goto begin
}

switch {
case rsp.Flags&frame.STREAM != 0:
sp.log.Debug("stream mode", zap.Int64("pid", w.Pid()))
Expand Down
45 changes: 45 additions & 0 deletions pool/static_pool/workers_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,51 @@ func Test_StaticPool_NoFreeWorkers(t *testing.T) {
p.Destroy(ctx)
}

// identical to replace but controlled on worker side
func Test_StaticPool_Stop_Worker(t *testing.T) {
ctx := context.Background()
p, err := NewPool(
ctx,
func(cmd []string) *exec.Cmd { return exec.Command("php", "../../tests/client.php", "stop", "pipes") },
pipe.NewPipeFactory(log()),
&pool.Config{
NumWorkers: 1,
AllocateTimeout: time.Second,
DestroyTimeout: time.Second,
},
log(),
)
assert.NoError(t, err)
assert.NotNil(t, p)

defer p.Destroy(ctx)
time.Sleep(time.Second)

var lastPID string
lastPID = strconv.Itoa(int(p.Workers()[0].Pid()))

re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{}))
assert.NoError(t, err)

res := <-re

assert.Equal(t, lastPID, string(res.Body()))

for i := 0; i < 10; i++ {
re, err := p.Exec(ctx, &payload.Payload{Body: []byte("hello")}, make(chan struct{}))

res := <-re

assert.NoError(t, err)
assert.NotNil(t, res)
assert.NotNil(t, res.Body())
assert.Empty(t, res.Context())

assert.NotEqual(t, lastPID, string(res.Body()))
lastPID = string(res.Body())
}
}

func Test_StaticPool_QueueSize(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit fc71c6d

Please sign in to comment.