Allow setFlushedLSN(lsn) and setAppliedLSN(lsn) from outside main loop #1329

Merged
4 commits merged into pgjdbc:master on Nov 26, 2018

@pbillen
Contributor

commented Nov 5, 2018

Hi,

TL;DR version

Marking the fields holding the sequence numbers as volatile will allow the API user to call setFlushedLSN(lsn) and setAppliedLSN(lsn) (and also the getters) from a thread different than the thread doing the main loop. This is useful when doing a blocking read(), as it allows another thread to actually acknowledge an event as flushed/applied.

Without this change, one cannot combine read() and parallel processing+acknowledging of WAL events, as there is no guarantee that the updated values will be seen by the main replication loop.

long version

I'm working on a logical replication consumer that forwards replication events to a queue service (RabbitMQ in this instance). The flow is as follows:

  1. The main loop consuming the replication events uses read() to collect them. This is a blocking call: it only returns when a new replication event has been read.
  2. The replication events, together with their corresponding LSN, are pushed to a BlockingQueue (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html)
  3. One or more threads consume this queue and send the replication events to RabbitMQ, following certain policies.
  4. From the moment RabbitMQ confirms, we can acknowledge to PostgreSQL, if all previous events were also confirmed by RabbitMQ.

In this flow, replication events are independent from each other. Therefore, it is correct to have multiple active threads in (3) above.
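The condition in step 4 ("all previous events were also confirmed") can be sketched as a small tracker. This is only an illustration under stated assumptions: AckTracker, register() and confirm() are hypothetical names, and a plain long stands in for the driver's LSN type.

```java
import java.util.TreeMap;

/** Hypothetical helper: reports the highest LSN whose predecessors are all confirmed. */
final class AckTracker {
    // LSN -> confirmed flag, ordered by LSN; guarded by "this"
    private final TreeMap<Long, Boolean> pending = new TreeMap<>();
    private long safeLsn = -1; // -1 means nothing can be acknowledged yet

    /** Called by the reading thread, in read order, before the event is queued. */
    synchronized void register(long lsn) {
        pending.put(lsn, Boolean.FALSE);
    }

    /** Called by any worker after the queue service confirmed the event. */
    synchronized long confirm(long lsn) {
        if (pending.containsKey(lsn)) {
            pending.put(lsn, Boolean.TRUE);
        }
        // Advance over the contiguous prefix of confirmed events
        while (!pending.isEmpty() && pending.firstEntry().getValue()) {
            safeLsn = pending.pollFirstEntry().getKey();
        }
        return safeLsn;
    }
}
```

A worker would pass the returned value to setFlushedLSN() only when it is not -1, so an out-of-order confirmation never acknowledges an event whose predecessor is still in flight.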

The main replication loop is as follows:

final PGReplicationStream stream = ...;
final BlockingQueue<Item> queue = ...;

while (true) {
    final ByteBuffer bb = stream.read(); // blocks until the next replication event arrives
    final Item item = new Item(bb, stream.getLastReceiveLSN());

    queue.put(item); // may also block if the queue is full
}

The RabbitMQ loop (one per worker thread) is as follows:

final PGReplicationStream stream = ...; // the same stream as above
final BlockingQueue<Item> queue = ...; // the same queue as above
final RabbitMQClient client = ...;

while (true) {
    final Item item = queue.take(); // blocks while the queue is empty

    // send to RabbitMQ
    final boolean ok = client.send(item.getBuffer());

    // acknowledge to PostgreSQL; the confirmation typically arrives asynchronously
    // pseudocode: allPreviousEventsConfirmed() stands for the application's own bookkeeping
    if (ok && allPreviousEventsConfirmed(item)) {
        stream.setFlushedLSN(item.getLSN());
    }
}

The change in this pull request relates to stream.setFlushedLSN(item.getLSN()) in the code snippet above. Without declaring the LSN members as volatile, there is no guarantee that the replication main loop will (ever) notice the update, as it could continue to use its cached value.
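A minimal, driver-independent sketch of the visibility guarantee in question. LsnState and VolatileDemo are stand-ins invented for illustration, not classes from the driver, and a long replaces the LSN type. With the volatile modifier the loop thread is guaranteed to observe the write eventually; without it, the Java memory model gives no such guarantee.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Stand-in for the stream's LSN bookkeeping (hypothetical, not the driver's class). */
final class LsnState {
    // volatile: a write by one thread becomes visible to subsequent reads by other threads
    private volatile long lastFlushedLsn;

    void setFlushedLsn(long lsn) { lastFlushedLsn = lsn; }
    long getLastFlushedLsn() { return lastFlushedLsn; }
}

final class VolatileDemo {
    /** Returns true if the loop thread observed a (non-zero) LSN written by the calling thread. */
    static boolean loopSeesUpdate(long lsn, long timeoutMillis) {
        LsnState state = new LsnState();
        CountDownLatch seen = new CountDownLatch(1);
        Thread mainLoop = new Thread(() -> {
            // Simulates the status-update path re-reading the field on every pass
            while (state.getLastFlushedLsn() != lsn) {
                Thread.yield();
            }
            seen.countDown();
        });
        mainLoop.setDaemon(true);
        mainLoop.start();
        state.setFlushedLsn(lsn); // written from a thread other than the loop thread
        try {
            return seen.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```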

pbillen
Marking the fields holding the sequence numbers as volatile will allow the API user to call setFlushedLSN() and setAppliedLSN() (and also the getters) from a thread different than the thread doing the main loop. This is useful when doing a blocking read(), as it allows another thread to actually acknowledge an event as flushed/applied. Without this, one cannot combine read() and parallel processing of WAL events.
@pbillen

Contributor Author

commented Nov 5, 2018

@davecramer @vlsi Do you see a window to include this in the 42.3 release? It would be great to support the flow as explained. Thanks.

@davecramer

Member

commented Nov 5, 2018

@pbillen this isn't a semantic change so I don't see a problem.

That said I think I'd rather synchronize the getters and setters as that seems to be the intent?

@pbillen

Contributor Author

commented Nov 6, 2018

> @pbillen this isn't a semantic change so I don't see a problem.
>
> That said I think I'd rather synchronize the getters and setters as that seems to be the intent?

Thank you.

I believe adding volatile has the least impact, if any. You could add synchronized to the getters and setters, but for the main loop to actually pick up the updated values of the members lastAppliedLSN and lastFlushedLSN after a setter is called, V3PGReplicationStream.timeUpdateStatus() must be synchronized as well.
In addition, for getLastReceiveLSN() to become 'visible' outside the main loop (which is beyond the scope of my question but could be useful nonetheless - for example if you want to calculate the current replication lag), you have to synchronize V3PGReplicationStream.processKeepAliveMessage() and V3PGReplicationStream.processXLogData() as well, to create a memory barrier.

Adding volatile avoids all this. Since V3PGReplicationStream is by nature a single-threaded implementation (it doesn't make sense to have multiple main loops), you don't have the (minimal) overhead.
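For comparison, a rough sketch of the synchronized alternative described above. SynchronizedLsnState and snapshotForStatusUpdate() are hypothetical names, and long values stand in for LSNs. The point it illustrates is that the status-update path has to take the same lock as the setters, otherwise synchronizing only the setters gives that path no visibility guarantee.

```java
/** Stand-in for the stream's LSN bookkeeping, using a lock instead of volatile fields. */
final class SynchronizedLsnState {
    private long lastFlushedLsn; // guarded by "this"
    private long lastAppliedLsn; // guarded by "this"

    synchronized void setFlushedLsn(long lsn) { lastFlushedLsn = lsn; }
    synchronized void setAppliedLsn(long lsn) { lastAppliedLsn = lsn; }

    /**
     * What a status update would call: it reads both values under the same
     * monitor that the setters use, which establishes the memory barrier.
     */
    synchronized long[] snapshotForStatusUpdate() {
        return new long[] {lastFlushedLsn, lastAppliedLsn};
    }
}
```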

That said, it would work, of course. So feel free to let me know if you want me to alter the current change!

Thanks again.

@vlsi

Member

commented Nov 6, 2018

@pbillen, a class is either thread-safe or it is not.

I see that making org.postgresql.replication.PGReplicationStream thread-safe might be good (I would +1 that change); however, it would take a bit more than adding a couple of volatile modifiers here and there.
For instance, there's org.postgresql.replication.PGReplicationStream#forceUpdateStatus, which would likely blow up under true concurrency.

In the meantime, there's a workaround: use an AtomicReference to pass the flushed LSN to the thread operating the pg-stream.
In other words, your stream.setFlushedLSN(item.getLSN()); becomes flushedLsn.set(item.getLSN()), and you just add stream.setFlushedLSN(flushedLsn.get()); to the "main" thread.
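The workaround could look roughly like this. FlushTarget is a hypothetical one-method view of the stream, FlushHandoff is an invented helper, and Long stands in for the driver's LSN type; only the setFlushedLSN name is taken from the discussion.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical minimal view of the stream, limited to the single call used here. */
interface FlushTarget {
    void setFlushedLSN(Long lsn);
}

final class FlushHandoff {
    // Holds the most recent LSN published by any worker; null until the first publish
    private final AtomicReference<Long> flushedLsn = new AtomicReference<>();

    /** Worker threads call this instead of touching the stream directly. */
    void publish(Long lsn) {
        flushedLsn.set(lsn);
    }

    /** The thread that owns the stream calls this, for example after each read. */
    void applyTo(FlushTarget stream) {
        Long lsn = flushedLsn.get();
        if (lsn != null) {
            stream.setFlushedLSN(lsn);
        }
    }
}
```

With this shape only the stream-owning thread ever calls into the stream, at the cost that a published value is applied only the next time that thread gets to run applyTo().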

@pbillen

Contributor Author

commented Nov 6, 2018

@vlsi

Thanks.

My intention was actually not to make the whole class thread-safe. As said, it should by default be controlled by a single main thread, due to the nature of having a single while(true) { read socket } loop. That is, I cannot see a valid use case for two threads doing read()/readPending() at the same time.

My only intention was to allow updates of the LSN from outside the main thread, which is achieved by making these members volatile, as far as I can see.

That said, you have a good point regarding forceUpdateStatus(). There could be a use case to call forceUpdateStatus() also from another thread than the main thread. I did not take that into account.
We could either improve that as well (which would require true synchronization), or add documentation which explains the valid concurrency use cases. (In that sense, a class could be perfectly 'semi' thread-safe, as long as its valid use cases are documented.)

I'm not sure how your suggestion would work when the main thread uses a blocking read(). The main thread could be blocked on read() while one of the other threads wants to update the flushed or applied LSN. We could pass that value to the main thread (for example with AtomicReference, as you explained), but the main thread has no opportunity to pass it on to the stream, because it is blocked on read(). It might block 'forever' if no new events come in.
In other words, we want to update the LSN in the next status update, not in the status update which would happen after the next event would arrive. (Note that we do not have that problem when using a non-blocking readPending() in the main thread.)
By providing a possibility to pass LSN updates directly to the stream (which volatile does), the internal status handler in V3PGReplicationStream will pick up that change and send it as an update to the postgres backend in the next update/keepalive message.
I might be missing something, so please feel free to correct me here.

How do you propose to continue here? We could

  • allow this change and document properly that forceUpdateStatus(), read(), readPending(), close() and isClosed() cannot be called from another thread;
  • make the class truly thread-safe. Assuming that CopyDual is thread-safe, and that we can read and write concurrently, that should not be that hard.

Thanks again.

@vlsi

Member

commented Nov 6, 2018

> We could either improve that as well (which would require true synchronization), or add documentation which explains the valid concurrency use cases. (In that sense, a class could be perfectly 'semi' thread-safe, as long as its valid use cases are documented.)

The history behind JDBC spec is as follows:
v0) All classes are not thread safe
v1) Certain methods are thread-safe, because one needs to call .cancel() and .close() from a different thread
v2) "v1 threading is too complicated" ==> all the classes should be thread-safe. // <-- this is the current state of JDBC spec

> The main thread could be blocking on read()

That is a valid point. Then adding volatile to lastAppliedLSN and lastFlushedLSN makes sense.

There's more: replication stream could fetch MULTIPLE messages from the backend at once, and there's no one-to-one match between org.postgresql.core.v3.replication.V3PGReplicationStream#read and org.postgresql.core.v3.replication.V3PGReplicationStream#getLastReceiveLSN.
It might be a different bug though.
The thing is replication messages are added to a queue (org.postgresql.core.v3.CopyDualImpl#received), however lastReceiveLSN is updated immediately.

@pbillen

Contributor Author

commented Nov 6, 2018

> The history behind JDBC spec is as follows:
> v0) All classes are not thread safe
> v1) Certain methods are thread-safe, because one needs to call .cancel() and .close() from a different thread
> v2) "v1 threading is too complicated" ==> all the classes should be thread-safe. // <-- this is the current state of JDBC spec
>
> That is a valid point. Then adding volatile to lastAppliedLSN and lastFlushedLSN makes sense.

Do you see the change which only marks the fields volatile as mergeable, or do you propose additional synchronization? It depends on the use cases we want to support, I'd say. My specific use case is covered with volatile, but given your JDBC history, we might want to make the whole class thread-safe? I can work on that if you want.

> There's more: replication stream could fetch MULTIPLE messages from the backend at once, and there's no one-to-one match between org.postgresql.core.v3.replication.V3PGReplicationStream#read and org.postgresql.core.v3.replication.V3PGReplicationStream#getLastReceiveLSN.
> It might be a different bug though.
> The thing is replication messages are added to a queue (org.postgresql.core.v3.CopyDualImpl#received), however lastReceiveLSN is updated immediately.

I understand your remark, but let me verify that I see the same thing in the code. In processXLogData(), we process the received buffer. If I understand this correctly, this buffer could contain several WAL events, not just one. However, in processXLogData(), we update lastReceiveLSN to the LSN of the first message. We should return the event together with its corresponding LSN, though that would break the current API.

Is this what you meant?

@vlsi

Member

commented Nov 6, 2018

> Do you see the change which only marks the fields volatile as mergeable

That looks good to me.
However I do not see why make lastReceiveLSN volatile.

> we might want to make the whole class thread-safe? I can work on that if you want.

I think it would be great; however, let's take small steps, since making setFlushedLSN thread-safe would already simplify replication clients.

@vlsi

Member

commented Nov 6, 2018

> Is this what you meant?

Exactly.
It looks like we'd want to queue lastServerLSN values, so getLastReceiveLSN produces the values that are aligned with read
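One way to picture the idea of queueing LSN values alongside the messages is sketched below. AlignedBuffer is purely illustrative and not the driver's implementation; a byte array and a long stand in for the real payload and LSN types. The LSN is stored with each message when it arrives and only becomes the "last received" value when that message is actually handed to the caller.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical buffer that keeps each payload together with the LSN it arrived with. */
final class AlignedBuffer {
    private static final class Entry {
        final byte[] payload;
        final long lsn;

        Entry(byte[] payload, long lsn) {
            this.payload = payload;
            this.lsn = lsn;
        }
    }

    private final Queue<Entry> received = new ArrayDeque<>();
    private long lastReceiveLsn = -1; // -1 until the first message has been read

    /** Network side: several messages may be enqueued before the caller reads any. */
    void enqueue(byte[] payload, long lsn) {
        received.add(new Entry(payload, lsn));
    }

    /** Caller side: returns the next payload (or null if none) and updates the LSN only now. */
    byte[] read() {
        Entry e = received.poll();
        if (e == null) {
            return null;
        }
        lastReceiveLsn = e.lsn;
        return e.payload;
    }

    long getLastReceiveLsn() {
        return lastReceiveLsn;
    }
}
```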

@pbillen

Contributor Author

commented Nov 6, 2018

> > Do you see the change which only marks the fields volatile as mergeable
>
> That looks good to me.
> However I do not see why make lastReceiveLSN volatile.

I marked that volatile as well, as that could be useful to calculate replication lag in another thread: you could calculate the offset between the LSN of the current event being processed in that thread and the lastReceiveLSN of the stream.

It could be considered as one of these small baby steps, but I can remove it as well. Let me know.
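As a rough illustration of the lag calculation mentioned above: PostgreSQL prints an LSN as two hexadecimal 32-bit halves separated by a slash, so it can be mapped to a 64-bit byte position and subtracted. LsnLag and its methods are hypothetical helpers written for this sketch; with the driver one would presumably work from the numeric value of its own LSN type instead of parsing text.

```java
final class LsnLag {
    /** Parses PostgreSQL's textual LSN form "hi/lo" (two hexadecimal 32-bit halves). */
    static long parse(String lsn) {
        String[] parts = lsn.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("not an LSN: " + lsn);
        }
        long hi = Long.parseLong(parts[0], 16);
        long lo = Long.parseLong(parts[1], 16);
        return (hi << 32) | lo;
    }

    /** Bytes of WAL between the event being processed and the last one received. */
    static long lagBytes(long lastReceiveLsn, long processingLsn) {
        return lastReceiveLsn - processingLsn;
    }
}
```

For example, lagBytes(parse("0/20"), parse("0/10")) yields 16, meaning the worker is 16 bytes of WAL behind what the stream has received.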

> > we might want to make the whole class thread-safe? I can work on that if you want.
>
> I think it would be great; however, let's take small steps, since making setFlushedLSN thread-safe would already simplify replication clients.

I'll start working on this soon. I'll do this in a separate pull request.

@pbillen

Contributor Author

commented Nov 6, 2018

> > Is this what you meant?
>
> Exactly.
> It looks like we'd want to queue lastServerLSN values, so getLastReceiveLSN produces the values that are aligned with read

Great, got it. I will take this up in a separate pull request. I believe it should be easy to do. My main concern is how to test/simulate this.

@vlsi

Member

commented Nov 6, 2018

> My main concern is how to test/simulate this.

Is it something that happens by default?
Would the following work?

  1. sleep(5 seconds), ensure backend generates 2-5 messages
  2. read messages one by one and verify that LSN is increasing
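The verification in step 2 could be expressed with a small helper like the one below. LsnOrderCheck is a hypothetical name and long values stand in for the LSNs collected after each read; a real test would gather them from the stream after the sleep in step 1.

```java
import java.util.List;

final class LsnOrderCheck {
    /**
     * Returns the index of the first LSN that is not greater than its
     * predecessor, or -1 if the whole sequence is strictly increasing.
     */
    static int firstViolation(List<Long> lsns) {
        for (int i = 1; i < lsns.size(); i++) {
            if (lsns.get(i) <= lsns.get(i - 1)) {
                return i;
            }
        }
        return -1;
    }
}
```

If every read() reported the same "last received" LSN because several messages were fetched at once, this check would flag index 1.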
@pbillen

Contributor Author

commented Nov 6, 2018

> > My main concern is how to test/simulate this.
>
> Is it something that happens by default?
> Would the following work?
>
>   1. sleep(5 seconds), ensure backend generates 2-5 messages
>   2. read messages one by one and verify that LSN is increasing

Good idea. I will try it out.

Though, thinking further, if the buffer can contain multiple actual WAL events, I don't see a way to break that full buffer into multiple sub-buffers. There is no indication of payload length in case of logical replication. So after the buffer.slice() in processXLogData, the buffer now contains <payload event 1><header event 2><payload event 2><header event 3><payload event 3>...?

I am probably still missing something.

I'll create a new pull request soon, so we can discuss there.

@vlsi

Member

commented Nov 7, 2018

> you could calculate the offset between the LSN of the current event being processed

I'm not sure how accurate LSN maths is, however it sounds reasonable.
I'm +1 re adding volatile there provided the case is documented.

Would you please add relevant documentation to javadocs of the setters/getters regarding thread-safety as you add volatile?
For instance, getLastReceiveLSN should suggest to use that method in the same thread that calls read

pbillen added some commits Nov 7, 2018

@pbillen

Contributor Author

commented Nov 7, 2018

@vlsi Good idea. I have updated the javadocs to reflect these changes. If you have some time, could you please review it?

Thanks again.

@codecov-io


commented Nov 7, 2018

Codecov Report

Merging #1329 into master will increase coverage by <.01%.
The diff coverage is 100%.

@@             Coverage Diff              @@
##             master    #1329      +/-   ##
============================================
+ Coverage     68.57%   68.57%   +<.01%     
+ Complexity     3883     3882       -1     
============================================
  Files           178      178              
  Lines         16205    16205              
  Branches       2646     2646              
============================================
+ Hits          11112    11113       +1     
+ Misses         3862     3860       -2     
- Partials       1231     1232       +1
@pbillen

Contributor Author

commented Nov 26, 2018

@davecramer @vlsi

Javadocs have been altered, so all open tasks should have been handled. Could you please review and hopefully merge? I believe everything for the first step (= allowing flush from another thread, hence making these fields volatile) is ready to be merged.

Thank you.

@vlsi

Member

commented Nov 26, 2018

LGTM.

@davecramer

Member

commented Nov 26, 2018

Let me get travis fixed and I'll push this

@pbillen

Contributor Author

commented Nov 26, 2018

Thank you, appreciated!

@davecramer davecramer merged commit 381cf45 into pgjdbc:master Nov 26, 2018

2 checks passed

codecov/project 68.57% (+<.01%) compared to e2623d6
continuous-integration/travis-ci/pr The Travis CI build passed