From 56ab090aecca5d4b54e8d0490fea343932b633df Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Thu, 14 Sep 2023 18:11:33 +0200 Subject: [PATCH] chore: uploader can append (#3805) * chore: uploader can append * chore: - clickhouse custom mockUploader * chore: - datalake schema repo custom mockUploader * chore: - postgres integration custom mockUploader * chore: - custom mockUploaders * fix: snowflake test * fix: restoring dummyUploader * chore: deltalake append update * fix: snowflake test * chore: temp kit to fix data race * chore: update of temp kit * chore: go-kit v0.15.9 * chore: go mod tidy --- .../clickhouse/clickhouse_test.go | 162 ++++++------ .../datalake/schema-repository/local_test.go | 69 ++--- warehouse/integrations/deltalake/deltalake.go | 41 ++- .../integrations/deltalake/deltalake_test.go | 94 +++++-- warehouse/integrations/postgres/load_test.go | 87 +++---- warehouse/integrations/snowflake/snowflake.go | 17 +- .../integrations/snowflake/snowflake_test.go | 100 ++++---- .../internal/mocks/utils/mock_uploader.go | 238 ++++++++++++++++++ .../loadfiles/downloader/downloader_test.go | 54 +--- warehouse/jobs/jobs.go | 2 + warehouse/upload.go | 21 ++ warehouse/upload_test.go | 72 ++++++ warehouse/utils/utils.go | 2 + warehouse/validations/validate.go | 79 +++--- 14 files changed, 648 insertions(+), 390 deletions(-) create mode 100644 warehouse/internal/mocks/utils/mock_uploader.go diff --git a/warehouse/integrations/clickhouse/clickhouse_test.go b/warehouse/integrations/clickhouse/clickhouse_test.go index 61a5c5da0b..8ec956a9ec 100644 --- a/warehouse/integrations/clickhouse/clickhouse_test.go +++ b/warehouse/integrations/clickhouse/clickhouse_test.go @@ -12,33 +12,28 @@ import ( "testing" "time" - "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" "github.com/rudderlabs/compose-test/compose" - - "github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse" - - "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" - "github.com/rudderlabs/compose-test/testcompose" - kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" - "github.com/rudderlabs/rudder-server/runner" - "github.com/rudderlabs/rudder-server/testhelper/health" - "github.com/rudderlabs/rudder-server/warehouse/internal/model" - - "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper" - "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/filemanager" "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/utils/misc" - "github.com/rudderlabs/rudder-server/warehouse/validations" - - "github.com/stretchr/testify/require" - + "github.com/rudderlabs/rudder-go-kit/stats" + kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/runner" + "github.com/rudderlabs/rudder-server/testhelper/health" + "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" + "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/warehouse/client" + "github.com/rudderlabs/rudder-server/warehouse/integrations/clickhouse" + "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper" + mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + "github.com/rudderlabs/rudder-server/warehouse/validations" ) func TestIntegration(t *testing.T) { @@ -358,48 +353,11 @@ func TestClickhouse_UseS3CopyEngineForLoading(t *testing.T) { } } -type mockUploader struct { - minioPort string - tableSchema model.TableSchema - metadata []warehouseutils.LoadFile -} - -func (*mockUploader) GetSchemaInWarehouse() model.Schema { return model.Schema{} } -func (*mockUploader) GetLocalSchema(context.Context) (model.Schema, error) { - return model.Schema{}, nil -} -func (*mockUploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil } -func (*mockUploader) ShouldOnDedupUseNewRecord() bool { return false } -func (*mockUploader) UseRudderStorage() bool { return false } -func (*mockUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} } -func (*mockUploader) GetLoadFileType() string { return "JSON" } -func (*mockUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} } -func (*mockUploader) GetTableSchemaInWarehouse(_ string) model.TableSchema { - return model.TableSchema{} -} - -func (*mockUploader) GetSingleLoadFile(_ context.Context, _ string) (warehouseutils.LoadFile, error) { - return warehouseutils.LoadFile{}, nil -} - -func (m *mockUploader) GetSampleLoadFileLocation(_ context.Context, _ string) (string, error) { - minioHostPort := fmt.Sprintf("localhost:%s", m.minioPort) - - sampleLocation := m.metadata[0].Location - sampleLocation = strings.Replace(sampleLocation, minioHostPort, "minio:9000", 1) - return sampleLocation, nil -} - -func (m *mockUploader) GetTableSchemaInUpload(string) model.TableSchema { - return m.tableSchema -} - -func (m *mockUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile { - return m.metadata -} - func TestClickhouse_LoadTableRoundTrip(t *testing.T) { - c := testcompose.New(t, compose.FilePaths([]string{"testdata/docker-compose.clickhouse.yml", "../testdata/docker-compose.minio.yml"})) + c := testcompose.New(t, compose.FilePaths([]string{ + "testdata/docker-compose.clickhouse.yml", + "../testdata/docker-compose.minio.yml", + })) c.Start(context.Background()) misc.Init() @@ -485,28 +443,6 @@ func TestClickhouse_LoadTableRoundTrip(t *testing.T) { }, }, } - mockUploader := &mockUploader{ - tableSchema: model.TableSchema{ - "alter_test_bool": "boolean", - "alter_test_datetime": "datetime", - "alter_test_float": "float", - "alter_test_int": "int", - "alter_test_string": "string", - "id": "string", - "received_at": "datetime", - "test_array_bool": "array(boolean)", - "test_array_datetime": "array(datetime)", - "test_array_float": "array(float)", - "test_array_int": "array(int)", - "test_array_string": "array(string)", - "test_bool": "boolean", - "test_datetime": "datetime", - "test_float": "float", - "test_int": "int", - "test_string": "string", - }, - minioPort: strconv.Itoa(minioPort), - } t.Log("Preparing load files metadata") f, err := os.Open(tc.fileName) @@ -530,13 +466,32 @@ func TestClickhouse_LoadTableRoundTrip(t *testing.T) { require.NoError(t, err) ctx := context.Background() - uploadOutput, err := fm.Upload(ctx, f, fmt.Sprintf("test_prefix_%d", i)) require.NoError(t, err) - mockUploader.metadata = append(mockUploader.metadata, warehouseutils.LoadFile{ - Location: uploadOutput.Location, - }) + mockUploader := newMockUploader(t, + strconv.Itoa(minioPort), + model.TableSchema{ + "alter_test_bool": "boolean", + "alter_test_datetime": "datetime", + "alter_test_float": "float", + "alter_test_int": "int", + "alter_test_string": "string", + "id": "string", + "received_at": "datetime", + "test_array_bool": "array(boolean)", + "test_array_datetime": "array(datetime)", + "test_array_float": "array(float)", + "test_array_int": "array(int)", + "test_array_string": "array(string)", + "test_bool": "boolean", + "test_datetime": "datetime", + "test_float": "float", + "test_int": "int", + "test_string": "string", + }, + []warehouseutils.LoadFile{{Location: uploadOutput.Location}}, + ) t.Log("Setting up clickhouse") err = ch.Setup(ctx, warehouse, mockUploader) @@ -742,7 +697,7 @@ func TestClickhouse_TestConnection(t *testing.T) { }, } - err := ch.Setup(ctx, warehouse, &mockUploader{}) + err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil)) require.NoError(t, err) ch.SetConnectionTimeout(tc.timeout) @@ -842,7 +797,7 @@ func TestClickhouse_LoadTestTable(t *testing.T) { payload[k] = v } - err := ch.Setup(ctx, warehouse, &mockUploader{}) + err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil)) require.NoError(t, err) err = ch.CreateSchema(ctx) @@ -905,7 +860,7 @@ func TestClickhouse_FetchSchema(t *testing.T) { }, } - err := ch.Setup(ctx, warehouse, &mockUploader{}) + err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil)) require.NoError(t, err) err = ch.CreateSchema(ctx) @@ -950,7 +905,7 @@ func TestClickhouse_FetchSchema(t *testing.T) { }, } - err := ch.Setup(ctx, warehouse, &mockUploader{}) + err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil)) require.NoError(t, err) schema, unrecognizedSchema, err := ch.FetchSchema(ctx) @@ -976,7 +931,7 @@ func TestClickhouse_FetchSchema(t *testing.T) { }, } - err := ch.Setup(ctx, warehouse, &mockUploader{}) + err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil)) require.NoError(t, err) schema, unrecognizedSchema, err := ch.FetchSchema(ctx) @@ -1002,7 +957,7 @@ func TestClickhouse_FetchSchema(t *testing.T) { }, } - err := ch.Setup(ctx, warehouse, &mockUploader{}) + err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil)) require.NoError(t, err) err = ch.CreateSchema(ctx) @@ -1031,7 +986,7 @@ func TestClickhouse_FetchSchema(t *testing.T) { }, } - err := ch.Setup(ctx, warehouse, &mockUploader{}) + err := ch.Setup(ctx, warehouse, newMockUploader(t, "", nil, nil)) require.NoError(t, err) err = ch.CreateSchema(ctx) @@ -1233,3 +1188,26 @@ func initializeClickhouseClusterMode(t testing.TB, clusterDBs []*sql.DB, tables } } } + +func newMockUploader( + t testing.TB, + minioPort string, + tableSchema model.TableSchema, + metadata []warehouseutils.LoadFile, +) *mockuploader.MockUploader { + var sampleLocation string + if len(metadata) > 0 { + minioHostPort := fmt.Sprintf("localhost:%s", minioPort) + sampleLocation = strings.Replace(metadata[0].Location, minioHostPort, "minio:9000", 1) + } + + ctrl := gomock.NewController(t) + u := mockuploader.NewMockUploader(ctrl) + u.EXPECT().GetSampleLoadFileLocation(gomock.Any(), gomock.Any()).Return(sampleLocation, nil).AnyTimes() + u.EXPECT().GetTableSchemaInUpload(gomock.Any()).Return(tableSchema).AnyTimes() + u.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).Return(metadata).AnyTimes() + u.EXPECT().UseRudderStorage().Return(false).AnyTimes() + u.EXPECT().GetSchemaInWarehouse().Return(nil).AnyTimes() + + return u +} diff --git a/warehouse/integrations/datalake/schema-repository/local_test.go b/warehouse/integrations/datalake/schema-repository/local_test.go index 95c86d1554..17bcd2bb27 100644 --- a/warehouse/integrations/datalake/schema-repository/local_test.go +++ b/warehouse/integrations/datalake/schema-repository/local_test.go @@ -4,52 +4,16 @@ import ( "context" "fmt" "testing" - "time" - - "github.com/rudderlabs/rudder-server/warehouse/internal/model" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" schemarepository "github.com/rudderlabs/rudder-server/warehouse/integrations/datalake/schema-repository" + mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) -type mockUploader struct { - mockError error - localSchema model.Schema -} - -func (*mockUploader) GetSchemaInWarehouse() model.Schema { return model.Schema{} } -func (*mockUploader) ShouldOnDedupUseNewRecord() bool { return false } -func (*mockUploader) UseRudderStorage() bool { return false } -func (*mockUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} } -func (*mockUploader) GetLoadFileType() string { return "JSON" } -func (*mockUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} } -func (*mockUploader) GetTableSchemaInUpload(string) model.TableSchema { return nil } -func (*mockUploader) GetSampleLoadFileLocation(context.Context, string) (string, error) { - return "", nil -} - -func (*mockUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile { - return nil -} - -func (*mockUploader) GetTableSchemaInWarehouse(string) model.TableSchema { - return model.TableSchema{} -} - -func (*mockUploader) GetSingleLoadFile(context.Context, string) (warehouseutils.LoadFile, error) { - return warehouseutils.LoadFile{}, nil -} - -func (m *mockUploader) GetLocalSchema(context.Context) (model.Schema, error) { - return m.localSchema, nil -} - -func (m *mockUploader) UpdateLocalSchema(context.Context, model.Schema) error { - return m.mockError -} - func TestLocalSchemaRepository_CreateTable(t *testing.T) { t.Parallel() @@ -87,10 +51,7 @@ func TestLocalSchemaRepository_CreateTable(t *testing.T) { t.Parallel() warehouse := model.Warehouse{} - uploader := &mockUploader{ - mockError: tc.mockError, - localSchema: tc.localSchema, - } + uploader := newMockUploader(t, tc.mockError, tc.localSchema) s, err := schemarepository.NewLocalSchemaRepository(warehouse, uploader) require.NoError(t, err) @@ -149,10 +110,7 @@ func TestLocalSchemaRepository_AddColumns(t *testing.T) { t.Parallel() warehouse := model.Warehouse{} - uploader := &mockUploader{ - mockError: tc.mockError, - localSchema: tc.localSchema, - } + uploader := newMockUploader(t, tc.mockError, tc.localSchema) s, err := schemarepository.NewLocalSchemaRepository(warehouse, uploader) require.NoError(t, err) @@ -223,10 +181,7 @@ func TestLocalSchemaRepository_AlterColumn(t *testing.T) { t.Parallel() warehouse := model.Warehouse{} - uploader := &mockUploader{ - mockError: tc.mockError, - localSchema: tc.localSchema, - } + uploader := newMockUploader(t, tc.mockError, tc.localSchema) s, err := schemarepository.NewLocalSchemaRepository(warehouse, uploader) require.NoError(t, err) @@ -242,3 +197,15 @@ func TestLocalSchemaRepository_AlterColumn(t *testing.T) { }) } } + +func newMockUploader( + t testing.TB, + updateLocalSchemaErr error, + localSchema model.Schema, +) *mockuploader.MockUploader { + ctrl := gomock.NewController(t) + u := mockuploader.NewMockUploader(ctrl) + u.EXPECT().UpdateLocalSchema(gomock.Any(), gomock.Any()).Return(updateLocalSchemaErr).AnyTimes() + u.EXPECT().GetLocalSchema(gomock.Any()).Return(localSchema, nil).AnyTimes() + return u +} diff --git a/warehouse/integrations/deltalake/deltalake.go b/warehouse/integrations/deltalake/deltalake.go index 0bebd11976..b9dc8fd2e2 100644 --- a/warehouse/integrations/deltalake/deltalake.go +++ b/warehouse/integrations/deltalake/deltalake.go @@ -10,22 +10,18 @@ import ( "strings" "time" - "golang.org/x/exp/slices" - - warehouseclient "github.com/rudderlabs/rudder-server/warehouse/client" - sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" - - dbsqllog "github.com/databricks/databricks-sql-go/logger" - dbsql "github.com/databricks/databricks-sql-go" - - "github.com/rudderlabs/rudder-server/warehouse/logfield" + dbsqllog "github.com/databricks/databricks-sql-go/logger" + "golang.org/x/exp/slices" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/utils/misc" + warehouseclient "github.com/rudderlabs/rudder-server/warehouse/client" + sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" "github.com/rudderlabs/rudder-server/warehouse/internal/model" + "github.com/rudderlabs/rudder-server/warehouse/logfield" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) @@ -37,28 +33,20 @@ const ( catalog = "catalog" useSTSTokens = "useSTSTokens" userAgent = "Rudderstack" -) -const ( provider = warehouseutils.DELTALAKE // Corresponds to the max length set for event rudder-transformer // https://github.com/rudderlabs/rudder-transformer/blob/fb8b818b2cbd05f784117b9f3040856dab1a7346/src/warehouse/v1/util.js#L34 tableNameLimit = 127 -) -const ( schemaNotFound = "[SCHEMA_NOT_FOUND]" partitionNotFound = "SHOW PARTITIONS is not allowed on a table that is not partitioned" columnsAlreadyExists = "already exists in root" -) -const ( mergeMode = "MERGE" appendMode = "APPEND" -) -const ( rudderStagingTableRegex = "^rudder_staging_.*$" // matches rudder_staging_* tables nonRudderStagingTableRegex = "^(?!rudder_staging_.*$).*" // matches tables that do not start with rudder_staging_ ) @@ -629,8 +617,7 @@ func (d *Deltalake) loadTable(ctx context.Context, tableName string, tableSchema FILEFORMAT = PARQUET PATTERN = '*.parquet' COPY_OPTIONS ('force' = 'true') - %s; -`, + %s;`, fmt.Sprintf(`%s.%s`, d.Namespace, stagingTableName), sortedColumnNames, loadFolder, auth, @@ -667,7 +654,7 @@ func (d *Deltalake) loadTable(ctx context.Context, tableName string, tableSchema return "", fmt.Errorf("running COPY command: %w", err) } - if d.config.loadTableStrategy == appendMode { + if d.ShouldAppend() { query = fmt.Sprintf(` INSERT INTO %[1]s.%[2]s (%[4]s) SELECT @@ -754,7 +741,7 @@ func (d *Deltalake) loadTable(ctx context.Context, tableName string, tableSchema inserted int64 ) - if d.config.loadTableStrategy == appendMode { + if d.ShouldAppend() { err = row.Scan(&affected, &inserted) } else { err = row.Scan(&affected, &updated, &deleted, &inserted) @@ -1058,7 +1045,7 @@ func (d *Deltalake) LoadUserTables(ctx context.Context) map[string]error { columnKeys := append([]string{`id`}, userColNames...) - if d.config.loadTableStrategy == appendMode { + if d.ShouldAppend() { query = fmt.Sprintf(` INSERT INTO %[1]s.%[2]s (%[4]s) SELECT @@ -1099,8 +1086,7 @@ func (d *Deltalake) LoadUserTables(ctx context.Context) map[string]error { %[5]s WHEN NOT MATCHED THEN INSERT (%[6]s) VALUES - (%[7]s); - `, + (%[7]s);`, d.Namespace, warehouseutils.UsersTable, stagingTableName, @@ -1354,3 +1340,10 @@ func (d *Deltalake) DropTable(ctx context.Context, tableName string) error { func (*Deltalake) DeleteBy(context.Context, []string, warehouseutils.DeleteByParams) error { return fmt.Errorf(warehouseutils.NotImplementedErrorCode) } + +// ShouldAppend returns true if: +// * the load table strategy is "append" mode +// * the uploader says we can append +func (d *Deltalake) ShouldAppend() bool { + return d.config.loadTableStrategy == appendMode && d.Uploader.CanAppend() +} diff --git a/warehouse/integrations/deltalake/deltalake_test.go b/warehouse/integrations/deltalake/deltalake_test.go index 7d006c5923..2eb9831f22 100644 --- a/warehouse/integrations/deltalake/deltalake_test.go +++ b/warehouse/integrations/deltalake/deltalake_test.go @@ -12,34 +12,27 @@ import ( "testing" "time" - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake" - - "github.com/rudderlabs/compose-test/compose" - - "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" - dbsql "github.com/databricks/databricks-sql-go" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/rudderlabs/compose-test/compose" "github.com/rudderlabs/compose-test/testcompose" + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" "github.com/rudderlabs/rudder-server/testhelper/health" - - "github.com/rudderlabs/rudder-server/warehouse/validations" - + "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/misc" - - backendconfig "github.com/rudderlabs/rudder-server/backend-config" - - "github.com/stretchr/testify/require" - warehouseclient "github.com/rudderlabs/rudder-server/warehouse/client" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" - + "github.com/rudderlabs/rudder-server/warehouse/integrations/deltalake" "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper" + mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils" + warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + "github.com/rudderlabs/rudder-server/warehouse/validations" ) type testCredentials struct { @@ -189,6 +182,7 @@ func TestIntegration(t *testing.T) { loadTableStrategy string useParquetLoadFiles bool stagingFilePrefix string + jobRunID string }{ { name: "Merge Mode", @@ -200,6 +194,7 @@ func TestIntegration(t *testing.T) { loadTableStrategy: "MERGE", useParquetLoadFiles: false, stagingFilePrefix: "testdata/upload-job-merge-mode", + jobRunID: misc.FastUUID().String(), }, { name: "Append Mode", @@ -211,6 +206,9 @@ func TestIntegration(t *testing.T) { loadTableStrategy: "APPEND", useParquetLoadFiles: false, stagingFilePrefix: "testdata/upload-job-append-mode", + // an empty jobRunID means that the source is not an ETL one + // see Uploader.CanAppend() + jobRunID: "", }, { name: "Parquet load files", @@ -222,6 +220,7 @@ func TestIntegration(t *testing.T) { loadTableStrategy: "MERGE", useParquetLoadFiles: true, stagingFilePrefix: "testdata/upload-job-parquet", + jobRunID: misc.FastUUID().String(), }, } @@ -255,6 +254,7 @@ func TestIntegration(t *testing.T) { Tables: tables, SourceID: tc.sourceID, DestinationID: tc.destinationID, + JobRunID: tc.jobRunID, WarehouseEventsMap: testhelper.EventsCountMap{ "identifies": 1, "users": 1, @@ -283,6 +283,7 @@ func TestIntegration(t *testing.T) { Tables: tables, SourceID: tc.sourceID, DestinationID: tc.destinationID, + JobRunID: tc.jobRunID, WarehouseEventsMap: tc.warehouseEventsMap, Config: conf, WorkspaceID: workspaceID, @@ -418,6 +419,61 @@ func TestDeltalake_TrimErrorMessage(t *testing.T) { } } +func TestDeltalake_ShouldAppend(t *testing.T) { + testCases := []struct { + name string + loadTableStrategy string + uploaderCanAppend bool + uploaderExpectedCalls int + expected bool + }{ + { + name: "uploader says we can append and we are in append mode", + loadTableStrategy: "APPEND", + uploaderCanAppend: true, + uploaderExpectedCalls: 1, + expected: true, + }, + { + name: "uploader says we cannot append and we are in append mode", + loadTableStrategy: "APPEND", + uploaderCanAppend: false, + uploaderExpectedCalls: 1, + expected: false, + }, + { + name: "uploader says we can append and we are in merge mode", + loadTableStrategy: "MERGE", + uploaderCanAppend: true, + uploaderExpectedCalls: 0, + expected: false, + }, + { + name: "uploader says we cannot append and we are in merge mode", + loadTableStrategy: "MERGE", + uploaderCanAppend: false, + uploaderExpectedCalls: 0, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + c := config.New() + c.Set("Warehouse.deltalake.loadTableStrategy", tc.loadTableStrategy) + + d := deltalake.New(c, logger.NOP, stats.Default) + + mockCtrl := gomock.NewController(t) + uploader := mockuploader.NewMockUploader(mockCtrl) + uploader.EXPECT().CanAppend().Times(tc.uploaderExpectedCalls).Return(tc.uploaderCanAppend) + + d.Uploader = uploader + require.Equal(t, d.ShouldAppend(), tc.expected) + }) + } +} + func mergeEventsMap() testhelper.EventsCountMap { return testhelper.EventsCountMap{ "identifies": 1, diff --git a/warehouse/integrations/postgres/load_test.go b/warehouse/integrations/postgres/load_test.go index 56bfbefdae..a1d4246fff 100644 --- a/warehouse/integrations/postgres/load_test.go +++ b/warehouse/integrations/postgres/load_test.go @@ -7,8 +7,10 @@ import ( "os" "path/filepath" "testing" - "time" + "github.com/golang/mock/gomock" + + mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils" "github.com/rudderlabs/rudder-server/warehouse/internal/model" "github.com/docker/docker/pkg/fileutils" @@ -36,40 +38,6 @@ func (m *mockLoadFileUploader) Download(_ context.Context, tableName string) ([] return m.mockFiles[tableName], m.mockError[tableName] } -type mockUploader struct { - schema model.Schema -} - -func (*mockUploader) GetSchemaInWarehouse() model.Schema { return model.Schema{} } -func (*mockUploader) GetLocalSchema(context.Context) (model.Schema, error) { - return model.Schema{}, nil -} -func (*mockUploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil } -func (*mockUploader) ShouldOnDedupUseNewRecord() bool { return false } -func (*mockUploader) UseRudderStorage() bool { return false } -func (*mockUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} } -func (*mockUploader) GetLoadFileType() string { return "JSON" } -func (*mockUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} } -func (*mockUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile { - return []warehouseutils.LoadFile{} -} - -func (*mockUploader) GetSingleLoadFile(context.Context, string) (warehouseutils.LoadFile, error) { - return warehouseutils.LoadFile{}, nil -} - -func (*mockUploader) GetSampleLoadFileLocation(context.Context, string) (string, error) { - return "", nil -} - -func (m *mockUploader) GetTableSchemaInUpload(tableName string) model.TableSchema { - return m.schema[tableName] -} - -func (m *mockUploader) GetTableSchemaInWarehouse(tableName string) model.TableSchema { - return m.schema[tableName] -} - func cloneFiles(t *testing.T, files []string) []string { tempFiles := make([]string, len(files)) for i, file := range files { @@ -253,19 +221,17 @@ func TestLoadTable(t *testing.T) { tableName: tc.mockError, }, } - pg.Uploader = &mockUploader{ - schema: map[string]model.TableSchema{ - tableName: { - "test_bool": "boolean", - "test_datetime": "datetime", - "test_float": "float", - "test_int": "int", - "test_string": "string", - "id": "string", - "received_at": "datetime", - }, + pg.Uploader = newMockUploader(t, map[string]model.TableSchema{ + tableName: { + "test_bool": "boolean", + "test_datetime": "datetime", + "test_float": "float", + "test_int": "int", + "test_string": "string", + "id": "string", + "received_at": "datetime", }, - } + }) err = pg.LoadTable(ctx, tableName) if tc.wantError != nil { @@ -375,11 +341,9 @@ func TestLoadTable(t *testing.T) { tableName: tc.mockError, }, } - pg.Uploader = &mockUploader{ - schema: map[string]model.TableSchema{ - tableName: warehouseutils.DiscardsSchema, - }, - } + pg.Uploader = newMockUploader(t, map[string]model.TableSchema{ + tableName: warehouseutils.DiscardsSchema, + }) err = pg.LoadTable(ctx, tableName) if tc.wantError != nil { @@ -580,12 +544,10 @@ func TestLoadUsersTable(t *testing.T) { warehouseutils.IdentifiesTable: tc.mockIdentifiesError, }, } - pg.Uploader = &mockUploader{ - schema: map[string]model.TableSchema{ - warehouseutils.UsersTable: usersSchamaInUpload, - warehouseutils.IdentifiesTable: identifiesSchemaInUpload, - }, - } + pg.Uploader = newMockUploader(t, map[string]model.TableSchema{ + warehouseutils.UsersTable: usersSchamaInUpload, + warehouseutils.IdentifiesTable: identifiesSchemaInUpload, + }) errorsMap := pg.LoadUserTables(ctx) require.NotEmpty(t, errorsMap) @@ -602,3 +564,12 @@ func TestLoadUsersTable(t *testing.T) { }) } } + +func newMockUploader(t testing.TB, schema model.Schema) *mockuploader.MockUploader { + ctrl := gomock.NewController(t) + f := func(tableName string) model.TableSchema { return schema[tableName] } + u := mockuploader.NewMockUploader(ctrl) + u.EXPECT().GetTableSchemaInUpload(gomock.Any()).AnyTimes().DoAndReturn(f) + u.EXPECT().GetTableSchemaInWarehouse(gomock.Any()).AnyTimes().DoAndReturn(f) + return u +} diff --git a/warehouse/integrations/snowflake/snowflake.go b/warehouse/integrations/snowflake/snowflake.go index fd12d7e8c9..1d682f8772 100644 --- a/warehouse/integrations/snowflake/snowflake.go +++ b/warehouse/integrations/snowflake/snowflake.go @@ -13,8 +13,7 @@ import ( "time" "github.com/samber/lo" - - snowflake "github.com/snowflakedb/gosnowflake" // blank comment + snowflake "github.com/snowflakedb/gosnowflake" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" @@ -59,11 +58,6 @@ var partitionKeyMap = map[string]string{ discardsTable: `"ROW_ID", "COLUMN_NAME", "TABLE_NAME"`, } -var mergeSourceCategoryMap = map[string]struct{}{ - "cloud": {}, - "singer-protocol": {}, -} - var ( usersTable = whutils.ToProviderCase(whutils.SNOWFLAKE, whutils.UsersTable) identifiesTable = whutils.ToProviderCase(whutils.SNOWFLAKE, whutils.IdentifiesTable) @@ -820,12 +814,11 @@ func (sf *Snowflake) LoadIdentityMappingsTable(ctx context.Context) error { return nil } -// ShouldAppend returns true if the load table strategy is append mode and the source category is not in "mergeSourceCategoryMap" +// ShouldAppend returns true if: +// * the load table strategy is "append" mode +// * the uploader says we can append func (sf *Snowflake) ShouldAppend() bool { - sourceCategory := sf.Warehouse.Source.SourceDefinition.Category - _, isMergeCategory := mergeSourceCategoryMap[sourceCategory] - - return !isMergeCategory && sf.config.loadTableStrategy == loadTableStrategyAppendMode + return sf.config.loadTableStrategy == loadTableStrategyAppendMode && sf.Uploader.CanAppend() } func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error { diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index 58ce682e1e..d07482bca2 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -12,17 +12,15 @@ import ( "testing" "time" - "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/stats" - "github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake" - "github.com/rudderlabs/rudder-server/warehouse/internal/model" - + "github.com/golang/mock/gomock" sfdb "github.com/snowflakedb/gosnowflake" "github.com/stretchr/testify/require" "github.com/rudderlabs/compose-test/compose" "github.com/rudderlabs/compose-test/testcompose" + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" kithelper "github.com/rudderlabs/rudder-go-kit/testhelper" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/runner" @@ -30,7 +28,9 @@ import ( "github.com/rudderlabs/rudder-server/testhelper/workspaceConfig" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/warehouse/client" + "github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake" "github.com/rudderlabs/rudder-server/warehouse/integrations/testhelper" + mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils" whutils "github.com/rudderlabs/rudder-server/warehouse/utils" "github.com/rudderlabs/rudder-server/warehouse/validations" ) @@ -220,6 +220,7 @@ func TestIntegration(t *testing.T) { database string asyncJob bool stagingFilePrefix string + emptyJobRunID bool appendMode bool customUserID string }{ @@ -317,8 +318,11 @@ func TestIntegration(t *testing.T) { warehouseEventsMap: map[string]int{"identifies": 1, "users": 1, "tracks": 1}, warehouseEventsMap2: map[string]int{"identifies": 2, "users": 1, "tracks": 2}, stagingFilePrefix: "testdata/append-job", - appendMode: true, - customUserID: testhelper.GetUserId("append_test"), + // an empty jobRunID means that the source is not an ETL one + // see Uploader.CanAppend() + emptyJobRunID: true, + appendMode: true, + customUserID: testhelper.GetUserId("append_test"), }, } @@ -375,6 +379,10 @@ func TestIntegration(t *testing.T) { if userID == "" { userID = testhelper.GetUserId(destType) } + jobRunID := "" + if !tc.emptyJobRunID { + jobRunID = misc.FastUUID().String() + } ts1 := testhelper.TestConfig{ WriteKey: tc.writeKey, Schema: tc.schema, @@ -391,7 +399,7 @@ func TestIntegration(t *testing.T) { JobsDB: jobsDB, HTTPPort: httpPort, Client: sqlClient, - JobRunID: misc.FastUUID().String(), + JobRunID: jobRunID, TaskRunID: misc.FastUUID().String(), StagingFilePath: tc.stagingFilePrefix + ".staging-1.json", UserID: userID, @@ -403,6 +411,10 @@ func TestIntegration(t *testing.T) { if userID == "" { userID = testhelper.GetUserId(destType) } + jobRunID = "" + if !tc.emptyJobRunID { + jobRunID = misc.FastUUID().String() + } whEventsMap := tc.warehouseEventsMap2 if whEventsMap == nil { whEventsMap = tc.warehouseEventsMap @@ -424,7 +436,7 @@ func TestIntegration(t *testing.T) { JobsDB: jobsDB, HTTPPort: httpPort, Client: sqlClient, - JobRunID: misc.FastUUID().String(), + JobRunID: jobRunID, TaskRunID: misc.FastUUID().String(), StagingFilePath: tc.stagingFilePrefix + ".staging-2.json", UserID: userID, @@ -496,46 +508,39 @@ func TestIntegration(t *testing.T) { func TestSnowflake_ShouldAppend(t *testing.T) { testCases := []struct { - name string - loadTableStrategy string - sourceCategory string - expected bool + name string + loadTableStrategy string + uploaderCanAppend bool + uploaderExpectedCalls int + expected bool }{ { - name: "merge with event stream", - loadTableStrategy: "MERGE", - sourceCategory: "event-stream", - expected: false, - }, - { - name: "append with event-stream", - loadTableStrategy: "MERGE", - sourceCategory: "event-stream", - expected: false, - }, - { - name: "merge with extract cloud source", - loadTableStrategy: "MERGE", - sourceCategory: "cloud", - expected: false, + name: "uploader says we can append and we are in append mode", + loadTableStrategy: "APPEND", + uploaderCanAppend: true, + uploaderExpectedCalls: 1, + expected: true, }, { - name: "merge with extract singer protocol source", - loadTableStrategy: "MERGE", - sourceCategory: "singer-protocol", - expected: false, + name: "uploader says we cannot append and we are in append mode", + loadTableStrategy: "APPEND", + uploaderCanAppend: false, + uploaderExpectedCalls: 1, + expected: false, }, { - name: "append with extract cloud source", - loadTableStrategy: "APPEND", - sourceCategory: "cloud", - expected: false, + name: "uploader says we can append and we are in merge mode", + loadTableStrategy: "MERGE", + uploaderCanAppend: true, + uploaderExpectedCalls: 0, + expected: false, }, { - name: "append with extract singer protocol source", - loadTableStrategy: "APPEND", - sourceCategory: "singer-protocol", - expected: false, + name: "uploader says we cannot append and we are in merge mode", + loadTableStrategy: "MERGE", + uploaderCanAppend: false, + uploaderExpectedCalls: 0, + expected: false, }, } @@ -547,14 +552,11 @@ func TestSnowflake_ShouldAppend(t *testing.T) { sf, err := snowflake.New(c, logger.NOP, stats.Default) require.NoError(t, err) - sf.Warehouse = model.Warehouse{ - Source: backendconfig.SourceT{ - SourceDefinition: backendconfig.SourceDefinitionT{ - Category: tc.sourceCategory, - }, - }, - } + mockCtrl := gomock.NewController(t) + uploader := mockuploader.NewMockUploader(mockCtrl) + uploader.EXPECT().CanAppend().Times(tc.uploaderExpectedCalls).Return(tc.uploaderCanAppend) + sf.Uploader = uploader require.Equal(t, sf.ShouldAppend(), tc.expected) }) } diff --git a/warehouse/internal/mocks/utils/mock_uploader.go b/warehouse/internal/mocks/utils/mock_uploader.go new file mode 100644 index 0000000000..f17e67ebeb --- /dev/null +++ b/warehouse/internal/mocks/utils/mock_uploader.go @@ -0,0 +1,238 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/rudderlabs/rudder-server/warehouse/utils (interfaces: Uploader) + +// Package mock_uploader is a generated GoMock package. +package mock_uploader + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + model "github.com/rudderlabs/rudder-server/warehouse/internal/model" + warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" +) + +// MockUploader is a mock of Uploader interface. +type MockUploader struct { + ctrl *gomock.Controller + recorder *MockUploaderMockRecorder +} + +// MockUploaderMockRecorder is the mock recorder for MockUploader. +type MockUploaderMockRecorder struct { + mock *MockUploader +} + +// NewMockUploader creates a new mock instance. +func NewMockUploader(ctrl *gomock.Controller) *MockUploader { + mock := &MockUploader{ctrl: ctrl} + mock.recorder = &MockUploaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockUploader) EXPECT() *MockUploaderMockRecorder { + return m.recorder +} + +// CanAppend mocks base method. +func (m *MockUploader) CanAppend() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CanAppend") + ret0, _ := ret[0].(bool) + return ret0 +} + +// CanAppend indicates an expected call of CanAppend. +func (mr *MockUploaderMockRecorder) CanAppend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CanAppend", reflect.TypeOf((*MockUploader)(nil).CanAppend)) +} + +// GetFirstLastEvent mocks base method. +func (m *MockUploader) GetFirstLastEvent() (time.Time, time.Time) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetFirstLastEvent") + ret0, _ := ret[0].(time.Time) + ret1, _ := ret[1].(time.Time) + return ret0, ret1 +} + +// GetFirstLastEvent indicates an expected call of GetFirstLastEvent. +func (mr *MockUploaderMockRecorder) GetFirstLastEvent() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstLastEvent", reflect.TypeOf((*MockUploader)(nil).GetFirstLastEvent)) +} + +// GetLoadFileGenStartTIme mocks base method. +func (m *MockUploader) GetLoadFileGenStartTIme() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLoadFileGenStartTIme") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// GetLoadFileGenStartTIme indicates an expected call of GetLoadFileGenStartTIme. +func (mr *MockUploaderMockRecorder) GetLoadFileGenStartTIme() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLoadFileGenStartTIme", reflect.TypeOf((*MockUploader)(nil).GetLoadFileGenStartTIme)) +} + +// GetLoadFileType mocks base method. +func (m *MockUploader) GetLoadFileType() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLoadFileType") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetLoadFileType indicates an expected call of GetLoadFileType. +func (mr *MockUploaderMockRecorder) GetLoadFileType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLoadFileType", reflect.TypeOf((*MockUploader)(nil).GetLoadFileType)) +} + +// GetLoadFilesMetadata mocks base method. +func (m *MockUploader) GetLoadFilesMetadata(arg0 context.Context, arg1 warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLoadFilesMetadata", arg0, arg1) + ret0, _ := ret[0].([]warehouseutils.LoadFile) + return ret0 +} + +// GetLoadFilesMetadata indicates an expected call of GetLoadFilesMetadata. +func (mr *MockUploaderMockRecorder) GetLoadFilesMetadata(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLoadFilesMetadata", reflect.TypeOf((*MockUploader)(nil).GetLoadFilesMetadata), arg0, arg1) +} + +// GetLocalSchema mocks base method. +func (m *MockUploader) GetLocalSchema(arg0 context.Context) (model.Schema, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetLocalSchema", arg0) + ret0, _ := ret[0].(model.Schema) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetLocalSchema indicates an expected call of GetLocalSchema. +func (mr *MockUploaderMockRecorder) GetLocalSchema(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLocalSchema", reflect.TypeOf((*MockUploader)(nil).GetLocalSchema), arg0) +} + +// GetSampleLoadFileLocation mocks base method. +func (m *MockUploader) GetSampleLoadFileLocation(arg0 context.Context, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSampleLoadFileLocation", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSampleLoadFileLocation indicates an expected call of GetSampleLoadFileLocation. +func (mr *MockUploaderMockRecorder) GetSampleLoadFileLocation(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSampleLoadFileLocation", reflect.TypeOf((*MockUploader)(nil).GetSampleLoadFileLocation), arg0, arg1) +} + +// GetSchemaInWarehouse mocks base method. +func (m *MockUploader) GetSchemaInWarehouse() model.Schema { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSchemaInWarehouse") + ret0, _ := ret[0].(model.Schema) + return ret0 +} + +// GetSchemaInWarehouse indicates an expected call of GetSchemaInWarehouse. +func (mr *MockUploaderMockRecorder) GetSchemaInWarehouse() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSchemaInWarehouse", reflect.TypeOf((*MockUploader)(nil).GetSchemaInWarehouse)) +} + +// GetSingleLoadFile mocks base method. +func (m *MockUploader) GetSingleLoadFile(arg0 context.Context, arg1 string) (warehouseutils.LoadFile, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSingleLoadFile", arg0, arg1) + ret0, _ := ret[0].(warehouseutils.LoadFile) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetSingleLoadFile indicates an expected call of GetSingleLoadFile. +func (mr *MockUploaderMockRecorder) GetSingleLoadFile(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSingleLoadFile", reflect.TypeOf((*MockUploader)(nil).GetSingleLoadFile), arg0, arg1) +} + +// GetTableSchemaInUpload mocks base method. +func (m *MockUploader) GetTableSchemaInUpload(arg0 string) model.TableSchema { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTableSchemaInUpload", arg0) + ret0, _ := ret[0].(model.TableSchema) + return ret0 +} + +// GetTableSchemaInUpload indicates an expected call of GetTableSchemaInUpload. +func (mr *MockUploaderMockRecorder) GetTableSchemaInUpload(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTableSchemaInUpload", reflect.TypeOf((*MockUploader)(nil).GetTableSchemaInUpload), arg0) +} + +// GetTableSchemaInWarehouse mocks base method. +func (m *MockUploader) GetTableSchemaInWarehouse(arg0 string) model.TableSchema { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTableSchemaInWarehouse", arg0) + ret0, _ := ret[0].(model.TableSchema) + return ret0 +} + +// GetTableSchemaInWarehouse indicates an expected call of GetTableSchemaInWarehouse. +func (mr *MockUploaderMockRecorder) GetTableSchemaInWarehouse(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTableSchemaInWarehouse", reflect.TypeOf((*MockUploader)(nil).GetTableSchemaInWarehouse), arg0) +} + +// ShouldOnDedupUseNewRecord mocks base method. +func (m *MockUploader) ShouldOnDedupUseNewRecord() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ShouldOnDedupUseNewRecord") + ret0, _ := ret[0].(bool) + return ret0 +} + +// ShouldOnDedupUseNewRecord indicates an expected call of ShouldOnDedupUseNewRecord. +func (mr *MockUploaderMockRecorder) ShouldOnDedupUseNewRecord() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldOnDedupUseNewRecord", reflect.TypeOf((*MockUploader)(nil).ShouldOnDedupUseNewRecord)) +} + +// UpdateLocalSchema mocks base method. +func (m *MockUploader) UpdateLocalSchema(arg0 context.Context, arg1 model.Schema) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateLocalSchema", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateLocalSchema indicates an expected call of UpdateLocalSchema. +func (mr *MockUploaderMockRecorder) UpdateLocalSchema(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateLocalSchema", reflect.TypeOf((*MockUploader)(nil).UpdateLocalSchema), arg0, arg1) +} + +// UseRudderStorage mocks base method. +func (m *MockUploader) UseRudderStorage() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UseRudderStorage") + ret0, _ := ret[0].(bool) + return ret0 +} + +// UseRudderStorage indicates an expected call of UseRudderStorage. +func (mr *MockUploaderMockRecorder) UseRudderStorage() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UseRudderStorage", reflect.TypeOf((*MockUploader)(nil).UseRudderStorage)) +} diff --git a/warehouse/internal/service/loadfiles/downloader/downloader_test.go b/warehouse/internal/service/loadfiles/downloader/downloader_test.go index 1baa20adff..089205493d 100644 --- a/warehouse/internal/service/loadfiles/downloader/downloader_test.go +++ b/warehouse/internal/service/loadfiles/downloader/downloader_test.go @@ -6,12 +6,8 @@ import ( "fmt" "os" "testing" - "time" - - "github.com/rudderlabs/rudder-server/warehouse/internal/model" - - "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" + "github.com/golang/mock/gomock" "github.com/google/uuid" "github.com/ory/dockertest/v3" "github.com/stretchr/testify/require" @@ -20,42 +16,12 @@ import ( backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/testhelper/destination" "github.com/rudderlabs/rudder-server/utils/misc" + mockuploader "github.com/rudderlabs/rudder-server/warehouse/internal/mocks/utils" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" + "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) -type mockUploader struct { - loadFiles []warehouseutils.LoadFile -} - -func (*mockUploader) GetSchemaInWarehouse() model.Schema { return model.Schema{} } -func (*mockUploader) GetLocalSchema(context.Context) (model.Schema, error) { - return model.Schema{}, nil -} -func (*mockUploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil } -func (*mockUploader) ShouldOnDedupUseNewRecord() bool { return false } -func (*mockUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} } -func (*mockUploader) GetLoadFileType() string { return "JSON" } -func (*mockUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} } -func (*mockUploader) GetSampleLoadFileLocation(context.Context, string) (string, error) { - return "", nil -} -func (*mockUploader) UseRudderStorage() bool { return false } -func (*mockUploader) GetTableSchemaInWarehouse(string) model.TableSchema { - return model.TableSchema{} -} - -func (*mockUploader) GetSingleLoadFile(context.Context, string) (warehouseutils.LoadFile, error) { - return warehouseutils.LoadFile{}, nil -} - -func (*mockUploader) GetTableSchemaInUpload(string) model.TableSchema { - return model.TableSchema{} -} - -func (m *mockUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile { - return m.loadFiles -} - func TestDownloader(t *testing.T) { t.Parallel() @@ -186,9 +152,7 @@ func TestDownloader(t *testing.T) { WorkspaceID: workspaceID, }, }, - &mockUploader{ - loadFiles: loadFiles, - }, + newMockUploader(t, loadFiles), workers, ) @@ -207,3 +171,11 @@ func TestDownloader(t *testing.T) { }) } } + +func newMockUploader(t testing.TB, loadFiles []warehouseutils.LoadFile) *mockuploader.MockUploader { + ctrl := gomock.NewController(t) + u := mockuploader.NewMockUploader(ctrl) + u.EXPECT().GetLoadFilesMetadata(gomock.Any(), gomock.Any()).AnyTimes().Return(loadFiles) + u.EXPECT().UseRudderStorage().Return(false).AnyTimes() + return u +} diff --git a/warehouse/jobs/jobs.go b/warehouse/jobs/jobs.go index 84f7b8b1a6..1ac5be9dfb 100644 --- a/warehouse/jobs/jobs.go +++ b/warehouse/jobs/jobs.go @@ -62,3 +62,5 @@ func (*WhAsyncJob) GetLoadFileType() string { func (*WhAsyncJob) GetFirstLastEvent() (time.Time, time.Time) { return time.Now(), time.Now() } + +func (*WhAsyncJob) CanAppend() bool { return false } diff --git a/warehouse/upload.go b/warehouse/upload.go index ee9f338fe5..f51057f403 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -171,6 +171,10 @@ const ( var ( alwaysMarkExported = []string{warehouseutils.DiscardsTable} warehousesToAlwaysRegenerateAllLoadFilesOnResume = []string{warehouseutils.SNOWFLAKE, warehouseutils.BQ} + mergeSourceCategoryMap = map[string]struct{}{ + "cloud": {}, + "singer-protocol": {}, + } ) func init() { @@ -770,6 +774,23 @@ func (job *UploadJob) exportRegularTables(specialTables []string, loadFilesTable return } +// CanAppend returns true if: +// * the source is not an ETL source +// * the source is not a replay source +// * the source category is not in "mergeSourceCategoryMap" +func (job *UploadJob) CanAppend() bool { + if isSourceETL := job.upload.SourceJobRunID != ""; isSourceETL { + return false + } + if job.warehouse.Source.IsReplaySource() { + return false + } + if _, isMergeCategory := mergeSourceCategoryMap[job.warehouse.Source.SourceDefinition.Category]; isMergeCategory { + return false + } + return true +} + func (job *UploadJob) TablesToSkip() (map[string]model.PendingTableUpload, map[string]model.PendingTableUpload, error) { job.pendingTableUploadsOnce.Do(func() { job.pendingTableUploads, job.pendingTableUploadsError = job.pendingTableUploadsRepo.PendingTableUploads( diff --git a/warehouse/upload_test.go b/warehouse/upload_test.go index 49b8fdf108..f78e152a8c 100644 --- a/warehouse/upload_test.go +++ b/warehouse/upload_test.go @@ -631,3 +631,75 @@ func TestUploadJob_DurationBeforeNextAttempt(t *testing.T) { }) } } + +func TestUploadJob_CanAppend(t *testing.T) { + testCases := []struct { + name string + sourceCategory string + sourceJobRunID string // if not empty then it's an ETL source + originalID string // if not empty then it's a replay + expected bool + }{ + { + name: "not a merge category", + sourceCategory: "event-stream", + sourceJobRunID: "", + originalID: "", + expected: true, + }, + { + name: "cloud merge category", + sourceCategory: "cloud", + sourceJobRunID: "", + originalID: "", + expected: false, + }, + { + name: "singer-protocol merge category", + sourceCategory: "singer-protocol", + sourceJobRunID: "", + originalID: "", + expected: false, + }, + { + name: "etl source", + sourceCategory: "event-stream", + sourceJobRunID: "some-job-run-id", + originalID: "", + expected: false, + }, + { + name: "replay", + sourceCategory: "event-stream", + sourceJobRunID: "", + originalID: "some-original-id", + expected: false, + }, + { + name: "replay of etl source in merge category map", + sourceCategory: "cloud", + sourceJobRunID: "some-job-run-id", + originalID: "some-original-id", + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + uj := UploadJob{ + upload: model.Upload{ + SourceJobRunID: tc.sourceJobRunID, + }, + warehouse: model.Warehouse{ + Source: backendconfig.SourceT{ + OriginalID: tc.originalID, + SourceDefinition: backendconfig.SourceDefinitionT{ + Category: tc.sourceCategory, + }, + }, + }, + } + require.Equal(t, uj.CanAppend(), tc.expected) + }) + } +} diff --git a/warehouse/utils/utils.go b/warehouse/utils/utils.go index dbc88efbaf..afe7be90f7 100644 --- a/warehouse/utils/utils.go +++ b/warehouse/utils/utils.go @@ -232,6 +232,7 @@ type KeyValue struct { Value interface{} } +//go:generate mockgen -destination=../internal/mocks/utils/mock_uploader.go -package mock_uploader github.com/rudderlabs/rudder-server/warehouse/utils Uploader type Uploader interface { GetSchemaInWarehouse() model.Schema GetLocalSchema(ctx context.Context) (model.Schema, error) @@ -246,6 +247,7 @@ type Uploader interface { GetLoadFileGenStartTIme() time.Time GetLoadFileType() string GetFirstLastEvent() (time.Time, time.Time) + CanAppend() bool } type GetLoadFilesOptions struct { diff --git a/warehouse/validations/validate.go b/warehouse/validations/validate.go index 12850a005d..70808c8239 100644 --- a/warehouse/validations/validate.go +++ b/warehouse/validations/validate.go @@ -11,13 +11,11 @@ import ( "time" "github.com/rudderlabs/rudder-go-kit/config" - "github.com/rudderlabs/rudder-go-kit/stats" - - "github.com/rudderlabs/rudder-server/warehouse/encoding" - "github.com/rudderlabs/rudder-go-kit/filemanager" + "github.com/rudderlabs/rudder-go-kit/stats" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/utils/misc" + "github.com/rudderlabs/rudder-server/warehouse/encoding" "github.com/rudderlabs/rudder-server/warehouse/integrations/manager" "github.com/rudderlabs/rudder-server/warehouse/internal/model" "github.com/rudderlabs/rudder-server/warehouse/logfield" @@ -57,52 +55,12 @@ type loadTable struct { table string } -type dummyUploader struct { - dest *backendconfig.DestinationT -} - type DestinationValidator interface { Validate(ctx context.Context, dest *backendconfig.DestinationT) *model.DestinationValidationResponse } type destinationValidationImpl struct{} -func (*dummyUploader) GetSchemaInWarehouse() model.Schema { return model.Schema{} } -func (*dummyUploader) GetLocalSchema(context.Context) (model.Schema, error) { - return model.Schema{}, nil -} -func (*dummyUploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil } -func (*dummyUploader) ShouldOnDedupUseNewRecord() bool { return false } -func (*dummyUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} } -func (*dummyUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} } -func (*dummyUploader) GetSampleLoadFileLocation(context.Context, string) (string, error) { - return "", nil -} - -func (*dummyUploader) GetTableSchemaInWarehouse(string) model.TableSchema { - return model.TableSchema{} -} - -func (*dummyUploader) GetTableSchemaInUpload(string) model.TableSchema { - return model.TableSchema{} -} - -func (*dummyUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile { - return []warehouseutils.LoadFile{} -} - -func (*dummyUploader) GetSingleLoadFile(context.Context, string) (warehouseutils.LoadFile, error) { - return warehouseutils.LoadFile{}, nil -} - -func (m *dummyUploader) GetLoadFileType() string { - return warehouseutils.GetLoadFileType(m.dest.DestinationDefinition.Name) -} - -func (m *dummyUploader) UseRudderStorage() bool { - return misc.IsConfiguredToUseRudderObjectStorage(m.dest.Config) -} - func NewDestinationValidator() DestinationValidator { return &destinationValidationImpl{} } @@ -589,3 +547,36 @@ func getTable(dest *backendconfig.DestinationT) string { func tableWithUUID() string { return table + "_" + warehouseutils.RandHex() } + +type dummyUploader struct { + dest *backendconfig.DestinationT +} + +func (*dummyUploader) GetSchemaInWarehouse() model.Schema { return nil } +func (*dummyUploader) GetLocalSchema(context.Context) (model.Schema, error) { return nil, nil } +func (*dummyUploader) UpdateLocalSchema(context.Context, model.Schema) error { return nil } +func (*dummyUploader) ShouldOnDedupUseNewRecord() bool { return false } +func (*dummyUploader) GetFirstLastEvent() (time.Time, time.Time) { return time.Time{}, time.Time{} } +func (*dummyUploader) GetLoadFileGenStartTIme() time.Time { return time.Time{} } +func (*dummyUploader) GetTableSchemaInWarehouse(string) model.TableSchema { return nil } +func (*dummyUploader) GetTableSchemaInUpload(string) model.TableSchema { return nil } +func (*dummyUploader) CanAppend() bool { return false } +func (*dummyUploader) GetSampleLoadFileLocation(context.Context, string) (string, error) { + return "", nil +} + +func (*dummyUploader) GetLoadFilesMetadata(context.Context, warehouseutils.GetLoadFilesOptions) []warehouseutils.LoadFile { + return nil +} + +func (*dummyUploader) GetSingleLoadFile(context.Context, string) (warehouseutils.LoadFile, error) { + return warehouseutils.LoadFile{}, nil +} + +func (m *dummyUploader) GetLoadFileType() string { + return warehouseutils.GetLoadFileType(m.dest.DestinationDefinition.Name) +} + +func (m *dummyUploader) UseRudderStorage() bool { + return misc.IsConfiguredToUseRudderObjectStorage(m.dest.Config) +}