From ce2ed332a45b3deafc7284e2629fb21a3f756aed Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Mon, 28 Aug 2023 10:42:19 +0200 Subject: [PATCH] chore: warehouse tests race detection (#3773) * chore: warehouse tests race detection * temp * temp * temp * chore: fix defer * chore: removed unused lock * fix: unlocking of RWMutex * fix: unlocking of RWMutex * chore: fixing err data race * chore: removing parallel * chore: go-kit v0.15.6 * chore: removing uploadLock * chore: removing uploadSchema mutex * chore: removing TODO * chore: go-kit v0.15.7 --- Makefile | 2 +- .../integrations/snowflake/snowflake_test.go | 15 +- warehouse/internal/model/schema.go | 11 + warehouse/logfield/logfield.go | 1 + warehouse/router_identities.go | 2 +- warehouse/{ => schema}/schema.go | 432 ++++++++++-------- warehouse/{ => schema}/schema_test.go | 389 +++------------- warehouse/slave_worker.go | 52 ++- warehouse/slave_worker_test.go | 259 +++++++++++ warehouse/upload.go | 160 +++---- warehouse/upload_test.go | 18 +- 11 files changed, 707 insertions(+), 634 deletions(-) rename warehouse/{ => schema}/schema.go (55%) rename warehouse/{ => schema}/schema_test.go (80%) diff --git a/Makefile b/Makefile index ce38b1d6be..1f467f2624 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ endif test-warehouse-integration: $(eval TEST_PATTERN = 'TestIntegration') $(eval TEST_CMD = SLOW=1 go test) - $(eval TEST_OPTIONS = -v -p 8 -timeout 30m -count 1 -run $(TEST_PATTERN) -coverprofile=profile.out -covermode=atomic -coverpkg=./...) + $(eval TEST_OPTIONS = -v -p 8 -timeout 30m -count 1 -race -run $(TEST_PATTERN) -coverprofile=profile.out -covermode=atomic -coverpkg=./...) $(TEST_CMD) $(TEST_OPTIONS) $(package) && touch $(TESTFILE) || true test-warehouse: test-warehouse-integration test-teardown diff --git a/warehouse/integrations/snowflake/snowflake_test.go b/warehouse/integrations/snowflake/snowflake_test.go index 5e99d36317..f61c38d3f3 100644 --- a/warehouse/integrations/snowflake/snowflake_test.go +++ b/warehouse/integrations/snowflake/snowflake_test.go @@ -322,13 +322,16 @@ func TestIntegration(t *testing.T) { cancel := bootstrap(t, tc.appendMode) defer cancel() + cred := tc.cred + cred.Database = tc.database + urlConfig := sfdb.Config{ - Account: tc.cred.Account, - User: tc.cred.User, - Role: tc.cred.Role, - Password: tc.cred.Password, - Database: tc.database, - Warehouse: tc.cred.Warehouse, + Account: cred.Account, + User: cred.User, + Role: cred.Role, + Password: cred.Password, + Database: cred.Database, + Warehouse: cred.Warehouse, } dsn, err := sfdb.DSN(&urlConfig) diff --git a/warehouse/internal/model/schema.go b/warehouse/internal/model/schema.go index fff0ec0749..f0346c1e9e 100644 --- a/warehouse/internal/model/schema.go +++ b/warehouse/internal/model/schema.go @@ -30,3 +30,14 @@ type WHSchema struct { CreatedAt time.Time UpdatedAt time.Time } + +func (s Schema) Clone() Schema { + cp := make(Schema, len(s)) + for k, v := range s { + cp[k] = make(TableSchema, len(v)) + for kk, vv := range v { + cp[k][kk] = vv + } + } + return cp +} diff --git a/warehouse/logfield/logfield.go b/warehouse/logfield/logfield.go index 66e140ff87..c4f904f9b1 100644 --- a/warehouse/logfield/logfield.go +++ b/warehouse/logfield/logfield.go @@ -29,5 +29,6 @@ const ( QueryExecutionTime = "queryExecutionTime" StagingTableName = "stagingTableName" TotalRows = "totalRows" + WarehouseID = "warehouseID" SampleDuplicateMessages = "sampleDuplicateMessages" ) diff --git a/warehouse/router_identities.go b/warehouse/router_identities.go index 
2e3ac06e86..781f5c8205 100644 --- a/warehouse/router_identities.go +++ b/warehouse/router_identities.go @@ -440,7 +440,7 @@ func (r *router) populateHistoricIdentities(ctx context.Context, warehouse model } defer whManager.Cleanup(ctx) - err = job.schemaHandle.fetchSchemaFromWarehouse(ctx, whManager) + _, _, err = job.schemaHandle.FetchSchemaFromWarehouse(ctx, whManager) if err != nil { r.logger.Errorf(`[WH]: Failed fetching schema from warehouse: %v`, err) job.setUploadError(err, model.Aborted) diff --git a/warehouse/schema.go b/warehouse/schema/schema.go similarity index 55% rename from warehouse/schema.go rename to warehouse/schema/schema.go index 384de8918f..d85638e7b1 100644 --- a/warehouse/schema.go +++ b/warehouse/schema/schema.go @@ -1,36 +1,30 @@ -package warehouse +package schema import ( "context" - "errors" "fmt" "reflect" "regexp" - - "golang.org/x/exp/slices" + "sync" "github.com/samber/lo" + "golang.org/x/exp/slices" "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" - "github.com/rudderlabs/rudder-server/warehouse/internal/repo" - "github.com/rudderlabs/rudder-server/warehouse/internal/model" + "github.com/rudderlabs/rudder-server/warehouse/internal/repo" "github.com/rudderlabs/rudder-server/warehouse/logfield" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" -) - -var ( - errIncompatibleSchemaConversion = errors.New("incompatible schema conversion") - errSchemaConversionNotSupported = errors.New("schema conversion not supported") + whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) // deprecatedColumnsRegex // This regex is used to identify deprecated columns in the warehouse // Example: abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840 -var deprecatedColumnsRegex = regexp.MustCompile(`.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`) +var deprecatedColumnsRegex = regexp.MustCompile( + `.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`, +) type schemaRepo interface { GetForNamespace(ctx context.Context, sourceID, destID, namespace string) (model.WHSchema, error) @@ -46,14 +40,17 @@ type fetchSchemaRepo interface { } type Schema struct { - warehouse model.Warehouse - localSchema model.Schema - schemaInWarehouse model.Schema - unrecognizedSchemaInWarehouse model.Schema - uploadSchema model.Schema - schemaRepo schemaRepo - stagingFileRepo stagingFileRepo - log logger.Logger + warehouse model.Warehouse + localSchema model.Schema + localSchemaMu sync.RWMutex + schemaInWarehouse model.Schema + schemaInWarehouseMu sync.RWMutex + unrecognizedSchemaInWarehouse model.Schema + unrecognizedSchemaInWarehouseMu sync.RWMutex + schemaRepo schemaRepo + stagingFileRepo stagingFileRepo + log logger.Logger + stagingFilesSchemaPaginationSize int skipDeepEqualSchemas bool enableIDResolution bool @@ -63,19 +60,117 @@ func NewSchema( db *sqlquerywrapper.DB, warehouse model.Warehouse, conf *config.Config, + logger logger.Logger, ) *Schema { return &Schema{ warehouse: warehouse, schemaRepo: repo.NewWHSchemas(db), stagingFileRepo: repo.NewStagingFiles(db), - log: logger.NewLogger().Child("warehouse").Child("schema"), + log: logger, stagingFilesSchemaPaginationSize: conf.GetInt("Warehouse.stagingFilesSchemaPaginationSize", 100), skipDeepEqualSchemas: conf.GetBool("Warehouse.skipDeepEqualSchemas", false), enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false), } } 
-func (sh *Schema) updateLocalSchema(ctx context.Context, uploadId int64, updatedSchema model.Schema) error { +func (sh *Schema) ConsolidateLocalSchemaWithStagingFiles( + ctx context.Context, + stagingFiles []*model.StagingFile, +) (model.Schema, error) { + consolidatedSchema, err := sh.consolidateLocalSchemaWithStagingFiles(ctx, stagingFiles) + if err != nil { + return nil, fmt.Errorf("consolidating staging files schema: %w", err) + } + return consolidatedSchema, nil +} + +// TableSchemaDiff returns the diff between the warehouse schema and the upload table schema +func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchema) whutils.TableSchemaDiff { + diff := whutils.TableSchemaDiff{ + ColumnMap: make(model.TableSchema), + UpdatedSchema: make(model.TableSchema), + AlteredColumnMap: make(model.TableSchema), + } + + sh.schemaInWarehouseMu.RLock() + defer sh.schemaInWarehouseMu.RUnlock() + currentTableSchema, ok := sh.schemaInWarehouse[tableName] + + if !ok { + if len(tableSchema) == 0 { + return diff + } + diff.Exists = true + diff.TableToBeCreated = true + diff.ColumnMap = tableSchema + diff.UpdatedSchema = tableSchema + return diff + } + + for columnName, columnType := range currentTableSchema { + diff.UpdatedSchema[columnName] = columnType + } + + diff.ColumnMap = make(model.TableSchema) + for columnName, columnType := range tableSchema { + if _, ok := currentTableSchema[columnName]; !ok { + diff.ColumnMap[columnName] = columnType + diff.UpdatedSchema[columnName] = columnType + diff.Exists = true + } else if model.SchemaType(columnType) == model.TextDataType && + model.SchemaType(currentTableSchema[columnName]) == model.StringDataType { + diff.AlteredColumnMap[columnName] = columnType + diff.UpdatedSchema[columnName] = columnType + diff.Exists = true + } + } + return diff +} + +func (sh *Schema) SyncRemoteSchema(ctx context.Context, repo fetchSchemaRepo, uploadID int64) ( + model.Schema, + bool, + error, +) { + localSchema, err := sh.GetLocalSchema(ctx) + if err != nil { + return nil, false, fmt.Errorf("fetching schema from local: %w", err) + } + + warehouseSchema, _, err := sh.FetchSchemaFromWarehouse(ctx, repo) + if err != nil { + return nil, false, fmt.Errorf("fetching schema from warehouse: %w", err) + } + + schemaChanged := sh.hasSchemaChanged(localSchema, warehouseSchema) + if schemaChanged { + if err := sh.UpdateLocalSchema(ctx, uploadID, warehouseSchema); err != nil { + return nil, false, fmt.Errorf("updating local schema: %w", err) + } + } + + return warehouseSchema, schemaChanged, nil +} + +func (sh *Schema) IsWarehouseSchemaEmpty() bool { + sh.schemaInWarehouseMu.RLock() + defer sh.schemaInWarehouseMu.RUnlock() + + return len(sh.schemaInWarehouse) == 0 +} + +func (sh *Schema) IsColumnInUnrecognizedSchema(t, c string) bool { + sh.unrecognizedSchemaInWarehouseMu.RLock() + defer sh.unrecognizedSchemaInWarehouseMu.RUnlock() + + s, ok := sh.unrecognizedSchemaInWarehouse[t] + if ok { + _, ok = s[c] + } + return ok +} + +func (sh *Schema) UpdateLocalSchema(ctx context.Context, uploadId int64, updatedSchema model.Schema) error { _, err := sh.schemaRepo.Insert(ctx, &model.WHSchema{ UploadID: uploadId, SourceID: sh.warehouse.Source.ID, @@ -88,24 +183,44 @@ func (sh *Schema) updateLocalSchema(ctx context.Context, uploadId int64, updated return fmt.Errorf("updating local schema: %w", err) } - sh.localSchema = updatedSchema + sh.localSchemaMu.Lock() + sh.localSchema = updatedSchema.Clone() + sh.localSchemaMu.Unlock() return nil } -// fetchSchemaFromLocal fetches 
schema from local
-func (sh *Schema) fetchSchemaFromLocal(ctx context.Context) error {
-	localSchema, err := sh.getLocalSchema(ctx)
-	if err != nil {
-		return fmt.Errorf("fetching schema from local: %w", err)
+func (sh *Schema) SetWarehouseTableSchema(t string, ts model.TableSchema) {
+	sh.schemaInWarehouseMu.Lock()
+	if sh.schemaInWarehouse == nil {
+		sh.schemaInWarehouse = make(model.Schema)
 	}
+	sh.schemaInWarehouse[t] = ts
+	sh.schemaInWarehouseMu.Unlock()
+}
 
-	sh.localSchema = localSchema
+func (sh *Schema) GetWarehouseTableSchema(t string) model.TableSchema {
+	sh.schemaInWarehouseMu.Lock()
+	ts := sh.schemaInWarehouse[t]
+	sh.schemaInWarehouseMu.Unlock()
+	return ts
+}
 
-	return nil
+func (sh *Schema) UpdateLocalSchemaWithWarehouseSchema(ctx context.Context, uploadId int64) error {
+	sh.schemaInWarehouseMu.RLock()
+	updatedSchema := sh.schemaInWarehouse.Clone()
+	sh.schemaInWarehouseMu.RUnlock()
+
+	return sh.UpdateLocalSchema(ctx, uploadId, updatedSchema)
+}
+
+func (sh *Schema) CurrentColumnsCount(t string) int {
+	sh.schemaInWarehouseMu.RLock()
+	defer sh.schemaInWarehouseMu.RUnlock()
+	return len(sh.schemaInWarehouse[t])
 }
 
-func (sh *Schema) getLocalSchema(ctx context.Context) (model.Schema, error) {
+func (sh *Schema) GetLocalSchema(ctx context.Context) (model.Schema, error) {
 	whSchema, err := sh.schemaRepo.GetForNamespace(
 		ctx,
 		sh.warehouse.Source.ID,
@@ -116,29 +231,46 @@ func (sh *Schema) getLocalSchema(ctx context.Context) (model.Schema, error) {
 		return nil, fmt.Errorf("getting schema for namespace: %w", err)
 	}
 	if whSchema.Schema == nil {
-		return model.Schema{}, nil
+		whSchema.Schema = model.Schema{}
 	}
 	return whSchema.Schema, nil
 }
 
-// fetchSchemaFromWarehouse fetches schema from warehouse
-func (sh *Schema) fetchSchemaFromWarehouse(ctx context.Context, repo fetchSchemaRepo) error {
+func (sh *Schema) GetWarehouseSchemaCopy(_ context.Context) model.Schema {
+	sh.schemaInWarehouseMu.RLock()
+	defer sh.schemaInWarehouseMu.RUnlock()
+	return sh.schemaInWarehouse.Clone()
+}
+
+// FetchSchemaFromWarehouse fetches schema from warehouse
+func (sh *Schema) FetchSchemaFromWarehouse(ctx context.Context, repo fetchSchemaRepo) (
+	model.Schema,
+	model.Schema,
+	error,
+) {
 	warehouseSchema, unrecognizedWarehouseSchema, err := repo.FetchSchema(ctx)
 	if err != nil {
-		return fmt.Errorf("fetching schema from warehouse: %w", err)
+		return nil, nil, fmt.Errorf("fetching schema from warehouse: %w", err)
 	}
 
-	sh.skipDeprecatedColumns(warehouseSchema)
-	sh.skipDeprecatedColumns(unrecognizedWarehouseSchema)
+	sh.removeDeprecatedColumns(warehouseSchema)
+	sh.removeDeprecatedColumns(unrecognizedWarehouseSchema)
 
-	sh.schemaInWarehouse = warehouseSchema
-	sh.unrecognizedSchemaInWarehouse = unrecognizedWarehouseSchema
+	sh.schemaInWarehouseMu.Lock()
+	sh.schemaInWarehouse = warehouseSchema.Clone()
+	sh.schemaInWarehouseMu.Unlock()
+
+	// Readers such as IsColumnInUnrecognizedSchema take
+	// unrecognizedSchemaInWarehouseMu, so this write must happen under the
+	// same mutex to avoid a data race.
+	sh.unrecognizedSchemaInWarehouseMu.Lock()
+	sh.unrecognizedSchemaInWarehouse = unrecognizedWarehouseSchema.Clone()
+	sh.unrecognizedSchemaInWarehouseMu.Unlock()
 
-	return nil
+	return warehouseSchema, unrecognizedWarehouseSchema, nil
 }
 
-// skipDeprecatedColumns skips deprecated columns from the schema
-func (sh *Schema) skipDeprecatedColumns(schema model.Schema) {
+// removeDeprecatedColumns deletes deprecated columns from the schema map
+func (sh *Schema) removeDeprecatedColumns(schema model.Schema) {
 	for tableName, columnMap := range schema {
 		for columnName := range columnMap {
 			if deprecatedColumnsRegex.MatchString(columnName) {
@@ -152,24 +279,22 @@ func (sh *Schema) getLocalSchema(ctx context.Context) (model.Schema, error) {
 				logfield.ColumnName, columnName,
 				)
 				delete(schema[tableName], columnName)
-				continue
 			}
 		}
 	}
 }
 
-func (sh *Schema) prepareUploadSchema(ctx context.Context, stagingFiles []*model.StagingFile) error {
-	consolidatedSchema, err := sh.consolidateStagingFilesSchemaUsingWarehouseSchema(ctx, stagingFiles)
-	if err != nil {
-		return fmt.Errorf("consolidating staging files schema: %w", err)
-	}
-
-	sh.uploadSchema = consolidatedSchema
-	return nil
-}
+// consolidateLocalSchemaWithStagingFiles consolidates the staging files schema with the local schema
+func (sh *Schema) consolidateLocalSchemaWithStagingFiles(
+	ctx context.Context,
+	stagingFiles []*model.StagingFile,
+) (
+	model.Schema,
+	error,
+) {
+	sh.localSchemaMu.RLock()
+	defer sh.localSchemaMu.RUnlock()
 
-// consolidateStagingFilesSchemaUsingWarehouseSchema consolidates staging files schema with warehouse schema
-func (sh *Schema) consolidateStagingFilesSchemaUsingWarehouseSchema(ctx context.Context, stagingFiles []*model.StagingFile) (model.Schema, error) {
 	consolidatedSchema := model.Schema{}
 	batches := lo.Chunk(stagingFiles, sh.stagingFilesSchemaPaginationSize)
 	for _, batch := range batches {
@@ -191,6 +316,39 @@ func (sh *Schema) consolidateStagingFilesSchemaUsingWarehouseSchema(ctx context.
 	return consolidatedSchema, nil
 }
 
+func (sh *Schema) isIDResolutionEnabled() bool {
+	return sh.enableIDResolution && slices.Contains(whutils.IdentityEnabledWarehouses, sh.warehouse.Type)
+}
+
+// hasSchemaChanged compares the localSchema with the schemaInWarehouse
+func (sh *Schema) hasSchemaChanged(localSchema, schemaInWarehouse model.Schema) bool {
+	if !sh.skipDeepEqualSchemas {
+		eq := reflect.DeepEqual(localSchema, schemaInWarehouse)
+		return !eq
+	}
+	// Iterating through all tableName in the localSchema
+	for tableName := range localSchema {
+		localColumns := localSchema[tableName]
+		warehouseColumns, whColumnsExist := schemaInWarehouse[tableName]
+
+		// If warehouse does not contain the specified table return true.
+		if !whColumnsExist {
+			return true
+		}
+		for columnName := range localColumns {
+			localColumn := localColumns[columnName]
+			warehouseColumn := warehouseColumns[columnName]
+
+			// If warehouse does not contain the specified column return true.
+ // If warehouse column does not match with the local one return true + if localColumn != warehouseColumn { + return true + } + } + } + return false +} + // consolidateStagingSchemas merges multiple schemas into one // Prefer the type of the first schema, If the type is text, prefer text func consolidateStagingSchemas(consolidatedSchema model.Schema, schemas []model.Schema) model.Schema { @@ -246,12 +404,14 @@ func consolidateWarehouseSchema(consolidatedSchema, warehouseSchema model.Schema // overrideUsersWithIdentifiesSchema overrides the users table with the identifies table // users(id) <-> identifies(user_id) // Removes the user_id column from the users table -func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehouseType string, warehouseSchema model.Schema) model.Schema { +func overrideUsersWithIdentifiesSchema( + consolidatedSchema model.Schema, warehouseType string, warehouseSchema model.Schema, +) model.Schema { var ( - usersTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.UsersTable) - identifiesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentifiesTable) - userIDColumn = warehouseutils.ToProviderCase(warehouseType, "user_id") - IDColumn = warehouseutils.ToProviderCase(warehouseType, "id") + usersTable = whutils.ToProviderCase(warehouseType, whutils.UsersTable) + identifiesTable = whutils.ToProviderCase(warehouseType, whutils.IdentifiesTable) + userIDColumn = whutils.ToProviderCase(warehouseType, "user_id") + IDColumn = whutils.ToProviderCase(warehouseType, "id") ) if _, ok := consolidatedSchema[usersTable]; !ok { return consolidatedSchema @@ -279,156 +439,42 @@ func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehous func enhanceDiscardsSchema(consolidatedSchema model.Schema, warehouseType string) model.Schema { discards := model.TableSchema{} - for colName, colType := range warehouseutils.DiscardsSchema { - discards[warehouseutils.ToProviderCase(warehouseType, colName)] = colType + for colName, colType := range whutils.DiscardsSchema { + discards[whutils.ToProviderCase(warehouseType, colName)] = colType } - if warehouseType == warehouseutils.BQ { - discards[warehouseutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime" + if warehouseType == whutils.BQ { + discards[whutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime" } - consolidatedSchema[warehouseutils.ToProviderCase(warehouseType, warehouseutils.DiscardsTable)] = discards + consolidatedSchema[whutils.ToProviderCase(warehouseType, whutils.DiscardsTable)] = discards return consolidatedSchema } // enhanceSchemaWithIDResolution adds the merge rules and mappings table to the schema if IDResolution is enabled -func enhanceSchemaWithIDResolution(consolidatedSchema model.Schema, isIDResolutionEnabled bool, warehouseType string) model.Schema { +func enhanceSchemaWithIDResolution( + consolidatedSchema model.Schema, isIDResolutionEnabled bool, warehouseType string, +) model.Schema { if !isIDResolutionEnabled { return consolidatedSchema } var ( - mergeRulesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMergeRulesTable) - mappingsTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMappingsTable) + mergeRulesTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMergeRulesTable) + mappingsTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMappingsTable) ) if _, ok := consolidatedSchema[mergeRulesTable]; ok { consolidatedSchema[mergeRulesTable] = 
model.TableSchema{ - warehouseutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string", } consolidatedSchema[mappingsTable] = model.TableSchema{ - warehouseutils.ToProviderCase(warehouseType, "merge_property_type"): "string", - warehouseutils.ToProviderCase(warehouseType, "merge_property_value"): "string", - warehouseutils.ToProviderCase(warehouseType, "rudder_id"): "string", - warehouseutils.ToProviderCase(warehouseType, "updated_at"): "datetime", + whutils.ToProviderCase(warehouseType, "merge_property_type"): "string", + whutils.ToProviderCase(warehouseType, "merge_property_value"): "string", + whutils.ToProviderCase(warehouseType, "rudder_id"): "string", + whutils.ToProviderCase(warehouseType, "updated_at"): "datetime", } } return consolidatedSchema } - -func (sh *Schema) isIDResolutionEnabled() bool { - return sh.enableIDResolution && slices.Contains(warehouseutils.IdentityEnabledWarehouses, sh.warehouse.Type) -} - -// hasSchemaChanged compares the localSchema with the schemaInWarehouse -func (sh *Schema) hasSchemaChanged() bool { - if !sh.skipDeepEqualSchemas { - eq := reflect.DeepEqual(sh.localSchema, sh.schemaInWarehouse) - return !eq - } - // Iterating through all tableName in the localSchema - for tableName := range sh.localSchema { - localColumns := sh.localSchema[tableName] - warehouseColumns, whColumnsExist := sh.schemaInWarehouse[tableName] - - // If warehouse does not contain the specified table return true. - if !whColumnsExist { - return true - } - for columnName := range localColumns { - localColumn := localColumns[columnName] - warehouseColumn := warehouseColumns[columnName] - - // If warehouse does not contain the specified column return true. 
- // If warehouse column does not match with the local one return true - if localColumn != warehouseColumn { - return true - } - } - } - return false -} - -// generateTableSchemaDiff returns the diff between the warehouse schema and the upload schema -func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.TableSchemaDiff { - diff := warehouseutils.TableSchemaDiff{ - ColumnMap: make(model.TableSchema), - UpdatedSchema: make(model.TableSchema), - AlteredColumnMap: make(model.TableSchema), - } - - currentTableSchema, ok := sh.schemaInWarehouse[tableName] - if !ok { - if _, ok := sh.uploadSchema[tableName]; !ok { - return diff - } - diff.Exists = true - diff.TableToBeCreated = true - diff.ColumnMap = sh.uploadSchema[tableName] - diff.UpdatedSchema = sh.uploadSchema[tableName] - return diff - } - - for columnName, columnType := range currentTableSchema { - diff.UpdatedSchema[columnName] = columnType - } - - diff.ColumnMap = make(model.TableSchema) - for columnName, columnType := range sh.uploadSchema[tableName] { - if _, ok := currentTableSchema[columnName]; !ok { - diff.ColumnMap[columnName] = columnType - diff.UpdatedSchema[columnName] = columnType - diff.Exists = true - } else if columnType == "text" && currentTableSchema[columnName] == "string" { - diff.AlteredColumnMap[columnName] = columnType - diff.UpdatedSchema[columnName] = columnType - diff.Exists = true - } - } - return diff -} - -// handleSchemaChange checks if the existing column type is compatible with the new column type -func handleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error) { - var ( - newColumnVal any - err error - ) - - if existingDataType == model.StringDataType || existingDataType == model.TextDataType { - // only stringify if the previous type is non-string/text/json - if currentDataType != model.StringDataType && currentDataType != model.TextDataType && currentDataType != model.JSONDataType { - newColumnVal = fmt.Sprintf("%v", value) - } else { - newColumnVal = value - } - } else if (currentDataType == model.IntDataType || currentDataType == model.BigIntDataType) && existingDataType == model.FloatDataType { - intVal, ok := value.(int) - if !ok { - err = errIncompatibleSchemaConversion - } else { - newColumnVal = float64(intVal) - } - } else if currentDataType == model.FloatDataType && (existingDataType == model.IntDataType || existingDataType == model.BigIntDataType) { - floatVal, ok := value.(float64) - if !ok { - err = errIncompatibleSchemaConversion - } else { - newColumnVal = int(floatVal) - } - } else if existingDataType == model.JSONDataType { - var interfaceSliceSample []any - if currentDataType == model.IntDataType || currentDataType == model.FloatDataType || currentDataType == model.BooleanDataType { - newColumnVal = fmt.Sprintf("%v", value) - } else if reflect.TypeOf(value) == reflect.TypeOf(interfaceSliceSample) { - newColumnVal = value - } else { - newColumnVal = fmt.Sprintf(`"%v"`, value) - } - } else { - err = errSchemaConversionNotSupported - } - - return newColumnVal, err -} diff --git a/warehouse/schema_test.go b/warehouse/schema/schema_test.go similarity index 80% rename from warehouse/schema_test.go rename to warehouse/schema/schema_test.go index 91d4d4a412..62232e77ec 100644 --- a/warehouse/schema_test.go +++ b/warehouse/schema/schema_test.go @@ -1,4 +1,4 @@ -package warehouse +package schema import ( "context" @@ -6,277 +6,15 @@ import ( "fmt" "testing" - "github.com/rudderlabs/rudder-go-kit/logger" - backendconfig 
"github.com/rudderlabs/rudder-server/backend-config" - "github.com/samber/lo" - - "github.com/rudderlabs/rudder-server/warehouse/internal/model" - "github.com/stretchr/testify/require" - warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" + "github.com/rudderlabs/rudder-go-kit/logger" + backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" + whutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) -func TestHandleSchemaChange(t *testing.T) { - inputs := []struct { - name string - existingDatatype string - currentDataType string - value any - - newColumnVal any - convError error - }{ - { - name: "should send int values if existing datatype is int, new datatype is float", - existingDatatype: "int", - currentDataType: "float", - value: 1.501, - newColumnVal: 1, - }, - { - name: "should send float values if existing datatype is float, new datatype is int", - existingDatatype: "float", - currentDataType: "int", - value: 1, - newColumnVal: 1.0, - }, - { - name: "should send string values if existing datatype is string, new datatype is boolean", - existingDatatype: "string", - currentDataType: "boolean", - value: false, - newColumnVal: "false", - }, - { - name: "should send string values if existing datatype is string, new datatype is int", - existingDatatype: "string", - currentDataType: "int", - value: 1, - newColumnVal: "1", - }, - { - name: "should send string values if existing datatype is string, new datatype is float", - existingDatatype: "string", - currentDataType: "float", - value: 1.501, - newColumnVal: "1.501", - }, - { - name: "should send string values if existing datatype is string, new datatype is datetime", - existingDatatype: "string", - currentDataType: "datetime", - value: "2022-05-05T00:00:00.000Z", - newColumnVal: "2022-05-05T00:00:00.000Z", - }, - { - name: "should send string values if existing datatype is string, new datatype is string", - existingDatatype: "string", - currentDataType: "json", - value: `{"json":true}`, - newColumnVal: `{"json":true}`, - }, - { - name: "should send json string values if existing datatype is json, new datatype is boolean", - existingDatatype: "json", - currentDataType: "boolean", - value: false, - newColumnVal: "false", - }, - { - name: "should send json string values if existing datatype is jso, new datatype is int", - existingDatatype: "json", - currentDataType: "int", - value: 1, - newColumnVal: "1", - }, - { - name: "should send json string values if existing datatype is json, new datatype is float", - existingDatatype: "json", - currentDataType: "float", - value: 1.501, - newColumnVal: "1.501", - }, - { - name: "should send json string values if existing datatype is json, new datatype is json", - existingDatatype: "json", - currentDataType: "datetime", - value: "2022-05-05T00:00:00.000Z", - newColumnVal: `"2022-05-05T00:00:00.000Z"`, - }, - { - name: "should send json string values if existing datatype is json, new datatype is string", - existingDatatype: "json", - currentDataType: "string", - value: "string value", - newColumnVal: `"string value"`, - }, - { - name: "should send json string values if existing datatype is json, new datatype is array", - existingDatatype: "json", - currentDataType: "array", - value: []any{false, 1, "string value"}, - newColumnVal: []any{false, 1, "string value"}, - }, - { - name: "existing datatype is boolean, new datatype is int", - existingDatatype: "boolean", - currentDataType: "int", - value: 1, - 
convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is boolean, new datatype is float", - existingDatatype: "boolean", - currentDataType: "float", - value: 1.501, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is boolean, new datatype is string", - existingDatatype: "boolean", - currentDataType: "string", - value: "string value", - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is boolean, new datatype is datetime", - existingDatatype: "boolean", - currentDataType: "datetime", - value: "2022-05-05T00:00:00.000Z", - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is boolean, new datatype is json", - existingDatatype: "boolean", - currentDataType: "json", - value: `{"json":true}`, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is int, new datatype is boolean", - existingDatatype: "int", - currentDataType: "boolean", - value: false, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is int, new datatype is string", - existingDatatype: "int", - currentDataType: "string", - value: "string value", - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is int, new datatype is datetime", - existingDatatype: "int", - currentDataType: "datetime", - value: "2022-05-05T00:00:00.000Z", - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is int, new datatype is json", - existingDatatype: "int", - currentDataType: "json", - value: `{"json":true}`, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is int, new datatype is float", - existingDatatype: "int", - currentDataType: "float", - value: 1, - convError: errIncompatibleSchemaConversion, - }, - { - name: "existing datatype is float, new datatype is boolean", - existingDatatype: "float", - currentDataType: "boolean", - value: false, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is float, new datatype is int", - existingDatatype: "float", - currentDataType: "int", - value: 1.0, - convError: errIncompatibleSchemaConversion, - }, - { - name: "existing datatype is float, new datatype is string", - existingDatatype: "float", - currentDataType: "string", - value: "string value", - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is float, new datatype is datetime", - existingDatatype: "float", - currentDataType: "datetime", - value: "2022-05-05T00:00:00.000Z", - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is float, new datatype is json", - existingDatatype: "float", - currentDataType: "json", - value: `{"json":true}`, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is datetime, new datatype is boolean", - existingDatatype: "datetime", - currentDataType: "boolean", - value: false, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is datetime, new datatype is string", - existingDatatype: "datetime", - currentDataType: "string", - value: "string value", - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is datetime, new datatype is int", - existingDatatype: "datetime", - currentDataType: "int", - value: 1, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is datetime, new datatype is float", - existingDatatype: "datetime", - currentDataType: "float", - 
value: 1.501, - convError: errSchemaConversionNotSupported, - }, - { - name: "existing datatype is datetime, new datatype is json", - existingDatatype: "datetime", - currentDataType: "json", - value: `{"json":true}`, - convError: errSchemaConversionNotSupported, - }, - } - for _, ip := range inputs { - tc := ip - - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - newColumnVal, convError := handleSchemaChange( - model.SchemaType(tc.existingDatatype), - model.SchemaType(tc.currentDataType), - tc.value, - ) - require.Equal(t, newColumnVal, tc.newColumnVal) - require.ErrorIs(t, convError, tc.convError) - }) - } -} - type mockSchemaRepo struct { err error schemaMap map[string]model.WHSchema @@ -322,7 +60,7 @@ func TestSchema_GetUpdateLocalSchema(t *testing.T) { sourceID = "test_source" destID = "test_dest" namespace = "test_namespace" - warehouseType = warehouseutils.RS + warehouseType = whutils.RS uploadID = 1 ) @@ -401,14 +139,15 @@ func TestSchema_GetUpdateLocalSchema(t *testing.T) { ctx := context.Background() - err := sch.updateLocalSchema(ctx, uploadID, tc.mockSchema.Schema) + err := sch.UpdateLocalSchema(ctx, uploadID, tc.mockSchema.Schema) if tc.wantError == nil { require.NoError(t, err) } else { require.ErrorContains(t, err, tc.wantError.Error()) } - err = sch.fetchSchemaFromLocal(ctx) + localSchema, err := sch.GetLocalSchema(ctx) + require.Equal(t, tc.wantSchema, localSchema) require.Equal(t, tc.wantSchema, sch.localSchema) if tc.wantError == nil { require.NoError(t, err) @@ -420,8 +159,6 @@ func TestSchema_GetUpdateLocalSchema(t *testing.T) { } func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { - Init4() - testCases := []struct { name string mockSchema model.Schema @@ -546,7 +283,7 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - fechSchemaRepo := mockFetchSchemaFromWarehouse{ + fetchSchemaRepo := mockFetchSchemaFromWarehouse{ schemaInWarehouse: tc.mockSchema, unrecognizedSchemaInWarehouse: tc.mockSchema, err: tc.mockErr, @@ -571,13 +308,15 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { ctx := context.Background() - err := sh.fetchSchemaFromWarehouse(ctx, &fechSchemaRepo) + schemaInWarehouse, unrecognizedSchemaInWarehouse, err := sh.FetchSchemaFromWarehouse(ctx, &fetchSchemaRepo) if tc.wantError != nil { require.EqualError(t, err, tc.wantError.Error()) } else { require.NoError(t, err) } + require.Equal(t, tc.expectedSchema, schemaInWarehouse) require.Equal(t, tc.expectedSchema, sh.schemaInWarehouse) + require.Equal(t, tc.expectedSchema, unrecognizedSchemaInWarehouse) require.Equal(t, tc.expectedSchema, sh.unrecognizedSchemaInWarehouse) }) } @@ -585,18 +324,17 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { func TestSchema_GetUploadSchemaDiff(t *testing.T) { testCases := []struct { - name string - tableName string - currentSchema model.Schema - uploadSchema model.Schema - expected warehouseutils.TableSchemaDiff + name string + tableName string + currentSchema model.Schema + uploadTableSchema model.TableSchema + expected whutils.TableSchemaDiff }{ { - name: "empty current and upload schema", + name: "empty current and upload table schema", tableName: "test-table", currentSchema: model.Schema{}, - uploadSchema: model.Schema{}, - expected: warehouseutils.TableSchemaDiff{ + expected: whutils.TableSchemaDiff{ ColumnMap: model.TableSchema{}, UpdatedSchema: model.TableSchema{}, AlteredColumnMap: model.TableSchema{}, @@ -606,12 +344,10 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { name: 
"empty current schema", tableName: "test-table", currentSchema: model.Schema{}, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value", }, - expected: warehouseutils.TableSchemaDiff{ + expected: whutils.TableSchemaDiff{ Exists: true, TableToBeCreated: true, ColumnMap: model.TableSchema{ @@ -631,12 +367,10 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column": "test-value-1", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value-2", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value-2", }, - expected: warehouseutils.TableSchemaDiff{ + expected: whutils.TableSchemaDiff{ Exists: false, TableToBeCreated: false, ColumnMap: model.TableSchema{}, @@ -655,12 +389,10 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column-2": "test-value-2", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value-2", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value-2", }, - expected: warehouseutils.TableSchemaDiff{ + expected: whutils.TableSchemaDiff{ Exists: true, TableToBeCreated: false, ColumnMap: model.TableSchema{ @@ -683,12 +415,10 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column-2": "test-value-2", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "text", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "text", }, - expected: warehouseutils.TableSchemaDiff{ + expected: whutils.TableSchemaDiff{ Exists: true, TableToBeCreated: false, ColumnMap: model.TableSchema{}, @@ -710,9 +440,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { sch := Schema{ schemaInWarehouse: tc.currentSchema, - uploadSchema: tc.uploadSchema, } - diff := sch.generateTableSchemaDiff(tc.tableName) + diff := sch.TableSchemaDiff(tc.tableName, tc.uploadTableSchema) require.EqualValues(t, diff, tc.expected) }) } @@ -864,19 +593,17 @@ func TestSchema_HasLocalSchemaChanged(t *testing.T) { sch := &Schema{ warehouse: model.Warehouse{ - Type: warehouseutils.SNOWFLAKE, + Type: whutils.SNOWFLAKE, }, skipDeepEqualSchemas: tc.skipDeepEquals, - localSchema: tc.localSchema, - schemaInWarehouse: tc.schemaInWarehouse, } - require.Equal(t, tc.expected, sch.hasSchemaChanged()) + require.Equal(t, tc.expected, sch.hasSchemaChanged(tc.localSchema, tc.schemaInWarehouse)) }) } } func TestSchema_PrepareUploadSchema(t *testing.T) { - warehouseutils.Init() + whutils.Init() const ( sourceID = "test-source-id" @@ -906,7 +633,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }{ { name: "error fetching staging schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{}, mockErr: errors.New("test error"), wantError: errors.New("consolidating staging files schema: getting staging files schema: test error"), @@ -914,7 +641,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { { name: "discards schema for bigquery", - warehouseType: warehouseutils.BQ, + warehouseType: whutils.BQ, mockSchemas: []model.Schema{}, expectedSchema: model.Schema{ "rudder_discards": model.TableSchema{ @@ -930,7 +657,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "discards schema for all destinations", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{}, expectedSchema: model.Schema{ "rudder_discards": model.TableSchema{ @@ -945,7 
+672,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "users and identifies should have similar schema except for id and user_id", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "identifies": model.TableSchema{ @@ -1009,7 +736,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "users have extra properties as compared to identifies", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "identifies": model.TableSchema{ @@ -1077,7 +804,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "users without identifies", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "users": model.TableSchema{ @@ -1117,7 +844,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "unknown table in warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1161,7 +888,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "unknown properties in warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1205,7 +932,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "single staging schema with empty warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1239,7 +966,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "single staging schema with warehouse schema and text data type override", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1286,7 +1013,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "id resolution without merge schema", - warehouseType: warehouseutils.BQ, + warehouseType: whutils.BQ, idResolutionEnabled: true, mockSchemas: []model.Schema{ { @@ -1322,7 +1049,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "id resolution with merge schema", - warehouseType: warehouseutils.BQ, + warehouseType: whutils.BQ, idResolutionEnabled: true, mockSchemas: []model.Schema{ { @@ -1372,7 +1099,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "multiple staging schemas with empty warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table-1": model.TableSchema{ @@ -1424,7 +1151,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "multiple staging schemas with empty warehouse schema and text datatype", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table-1": model.TableSchema{ @@ -1500,7 +1227,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "multiple staging schemas with warehouse schema and text datatype", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table-1": model.TableSchema{ @@ -1588,7 +1315,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "multiple schemas with same table and empty warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1626,7 +1353,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "multiple 
schemas with same table and warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1670,7 +1397,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "multiple schemas with preference to first schema and empty warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1714,7 +1441,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "multiple schemas with preference to warehouse schema", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "test-table": model.TableSchema{ @@ -1768,7 +1495,7 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { }, { name: "warehouse users have extra properties as compared to identifies", - warehouseType: warehouseutils.RS, + warehouseType: whutils.RS, mockSchemas: []model.Schema{ { "identifies": model.TableSchema{ @@ -1901,13 +1628,13 @@ func TestSchema_PrepareUploadSchema(t *testing.T) { stagingFilesSchemaPaginationSize: 2, } - err := sh.prepareUploadSchema(ctx, stagingFiles) + uploadSchema, err := sh.ConsolidateLocalSchemaWithStagingFiles(ctx, stagingFiles) if tc.wantError != nil { require.EqualError(t, err, tc.wantError.Error()) } else { require.NoError(t, err) } - require.Equal(t, tc.expectedSchema, sh.uploadSchema) + require.Equal(t, tc.expectedSchema, uploadSchema) }) } } diff --git a/warehouse/slave_worker.go b/warehouse/slave_worker.go index 5cbb79c678..0e422e79d1 100644 --- a/warehouse/slave_worker.go +++ b/warehouse/slave_worker.go @@ -11,9 +11,8 @@ import ( "strconv" "time" - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/services/pgnotifier" "github.com/rudderlabs/rudder-server/warehouse/encoding" @@ -24,6 +23,11 @@ import ( warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" ) +var ( + errIncompatibleSchemaConversion = errors.New("incompatible schema conversion") + errSchemaConversionNotSupported = errors.New("schema conversion not supported") +) + type uploadProcessingResult struct { result uploadResult err error @@ -514,3 +518,47 @@ func (sw *slaveWorker) destinationFromSlaveConnectionMap(destinationId, sourceId return conn, nil } + +// handleSchemaChange checks if the existing column type is compatible with the new column type +func handleSchemaChange(existingDataType, currentDataType model.SchemaType, value any) (any, error) { + var ( + newColumnVal any + err error + ) + + if existingDataType == model.StringDataType || existingDataType == model.TextDataType { + // only stringify if the previous type is non-string/text/json + if currentDataType != model.StringDataType && currentDataType != model.TextDataType && currentDataType != model.JSONDataType { + newColumnVal = fmt.Sprintf("%v", value) + } else { + newColumnVal = value + } + } else if (currentDataType == model.IntDataType || currentDataType == model.BigIntDataType) && existingDataType == model.FloatDataType { + intVal, ok := value.(int) + if !ok { + err = errIncompatibleSchemaConversion + } else { + newColumnVal = float64(intVal) + } + } else if currentDataType == model.FloatDataType && (existingDataType == model.IntDataType || existingDataType == model.BigIntDataType) { + floatVal, ok := value.(float64) + if !ok { + err = 
errIncompatibleSchemaConversion
+		} else {
+			newColumnVal = int(floatVal)
+		}
+	} else if existingDataType == model.JSONDataType {
+		var interfaceSliceSample []any
+		if currentDataType == model.IntDataType || currentDataType == model.FloatDataType || currentDataType == model.BooleanDataType {
+			newColumnVal = fmt.Sprintf("%v", value)
+		} else if reflect.TypeOf(value) == reflect.TypeOf(interfaceSliceSample) {
+			newColumnVal = value
+		} else {
+			newColumnVal = fmt.Sprintf(`"%v"`, value)
+		}
+	} else {
+		err = errSchemaConversionNotSupported
+	}
+
+	return newColumnVal, err
+}
diff --git a/warehouse/slave_worker_test.go b/warehouse/slave_worker_test.go
index 11cf02dfe8..476b715636 100644
--- a/warehouse/slave_worker_test.go
+++ b/warehouse/slave_worker_test.go
@@ -699,3 +699,262 @@ func TestSlaveWorker(t *testing.T) {
 		})
 	})
 }
+
+func TestHandleSchemaChange(t *testing.T) {
+	inputs := []struct {
+		name             string
+		existingDatatype string
+		currentDataType  string
+		value            any
+
+		newColumnVal any
+		convError    error
+	}{
+		{
+			name:             "should send int values if existing datatype is int, new datatype is float",
+			existingDatatype: "int",
+			currentDataType:  "float",
+			value:            1.501,
+			newColumnVal:     1,
+		},
+		{
+			name:             "should send float values if existing datatype is float, new datatype is int",
+			existingDatatype: "float",
+			currentDataType:  "int",
+			value:            1,
+			newColumnVal:     1.0,
+		},
+		{
+			name:             "should send string values if existing datatype is string, new datatype is boolean",
+			existingDatatype: "string",
+			currentDataType:  "boolean",
+			value:            false,
+			newColumnVal:     "false",
+		},
+		{
+			name:             "should send string values if existing datatype is string, new datatype is int",
+			existingDatatype: "string",
+			currentDataType:  "int",
+			value:            1,
+			newColumnVal:     "1",
+		},
+		{
+			name:             "should send string values if existing datatype is string, new datatype is float",
+			existingDatatype: "string",
+			currentDataType:  "float",
+			value:            1.501,
+			newColumnVal:     "1.501",
+		},
+		{
+			name:             "should send string values if existing datatype is string, new datatype is datetime",
+			existingDatatype: "string",
+			currentDataType:  "datetime",
+			value:            "2022-05-05T00:00:00.000Z",
+			newColumnVal:     "2022-05-05T00:00:00.000Z",
+		},
+		{
+			name:             "should send string values if existing datatype is string, new datatype is json",
+			existingDatatype: "string",
+			currentDataType:  "json",
+			value:            `{"json":true}`,
+			newColumnVal:     `{"json":true}`,
+		},
+		{
+			name:             "should send json string values if existing datatype is json, new datatype is boolean",
+			existingDatatype: "json",
+			currentDataType:  "boolean",
+			value:            false,
+			newColumnVal:     "false",
+		},
+		{
+			name:             "should send json string values if existing datatype is json, new datatype is int",
+			existingDatatype: "json",
+			currentDataType:  "int",
+			value:            1,
+			newColumnVal:     "1",
+		},
+		{
+			name:             "should send json string values if existing datatype is json, new datatype is float",
+			existingDatatype: "json",
+			currentDataType:  "float",
+			value:            1.501,
+			newColumnVal:     "1.501",
+		},
+		{
+			name:             "should send json string values if existing datatype is json, new datatype is datetime",
+			existingDatatype: "json",
+			currentDataType:  "datetime",
+			value:            "2022-05-05T00:00:00.000Z",
+			newColumnVal:     `"2022-05-05T00:00:00.000Z"`,
+		},
+		{
+			name:             "should send json string values if existing datatype is json, new datatype is string",
+			existingDatatype: "json",
+			currentDataType:  "string",
+			value:            "string value",
+			newColumnVal:     `"string value"`,
+		},
+		{
+			name:             "should send json string values if 
existing datatype is json, new datatype is array", + existingDatatype: "json", + currentDataType: "array", + value: []any{false, 1, "string value"}, + newColumnVal: []any{false, 1, "string value"}, + }, + { + name: "existing datatype is boolean, new datatype is int", + existingDatatype: "boolean", + currentDataType: "int", + value: 1, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is float", + existingDatatype: "boolean", + currentDataType: "float", + value: 1.501, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is string", + existingDatatype: "boolean", + currentDataType: "string", + value: "string value", + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is datetime", + existingDatatype: "boolean", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is boolean, new datatype is json", + existingDatatype: "boolean", + currentDataType: "json", + value: `{"json":true}`, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is boolean", + existingDatatype: "int", + currentDataType: "boolean", + value: false, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is string", + existingDatatype: "int", + currentDataType: "string", + value: "string value", + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is datetime", + existingDatatype: "int", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is json", + existingDatatype: "int", + currentDataType: "json", + value: `{"json":true}`, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is int, new datatype is float", + existingDatatype: "int", + currentDataType: "float", + value: 1, + convError: errIncompatibleSchemaConversion, + }, + { + name: "existing datatype is float, new datatype is boolean", + existingDatatype: "float", + currentDataType: "boolean", + value: false, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is float, new datatype is int", + existingDatatype: "float", + currentDataType: "int", + value: 1.0, + convError: errIncompatibleSchemaConversion, + }, + { + name: "existing datatype is float, new datatype is string", + existingDatatype: "float", + currentDataType: "string", + value: "string value", + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is float, new datatype is datetime", + existingDatatype: "float", + currentDataType: "datetime", + value: "2022-05-05T00:00:00.000Z", + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is float, new datatype is json", + existingDatatype: "float", + currentDataType: "json", + value: `{"json":true}`, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is boolean", + existingDatatype: "datetime", + currentDataType: "boolean", + value: false, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is string", + existingDatatype: "datetime", + currentDataType: "string", + value: "string value", + convError: 
errSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is int", + existingDatatype: "datetime", + currentDataType: "int", + value: 1, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is float", + existingDatatype: "datetime", + currentDataType: "float", + value: 1.501, + convError: errSchemaConversionNotSupported, + }, + { + name: "existing datatype is datetime, new datatype is json", + existingDatatype: "datetime", + currentDataType: "json", + value: `{"json":true}`, + convError: errSchemaConversionNotSupported, + }, + } + for _, ip := range inputs { + tc := ip + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + newColumnVal, convError := handleSchemaChange( + model.SchemaType(tc.existingDatatype), + model.SchemaType(tc.currentDataType), + tc.value, + ) + require.Equal(t, newColumnVal, tc.newColumnVal) + require.ErrorIs(t, convError, tc.convError) + }) + } +} diff --git a/warehouse/upload.go b/warehouse/upload.go index 43a5a53841..91993c790a 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -12,17 +12,14 @@ import ( "sync/atomic" "time" - "github.com/rudderlabs/rudder-server/warehouse/encoding" - - "github.com/rudderlabs/rudder-go-kit/logger" - "github.com/rudderlabs/rudder-server/app" - "github.com/cenkalti/backoff/v4" "github.com/samber/lo" "golang.org/x/exp/slices" "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-server/app" "github.com/rudderlabs/rudder-server/jobsdb" "github.com/rudderlabs/rudder-server/rruntime" "github.com/rudderlabs/rudder-server/services/alerta" @@ -30,6 +27,7 @@ import ( "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/timeutil" "github.com/rudderlabs/rudder-server/utils/types" + "github.com/rudderlabs/rudder-server/warehouse/encoding" "github.com/rudderlabs/rudder-server/warehouse/identity" integrationsconfig "github.com/rudderlabs/rudder-server/warehouse/integrations/config" schemarepository "github.com/rudderlabs/rudder-server/warehouse/integrations/datalake/schema-repository" @@ -42,6 +40,7 @@ import ( "github.com/rudderlabs/rudder-server/warehouse/internal/service" "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" "github.com/rudderlabs/rudder-server/warehouse/logfield" + "github.com/rudderlabs/rudder-server/warehouse/schema" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" "github.com/rudderlabs/rudder-server/warehouse/validations" ) @@ -92,7 +91,8 @@ type UploadJob struct { recovery *service.Recovery whManager manager.Manager pgNotifier *pgnotifier.PGNotifier - schemaHandle *Schema + schemaHandle *schema.Schema + uploadSchema model.Schema conf *config.Config logger logger.Logger statsFactory stats.Stats @@ -102,8 +102,6 @@ type UploadJob struct { warehouse model.Warehouse stagingFiles []*model.StagingFile stagingFileIDs []int64 - schemaLock sync.Mutex - uploadLock sync.Mutex alertSender alerta.AlertSender now func() time.Time @@ -193,10 +191,11 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo logger: f.logger, statsFactory: f.statsFactory, tableUploadsRepo: repo.NewTableUploads(f.dbHandle), - schemaHandle: NewSchema( + schemaHandle: schema.NewSchema( f.dbHandle, dto.Warehouse, config.Default, + f.logger.Child("warehouse").Child("schema"), ), upload: dto.Upload, @@ -307,17 +306,17 @@ func 
diff --git a/warehouse/upload.go b/warehouse/upload.go
index 43a5a53841..91993c790a 100644
--- a/warehouse/upload.go
+++ b/warehouse/upload.go
@@ -12,17 +12,14 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/rudderlabs/rudder-server/warehouse/encoding"
-
-	"github.com/rudderlabs/rudder-go-kit/logger"
-	"github.com/rudderlabs/rudder-server/app"
-
 	"github.com/cenkalti/backoff/v4"
 	"github.com/samber/lo"
 	"golang.org/x/exp/slices"
 
 	"github.com/rudderlabs/rudder-go-kit/config"
+	"github.com/rudderlabs/rudder-go-kit/logger"
 	"github.com/rudderlabs/rudder-go-kit/stats"
+	"github.com/rudderlabs/rudder-server/app"
 	"github.com/rudderlabs/rudder-server/jobsdb"
 	"github.com/rudderlabs/rudder-server/rruntime"
 	"github.com/rudderlabs/rudder-server/services/alerta"
@@ -30,6 +27,7 @@ import (
 	"github.com/rudderlabs/rudder-server/utils/misc"
 	"github.com/rudderlabs/rudder-server/utils/timeutil"
 	"github.com/rudderlabs/rudder-server/utils/types"
+	"github.com/rudderlabs/rudder-server/warehouse/encoding"
 	"github.com/rudderlabs/rudder-server/warehouse/identity"
 	integrationsconfig "github.com/rudderlabs/rudder-server/warehouse/integrations/config"
 	schemarepository "github.com/rudderlabs/rudder-server/warehouse/integrations/datalake/schema-repository"
@@ -42,6 +40,7 @@ import (
 	"github.com/rudderlabs/rudder-server/warehouse/internal/service"
 	"github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader"
 	"github.com/rudderlabs/rudder-server/warehouse/logfield"
+	"github.com/rudderlabs/rudder-server/warehouse/schema"
 	warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
 	"github.com/rudderlabs/rudder-server/warehouse/validations"
 )
@@ -92,7 +91,8 @@ type UploadJob struct {
 	recovery     *service.Recovery
 	whManager    manager.Manager
 	pgNotifier   *pgnotifier.PGNotifier
-	schemaHandle *Schema
+	schemaHandle *schema.Schema
+	uploadSchema model.Schema
 	conf         *config.Config
 	logger       logger.Logger
 	statsFactory stats.Stats
@@ -102,8 +102,6 @@
 	warehouse      model.Warehouse
 	stagingFiles   []*model.StagingFile
 	stagingFileIDs []int64
-	schemaLock     sync.Mutex
-	uploadLock     sync.Mutex
 	alertSender    alerta.AlertSender
 
 	now func() time.Time
@@ -193,10 +191,11 @@ func (f *UploadJobFactory) NewUploadJob(ctx context.Context, dto *model.UploadJo
 		logger:           f.logger,
 		statsFactory:     f.statsFactory,
 		tableUploadsRepo: repo.NewTableUploads(f.dbHandle),
-		schemaHandle: NewSchema(
+		schemaHandle: schema.NewSchema(
 			f.dbHandle,
 			dto.Warehouse,
 			config.Default,
+			f.logger.Child("warehouse").Child("schema"),
 		),
 
 		upload: dto.Upload,
@@ -307,17 +306,17 @@ func (job *UploadJob) trackLongRunningUpload() chan struct{} {
 }
 
 func (job *UploadJob) generateUploadSchema() error {
-	if err := job.schemaHandle.prepareUploadSchema(
-		job.ctx,
-		job.stagingFiles,
-	); err != nil {
+	uploadSchema, err := job.schemaHandle.ConsolidateLocalSchemaWithStagingFiles(job.ctx, job.stagingFiles)
+	if err != nil {
 		return fmt.Errorf("consolidate staging files schema using warehouse schema: %w", err)
 	}
 
-	if err := job.setUploadSchema(job.schemaHandle.uploadSchema); err != nil {
+	if err := job.setUploadSchema(uploadSchema); err != nil {
 		return fmt.Errorf("set upload schema: %w", err)
 	}
 
+	job.uploadSchema = uploadSchema
+
 	return nil
 }
 
@@ -342,33 +341,6 @@ func (job *UploadJob) initTableUploads() error {
 	)
 }
 
-func (job *UploadJob) syncRemoteSchema() (bool, error) {
-	if err := job.schemaHandle.fetchSchemaFromLocal(job.ctx); err != nil {
-		return false, fmt.Errorf("fetching schema from local: %w", err)
-	}
-	if err := job.schemaHandle.fetchSchemaFromWarehouse(job.ctx, job.whManager); err != nil {
-		return false, fmt.Errorf("fetching schema from warehouse: %w", err)
-	}
-
-	schemaChanged := job.schemaHandle.hasSchemaChanged()
-	if schemaChanged {
-		job.logger.Infow("schema changed",
-			logfield.SourceID, job.warehouse.Source.ID,
-			logfield.SourceType, job.warehouse.Source.SourceDefinition.Name,
-			logfield.DestinationID, job.warehouse.Destination.ID,
-			logfield.DestinationType, job.warehouse.Destination.DestinationDefinition.Name,
-			logfield.WorkspaceID, job.warehouse.WorkspaceID,
-			logfield.Namespace, job.warehouse.Namespace,
-		)
-
-		if err := job.schemaHandle.updateLocalSchema(job.ctx, job.upload.ID, job.schemaHandle.schemaInWarehouse); err != nil {
-			return false, fmt.Errorf("updating local schema: %w", err)
-		}
-	}
-
-	return schemaChanged, nil
-}
-
 func (job *UploadJob) getTotalRowsInLoadFiles(ctx context.Context) int64 {
 	var total sql.NullInt64
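The deleted syncRemoteSchema wrapper moves into the schema package, which now also returns the fetched schema so run() no longer reads handle fields. A rough sketch of the consolidated method, inferred from its call sites in this patch; the body and the change-detection step are assumptions, and the real code lives in warehouse/schema/schema.go:

package schema

import (
	"context"
	"fmt"
	"reflect"

	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
)

// Sketch only: SyncRemoteSchema's signature matches its caller in run(),
// but this body is a guess. fetchSchemaRepo is the interface the schema
// package already declares for warehouse managers.
func (sh *Schema) SyncRemoteSchema(ctx context.Context, repo fetchSchemaRepo, uploadID int64) (model.Schema, bool, error) {
	localSchema, err := sh.GetLocalSchema(ctx)
	if err != nil {
		return nil, false, fmt.Errorf("fetching schema from local: %w", err)
	}
	warehouseSchema, _, err := sh.FetchSchemaFromWarehouse(ctx, repo)
	if err != nil {
		return nil, false, fmt.Errorf("fetching schema from warehouse: %w", err)
	}

	// Assumed check; the real code routes this through hasSchemaChanged,
	// which can be relaxed via the skipDeepEqualSchemas setting.
	schemaChanged := !reflect.DeepEqual(localSchema, warehouseSchema)
	if schemaChanged {
		if err := sh.UpdateLocalSchema(ctx, uploadID, warehouseSchema); err != nil {
			return nil, false, fmt.Errorf("updating local schema: %w", err)
		}
	}

	return warehouseSchema, schemaChanged, nil
}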
@@ -428,9 +400,10 @@ func (job *UploadJob) run() (err error) {
 		ch <- struct{}{}
 	}()
 
-	job.uploadLock.Lock()
-	defer job.uploadLock.Unlock()
-	_ = job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: UploadLastExecAtField, Value: job.now()}, {Column: UploadInProgress, Value: true}}})
+	_ = job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{
+		{Column: UploadLastExecAtField, Value: job.now()},
+		{Column: UploadInProgress, Value: true},
+	}})
 
 	if len(job.stagingFiles) == 0 {
 		err := fmt.Errorf("no staging files found")
@@ -438,6 +411,8 @@
 		return err
 	}
 
+	// TODO can't we use a Manager factory instead and get a new instance of the "Manager" every time
+	// instead of using the same one?
 	whManager := job.whManager
 	whManager.SetConnectionTimeout(warehouseutils.GetConnectionTimeout(
 		job.warehouse.Type, job.warehouse.Destination.ID,
@@ -454,16 +429,24 @@
 		return err
 	}
 
-	hasSchemaChanged, err := job.syncRemoteSchema()
+	whSchema, hasSchemaChanged, err := job.schemaHandle.SyncRemoteSchema(job.ctx, whManager, job.upload.ID)
 	if err != nil {
 		_, _ = job.setUploadError(err, FetchingRemoteSchemaFailed)
 		return err
 	}
 	if hasSchemaChanged {
-		job.logger.Infof("[WH] Remote schema changed for Warehouse: %s", job.warehouse.Identifier)
+		job.logger.Infow("Remote schema changed",
+			logfield.SourceID, job.warehouse.Source.ID,
+			logfield.SourceType, job.warehouse.Source.SourceDefinition.Name,
+			logfield.DestinationID, job.warehouse.Destination.ID,
+			logfield.DestinationType, job.warehouse.Destination.DestinationDefinition.Name,
+			logfield.WorkspaceID, job.warehouse.WorkspaceID,
+			logfield.Namespace, job.warehouse.Namespace,
+			logfield.WarehouseID, job.warehouse.Identifier,
+		)
 	}
 
-	job.schemaHandle.uploadSchema = job.upload.UploadSchema
+	job.uploadSchema = whSchema
 
 	userTables := []string{job.identifiesTableName(), job.usersTableName()}
 	identityTables := []string{job.identityMergeRulesTableName(), job.identityMappingsTableName()}
@@ -553,7 +536,7 @@
 		case model.CreatedRemoteSchema:
 			newStatus = nextUploadState.failed
 
-			if len(job.schemaHandle.schemaInWarehouse) == 0 {
+			if job.schemaHandle.IsWarehouseSchemaEmpty() {
 				if err = whManager.CreateSchema(job.ctx); err != nil {
 					break
 				}
@@ -596,7 +579,7 @@
 					wg.Done()
 					return
 				}
-				err = job.exportUserTables(loadFilesTableMap)
+				err := job.exportUserTables(loadFilesTableMap)
 				if err != nil {
 					loadErrorLock.Lock()
 					loadErrors = append(loadErrors, err)
@@ -616,7 +599,7 @@
 					wg.Done()
 					return
 				}
-				err = job.exportIdentities()
+				err := job.exportIdentities()
 				if err != nil {
 					loadErrorLock.Lock()
 					loadErrors = append(loadErrors, err)
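The err = to err := switches above are the data race named in the commit message: exportUserTables, exportIdentities, and (in the next hunk) exportRegularTables each run in a goroutine that previously assigned run()'s shared named return err; shadowing with := gives each goroutine a private variable while failures still travel through the locked loadErrors slice. A distilled, hypothetical reproduction of the pattern, not code from this repository:

package sketch

import (
	"errors"
	"sync"
)

// runExports reproduces the bug shape: writing `err = export()` inside each
// goroutine mutates the shared named return concurrently, which
// `go test -race` flags. `err := export()` declares a goroutine-local
// variable instead, and errors are collected under a mutex.
func runExports(export func() error) (err error) {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			err := export() // `err = export()` here would race on the named return
			if err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return errors.Join(errs...)
}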
@@ -630,7 +613,7 @@
 				specialTables = append(specialTables, userTables...)
 				specialTables = append(specialTables, identityTables...)
-				err = job.exportRegularTables(specialTables, loadFilesTableMap)
+				err := job.exportRegularTables(specialTables, loadFilesTableMap)
 				if err != nil {
 					loadErrorLock.Lock()
 					loadErrors = append(loadErrors, err)
@@ -919,10 +902,8 @@ func (job *UploadJob) addColumnsToWarehouse(ctx context.Context, tName string, c
 	var columnsToAdd []warehouseutils.ColumnInfo
 	for columnName, columnType := range columnsMap {
 		// columns present in unrecognized schema should be skipped
-		if unrecognizedSchema, ok := job.schemaHandle.unrecognizedSchemaInWarehouse[tName]; ok {
-			if _, ok := unrecognizedSchema[columnName]; ok {
-				continue
-			}
+		if job.schemaHandle.IsColumnInUnrecognizedSchema(tName, columnName) {
+			continue
 		}
 
 		columnsToAdd = append(columnsToAdd, warehouseutils.ColumnInfo{Name: columnName, Type: columnType})
@@ -957,7 +938,9 @@
 		}
 	}
 
-	job.logger.Infof(`[WH]: Running %d parallel loads in namespace %s of destination %s:%s`, parallelLoads, job.warehouse.Namespace, job.warehouse.Type, job.warehouse.Destination.ID)
+	job.logger.Infof(`[WH]: Running %d parallel loads in namespace %s of destination %s:%s`,
+		parallelLoads, job.warehouse.Namespace, job.warehouse.Type, job.warehouse.Destination.ID,
+	)
 
 	var loadErrors []error
 	var loadErrorLock sync.Mutex
@@ -994,18 +977,21 @@
 		}
 		hasLoadFiles := loadFilesTableMap[tableNameT(tableName)]
 		if !hasLoadFiles {
-			wg.Done()
 			if slices.Contains(alwaysMarkExported, strings.ToLower(tableName)) {
 				status := model.TableUploadExported
 				_ = job.tableUploadsRepo.Set(job.ctx, job.upload.ID, tableName, repo.TableUploadSetOptions{
 					Status: &status,
 				})
 			}
+			wg.Done()
 			continue
 		}
 		tName := tableName
 		loadChan <- struct{}{}
 		rruntime.GoForWarehouse(func() {
+			defer wg.Done()
+			defer func() { <-loadChan }()
+
 			alteredSchema, err := job.loadTable(tName)
 			if alteredSchema {
 				alteredSchemaInAtLeastOneTable.Store(true)
@@ -1016,29 +1002,28 @@
 				loadErrors = append(loadErrors, err)
 				loadErrorLock.Unlock()
 			}
-			wg.Done()
-			<-loadChan
 		})
 	}
 	wg.Wait()
 
 	if alteredSchemaInAtLeastOneTable.Load() {
 		job.logger.Infof("loadAllTablesExcept: schema changed - updating local schema for %s", job.warehouse.Identifier)
-		_ = job.schemaHandle.updateLocalSchema(job.ctx, job.upload.ID, job.schemaHandle.schemaInWarehouse)
+		_ = job.schemaHandle.UpdateLocalSchemaWithWarehouseSchema(job.ctx, job.upload.ID)
 	}
 
 	return loadErrors
 }
 
 func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error) {
-	tableSchemaDiff := job.schemaHandle.generateTableSchemaDiff(tName)
+	uploadTableSchema := job.uploadSchema[tName]
+	tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tName, uploadTableSchema)
 	if tableSchemaDiff.Exists {
 		err = job.UpdateTableSchema(tName, tableSchemaDiff)
 		if err != nil {
 			return
 		}
 
-		job.setUpdatedTableSchema(tName, tableSchemaDiff.UpdatedSchema)
+		job.schemaHandle.SetWarehouseTableSchema(tName, tableSchemaDiff.UpdatedSchema)
		alteredSchema = true
 	}
 	return
@@ -1194,7 +1179,7 @@ func (job *UploadJob) columnCountStat(tableName string) {
 	tags := []warehouseutils.Tag{
 		{Name: "tableName", Value: strings.ToLower(tableName)},
 	}
-	currentColumnsCount := len(job.schemaHandle.schemaInWarehouse[tableName])
+	currentColumnsCount := job.schemaHandle.CurrentColumnsCount(tableName)
 
 	job.counterStat(`warehouse_load_table_column_count`, tags...).Count(currentColumnsCount)
 	job.counterStat(`warehouse_load_table_column_limit`, tags...).Count(columnCountLimit)
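Direct reads of the handle's maps (unrecognizedSchemaInWarehouse, schemaInWarehouse) are funneled through accessors here, and the write path drops the job-level schemaLock, which never covered readers such as columnCountStat. A sketch of what the accessor pair presumably looks like, on a hypothetical schemaSketch type so as not to restate the real struct; the actual methods live in warehouse/schema/schema.go:

package schema

import (
	"sync"

	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
)

// schemaSketch stands in for the real Schema type, assuming the cached
// warehouse schema is guarded by its own RWMutex.
type schemaSketch struct {
	schemaInWarehouse   model.Schema
	schemaInWarehouseMu sync.RWMutex
}

func (sh *schemaSketch) SetWarehouseTableSchema(tableName string, tableSchema model.TableSchema) {
	sh.schemaInWarehouseMu.Lock()
	defer sh.schemaInWarehouseMu.Unlock()

	if sh.schemaInWarehouse == nil {
		sh.schemaInWarehouse = model.Schema{} // assumed guard; see note below
	}
	sh.schemaInWarehouse[tableName] = tableSchema
}

func (sh *schemaSketch) CurrentColumnsCount(tableName string) int {
	sh.schemaInWarehouseMu.RLock()
	defer sh.schemaInWarehouseMu.RUnlock()

	return len(sh.schemaInWarehouse[tableName])
}

Initializing the map on first write is an assumption, but it would explain why TestColumnCountStat later in this patch can seed a zero-value &schema.Schema{} through SetWarehouseTableSchema.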
@@ -1281,7 +1266,7 @@ func (job *UploadJob) loadUserTables(loadFilesTableMap map[tableNameT]bool) ([]e
 	if alteredIdentitySchema || alteredUserSchema {
 		job.logger.Infof("loadUserTables: schema changed - updating local schema for %s", job.warehouse.Identifier)
-		_ = job.schemaHandle.updateLocalSchema(job.ctx, job.upload.ID, job.schemaHandle.schemaInWarehouse)
+		_ = job.schemaHandle.UpdateLocalSchemaWithWarehouseSchema(job.ctx, job.upload.ID)
 	}
 	return job.processLoadTableResponse(errorMap)
 }
@@ -1307,7 +1292,6 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE
 	}
 
 	errorMap := make(map[string]error)
-	// var generated bool
 	if generated, _ := job.areIdentityTablesLoadFilesGenerated(job.ctx); !generated {
 		if err := job.resolveIdentities(populateHistoricIdentities); err != nil {
 			job.logger.Errorf(` ID Resolution operation failed: %v`, err)
@@ -1324,7 +1308,8 @@
 
 		errorMap[tableName] = nil
 
-		tableSchemaDiff := job.schemaHandle.generateTableSchemaDiff(tableName)
+		uploadTableSchema := job.uploadSchema[tableName]
+		tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tableName, uploadTableSchema)
 		if tableSchemaDiff.Exists {
 			err := job.UpdateTableSchema(tableName, tableSchemaDiff)
 			if err != nil {
@@ -1337,7 +1322,7 @@
 				errorMap := map[string]error{tableName: err}
 				return job.processLoadTableResponse(errorMap)
 			}
-			job.setUpdatedTableSchema(tableName, tableSchemaDiff.UpdatedSchema)
+			job.schemaHandle.SetWarehouseTableSchema(tableName, tableSchemaDiff.UpdatedSchema)
 
 			status := model.TableUploadUpdatedSchema
 			_ = job.tableUploadsRepo.Set(job.ctx, job.upload.ID, tableName, repo.TableUploadSetOptions{
@@ -1372,18 +1357,12 @@
 
 	if alteredSchema {
 		job.logger.Infof("loadIdentityTables: schema changed - updating local schema for %s", job.warehouse.Identifier)
-		_ = job.schemaHandle.updateLocalSchema(job.ctx, job.upload.ID, job.schemaHandle.schemaInWarehouse)
+		_ = job.schemaHandle.UpdateLocalSchemaWithWarehouseSchema(job.ctx, job.upload.ID)
 	}
 
 	return job.processLoadTableResponse(errorMap)
 }
 
-func (job *UploadJob) setUpdatedTableSchema(tableName string, updatedSchema model.TableSchema) {
-	job.schemaLock.Lock()
-	job.schemaHandle.schemaInWarehouse[tableName] = updatedSchema
-	job.schemaLock.Unlock()
-}
-
 func (job *UploadJob) processLoadTableResponse(errorMap map[string]error) (errors []error, tableUploadErr error) {
 	for tName, loadErr := range errorMap {
 		// TODO: set last_exec_time
@@ -1507,7 +1486,11 @@
 		panic(err)
 	}
 	job.upload.UploadSchema = consolidatedSchema
-	return job.setUploadColumns(UploadColumnsOpts{Fields: []UploadColumn{{Column: UploadSchemaField, Value: marshalledSchema}}})
+	return job.setUploadColumns(UploadColumnsOpts{
+		Fields: []UploadColumn{
+			{Column: UploadSchemaField, Value: marshalledSchema},
+		},
+	})
 }
 
 // Set LoadFileIDs
@@ -1551,8 +1534,7 @@
 		SET
 		  %s
 		WHERE
-		  id = $1;
-`,
+		  id = $1;`,
 		warehouseutils.WarehouseUploadsTable,
 		columns,
 	)
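With the consolidated schema now stored on the job (job.uploadSchema), the diff computation takes the upload's table schema as an argument instead of reading shared handle state. A sketch extending the schemaSketch type above; only the Exists and UpdatedSchema fields of the diff are visible in this patch, so everything else about its shape is assumed:

package schema

import (
	"github.com/rudderlabs/rudder-server/warehouse/internal/model"
	whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// Sketch of the diff computation; the real TableSchemaDiff lives in
// warehouse/schema/schema.go and may track more than shown here.
func (sh *schemaSketch) TableSchemaDiff(tableName string, uploadTableSchema model.TableSchema) whutils.TableSchemaDiff {
	sh.schemaInWarehouseMu.RLock()
	currentSchema := sh.schemaInWarehouse[tableName]
	sh.schemaInWarehouseMu.RUnlock()

	diff := whutils.TableSchemaDiff{UpdatedSchema: model.TableSchema{}}

	// Start from the warehouse's current view of the table.
	for column, columnType := range currentSchema {
		diff.UpdatedSchema[column] = columnType
	}
	// Any column the upload carries that the warehouse lacks makes the diff real.
	for column, columnType := range uploadTableSchema {
		if _, ok := currentSchema[column]; !ok {
			diff.UpdatedSchema[column] = columnType
			diff.Exists = true
		}
	}
	return diff
}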
@@ -1570,8 +1552,6 @@
 }
 
 func (job *UploadJob) triggerUploadNow() (err error) {
-	job.uploadLock.Lock()
-	defer job.uploadLock.Unlock()
 	newJobState := model.Waiting
 
 	metadata := repo.ExtractUploadMetadata(job.upload)
@@ -1987,15 +1967,23 @@ func (job *UploadJob) GetSchemaInWarehouse() (schema model.Schema) {
 	if job.schemaHandle == nil {
 		return
 	}
-	return job.schemaHandle.schemaInWarehouse
+	return job.schemaHandle.GetWarehouseSchemaCopy(job.ctx)
 }
 
 func (job *UploadJob) GetTableSchemaInWarehouse(tableName string) model.TableSchema {
-	return job.schemaHandle.schemaInWarehouse[tableName]
+	return job.schemaHandle.GetWarehouseTableSchema(tableName)
+}
+
+func (job *UploadJob) GetLocalSchema(ctx context.Context) (model.Schema, error) {
+	return job.schemaHandle.GetLocalSchema(ctx)
+}
+
+func (job *UploadJob) UpdateLocalSchema(ctx context.Context, schema model.Schema) error {
+	return job.schemaHandle.UpdateLocalSchema(ctx, job.upload.ID, schema)
 }
 
 func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema {
-	return job.schemaHandle.uploadSchema[tableName]
+	return job.uploadSchema[tableName]
 }
 
 func (job *UploadJob) GetSingleLoadFile(ctx context.Context, tableName string) (warehouseutils.LoadFile, error) {
@@ -2132,14 +2120,6 @@ func initializeStateMachine() {
 	abortState.nextState = nil
 }
 
-func (job *UploadJob) GetLocalSchema(ctx context.Context) (model.Schema, error) {
-	return job.schemaHandle.getLocalSchema(ctx)
-}
-
-func (job *UploadJob) UpdateLocalSchema(ctx context.Context, schema model.Schema) error {
-	return job.schemaHandle.updateLocalSchema(ctx, job.upload.ID, schema)
-}
-
 func (job *UploadJob) RefreshPartitions(loadFileStartID, loadFileEndID int64) error {
 	if !slices.Contains(warehouseutils.TimeWindowDestinations, job.upload.DestinationType) {
 		return nil
diff --git a/warehouse/upload_test.go b/warehouse/upload_test.go
index 49b8fdf108..0f8141e815 100644
--- a/warehouse/upload_test.go
+++ b/warehouse/upload_test.go
@@ -9,6 +9,7 @@ import (
 	"time"
 
 	"github.com/rudderlabs/rudder-go-kit/stats"
+	"github.com/rudderlabs/rudder-server/warehouse/schema"
 
 	"github.com/ory/dockertest/v3"
@@ -152,16 +153,13 @@ func TestColumnCountStat(t *testing.T) {
 			},
 		},
 		statsFactory: store,
-		schemaHandle: &Schema{
-			schemaInWarehouse: model.Schema{
-				tableName: model.TableSchema{
-					"test-column-1": "string",
-					"test-column-2": "string",
-					"test-column-3": "string",
-				},
-			},
-		},
+		schemaHandle: &schema.Schema{},
 	}
+	j.schemaHandle.SetWarehouseTableSchema(tableName, model.TableSchema{
+		"test-column-1": "string",
+		"test-column-2": "string",
+		"test-column-3": "string",
+	})
 
 	tags := j.buildTags()
 	tags["tableName"] = tableName
@@ -172,7 +170,7 @@
 	m2 := store.Get("warehouse_load_table_column_limit", tags)
 
 	if tc.statExpected {
-		require.EqualValues(t, m1.LastValue(), len(j.schemaHandle.schemaInWarehouse[tableName]))
+		require.EqualValues(t, m1.LastValue(), j.schemaHandle.CurrentColumnsCount(tableName))
 		require.EqualValues(t, m2.LastValue(), tc.columnCountLimit)
 	} else {
 		require.Nil(t, m1)