
Switch to Storage write API in BigQuery #15158

Open · ebyhr opened this issue Nov 23, 2022 · 10 comments
ebyhr (Member) commented Nov 23, 2022

The lag issue between the Storage Read and Write APIs is finally resolved according to https://issuetracker.google.com/issues/200589932. We should use the Storage Write API for better performance.

> Nov 23, 2022 08:59AM Read API now supports Reading data committed from the Write API immediately. There were a few projects held out from this feature for technical reasons and we will be reaching out to let those project owners know they do not have the feature enabled.

ebyhr (Member, Author) commented Nov 23, 2022

cc: @hashhar @wendigo @vlad-lyutenko

hashhar (Member) commented Nov 23, 2022

I have the change already. It doesn't appear to solve #14981 though.

So I'll submit two PRs: one to switch to the Storage Write API and another to fix how projection pushdown works for empty projections.

hashhar self-assigned this Nov 23, 2022
wendigo (Contributor) commented Nov 23, 2022

Finally! Great news @ebyhr

hashhar (Member) commented Nov 23, 2022

Also, I'm documenting some things here that I looked at while creating the PR:

The BigQuery Storage Write API can operate in multiple modes.

Default stream

  • All writes are visible immediately: for each PageSink#appendPage call the rows become visible, so it's non-transactional by nature.
  • However, this matches the existing implementation with the insertAll API (a sketch of a default-stream write follows below).
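
A minimal sketch of a default-stream write with the Java client. The project, dataset, table, and column names are illustrative, and error handling and retries are omitted:

    import com.google.api.core.ApiFuture;
    import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
    import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
    import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
    import com.google.cloud.bigquery.storage.v1.TableName;
    import com.google.cloud.bigquery.storage.v1.TableSchema;
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class DefaultStreamWriteSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative project/dataset/table and column names
            TableName table = TableName.of("my-project", "my_dataset", "my_table");
            TableSchema schema = TableSchema.newBuilder()
                    .addFields(TableFieldSchema.newBuilder()
                            .setName("id")
                            .setType(TableFieldSchema.Type.INT64)
                            .setMode(TableFieldSchema.Mode.NULLABLE))
                    .build();

            // Passing the table name (rather than an explicit stream name) targets the _default stream
            try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(table.toString(), schema).build()) {
                JSONArray rows = new JSONArray();
                rows.put(new JSONObject().put("id", 1));

                // Rows become readable as soon as the append succeeds, i.e. non-transactionally
                ApiFuture<AppendRowsResponse> response = writer.append(rows);
                response.get();
            }
        }
    }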

Application-created streams

Limits

  • Max 1k concurrent open streams
  • 30k stream creations every 4 hours
  • 1 TB per single commit

The max-open-streams and stream-creation limits can be handled by creating a limited number of streams on the coordinator and sending them to the workers instead of one per worker (streams can be shared by multiple workers); see the sketch after this section.

The 1 TB commit limit implicitly caps each single write operation to a maximum of 1 TB of data written. Other limits don't impact us as much.

Each stream provides a minimum of 1 MB/s and an average of 10 MB/s of throughput.
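
To illustrate the sharing idea, the coordinator could create a small fixed pool of PENDING streams up front and distribute the stream names to workers. This is only a sketch; the pool size, table identifiers, and the createStreamPool helper are made up for illustration:

    import java.util.ArrayList;
    import java.util.List;

    import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
    import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
    import com.google.cloud.bigquery.storage.v1.TableName;
    import com.google.cloud.bigquery.storage.v1.WriteStream;

    public class StreamPoolSketch {
        // Hypothetical helper: create a bounded pool of PENDING streams on the coordinator
        // and return their names so workers can share them instead of creating one each.
        static List<String> createStreamPool(BigQueryWriteClient client, TableName table, int poolSize) {
            List<String> streamNames = new ArrayList<>();
            for (int i = 0; i < poolSize; i++) {
                WriteStream stream = client.createWriteStream(
                        CreateWriteStreamRequest.newBuilder()
                                .setParent(table.toString())
                                .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                                .build());
                streamNames.add(stream.getName());
            }
            return streamNames;
        }
    }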

Modes

  • Pending
  • Committed (similar to the default stream but with exactly-once semantics)
  • Buffered (not for general use)

Here's what Pending mode would look like:

  1. CreateWriteStream - on coordinator, perhaps ConnectorMetadata#beginInsert/beginCreateTable?
  2. AppendRows (PageSink#appendPage loop)
  3. FinalizeWriteStream (PageSink#finish)
  4. BatchCommitWriteStreams - completable future returned from PageSink#finish - actually finished on Coordinator in ConnectorMetadata#finishInsert/finishCreateTable?

This has the benefit that the entire INSERT is atomic.
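
A minimal sketch of that lifecycle with the Java client. The project, dataset, table, and column names are illustrative, and the comments only indicate where each call could plausibly map onto the Trino SPI:

    import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
    import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
    import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
    import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
    import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
    import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
    import com.google.cloud.bigquery.storage.v1.TableName;
    import com.google.cloud.bigquery.storage.v1.WriteStream;
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class PendingModeSketch {
        public static void main(String[] args) throws Exception {
            // Illustrative project/dataset/table names
            TableName table = TableName.of("my-project", "my_dataset", "my_table");

            try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
                // 1. CreateWriteStream (would run on the coordinator, e.g. beginInsert/beginCreateTable)
                WriteStream stream = client.createWriteStream(
                        CreateWriteStreamRequest.newBuilder()
                                .setParent(table.toString())
                                .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                                .build());

                // 2. AppendRows (the PageSink#appendPage loop); rows stay buffered and are not yet readable
                try (JsonStreamWriter writer =
                        JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema()).build()) {
                    JSONArray rows = new JSONArray();
                    rows.put(new JSONObject().put("id", 1));
                    writer.append(rows).get();
                }

                // 3. FinalizeWriteStream (PageSink#finish); the stream accepts no further appends
                FinalizeWriteStreamResponse finalized = client.finalizeWriteStream(stream.getName());
                System.out.println("Finalized row count: " + finalized.getRowCount());

                // 4. BatchCommitWriteStreams (coordinator, e.g. finishInsert/finishCreateTable);
                //    all finalized streams listed in the request become visible atomically
                BatchCommitWriteStreamsResponse commit = client.batchCommitWriteStreams(
                        BatchCommitWriteStreamsRequest.newBuilder()
                                .setParent(table.toString())
                                .addWriteStreams(stream.getName())
                                .build());
                if (!commit.hasCommitTime()) {
                    throw new IllegalStateException("Commit failed: " + commit.getStreamErrorsList());
                }
            }
        }
    }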

Conclusion

Using the default stream is the same as the existing behavior; however, it's not atomic.
Pending mode can be used and maps closely to how inserts are already modeled in Trino. The benefits are higher throughput and atomic writes; the downside is more code.

rajc242 commented Nov 24, 2022

The Spark BigQuery connector also uses PENDING mode. When a stream starts writing data in PENDING mode, the data is stored in an internal buffer and is not visible to the outside world; once the stream is finalized and closed, the buffer becomes visible and the data is available to read. You can also check how many estimated bytes are stored in the buffer. Eventually the buffer is merged into the actual BigQuery storage.

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.StandardTableDefinition;
    import com.google.cloud.bigquery.Table;
    import com.google.cloud.bigquery.TableId;

    // projectId, datasetName and tableName are placeholders for the target table
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    TableId tableId = TableId.of(projectId, datasetName, tableName);
    // Fetch only the streaming-buffer statistics for the table
    Table table =
        bigquery.getTable(
            tableId, BigQuery.TableOption.fields(BigQuery.TableField.STREAMING_BUFFER));
    StandardTableDefinition standardTableDefinition = table.getDefinition();
    // getStreamingBuffer() returns null when the table has no active streaming buffer
    StandardTableDefinition.StreamingBuffer streamingBuffer =
        standardTableDefinition.getStreamingBuffer();
    Long estimatedBytes = streamingBuffer.getEstimatedBytes();

ebyhr (Member, Author) commented Apr 3, 2023

@hashhar Can you send a PR when you have time? I expect switching to the Storage Write API would also improve CI speed.

regadas (Contributor) commented Jun 1, 2023

Hi! I was looking to add support for this as well. I'm glad I stumbled on this!

@hashhar it would be great if you could push what you have. Thanks 🙏

regadas (Contributor) commented Jun 9, 2023

Poking again on this issue 😄 hoping to get some traction.

We are starting to rely heavily on the BigQuery connector, and because it relies on the Streaming API we are seeing not only poor performance but also rising costs. The Streaming API is not cheap, and if you do a lot of ingestion it really becomes a problem.

I'll most likely start tackling this next week. If there's something shareable on your side, I would appreciate it if you could share it 😄 We do have a lot of load, and I'm happy to put it through its paces.

hashhar (Member) commented Jan 9, 2024

Partially done via #18897 by using the DefaultStream.

What remains is to see whether we can switch to Pending mode as described in #15158 (comment).

hashhar (Member) commented Feb 28, 2024

A PR for PENDING mode is now up at #20852
