Skip to content

Commit

Permalink
Merge pull request #195 from safing/feature/modules-microtask-improve…
Browse files Browse the repository at this point in the history
…ments

Microtasks Improvements and Module Status Export
  • Loading branch information
dhaavi committed Nov 10, 2022
2 parents 985a174 + f6f644f commit 72288a4
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 8 deletions.
6 changes: 3 additions & 3 deletions api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,18 +388,18 @@ func (e *Endpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if eMethod != e.ReadMethod {
log.Tracer(r.Context()).Warningf(
"api: method %q does not match required read method %q%s",
" - this will be an error and abort the request in the future",
r.Method,
e.ReadMethod,
" - this will be an error and abort the request in the future",
)
}
} else {
if eMethod != e.WriteMethod {
log.Tracer(r.Context()).Warningf(
"api: method %q does not match required write method %q%s",
" - this will be an error and abort the request in the future",
r.Method,
e.ReadMethod,
e.WriteMethod,
" - this will be an error and abort the request in the future",
)
}
}
Expand Down
20 changes: 20 additions & 0 deletions api/endpoints_modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@ package api
import (
"errors"
"fmt"

"github.com/safing/portbase/modules"
)

func registerModulesEndpoints() error {
if err := RegisterEndpoint(Endpoint{
Path: "modules/status",
Read: PermitUser,
StructFunc: getStatusfunc,
Name: "Get Module Status",
Description: "Returns status information of all modules.",
}); err != nil {
return err
}

if err := RegisterEndpoint(Endpoint{
Path: "modules/{moduleName:.+}/trigger/{eventName:.+}",
Write: PermitSelf,
Expand All @@ -19,6 +31,14 @@ func registerModulesEndpoints() error {
return nil
}

func getStatusfunc(ar *Request) (i interface{}, err error) {
status := modules.GetStatus()
if status == nil {
return nil, errors.New("modules not yet initialized")
}
return status, nil
}

func triggerEvent(ar *Request) (msg string, err error) {
// Get parameters.
moduleName := ar.URLVars["moduleName"]
Expand Down
5 changes: 4 additions & 1 deletion log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ func log(level Severity, msg string, tracer *ContextTracer) {

// wake up writer if necessary
if logsWaitingFlag.SetToIf(false, true) {
logsWaiting <- struct{}{}
select {
case logsWaiting <- struct{}{}:
default:
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion log/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ var (
pkgLevels = make(map[string]Severity)
pkgLevelsLock sync.Mutex

logsWaiting = make(chan struct{}, 4)
logsWaiting = make(chan struct{}, 1)
logsWaitingFlag = abool.NewBool(false)

shutdownFlag = abool.NewBool(false)
Expand Down
117 changes: 117 additions & 0 deletions modules/export.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package modules

import "sync/atomic"

// Status holds an exported status summary of the modules system.
type Status struct {
Modules map[string]*ModuleStatus
Total struct {
Workers int
Tasks int
MicroTasks int
CtrlFuncRunning int
}
Config struct {
MicroTasksThreshhold int
MediumPriorityDelay string
LowPriorityDelay string
}
}

// ModuleStatus holds an exported status summary of one module.
type ModuleStatus struct { //nolint:maligned
Enabled bool

Status string
FailureType string
FailureID string
FailureMsg string

Workers int
Tasks int
MicroTasks int
CtrlFuncRunning bool
}

// GetStatus exports status data from the module system.
func GetStatus() *Status {
// Check if modules have been initialized.
if modulesLocked.IsNotSet() {
return nil
}

// Create new status.
status := &Status{
Modules: make(map[string]*ModuleStatus, len(modules)),
}

// Add config.
status.Config.MicroTasksThreshhold = int(atomic.LoadInt32(microTasksThreshhold))
status.Config.MediumPriorityDelay = defaultMediumPriorityMaxDelay.String()
status.Config.LowPriorityDelay = defaultLowPriorityMaxDelay.String()

// Gather status data.
for name, module := range modules {
moduleStatus := &ModuleStatus{
Enabled: module.Enabled(),
Status: getStatusName(module.Status()),
Workers: int(atomic.LoadInt32(module.workerCnt)),
Tasks: int(atomic.LoadInt32(module.taskCnt)),
MicroTasks: int(atomic.LoadInt32(module.microTaskCnt)),
CtrlFuncRunning: module.ctrlFuncRunning.IsSet(),
}

// Add failure status.
failureStatus, failureID, failureMsg := module.FailureStatus()
moduleStatus.FailureType = getFailureStatusName(failureStatus)
moduleStatus.FailureID = failureID
moduleStatus.FailureMsg = failureMsg

// Add to total counts.
status.Total.Workers += moduleStatus.Workers
status.Total.Tasks += moduleStatus.Tasks
status.Total.MicroTasks += moduleStatus.MicroTasks
if moduleStatus.CtrlFuncRunning {
status.Total.CtrlFuncRunning++
}

// Add to export.
status.Modules[name] = moduleStatus
}

return status
}

func getStatusName(status uint8) string {
switch status {
case StatusDead:
return "dead"
case StatusPreparing:
return "preparing"
case StatusOffline:
return "offline"
case StatusStopping:
return "stopping"
case StatusStarting:
return "starting"
case StatusOnline:
return "online"
default:
return "unknown"
}
}

func getFailureStatusName(status uint8) string {
switch status {
case FailureNone:
return ""
case FailureHint:
return "hint"
case FailureWarning:
return "warning"
case FailureError:
return "error"
default:
return "unknown"
}
}
3 changes: 2 additions & 1 deletion modules/microtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ const (
func init() {
var microTasksVal int32
microTasks = &microTasksVal
var microTasksThreshholdVal int32

microTasksThreshholdVal := int32(runtime.GOMAXPROCS(0) * 2)
microTasksThreshhold = &microTasksThreshholdVal
}

Expand Down
2 changes: 0 additions & 2 deletions modules/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"os"
"runtime"

"github.com/tevino/abool"

Expand Down Expand Up @@ -36,7 +35,6 @@ func Start() error {
defer mgmtLock.Unlock()

// start microtask scheduler
SetMaxConcurrentMicroTasks(runtime.GOMAXPROCS(0))
go microTaskScheduler()

// inter-link modules
Expand Down

0 comments on commit 72288a4

Please sign in to comment.