
Parallel fetch of batches in Snowflake connector #4070

Merged — 3 commits merged into main on Feb 19, 2024

Conversation

esevastyanov (Contributor) commented Feb 16, 2024

Closes #4069

rill start --verbose --env connector.snowflake.parallel_fetch_limit=20 ./projects/snowflake

if err != nil {
return nil, err
}
fetchLimitPtr, exists := dsnConfig.Params["parallelFetchLimit"]
Contributor:

Is this a standard Snowflake property? If not, I think it would be better to set it as a separate connector property – i.e. configured using --var connector.snowflake.parallel_fetch_limit=N.

esevastyanov (Author):

No, it isn't a Snowflake property. Moved to an env var:

-env connector.snowflake.parallel_fetch_limit=20
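The env-var approach could be wired up roughly as below. This is a minimal sketch, not the actual Rill connector API: the map key, the function name `parallelFetchLimit`, the default of 10, and the validation rules are all illustrative assumptions.

```go
package main

import (
	"fmt"
	"strconv"
)

// parallelFetchLimit reads a hypothetical connector variable map.
// The key name, default value, and validation are assumptions for
// illustration, not the connector's real configuration surface.
func parallelFetchLimit(vars map[string]string) (int, error) {
	const defaultLimit = 10 // assumed default
	s, ok := vars["parallel_fetch_limit"]
	if !ok || s == "" {
		return defaultLimit, nil
	}
	n, err := strconv.Atoi(s)
	if err != nil || n <= 0 {
		return 0, fmt.Errorf("invalid parallel_fetch_limit %q", s)
	}
	return n, nil
}

func main() {
	n, err := parallelFetchLimit(map[string]string{"parallel_fetch_limit": "20"})
	fmt.Println(n, err)
}
```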


// Fetch batches async as it takes most of the time
var wg sync.WaitGroup
fetchResultChan := make(chan fetchResult, len(f.batches))
Contributor:

What is the max number of batches? Wondering if this might take a lot of memory. If the writing phase is fast, I think we should avoid a buffered channel here. See: https://github.com/uber-go/guide/blob/master/style.md#channel-size-is-one-or-none

esevastyanov (Author):

There is no way to control the number of batches or the number of records per batch in Snowflake. There might be ~1K batches for 100M rows.

Comment on lines 215 to 217
for _, batch := range f.batches {
records, err := batch.Fetch()
if err != nil {
return nil, err
wg.Add(1)
go func(b *sf.ArrowBatch) {
Contributor:

Same question – how many batches might there be? Starting 10s-100s of Goroutines is okay, but probably worth optimizing if looking at thousands.

Contributor:

I wonder if it might be simpler to use an errgroup with WithContext and SetLimit. And then doing the writing from a separate goroutine. It would avoid the semaphore, avoid potentially 100s of concurrent goroutines, and make error propagation / cancellation easier. Something like this:

grp, ctx := errgroup.WithContext(ctx)
grp.SetLimit(f.parallelFetchLimit)

go func() {
  for {
    select {
    case res, ok := <-resultChan:
      // Write...
    case <-ctx.Done():
      // Either finished or errored
      return
    }
  }
}()

for _, batch := range f.batches {
  grp.Go(...)
}

err := grp.Wait()
// done

esevastyanov (Author):

Converted to use errgroup. The writing also requires error propagation, so an errgroup is used for the writing too.

Comment on lines 219 to 224
defer sem.Release(1)
err := sem.Acquire(f.ctx, 1)
if err != nil {
fetchResultChan <- fetchResult{Records: nil, Batch: nil, Err: err}
return
}
Contributor:

I think Release should come after a successful Acquire (otherwise it might panic).

esevastyanov (Author):

Removed the use of a semaphore.

Comment on lines 242 to 245
case <-ctx.Done():
if ctx.Err() != nil {
return nil
}
Contributor:

The if check seems redundant since ctx.Done will never be sent without ctx.Err being non-nil. Also, I think we should return ctx.Err() since if the iterator's ctx is cancelled, it's expected that the ctx error is returned (this doesn't matter in case another goroutine returns an error, since errgroup will only return the first error to Wait()).

}

err = fetchGrp.Wait()
ctx.Done()
Contributor:

This statement doesn't do anything

@begelundmuller begelundmuller added the blocker A release blocker issue that should be resolved before a new release label Feb 19, 2024
@begelundmuller begelundmuller merged commit b9ccaf8 into main Feb 19, 2024
4 checks passed
@begelundmuller begelundmuller deleted the 4069-parallel-fetch branch February 19, 2024 13:49
mindspank pushed a commit that referenced this pull request Feb 23, 2024
* Parallel fetch of batches in Snowflake connector

* Errgroups for fetching and writing

* Fixed context cancellation case
Linked issue: #4069 — Introduce a parallel fetch of batches into Snowflake connector