Skip to content

Commit

Permalink
Scaffold: FlowpipeConfig filewatcher (#701)
Browse files Browse the repository at this point in the history
  • Loading branch information
vhadianto committed Feb 22, 2024
1 parent e59349f commit 505e7de
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
4 changes: 3 additions & 1 deletion internal/es/db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ func GetFlowpipeConfig() (*flowpipeconfig.FlowpipeConfig, error) {
flowpipeConfigCached, found := cache.GetCache().Get("#flowpipeconfig")

if !found {
return flowpipeconfig.NewFlowpipeConfig(), nil
// TODO: if we return an error all our "non mod based test" fail
// return nil, perr.BadRequestWithMessage("flowpipe config not found")
return &flowpipeconfig.FlowpipeConfig{}, nil
}

flowpipeConfig, ok := flowpipeConfigCached.(*flowpipeconfig.FlowpipeConfig)
Expand Down
19 changes: 14 additions & 5 deletions internal/service/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,20 @@ func (m *Manager) initializeResources() error {
cache.GetCache().SetWithTTL("#flowpipeconfig", flowpipeConfig, 24*7*52*99*time.Hour)
}

err = m.cacheConfigData()
if err != nil {
return err
}

if m.shouldStartAPI() {
err := flowpipeConfig.SetupWatcher(context.TODO(), func(c context.Context, e error) {

})
if err != nil {
return err
}
}

w, errorAndWarning := workspace.LoadWorkspacePromptingForVariables(
m.ctx,
modLocation,
Expand Down Expand Up @@ -275,11 +289,6 @@ func (m *Manager) initializeResources() error {
return err
}

err = m.cacheConfigData()
if err != nil {
return err
}

slog.Info("Pipelines and triggers loaded", "pipelines", len(pipelines), "triggers", len(triggers), "rootMod", rootModName)

m.RootMod = mod
Expand Down

0 comments on commit 505e7de

Please sign in to comment.