Skip to content

Commit

Permalink
chore: removing uploadSchema mutex
Browse files Browse the repository at this point in the history
  • Loading branch information
fracasula committed Aug 24, 2023
1 parent 20793c7 commit 81acc08
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 48 deletions.
12 changes: 6 additions & 6 deletions warehouse/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func (sh *Schema) ConsolidateLocalSchemaWithStagingFiles(
return consolidatedSchema, nil
}

// TableSchemaDiff returns the diff between the warehouse schema and the upload schema
func (sh *Schema) TableSchemaDiff(tableName string, schema model.Schema) whutils.TableSchemaDiff {
// TableSchemaDiff returns the diff between the warehouse schema and the upload table schema
func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchema) whutils.TableSchemaDiff {
diff := whutils.TableSchemaDiff{
ColumnMap: make(model.TableSchema),
UpdatedSchema: make(model.TableSchema),
Expand All @@ -97,13 +97,13 @@ func (sh *Schema) TableSchemaDiff(tableName string, schema model.Schema) whutils
currentTableSchema, ok := sh.schemaInWarehouse[tableName]

if !ok {
if _, ok := schema[tableName]; !ok {
if len(tableSchema) == 0 {
return diff
}
diff.Exists = true
diff.TableToBeCreated = true
diff.ColumnMap = schema[tableName]
diff.UpdatedSchema = schema[tableName]
diff.ColumnMap = tableSchema
diff.UpdatedSchema = tableSchema
return diff
}

Expand All @@ -112,7 +112,7 @@ func (sh *Schema) TableSchemaDiff(tableName string, schema model.Schema) whutils
}

diff.ColumnMap = make(model.TableSchema)
for columnName, columnType := range schema[tableName] {
for columnName, columnType := range tableSchema {
if _, ok := currentTableSchema[columnName]; !ok {
diff.ColumnMap[columnName] = columnType
diff.UpdatedSchema[columnName] = columnType
Expand Down
39 changes: 15 additions & 24 deletions warehouse/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,17 +324,16 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) {

func TestSchema_GetUploadSchemaDiff(t *testing.T) {
testCases := []struct {
name string
tableName string
currentSchema model.Schema
uploadSchema model.Schema
expected whutils.TableSchemaDiff
name string
tableName string
currentSchema model.Schema
uploadTableSchema model.TableSchema
expected whutils.TableSchemaDiff
}{
{
name: "empty current and upload schema",
name: "empty current and upload table schema",
tableName: "test-table",
currentSchema: model.Schema{},
uploadSchema: model.Schema{},
expected: whutils.TableSchemaDiff{
ColumnMap: model.TableSchema{},
UpdatedSchema: model.TableSchema{},
Expand All @@ -345,10 +344,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
name: "empty current schema",
tableName: "test-table",
currentSchema: model.Schema{},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value",
},
expected: whutils.TableSchemaDiff{
Exists: true,
Expand All @@ -370,10 +367,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column": "test-value-1",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value-2",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value-2",
},
expected: whutils.TableSchemaDiff{
Exists: false,
Expand All @@ -394,10 +389,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column-2": "test-value-2",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "test-value-2",
},
uploadTableSchema: model.TableSchema{
"test-column": "test-value-2",
},
expected: whutils.TableSchemaDiff{
Exists: true,
Expand All @@ -422,10 +415,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
"test-column-2": "test-value-2",
},
},
uploadSchema: model.Schema{
"test-table": model.TableSchema{
"test-column": "text",
},
uploadTableSchema: model.TableSchema{
"test-column": "text",
},
expected: whutils.TableSchemaDiff{
Exists: true,
Expand All @@ -450,7 +441,7 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) {
sch := Schema{
schemaInWarehouse: tc.currentSchema,
}
diff := sch.TableSchemaDiff(tc.tableName, tc.uploadSchema)
diff := sch.TableSchemaDiff(tc.tableName, tc.uploadTableSchema)
require.EqualValues(t, diff, tc.expected)
})
}
Expand Down
25 changes: 7 additions & 18 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ type UploadJob struct {
pgNotifier *pgnotifier.PGNotifier
schemaHandle *schema.Schema
uploadSchema model.Schema
uploadSchemaMu sync.RWMutex
conf *config.Config
logger logger.Logger
statsFactory stats.Stats
Expand Down Expand Up @@ -314,9 +313,7 @@ func (job *UploadJob) generateUploadSchema() error {
return fmt.Errorf("set upload schema: %w", err)
}

job.uploadSchemaMu.Lock()
job.uploadSchema = uploadSchema
job.uploadSchemaMu.Unlock()

return nil
}
Expand Down Expand Up @@ -412,6 +409,8 @@ func (job *UploadJob) run() (err error) {
return err
}

// TODO: could we use a Manager factory to obtain a fresh "Manager" instance for each run
// instead of reusing the same one?
whManager := job.whManager
whManager.SetConnectionTimeout(warehouseutils.GetConnectionTimeout(
job.warehouse.Type, job.warehouse.Destination.ID,
Expand All @@ -423,6 +422,7 @@ func (job *UploadJob) run() (err error) {
}
defer whManager.Cleanup(job.ctx)

// TODO what does Recover really do?
if err = job.recovery.Recover(job.ctx, whManager, job.warehouse); err != nil {
_, _ = job.setUploadError(err, InternalProcessingFailed)
return err
Expand All @@ -445,9 +445,7 @@ func (job *UploadJob) run() (err error) {
)
}

job.uploadSchemaMu.Lock()
job.uploadSchema = whSchema
job.uploadSchemaMu.Unlock()

userTables := []string{job.identifiesTableName(), job.usersTableName()}
identityTables := []string{job.identityMergeRulesTableName(), job.identityMappingsTableName()}
Expand Down Expand Up @@ -1016,11 +1014,8 @@ func (job *UploadJob) loadAllTablesExcept(skipLoadForTables []string, loadFilesT
}

func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error) {
job.uploadSchemaMu.RLock()
uploadSchema := job.uploadSchema.Clone()
job.uploadSchemaMu.RUnlock()

tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tName, uploadSchema)
uploadTableSchema := job.uploadSchema[tName]
tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tName, uploadTableSchema)
if tableSchemaDiff.Exists {
err = job.UpdateTableSchema(tName, tableSchemaDiff)
if err != nil {
Expand Down Expand Up @@ -1296,7 +1291,6 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE
}

errorMap := make(map[string]error)
// var generated bool
if generated, _ := job.areIdentityTablesLoadFilesGenerated(job.ctx); !generated {
if err := job.resolveIdentities(populateHistoricIdentities); err != nil {
job.logger.Errorf(` ID Resolution operation failed: %v`, err)
Expand All @@ -1313,11 +1307,8 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE

errorMap[tableName] = nil

job.uploadSchemaMu.RLock()
uploadSchema := job.uploadSchema.Clone()
job.uploadSchemaMu.RUnlock()

tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tableName, uploadSchema)
uploadTableSchema := job.uploadSchema[tableName]
tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tableName, uploadTableSchema)
if tableSchemaDiff.Exists {
err := job.UpdateTableSchema(tableName, tableSchemaDiff)
if err != nil {
Expand Down Expand Up @@ -1991,8 +1982,6 @@ func (job *UploadJob) UpdateLocalSchema(ctx context.Context, schema model.Schema
}

// GetTableSchemaInUpload returns the entry stored under tableName in the job's
// upload schema (job.uploadSchema). No existence check is performed here, so
// callers receive whatever the index expression yields for an unknown table
// (presumably the zero value of model.TableSchema — confirm against the model package).
//
// NOTE(review): the enclosing change is titled "removing uploadSchema mutex";
// confirm whether the two lock lines below are still meant to be present.
func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema {
// Take the read lock for the duration of the lookup; the deferred call releases it on return.
job.uploadSchemaMu.RLock()
defer job.uploadSchemaMu.RUnlock()
return job.uploadSchema[tableName]
}

Expand Down

0 comments on commit 81acc08

Please sign in to comment.