diff --git a/app.go b/app.go index a61b9f7..f4b3936 100644 --- a/app.go +++ b/app.go @@ -97,7 +97,7 @@ func (app *appImpl) init() error { Log().Debug("hook PersistentPreRunPlugin", "plugins", plugins) for _, p := range plugins { if err := p.V.PersistentPreRun(cmd, args); err != nil { - Log().Debug("error on PersistentPreRunPlugin", "plugin", p.K.String()) + Log().Error("error on PersistentPreRunPlugin", "plugin", p.K.String(), "err", err) return err } } @@ -144,7 +144,7 @@ func (app *appImpl) init() error { Log().Debug("hook OnAppInitPlugin", "plugins", plugins) for _, p := range plugins { if err = p.V.OnAppInit(app); err != nil { - Log().Debug("error on OnAppInit", "plugin", p.K.String()) + Log().Error("error on OnAppInit", "plugin", p.K.String(), "err", err) return err } } @@ -165,7 +165,7 @@ func (app *appImpl) exec() error { Log().Debug("hook CobraPlugin", "plugins", plugins) for _, p := range plugins { if err := p.V.CobraAddCommands(app.cmd); err != nil { - Log().Debug("error on CobraAddCommands", "plugin", p.K.String()) + Log().Error("error on CobraAddCommands", "plugin", p.K.String(), "err", err) return err } } diff --git a/gen.go b/gen.go index 100e798..ddc8103 100644 --- a/gen.go +++ b/gen.go @@ -37,7 +37,7 @@ func (app *appImpl) gen() error { } err = p.V.Generate(config) if err != nil { - Log().Debug("error on Generate", "plugin", p.K.String()) + Log().Error("error on Generate", "plugin", p.K.String(), "err", err) return err } } diff --git a/internal/launchr/iter.go b/internal/launchr/iter.go new file mode 100644 index 0000000..9802d50 --- /dev/null +++ b/internal/launchr/iter.go @@ -0,0 +1,34 @@ +package launchr + +import "iter" + +// SliceSeqStateful allows for resuming iteration after slice grows. +type SliceSeqStateful[V any] struct { + slice *[]V + index int // index keeps track of where we are in slice. +} + +// NewSliceSeqStateful creates a new [SliceSeqStateful]. +func NewSliceSeqStateful[V any](slice *[]V) *SliceSeqStateful[V] { + return &SliceSeqStateful[V]{slice: slice} +} + +// Seq returns a slice iterator. +func (seq *SliceSeqStateful[V]) Seq() iter.Seq[V] { + return func(yield func(V) bool) { + for seq.index < len(*seq.slice) { + if !yield((*seq.slice)[seq.index]) { + // Early return by user break. + seq.index++ + return + } + seq.index++ + } + // If iteration exhausts naturally, it stays exhausted until slice grows again + } +} + +// Reset resets current index to start. +func (seq *SliceSeqStateful[V]) Reset() { + seq.index = 0 +} diff --git a/pkg/action/discover.go b/pkg/action/discover.go index 1fad2db..445aebb 100644 --- a/pkg/action/discover.go +++ b/pkg/action/discover.go @@ -24,12 +24,6 @@ type DiscoveryPlugin interface { DiscoverActions(ctx context.Context) ([]*Action, error) } -// AlterActionsPlugin is a launchr plugin to alter registered actions. -type AlterActionsPlugin interface { - launchr.Plugin - AlterActions() error -} - // DiscoveryFS is a file system to discover actions. type DiscoveryFS struct { // fs is a filesystem where to discover actions. diff --git a/pkg/action/manager.go b/pkg/action/manager.go index ac45479..b714238 100644 --- a/pkg/action/manager.go +++ b/pkg/action/manager.go @@ -2,6 +2,7 @@ package action import ( "context" + "errors" "fmt" "maps" "strconv" @@ -11,6 +12,9 @@ import ( "github.com/launchrctl/launchr/internal/launchr" ) +// DiscoverActionsFn defines a function to discover actions. +type DiscoverActionsFn func(ctx context.Context) ([]*Action, error) + // Manager handles actions and its execution. type Manager interface { launchr.Service @@ -39,6 +43,16 @@ type Manager interface { // GetValueProcessors returns list of available processors GetValueProcessors() map[string]ValueProcessor + // AddDiscovery registers a discovery callback to find actions. + AddDiscovery(DiscoverActionsFn) + // SetDiscoveryTimeout sets discovery timeout to stop on long-running callbacks. + SetDiscoveryTimeout(timeout time.Duration) + + RunManager +} + +// RunManager runs actions and stores runtime information about them. +type RunManager interface { // Run executes an action in foreground. Run(ctx context.Context, a *Action) (RunInfo, error) // RunBackground executes an action in background. @@ -70,12 +84,17 @@ type DecorateWithFn = func(m Manager, a *Action) type actionManagerMap struct { actionStore map[string]*Action actionAliases map[string]string - runStore map[string]RunInfo // @todo consider persistent storage mx sync.Mutex - mxRun sync.Mutex dwFns []DecorateWithFn processors map[string]ValueProcessor idProvider IDProvider + + // Actions discovery. + discoveryFns []DiscoverActionsFn + discoverySeq *launchr.SliceSeqStateful[DiscoverActionsFn] + discTimeout time.Duration + + runManagerMap } // NewManager constructs a new action manager. @@ -83,9 +102,14 @@ func NewManager(withFns ...DecorateWithFn) Manager { return &actionManagerMap{ actionStore: make(map[string]*Action), actionAliases: make(map[string]string), - runStore: make(map[string]RunInfo), dwFns: withFns, processors: make(map[string]ValueProcessor), + + discTimeout: 10 * time.Second, + + runManagerMap: runManagerMap{ + runStore: make(map[string]RunInfo), + }, } } @@ -96,7 +120,10 @@ func (m *actionManagerMap) ServiceInfo() launchr.ServiceInfo { func (m *actionManagerMap) Add(a *Action) error { m.mx.Lock() defer m.mx.Unlock() + return m.add(a) +} +func (m *actionManagerMap) add(a *Action) error { // Check action loads properly. def, err := a.Raw() if err != nil { @@ -130,6 +157,9 @@ func (m *actionManagerMap) Add(a *Action) error { func (m *actionManagerMap) AllUnsafe() map[string]*Action { m.mx.Lock() defer m.mx.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), m.discTimeout) + defer cancel() + _ = m.finalizeDiscovery(ctx) return maps.Clone(m.actionStore) } @@ -169,13 +199,77 @@ func (m *actionManagerMap) Get(id string) (*Action, bool) { return m.Decorate(a, m.dwFns...), ok } -func (m *actionManagerMap) GetUnsafe(id string) (*Action, bool) { +func (m *actionManagerMap) GetUnsafe(id string) (a *Action, ok bool) { m.mx.Lock() defer m.mx.Unlock() + + a, ok = m.get(id) + if ok { + return a, ok + } + + ctx, cancel := context.WithTimeout(context.Background(), m.discTimeout) + defer cancel() + for fn := range m.discoverySeq.Seq() { + if err := m.callDiscoveryFn(ctx, fn); err != nil { + continue + } + + a, ok = m.get(id) + if ok { + return a, ok + } + } + + return a, ok +} + +func (m *actionManagerMap) get(id string) (*Action, bool) { + id = m.GetIDFromAlias(id) a, ok := m.actionStore[id] return a, ok } +func (m *actionManagerMap) SetDiscoveryTimeout(timeout time.Duration) { + m.discTimeout = timeout +} + +func (m *actionManagerMap) AddDiscovery(fn DiscoverActionsFn) { + if m.discoveryFns == nil { + m.discoveryFns = make([]DiscoverActionsFn, 0, 1) + m.discoverySeq = launchr.NewSliceSeqStateful(&m.discoveryFns) + } + m.discoveryFns = append(m.discoveryFns, fn) +} + +func (m *actionManagerMap) finalizeDiscovery(ctx context.Context) error { + errs := make([]error, 0) + for fn := range m.discoverySeq.Seq() { + err := m.callDiscoveryFn(ctx, fn) + if err != nil { + errs = append(errs, err) + } + } + return errors.Join(errs...) +} + +func (m *actionManagerMap) callDiscoveryFn(ctx context.Context, fn DiscoverActionsFn) error { + actions, err := fn(ctx) + if err != nil { + return err + } + // Add discovered actions. + for _, a := range actions { + err = m.add(a) + if err != nil { + launchr.Log().Warn("action was skipped due to error", "action_id", a.ID, "error", err) + launchr.Term().Warning().Printfln("Action %q was skipped:\n%v", a.ID, err) + continue + } + } + return nil +} + func (m *actionManagerMap) AddValueProcessor(name string, vp ValueProcessor) { if _, ok := m.processors[name]; ok { panic(fmt.Sprintf("processor `%q` with the same name already exists", name)) @@ -223,10 +317,15 @@ type RunInfo struct { // @todo add more info for status like error message or exit code. Or have it in output. } -func (m *actionManagerMap) registerRun(a *Action, id string) RunInfo { +type runManagerMap struct { + runStore map[string]RunInfo // @todo consider persistent storage + mx sync.Mutex +} + +func (m *runManagerMap) registerRun(a *Action, id string) RunInfo { // @todo rethink the implementation - m.mxRun.Lock() - defer m.mxRun.Unlock() + m.mx.Lock() + defer m.mx.Unlock() if id == "" { id = strconv.FormatInt(time.Now().Unix(), 10) + "-" + a.ID } @@ -240,21 +339,21 @@ func (m *actionManagerMap) registerRun(a *Action, id string) RunInfo { return ri } -func (m *actionManagerMap) updateRunStatus(id string, st string) { - m.mxRun.Lock() - defer m.mxRun.Unlock() +func (m *runManagerMap) updateRunStatus(id string, st string) { + m.mx.Lock() + defer m.mx.Unlock() if ri, ok := m.runStore[id]; ok { ri.Status = st m.runStore[id] = ri } } -func (m *actionManagerMap) Run(ctx context.Context, a *Action) (RunInfo, error) { +func (m *runManagerMap) Run(ctx context.Context, a *Action) (RunInfo, error) { // @todo add the same status change info return m.registerRun(a, ""), a.Execute(ctx) } -func (m *actionManagerMap) RunBackground(ctx context.Context, a *Action, runID string) (RunInfo, chan error) { +func (m *runManagerMap) RunBackground(ctx context.Context, a *Action, runID string) (RunInfo, chan error) { // @todo change runID to runOptions with possibility to create filestream names in webUI. ri := m.registerRun(a, runID) chErr := make(chan error) @@ -273,9 +372,9 @@ func (m *actionManagerMap) RunBackground(ctx context.Context, a *Action, runID s return ri, chErr } -func (m *actionManagerMap) RunInfoByAction(aid string) []RunInfo { - m.mxRun.Lock() - defer m.mxRun.Unlock() +func (m *runManagerMap) RunInfoByAction(aid string) []RunInfo { + m.mx.Lock() + defer m.mx.Unlock() run := make([]RunInfo, 0, len(m.runStore)/2) for _, v := range m.runStore { if v.Action.ID == aid { @@ -285,9 +384,9 @@ func (m *actionManagerMap) RunInfoByAction(aid string) []RunInfo { return run } -func (m *actionManagerMap) RunInfoByID(id string) (RunInfo, bool) { - m.mxRun.Lock() - defer m.mxRun.Unlock() +func (m *runManagerMap) RunInfoByID(id string) (RunInfo, bool) { + m.mx.Lock() + defer m.mx.Unlock() ri, ok := m.runStore[id] return ri, ok } diff --git a/plugins/actionscobra/plugin.go b/plugins/actionscobra/plugin.go index d18b612..b03df2e 100644 --- a/plugins/actionscobra/plugin.go +++ b/plugins/actionscobra/plugin.go @@ -3,7 +3,6 @@ package actionscobra import ( "context" - "errors" "math" "time" @@ -11,10 +10,6 @@ import ( "github.com/launchrctl/launchr/pkg/action" ) -var ( - errDiscoveryTimeout = "action discovery timeout exceeded" -) - // ActionsGroup is a command group definition. var ActionsGroup = &launchr.CommandGroup{ ID: "actions", @@ -30,9 +25,6 @@ type Plugin struct { app launchr.AppInternal am action.Manager pm launchr.PluginManager - - // reqaid is a action id that was requested to run in cli. - reqaid string } // PluginInfo implements [launchr.Plugin] interface. @@ -60,52 +52,20 @@ func (p *Plugin) discoverActions() (err error) { } // @todo configure timeout from flags // Define timeout for cases when we may traverse the whole FS, e.g. in / or home. - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() + p.am.SetDiscoveryTimeout(30 * time.Second) plugins := launchr.GetPluginByType[action.DiscoveryPlugin](p.pm) launchr.Log().Debug("hook DiscoveryPlugin", "plugins", plugins) for _, pldisc := range plugins { - actions, errDis := pldisc.V.DiscoverActions(ctx) - if errDis != nil { - launchr.Log().Debug("error on DiscoverActions", "plugin", pldisc.K.String()) - return errDis - } - - // Add discovered actions. - for _, a := range actions { - err = p.am.Add(a) - if err != nil { - launchr.Log().Warn("action was skipped due to error", "action_id", a.ID, "error", err) - launchr.Term().Warning().Printfln("Action %q was skipped:\n%v", a.ID, err) - continue + p.am.AddDiscovery(func(ctx context.Context) ([]*action.Action, error) { + actions, errDisc := pldisc.V.DiscoverActions(ctx) + if errDisc != nil { + launchr.Log().Error("error on DiscoverActions", "plugin", pldisc.K.String(), "err", errDisc) } - } - - // Stop discovery if the requested command found. - // Check if an alias was provided to find the real action. - aid := p.am.GetIDFromAlias(early.Command) - if _, ok := p.am.Get(aid); ok { - p.reqaid = aid - // @fixme There is an issue that we can't call other actions because they are not discovered. - //break discoveryLoop - } - } - // Failed to discover actions in reasonable time. - if errCtx := ctx.Err(); errCtx != nil { - return errors.New(errDiscoveryTimeout) + return actions, errDisc + }) } - // Alter all registered actions. - plalter := launchr.GetPluginByType[action.AlterActionsPlugin](p.pm) - launchr.Log().Debug("hook AlterActionsPlugin", "plugins", plalter) - for _, pl := range plalter { - err = pl.V.AlterActions() - if err != nil { - launchr.Log().Debug("error on AlterActions", "plugin", pl.K.String()) - return err - } - } // @todo maybe cache discovery result for performance. return err } @@ -117,13 +77,10 @@ func (p *Plugin) CobraAddCommands(rootCmd *launchr.Command) error { // Convert actions to cobra commands. // Check the requested command to see what actions we must actually load. var actions map[string]*action.Action - if p.reqaid != "" { + a, ok := p.am.Get(early.Command) + if ok { // Use only the requested action. - a, ok := p.am.Get(p.reqaid) - if !ok { - panic("unexpected action id provided") - } - actions = map[string]*action.Action{p.reqaid: a} + actions = map[string]*action.Action{a.ID: a} } else if early.Command != "" { // Action was not requested, no need to load them. return nil diff --git a/types.go b/types.go index 1f60e38..c9d183c 100644 --- a/types.go +++ b/types.go @@ -88,8 +88,6 @@ type ( OnAppInitPlugin = launchr.OnAppInitPlugin // ActionDiscoveryPlugin is an interface to implement a plugin to discover actions. ActionDiscoveryPlugin = action.DiscoveryPlugin - // ActionsAlterPlugin is in interface to implement a plugin to alter registered actions. - ActionsAlterPlugin = action.AlterActionsPlugin // CobraPlugin is an interface to implement a plugin for cobra. CobraPlugin = launchr.CobraPlugin // PersistentPreRunPlugin is an interface to implement a plugin