
Improve memory handling for remote COPY #2698

Merged
1 commit merged into master on Dec 2, 2020

Conversation

erimatnor (Contributor) commented Dec 1, 2020

This change improves memory usage in the COPY code used for
distributed hypertables. The following issues have been addressed:

  • PGresult objects were not cleared, leading to memory leaks.
  • The caching of chunk connections didn't work since the lookup
    compared ephemeral chunk pointers instead of chunk IDs. The effect
    was that cached chunk connection state was reallocated every time
    instead of being reused. This likely also caused worse performance.

To address these issues, the following changes are made:

  • All PGresult objects are now cleared with PQclear.
  • Lookup for chunk connections now compares chunk IDs instead of chunk
    pointers.
  • The per-tuple memory context is moved to the outer processing
    loop to ensure that everything in the loop is allocated on the
    per-tuple memory context, which is also reset at every iteration of
    the loop.
  • The use of memory contexts is also simplified to have only one
    memory context for state that should survive across resets of the
    per-tuple memory context.

Fixes #2677

@erimatnor erimatnor force-pushed the fix-remote-copy-memory-usage branch 2 times, most recently from 61aef7b to 37612a5, on December 1, 2020 17:35
codecov bot commented Dec 1, 2020

Codecov Report

Merging #2698 (b4cd8be) into master (47da879) will increase coverage by 0.18%.
The diff coverage is 90.88%.


@@            Coverage Diff             @@
##           master    #2698      +/-   ##
==========================================
+ Coverage   90.01%   90.19%   +0.18%     
==========================================
  Files         212      212              
  Lines       34431    34562     +131     
==========================================
+ Hits        30992    31174     +182     
+ Misses       3439     3388      -51     
Impacted Files Coverage Δ
src/bgw/job.c 91.44% <ø> (-0.89%) ⬇️
src/catalog.h 100.00% <ø> (ø)
src/cross_module_fn.c 68.54% <0.00%> (-0.56%) ⬇️
src/scanner.c 96.26% <ø> (ø)
tsl/test/src/remote/async.c 100.00% <ø> (ø)
tsl/src/remote/async.c 83.14% <52.94%> (-3.30%) ⬇️
tsl/src/remote/dist_copy.c 89.92% <88.23%> (-1.45%) ⬇️
src/version.c 86.88% <90.00%> (-0.62%) ⬇️
tsl/src/remote/connection.c 92.12% <92.15%> (+0.06%) ⬆️
src/hypertable.c 87.61% <95.45%> (+0.15%) ⬆️
... and 21 more

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Last update 7c76fd4...b4cd8be.

erimatnor (Contributor, Author) commented Dec 2, 2020

For context, I tested this on my local machine with two data nodes using the NYC taxi data set. Without the fixes, memory consumption never stops increasing and inserts are slow. In fact, I had to stop the data load prematurely because it was taking so long (40+ minutes) and memory usage was closing in on 1 GB.

This is what memory consumption and load on the CPU cores looked like without the fix:

[image: no-fix-dist-copy]

You can see that the data load is bottlenecked on the access node, which is doing lots of unnecessary work and continuously allocating memory without releasing it.

With the fix, the memory usage was stable throughout the test, and the CPU load was much more even across all nodes. The access node rarely reached 100%. The whole data load completed in 3 minutes and 55 seconds. This is what memory consumption and CPU load looked like:

[image: dist-copy-fix]

FWIW, on a regular (non-distributed) hypertable, it took just over 6 minutes to load the same data.

@erimatnor erimatnor marked this pull request as ready for review December 2, 2020 10:26
@erimatnor erimatnor requested a review from a team as a code owner December 2, 2020 10:26
@erimatnor erimatnor requested review from pmwkaa, k-rus, gayyappan, mkindahl, svenklemm and a team and removed request for a team and gayyappan December 2, 2020 10:26

return create_connection_list_for_chunk(state, chunk)->connections;
if (chunkconns->chunk_id == chunk->fd.id)
return chunkconns->connections;
erimatnor (Author):
This is definitely hit in my benchmark (I added a print statement here). But I guess our regression tests never get to the point of actually switching between cached chunks. I'll try to see what it would take to make that happen.

Comment on lines +243 to +244

return 0;
Contributor:
This just creates a false positive in codecov. If your compiler warns, it's better to mark error_no_default_fn_community as non-returning, e.g., for GCC:

void error_no_default_fn_community () __attribute__ ((noreturn));

erimatnor (Author):

Note that I didn't add this function, I only changed the signature. We currently don't use these attributes on any of our default functions and if we'd like to do that we can add for all functions in a separate PR.

Contributor:

I was referring to the added code, not the function. I don't think you need to add any code here at all.

