
Refactor and optimize distributed COPY #5417

Merged
merged 1 commit into from
Apr 4, 2023

Conversation

erimatnor
Contributor

@erimatnor erimatnor commented Mar 9, 2023

Refactor the code path that handles remote distributed COPY. The
main changes are:

  • Use a hash table to look up data node connections instead of a list.
  • Reduce the number of data copies by accumulating bigger CopyData
    messages directly in the libpq connection buffer instead of in a
    separate buffer that later requires copying the data. To achieve
    this, some internal libpq functions are imported that give more
    fine-grained control over buffers.
  • Add a foreign data wrapper option to set the number of rows to
    send in a CopyData message.

@erimatnor erimatnor added multinode tech-debt Needs refactoring and improvement tasks related to the source code and its architecture. labels Mar 9, 2023
@erimatnor erimatnor self-assigned this Mar 9, 2023
@codecov

codecov bot commented Mar 9, 2023

Codecov Report

Merging #5417 (a669236) into main (c6b9f50) will decrease coverage by 0.05%.
The diff coverage is 85.30%.

❗ Current head a669236 differs from pull request most recent head 4ee69d6. Consider uploading reports for the commit 4ee69d6 to get more accurate results

@@            Coverage Diff             @@
##             main    #5417      +/-   ##
==========================================
- Coverage   90.76%   90.71%   -0.05%     
==========================================
  Files         229      230       +1     
  Lines       53705    53761      +56     
==========================================
+ Hits        48747    48771      +24     
- Misses       4958     4990      +32     
Impacted Files Coverage Δ
tsl/src/remote/connection.c 88.66% <0.00%> (-0.21%) ⬇️
tsl/src/fdw/option.c 58.18% <20.00%> (-1.82%) ⬇️
tsl/src/remote/libpq-ts.c 71.83% <71.83%> (ø)
tsl/src/remote/dist_copy.c 88.01% <92.26%> (-1.47%) ⬇️
tsl/src/nodes/data_node_copy.c 95.23% <100.00%> (+0.53%) ⬆️

... and 8 files with indirect coverage changes


@akuzm
Member

akuzm commented Mar 9, 2023

there seems little to gain from accumulating a bigger data buffer before calling PQputCopyData()

The reason to batch rows before PQputCopyData is that it is a pretty complex function that adds several kinds of overhead:

  1. Overhead in the size of data that is sent to the remote server. Each buffer is sent as a separate message, which has overhead (header/length/trailer etc).
  2. CPU overhead on data node to parse these extra messages.
  3. CPU overhead on access node. This function does a lot of things.
  4. Possibly network overhead because it tries to flush the pending output buffers.

Removing the batching increases all these kinds of overhead 1000 times (by current batch size).

Point 3 is probably the most critical, because COPY throughput is limited by CPU usage on the access node (Igor can comment on that).

@erimatnor
Contributor Author

erimatnor commented Mar 13, 2023

there seems little to gain from accumulating a bigger data buffer before calling PQputCopyData()

The reason to batch rows before PQputCopyData is that it is a pretty complex function that adds several kinds of overhead:

1. Overhead in the size of data that is sent to the remote server. Each buffer is sent as a separate message, which has overhead (header/length/trailer etc).

2. CPU overhead on data node to parse these extra messages.

3. CPU overhead on access node. This function does a lot of things.

4. Possibly network overhead because it tries to flush the pending output buffers.

Removing the batching increases all these kinds of overhead 1000 times (by current batch size).

Point 3 is probably the most critical, because COPY throughput is limited by CPU usage on the access node (Igor can comment on that).

CPU overhead is actually what I am most concerned about. I am testing different things in this PR (which is why it is still in draft), and removing buffering is actually faster on my local machine. Presumably this is because the buffering done in the current code is very CPU intensive given how much string wrangling happens: for each data node, we first construct the row index arrays, then loop through them to create a per-data-node buffer/data string. That's potentially a lot of CPU, and it gets much worse with higher replication factors. Perhaps the network overhead is worth it if CPU/latency is the bottleneck rather than network bandwidth.

Now, my local (single machine) setup is not realistic so we should basically benchmark this code on a real cluster and see what effect it has, then change accordingly. I might be wrong, and, in that case, adding back some buffering to this PR is not difficult.

@erimatnor erimatnor force-pushed the dist-copy-refactor branch 3 times, most recently from 4ccb174 to 1b30a9b Compare March 17, 2023 09:32
@erimatnor
Contributor Author

@akuzm I added the ability to accumulate bigger CopyData messages directly in the libpq send buffer instead of maintaining our own buffers. The number of rows to accumulate before sending a CopyData message can be set using a foreign data wrapper option. This allows us to test and evaluate different buffering levels.

I had to import some libpq code because, although it is possible to include the internal libpq API (the header explicitly says apps can do this at their own risk), the internal functions aren't exported by the linker. But the send functions needed aren't much code and are likely to stay quite stable across PG versions.

