Nick/neos 577 improve performance of benthos sync (#999)
nickzelei committed Dec 29, 2023
1 parent 2dec2eb commit b1428ba
Showing 6 changed files with 37 additions and 123 deletions.
6 changes: 2 additions & 4 deletions cli/internal/cmds/neosync/sync/sync.go
@@ -668,12 +668,10 @@ func generateBenthosConfig(
 				Table:       tableName,
 				Columns:     columns,
 				ArgsMapping: buildPlainInsertArgs(columns),
-				ConnMaxIdle: 2,
-				ConnMaxOpen: 2,
 				Batching: &neosync_benthos.Batching{
-					Period: "1s",
+					Period: "5s",
 					// max allowed by postgres in a single batch
-					Count: computeMaxPgBatchCount(len(columns)),
+					Count: 100,
 				},
 			},
 		},
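Why this change matters: the CLI previously sized each insert batch dynamically from Postgres's bind-parameter cap via computeMaxPgBatchCount (the worker's copy of that helper, and its tests, are deleted below), whereas the new configuration pins every batch at 100 rows flushed at least every 5s. A minimal sketch of the arithmetic, assuming only the documented 65535-parameter limit that the comment above refers to:

```go
package main

import "fmt"

// maxPgParamLimit mirrors the constant removed in this commit: Postgres caps
// a single statement at 65535 bind parameters (the count is a 16-bit integer).
const maxPgParamLimit = 65535

func main() {
	const batchCount = 100 // the new fixed batch size
	// Each row of a batched INSERT contributes one parameter per column, so a
	// fixed count of 100 stays under the cap for tables up to 655 columns.
	fmt.Println(maxPgParamLimit / batchCount) // 655
	for _, cols := range []int{2, 50, 655, 656} {
		fmt.Printf("cols=%d params=%d ok=%v\n", cols, batchCount*cols, batchCount*cols <= maxPgParamLimit)
	}
}
```

In other words, the fixed count trades the theoretically maximal batch for a predictable one that remains safely under the cap for any realistically wide table.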
24 changes: 1 addition & 23 deletions worker/pkg/workflows/datasync/activities/activities.go
@@ -70,7 +70,7 @@ func (a *Activities) GenerateBenthosConfigs(

 	neosyncUrl := viper.GetString("NEOSYNC_URL")
 	if neosyncUrl == "" {
-		neosyncUrl = "localhost:8080"
+		neosyncUrl = "http://localhost:8080"
 	}

 	neosyncApiKey := viper.GetString("NEOSYNC_API_KEY")
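A note on the URL fix above: the bare host:port default is replaced with an absolute URL, presumably because the HTTP client behind the Neosync API connection needs an address with an explicit scheme. A hedged sketch of the same fallback expressed as a helper; ensureScheme is illustrative and not part of the codebase:

```go
package main

import (
	"fmt"
	"strings"
)

// ensureScheme is a hypothetical helper: it prefixes a bare host:port with
// http:// so the value is always a valid absolute URL for an HTTP client.
func ensureScheme(u string) string {
	if strings.HasPrefix(u, "http://") || strings.HasPrefix(u, "https://") {
		return u
	}
	return "http://" + u
}

func main() {
	fmt.Println(ensureScheme("localhost:8080"))          // http://localhost:8080
	fmt.Println(ensureScheme("https://api.example.com")) // unchanged
}
```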
@@ -220,28 +220,6 @@ func buildBenthosS3Credentials(mgmtCreds *mgmtv1alpha1.AwsS3Credentials) *neosyn
 	return creds
 }

-const (
-	maxPgParamLimit = 65535
-)
-
-func computeMaxPgBatchCount(numCols int) int {
-	if numCols < 1 {
-		return maxPgParamLimit
-	}
-	return clampInt(maxPgParamLimit/numCols, 1, maxPgParamLimit) // automatically rounds down
-}
-
-// clamps the input between low, high
-func clampInt(input, low, high int) int {
-	if input < low {
-		return low
-	}
-	if input > high {
-		return high
-	}
-	return input
-}

 func areMappingsSubsetOfSchemas(
 	groupedSchemas map[string]map[string]struct{},
 	mappings []*mgmtv1alpha1.JobMapping,
20 changes: 0 additions & 20 deletions worker/pkg/workflows/datasync/activities/activities_test.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"crypto/sha256"
 	"encoding/hex"
-	"math"
 	"strconv"
 	"strings"
 	"testing"
@@ -130,25 +129,6 @@ func TestShouldHaltOnSchemaAddition(t *testing.T) {
 	assert.True(t, ok, "job mappings have same column count, but missing specific column")
 }

-func TestClampInt(t *testing.T) {
-	assert.Equal(t, clampInt(0, 1, 2), 1)
-	assert.Equal(t, clampInt(1, 1, 2), 1)
-	assert.Equal(t, clampInt(2, 1, 2), 2)
-	assert.Equal(t, clampInt(3, 1, 2), 2)
-	assert.Equal(t, clampInt(1, 1, 1), 1)
-
-	assert.Equal(t, clampInt(1, 3, 2), 3, "low is evaluated first, order is relevant")
-}
-
-func TestComputeMaxPgBatchCount(t *testing.T) {
-	assert.Equal(t, computeMaxPgBatchCount(65535), 1)
-	assert.Equal(t, computeMaxPgBatchCount(65536), 1, "anything over max should clamp to 1")
-	assert.Equal(t, computeMaxPgBatchCount(math.MaxInt), 1, "anything over pgmax should clamp to 1")
-	assert.Equal(t, computeMaxPgBatchCount(1), 65535)
-	assert.Equal(t, computeMaxPgBatchCount(0), 65535)
-}

 func Test_Sync_Run_Success(t *testing.T) {
 	testSuite := &testsuite.WorkflowTestSuite{}
 	env := testSuite.NewTestActivityEnvironment()
36 changes: 9 additions & 27 deletions worker/pkg/workflows/datasync/activities/benthos-builder.go
@@ -341,7 +341,6 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
 				colSourceMap[col.Column] = col.GetTransformer().Source
 			}
 			filteredCols := b.filterColsBySource(resp.Config.Input.SqlSelect.Columns, colSourceMap)
-			logger.Info(fmt.Sprintf("sql batch count: %d", maxPgParamLimit/len(resp.Config.Input.SqlSelect.Columns)))
 			resp.Config.Output.Broker.Outputs = append(resp.Config.Output.Broker.Outputs, neosync_benthos.Outputs{
 				SqlRaw: &neosync_benthos.SqlRaw{
@@ -352,13 +351,9 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
 					ArgsMapping:   buildPlainInsertArgs(filteredCols),
 					InitStatement: initStmt,

-					ConnMaxIdle: 2,
-					ConnMaxOpen: 2,
-
 					Batching: &neosync_benthos.Batching{
-						Period: "1s",
-						// max allowed by postgres in a single batch
-						Count: computeMaxPgBatchCount(len(resp.Config.Input.SqlSelect.Columns)),
+						Period: "5s",
+						Count:  100,
 					},
 				},
 			})
@@ -401,13 +396,9 @@ func (b *benthosBuilder) GenerateBenthosConfigs(

 					InitStatement: initStmt,

-					ConnMaxIdle: 2,
-					ConnMaxOpen: 2,
-
 					Batching: &neosync_benthos.Batching{
-						Period: "1s",
-						// max allowed by postgres in a single batch
-						Count: computeMaxPgBatchCount(len(cols)),
+						Period: "5s",
+						Count:  100,
 					},
 				},
 			})
@@ -457,7 +448,6 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
 				colSourceMap[col.Column] = col.GetTransformer().Source
 			}
 			filteredCols := b.filterColsBySource(resp.Config.Input.SqlSelect.Columns, colSourceMap)
-			logger.Info(fmt.Sprintf("sql batch count: %d", maxPgParamLimit/len(resp.Config.Input.SqlSelect.Columns)))
 			resp.Config.Output.Broker.Outputs = append(resp.Config.Output.Broker.Outputs, neosync_benthos.Outputs{
 				SqlRaw: &neosync_benthos.SqlRaw{
 					Driver: "mysql",
@@ -467,13 +457,9 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
 					ArgsMapping:   buildPlainInsertArgs(filteredCols),
 					InitStatement: initStmt,

-					ConnMaxIdle: 2,
-					ConnMaxOpen: 2,
-
 					Batching: &neosync_benthos.Batching{
-						Period: "1s",
-						// max allowed by postgres in a single batch
-						Count: computeMaxPgBatchCount(len(resp.Config.Input.SqlSelect.Columns)),
+						Period: "5s",
+						Count:  100,
 					},
 				},
 			})
@@ -515,13 +501,9 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
 					ArgsMapping:   buildPlainInsertArgs(filteredCols),
 					InitStatement: initStmt,

-					ConnMaxIdle: 2,
-					ConnMaxOpen: 2,
-
 					Batching: &neosync_benthos.Batching{
-						Period: "1s",
-						// max allowed by postgres in a single batch
-						Count: computeMaxPgBatchCount(len(cols)),
+						Period: "5s",
+						Count:  100,
 					},
 				},
 			})
@@ -552,7 +534,7 @@ func (b *benthosBuilder) GenerateBenthosConfigs(
 					Path: fmt.Sprintf("/%s", strings.Join(s3pathpieces, "/")),
 					Batching: &neosync_benthos.Batching{
 						Count: 100,
-						Period: "1s",
+						Period: "5s",
 						Processors: []*neosync_benthos.BatchProcessor{
 							{Archive: &neosync_benthos.ArchiveProcessor{Format: "lines"}},
 							{Compress: &neosync_benthos.CompressProcessor{Algorithm: "gzip"}},
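Taken together, every SQL output in this builder (and the S3 output) now shares one static policy: flush a batch at 100 messages or after 5s, whichever comes first, instead of computing a per-table maximum. A rough, self-contained sketch of that count-or-period rule, assuming standard Benthos batching semantics (the real batcher also flushes on a timer rather than only when a message arrives):

```go
package main

import (
	"fmt"
	"time"
)

// batcher mimics the count-or-period flush policy configured above.
type batcher struct {
	count    int           // flush once this many messages are buffered (100)
	period   time.Duration // ...or once this much time has passed (5s)
	buf      []string
	lastSent time.Time
}

// add buffers one message and returns the batch if a flush condition is met.
func (b *batcher) add(msg string, now time.Time) (flushed []string) {
	b.buf = append(b.buf, msg)
	if len(b.buf) >= b.count || now.Sub(b.lastSent) >= b.period {
		flushed, b.buf, b.lastSent = b.buf, nil, now
	}
	return flushed
}

func main() {
	b := &batcher{count: 100, period: 5 * time.Second, lastSent: time.Now()}
	for i := 0; i < 250; i++ {
		if out := b.add(fmt.Sprintf("row-%d", i), time.Now()); out != nil {
			fmt.Printf("flushed %d rows\n", len(out)) // two flushes of 100
		}
	}
}
```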
72 changes: 24 additions & 48 deletions worker/pkg/workflows/datasync/activities/benthos-builder_test.go
@@ -154,12 +154,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES ($1, $2);
             args_mapping: root = [this.id, this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -306,12 +304,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES (DEFAULT, $1);
             args_mapping: root = [this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -469,12 +465,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES ($1, $2);
             args_mapping: root = [this.id, this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -639,12 +633,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES (DEFAULT, $1);
             args_mapping: root = [this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -847,12 +839,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES ($1, $2);
             args_mapping: root = [this.id, this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -891,12 +881,10 @@ output:
             query: INSERT INTO public.user_account_associations (id, user_id) VALUES ($1, $2);
             args_mapping: root = [this.id, this.user_id]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -1049,12 +1037,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES (?, ?);
             args_mapping: root = [this.id, this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -1218,12 +1204,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES (?, ?);
             args_mapping: root = [this.id, this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -1424,12 +1408,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES (?, ?);
             args_mapping: root = [this.id, this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -1468,12 +1450,10 @@ output:
             query: INSERT INTO public.user_account_associations (id, user_id) VALUES (?, ?);
             args_mapping: root = [this.id, this.user_id]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -1623,12 +1603,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES (DEFAULT, ?);
             args_mapping: root = [this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
@@ -1793,12 +1771,10 @@ output:
             query: INSERT INTO public.users (id, name) VALUES (DEFAULT, ?);
             args_mapping: root = [this.name]
             init_statement: ""
-            conn_max_idle: 2
-            conn_max_open: 2
             batching:
-              count: 32767
+              count: 100
               byte_size: 0
-              period: 1s
+              period: 5s
               check: ""
               processors: []
 `),
2 changes: 1 addition & 1 deletion worker/pkg/workflows/datasync/workflow/workflow.go
@@ -21,7 +21,7 @@ type WorkflowResponse struct{}

 func Workflow(wfctx workflow.Context, req *WorkflowRequest) (*WorkflowResponse, error) {
 	ao := workflow.ActivityOptions{
-		StartToCloseTimeout: 2 * time.Minute, // this will need to be drastically increased and probably settable via the UI
+		StartToCloseTimeout: 10 * time.Minute, // this will need to be drastically increased and probably settable via the UI
 		RetryPolicy: &temporal.RetryPolicy{
 			MaximumAttempts: 1,
 		},
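Finally, the activity timeout is raised from 2 to 10 minutes so longer syncs are not cut off; the inline comment notes it should eventually be user-configurable. For reference, a minimal sketch of how these options attach to activity calls in the Temporal Go SDK:

```go
package example

import (
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)

func exampleWorkflow(wfctx workflow.Context) error {
	ao := workflow.ActivityOptions{
		// Each activity invocation must now complete within ten minutes.
		StartToCloseTimeout: 10 * time.Minute,
		RetryPolicy: &temporal.RetryPolicy{
			MaximumAttempts: 1, // fail fast instead of retrying a long sync
		},
	}
	// Activities executed through this derived context inherit the options.
	ctx := workflow.WithActivityOptions(wfctx, ao)
	_ = ctx // activity calls elided in this sketch
	return nil
}
```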

1 comment on commit b1428ba: @vercel bot commented on Dec 29, 2023.