Skip to content

Commit

Permalink
Handle Snowflake error -00001: (closes #191)
Browse files Browse the repository at this point in the history
  • Loading branch information
colmsnowplow committed Jan 25, 2022
1 parent 6f3439f commit af05ca4
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 5 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -12,6 +12,7 @@ require (
github.com/olekukonko/tablewriter v0.0.5
github.com/onsi/ginkgo v1.16.1 // indirect
github.com/onsi/gomega v1.11.0 // indirect
github.com/pkg/errors v0.9.1
github.com/snowflakedb/gosnowflake v1.6.5
github.com/stretchr/testify v1.7.0
golang.org/x/net v0.0.0-20211118161319-6a13c67c3ce4
Expand Down
65 changes: 60 additions & 5 deletions sql_runner/snowflake_target.go
Expand Up @@ -15,14 +15,14 @@ package main
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"os"
"strings"
"time"

"github.com/olekukonko/tablewriter"
"github.com/pkg/errors"
sf "github.com/snowflakedb/gosnowflake"
)

Expand All @@ -35,6 +35,7 @@ const (
type SnowFlakeTarget struct {
Target
Client *sql.DB
Dsn string
}

func (sft SnowFlakeTarget) IsConnectable() bool {
Expand Down Expand Up @@ -72,7 +73,7 @@ func NewSnowflakeTarget(target Target) *SnowFlakeTarget {
log.Fatal(err)
}

return &SnowFlakeTarget{target, db}
return &SnowFlakeTarget{target, db, configStr}
}

func (sft SnowFlakeTarget) GetTarget() Target {
Expand All @@ -94,8 +95,23 @@ func (sft SnowFlakeTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutp
return QueryStatus{query, query.Path, 0, nil}
}

// Enable grabbing the queryID
queryIdChannel := make(chan string)
ctxWithQueryIdChan := sf.WithQueryIDChan(context.Background(), queryIdChannel)

// Kick off a goroutine to grab the queryId when we get it from the driver (there should be one queryID per script)
var queryId string
qId := &queryId
go func() {
*qId = <-queryIdChannel
}()

// 0 allows arbitrary number of statements
ctx, _ := sf.WithMultiStatement(context.Background(), 0)
ctx, err := sf.WithMultiStatement(ctxWithQueryIdChan, 0)
if err != nil {
log.Printf("ERROR: Could not initialise query script.")
return QueryStatus{query, query.Path, 0, err}
}
script := query.Script

if len(strings.TrimSpace(script)) > 0 {
Expand All @@ -122,9 +138,16 @@ func (sft SnowFlakeTarget) RunQuery(query ReadyQuery, dryRun bool, showQueryOutp
} else {
res, err := sft.Client.ExecContext(ctx, script)
if err != nil {
return QueryStatus{query, query.Path, int(affected), err}
switch err.Error() {
// If the error message is `-00001: `, the DB failed to return accurate status. Request the status and proceed accordingly.
case "-00001: ":
fmt.Println("INFO: Encountered -1 status. Polling for query result with queryId: ", queryId)
pollResult := pollForQueryStatus(sft, queryId)
return QueryStatus{query, query.Path, int(affected), pollResult}
default:
return QueryStatus{query, query.Path, int(affected), errors.Wrap(err, fmt.Sprintf("QueryID: %s", queryId))}
}
}

aff, _ := res.RowsAffected()
affected += aff
}
Expand Down Expand Up @@ -188,3 +211,35 @@ func stringify(row [][]byte) []string {
}
return line
}

// Blocking function to poll for the true status of a query which didn't return a result.
func pollForQueryStatus(sft SnowFlakeTarget, queryId string) error {
// Get the snoflake driver and open a connection
sfd := sft.Client.Driver()
conn, err := sfd.Open(sft.Dsn)
if err != nil {
return errors.Wrap(err, "Failed to open connection to poll for query result.")
}
// Poll Snowflake for actual query status
for {
qStatus, err := conn.(sf.SnowflakeConnection).GetQueryStatus(context.Background(), queryId)

switch {
case err != nil && strings.Contains(err.Error(), "279301:"): // The driver returns an error containing this code when the query is still running.
break
case err != nil:
// Any other error is genuine, return the error.
return err
case qStatus != nil && qStatus.ErrorCode == 0:
// A non-nil qStatus means the query completed. If the ErrorCode field is 0, we have no error.
return nil
case qStatus != nil:
// If qStatus is non-nil but has a non-zero error code, return the relevant info as an error.
return errors.New(qStatus.ErrorMessage)
default:
break
}
// Give it a minute before polling again.
time.Sleep(60 * time.Second)
}
}

0 comments on commit af05ca4

Please sign in to comment.