Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: moving uploadSchema into UploadJob #3888

Merged
merged 12 commits into from
Sep 21, 2023
11 changes: 6 additions & 5 deletions warehouse/internal/repo/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,19 @@ func (uploads *Uploads) Count(ctx context.Context, filters ...FilterBy) (int64,
}

func (uploads *Uploads) Get(ctx context.Context, id int64) (model.Upload, error) {
row := uploads.db.QueryRowContext(ctx, `
SELECT
row := uploads.db.QueryRowContext(ctx,
`SELECT
`+uploadColumns+`
FROM
`+uploadsTableName+`
WHERE
id = $1
`, id)
id = $1`,
id,
)

var upload model.Upload
err := scanUpload(row.Scan, &upload)
if err == sql.ErrNoRows {
if errors.Is(err, sql.ErrNoRows) {
return model.Upload{}, model.ErrUploadNotFound
}
if err != nil {
Expand Down
87 changes: 42 additions & 45 deletions warehouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,16 @@ import (
"reflect"
"regexp"

"golang.org/x/exp/slices"

"github.com/samber/lo"
"golang.org/x/exp/slices"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"

"github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/internal/repo"
"github.com/rudderlabs/rudder-server/warehouse/logfield"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
Expand All @@ -30,7 +27,9 @@ var (
// deprecatedColumnsRegex
// This regex is used to identify deprecated columns in the warehouse
// Example: abc-deprecated-dba626a7-406a-4757-b3e0-3875559c5840
var deprecatedColumnsRegex = regexp.MustCompile(`.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)
var deprecatedColumnsRegex = regexp.MustCompile(
`.*-deprecated-[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`,
)

type schemaRepo interface {
GetForNamespace(ctx context.Context, sourceID, destID, namespace string) (model.WHSchema, error)
Expand All @@ -50,7 +49,6 @@ type Schema struct {
localSchema model.Schema
schemaInWarehouse model.Schema
unrecognizedSchemaInWarehouse model.Schema
uploadSchema model.Schema
schemaRepo schemaRepo
stagingFileRepo stagingFileRepo
log logger.Logger
Expand All @@ -63,12 +61,13 @@ func NewSchema(
db *sqlquerywrapper.DB,
warehouse model.Warehouse,
conf *config.Config,
logger logger.Logger,
) *Schema {
return &Schema{
warehouse: warehouse,
schemaRepo: repo.NewWHSchemas(db),
stagingFileRepo: repo.NewStagingFiles(db),
log: logger.NewLogger().Child("warehouse").Child("schema"),
log: logger,
stagingFilesSchemaPaginationSize: conf.GetInt("Warehouse.stagingFilesSchemaPaginationSize", 100),
skipDeepEqualSchemas: conf.GetBool("Warehouse.skipDeepEqualSchemas", false),
enableIDResolution: conf.GetBool("Warehouse.enableIDResolution", false),
Expand Down Expand Up @@ -128,17 +127,17 @@ func (sh *Schema) fetchSchemaFromWarehouse(ctx context.Context, repo fetchSchema
return fmt.Errorf("fetching schema from warehouse: %w", err)
}

sh.skipDeprecatedColumns(warehouseSchema)
sh.skipDeprecatedColumns(unrecognizedWarehouseSchema)
sh.removeDeprecatedColumns(warehouseSchema)
sh.removeDeprecatedColumns(unrecognizedWarehouseSchema)

sh.schemaInWarehouse = warehouseSchema
sh.unrecognizedSchemaInWarehouse = unrecognizedWarehouseSchema

return nil
}

// skipDeprecatedColumns skips deprecated columns from the schema
func (sh *Schema) skipDeprecatedColumns(schema model.Schema) {
// removeDeprecatedColumns skips deprecated columns from the schema map
func (sh *Schema) removeDeprecatedColumns(schema model.Schema) {
for tableName, columnMap := range schema {
for columnName := range columnMap {
if deprecatedColumnsRegex.MatchString(columnName) {
Expand All @@ -152,20 +151,18 @@ func (sh *Schema) skipDeprecatedColumns(schema model.Schema) {
logfield.ColumnName, columnName,
)
delete(schema[tableName], columnName)
continue
}
}
}
}

func (sh *Schema) prepareUploadSchema(ctx context.Context, stagingFiles []*model.StagingFile) error {
func (sh *Schema) prepareUploadSchema(ctx context.Context, stagingFiles []*model.StagingFile) (model.Schema, error) {
consolidatedSchema, err := sh.consolidateStagingFilesSchemaUsingWarehouseSchema(ctx, stagingFiles)
if err != nil {
return fmt.Errorf("consolidating staging files schema: %w", err)
return nil, fmt.Errorf("consolidating staging files schema: %w", err)
}

sh.uploadSchema = consolidatedSchema
return nil
return consolidatedSchema, nil
}

// consolidateStagingFilesSchemaUsingWarehouseSchema consolidates staging files schema with warehouse schema
Expand Down Expand Up @@ -248,10 +245,10 @@ func consolidateWarehouseSchema(consolidatedSchema, warehouseSchema model.Schema
// Removes the user_id column from the users table
func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehouseType string, warehouseSchema model.Schema) model.Schema {
var (
usersTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.UsersTable)
identifiesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentifiesTable)
userIDColumn = warehouseutils.ToProviderCase(warehouseType, "user_id")
IDColumn = warehouseutils.ToProviderCase(warehouseType, "id")
usersTable = whutils.ToProviderCase(warehouseType, whutils.UsersTable)
identifiesTable = whutils.ToProviderCase(warehouseType, whutils.IdentifiesTable)
userIDColumn = whutils.ToProviderCase(warehouseType, "user_id")
IDColumn = whutils.ToProviderCase(warehouseType, "id")
)
if _, ok := consolidatedSchema[usersTable]; !ok {
return consolidatedSchema
Expand Down Expand Up @@ -279,15 +276,15 @@ func overrideUsersWithIdentifiesSchema(consolidatedSchema model.Schema, warehous
func enhanceDiscardsSchema(consolidatedSchema model.Schema, warehouseType string) model.Schema {
discards := model.TableSchema{}

for colName, colType := range warehouseutils.DiscardsSchema {
discards[warehouseutils.ToProviderCase(warehouseType, colName)] = colType
for colName, colType := range whutils.DiscardsSchema {
discards[whutils.ToProviderCase(warehouseType, colName)] = colType
}

if warehouseType == warehouseutils.BQ {
discards[warehouseutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime"
if warehouseType == whutils.BQ {
discards[whutils.ToProviderCase(warehouseType, "loaded_at")] = "datetime"
}

consolidatedSchema[warehouseutils.ToProviderCase(warehouseType, warehouseutils.DiscardsTable)] = discards
consolidatedSchema[whutils.ToProviderCase(warehouseType, whutils.DiscardsTable)] = discards
return consolidatedSchema
}

Expand All @@ -297,28 +294,28 @@ func enhanceSchemaWithIDResolution(consolidatedSchema model.Schema, isIDResoluti
return consolidatedSchema
}
var (
mergeRulesTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMergeRulesTable)
mappingsTable = warehouseutils.ToProviderCase(warehouseType, warehouseutils.IdentityMappingsTable)
mergeRulesTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMergeRulesTable)
mappingsTable = whutils.ToProviderCase(warehouseType, whutils.IdentityMappingsTable)
)
if _, ok := consolidatedSchema[mergeRulesTable]; ok {
consolidatedSchema[mergeRulesTable] = model.TableSchema{
warehouseutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_1_type"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_1_value"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_2_type"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_2_value"): "string",
}
consolidatedSchema[mappingsTable] = model.TableSchema{
warehouseutils.ToProviderCase(warehouseType, "merge_property_type"): "string",
warehouseutils.ToProviderCase(warehouseType, "merge_property_value"): "string",
warehouseutils.ToProviderCase(warehouseType, "rudder_id"): "string",
warehouseutils.ToProviderCase(warehouseType, "updated_at"): "datetime",
whutils.ToProviderCase(warehouseType, "merge_property_type"): "string",
whutils.ToProviderCase(warehouseType, "merge_property_value"): "string",
whutils.ToProviderCase(warehouseType, "rudder_id"): "string",
whutils.ToProviderCase(warehouseType, "updated_at"): "datetime",
}
}
return consolidatedSchema
}

func (sh *Schema) isIDResolutionEnabled() bool {
return sh.enableIDResolution && slices.Contains(warehouseutils.IdentityEnabledWarehouses, sh.warehouse.Type)
return sh.enableIDResolution && slices.Contains(whutils.IdentityEnabledWarehouses, sh.warehouse.Type)
}

// hasSchemaChanged compares the localSchema with the schemaInWarehouse
Expand Down Expand Up @@ -350,23 +347,23 @@ func (sh *Schema) hasSchemaChanged() bool {
return false
}

// generateTableSchemaDiff returns the diff between the warehouse schema and the upload schema
func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.TableSchemaDiff {
diff := warehouseutils.TableSchemaDiff{
// TableSchemaDiff returns the diff between the warehouse schema and the upload schema
func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchema) whutils.TableSchemaDiff {
Comment on lines +350 to +351
Copy link
Member

@achettyiitr achettyiitr Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are calculating diff, it feels more intuitive to have the first parameter as schema and the second as tableName? wdyt?

Suggested change
// TableSchemaDiff returns the diff between the warehouse schema and the upload schema
func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchema) whutils.TableSchemaDiff {
// TableSchemaDiff returns the diff between the warehouse schema and the upload schema
func (sh *Schema) TableSchemaDiff(tableSchema model.TableSchema, tableName string) whutils.TableSchemaDiff {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you just suggesting to have the tableSchema as the first parameter?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

diff := whutils.TableSchemaDiff{
ColumnMap: make(model.TableSchema),
UpdatedSchema: make(model.TableSchema),
AlteredColumnMap: make(model.TableSchema),
}

currentTableSchema, ok := sh.schemaInWarehouse[tableName]
if !ok {
if _, ok := sh.uploadSchema[tableName]; !ok {
if len(tableSchema) == 0 {
return diff
}
diff.Exists = true
diff.TableToBeCreated = true
diff.ColumnMap = sh.uploadSchema[tableName]
diff.UpdatedSchema = sh.uploadSchema[tableName]
diff.ColumnMap = tableSchema
diff.UpdatedSchema = tableSchema
return diff
}

Expand All @@ -375,7 +372,7 @@ func (sh *Schema) generateTableSchemaDiff(tableName string) warehouseutils.Table
}

diff.ColumnMap = make(model.TableSchema)
for columnName, columnType := range sh.uploadSchema[tableName] {
for columnName, columnType := range tableSchema {
if _, ok := currentTableSchema[columnName]; !ok {
diff.ColumnMap[columnName] = columnType
diff.UpdatedSchema[columnName] = columnType
Expand Down
42 changes: 16 additions & 26 deletions warehouse/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,17 +585,16 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) {

func TestSchema_GetUploadSchemaDiff(t *testing.T) {
testCases := []struct {
name string
tableName string
currentSchema model.Schema
uploadSchema model.Schema
expected warehouseutils.TableSchemaDiff
name string
tableName string
currentSchema model.Schema
uploadTableSchema model.TableSchema
expected warehouseutils.TableSchemaDiff
}{
{
name: "empty current and upload schema",
tableName: "test-table",
currentSchema: model.Schema{},
uploadSchema: model.Schema{},
expected: warehouseutils.TableSchemaDiff{
ColumnMap: model.TableSchema{},
UpdatedSchema: model.TableSchema{},
Expand All @@ -606,10 +605,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
name: "empty current schema",
tableName: "test-table",
currentSchema: model.Schema{},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value",
},
expected: warehouseutils.TableSchemaDiff{
Exists: true,
Expand All @@ -631,10 +628,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column": "test-value-1",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value-2",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value-2",
},
expected: warehouseutils.TableSchemaDiff{
Exists: false,
Expand All @@ -655,10 +650,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column-2": "test-value-2",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value-2",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value-2",
},
expected: warehouseutils.TableSchemaDiff{
Exists: true,
Expand All @@ -683,10 +676,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column-2": "test-value-2",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "text",
},
uploadTableSchema: model.TableSchema{
"test-column": "text",
},
expected: warehouseutils.TableSchemaDiff{
Exists: true,
Expand All @@ -710,9 +701,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {

sch := Schema{
schemaInWarehouse: tc.currentSchema,
uploadSchema: tc.uploadSchema,
}
diff := sch.generateTableSchemaDiff(tc.tableName)
diff := sch.TableSchemaDiff(tc.tableName, tc.uploadTableSchema)
require.EqualValues(t, diff, tc.expected)
})
}
Expand Down Expand Up @@ -1901,13 +1891,13 @@ func TestSchema_PrepareUploadSchema(t *testing.T) {
stagingFilesSchemaPaginationSize: 2,
}

err := sh.prepareUploadSchema(ctx, stagingFiles)
uploadSchema, err := sh.prepareUploadSchema(ctx, stagingFiles)
if tc.wantError != nil {
require.EqualError(t, err, tc.wantError.Error())
} else {
require.NoError(t, err)
}
require.Equal(t, tc.expectedSchema, sh.uploadSchema)
require.Equal(t, tc.expectedSchema, uploadSchema)
})
}
}
Loading
Loading