Skip to content

Commit

Permalink
Add --event-store parameter to specify the location of the event stor…
Browse files Browse the repository at this point in the history
…e database. #852.
  • Loading branch information
vhadianto committed Jun 3, 2024
1 parent 982de08 commit 8b9109d
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 43 deletions.
1 change: 1 addition & 0 deletions internal/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func rootCommand() *cobra.Command {
AddPersistentStringFlag(constants.ArgConfigPath, "", "Colon separated list of paths to search for workspace files, in order of decreasing precedence").
// Common (steampipe, flowpipe) flags
AddPersistentStringFlag(constants.ArgModLocation, cwd, "Path to the workspace working directory").
AddPersistentStringFlag(constants.ArgEventStore, cwd, "Event store datatabase filename").
AddPersistentStringFlag(constants.ArgWorkspaceProfile, "default", "The workspace to use").
// Define the CLI flag parameters for wrapped enum flag.
AddPersistentVarFlag(enumflag.New(&outputMode, constants.ArgOutput, types.OutputModeIds, enumflag.EnumCaseInsensitive),
Expand Down
37 changes: 37 additions & 0 deletions internal/es/estest/default_mod_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,43 @@ func (suite *DefaultModTestSuite) TestEchoOne() {
assert.Equal(1, len(pex.PipelineOutput))
}

func (suite *DefaultModTestSuite) TestEchoOneCustomEventStoreLocation() {
assert := assert.New(suite.T())

pipelineInput := modconfig.Input{}

viper.SetDefault(constants.ArgEventStore, "./event-store-test-dir/test-echo.db")

_, pipelineCmd, err := runPipeline(suite.FlowpipeTestSuite, "default_mod.pipeline.echo_one", 100*time.Millisecond, pipelineInput)

if err != nil {
assert.Fail("Error creating execution", err)
return
}

_, pex, err := getPipelineExAndWait(suite.FlowpipeTestSuite, pipelineCmd.Event, pipelineCmd.PipelineExecutionID, 100*time.Millisecond, 40, "finished")
if err != nil {
assert.Fail("Error getting pipeline execution", err)
return
}
assert.Equal("finished", pex.Status)

assert.Equal(0, len(pex.Errors))
assert.Equal("Hello World from Depend A", pex.PipelineOutput["echo_one_output"])
assert.Equal(1, len(pex.PipelineOutput))

// check if the event store file was created
fi, err := os.Stat("./event-store-test-dir/test-echo.db")
assert.Nil(err)

assert.Equal("test-echo.db", fi.Name())
assert.False(fi.IsDir())

// now delete the event store file
err = os.Remove("./event-store-test-dir/test-echo.db")
assert.Nil(err)
}

func (suite *DefaultModTestSuite) TestBasicAuth() {
assert := assert.New(suite.T())

Expand Down
38 changes: 0 additions & 38 deletions internal/es/estest/es_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package estest
// Basic imports
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
Expand Down Expand Up @@ -483,41 +480,6 @@ func (suite *EsTestSuite) TestErrorHandlingOnPipelines() {
assert.Equal(404, pex.StepStatus["http.http_step"]["1"].StepExecutions[0].Output.Data["status_code"])
assert.Equal(200, pex.StepStatus["http.http_step"]["2"].StepExecutions[0].Output.Data["status_code"])

if pex.StepStatus["transform.http_step"] == nil {
p := filepaths.EventStoreFilePath(cmd.Event.ExecutionID)

// Open the file
file, err := os.Open(p)
if err != nil {
assert.Fail("Error opening file:", err)
return
}
defer file.Close()

// Read the file
byteValue, err := io.ReadAll(file)
if err != nil {
assert.Fail("Error reading file:", err)
return
}

// Assuming the JSON is in a format like {"key": "value"}
var result map[string]interface{}

// Unmarshal the JSON data into the map
err = json.Unmarshal(byteValue, &result)
if err != nil {
assert.Fail("Error parsing JSON:", err)
return
}

// Print the data
fmt.Println(result) //nolint:forbidigo // test

assert.Fail("transform.http_step not found in StepStatus: " + cmd.Event.ExecutionID)
return
}

assert.Equal("skipped", pex.StepStatus["transform.http_step"]["0"].StepExecutions[0].Output.Status)
assert.Equal("skipped", pex.StepStatus["transform.http_step"]["1"].StepExecutions[0].Output.Status)
assert.Equal("finished", pex.StepStatus["transform.http_step"]["2"].StepExecutions[0].Output.Status)
Expand Down
12 changes: 7 additions & 5 deletions internal/filepaths/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"path"
"path/filepath"
"strings"

"github.com/spf13/viper"
"github.com/turbot/pipe-fittings/app_specific"
Expand Down Expand Up @@ -43,15 +44,16 @@ func LegacyFlowpipeDBFileName() string {
}

func FlowpipeDBFileName() string {

dbPath := viper.GetString(constants.ArgEventStore)
if strings.Trim(dbPath, " ") != "" {
return dbPath
}
modLocation := ModFlowpipeDir()
dbPath := filepath.Join(modLocation, "flowpipe.db")
dbPath = filepath.Join(modLocation, "flowpipe.db")
return dbPath
}

func EventStoreFilePath(executionId string) string {
return path.Join(EventStoreDir(), fmt.Sprintf("%s.jsonl", executionId))
}

func SnapshotFilePath(executionId string) string {
return path.Join(EventStoreDir(), fmt.Sprintf("%s.sps", executionId))
}
Expand Down

0 comments on commit 8b9109d

Please sign in to comment.