Comment on lines +265 to +266
PG_TRY();
{
Contributor:

From what I understand, this is only for ensuring that the error message is not released prior to calling the error reporting function.

Using exception handling seems quite heavy-handed for ensuring that the error message can be passed up; I suggest just copying the error message and clearing the result before reporting the error. The code will be significantly easier to follow and codecov will not generate so many false positives.

erimatnor (Author):

No, this is to ensure that the PGresult is always released, even if we encounter an error or throw an error ourselves. The PGresult is malloced and will cause a permanent memory leak if we don't guarantee its release. And the only safe way to do that is a try-catch.

Contributor:

But if there is an error in PQexec nothing will be assigned to res (because the error function will do a longjmp to the exception handler directly) so then you do not need any cleanup code. The only place where res can have a value and an error can be thrown is from the ereport below, and the only reason you cannot call PQclear before calling ereport is because PQresultErrorMessage references the internals of res, so removing the entire try-catch and doing this instead is easier to understand:

if (PQresultStatus(res) != PGRES_COPY_IN)
{
	char *msg = pchomp(PQresultErrorMessage(res));
	PQclear(res);
	ereport(ERROR,
			(errcode(ERRCODE_CONNECTION_FAILURE),
			 errmsg("unable to start remote COPY on data node"),
			 errdetail("Remote command error: %s", msg)));
}

Note that you only need to copy the error message, because that's all you use.

erimatnor (Author):

I am not sure I understand what the improvement is apart from "just different".

FWIW, this is the standard pattern for handling PGresult, e.g., in postgres_fdw. And it is safe against future changes, e.g., if someone adds a function call that can throw an error between the PQexec and the error checking.

Contributor:

Easier to follow the logic, less duplicated code, that's all. This is a matter of taste. I have approved the PR, feel free to push.


foreach (lc, state->connections_in_use)
PG_TRY();
Contributor:

Unless I misunderstand something, it seems like the only reason to have the try-catch construction is to free the result if you generate an error on line 400. Wouldn't it be easier to just clear the result there instead and not have the exception handling wrapper?

erimatnor (Author):

No, this loop throws errors in multiple places, not just line 400. In addition, all the other functions called could, in theory, also throw errors. We need to ensure that all PGresults are released or we'd have a memory leak.

Note that we collect results in a list, so it is not only about one result.

pmwkaa (Contributor) left a comment:

Looks good overall

if (PQisnonblocking(pg_conn))
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributed copy doesn't support non-blocking connections")));

if (!list_member_ptr(state->connections_in_use, connection))
{
PGresult *res = PQexec(pg_conn, state->outgoing_copy_cmd);
PGresult *volatile res = NULL;
Contributor:

Have you encountered any issues regarding it?

erimatnor (Author):

Not sure what the question is?

tsl/src/remote/dist_copy.c (comment resolved)
mkindahl (Contributor) left a comment:

Approving since the code is correct, but I think the use of the TRY-CATCH clauses is quite a heavy-handed solution to handling the deallocation of the result.


@erimatnor erimatnor merged commit 2ecb53e into timescale:master Dec 2, 2020
@erimatnor erimatnor deleted the fix-remote-copy-memory-usage branch December 2, 2020 16:40
@mkindahl mkindahl mentioned this pull request Dec 2, 2020
mkindahl added a commit to mkindahl/timescaledb that referenced this pull request Dec 2, 2020
**Minor Features**
* timescale#2662 Save compression metadata settings on access node
* timescale#2707 Introduce additional db for data node bootstrapping

**Bugfixes**
* timescale#2698 Improve memory handling for remote COPY
* timescale#2555 Set metadata for chunks compressed before 2.0
mkindahl added a commit to mkindahl/timescaledb that referenced this pull request Dec 3, 2020
This release candidate contains bugfixes since the previous release
candidate, as well as additional minor features. It improves
validation of configuration changes for background jobs, adds support
for gapfill on distributed tables, and contains improvements to
compression for distributed hypertables.

**Minor Features**
* timescale#2689 Check configuration in alter_job and add_job
* timescale#2696 Support gapfill on distributed hypertable
* timescale#2468 Show more information in get_git_commit
* timescale#2678 Include user actions into job stats view
* timescale#2664 Fix support for complex aggregate expression
* timescale#2672 Add hypertable to continuous aggregates view
* timescale#2662 Save compression metadata settings on access node
* timescale#2707 Introduce additional db for data node bootstrapping

**Bugfixes**
* timescale#2688 Fix crash for concurrent drop and compress chunk
* timescale#2666 Fix timeout handling in async library
* timescale#2683 Fix crash in add_job when given NULL interval
* timescale#2698 Improve memory handling for remote COPY
* timescale#2555 Set metadata for chunks compressed before 2.0

Successfully merging this pull request may close these issues.

COPY Command not working for large CSV/txt files of 1000000+ rows in Timescaledb rc1
4 participants