SIG-18794: Make getAsync wait until the query completes or times out. #59

Merged 2 commits on Mar 18, 2022
Changes from 1 commit
1 change: 1 addition & 0 deletions .gitignore
@@ -11,3 +11,4 @@ wss-golang-agent.config
wss-unified-agent.jar
whitesource/
*.swp
/vendor/github.com/snowflakedb/gosnowflake
126 changes: 72 additions & 54 deletions async.go
@@ -66,45 +66,39 @@ func (sr *snowflakeRestful) getAsync(
defer close(errChannel)
token, _, _ := sr.TokenAccessor.GetTokens()
headers[headerAuthorizationKey] = fmt.Sprintf(headerSnowflakeToken, token)
resp, err := sr.FuncGet(ctx, sr, URL, headers, timeout)
if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
// if we failed here because of top level context cancellation we want to cancel the original query
if err == context.Canceled || err == context.DeadlineExceeded {
// use the default top level 1 sec timeout for cancellation as throughout the driver
if err := cancelQuery(context.TODO(), sr, requestID, time.Second); err != nil {
logger.WithContext(ctx).Errorf("failed to cancel async query, err: %v", err)

// the get call below polls for the result status until the query completes
var response *execResponse
var err error
for response == nil || !response.Success || parseCode(response.Code) == ErrQueryExecutionInProgress {
response, err = sr.getAsyncOrStatus(ctx, URL, headers, timeout)

if err != nil {
logger.WithContext(ctx).Errorf("failed to get response. err: %v", err)
if err == context.Canceled || err == context.DeadlineExceeded {
// use the default top level 1 sec timeout for cancellation as throughout the driver
if err := cancelQuery(context.TODO(), sr, requestID, time.Second); err != nil {
logger.WithContext(ctx).Errorf("failed to cancel async query, err: %v", err)
}
}
}
return err
}
if resp.Body != nil {
defer resp.Body.Close()
}

respd := execResponse{}
err = json.NewDecoder(resp.Body).Decode(&respd)
resp.Body.Close()
if err != nil {
logger.WithContext(ctx).Errorf("failed to decode JSON. err: %v", err)
sfError.Message = err.Error()
errChannel <- sfError
return err
sfError.Message = err.Error()
errChannel <- sfError
return err
}
}

sc := &snowflakeConn{rest: sr, cfg: cfg}
if respd.Success {
if response.Success {
if resType == execResultType {
res.insertID = -1
if isDml(respd.Data.StatementTypeID) {
res.affectedRows, err = updateRows(respd.Data)
if isDml(response.Data.StatementTypeID) {
res.affectedRows, err = updateRows(response.Data)
if err != nil {
return err
}
} else if isMultiStmt(&respd.Data) {
r, err := sc.handleMultiExec(ctx, respd.Data)
} else if isMultiStmt(&response.Data) {
r, err := sc.handleMultiExec(ctx, response.Data)
if err != nil {
res.errChannel <- err
return err
@@ -115,39 +109,63 @@ func (sr *snowflakeRestful) getAsync(
return err
}
}
res.queryID = respd.Data.QueryID
res.queryID = response.Data.QueryID
res.errChannel <- nil // mark exec status complete
} else {
rows.sc = sc
rows.queryID = respd.Data.QueryID
if isMultiStmt(&respd.Data) {
if err = sc.handleMultiQuery(ctx, respd.Data, rows); err != nil {
rows.errChannel <- err
close(errChannel)
return err
}
} else {
rows.addDownloader(populateChunkDownloader(ctx, sc, respd.Data))
}
rows.ChunkDownloader.start()
rows.queryID = response.Data.QueryID

// TODO(mihai): Find a better way to control this behavior. We usually use async to submit, but we only
// want to download chunks via a separate results call (using a different context).
// Launching the chunk downloader here seems like wasted work.
//if isMultiStmt(&response.Data) {
// if err = sc.handleMultiQuery(ctx, response.Data, rows); err != nil {
// rows.errChannel <- err
// close(errChannel)
// return err
// }
//} else if !isAsyncMode(ctx) {
// rows.addDownloader(populateChunkDownloader(ctx, sc, response.Data))
//}
//rows.ChunkDownloader.start()
rows.errChannel <- nil // mark query status complete
}
} else {
var code int
if respd.Code != "" {
code, err = strconv.Atoi(respd.Code)
if err != nil {
code = -1
}
} else {
code = -1
}
errChannel <- &SnowflakeError{
Number: code,
SQLState: respd.Data.SQLState,
Message: respd.Message,
QueryID: respd.Data.QueryID,
Number: parseCode(response.Code),
SQLState: response.Data.SQLState,
Message: response.Message,
QueryID: response.Data.QueryID,
}
}
return nil
}

func parseCode(codeStr string) int {
if code, err := strconv.Atoi(codeStr); err == nil {
return code
}

return -1
}

func (sr *snowflakeRestful) getAsyncOrStatus(
ctx context.Context,
url *url.URL,
headers map[string]string,
timeout time.Duration) (*execResponse, error) {
resp, err := sr.FuncGet(ctx, sr, url, headers, timeout)
if err != nil {
return nil, err
}
if resp.Body != nil {
defer func() { _ = resp.Body.Close() }()
}

response := &execResponse{}
if err = json.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, err
}

return response, nil
}
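
To make the change easier to follow, here is a minimal, self-contained sketch of the polling idea the new loop implements: keep re-issuing the status GET until the server stops answering with code 333333 (query execution in progress). The `fetch` callback, the trimmed `execResponse` struct, and `pollUntilComplete` are illustrative stand-ins and simplify the driver's actual loop condition.

```go
package main

import (
	"fmt"
	"strconv"
)

// Stand-in for the new driver constant: the server reports that the async
// query is still running, so the client should keep polling.
const errQueryExecutionInProgress = 333333

// Trimmed stand-in for the driver's execResponse; only the fields the
// polling loop inspects are included.
type execResponse struct {
	Success bool
	Code    string
}

// parseCode mirrors the helper added in async.go: convert the string code to
// an int, falling back to -1 when it is empty or malformed.
func parseCode(codeStr string) int {
	if code, err := strconv.Atoi(codeStr); err == nil {
		return code
	}
	return -1
}

// pollUntilComplete keeps issuing the status request (injected here as fetch)
// until the response no longer reports "query execution in progress".
func pollUntilComplete(fetch func() (*execResponse, error)) (*execResponse, error) {
	for {
		resp, err := fetch()
		if err != nil {
			return nil, err // transport error; the driver cancels the query
		}
		if !resp.Success && parseCode(resp.Code) == errQueryExecutionInProgress {
			continue // still running after the status window, ask again
		}
		return resp, nil // finished, either successfully or with a terminal error
	}
}

func main() {
	// Fake server: report "in progress" twice, then success.
	calls := 0
	fetch := func() (*execResponse, error) {
		calls++
		if calls < 3 {
			return &execResponse{Success: false, Code: "333333"}, nil
		}
		return &execResponse{Success: true}, nil
	}

	resp, err := pollUntilComplete(fetch)
	fmt.Println(resp.Success, err, "after", calls, "status calls")
}
```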
5 changes: 5 additions & 0 deletions errors.go
@@ -227,6 +227,11 @@ const (
ErrRoleNotExist = 390189
// ErrObjectNotExistOrAuthorized is a GS error code for the case that the server-side object specified does not exist
ErrObjectNotExistOrAuthorized = 390201

/* Extra error code */

// ErrQueryExecutionInProgress is returned when monitoring an async query reaches 45s
ErrQueryExecutionInProgress = 333333
)

const (
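
The constant above sits alongside the driver's other error codes, which reach callers wrapped in the `SnowflakeError` struct populated in the diff (`Number`, `SQLState`, `Message`, `QueryID`). As a hedged usage illustration, assuming the driver surfaces failures as `*gosnowflake.SnowflakeError` (the DSN below is hypothetical):

```go
package main

import (
	"database/sql"
	"errors"
	"log"

	sf "github.com/snowflakedb/gosnowflake"
)

// inspectError unwraps a driver error and logs the fields set by getAsync's
// error path: the numeric code, SQL state, message, and query ID.
func inspectError(err error) {
	var se *sf.SnowflakeError
	if errors.As(err, &se) {
		log.Printf("query %s failed: code=%d sqlstate=%s msg=%s",
			se.QueryID, se.Number, se.SQLState, se.Message)
		return
	}
	log.Printf("non-Snowflake error: %v", err)
}

func main() {
	// Hypothetical DSN; replace with real account credentials.
	db, err := sql.Open("snowflake", "user:pass@my_account/my_db")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if _, err := db.Exec("SELECT * FROM missing_table"); err != nil {
		inspectError(err)
	}
}
```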