From 01a73c1c693a5407f934a12f6bf8503fe784ec44 Mon Sep 17 00:00:00 2001 From: Michael Kaiser Date: Fri, 31 Oct 2025 13:55:31 +0100 Subject: [PATCH 1/6] fix(performance): try to improve performance of inbrdv query --- pkg/builder/InbRdv.go | 12 ++++++++---- pkg/builder/InbRdv_test.go | 12 ++++++++++-- pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go | 4 ++-- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pkg/builder/InbRdv.go b/pkg/builder/InbRdv.go index dea2786..f9c2fcb 100644 --- a/pkg/builder/InbRdv.go +++ b/pkg/builder/InbRdv.go @@ -55,8 +55,11 @@ func BuildInbRdvSatDeleteQuery(tableName string) (string, error) { } script += fmt.Sprintf("UPDATE rdv.%s_sat ", tableName) script += "SET delete_dts = NOW() " - script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName) - script += "AND delete_dts IS NULL;" + script += fmt.Sprintf("FROM rdv.%s_sat AS s ", tableName) + script += fmt.Sprintf("LEFT JOIN rdv.%s_sat_cur AS sc on s.frh = sc.frh ", tableName) + script += "WHERE s.frh IS NULL;" + // script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName) + // script += "AND delete_dts IS NULL;" return script, nil } @@ -71,8 +74,9 @@ func BuildInbRdvSatInsertQuery(tableName string) (string, error) { } script += fmt.Sprintf("INSERT INTO rdv.%s_sat ", tableName) - script += fmt.Sprintf("SELECT * FROM rdv.%s_sat_cur ", tableName) - script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat);", tableName) + script += fmt.Sprintf("SELECT sc.* FROM rdv.%s_sat_cur AS sc LEFT JOIN rdv.%s_sat AS s ON s.frh = sc.frh ", tableName, tableName) + script += "WHERE s.frh IS NULL;" + // script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat);", tableName) return script, nil } diff --git a/pkg/builder/InbRdv_test.go b/pkg/builder/InbRdv_test.go index 0d1d94e..31863fe 100644 --- a/pkg/builder/InbRdv_test.go +++ b/pkg/builder/InbRdv_test.go @@ -64,7 +64,11 @@ func TestBuildInbRdvSatDeleteQuery(t 
*testing.T) { t.Fatalf("Error on BuildInbRdvSatDeleteQuery build: %v", err) } - if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat_cur) AND delete_dts IS NULL;" { + // if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat_cur) AND delete_dts IS NULL;" { + // t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query) + // } + + if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() FROM rdv.TestTable_sat AS s LEFT JOIN rdv.TestTable_sat_cur AS sc on s.frh = sc.frh WHERE s.frh IS NULL;" { t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query) } } @@ -77,7 +81,11 @@ func TestBuildInbRdvSatInsertQuery(t *testing.T) { t.Fatalf("Error on BuildInbRdvSatInsertQuery build: %v", err) } - if query != "INSERT INTO rdv.TestTable_sat SELECT * FROM rdv.TestTable_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat);" { + // if query != "INSERT INTO rdv.TestTable_sat SELECT * FROM rdv.TestTable_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat);" { + // t.Fatalf("BuildInbRdvSatInsertQuery incorrect: %v", query) + // } + if query != "INSERT INTO rdv.TestTable_sat SELECT sc.* FROM rdv.TestTable_sat_cur AS sc LEFT JOIN rdv.TestTable_sat AS s ON s.frh = sc.frh WHERE s.frh IS NULL;" { t.Fatalf("BuildInbRdvSatInsertQuery incorrect: %v", query) } + } diff --git a/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go b/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go index f652eb3..20cda00 100644 --- a/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go +++ b/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go @@ -140,7 +140,7 @@ func TestRdvPipeline_buildSatMarkDelete(t *testing.T) { wantErr bool }{ {name: "SatCurDeleteTest1", fields: fields{nil, ""}, want: nil, wantErr: true}, - {name: "SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT 
frh FROM rdv.testTableDelete_sat_cur) AND delete_dts IS NULL;"), wantErr: false}, + {name: "SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat SET delete_dts = NOW() FROM rdv.testTableDelete_sat AS s LEFT JOIN rdv.testTableDelete_sat_cur AS sc on s.frh = sc.frh WHERE s.frh IS NULL;"), wantErr: false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -175,7 +175,7 @@ func TestRdvPipeline_buildSatInsertExecutor(t *testing.T) { wantErr bool }{ {name: "SatInsertTest1", fields: fields{nil, ""}, want: nil, wantErr: true}, - {name: "SatInsertTest2", fields: fields{nil, "testTableSatInsert"}, want: processors.NewSQLExecutor(nil, "INSERT INTO rdv.testTableSatInsert_sat SELECT * FROM rdv.testTableSatInsert_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.testTableSatInsert_sat);"), wantErr: false}, + {name: "SatInsertTest2", fields: fields{nil, "testTableSatInsert"}, want: processors.NewSQLExecutor(nil, "INSERT INTO rdv.testTableSatInsert_sat SELECT sc.* FROM rdv.testTableSatInsert_sat_cur AS sc LEFT JOIN rdv.testTableSatInsert_sat AS s ON s.frh = sc.frh WHERE s.frh IS NULL;"), wantErr: false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 52d9e62c401b3567cce40f49eba65e386a0822d7 Mon Sep 17 00:00:00 2001 From: Michael Kaiser Date: Fri, 31 Oct 2025 14:48:07 +0100 Subject: [PATCH 2/6] feat(concurrency): running the pipelines in the packages in parallel --- pkg/packages/inbrdv/InbPackage.go | 16 ++++++++++++---- pkg/packages/srcinb/SystemPackage.go | 16 ++++++++++++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/packages/inbrdv/InbPackage.go b/pkg/packages/inbrdv/InbPackage.go index 891812b..62d38bd 100644 --- a/pkg/packages/inbrdv/InbPackage.go +++ b/pkg/packages/inbrdv/InbPackage.go @@ -4,6 +4,7 @@ import ( "database/sql" "errors" "fmt" + "sync" "github.com/ooemperor/go-db-etl/pkg/logging" 
"github.com/ooemperor/go-db-etl/pkg/pipeline/inbrdv" @@ -34,12 +35,19 @@ func (inbP *InbPackage) Run() error { logging.EtlLogger.Error(msg) return errors.New(msg) } + + var wg sync.WaitGroup + wg.Add(len(inbP.pipelines)) for _, tablePipeline := range inbP.pipelines { - c := <-tablePipeline.Run() - if c != nil { - logging.EtlLogger.Error(c.Error(), tablePipeline.Stats()) - } + go func() { + defer wg.Done() + c := <-tablePipeline.Run() + if c != nil { + logging.EtlLogger.Error(c.Error(), tablePipeline.Stats()) + } + }() } + wg.Wait() logging.EtlLogger.Info(fmt.Sprintf("END %v", inbP.Name())) return nil } diff --git a/pkg/packages/srcinb/SystemPackage.go b/pkg/packages/srcinb/SystemPackage.go index ecbd86e..f93c3bc 100644 --- a/pkg/packages/srcinb/SystemPackage.go +++ b/pkg/packages/srcinb/SystemPackage.go @@ -4,6 +4,7 @@ import ( "database/sql" "errors" "fmt" + "sync" "github.com/ooemperor/go-db-etl/pkg/logging" "github.com/ooemperor/go-db-etl/pkg/pipeline/srcinb" @@ -34,12 +35,19 @@ func (srcP *SystemPackage) Run() error { logging.EtlLogger.Error(msg) return errors.New(msg) } + + var wg sync.WaitGroup + wg.Add(len(srcP.pipelines)) for _, tablePipeline := range srcP.pipelines { - c := <-tablePipeline.Run() - if c != nil { - logging.EtlLogger.Error(c.Error(), tablePipeline.Stats()) - } + go func() { + defer wg.Done() + c := <-tablePipeline.Run() + if c != nil { + logging.EtlLogger.Error(c.Error(), tablePipeline.Stats()) + } + }() } + wg.Wait() logging.EtlLogger.Info(fmt.Sprintf("END %v", srcP.Name())) return nil } From a8bc43a30257f2390bd4f531a35b84e9b3a02226 Mon Sep 17 00:00:00 2001 From: Michael Kaiser Date: Fri, 31 Oct 2025 15:03:35 +0100 Subject: [PATCH 3/6] fix(deleteSat): fixing delete in sat not working --- pkg/builder/InbRdv.go | 2 +- pkg/builder/InbRdv_test.go | 2 +- pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/builder/InbRdv.go b/pkg/builder/InbRdv.go index f9c2fcb..2d90485 100644 --- 
a/pkg/builder/InbRdv.go +++ b/pkg/builder/InbRdv.go @@ -57,7 +57,7 @@ func BuildInbRdvSatDeleteQuery(tableName string) (string, error) { script += "SET delete_dts = NOW() " script += fmt.Sprintf("FROM rdv.%s_sat AS s ", tableName) script += fmt.Sprintf("LEFT JOIN rdv.%s_sat_cur AS sc on s.frh = sc.frh ", tableName) - script += "WHERE s.frh IS NULL;" + script += "WHERE sc.frh IS NULL AND s.delete_dts IS NULL;" // script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName) // script += "AND delete_dts IS NULL;" diff --git a/pkg/builder/InbRdv_test.go b/pkg/builder/InbRdv_test.go index 31863fe..9914b72 100644 --- a/pkg/builder/InbRdv_test.go +++ b/pkg/builder/InbRdv_test.go @@ -68,7 +68,7 @@ func TestBuildInbRdvSatDeleteQuery(t *testing.T) { // t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query) // } - if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() FROM rdv.TestTable_sat AS s LEFT JOIN rdv.TestTable_sat_cur AS sc on s.frh = sc.frh WHERE s.frh IS NULL;" { + if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() FROM rdv.TestTable_sat AS s LEFT JOIN rdv.TestTable_sat_cur AS sc on s.frh = sc.frh WHERE sc.frh IS NULL AND s.delete_dts IS NULL;" { t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query) } } diff --git a/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go b/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go index 20cda00..a92a257 100644 --- a/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go +++ b/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go @@ -140,7 +140,7 @@ func TestRdvPipeline_buildSatMarkDelete(t *testing.T) { wantErr bool }{ {name: "SatCurDeleteTest1", fields: fields{nil, ""}, want: nil, wantErr: true}, - {name: "SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat SET delete_dts = NOW() FROM rdv.testTableDelete_sat AS s LEFT JOIN rdv.testTableDelete_sat_cur AS sc on s.frh = sc.frh WHERE s.frh IS NULL;"), wantErr: false}, + {name: 
"SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat SET delete_dts = NOW() FROM rdv.testTableDelete_sat AS s LEFT JOIN rdv.testTableDelete_sat_cur AS sc on s.frh = sc.frh WHERE sc.frh IS NULL AND s.delete_dts IS NULL;"), wantErr: false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From fe20a211e830b6fbd84c79fc3e6e7fa80750e369 Mon Sep 17 00:00:00 2001 From: Michael Kaiser Date: Fri, 31 Oct 2025 15:50:22 +0100 Subject: [PATCH 4/6] fix(transactions): add commit to the wrapper for better performance --- pkg/builder/Global.go | 2 +- pkg/builder/Global_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/builder/Global.go b/pkg/builder/Global.go index f4905c2..4b32222 100644 --- a/pkg/builder/Global.go +++ b/pkg/builder/Global.go @@ -58,6 +58,6 @@ ScriptTransactionWrapper Wraps string in transction for postgresql */ func ScriptTransactionWrapper(query string) string { prefix := "DO $$ BEGIN" - suffix := "END $$ ;" + suffix := "END $$; COMMIT;" return fmt.Sprintf("%v %v %v", prefix, query, suffix) } diff --git a/pkg/builder/Global_test.go b/pkg/builder/Global_test.go index 313a2d0..3d526f5 100644 --- a/pkg/builder/Global_test.go +++ b/pkg/builder/Global_test.go @@ -41,8 +41,8 @@ func TestScriptTransactionWrapper(t *testing.T) { args args want string }{ - {name: "InbPackageBuildTest1", args: args{query: "SELECT 1;"}, want: "DO $$ BEGIN SELECT 1; END $$ ;"}, - {name: "InbPackageBuildTest2", args: args{query: "TRUNCATE TABLE public.test;"}, want: "DO $$ BEGIN TRUNCATE TABLE public.test; END $$ ;"}, + {name: "InbPackageBuildTest1", args: args{query: "SELECT 1;"}, want: "DO $$ BEGIN SELECT 1; END $$; COMMIT;"}, + {name: "InbPackageBuildTest2", args: args{query: "TRUNCATE TABLE public.test;"}, want: "DO $$ BEGIN TRUNCATE TABLE public.test; END $$; COMMIT;"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 
f2dba9e351687af76f51ef1893399f1afd2d1abc Mon Sep 17 00:00:00 2001 From: Michael Kaiser Date: Fri, 31 Oct 2025 16:33:01 +0100 Subject: [PATCH 5/6] fix(transactions): add commit to the wrapper for better performance --- pkg/builder/InbRdv.go | 9 +++++---- pkg/builder/InbRdv_test.go | 2 +- pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/builder/InbRdv.go b/pkg/builder/InbRdv.go index 2d90485..67a383a 100644 --- a/pkg/builder/InbRdv.go +++ b/pkg/builder/InbRdv.go @@ -53,11 +53,12 @@ func BuildInbRdvSatDeleteQuery(tableName string) (string, error) { if tableName == "" { return "", fmt.Errorf("the tablename cannot be blank") } - script += fmt.Sprintf("UPDATE rdv.%s_sat ", tableName) + script += fmt.Sprintf("UPDATE rdv.%s_sat s ", tableName) script += "SET delete_dts = NOW() " - script += fmt.Sprintf("FROM rdv.%s_sat AS s ", tableName) - script += fmt.Sprintf("LEFT JOIN rdv.%s_sat_cur AS sc on s.frh = sc.frh ", tableName) - script += "WHERE sc.frh IS NULL AND s.delete_dts IS NULL;" + script += fmt.Sprintf("WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.%s_sat_cur sc WHERE sc.frh = s.frh);", tableName) + // script += fmt.Sprintf("FROM rdv.%s_sat AS s ", tableName) + // script += fmt.Sprintf("LEFT JOIN rdv.%s_sat_cur AS sc on s.frh = sc.frh ", tableName) + // script += "WHERE sc.frh IS NULL AND s.delete_dts IS NULL;" // script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName) // script += "AND delete_dts IS NULL;" diff --git a/pkg/builder/InbRdv_test.go b/pkg/builder/InbRdv_test.go index 9914b72..6126fb2 100644 --- a/pkg/builder/InbRdv_test.go +++ b/pkg/builder/InbRdv_test.go @@ -68,7 +68,7 @@ func TestBuildInbRdvSatDeleteQuery(t *testing.T) { // t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query) // } - if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() FROM rdv.TestTable_sat AS s LEFT JOIN rdv.TestTable_sat_cur AS sc on s.frh = sc.frh WHERE 
sc.frh IS NULL AND s.delete_dts IS NULL;" { + if query != "UPDATE rdv.TestTable_sat s SET delete_dts = NOW() WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.TestTable_sat_cur sc WHERE sc.frh = s.frh);" { t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query) } } diff --git a/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go b/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go index a92a257..ad7d8d4 100644 --- a/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go +++ b/pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go @@ -140,7 +140,7 @@ func TestRdvPipeline_buildSatMarkDelete(t *testing.T) { wantErr bool }{ {name: "SatCurDeleteTest1", fields: fields{nil, ""}, want: nil, wantErr: true}, - {name: "SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat SET delete_dts = NOW() FROM rdv.testTableDelete_sat AS s LEFT JOIN rdv.testTableDelete_sat_cur AS sc on s.frh = sc.frh WHERE sc.frh IS NULL AND s.delete_dts IS NULL;"), wantErr: false}, + {name: "SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat s SET delete_dts = NOW() WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.testTableDelete_sat_cur sc WHERE sc.frh = s.frh);"), wantErr: false}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From b2ce85df2bf809edd18784c92bc9957017907736 Mon Sep 17 00:00:00 2001 From: Michael Kaiser Date: Fri, 31 Oct 2025 17:08:34 +0100 Subject: [PATCH 6/6] clean(workflows): renaming github workflows for better naming --- .github/workflows/docker_build.yml | 2 +- .github/workflows/docker_build_publish_development.yml | 2 +- .github/workflows/docker_build_publish_main.yml | 2 +- .github/workflows/go_test.yml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/docker_build.yml b/.github/workflows/docker_build.yml index 662d122..8eba816 100644 --- a/.github/workflows/docker_build.yml 
+++ b/.github/workflows/docker_build.yml @@ -5,7 +5,7 @@ on: branches: [ "**" ] jobs: - build: + build-test: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 diff --git a/.github/workflows/docker_build_publish_development.yml b/.github/workflows/docker_build_publish_development.yml index 550c6ea..02da3c7 100644 --- a/.github/workflows/docker_build_publish_development.yml +++ b/.github/workflows/docker_build_publish_development.yml @@ -19,7 +19,7 @@ env: jobs: - build: + build-publish-development: runs-on: ubuntu-latest permissions: diff --git a/.github/workflows/docker_build_publish_main.yml b/.github/workflows/docker_build_publish_main.yml index b2fccac..97ca05a 100644 --- a/.github/workflows/docker_build_publish_main.yml +++ b/.github/workflows/docker_build_publish_main.yml @@ -17,7 +17,7 @@ env: jobs: - build: + build-publish-main: runs-on: ubuntu-latest permissions: diff --git a/.github/workflows/go_test.yml b/.github/workflows/go_test.yml index fd0cc24..0b7921c 100644 --- a/.github/workflows/go_test.yml +++ b/.github/workflows/go_test.yml @@ -2,7 +2,7 @@ name: Go test on: [push] jobs: - build: + go-test: runs-on: ubuntu-latest steps: