Skip to content

Commit

Permalink
fix: sample duplicate message IDs for snowflake (#3884)
Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Sep 20, 2023
1 parent 1fa2f45 commit b06dc36
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 19 deletions.
43 changes: 24 additions & 19 deletions warehouse/integrations/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,23 @@ func (sf *Snowflake) loadTable(ctx context.Context, tableName string, tableSchem
return tableLoadResp{}, err
}

duplicates, err := sf.sampleDuplicateMessages(
ctx,
db,
tableName,
stagingTableName,
)
if err != nil {
log.Warnw("failed to sample duplicate rows", lf.Error, err.Error())
} else if len(duplicates) > 0 {
uploadID, _ := whutils.UploadIDFromCtx(ctx)

formattedDuplicateMessages := lo.Map(duplicates, func(item duplicateMessage, index int) string {
return item.String()
})
log.Infow("sample duplicate rows", lf.UploadJobID, uploadID, lf.SampleDuplicateMessages, formattedDuplicateMessages)
}

var (
primaryKey = "ID"
partitionKey = `"ID"`
Expand Down Expand Up @@ -472,24 +489,6 @@ func (sf *Snowflake) loadTable(ctx context.Context, tableName string, tableSchem
"tableName": tableName,
}).Count(int(updated))

if updated > 0 {
duplicates, err := sf.sampleDuplicateMessages(ctx, db, tableName, stagingTableName)
if err != nil {
log.Warnw("failed to sample duplicate rows", lf.Error, err.Error())
} else if len(duplicates) > 0 {
uploadID, _ := whutils.UploadIDFromCtx(ctx)

sort.Slice(duplicates, func(i, j int) bool {
return duplicates[i].receivedAt.Before(duplicates[j].receivedAt)
})

formattedDuplicateMessages := lo.Map(duplicates, func(item duplicateMessage, index int) string {
return item.String()
})
log.Infow("sample duplicate rows", lf.UploadJobID, uploadID, lf.SampleDuplicateMessages, formattedDuplicateMessages)
}
}

log.Infow("completed loading")

return tableLoadResp{
Expand All @@ -510,7 +509,12 @@ func (sf *Snowflake) joinColumnsWithFormatting(columns []string, format string)
}, ",")
}

func (sf *Snowflake) sampleDuplicateMessages(ctx context.Context, db *sqlmw.DB, mainTableName, stagingTableName string) ([]duplicateMessage, error) {
func (sf *Snowflake) sampleDuplicateMessages(
ctx context.Context,
db *sqlmw.DB,
mainTableName,
stagingTableName string,
) ([]duplicateMessage, error) {
if !lo.Contains(sf.config.debugDuplicateWorkspaceIDs, sf.Warehouse.WorkspaceID) {
return nil, nil
}
Expand All @@ -536,6 +540,7 @@ func (sf *Snowflake) sampleDuplicateMessages(ctx context.Context, db *sqlmw.DB,
FROM
`+stagingTable+`
)
ORDER BY RECEIVED_AT ASC
LIMIT
?;
`,
Expand Down
7 changes: 7 additions & 0 deletions warehouse/integrations/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,13 @@ func TestIntegration(t *testing.T) {
t.Setenv("RSERVER_WAREHOUSE_WEB_PORT", strconv.Itoa(httpPort))
t.Setenv("RSERVER_BACKEND_CONFIG_CONFIG_JSONPATH", workspaceConfigPath)
t.Setenv("RSERVER_WAREHOUSE_SNOWFLAKE_SLOW_QUERY_THRESHOLD", "0s")
t.Setenv("RSERVER_WAREHOUSE_SNOWFLAKE_DEBUG_DUPLICATE_WORKSPACE_IDS", workspaceID)
t.Setenv("RSERVER_WAREHOUSE_SNOWFLAKE_DEBUG_DUPLICATE_TABLES", strings.Join(
[]string{
"identifies", "users", "tracks", "product_track", "pages", "screens", "aliases", "groups",
},
" ",
))

ctx, cancel := context.WithCancel(context.Background())

Expand Down

0 comments on commit b06dc36

Please sign in to comment.