Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions pkg/blockcontroller/blockcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"log"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/wavetermdev/waveterm/pkg/filestore"
Expand Down Expand Up @@ -77,6 +78,7 @@ type BlockController struct {
ShellInputCh chan *BlockInputUnion
ShellProcStatus string
ShellProcExitCode int
RunLock *atomic.Bool
}

type BlockControllerRuntimeStatus struct {
Expand Down Expand Up @@ -492,7 +494,9 @@ func (bc *BlockController) DoRunShellCommand(rc *RunShellOpts, blockMeta waveobj
defer func() {
wshutil.DefaultRouter.UnregisterRoute(wshutil.MakeControllerRouteId(bc.BlockId))
bc.UpdateControllerAndSendUpdate(func() bool {
bc.ShellProcStatus = Status_Done
if bc.ShellProcStatus == Status_Running {
bc.ShellProcStatus = Status_Done
}
bc.ShellProcExitCode = exitCode
return true
})
Expand Down Expand Up @@ -568,7 +572,31 @@ func setTermSize(ctx context.Context, blockId string, termSize waveobj.TermSize)
return nil
}

func (bc *BlockController) LockRunLock() bool {
rtn := bc.RunLock.CompareAndSwap(false, true)
if rtn {
log.Printf("block %q run() lock\n", bc.BlockId)
}
return rtn
}

func (bc *BlockController) UnlockRunLock() {
bc.RunLock.Store(false)
log.Printf("block %q run() unlock\n", bc.BlockId)
}

func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any, rtOpts *waveobj.RuntimeOpts, force bool) {
runningShellCommand := false
ok := bc.LockRunLock()
if !ok {
log.Printf("block %q is already executing run()\n", bc.BlockId)
return
}
defer func() {
if !runningShellCommand {
bc.UnlockRunLock()
}
}()
curStatus := bc.GetRuntimeStatus()
controllerName := bdata.Meta.GetString(waveobj.MetaKey_Controller, "")
if controllerName != BlockController_Shell && controllerName != BlockController_Cmd {
Expand Down Expand Up @@ -597,8 +625,10 @@ func (bc *BlockController) run(bdata *waveobj.Block, blockMeta map[string]any, r
return
}
}
runningShellCommand = true
go func() {
defer panichandler.PanicHandler("blockcontroller:run-shell-command")
defer bc.UnlockRunLock()
var termSize waveobj.TermSize
if rtOpts != nil {
termSize = rtOpts.TermSize
Expand Down Expand Up @@ -658,7 +688,7 @@ func CheckConnStatus(blockId string) error {
func (bc *BlockController) StopShellProc(shouldWait bool) {
bc.Lock.Lock()
defer bc.Lock.Unlock()
if bc.ShellProc == nil || bc.ShellProcStatus == Status_Done {
if bc.ShellProc == nil || bc.ShellProcStatus == Status_Done || bc.ShellProcStatus == Status_Init {
return
}
bc.ShellProc.Close()
Expand Down Expand Up @@ -689,6 +719,7 @@ func getOrCreateBlockController(tabId string, blockId string, controllerName str
TabId: tabId,
BlockId: blockId,
ShellProcStatus: Status_Init,
RunLock: &atomic.Bool{},
}
blockControllerMap[blockId] = bc
createdController = true
Expand Down Expand Up @@ -716,11 +747,13 @@ func ResyncController(ctx context.Context, tabId string, blockId string, rtOpts
}
return nil
}
// check if conn is different, if so, stop the current controller
log.Printf("resync controller %s %q (%q) (force %v)\n", blockId, controllerName, connName, force)
// check if conn is different, if so, stop the current controller, and set status back to init
if curBc != nil {
bcStatus := curBc.GetRuntimeStatus()
if bcStatus.ShellProcStatus == Status_Running && bcStatus.ShellProcConnName != connName {
StopBlockController(blockId)
log.Printf("stopping blockcontroller %s due to conn change\n", blockId)
StopBlockControllerAndSetStatus(blockId, Status_Init)
}
}
// now if there is a conn, ensure it is connected
Expand Down Expand Up @@ -754,20 +787,20 @@ func startBlockController(ctx context.Context, tabId string, blockId string, rtO
return fmt.Errorf("unknown controller %q", controllerName)
}
connName := blockData.Meta.GetString(waveobj.MetaKey_Connection, "")
log.Printf("start blockcontroller %s %q (%q)\n", blockId, controllerName, connName)
err = CheckConnStatus(blockId)
if err != nil {
return fmt.Errorf("cannot start shellproc: %w", err)
}
bc := getOrCreateBlockController(tabId, blockId, controllerName)
bcStatus := bc.GetRuntimeStatus()
log.Printf("start blockcontroller %s %q (%q) (curstatus %s) (force %v)\n", blockId, controllerName, connName, bcStatus.ShellProcStatus, force)
if bcStatus.ShellProcStatus == Status_Init || bcStatus.ShellProcStatus == Status_Done {
go bc.run(blockData, blockData.Meta, rtOpts, force)
}
return nil
}

func StopBlockController(blockId string) {
func StopBlockControllerAndSetStatus(blockId string, newStatus string) {
bc := GetBlockController(blockId)
if bc == nil {
return
Expand All @@ -776,13 +809,17 @@ func StopBlockController(blockId string) {
bc.ShellProc.Close()
<-bc.ShellProc.DoneCh
bc.UpdateControllerAndSendUpdate(func() bool {
bc.ShellProcStatus = Status_Done
bc.ShellProcStatus = newStatus
return true
})
}

}

func StopBlockController(blockId string) {
StopBlockControllerAndSetStatus(blockId, Status_Done)
}

func getControllerList() []*BlockController {
globalLock.Lock()
defer globalLock.Unlock()
Expand Down
1 change: 0 additions & 1 deletion pkg/service/clientservice/clientservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ func (cs *ClientService) GetAllConnStatus(ctx context.Context) ([]wshrpc.ConnSta

// moves the window to the front of the windowId stack
func (cs *ClientService) FocusWindow(ctx context.Context, windowId string) error {
log.Printf("FocusWindow %s\n", windowId)
return wcore.FocusWindow(ctx, windowId)
}

Expand Down