Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,17 @@ func initAutoScaling(mainThread *phpMainThread) {
return
}

done := mainThread.done
mstate := mainThread.state

scalingMu.Lock()
scaleChan = make(chan *frankenPHPContext)
maxScaledThreads := mainThread.maxThreads - mainThread.numThreads
autoScaledThreads = make([]*phpThread, 0, maxScaledThreads)
scalingMu.Unlock()

go startUpscalingThreads(maxScaledThreads, scaleChan, mainThread.done)
go startDownScalingThreads(mainThread.done)
go startUpscalingThreads(maxScaledThreads, scaleChan, done, mstate)
go startDownScalingThreads(done)
}

func drainAutoScaling() {
Expand Down Expand Up @@ -81,16 +84,16 @@ func addWorkerThread(worker *worker) (*phpThread, error) {
}

// scaleWorkerThread adds a worker PHP thread automatically
func scaleWorkerThread(worker *worker) {
func scaleWorkerThread(worker *worker, done chan struct{}, mstate *state.ThreadState) {
// probe CPU usage before acquiring the lock (avoids holding lock during 120ms sleep)
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, done) {
return
}

scalingMu.Lock()
defer scalingMu.Unlock()

if !mainThread.state.Is(state.Ready) {
if !mstate.Is(state.Ready) {
return
}

Expand All @@ -111,16 +114,16 @@ func scaleWorkerThread(worker *worker) {
}

// scaleRegularThread adds a regular PHP thread automatically
func scaleRegularThread() {
func scaleRegularThread(done chan struct{}, mstate *state.ThreadState) {
// probe CPU usage before acquiring the lock (avoids holding lock during 120ms sleep)
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) {
if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, done) {
return
}

scalingMu.Lock()
defer scalingMu.Unlock()

if !mainThread.state.Is(state.Ready) {
if !mstate.Is(state.Ready) {
return
}

Expand All @@ -140,7 +143,7 @@ func scaleRegularThread() {
}
}

func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext, done chan struct{}) {
func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext, done chan struct{}, mstate *state.ThreadState) {
for {
scalingMu.Lock()
scaledThreadCount := len(autoScaledThreads)
Expand Down Expand Up @@ -171,7 +174,7 @@ func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext,

// if the request has been stalled long enough, scale
if fc.worker == nil {
scaleRegularThread()
scaleRegularThread(done, mstate)
continue
}

Expand All @@ -184,7 +187,7 @@ func startUpscalingThreads(maxScaledThreads int, scale chan *frankenPHPContext,
continue
}

scaleWorkerThread(fc.worker)
scaleWorkerThread(fc.worker, done, mstate)
case <-done:
return
}
Expand Down
6 changes: 3 additions & 3 deletions scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestScaleARegularThreadUpAndDown(t *testing.T) {
autoScaledThread := phpThreads[1]

// scale up
scaleRegularThread()
scaleRegularThread(mainThread.done, mainThread.state)
assert.Equal(t, state.Ready, autoScaledThread.state.Get())
assert.IsType(t, &regularThread{}, autoScaledThread.handler)

Expand Down Expand Up @@ -48,7 +48,7 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) {
autoScaledThread := phpThreads[2]

// scale up
scaleWorkerThread(workersByPath[workerPath])
scaleWorkerThread(workersByPath[workerPath], mainThread.done, mainThread.state)
assert.Equal(t, state.Ready, autoScaledThread.state.Get())

// on down-scale, the thread will be marked as inactive
Expand All @@ -69,7 +69,7 @@ func TestMaxIdleTimePreventsEarlyDeactivation(t *testing.T) {
autoScaledThread := phpThreads[1]

// scale up
scaleRegularThread()
scaleRegularThread(mainThread.done, mainThread.state)
assert.Equal(t, state.Ready, autoScaledThread.state.Get())

// set wait time to 30 minutes (less than 1 hour max idle time)
Expand Down
Loading