Skip to content

Commit

Permalink
chore: introduce source category for deduplication (#3730)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 10, 2023
1 parent 35f55e0 commit 00f994b
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 37 deletions.
26 changes: 14 additions & 12 deletions warehouse/integrations/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,12 +757,13 @@ func (d *Deltalake) loadTable(ctx context.Context, tableName string, tableSchema
}

d.stats.NewTaggedStat("dedup_rows", stats.CountType, stats.Tags{
- "sourceID": d.Warehouse.Source.ID,
- "sourceType": d.Warehouse.Source.SourceDefinition.Name,
- "destID": d.Warehouse.Destination.ID,
- "destType": d.Warehouse.Destination.DestinationDefinition.Name,
- "workspaceId": d.Warehouse.WorkspaceID,
- "tableName": tableName,
+ "sourceID": d.Warehouse.Source.ID,
+ "sourceType": d.Warehouse.Source.SourceDefinition.Name,
+ "sourceCategory": d.Warehouse.Source.SourceDefinition.Category,
+ "destID": d.Warehouse.Destination.ID,
+ "destType": d.Warehouse.Destination.DestinationDefinition.Name,
+ "workspaceId": d.Warehouse.WorkspaceID,
+ "tableName": tableName,
}).Count(int(updated))

d.logger.Infow("completed loading",
Expand Down Expand Up @@ -1130,12 +1131,13 @@ func (d *Deltalake) LoadUserTables(ctx context.Context) map[string]error {
}

d.stats.NewTaggedStat("dedup_rows", stats.CountType, stats.Tags{
- "sourceID": d.Warehouse.Source.ID,
- "sourceType": d.Warehouse.Source.SourceDefinition.Name,
- "destID": d.Warehouse.Destination.ID,
- "destType": d.Warehouse.Destination.DestinationDefinition.Name,
- "workspaceId": d.Warehouse.WorkspaceID,
- "tableName": warehouseutils.UsersTable,
+ "sourceID": d.Warehouse.Source.ID,
+ "sourceType": d.Warehouse.Source.SourceDefinition.Name,
+ "sourceCategory": d.Warehouse.Source.SourceDefinition.Category,
+ "destID": d.Warehouse.Destination.ID,
+ "destType": d.Warehouse.Destination.DestinationDefinition.Name,
+ "workspaceId": d.Warehouse.WorkspaceID,
+ "tableName": warehouseutils.UsersTable,
}).Count(int(updated))

d.logger.Infow("completed loading for users and identifies tables",
Expand Down
15 changes: 8 additions & 7 deletions warehouse/integrations/postgres/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,14 @@ func (pg *Postgres) loadTable(
}

pg.stats.NewTaggedStat("dedup_rows", stats.CountType, stats.Tags{
- "sourceID": pg.Warehouse.Source.ID,
- "sourceType": pg.Warehouse.Source.SourceDefinition.Name,
- "destID": pg.Warehouse.Destination.ID,
- "destType": pg.Warehouse.Destination.DestinationDefinition.Name,
- "workspaceId": pg.Warehouse.WorkspaceID,
- "tableName": tableName,
- "rowsAffected": fmt.Sprintf("%d", rowsAffected),
+ "sourceID": pg.Warehouse.Source.ID,
+ "sourceType": pg.Warehouse.Source.SourceDefinition.Name,
+ "sourceCategory": pg.Warehouse.Source.SourceDefinition.Category,
+ "destID": pg.Warehouse.Destination.ID,
+ "destType": pg.Warehouse.Destination.DestinationDefinition.Name,
+ "workspaceId": pg.Warehouse.WorkspaceID,
+ "tableName": tableName,
+ "rowsAffected": fmt.Sprintf("%d", rowsAffected),
})
}

Expand Down
13 changes: 7 additions & 6 deletions warehouse/integrations/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,12 +679,13 @@ func (rs *Redshift) loadTable(ctx context.Context, tableName string, tableSchema
}

rs.stats.NewTaggedStat("dedup_rows", stats.CountType, stats.Tags{
- "sourceID": rs.Warehouse.Source.ID,
- "sourceType": rs.Warehouse.Source.SourceDefinition.Name,
- "destID": rs.Warehouse.Destination.ID,
- "destType": rs.Warehouse.Destination.DestinationDefinition.Name,
- "workspaceId": rs.Warehouse.WorkspaceID,
- "tableName": tableName,
+ "sourceID": rs.Warehouse.Source.ID,
+ "sourceType": rs.Warehouse.Source.SourceDefinition.Name,
+ "sourceCategory": rs.Warehouse.Source.SourceDefinition.Category,
+ "destID": rs.Warehouse.Destination.ID,
+ "destType": rs.Warehouse.Destination.DestinationDefinition.Name,
+ "workspaceId": rs.Warehouse.WorkspaceID,
+ "tableName": tableName,
}).Count(int(rowsAffected))
}

Expand Down
26 changes: 14 additions & 12 deletions warehouse/integrations/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,12 +567,13 @@ func (sf *Snowflake) loadTable(ctx context.Context, tableName string, tableSchem
}

sf.stats.NewTaggedStat("dedup_rows", stats.CountType, stats.Tags{
- "sourceID": sf.Warehouse.Source.ID,
- "sourceType": sf.Warehouse.Source.SourceDefinition.Name,
- "destID": sf.Warehouse.Destination.ID,
- "destType": sf.Warehouse.Destination.DestinationDefinition.Name,
- "workspaceId": sf.Warehouse.WorkspaceID,
- "tableName": tableName,
+ "sourceID": sf.Warehouse.Source.ID,
+ "sourceType": sf.Warehouse.Source.SourceDefinition.Name,
+ "sourceCategory": sf.Warehouse.Source.SourceDefinition.Category,
+ "destID": sf.Warehouse.Destination.ID,
+ "destType": sf.Warehouse.Destination.DestinationDefinition.Name,
+ "workspaceId": sf.Warehouse.WorkspaceID,
+ "tableName": tableName,
}).Count(int(updated))

sf.logger.Infow("completed loading",
Expand Down Expand Up @@ -929,12 +930,13 @@ func (sf *Snowflake) loadUserTables(ctx context.Context) map[string]error {
}

sf.stats.NewTaggedStat("dedup_rows", stats.CountType, stats.Tags{
- "sourceID": sf.Warehouse.Source.ID,
- "sourceType": sf.Warehouse.Source.SourceDefinition.Name,
- "destID": sf.Warehouse.Destination.ID,
- "destType": sf.Warehouse.Destination.DestinationDefinition.Name,
- "workspaceId": sf.Warehouse.WorkspaceID,
- "tableName": warehouseutils.UsersTable,
+ "sourceID": sf.Warehouse.Source.ID,
+ "sourceType": sf.Warehouse.Source.SourceDefinition.Name,
+ "sourceCategory": sf.Warehouse.Source.SourceDefinition.Category,
+ "destID": sf.Warehouse.Destination.ID,
+ "destType": sf.Warehouse.Destination.DestinationDefinition.Name,
+ "workspaceId": sf.Warehouse.WorkspaceID,
+ "tableName": warehouseutils.UsersTable,
}).Count(int(updated))

sf.logger.Infow("completed loading for users and identifies tables",
Expand Down

0 comments on commit 00f994b

Please sign in to comment.