Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions app.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (app *appImpl) init() error {
Log().Debug("hook PersistentPreRunPlugin", "plugins", plugins)
for _, p := range plugins {
if err := p.V.PersistentPreRun(cmd, args); err != nil {
Log().Debug("error on PersistentPreRunPlugin", "plugin", p.K.String())
Log().Error("error on PersistentPreRunPlugin", "plugin", p.K.String(), "err", err)
return err
}
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (app *appImpl) init() error {
Log().Debug("hook OnAppInitPlugin", "plugins", plugins)
for _, p := range plugins {
if err = p.V.OnAppInit(app); err != nil {
Log().Debug("error on OnAppInit", "plugin", p.K.String())
Log().Error("error on OnAppInit", "plugin", p.K.String(), "err", err)
return err
}
}
Expand All @@ -165,7 +165,7 @@ func (app *appImpl) exec() error {
Log().Debug("hook CobraPlugin", "plugins", plugins)
for _, p := range plugins {
if err := p.V.CobraAddCommands(app.cmd); err != nil {
Log().Debug("error on CobraAddCommands", "plugin", p.K.String())
Log().Error("error on CobraAddCommands", "plugin", p.K.String(), "err", err)
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (app *appImpl) gen() error {
}
err = p.V.Generate(config)
if err != nil {
Log().Debug("error on Generate", "plugin", p.K.String())
Log().Error("error on Generate", "plugin", p.K.String(), "err", err)
return err
}
}
Expand Down
34 changes: 34 additions & 0 deletions internal/launchr/iter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package launchr

import "iter"

// SliceSeqStateful allows for resuming iteration after slice grows.
type SliceSeqStateful[V any] struct {
slice *[]V
index int // index keeps track of where we are in slice.
}

// NewSliceSeqStateful creates a new [SliceSeqStateful].
func NewSliceSeqStateful[V any](slice *[]V) *SliceSeqStateful[V] {
return &SliceSeqStateful[V]{slice: slice}
}

// Seq returns a slice iterator.
func (seq *SliceSeqStateful[V]) Seq() iter.Seq[V] {
return func(yield func(V) bool) {
for seq.index < len(*seq.slice) {
if !yield((*seq.slice)[seq.index]) {
// Early return by user break.
seq.index++
return
}
seq.index++
}
// If iteration exhausts naturally, it stays exhausted until slice grows again
}
}

// Reset resets current index to start.
func (seq *SliceSeqStateful[V]) Reset() {
seq.index = 0
}
6 changes: 0 additions & 6 deletions pkg/action/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,6 @@ type DiscoveryPlugin interface {
DiscoverActions(ctx context.Context) ([]*Action, error)
}

// AlterActionsPlugin is a launchr plugin to alter registered actions.
type AlterActionsPlugin interface {
launchr.Plugin
AlterActions() error
}

// DiscoveryFS is a file system to discover actions.
type DiscoveryFS struct {
// fs is a filesystem where to discover actions.
Expand Down
135 changes: 117 additions & 18 deletions pkg/action/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package action

import (
"context"
"errors"
"fmt"
"maps"
"strconv"
Expand All @@ -11,6 +12,9 @@ import (
"github.com/launchrctl/launchr/internal/launchr"
)

// DiscoverActionsFn defines a function to discover actions.
type DiscoverActionsFn func(ctx context.Context) ([]*Action, error)

// Manager handles actions and its execution.
type Manager interface {
launchr.Service
Expand Down Expand Up @@ -39,6 +43,16 @@ type Manager interface {
// GetValueProcessors returns list of available processors
GetValueProcessors() map[string]ValueProcessor

// AddDiscovery registers a discovery callback to find actions.
AddDiscovery(DiscoverActionsFn)
// SetDiscoveryTimeout sets discovery timeout to stop on long-running callbacks.
SetDiscoveryTimeout(timeout time.Duration)

RunManager
}

// RunManager runs actions and stores runtime information about them.
type RunManager interface {
// Run executes an action in foreground.
Run(ctx context.Context, a *Action) (RunInfo, error)
// RunBackground executes an action in background.
Expand Down Expand Up @@ -70,22 +84,32 @@ type DecorateWithFn = func(m Manager, a *Action)
type actionManagerMap struct {
actionStore map[string]*Action
actionAliases map[string]string
runStore map[string]RunInfo // @todo consider persistent storage
mx sync.Mutex
mxRun sync.Mutex
dwFns []DecorateWithFn
processors map[string]ValueProcessor
idProvider IDProvider

// Actions discovery.
discoveryFns []DiscoverActionsFn
discoverySeq *launchr.SliceSeqStateful[DiscoverActionsFn]
discTimeout time.Duration

runManagerMap
}

// NewManager constructs a new action manager.
func NewManager(withFns ...DecorateWithFn) Manager {
return &actionManagerMap{
actionStore: make(map[string]*Action),
actionAliases: make(map[string]string),
runStore: make(map[string]RunInfo),
dwFns: withFns,
processors: make(map[string]ValueProcessor),

discTimeout: 10 * time.Second,

runManagerMap: runManagerMap{
runStore: make(map[string]RunInfo),
},
}
}

Expand All @@ -96,7 +120,10 @@ func (m *actionManagerMap) ServiceInfo() launchr.ServiceInfo {
func (m *actionManagerMap) Add(a *Action) error {
m.mx.Lock()
defer m.mx.Unlock()
return m.add(a)
}

func (m *actionManagerMap) add(a *Action) error {
// Check action loads properly.
def, err := a.Raw()
if err != nil {
Expand Down Expand Up @@ -130,6 +157,9 @@ func (m *actionManagerMap) Add(a *Action) error {
func (m *actionManagerMap) AllUnsafe() map[string]*Action {
m.mx.Lock()
defer m.mx.Unlock()
ctx, cancel := context.WithTimeout(context.Background(), m.discTimeout)
defer cancel()
_ = m.finalizeDiscovery(ctx)
return maps.Clone(m.actionStore)
}

Expand Down Expand Up @@ -169,13 +199,77 @@ func (m *actionManagerMap) Get(id string) (*Action, bool) {
return m.Decorate(a, m.dwFns...), ok
}

func (m *actionManagerMap) GetUnsafe(id string) (*Action, bool) {
func (m *actionManagerMap) GetUnsafe(id string) (a *Action, ok bool) {
m.mx.Lock()
defer m.mx.Unlock()

a, ok = m.get(id)
if ok {
return a, ok
}

ctx, cancel := context.WithTimeout(context.Background(), m.discTimeout)
defer cancel()
for fn := range m.discoverySeq.Seq() {
if err := m.callDiscoveryFn(ctx, fn); err != nil {
continue
}

a, ok = m.get(id)
if ok {
return a, ok
}
}

return a, ok
}

func (m *actionManagerMap) get(id string) (*Action, bool) {
id = m.GetIDFromAlias(id)
a, ok := m.actionStore[id]
return a, ok
}

func (m *actionManagerMap) SetDiscoveryTimeout(timeout time.Duration) {
m.discTimeout = timeout
}

func (m *actionManagerMap) AddDiscovery(fn DiscoverActionsFn) {
if m.discoveryFns == nil {
m.discoveryFns = make([]DiscoverActionsFn, 0, 1)
m.discoverySeq = launchr.NewSliceSeqStateful(&m.discoveryFns)
}
m.discoveryFns = append(m.discoveryFns, fn)
}

func (m *actionManagerMap) finalizeDiscovery(ctx context.Context) error {
errs := make([]error, 0)
for fn := range m.discoverySeq.Seq() {
err := m.callDiscoveryFn(ctx, fn)
if err != nil {
errs = append(errs, err)
}
}
return errors.Join(errs...)
}

func (m *actionManagerMap) callDiscoveryFn(ctx context.Context, fn DiscoverActionsFn) error {
actions, err := fn(ctx)
if err != nil {
return err
}
// Add discovered actions.
for _, a := range actions {
err = m.add(a)
if err != nil {
launchr.Log().Warn("action was skipped due to error", "action_id", a.ID, "error", err)
launchr.Term().Warning().Printfln("Action %q was skipped:\n%v", a.ID, err)
continue
}
}
return nil
}

func (m *actionManagerMap) AddValueProcessor(name string, vp ValueProcessor) {
if _, ok := m.processors[name]; ok {
panic(fmt.Sprintf("processor `%q` with the same name already exists", name))
Expand Down Expand Up @@ -223,10 +317,15 @@ type RunInfo struct {
// @todo add more info for status like error message or exit code. Or have it in output.
}

func (m *actionManagerMap) registerRun(a *Action, id string) RunInfo {
type runManagerMap struct {
runStore map[string]RunInfo // @todo consider persistent storage
mx sync.Mutex
}

func (m *runManagerMap) registerRun(a *Action, id string) RunInfo {
// @todo rethink the implementation
m.mxRun.Lock()
defer m.mxRun.Unlock()
m.mx.Lock()
defer m.mx.Unlock()
if id == "" {
id = strconv.FormatInt(time.Now().Unix(), 10) + "-" + a.ID
}
Expand All @@ -240,21 +339,21 @@ func (m *actionManagerMap) registerRun(a *Action, id string) RunInfo {
return ri
}

func (m *actionManagerMap) updateRunStatus(id string, st string) {
m.mxRun.Lock()
defer m.mxRun.Unlock()
func (m *runManagerMap) updateRunStatus(id string, st string) {
m.mx.Lock()
defer m.mx.Unlock()
if ri, ok := m.runStore[id]; ok {
ri.Status = st
m.runStore[id] = ri
}
}

func (m *actionManagerMap) Run(ctx context.Context, a *Action) (RunInfo, error) {
func (m *runManagerMap) Run(ctx context.Context, a *Action) (RunInfo, error) {
// @todo add the same status change info
return m.registerRun(a, ""), a.Execute(ctx)
}

func (m *actionManagerMap) RunBackground(ctx context.Context, a *Action, runID string) (RunInfo, chan error) {
func (m *runManagerMap) RunBackground(ctx context.Context, a *Action, runID string) (RunInfo, chan error) {
// @todo change runID to runOptions with possibility to create filestream names in webUI.
ri := m.registerRun(a, runID)
chErr := make(chan error)
Expand All @@ -273,9 +372,9 @@ func (m *actionManagerMap) RunBackground(ctx context.Context, a *Action, runID s
return ri, chErr
}

func (m *actionManagerMap) RunInfoByAction(aid string) []RunInfo {
m.mxRun.Lock()
defer m.mxRun.Unlock()
func (m *runManagerMap) RunInfoByAction(aid string) []RunInfo {
m.mx.Lock()
defer m.mx.Unlock()
run := make([]RunInfo, 0, len(m.runStore)/2)
for _, v := range m.runStore {
if v.Action.ID == aid {
Expand All @@ -285,9 +384,9 @@ func (m *actionManagerMap) RunInfoByAction(aid string) []RunInfo {
return run
}

func (m *actionManagerMap) RunInfoByID(id string) (RunInfo, bool) {
m.mxRun.Lock()
defer m.mxRun.Unlock()
func (m *runManagerMap) RunInfoByID(id string) (RunInfo, bool) {
m.mx.Lock()
defer m.mx.Unlock()
ri, ok := m.runStore[id]
return ri, ok
}
Expand Down
Loading