Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
branches: [ "**" ]

jobs:
build:
build-test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker_build_publish_development.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ env:


jobs:
build:
build-publish-development:

runs-on: ubuntu-latest
permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker_build_publish_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ env:


jobs:
build:
build-publish-main:

runs-on: ubuntu-latest
permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/go_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: Go test
on: [push]

jobs:
build:
go-test:
runs-on: ubuntu-latest

steps:
Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/Global.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,6 @@ ScriptTransactionWrapper Wraps string in transction for postgresql
*/
func ScriptTransactionWrapper(query string) string {
prefix := "DO $$ BEGIN"
suffix := "END $$ ;"
suffix := "END $$; COMMIT;"
return fmt.Sprintf("%v %v %v", prefix, query, suffix)
}
4 changes: 2 additions & 2 deletions pkg/builder/Global_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func TestScriptTransactionWrapper(t *testing.T) {
args args
want string
}{
{name: "InbPackageBuildTest1", args: args{query: "SELECT 1;"}, want: "DO $$ BEGIN SELECT 1; END $$ ;"},
{name: "InbPackageBuildTest2", args: args{query: "TRUNCATE TABLE public.test;"}, want: "DO $$ BEGIN TRUNCATE TABLE public.test; END $$ ;"},
{name: "InbPackageBuildTest1", args: args{query: "SELECT 1;"}, want: "DO $$ BEGIN SELECT 1; END $$; COMMIT;"},
{name: "InbPackageBuildTest2", args: args{query: "TRUNCATE TABLE public.test;"}, want: "DO $$ BEGIN TRUNCATE TABLE public.test; END $$; COMMIT;"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
15 changes: 10 additions & 5 deletions pkg/builder/InbRdv.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ func BuildInbRdvSatDeleteQuery(tableName string) (string, error) {
if tableName == "" {
return "", fmt.Errorf("the tablename cannot be blank")
}
script += fmt.Sprintf("UPDATE rdv.%s_sat ", tableName)
script += fmt.Sprintf("UPDATE rdv.%s_sat s ", tableName)
script += "SET delete_dts = NOW() "
script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName)
script += "AND delete_dts IS NULL;"
script += fmt.Sprintf("WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.%s_sat_cur sc WHERE sc.frh = s.frh);", tableName)
// script += fmt.Sprintf("FROM rdv.%s_sat AS s ", tableName)
// script += fmt.Sprintf("LEFT JOIN rdv.%s_sat_cur AS sc on s.frh = sc.frh ", tableName)
// script += "WHERE sc.frh IS NULL AND s.delete_dts IS NULL;"
// script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat_cur) ", tableName)
// script += "AND delete_dts IS NULL;"

return script, nil
}
Expand All @@ -71,8 +75,9 @@ func BuildInbRdvSatInsertQuery(tableName string) (string, error) {
}

script += fmt.Sprintf("INSERT INTO rdv.%s_sat ", tableName)
script += fmt.Sprintf("SELECT * FROM rdv.%s_sat_cur ", tableName)
script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat);", tableName)
script += fmt.Sprintf("SELECT sc.* FROM rdv.%s_sat_cur AS sc LEFT JOIN rdv.%s_sat AS s ON s.frh = sc.frh ", tableName, tableName)
script += "WHERE s.frh IS NULL;"
// script += fmt.Sprintf("WHERE frh NOT IN (SELECT frh FROM rdv.%s_sat);", tableName)

return script, nil
}
Expand Down
12 changes: 10 additions & 2 deletions pkg/builder/InbRdv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ func TestBuildInbRdvSatDeleteQuery(t *testing.T) {
t.Fatalf("Error on BuildInbRdvSatDeleteQuery build: %v", err)
}

if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat_cur) AND delete_dts IS NULL;" {
// if query != "UPDATE rdv.TestTable_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat_cur) AND delete_dts IS NULL;" {
// t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query)
// }

if query != "UPDATE rdv.TestTable_sat s SET delete_dts = NOW() WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.TestTable_sat_cur sc WHERE sc.frh = s.frh);" {
t.Fatalf("BuildInbRdvSatDeleteQuery incorrect: %v", query)
}
}
Expand All @@ -77,7 +81,11 @@ func TestBuildInbRdvSatInsertQuery(t *testing.T) {
t.Fatalf("Error on BuildInbRdvSatInsertQuery build: %v", err)
}