@erimatnor erimatnor force-pushed the dist-copy-refactor branch 2 times, most recently from a049a8b to d779d87 Compare March 17, 2023 09:38
@erimatnor erimatnor force-pushed the dist-copy-refactor branch 4 times, most recently from 4b1b872 to 2286ba8 Compare March 17, 2023 10:10
@erimatnor
Contributor Author

Here are some performance numbers for ingest with this change on a 5-DN cluster:

[Screenshot: ingest benchmark results, 2023-03-28]

@erimatnor erimatnor marked this pull request as ready for review March 28, 2023 08:55
tsl/src/remote/dist_copy.c
Comment on lines 1467 to 1480
/*
* Flush the previous batch to avoid growing the outgoing buffers
* indefinitely if some data node is not keeping up. It would be more
* efficient to check for buffer growth and only flush then, but libpq
* doesn't provide a way to know the outgoing buffer size. It also doesn't
* provide any way to control the outgoing buffer size.
* Don't do it if we have ended the COPY above to create new chunks.
* The number 11 is an arbitrary prime, growing the output buffer to at
* most 11ki rows sounds reasonable.
*/
if (!did_end_copy && context->batch_ordinal % 11 == 0)
{
flush_active_connections(&context->connection_state);
}
Member

Do we still do the periodic flush somewhere? We need some way to control the outgoing queue size. I expect we're going to be bottlenecked at the data nodes in the normal case, so we need to avoid growing the output buffers on access node indefinitely.

Contributor Author

@erimatnor erimatnor Mar 28, 2023

So, I removed this because I don't think it is necessary.

I don't think it is necessary because, even if we used the standard libpq functions, libpq flushes automatically once a message end marker is written and the buffer has reached 8k.

Further, now that I changed this to use the internal API, we have better control on a per-socket basis. Every socket is flushed once we write the CopyData end marker, which happens after N rows are buffered. So, basically, we accumulate up to N rows of data on a socket, then write the message end and flush. If the flush fails, the code will do a WaitEventSetWait on all sockets that could not be flushed.

Previously, we flushed all sockets after having sent N rows across all sockets, which meant that some sockets might not require a flush if they didn't get many of those N rows.

I kept the flush_active_connections() at the end of the transfer, but it only does something if any of the sockets failed their last flush for some reason. Then it will wait until the flush succeeds.

Member

So, every socket is flushed once we write the CopyData end marker, which happens after N rows are buffered. So, basically, we accumulate up to N rows of data on a socket, then write msg end and flush. If the flush fails, the code will do a WaitEventSetWait on all sockets that could not be flushed.

Does this guarantee that we're flushing at the same rate we're receiving new data, so the buffer size doesn't grow? Before, it just flushed the outgoing buffers to zero; now that you've imported the internal header, we can probably just examine the buffer size.

Member

After reading the code more, it looks like send_row_to_data_nodes flushes the output buffer to zero, right?

Contributor Author

@akuzm yes, send_row_to_data_nodes() is supposed to flush. However, admittedly this wasn't entirely clear and I reworked the code a bit to reuse flush_active_connections() since I had unnecessarily duplicated some of that code. Do you think it is more clear now?

Member

Thanks, yes, it's more clear.

tsl/src/remote/libpq-int.c
Member

@akuzm akuzm left a comment

Good to see the improvement in the benchmarks.

tsl/src/fdw/option.c
tsl/src/nodes/data_node_copy.c
tsl/src/remote/libpq-int.c
@erimatnor erimatnor force-pushed the dist-copy-refactor branch 4 times, most recently from a669236 to 3da82c7 Compare March 29, 2023 08:26
@erimatnor erimatnor changed the title Refactor and simplify distributed COPY Refactor and optimize distributed COPY Mar 29, 2023
@erimatnor erimatnor force-pushed the dist-copy-refactor branch 12 times, most recently from d1d91aa to a3cc550 Compare March 31, 2023 17:02
Refactor the code path that handles remote distributed COPY. The
main changes include:

* Use a hash table to look up data node connections instead of a list.
* Refactor the per-data node buffer code that accumulates rows into
  bigger CopyData messages.
* Reduce the default number of rows in a CopyData message to 100. This
  seems to improve throughput, probably striking a better balance
  between message overhead and latency.
* The number of rows to send in each CopyData message can now be
  changed via a new foreign data wrapper option.
@erimatnor
Contributor Author

@akuzm @pmwkaa @nikkhils I had issues getting the use of internal APIs to play nice with the Windows builds and tests, so I reverted the use of those APIs. Multi-row messages are now accumulated in per-data-node buffers, similar to before. This means additional copying, but it doesn't seem to affect performance in my local tests. I think the rest of the refactor still has value, however.

@erimatnor erimatnor merged commit 2e6c6b5 into timescale:main Apr 4, 2023
@erimatnor erimatnor deleted the dist-copy-refactor branch April 4, 2023 13:35
kgyrtkirk added a commit to kgyrtkirk/timescaledb that referenced this pull request May 12, 2023
This release includes these noteworthy features:
* compressed hypertable enhancements:
  * UPDATE/DELETE support
  * ON CONFLICT DO UPDATE
