
Logical replication cursor advances when it should not #940

Closed
jbergknoff-rival opened this issue Jul 2, 2019 · 16 comments · Fixed by #943



@jbergknoff-rival jbergknoff-rival commented Jul 2, 2019

There appears to be a regression introduced in 2.8.3 (#913) where a logical replication cursor will be advanced whether the user wants it to be or not. This may be specific to wal2json decoding. It does not happen with test_decoding.

Our use case involves aggregating several logical replication messages before sending them on to our consumer [1], and we only want to advance the WAL flush_lsn after we've done that. However, in 2.8.3 with wal2json, this is impossible: the WAL position is advanced even if we never call send_feedback on the cursor.

Here's a program that reproduces the behavior:

import psycopg2
import psycopg2.extras

connection = psycopg2.connect(
    'postgresql://user:password@postgresql/testing_db',
    connection_factory=psycopg2.extras.LogicalReplicationConnection
)

cursor = connection.cursor()
replication_slot_name = 'testing'

try:
    cursor.create_replication_slot(replication_slot_name, output_plugin='wal2json')
except Exception:
    pass  # the slot already exists

cursor.start_replication(slot_name=replication_slot_name, decode=True, status_interval=1)
# Note: we only print each message; we never call send_feedback.
cursor.consume_stream(lambda message: print('received message:', message, message.payload))

Create a table in testing_db and insert a record. The expectation is that the testing slot stays where it started. The script above reports the event to stdout, but never sends feedback. If you stop the script and restart it, however many times, the script should print that event to stdout again every time.

However, once the status interval elapses (set here to 1 second so it's quick), the WAL position is advanced beyond the event. Restarting the script doesn't print the event. This happens with both consume_stream and a read_message loop.
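For reference, a minimal sketch of how to drive the repro and watch the slot position from a second, ordinary connection (the table and column names are arbitrary; the slot name matches the script above):

import psycopg2

conn = psycopg2.connect('postgresql://user:password@postgresql/testing_db')
conn.autocommit = True
with conn.cursor() as cur:
    cur.execute('CREATE TABLE IF NOT EXISTS foo (id integer)')
    cur.execute('INSERT INTO foo VALUES (4)')
    # On PostgreSQL 9.6 use pg_current_xlog_location() instead of pg_current_wal_lsn().
    cur.execute("""
        SELECT restart_lsn, confirmed_flush_lsn, pg_current_wal_lsn()
        FROM pg_replication_slots
        WHERE slot_name = 'testing'
    """)
    print(cur.fetchone())  # rerun after the status interval elapses and compare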

If the slot is created with test_decoding instead of wal2json, the behavior is as expected.

Our current workaround (a sketch follows this list) is to:

  • Pin to psycopg2==2.8.2
  • Use a read_message/select loop which:
    • when we receive a message, advances flush_lsn to message.payload["nextlsn"] (using wal2json's include-lsn: true option)
    • when there's no message to receive and our to-send buffer is empty, advances flush_lsn to cursor.wal_end
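For illustration, a rough sketch of that loop under psycopg2 2.8.2, assuming wal2json's include-lsn output as described above; send_to_consumer() is a hypothetical stand-in for our real consumer, and error handling is omitted:

import json
import select

import psycopg2
import psycopg2.extras

def lsn_to_int(lsn):
    """Convert an 'X/Y' LSN string (e.g. wal2json's "nextlsn") to the int psycopg2 expects."""
    hi, lo = lsn.split('/')
    return (int(hi, 16) << 32) + int(lo, 16)

def send_to_consumer(batch):
    """Hypothetical stand-in for the real consumer (e.g. one Kinesis PutRecords call)."""
    print('sending batch of', len(batch), 'changes')

connection = psycopg2.connect(
    'postgresql://user:password@postgresql/testing_db',
    connection_factory=psycopg2.extras.LogicalReplicationConnection
)
cursor = connection.cursor()
cursor.start_replication(slot_name='testing', decode=True,
                         options={'include-lsn': 'true'})

buffered, next_lsn = [], None   # changes waiting to be sent to the consumer

while True:
    message = cursor.read_message()
    if message is not None:
        payload = json.loads(message.payload)
        buffered.append(payload)
        next_lsn = payload['nextlsn']   # present because include-lsn is enabled
        continue
    if buffered:
        send_to_consumer(buffered)
        buffered = []
        cursor.send_feedback(flush_lsn=lsn_to_int(next_lsn))
    else:
        # Nothing pending, so it is safe to acknowledge everything seen so far.
        cursor.send_feedback(flush_lsn=cursor.wal_end)
    # Wait for more data on the connection socket.
    select.select([connection], [], [], 5.0)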

As I understand it, #913 aims to cover that latter behavior (advancing the WAL position when we've read everything), but there may be a bug in the implementation.

@CyberDem0n

[1] AWS Kinesis. We have many small database transactions, and we want to group them into one big PutRecords call because it's significantly faster to retrieve each little change from logical replication than it is to send each little change to Kinesis.


@CyberDem0n CyberDem0n commented Jul 5, 2019

This is a very interesting bug.
The reason why test_decoding works as expected is that it sends a message not only for changed rows, but also for BEGIN and COMMIT.

test_decoding:

python test.py

.... initialization....
[25678] pq_read_replication_message: msg=k, len=18
[25678] pq_read_replication_message: wal_end=0/16a8370
[25678] pq_send_replication_feedback: write=0/0, flush=0/16a8370, apply=0/0
...
...
[25678] pq_read_replication_message: msg=w, len=34
[25678] pq_read_replication_message: data_start=0/16a8370, wal_end=0/16a8370
[25678] pq_read_replication_message: >>BEGIN 589<<
[25678] pq_read_replication_message: msg=w, len=64
[25678] pq_read_replication_message: data_start=0/16a8370, wal_end=0/16a8370
[25678] pq_read_replication_message: >>table public.foo: INSERT: id[integer]:4<<
[25678] pq_read_replication_message: msg=w, len=35
[25678] pq_read_replication_message: data_start=0/16a85d0, wal_end=0/16a85d0
[25678] pq_read_replication_message: >>COMMIT 589<<

The value wal_end=0/16a8370 of the keepalive message right before the BEGIN is exactly the same as the data_start=0/16a8370 and wal_end=0/16a8370 of the BEGIN and INSERT. The reason further keepalive messages don't automatically advance the flush LSN is that there is also the COMMIT message, with a different data_start=0/16a85d0 and wal_end=0/16a85d0.

Meanwhile, wal2json emits only one message per change, and its data_start always matches the wal_end of the previous keepalive message:

python test.py:

.... initialization....
[25618] pq_read_replication_message: msg=k, len=18 -- server keepalive message
[25618] pq_read_replication_message: wal_end=0/16a7fd0
[25618] pq_send_replication_feedback: write=0/0, flush=0/16a7fd0, apply=0/0
...
...
[25618] pq_read_replication_message: msg=w, len=153
[25618] pq_read_replication_message: data_start=0/16a7fd0, wal_end=0/16a7fd0
[25618] pq_read_replication_message: >>{"change":[{"kind":"insert","schema":"public","table":"foo","columnnames":["id"],"columntypes":["integer"],"columnvalues":[4]}]}<<

Note the pq_read_replication_message: wal_end=0/16a7fd0 of the keepalive versus the pq_read_replication_message: data_start=0/16a7fd0, wal_end=0/16a7fd0 of the data message.
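(For reference, the per-change LSNs can also be inspected without a debug build by peeking at the slots over SQL; the slot names below are hypothetical, one per output plugin, and peeking does not consume changes or move confirmed_flush_lsn.)

import psycopg2

conn = psycopg2.connect('postgresql://user:password@postgresql/testing_db')
with conn, conn.cursor() as cur:
    for slot in ('testing_test_decoding', 'testing_wal2json'):   # hypothetical slot names
        cur.execute(
            "SELECT lsn, data FROM pg_logical_slot_peek_changes(%s, NULL, NULL)",
            (slot,))
        print('---', slot)
        for lsn, data in cur.fetchall():
            # test_decoding yields separate BEGIN / change / COMMIT rows;
            # wal2json (default format) yields one JSON document per transaction.
            print(lsn, data)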

The fix is simple and straightforward: I will add a separate variable that records the flush LSN explicitly set by calling the send_feedback method, and use it for the check.
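Roughly, the idea in Python terms (the actual change lives in psycopg2's C module, so this is only an illustration of the check, not the implementation):

# Illustration only: track the flush LSN the user explicitly confirmed,
# separately from the position of the last data message received.
explicit_flush_lsn = 0    # updated only when send_feedback() is called
last_data_lsn = 0         # updated whenever a data ('w') message arrives

def flush_lsn_for_keepalive(server_wal_end):
    # Auto-advance only if every data message seen so far has already been
    # confirmed by the user; otherwise keep reporting the explicit flush LSN.
    if last_data_lsn <= explicit_flush_lsn:
        return server_wal_end
    return explicit_flush_lsn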


@dvarrazzo dvarrazzo commented Jul 6, 2019

@CyberDem0n Thank you for the MR! I'm away from computers now: I'll merge and release in a few days.


@jbergknoff-rival jbergknoff-rival commented Jul 8, 2019

Sounds good @CyberDem0n! Thanks for looking into this.

louis-pie added a commit to transferwise/pipelinewise-tap-postgres that referenced this issue Jul 26, 2019

@louis-pie louis-pie commented Jul 26, 2019

Thank you for a great library ... looking forward to the next release.

@dvarrazzo dvarrazzo closed this in 9097a5b Oct 19, 2019
bors bot added a commit to chronhq/backend that referenced this issue Oct 27, 2019
Merge #167
167: Update psycopg2 requirement from ~=2.8.3 to ~=2.8.4 in /config r=MiklerGM a=dependabot-preview[bot]

Updates the requirements on [psycopg2](https://github.com/psycopg/psycopg2) to permit the latest version.
Sourced from [psycopg2's changelog](https://github.com/psycopg/psycopg2/blob/master/NEWS):

> What's new in psycopg 2.8.4
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^
>
> - Fixed building with Python 3.8 (#854).
> - Don't swallow keyboard interrupts on connect when a password is specified
>   in the connection string (#898).
> - Don't advance replication cursor when the message wasn't confirmed (#940).
> - Fixed inclusion of ``time.h`` on Linux (#951).
> - Fixed int overflow for large values in `psycopg2.extensions.Column.table_oid`
>   and `psycopg2.extensions.Column.type_code` (#961).
> - `psycopg2.errorcodes` map and `psycopg2.errors` classes updated to PostgreSQL 12.
> - Wheel package compiled against OpenSSL 1.1.1d and PostgreSQL at least 11.4.
>
> What's new in psycopg 2.8.3
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^
>
> - Added *status_interval* parameter to the
>   `psycopg2.extras.ReplicationCursor.start_replication()` method and other
>   facilities to send automatic replication keepalives at periodic intervals (#913).
> - Fixed namedtuples caching introduced in 2.8 (#928).
>
> What's new in psycopg 2.8.2
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^
>
> - Fixed `psycopg2.extras.RealDictCursor` when there are repeated columns (#884).
> - Binary packages built with OpenSSL 1.1.1b. Should fix concurrency problems
>   (#543, #836).
>
> What's new in psycopg 2.8.1
> ^^^^^^^^^^^^^^^^^^^^^^^^^^^
>
> - Fixed `psycopg2.extras.RealDictRow` modifiability (#886).
> - Fixed "there's no async cursor" error polling a connection with no cursor (#887).
>
> What's new in psycopg 2.8
> -------------------------
>
> New features: ... (truncated)

See the full diff in the [compare view](https://github.com/psycopg/psycopg2/commits).

@louis-pie louis-pie commented Oct 30, 2019

I am still getting this behavior with version 2.8.4. I am using wal2json and tested against PostgreSQL versions 9.6 and 11.

Even though I am not sending any feedback to the server, the confirmed_flush_lsn keeps advancing:

...
cur.start_replication(slot_name=slot, decode=True, start_lsn=start_lsn, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
...
while True:
    msg = cur.read_message()
    # cur.send_feedback()
...

@CyberDem0n CyberDem0n commented Oct 30, 2019

@louis-pie are any messages received?
What is the position of start_lsn? Is it bigger or smaller than confirmed_flush_lsn?

I am not able to reproduce it.


@louis-pie louis-pie commented Oct 30, 2019

Hi @CyberDem0n

I performed another test and kept a better record of the positions:

  1. lsn positions
    restart_lsn = '0/6890470'
    confirmed_flush_lsn = '0/68904A8'
    pg_current_wal_lsn = '0/68904A8'

  2. Update one row

  3. lsn positions
    restart_lsn = '0/6890470'
    confirmed_flush_lsn = '0/68904A8'
    pg_current_wal_lsn = '0/6890728'

  4. Start replication from start_lsn=1
    cur.start_replication(slot_name=slot, decode=True, start_lsn=1, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
    This returned data

  5. lsn positions
    restart_lsn = '0/6890470'
    confirmed_flush_lsn = '0/68904A8'
    pg_current_wal_lsn = '0/6890808'

  6. Start replication from start_lsn=300000000
    cur.start_replication(slot_name=slot, decode=True, start_lsn=300000000, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
    This returned NO data

  7. lsn positions
    restart_lsn = '0/68906F0'
    confirmed_flush_lsn = '0/6890808'
    pg_current_wal_lsn = '0/6890808'

It seems the confirmed_flush_lsn is only advanced automatically if no data has been received.


@CyberDem0n CyberDem0n commented Oct 30, 2019

> 6. Start replication from start_lsn=300000000

Well, 300000000 is far ahead of confirmed_flush_lsn...
By starting from this position you are acknowledging that you don't care about events between 0/68904A8 and 0/11E1A300.
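(For reference, PostgreSQL displays an LSN as two hex halves separated by a slash, so 300000000 is 0/11E1A300:)

def int_to_lsn(value):
    """Format an integer LSN the way PostgreSQL displays it."""
    return '%X/%X' % (value >> 32, value & 0xFFFFFFFF)

def lsn_to_int(value):
    hi, lo = value.split('/')
    return (int(hi, 16) << 32) + int(lo, 16)

print(int_to_lsn(300000000))     # -> 0/11E1A300
print(lsn_to_int('0/68904A8'))   # -> 109642920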


@louis-pie louis-pie commented Oct 30, 2019

Does that mean that psycopg sends confirmed_flush_lsn=start_lsn when calling
cur.start_replication(slot_name=slot, decode=True, start_lsn=xxxxxxx)?

This would explain the behavior, and then it is probably not a bug.


@CyberDem0n CyberDem0n commented Oct 30, 2019

No, it doesn't. There is no direct relation between start_lsn and confirmed_flush_lsn.
In some cases the value of flush_lsn is advanced automatically based on the keepalive messages:

  1. From time to time postgres sends keepalive messages with the current LSN on the server. That LSN can grow even when there are no data changes; for example, vacuum activity alone might generate gigabytes of WAL. If we are not sending feedback, that WAL would never be cleaned up. This is the first major problem of not sending feedback.

  2. Now let's imagine a situation where there have been no data change events since the moment we started logical replication. Meanwhile, the WAL position on the server moved from 0/10 to 0/20. If we don't confirm 0/20, that will prevent postgres from shutting down cleanly. Why? Because during a fast or smart shutdown postgres requests immediate feedback from the replication client and keeps running until it has received confirmation up to the very last byte. This is the second major problem of not sending feedback.


@louis-pie louis-pie commented Oct 31, 2019

> No, it doesn't. There is no direct relation between start_lsn and confirmed_flush_lsn.
> In some cases the value of flush_lsn is advanced automatically based on the keepalive messages:
>
> 1. From time to time postgres sends keepalive messages with the current LSN on the server. That LSN can grow even when there are no data changes; for example, vacuum activity alone might generate gigabytes of WAL. If we are not sending feedback, that WAL would never be cleaned up. This is the first major problem of not sending feedback.
>
> 2. Now let's imagine a situation where there have been no data change events since the moment we started logical replication. Meanwhile, the WAL position on the server moved from `0/10` to `0/20`. If we don't confirm `0/20`, that will prevent postgres from shutting down cleanly. Why? Because during a fast or smart shutdown postgres requests immediate feedback from the replication client and keeps running until it has received confirmation up to the very last byte. This is the second major problem of not sending feedback.

Hi @CyberDem0n,

I am not really following what you mean.

RE 1. I don't think postgres sends a keep-alive message. It is up to the client to send the keep-alive message. Postgres will simply disconnect the client if no keep-alive was received before wal_sender_timeout is reached. A new feature in psycopg2 2.8.3 is sending this keep-alive message automatically, to avoid having to do it in the Python script.

RE 2. Regardless of the reason the WAL position (pg_current_wal_lsn) has moved on, the confirmed_flush_lsn should not be updated automatically. The confirmed_flush_lsn should only be updated when the client sends the confirmation message that the LSN has been committed.


I have performed another test and set the start_lsn to be the same as the pg_current_wal_lsn without ever sending a feedback message.

  • lsn positions
    restart_lsn = '0/130541E8'
    confirmed_flush_lsn = '0/13054220'
    pg_current_wal_lsn = '0/130F8350'

  • start replication from '0/130F8350'

...
cur.start_replication(slot_name=slot, decode=True, start_lsn=319783760, options={'write-in-chunks': 1, 'add-tables': ','.join(selected_tables)})
...
while True:
    msg = cur.read_message()
    # cur.send_feedback()
...
  • lsn positions
    restart_lsn = '0/1305AB48'
    confirmed_flush_lsn = '0/130F8350'
    pg_current_wal_lsn = '0/130F8350'

This had the same result of auto-advancing the confirmed_flush_lsn.
You say there is no relation between start_lsn and confirmed_flush_lsn, but the behavior of psycopg2 2.8.4 suggests otherwise.


@CyberDem0n CyberDem0n commented Oct 31, 2019

> I don't think postgres sends a keep-alive message. It is up to the client to send the keep-alive message.

This is just your assumption, not based on facts. When you are unsure about something, read the documentation. If it is still not clear, read the source code. I did both. If you still don't trust me, compile and run psycopg2 with debug; it will produce a lot of messages like the ones here.
pq_read_replication_message: msg=k -- this is the Primary keepalive message; it is sent by the server.

> A new feature in psycopg2 2.8.3 is sending this keep-alive message automatically, to avoid having to do it in the Python script.

psycopg2 is sending Standby status update messages, not keep-alives!

> I have performed another test and set the start_lsn to be the same as the pg_current_wal_lsn without ever sending a feedback message.

Look:

  1. You start from position X.
  2. There are no messages for your subscription, but WAL is being generated due to some other activity.
  3. After some timeout postgres will send you the Primary keepalive message. This message contains the current WAL position (LSN) on the server, Y.
  4. Y could be the same as X, but it could also be much higher than X, by gigabytes of data.
  5. In psycopg2 we know for sure that from the moment we started streaming (X) not a single data change event has arrived. Therefore it is safe to notify the server with flush_lsn=Y.

Does it make sense?

This behavior shouldn't break your application logic. Your app doesn't get the start_lsn from nowhere; it knows for certain that it has already received and processed all events before the specified LSN, otherwise you wouldn't specify it.


@louis-pie louis-pie commented Oct 31, 2019

> This is just your assumption, not based on facts. When you are unsure about something, read the documentation. If it is still not clear, read the source code. I did both. If you still don't trust me, compile and run psycopg2 with debug; it will produce a lot of messages like the ones here.

The documentation link you sent is for the Streaming Replication Protocol, in other words the way the client interacts with the server. The client is required to send the keep-alive message.

I appreciate you wanting to help, but please, no need to be rude.


@jbergknoff-rival jbergknoff-rival commented Oct 31, 2019

As far as I know (I dug into the postgres/psycopg2/wal2json source when I was debugging this issue a few months ago), everything @CyberDem0n said above is correct and relevant. The server is sending keepalive messages [1]. His response was (helpfully) verbose and precise and, in my opinion, not really rude. I believe it explains what you're seeing. I suggest rereading it.

[1] Note "The payload of each CopyData message from server to the client contains a message of one of the following formats:" in the linked docs.


@CyberDem0n CyberDem0n commented Oct 31, 2019

> The documentation link you sent is for the Streaming Replication Protocol

On that exact page there is a section about the logical replication protocol:

> START_REPLICATION SLOT slot_name LOGICAL XXX/XXX [ ( option_name [ option_value ] [, ...] ) ]
>
> Instructs server to start streaming WAL for logical replication, starting at WAL location XXX/XXX. The server can reply with an error, for example if the requested section of WAL has already been recycled. On success, server responds with a CopyBothResponse message, and then starts to stream WAL to the frontend.
>
> The messages inside the CopyBothResponse messages are of the same format documented for START_REPLICATION ... PHYSICAL.

The logical replication protocol uses messages in the same format as physical replication. The documentation is also completely clear about the terminology: the postgres server sends Primary keepalive message (B) messages and the client is required to send Standby status update (F) messages.

> I appreciate you wanting to help, but please, no need to be rude.

I am very sorry about that, I don't really want to be rude.

@jbergknoff-rival I hope the issue you reported was solved in 2.8.4.


@louis-pie louis-pie commented Nov 1, 2019

> I am very sorry about that, I don't really want to be rude.

Apology completely accepted ... let's meet for a beer sometime, I am buying.

Also, please let's continue the discussion.

> The logical replication protocol uses messages in the same format as physical replication. The documentation is also completely clear about the terminology: the postgres server sends Primary keepalive message (B) messages and the client is required to send Standby status update (F) messages.

I believe, when I said the client sends the keep-alive message, what I really meant was that the client sends a status-update message. And that status-update message is required to avoid the server disconnecting the session when wal_sender_timeout is reached.

I have spent many hours reading documentation and source code, but many more testing behavior, as that is what I need to understand when making design decisions.

> 1. You start from position X.
> 2. There are no messages for your subscription, but WAL is being generated due to some other activity.
> 3. After some timeout postgres will send you the **Primary keepalive message**. This message contains the current WAL position (LSN) on the server, Y.
> 4. Y could be the same as X, but it could also be much higher than X, by gigabytes of data.
> 5. In psycopg2 we know for sure that from the moment we started streaming (X) not a single data change event has arrived. Therefore it is safe to notify the server with flush_lsn=Y.

I see this behavior only when position X is the same as or higher than pg_current_wal_lsn(). I understand that this can alleviate pressure on the source server, and I agree that it is extremely unlikely to cause any data loss. That being said, I will avoid it completely by never sending a position X that is the same as or higher than pg_current_wal_lsn().

I thank you for this useful information, and especially thank you for your time.
