Skip to content

Commit

Permalink
Merge branch 'v0.4.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
vhadianto committed Apr 1, 2024
2 parents 795ec79 + 639a751 commit 3f4aa8a
Show file tree
Hide file tree
Showing 21 changed files with 220 additions and 74 deletions.
17 changes: 13 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
# Flowpipe

## v0.4.2 [2023-03-26]
## v0.4.3 [2024-04-01]

_Bug fixes_

* Lazy create `flowpipe.db`. ([#808](https://github.com/turbot/flowpipe/issues/808)).
* Respect `max_concurrency` in `pipeline` and `input` steps. ([#815](https://github.com/turbot/flowpipe/issues/815)).
* Misleading error message for invalid step dependencies. ([#816](https://github.com/turbot/flowpipe/issues/816)).
* HTTP integration address is shown correctly at the beginning of each input step loop. ([#818](https://github.com/turbot/flowpipe/issues/818)).

## v0.4.2 [2024-03-26]

_Bug fixes_

* `loop` block now works in `container`, `function`, `message` and `input` steps.
* Use HCL expressions in `max_concurrency` step argument. ([#800](https://github.com/turbot/flowpipe/issues/800)).
* `throw`, `retry` and `error` block now works for `input` step.

## v0.4.1 [2023-03-19]
## v0.4.1 [2024-03-19]

_Bug fixes_

Expand All @@ -18,7 +27,7 @@ _Bug fixes_
* Integration and input step URLs should use the provided custom host & port. ([#792](https://github.com/turbot/flowpipe/issues/792)).
* Shows filename and line number for invalid step references.

## v0.4.0 [2023-03-14]
## v0.4.0 [2024-03-14]

_What's new?_

Expand All @@ -29,7 +38,7 @@ _Bug fixes_
* Step output attribute should be called `response` not `result`. ([#789](https://github.com/turbot/flowpipe/issues/789))
* Pipeline execution should not fail when a string argument is passed with double quotes. ([#791](https://github.com/turbot/flowpipe/issues/791))

## v0.3.2 [2023-03-11]
## v0.3.2 [2024-03-11]

_Bug fixes_

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
github.com/mattn/go-sqlite3 v1.14.19
github.com/sagikazarmark/slog-shim v0.1.0
github.com/turbot/flowpipe-sdk-go v0.3.0
github.com/turbot/pipe-fittings v1.0.3
github.com/turbot/pipe-fittings v1.0.4
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.5.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -946,8 +946,8 @@ github.com/turbot/flowpipe-sdk-go v0.3.0 h1:67RUlxI8hTjVM1I1bGnaLzNtG9RTzGKaF+Ju
github.com/turbot/flowpipe-sdk-go v0.3.0/go.mod h1:bcANgfiAkvdOvVWOmlm5Ss5ML0oZCdfkhMhNnKT1pZA=
github.com/turbot/go-kit v0.10.0-rc.0 h1:kd+jp2ibbIV33Hc8SsMAN410Dl9Pz6SJ40axbKUlSoA=
github.com/turbot/go-kit v0.10.0-rc.0/go.mod h1:fFQqR59I5z5JeeBLfK1PjSifn4Oprs3NiQx0CxeSJxs=
github.com/turbot/pipe-fittings v1.0.3 h1:RLNUtb7Ddn6+mK3UHn2k/JJsZ38MyENPfP+gbGluvTw=
github.com/turbot/pipe-fittings v1.0.3/go.mod h1:ghZa4S19iBIpMvNM0XJV9+vHErPEQHYZyKo+v7PXcaU=
github.com/turbot/pipe-fittings v1.0.4 h1:ZLweOrwNRbVKOlde854XJER7F3GHxIen835UI1Qfs+Q=
github.com/turbot/pipe-fittings v1.0.4/go.mod h1:ghZa4S19iBIpMvNM0XJV9+vHErPEQHYZyKo+v7PXcaU=
github.com/turbot/steampipe-cloud-sdk-go v0.6.0 h1:ufAxOpKS1uq7eejuE5sfEu1+d7QAd0RBjl8Bn6+mIs8=
github.com/turbot/steampipe-cloud-sdk-go v0.6.0/go.mod h1:M42TMBdMim4bV1YTMxhKyzfSGSMo4CXUkm3wt9w7t1Y=
github.com/turbot/steampipe-plugin-code v0.7.0 h1:SROYIo/TI/Q/YNfXK+sAIS71umypUFm1Uz851TmoJkM=
Expand Down
2 changes: 2 additions & 0 deletions internal/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
ExecutionModeAsynchronous = "asynchronous"

MaxScanSize = bufio.MaxScanTokenSize * 40

FormUrl = "form_url"
)

const FlowpipeSampleContent = `
Expand Down
5 changes: 5 additions & 0 deletions internal/es/command/step_pipeline_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,11 @@ func (h StepPipelineFinishHandler) Handle(ctx context.Context, c interface{}) er
}
}

err = execution.ReleasePipelineExecutionStepSemaphore(cmd.PipelineExecutionID, stepDefn)
if err != nil {
return h.EventBus.Publish(ctx, event.NewPipelineFailedFromStepPipelineFinish(cmd, err))
}

e, err := event.NewStepFinished(event.ForPipelineStepFinish(cmd))
e.StepLoop = stepLoop

Expand Down
3 changes: 3 additions & 0 deletions internal/es/command/step_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (h StepStartHandler) Handle(ctx context.Context, c interface{}) error {
if stepDefn.GetType() == schema.BlockTypePipelineStepInput {
slog.Debug("Step execution is an input step, not releasing semaphore", "step_name", cmd.StepName, "pipeline_execution_id", cmd.PipelineExecutionID)
return
} else if stepDefn.GetType() == schema.BlockTypePipelineStepPipeline {
slog.Debug("Step execution is a pipeline step, not releasing semaphore", "step_name", cmd.StepName, "pipeline_execution_id", cmd.PipelineExecutionID)
return
}

err := execution.ReleasePipelineExecutionStepSemaphore(cmd.PipelineExecutionID, stepDefn)
Expand Down
4 changes: 3 additions & 1 deletion internal/es/estest/default_mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"github.com/stretchr/testify/suite"
"github.com/turbot/flowpipe/internal/cache"
localcmdconfig "github.com/turbot/flowpipe/internal/cmdconfig"
fconstants "github.com/turbot/flowpipe/internal/constants"
"github.com/turbot/flowpipe/internal/filepaths"
"github.com/turbot/flowpipe/internal/service/manager"
"github.com/turbot/pipe-fittings/constants"

"github.com/turbot/pipe-fittings/error_helpers"
"github.com/turbot/pipe-fittings/modconfig"
putils "github.com/turbot/pipe-fittings/utils"
Expand Down Expand Up @@ -463,7 +465,7 @@ func (suite *DefaultModTestSuite) TestInputStepWithDefaultNotifier() {

assert.NotNil(stepExecution)
assert.Equal("starting", stepExecution.Status)
assert.True(strings.HasPrefix(stepExecution.Input["form_url"].(string), "http://localhost:7103/form"), "form_url should start with http://localhost:7103/form but "+stepExecution.Input["form_url"].(string))
assert.True(strings.HasPrefix(stepExecution.Input[fconstants.FormUrl].(string), "http://localhost:7103/form"), "form_url should start with http://localhost:7103/form but "+stepExecution.Input["form_url"].(string))
}

func (suite *DefaultModTestSuite) TestInputStepOptionResolution() {
Expand Down
18 changes: 18 additions & 0 deletions internal/es/estest/test_suite_mod/pipelines/default_credential.fp
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
pipeline "defaul_aws_credential" {

param "region" {
default = "us-east-1"
}

param "cred" {
default = "default"
}

step "transform" "transform" {
value = merge(credential.aws[param.cred].env, { AWS_REGION = param.region })
}

output "val" {
value = step.transform.transform
}
}
15 changes: 15 additions & 0 deletions internal/es/estest/test_suite_mod/pipelines/lots_of_sleep.fp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,21 @@ pipeline "lots_of_sleep_bound" {
}
}

pipeline "lots_of_nested_pipeline" {
step "pipeline" "pipeline" {
for_each = [1, 2, 3, 4, 5, 6, 7]
max_concurrency = 2
pipeline = pipeline.nested
}
}

pipeline "nested" {
step "transform" "transform" {
value = "foo"
}
}


pipeline "lots_of_sleep_bound_with_param" {
param "concurrency" {
default = 1
Expand Down
57 changes: 5 additions & 52 deletions internal/es/event/step_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package event

import (
"fmt"
"github.com/turbot/pipe-fittings/schema"
"log/slog"
"strings"

"github.com/turbot/flowpipe/internal/util"
"github.com/turbot/pipe-fittings/modconfig"
Expand Down Expand Up @@ -64,8 +61,10 @@ func NewStepQueueFromPipelineStepFinishedForLoop(e *StepFinished, stepName strin
cmd.PipelineExecutionID = e.PipelineExecutionID
}

extendedInput := util.ExtendInputs(cmd.Event.ExecutionID, cmd.PipelineExecutionID, cmd.StepExecutionID, stepName, *e.StepLoop.Input)

cmd.StepName = stepName
cmd.StepInput = *e.StepLoop.Input
cmd.StepInput = extendedInput
cmd.StepForEach = e.StepForEach
cmd.StepLoop = e.StepLoop
cmd.StepRetry = e.StepRetry
Expand Down Expand Up @@ -105,7 +104,7 @@ func NewStepQueueFromStepForEachPlanned(e *StepForEachPlanned, nextStep *modconf
return nil, perr.BadRequestWithMessage(fmt.Sprintf("missing pipeline execution ID in pipeline planned event: %v", e))
}

extendedInput := extendInputs(cmd, e.StepName, nextStep.Input)
extendedInput := util.ExtendInputs(cmd.Event.ExecutionID, cmd.PipelineExecutionID, cmd.StepExecutionID, e.StepName, nextStep.Input)
cmd.StepName = e.StepName
cmd.StepInput = extendedInput
cmd.StepForEach = nextStep.StepForEach
Expand All @@ -129,7 +128,7 @@ func StepQueueForPipelinePlanned(e *PipelinePlanned) StepQueueOption {

func StepQueueWithStep(name string, input modconfig.Input, stepForEach *modconfig.StepForEach, stepLoop *modconfig.StepLoop, nextStepAction modconfig.NextStepAction) StepQueueOption {
return func(cmd *StepQueue) error {
extendedInput := extendInputs(cmd, name, input)
extendedInput := util.ExtendInputs(cmd.Event.ExecutionID, cmd.PipelineExecutionID, cmd.StepExecutionID, name, input)
cmd.StepName = name
cmd.StepInput = extendedInput
cmd.StepForEach = stepForEach
Expand All @@ -138,49 +137,3 @@ func StepQueueWithStep(name string, input modconfig.Input, stepForEach *modconfi
return nil
}
}

// TODO: refactor/tidy
// extendInputs is only relevant for the "input step". This is the best location we can find right now.
//
// We have tried to encapsulate this in the:
// 1) Step Definition's getInput() in pipe-fittings, but it needs the salt and we believe that it's not appropriate to use the salt in pipe-fittings.
// 2) In the primitive itself, but it's too late. We need this information in the Event for the remote CLI use case.
func extendInputs(cmd *StepQueue, stepName string, input modconfig.Input) modconfig.Input {
stepType := strings.Split(stepName, ".")[0]
switch stepType {
case "input":
if notifier, ok := input[schema.AttributeTypeNotifier].(map[string]any); ok {
if notifies, ok := notifier[schema.AttributeTypeNotifies].([]any); ok {
for _, n := range notifies {
if notify, ok := n.(map[string]any); ok {
integration := notify["integration"].(map[string]any)
integrationType := integration["type"].(string)
switch integrationType {
case schema.IntegrationTypeEmail, schema.IntegrationTypeHttp:
formUrl, err := util.GetHttpFormUrl(cmd.Event.ExecutionID, cmd.PipelineExecutionID, cmd.StepExecutionID)
if err != nil {
slog.Error("Failed to get http form URL", "error", err)
} else {
input["form_url"] = formUrl
}
return input
default:
// slack, msteams, etc - do nothing
}
}
}
} else {
formUrl, err := util.GetHttpFormUrl(cmd.Event.ExecutionID, cmd.PipelineExecutionID, cmd.StepExecutionID)
if err != nil {
slog.Error("Failed to get http form URL", "error", err)
} else {
input["form_url"] = formUrl
}
return input
}
}
return input
default:
return input
}
}
12 changes: 12 additions & 0 deletions internal/es/execution/execution_in_memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/turbot/flowpipe/internal/cache"
"github.com/turbot/flowpipe/internal/es/db"
"github.com/turbot/flowpipe/internal/es/event"
"github.com/turbot/flowpipe/internal/log"
"github.com/turbot/pipe-fittings/constants"
pfconstants "github.com/turbot/pipe-fittings/constants"
"github.com/turbot/pipe-fittings/credential"
Expand Down Expand Up @@ -138,6 +139,17 @@ func (ex *ExecutionInMemory) BuildEvalContext(pipelineDefn *modconfig.Pipeline,
}
}

if log.GetLogLevel().Level() == slog.LevelDebug {
for k, p := range params {
goVal, err := hclhelpers.CtyToGo(p)
if err != nil {
slog.Debug("Error converting cty value to Go value", "error", err)
} else {
slog.Debug("Parsed param value", "name", k, "value", goVal)
}
}
}

paramsCtyVal := cty.ObjectVal(params)
evalContext.Variables[schema.BlockTypeParam] = paramsCtyVal

Expand Down
4 changes: 2 additions & 2 deletions internal/log/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func FlowpipeLogger() *slog.Logger {
handlerOptions := &slog.HandlerOptions{
Level: getLogLevel(),
Level: GetLogLevel(),
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
sanitized := sanitize.Instance.SanitizeKeyValue(a.Key, a.Value.Any())

Expand Down Expand Up @@ -57,7 +57,7 @@ func SetDefaultLogger() {
slog.SetDefault(logger)
}

func getLogLevel() slog.Leveler {
func GetLogLevel() slog.Leveler {
logLevel := os.Getenv(app_specific.EnvLogLevel)

if logLevel == "" {
Expand Down
3 changes: 2 additions & 1 deletion internal/primitive/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/atc0005/go-teams-notify/v2/messagecard"
"github.com/slack-go/slack"
fconstants "github.com/turbot/flowpipe/internal/constants"
"github.com/turbot/flowpipe/internal/es/db"
o "github.com/turbot/flowpipe/internal/output"
"github.com/turbot/flowpipe/internal/types"
Expand Down Expand Up @@ -296,7 +297,7 @@ func (ip *Input) sendNotifications(ctx context.Context, input modconfig.Input, m
case schema.IntegrationTypeEmail:
e := NewInputIntegrationEmail(base)

if formUrl, ok := input["form_url"].(string); ok {
if formUrl, ok := input[fconstants.FormUrl].(string); ok {
e.FormUrl = formUrl
}

Expand Down
4 changes: 4 additions & 0 deletions internal/service/api/form.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,10 @@ func (api *APIService) finishInputStepFromForm(ex *execution.ExecutionInMemory,
}

err := command.EndStepFromApi(ex, stepExecution, pipelineDefn, stepDefn, &out, api.EsService.EventBus)
if err != nil {
return err
}

err = execution.ReleasePipelineExecutionStepSemaphore(stepExecution.PipelineExecutionID, stepDefn)
return err
}
5 changes: 5 additions & 0 deletions internal/service/api/integration_slack.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,5 +390,10 @@ func (api *APIService) finishInputStep(execId string, pExecId string, sExecId st
return false, nil, perr.InternalWithMessage(fmt.Sprintf("error raising step finished event: %s", err.Error()))
}

err = execution.ReleasePipelineExecutionStepSemaphore(stepExecution.PipelineExecutionID, stepDefn)
if err != nil {
return false, nil, perr.InternalWithMessage(fmt.Sprintf("error releasing step semaphore: %s", err.Error()))
}

return true, stepExecution, nil
}
14 changes: 9 additions & 5 deletions internal/service/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,6 @@ func (m *Manager) initializeModDirectory() error {
}
}

err = store.InitializeFlowpipeDB()
if err != nil {
return err
}

// Force cleanup if it hasn't run for 1 day
store.ForceCleanup()

Expand Down Expand Up @@ -222,6 +217,15 @@ func (m *Manager) initializeResources() error {
// effectively forever .. we don't want to expire the config
if flowpipeConfig != nil {
cache.GetCache().SetWithTTL(fpconstants.FlowpipeConfigCacheKey, flowpipeConfig, 24*7*52*99*time.Hour)
for _, c := range flowpipeConfig.Integrations {
slog.Debug("Integration loaded", "name", c.GetHclResourceImpl().FullName)
}

for _, c := range flowpipeConfig.Credentials {
if !strings.HasSuffix(c.GetHclResourceImpl().FullName, ".default") {
slog.Debug("Credential loaded", "name", c.GetHclResourceImpl().FullName)
}
}
}

err = m.cacheConfigData()
Expand Down
10 changes: 10 additions & 0 deletions internal/store/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ func CleanupRunner() {

// Force cleanup run if we haven't run it more than 1 day
func ForceCleanup() {
// can only clean up if flowpipe.db exist
dbPath := filepaths.FlowpipeDBFileName()

_, err := os.Stat(dbPath)

if os.IsNotExist(err) {
slog.Debug("Skipping force cleanup as flowpipe.db does not exist")
return
}

slog.Debug("Checking if cleanup must be run")

sql := `select value from internal where name = 'last_cleanup'`
Expand Down
Loading

0 comments on commit 3f4aa8a

Please sign in to comment.