chore: moving uploadSchema into UploadJob (#3888)
fracasula committed Sep 21, 2023
1 parent 74f5f11 commit 2a5547f
Showing 4 changed files with 146 additions and 167 deletions.
11 changes: 6 additions & 5 deletions warehouse/internal/repo/upload.go
@@ -240,18 +240,19 @@ func (uploads *Uploads) Count(ctx context.Context, filters ...FilterBy) (int64,
}

func (uploads *Uploads) Get(ctx context.Context, id int64) (model.Upload, error) {
row := uploads.db.QueryRowContext(ctx, `
SELECT
row := uploads.db.QueryRowContext(ctx,
`SELECT
`+uploadColumns+`
FROM
`+uploadsTableName+`
WHERE
id = $1
`, id)
id = $1`,
id,
)

var upload model.Upload
err := scanUpload(row.Scan, &upload)
if err == sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
return model.Upload{}, model.ErrUploadNotFound
}
if err != nil {
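The switch from err == sql.ErrNoRows to errors.Is(err, sql.ErrNoRows) matters once errors are wrapped with %w: a direct comparison only matches the sentinel value itself, while errors.Is walks the wrap chain. A minimal standalone sketch, not taken from the repository, showing the difference:

package main

import (
	"database/sql"
	"errors"
	"fmt"
)

func main() {
	// Simulate a layer that wraps the driver error before returning it.
	wrapped := fmt.Errorf("scanning upload: %w", sql.ErrNoRows)

	fmt.Println(wrapped == sql.ErrNoRows)          // false: direct comparison misses wrapped errors
	fmt.Println(errors.Is(wrapped, sql.ErrNoRows)) // true: errors.Is unwraps the chain
}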
87 changes: 42 additions & 45 deletions warehouse/schema.go
@@ -7,19 +7,16 @@ import (
"reflect"
"regexp"

"golang.org/x/exp/slices"

"github.com/samber/lo"
"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
@@ -30,7 +27,9 @@ var (
// deprecatedColumnsRegex
// This regex is used to identify deprecated columns in the warehouse
// Example: abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840
var deprecatedColumnsRegex = regexp.MustCompile(`.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)
var deprecatedColumnsRegex = regexp.MustCompile(
`.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`,
)
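For reference, deprecatedColumnsRegex flags column names that end in "-deprecated-" followed by a UUID, as in the example from the comment above. A small standalone check, not part of the diff, showing what it matches:

package main

import (
	"fmt"
	"regexp"
)

var deprecatedColumnsRegex = regexp.MustCompile(
	`.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`,
)

func main() {
	fmt.Println(deprecatedColumnsRegex.MatchString(
		"abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840")) // true
	fmt.Println(deprecatedColumnsRegex.MatchString("abc-deprecated-foo")) // false: suffix is not a UUID
}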

type schemaRepo interface {
GetForNamespace(ctx context.Context, sourceID, destID, namespace string) (model.WHSchema, error)
@@ -50,7 +49,6 @@ type Schema struct {
localSchema model.Schema
schemaInWarehouse model.Schema
unrecognizedSchemaInWarehouse model.Schema
uploadSchema model.Schema
schemaRepo schemaRepo
stagingFileRepo stagingFileRepo
log logger.Logger
@@ -63,12 +61,13 @@ func NewSchema(
db *sqlquerywrapper.DB,
warehouse model.Warehouse,
conf *config.Config,
logger logger.Logger,
) *Schema {
return &Schema{
warehouse: warehouse,
schemaRepo: repo.NewWHSchemas(db),
stagingFileRepo: repo.NewStagingFiles(db),
log: logger.NewLogger().Child("warehouse").Child("schema"),
log: logger,
stagingFilesSchemaPaginationSize: conf.GetInt("Warehouse.stagingFilesSchemaPaginationSize", 100),
skipDeepEqualSchemas: conf.GetBool("Warehouse.skipDeepEqualSchemas", false),
enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false),
@@ -128,17 +127,17 @@ func (sh *Schema) fetchSchemaFromWarehouse(ctx context.Context, repo fetchSchema
return fmt.Errorf("fetching schema from warehouse: %w", err)
}

sh.skipDeprecatedColumns(warehouseSchema)
sh.skipDeprecatedColumns(unrecognizedWarehouseSchema)
sh.removeDeprecatedColumns(warehouseSchema)
sh.removeDeprecatedColumns(unrecognizedWarehouseSchema)

sh.schemaInWarehouse = warehouseSchema
sh.unrecognizedSchemaInWarehouse = unrecognizedWarehouseSchema

return nil
}

// skipDeprecatedColumns skips deprecated columns from the schema
func (sh *Schema) skipDeprecatedColumns(schema model.Schema) {
// removeDeprecatedColumns skips deprecated columns from the schema map
func (sh *Schema) removeDeprecatedColumns(schema model.Schema) {
for tableName, columnMap := range schema {
for columnName := range columnMap {
if deprecatedColumnsRegex.MatchString(columnName) {
@@ -152,20 +151,18 @@ func (sh *Schema) skipDeprecatedColumns(schema model.Schema) {
logfield.ColumnName, columnName,
)
delete(schema[tableName], columnName)
continue
}
}
}
}
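removeDeprecatedColumns has no return value because Go maps are reference-like: deleting from schema[tableName] is visible to the caller, which is why fetchSchemaFromWarehouse above can call it on both schemas without reassigning them. A standalone sketch of that behaviour using plain map types rather than the repo's model.Schema:

package main

import (
	"fmt"
	"strings"
)

// dropDeprecated mirrors the shape of removeDeprecatedColumns: it mutates the
// nested map in place, so it needs no return value. A plain substring check
// stands in for the real regex.
func dropDeprecated(schema map[string]map[string]string) {
	for table, columns := range schema {
		for column := range columns {
			if strings.Contains(column, "-deprecated-") {
				delete(schema[table], column)
			}
		}
	}
}

func main() {
	schema := map[string]map[string]string{
		"users": {
			"id": "string",
			"ctx-deprecated-dba626a7-406a-4757-b3e0-3875559c5840": "string",
		},
	}
	dropDeprecated(schema)
	fmt.Println(len(schema["users"])) // 1: only "id" remains; the caller sees the deletion
}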

func (sh *Schema) prepareUploadSchema(ctx context.Context, stagingFiles []*model.StagingFile) error {
func (sh *Schema) prepareUploadSchema(ctx context.Context, stagingFiles []*model.StagingFile) (model.Schema, error) {
consolidatedSchema, err := sh.consolidateStagingFilesSchemaUsingWarehouseSchema(ctx, stagingFiles)
if err != nil {
return fmt.Errorf("consolidating staging files schema: %w", err)
return nil, fmt.Errorf("consolidating staging files schema: %w", err)
}

sh.uploadSchema = consolidatedSchema
return nil
return consolidatedSchema, nil
}
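With this change prepareUploadSchema no longer stores the consolidated schema on the Schema struct (the uploadSchema field is removed from the struct above); it returns the schema so the caller owns it, which is the point of the commit title. A hypothetical caller sketch — the names job, schemaHandle and uploadSchema are illustrative, and the real wiring lives in the upload-job code, which is not among the hunks shown on this page:

// Hypothetical caller, not taken from this diff.
uploadSchema, err := job.schemaHandle.prepareUploadSchema(ctx, stagingFiles)
if err != nil {
	return fmt.Errorf("preparing upload schema: %w", err)
}
job.uploadSchema = uploadSchema // the upload job owns the schema from here on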

// consolidateStagingFilesSchemaUsingWarehouseSchema consolidates staging files schema with warehouse schema
@@ -248,10 +245,10 @@ func consolidateWarehouseSchema(consolidatedSchema, warehouseSchema model.Schema
// Removes the user_id column from the users table
func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehouseType string, warehouseSchema model.Schema) model.Schema {
var (
usersTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.UsersTable)
identifiesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentifiesTable)
userIDColumn = warehouseutils.ToProviderCase(warehouseType, "user_id")
IDColumn = warehouseutils.ToProviderCase(warehouseType, "id")
usersTable = whutils.ToProviderCase(warehouseType, whutils.UsersTable)
identifiesTable = whutils.ToProviderCase(warehouseType, whutils.IdentifiesTable)
userIDColumn = whutils.ToProviderCase(warehouseType, "user_id")
IDColumn = whutils.ToProviderCase(warehouseType, "id")
)
if _, ok := consolidatedSchema[usersTable]; !ok {
return consolidatedSchema
@@ -279,15 +276,15 @@ func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehous
func enhanceDiscardsSchema(consolidatedSchema model.Schema, warehouseType string) model.Schema {
discards := model.TableSchema{}

for colName, colType := range warehouseutils.DiscardsSchema {
discards[warehouseutils.ToProviderCase(warehouseType, colName)] = colType
for colName, colType := range whutils.DiscardsSchema {
discards[whutils.ToProviderCase(warehouseType, colName)] = colType
}

if warehouseType == warehouseutils.BQ {
discards[warehouseutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime"
if warehouseType == whutils.BQ {
discards[whutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime"
}

consolidatedSchema[warehouseutils.ToProviderCase(warehouseType, warehouseutils.DiscardsTable)] = discards
consolidatedSchema[whutils.ToProviderCase(warehouseType, whutils.DiscardsTable)] = discards
return consolidatedSchema
}
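enhanceDiscardsSchema always registers the discards table and, only for BigQuery, adds a loaded_at column of type datetime. A test-style, in-package sketch of that behaviour (illustrative, not part of this diff); the lookups go through ToProviderCase so they resolve the same keys the function writes:

// Hypothetical in-package check, not part of this commit.
schema := enhanceDiscardsSchema(model.Schema{}, whutils.BQ)
discards := schema[whutils.ToProviderCase(whutils.BQ, whutils.DiscardsTable)]

_, hasLoadedAt := discards[whutils.ToProviderCase(whutils.BQ, "loaded_at")]
fmt.Println(hasLoadedAt) // true for BQ; other warehouse types do not get this column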

@@ -297,28 +294,28 @@ func enhanceSchemaWithIDResolution(consolidatedSchema model.Schema, isIDResoluti
return consolidatedSchema
}
var (
mergeRulesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMergeRulesTable)
mappingsTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMappingsTable)
mergeRulesTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMergeRulesTable)
mappingsTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMappingsTable)
)
if _, ok := consolidatedSchema[mergeRulesTable]; ok {
consolidatedSchema[mergeRulesTable] = model.TableSchema{
warehouseutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string",
}
consolidatedSchema[mappingsTable] = model.TableSchema{
warehouseutils.ToProviderCase(warehouseType, "merge_property_type"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_value"): "string",
warehouseutils.ToProviderCase(warehouseType, "rudder_id"): "string",
warehouseutils.ToProviderCase(warehouseType, "updated_at"): "datetime",
whutils.ToProviderCase(warehouseType, "merge_property_type"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_value"): "string",
whutils.ToProviderCase(warehouseType, "rudder_id"): "string",
whutils.ToProviderCase(warehouseType, "updated_at"): "datetime",
}
}
return consolidatedSchema
}

func (sh *Schema) isIDResolutionEnabled() bool {
return sh.enableIDResolution && slices.Contains(warehouseutils.IdentityEnabledWarehouses, sh.warehouse.Type)
return sh.enableIDResolution && slices.Contains(whutils.IdentityEnabledWarehouses, sh.warehouse.Type)
}

// hasSchemaChanged compares the localSchema with the schemaInWarehouse
@@ -350,23 +347,23 @@ func (sh *Schema) hasSchemaChanged() bool {
return false
}

// generateTableSchemaDiff returns the diff between the warehouse schema and the upload schema
func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.TableSchemaDiff {
diff := warehouseutils.TableSchemaDiff{
// TableSchemaDiff returns the diff between the warehouse schema and the upload schema
func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchema) whutils.TableSchemaDiff {
diff := whutils.TableSchemaDiff{
ColumnMap: make(model.TableSchema),
UpdatedSchema: make(model.TableSchema),
AlteredColumnMap: make(model.TableSchema),
}

currentTableSchema, ok := sh.schemaInWarehouse[tableName]
if !ok {
if _, ok := sh.uploadSchema[tableName]; !ok {
if len(tableSchema) == 0 {
return diff
}
diff.Exists = true
diff.TableToBeCreated = true
diff.ColumnMap = sh.uploadSchema[tableName]
diff.UpdatedSchema = sh.uploadSchema[tableName]
diff.ColumnMap = tableSchema
diff.UpdatedSchema = tableSchema
return diff
}

@@ -375,7 +372,7 @@ func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.Table
}

diff.ColumnMap = make(model.TableSchema)
for columnName, columnType := range sh.uploadSchema[tableName] {
for columnName, columnType := range tableSchema {
if _, ok := currentTableSchema[columnName]; !ok {
diff.ColumnMap[columnName] = columnType
diff.UpdatedSchema[columnName] = columnType
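generateTableSchemaDiff becomes the exported TableSchemaDiff and takes the upload's table schema as a parameter instead of reading sh.uploadSchema, which is the other half of moving the upload schema into the upload job. A hypothetical call site, assuming the caller holds the upload schema; the names job, schemaHandle and uploadSchema are illustrative, not from this diff:

// Hypothetical caller: the upload job passes in the table schema it owns.
tableSchema := job.uploadSchema[tableName]
diff := job.schemaHandle.TableSchemaDiff(tableName, tableSchema)
if diff.TableToBeCreated {
	// create the table from diff.ColumnMap before loading
}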
42 changes: 16 additions & 26 deletions warehouse/schema_test.go
@@ -585,17 +585,16 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) {

func TestSchema_GetUploadSchemaDiff(t *testing.T) {
testCases := []struct {
name string
tableName string
currentSchema model.Schema
uploadSchema model.Schema
expected warehouseutils.TableSchemaDiff
name string
tableName string
currentSchema model.Schema
uploadTableSchema model.TableSchema
expected warehouseutils.TableSchemaDiff
}{
{
name: "empty current and upload schema",
tableName: "test-table",
currentSchema: model.Schema{},
uploadSchema: model.Schema{},
expected: warehouseutils.TableSchemaDiff{
ColumnMap: model.TableSchema{},
UpdatedSchema: model.TableSchema{},
@@ -606,10 +605,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
name: "empty current schema",
tableName: "test-table",
currentSchema: model.Schema{},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value",
},
expected: warehouseutils.TableSchemaDiff{
Exists: true,
@@ -631,10 +628,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column": "test-value-1",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value-2",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value-2",
},
expected: warehouseutils.TableSchemaDiff{
Exists: false,
@@ -655,10 +650,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column-2": "test-value-2",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value-2",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value-2",
},
expected: warehouseutils.TableSchemaDiff{
Exists: true,
@@ -683,10 +676,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column-2": "test-value-2",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "text",
},
uploadTableSchema: model.TableSchema{
"test-column": "text",
},
expected: warehouseutils.TableSchemaDiff{
Exists: true,
@@ -710,9 +701,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {

sch := Schema{
schemaInWarehouse: tc.currentSchema,
uploadSchema: tc.uploadSchema,
}
diff := sch.generateTableSchemaDiff(tc.tableName)
diff := sch.TableSchemaDiff(tc.tableName, tc.uploadTableSchema)
require.EqualValues(t, diff, tc.expected)
})
}
@@ -1901,13 +1891,13 @@ func TestSchema_PrepareUploadSchema(t *testing.T) {
stagingFilesSchemaPaginationSize: 2,
}

err := sh.prepareUploadSchema(ctx, stagingFiles)
uploadSchema, err := sh.prepareUploadSchema(ctx, stagingFiles)
if tc.wantError != nil {
require.EqualError(t, err, tc.wantError.Error())
} else {
require.NoError(t, err)
}
require.Equal(t, tc.expectedSchema, sh.uploadSchema)
require.Equal(t, tc.expectedSchema, uploadSchema)
})
}
}