From 81acc0852ed89d0f2af469db978ca8be8d0e92f7 Mon Sep 17 00:00:00 2001 From: Francesco Casula Date: Thu, 24 Aug 2023 12:07:27 +0200 Subject: [PATCH] chore: removing uploadSchema mutex --- warehouse/schema/schema.go | 12 +++++----- warehouse/schema/schema_test.go | 39 +++++++++++++-------------------- warehouse/upload.go | 25 ++++++--------------- 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/warehouse/schema/schema.go b/warehouse/schema/schema.go index 0e8a6ba2ed1..d85638e7b14 100644 --- a/warehouse/schema/schema.go +++ b/warehouse/schema/schema.go @@ -84,8 +84,8 @@ func (sh *Schema) ConsolidateLocalSchemaWithStagingFiles( return consolidatedSchema, nil } -// TableSchemaDiff returns the diff between the warehouse schema and the upload schema -func (sh *Schema) TableSchemaDiff(tableName string, schema model.Schema) whutils.TableSchemaDiff { +// TableSchemaDiff returns the diff between the warehouse schema and the upload table schema +func (sh *Schema) TableSchemaDiff(tableName string, tableSchema model.TableSchema) whutils.TableSchemaDiff { diff := whutils.TableSchemaDiff{ ColumnMap: make(model.TableSchema), UpdatedSchema: make(model.TableSchema), @@ -97,13 +97,13 @@ func (sh *Schema) TableSchemaDiff(tableName string, schema model.Schema) whutils currentTableSchema, ok := sh.schemaInWarehouse[tableName] if !ok { - if _, ok := schema[tableName]; !ok { + if len(tableSchema) == 0 { return diff } diff.Exists = true diff.TableToBeCreated = true - diff.ColumnMap = schema[tableName] - diff.UpdatedSchema = schema[tableName] + diff.ColumnMap = tableSchema + diff.UpdatedSchema = tableSchema return diff } @@ -112,7 +112,7 @@ func (sh *Schema) TableSchemaDiff(tableName string, schema model.Schema) whutils } diff.ColumnMap = make(model.TableSchema) - for columnName, columnType := range schema[tableName] { + for columnName, columnType := range tableSchema { if _, ok := currentTableSchema[columnName]; !ok { diff.ColumnMap[columnName] = columnType diff.UpdatedSchema[columnName] = columnType diff --git a/warehouse/schema/schema_test.go b/warehouse/schema/schema_test.go index 077f0ae27f8..62232e77ec3 100644 --- a/warehouse/schema/schema_test.go +++ b/warehouse/schema/schema_test.go @@ -324,17 +324,16 @@ func TestSchema_FetchSchemaFromWarehouse(t *testing.T) { func TestSchema_GetUploadSchemaDiff(t *testing.T) { testCases := []struct { - name string - tableName string - currentSchema model.Schema - uploadSchema model.Schema - expected whutils.TableSchemaDiff + name string + tableName string + currentSchema model.Schema + uploadTableSchema model.TableSchema + expected whutils.TableSchemaDiff }{ { - name: "empty current and upload schema", + name: "empty current and upload table schema", tableName: "test-table", currentSchema: model.Schema{}, - uploadSchema: model.Schema{}, expected: whutils.TableSchemaDiff{ ColumnMap: model.TableSchema{}, UpdatedSchema: model.TableSchema{}, @@ -345,10 +344,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { name: "empty current schema", tableName: "test-table", currentSchema: model.Schema{}, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value", }, expected: whutils.TableSchemaDiff{ Exists: true, @@ -370,10 +367,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column": "test-value-1", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value-2", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value-2", }, expected: whutils.TableSchemaDiff{ Exists: false, @@ -394,10 +389,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column-2": "test-value-2", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "test-value-2", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "test-value-2", }, expected: whutils.TableSchemaDiff{ Exists: true, @@ -422,10 +415,8 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { "test-column-2": "test-value-2", }, }, - uploadSchema: model.Schema{ - "test-table": model.TableSchema{ - "test-column": "text", - }, + uploadTableSchema: model.TableSchema{ + "test-column": "text", }, expected: whutils.TableSchemaDiff{ Exists: true, @@ -450,7 +441,7 @@ func TestSchema_GetUploadSchemaDiff(t *testing.T) { sch := Schema{ schemaInWarehouse: tc.currentSchema, } - diff := sch.TableSchemaDiff(tc.tableName, tc.uploadSchema) + diff := sch.TableSchemaDiff(tc.tableName, tc.uploadTableSchema) require.EqualValues(t, diff, tc.expected) }) } diff --git a/warehouse/upload.go b/warehouse/upload.go index 30d95f59735..e54da2d6381 100644 --- a/warehouse/upload.go +++ b/warehouse/upload.go @@ -93,7 +93,6 @@ type UploadJob struct { pgNotifier *pgnotifier.PGNotifier schemaHandle *schema.Schema uploadSchema model.Schema - uploadSchemaMu sync.RWMutex conf *config.Config logger logger.Logger statsFactory stats.Stats @@ -314,9 +313,7 @@ func (job *UploadJob) generateUploadSchema() error { return fmt.Errorf("set upload schema: %w", err) } - job.uploadSchemaMu.Lock() job.uploadSchema = uploadSchema - job.uploadSchemaMu.Unlock() return nil } @@ -412,6 +409,8 @@ func (job *UploadJob) run() (err error) { return err } + // TODO can't we use a Manager factory instead and get a new instance of the "Manager" every time + // instead of using the same one? whManager := job.whManager whManager.SetConnectionTimeout(warehouseutils.GetConnectionTimeout( job.warehouse.Type, job.warehouse.Destination.ID, @@ -423,6 +422,7 @@ func (job *UploadJob) run() (err error) { } defer whManager.Cleanup(job.ctx) + // TODO what does Recover really do? if err = job.recovery.Recover(job.ctx, whManager, job.warehouse); err != nil { _, _ = job.setUploadError(err, InternalProcessingFailed) return err @@ -445,9 +445,7 @@ func (job *UploadJob) run() (err error) { ) } - job.uploadSchemaMu.Lock() job.uploadSchema = whSchema - job.uploadSchemaMu.Unlock() userTables := []string{job.identifiesTableName(), job.usersTableName()} identityTables := []string{job.identityMergeRulesTableName(), job.identityMappingsTableName()} @@ -1016,11 +1014,8 @@ func (job *UploadJob) loadAllTablesExcept(skipLoadForTables []string, loadFilesT } func (job *UploadJob) updateSchema(tName string) (alteredSchema bool, err error) { - job.uploadSchemaMu.RLock() - uploadSchema := job.uploadSchema.Clone() - job.uploadSchemaMu.RUnlock() - - tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tName, uploadSchema) + uploadTableSchema := job.uploadSchema[tName] + tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tName, uploadTableSchema) if tableSchemaDiff.Exists { err = job.UpdateTableSchema(tName, tableSchemaDiff) if err != nil { @@ -1296,7 +1291,6 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE } errorMap := make(map[string]error) - // var generated bool if generated, _ := job.areIdentityTablesLoadFilesGenerated(job.ctx); !generated { if err := job.resolveIdentities(populateHistoricIdentities); err != nil { job.logger.Errorf(` ID Resolution operation failed: %v`, err) @@ -1313,11 +1307,8 @@ func (job *UploadJob) loadIdentityTables(populateHistoricIdentities bool) (loadE errorMap[tableName] = nil - job.uploadSchemaMu.RLock() - uploadSchema := job.uploadSchema.Clone() - job.uploadSchemaMu.RUnlock() - - tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tableName, uploadSchema) + uploadTableSchema := job.uploadSchema[tableName] + tableSchemaDiff := job.schemaHandle.TableSchemaDiff(tableName, uploadTableSchema) if tableSchemaDiff.Exists { err := job.UpdateTableSchema(tableName, tableSchemaDiff) if err != nil { @@ -1991,8 +1982,6 @@ func (job *UploadJob) UpdateLocalSchema(ctx context.Context, schema model.Schema } func (job *UploadJob) GetTableSchemaInUpload(tableName string) model.TableSchema { - job.uploadSchemaMu.RLock() - defer job.uploadSchemaMu.RUnlock() return job.uploadSchema[tableName] }