Skip to content

Commit

Permalink
[Fix] SIG-17456: Trap chunk-downloader errors with Sentry at Multiple…
Browse files Browse the repository at this point in the history
…x. (#44)

* nit: Move "send error on channel" within the `recover` condition.

* Improvement: don't panic _indiscriminately_, have a separate error-type that signals the internal panics we want to re-propagate.

Co-authored-by: Agam Brahma <agam@sigmacomputing.com>
  • Loading branch information
2 people authored and madisonchamberlain committed Apr 27, 2023
1 parent 283b2be commit 99390d7
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 1 deletion.
35 changes: 34 additions & 1 deletion chunk_downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"net"
"net/http"
"net/url"
"runtime"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -68,6 +70,15 @@ type snowflakeChunkDownloader struct {
FuncGet func(context.Context, *snowflakeConn, string, map[string]string, time.Duration) (*http.Response, error)
}

type wrappedPanic struct {
stackTrace string
err error
}

func (w *wrappedPanic) Error() string {
return fmt.Sprintf("Panic within GoSnowflake: %v\nStack-trace:\n %s", w.err, w.stackTrace)
}

func (scd *snowflakeChunkDownloader) totalUncompressedSize() (acc int64) {
for _, c := range scd.ChunkMetas {
acc += c.UncompressedSize
Expand Down Expand Up @@ -335,6 +346,28 @@ func downloadChunk(ctx context.Context, scd *snowflakeChunkDownloader, idx int)
logger.Infof("download start chunk: %v", idx+1)
defer scd.DoneDownloadCond.Broadcast()

defer func() {
var ret *wrappedPanic
if err := recover(); err != nil {
logger.Infof("GoSnowflake chunk-downloader trapping error: %v", err)
stackTrace := make([]byte, 2048)
runtime.Stack(stackTrace, false)
// TODO(agam): properly format the stack-trace-payload
switch errResolved := err.(type) {
case string:
ret = &wrappedPanic{
err: errors.New(errResolved),
stackTrace: string(stackTrace),
}
case error:
ret = &wrappedPanic{err: errResolved, stackTrace: string(stackTrace)}
default:
ret = &wrappedPanic{err: errors.New("Panic within GoSnowflake"), stackTrace: string(stackTrace)}
}
// Pass this through our error-channel
scd.ChunksError <- &chunkError{Index: idx, Error: ret}
}
}()
if err := scd.FuncDownloadHelper(ctx, scd, idx); err != nil {
logger.Errorf(
"failed to extract HTTP response body. URL: %v, err: %v", scd.ChunkMetas[idx].URL, err)
Expand Down Expand Up @@ -510,7 +543,7 @@ func (scd *streamChunkDownloader) start() error {
if readErr == io.EOF {
logger.WithContext(scd.ctx).Infof("downloading done. downloader id: %v", scd.id)
} else {
logger.WithContext(scd.ctx).Debugf("downloading error. downloader id: %v", scd.id)
logger.WithContext(scd.ctx).Infof("downloading error. downloader id: %v, err: %v", scd.id, readErr)
}
scd.readErr = readErr
close(scd.rowStream)
Expand Down
5 changes: 5 additions & 0 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ func (rows *snowflakeRows) Next(dest []driver.Value) (err error) {
// includes io.EOF
if err == io.EOF {
rows.ChunkDownloader.reset()
} else {
// SIG-17456: we want to bubble up errors within GoSnowflake so they can be caught by Multiplex.
if innerPanic, ok := err.(*wrappedPanic); ok {
panic(innerPanic)
}
}
return err
}
Expand Down

0 comments on commit 99390d7

Please sign in to comment.