* Join support for hierarchical Continuous Aggregates
* performance improvements

**Features**
* timescale#5212 Allow pushdown of reference table joins
* timescale#5221 Improve Realtime Continuous Aggregate performance
* timescale#5252 Improve unique constraint support on compressed hypertables
* timescale#5339 Support UPDATE/DELETE on compressed hypertables
* timescale#5344 Enable JOINS for Hierarchical Continuous Aggregates
* timescale#5361 Add parallel support for partialize_agg()
* timescale#5417 Refactor and optimize distributed COPY
* timescale#5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables
* timescale#5547 Skip Ordered Append when only 1 child node is present
* timescale#5510 Propagate vacuum/analyze to compressed chunks
* timescale#5584 Reduce decompression during constraint checking
* timescale#5530 Optimize compressed chunk resorting

**Bugfixes**
* timescale#5396 Fix SEGMENTBY columns predicates to be pushed down
* timescale#5427 Handle user-defined FDW options properly
* timescale#5442 Decompression may have lost DEFAULT values
* timescale#5459 Fix issue creating dimensional constraints
* timescale#5570 Improve interpolate error message on datatype mismatch
* timescale#5573 Fix unique constraint on compressed tables
* timescale#5615 Add permission checks to run_job()
* timescale#5614 Enable run_job() for telemetry job
* timescale#5578 Fix on-insert decompression after schema changes
* timescale#5613 Quote username identifier appropriately
* timescale#5525 Fix tablespace for compressed hypertable and corresponding toast
* timescale#5642 Fix ALTER TABLE SET with normal tables
* timescale#5666 Reduce memory usage for distributed analyze
* timescale#5668 Fix subtransaction resource owner

**Thanks**
* @kovetskiy and @DZDomi for reporting performance regression in Realtime Continuous Aggregates
* @ollz272 for reporting an issue with interpolate error messages
kgyrtkirk added a commit to kgyrtkirk/timescaledb that referenced this pull request May 17, 2023
This release contains new features and bug fixes since the 2.10.3 release.
We deem it moderate priority for upgrading.

This release includes these noteworthy features:
* Support for DML operations on compressed chunks:
  * UPDATE/DELETE support
  * Support for unique constraints on compressed chunks
  * Support for `ON CONFLICT DO UPDATE`
  * Support for `ON CONFLICT DO NOTHING`
* Join support for hierarchical Continuous Aggregates

**Features**
* timescale#5212 Allow pushdown of reference table joins
* timescale#5221 Improve Realtime Continuous Aggregate performance
* timescale#5252 Improve unique constraint support on compressed hypertables
* timescale#5339 Support UPDATE/DELETE on compressed hypertables
* timescale#5344 Enable JOINS for Hierarchical Continuous Aggregates
* timescale#5361 Add parallel support for partialize_agg()
* timescale#5417 Refactor and optimize distributed COPY
* timescale#5454 Add support for ON CONFLICT DO UPDATE for compressed hypertables
* timescale#5547 Skip Ordered Append when only 1 child node is present
* timescale#5510 Propagate vacuum/analyze to compressed chunks
* timescale#5584 Reduce decompression during constraint checking
* timescale#5530 Optimize compressed chunk resorting
* timescale#5639 Support sending telemetry event reports

**Bugfixes**
* timescale#5396 Fix SEGMENTBY columns predicates to be pushed down
* timescale#5427 Handle user-defined FDW options properly
* timescale#5442 Decompression may have lost DEFAULT values
* timescale#5459 Fix issue creating dimensional constraints
* timescale#5570 Improve interpolate error message on datatype mismatch
* timescale#5573 Fix unique constraint on compressed tables
* timescale#5615 Add permission checks to run_job()
* timescale#5614 Enable run_job() for telemetry job
* timescale#5578 Fix on-insert decompression after schema changes
* timescale#5613 Quote username identifier appropriately
* timescale#5525 Fix tablespace for compressed hypertable and corresponding toast
* timescale#5642 Fix ALTER TABLE SET with normal tables
* timescale#5666 Reduce memory usage for distributed analyze
* timescale#5668 Fix subtransaction resource owner

**Thanks**
* @kovetskiy and @DZDomi for reporting performance regression in Realtime Continuous Aggregates
* @ollz272 for reporting an issue with interpolate error messages
@kgyrtkirk kgyrtkirk mentioned this pull request May 17, 2023
kgyrtkirk added a commit to kgyrtkirk/timescaledb that referenced this pull request May 19, 2023
kgyrtkirk added a commit that referenced this pull request May 19, 2023
kgyrtkirk added a commit to kgyrtkirk/timescaledb that referenced this pull request May 19, 2023
kgyrtkirk added a commit to kgyrtkirk/timescaledb that referenced this pull request May 19, 2023