Skip to content

Commit

Permalink
chore: cloud extract should always merge (#3800)
Browse files Browse the repository at this point in the history
* chore: cloud extract should always merge

* chore: review comments
  • Loading branch information
achettyiitr committed Aug 28, 2023
1 parent 6abd76e commit 7d2446b
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 2 deletions.
17 changes: 15 additions & 2 deletions warehouse/integrations/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ var partitionKeyMap = map[string]string{
discardsTable: `"ROW_ID", "COLUMN_NAME", "TABLE_NAME"`,
}

var mergeSourceCategoryMap = map[string]struct{}{
"cloud": {},
"singer-protocol": {},
}

var (
usersTable = whutils.ToProviderCase(whutils.SNOWFLAKE, whutils.UsersTable)
identifiesTable = whutils.ToProviderCase(whutils.SNOWFLAKE, whutils.IdentifiesTable)
Expand Down Expand Up @@ -372,7 +377,7 @@ func (sf *Snowflake) loadTable(ctx context.Context, tableName string, tableSchem

// Truncating the columns by default to avoid size limitation errors
// https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html#copy-options-copyoptions
if sf.config.loadTableStrategy == loadTableStrategyAppendMode {
if sf.ShouldAppend() {
err = sf.copyInto(ctx, db, schemaIdentifier, tableName, sortedColumnNames, tableName, log)
if err != nil {
return tableLoadResp{}, err
Expand Down Expand Up @@ -815,6 +820,14 @@ func (sf *Snowflake) LoadIdentityMappingsTable(ctx context.Context) error {
return nil
}

// ShouldAppend returns true if the load table strategy is append mode and the source category is not in "mergeSourceCategoryMap"
func (sf *Snowflake) ShouldAppend() bool {
sourceCategory := sf.Warehouse.Source.SourceDefinition.Category
_, isMergeCategory := mergeSourceCategoryMap[sourceCategory]

return !isMergeCategory && sf.config.loadTableStrategy == loadTableStrategyAppendMode
}

func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error {
var (
identifiesSchema = sf.Uploader.GetTableSchemaInUpload(identifiesTable)
Expand Down Expand Up @@ -854,7 +867,7 @@ func (sf *Snowflake) LoadUserTables(ctx context.Context) map[string]error {
}

schemaIdentifier := sf.schemaIdentifier()
if sf.config.loadTableStrategy == loadTableStrategyAppendMode {
if sf.ShouldAppend() {
tmpIdentifiesStagingTable := whutils.StagingTableName(provider, identifiesTable, tableNameLimit)
sqlStatement := fmt.Sprintf(
`CREATE TEMPORARY TABLE %[1]s.%[2]q LIKE %[1]s.%[3]q;`,
Expand Down
72 changes: 72 additions & 0 deletions warehouse/integrations/snowflake/snowflake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
"github.com/rudderlabs/rudder-server/warehouse/internal/model"

sfdb "github.com/snowflakedb/gosnowflake"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -491,6 +497,72 @@ func TestIntegration(t *testing.T) {
})
}

func TestSnowflake_ShouldAppend(t *testing.T) {
testCases := []struct {
name string
loadTableStrategy string
sourceCategory string
expected bool
}{
{
name: "merge with event stream",
loadTableStrategy: "MERGE",
sourceCategory: "event-stream",
expected: false,
},
{
name: "append with event-stream",
loadTableStrategy: "MERGE",
sourceCategory: "event-stream",
expected: false,
},
{
name: "merge with extract cloud source",
loadTableStrategy: "MERGE",
sourceCategory: "cloud",
expected: false,
},
{
name: "merge with extract singer protocol source",
loadTableStrategy: "MERGE",
sourceCategory: "singer-protocol",
expected: false,
},
{
name: "append with extract cloud source",
loadTableStrategy: "APPEND",
sourceCategory: "cloud",
expected: false,
},
{
name: "append with extract singer protocol source",
loadTableStrategy: "APPEND",
sourceCategory: "singer-protocol",
expected: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
c := config.New()
c.Set("Warehouse.snowflake.loadTableStrategy", tc.loadTableStrategy)

sf, err := snowflake.New(c, logger.NOP, stats.Default)
require.NoError(t, err)

sf.Warehouse = model.Warehouse{
Source: backendconfig.SourceT{
SourceDefinition: backendconfig.SourceDefinitionT{
Category: tc.sourceCategory,
},
},
}

require.Equal(t, sf.ShouldAppend(), tc.expected)
})
}
}

func getSnowflakeDB(t testing.TB, dsn string) *sql.DB {
t.Helper()
db, err := sql.Open("snowflake", dsn)
Expand Down

0 comments on commit 7d2446b

Please sign in to comment.