if query != "INSERT INTO rdv.TestTable_sat SELECT * FROM rdv.TestTable_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat);" {
// if query != "INSERT INTO rdv.TestTable_sat SELECT * FROM rdv.TestTable_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.TestTable_sat);" {
// t.Fatalf("BuildInbRdvSatInsertQuery incorrect: %v", query)
// }
if query != "INSERT INTO rdv.TestTable_sat SELECT sc.* FROM rdv.TestTable_sat_cur AS sc LEFT JOIN rdv.TestTable_sat AS s ON s.frh = sc.frh WHERE s.frh IS NULL;" {
t.Fatalf("BuildInbRdvSatInsertQuery incorrect: %v", query)
}

}
16 changes: 12 additions & 4 deletions pkg/packages/inbrdv/InbPackage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"errors"
"fmt"
"sync"

"github.com/ooemperor/go-db-etl/pkg/logging"
"github.com/ooemperor/go-db-etl/pkg/pipeline/inbrdv"
Expand Down Expand Up @@ -34,12 +35,19 @@ func (inbP *InbPackage) Run() error {
logging.EtlLogger.Error(msg)
return errors.New(msg)
}

var wg sync.WaitGroup
wg.Add(len(inbP.pipelines))
for _, tablePipeline := range inbP.pipelines {
c := <-tablePipeline.Run()
if c != nil {
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
}
go func() {
defer wg.Done()
c := <-tablePipeline.Run()
if c != nil {
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
}
}()
}
wg.Wait()
logging.EtlLogger.Info(fmt.Sprintf("END %v", inbP.Name()))
return nil
}
Expand Down
16 changes: 12 additions & 4 deletions pkg/packages/srcinb/SystemPackage.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"database/sql"
"errors"
"fmt"
"sync"

"github.com/ooemperor/go-db-etl/pkg/logging"
"github.com/ooemperor/go-db-etl/pkg/pipeline/srcinb"
Expand Down Expand Up @@ -34,12 +35,19 @@ func (srcP *SystemPackage) Run() error {
logging.EtlLogger.Error(msg)
return errors.New(msg)
}

var wg sync.WaitGroup
wg.Add(len(srcP.pipelines))
for _, tablePipeline := range srcP.pipelines {
c := <-tablePipeline.Run()
if c != nil {
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
}
go func() {
defer wg.Done()
c := <-tablePipeline.Run()
if c != nil {
logging.EtlLogger.Error(c.Error(), tablePipeline.Stats())
}
}()
}
wg.Wait()
logging.EtlLogger.Info(fmt.Sprintf("END %v", srcP.Name()))
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/inbrdv/RdvPipelineBuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestRdvPipeline_buildSatMarkDelete(t *testing.T) {
wantErr bool
}{
{name: "SatCurDeleteTest1", fields: fields{nil, ""}, want: nil, wantErr: true},
{name: "SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat SET delete_dts = NOW() WHERE frh NOT IN (SELECT frh FROM rdv.testTableDelete_sat_cur) AND delete_dts IS NULL;"), wantErr: false},
{name: "SatCurDeleteTest2", fields: fields{nil, "testTableDelete"}, want: processors.NewSQLExecutor(nil, "UPDATE rdv.testTableDelete_sat s SET delete_dts = NOW() WHERE s.delete_dts IS NULL AND NOT EXISTS (SELECT 1 FROM rdv.testTableDelete_sat_cur sc WHERE sc.frh = s.frh);"), wantErr: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestRdvPipeline_buildSatInsertExecutor(t *testing.T) {
wantErr bool
}{
{name: "SatInsertTest1", fields: fields{nil, ""}, want: nil, wantErr: true},
{name: "SatInsertTest2", fields: fields{nil, "testTableSatInsert"}, want: processors.NewSQLExecutor(nil, "INSERT INTO rdv.testTableSatInsert_sat SELECT * FROM rdv.testTableSatInsert_sat_cur WHERE frh NOT IN (SELECT frh FROM rdv.testTableSatInsert_sat);"), wantErr: false},
{name: "SatInsertTest2", fields: fields{nil, "testTableSatInsert"}, want: processors.NewSQLExecutor(nil, "INSERT INTO rdv.testTableSatInsert_sat SELECT sc.* FROM rdv.testTableSatInsert_sat_cur AS sc LEFT JOIN rdv.testTableSatInsert_sat AS s ON s.frh = sc.frh WHERE s.frh IS NULL;"), wantErr: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down