
Introduce 'pre-receiver' for more intuitive message order#2743

Merged
oskarth merged 1 commit into develop from feature/spike-message-order-#2702 on Jan 2, 2018

Conversation

@oskarth
Contributor

@oskarth oskarth commented Dec 16, 2017

Summary

Includes tests and explanation of logic. Doesn't change any behavior, just provides a new capability to be used when messages are received.

Partially addresses #2702 by adding the capability to reorder messages before committing them. Keeps Lamport semantics but uses the clock information in a way that is more intuitive for end users.
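
For background, the Lamport semantics mentioned here boil down to a simple rule: increment the local clock on send, and on receive jump past the sender's clock value. A generic sketch (not the PR's ClojureScript; `LamportClock` is a hypothetical name for illustration):

```python
# Generic Lamport clock sketch: increment on send; on receive, take the
# max of both clocks and step forward, so causal order is preserved.
class LamportClock:
    def __init__(self):
        self.value = 0

    def on_send(self):
        self.value += 1
        return self.value

    def on_receive(self, received_clock_value):
        # Never go backwards: jump past whichever clock is ahead.
        self.value = max(self.value, received_clock_value) + 1
        return self.value

clock = LamportClock()
clock.on_send()       # -> 1
clock.on_receive(10)  # -> 11, jumped past the sender's value
clock.on_send()       # -> 12
```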

Details (copy-paste from test ns)

The tests in clocks.cljs only ensure that the local clock value is respected and that new messages are always appended correctly so we get a locally consistent view.

Additionally, a desirable property to have is that two people talking to each other have roughly the same ordering of messages. Example:

A and B are different chats with different chat identifiers. The sent clock-value represents that client's truth, but the network (Whisper, etc.) doesn't guarantee delivery order. This means a client can receive the messages in the following order.

(def messages [{:id "a" :clock-value 1 :payload "a1"}
               {:id "a" :clock-value 2 :payload "a2"}
               {:id "b" :clock-value 1 :payload "b1"}
               {:id "a" :clock-value 4 :payload "a4"}
               {:id "a" :clock-value 3 :payload "a3"}
               {:id "b" :clock-value 2 :payload "b2"}])

Empirically speaking, "a4" arriving before "a3" happens when messages are sent in quick succession, but the delay between their deliveries is usually very small, i.e. <100ms.

Given this delivery order, we have a design decision to make. We can either eagerly "commit" them, and thus update our local clock value to reflect the order we see messages in. Alternatively, we can pause the commit/full-receive step and wait for some time for logically earlier messages to arrive.

In 0.9.12 and earlier we had the former behavior, but this breaks users' expectations. The tests below showcase the latter behavior, which can be turned on with a flag.
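
One way to picture the latter behavior (purely a sketch, not the PR's implementation; `PreReceiver` and `commit_all` are hypothetical names) is to buffer incoming messages per chat and commit them in clock-value order once the delay window has elapsed:

```python
import heapq

# Hypothetical sketch: hold each incoming message in a per-chat min-heap
# keyed by clock-value; committing pops messages in logical order, so
# "a3" arriving after "a4" is still committed first.
class PreReceiver:
    def __init__(self):
        self.pending = {}  # chat id -> heap of (clock-value, payload)

    def receive(self, msg):
        heap = self.pending.setdefault(msg["id"], [])
        heapq.heappush(heap, (msg["clock-value"], msg["payload"]))

    def commit_all(self, chat_id):
        # Called once the delay window for the chat has elapsed.
        heap = self.pending.pop(chat_id, [])
        return [heapq.heappop(heap)[1] for _ in range(len(heap))]

messages = [{"id": "a", "clock-value": 1, "payload": "a1"},
            {"id": "a", "clock-value": 2, "payload": "a2"},
            {"id": "b", "clock-value": 1, "payload": "b1"},
            {"id": "a", "clock-value": 4, "payload": "a4"},
            {"id": "a", "clock-value": 3, "payload": "a3"},
            {"id": "b", "clock-value": 2, "payload": "b2"}]

pre = PreReceiver()
for m in messages:
    pre.receive(m)

print(pre.commit_all("a"))  # ['a1', 'a2', 'a3', 'a4']
print(pre.commit_all("b"))  # ['b1', 'b2']
```

With the delivery order above, chat "a" is committed as a1, a2, a3, a4 even though a4 arrived before a3.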

Review notes (optional):

Sanity check.

Testing notes (optional):

Nothing to test, code not called anywhere but in tests.

Steps to test:

Run the tests, and consider toggling the invariant to see how the current behavior causes unintuitive ordering (see e.g. #2699)

status: ready

@oskarth oskarth self-assigned this Dec 16, 2017
@janherich
Contributor

janherich commented Dec 17, 2017

@oskarth This looks good, I have just one comment regarding the implementation:
We already have a core.async primitive which does the pausing (or chunking), documented, used and tested, defined in status-im.utils.async/chunked-pipe!.
The only thing necessary would be:

(def queue-flush-time 500)
(def receive-queue (async/chan))
(def chunked-receive-queue (async/chan))

(async-util/chunked-pipe! receive-queue chunked-receive-queue queue-flush-time)

(async/go-loop
  (doseq [msg (sort-by :clock-value (async/<! chunked-receive-queue))]
    (commit-msg msg))
  (recur))

(defn add-msg [msg]
  (async/put! receive-queue msg))

Even the code in the go-loop is very similar to what we have in the JailCalls deduplication impl; we can generalize that if we find it useful.

@oskarth
Contributor Author

oskarth commented Dec 18, 2017

@janherich Interesting, thanks! Seems like I might've partially re-invented the wheel then :) I'll give your code a try soon and see if it works as expected (not sure about chunked vs streaming-but-delayed).

@oskarth oskarth force-pushed the feature/spike-message-order-#2702 branch from bbe6f71 to 993d800 on December 23, 2017 12:54
@oskarth
Contributor Author

oskarth commented Jan 1, 2018

@janherich I played around with chunked-pipe a while ago and I don't think it solves the same problem. It deals with chunking, which is separate from streaming and reordering. If you don't have any other issues with the code, would you mind approving the PR so we can merge it? I'm introducing the actual behavior in a separate PR: #2860

If we want to generalize this into chunked and streaming pipes we can do so, but in a later issue, assuming it becomes a priority.

@oskarth
Contributor Author

oskarth commented Jan 1, 2018

To elaborate on chunking, from the chunked-pipe docstring:

flush-time parameter decides for how long we are waiting to accumulate
value from input channel in a vector before it's put on the output channel.

Which assumes we are accumulating values into a vector and then flushing them all at once, as opposed to having a rolling window where each value is its own thing. At least that's how I understood it.

@janherich
Contributor

@oskarth chunked-pipe establishes a rolling window (time-bounded), so we can establish some interval (let's say 2 seconds), and everything received during this interval will be accumulated into a sequence which will be "committed" for processing after said interval expires.
By the time we "commit" (i.e. take from the chunked pipe) we are free to do any processing on the accumulated values (in our case, sorting according to the sender's :clock-value).

What's the problem with the code snippet I posted?
I don't have any problem with the code in this PR, I'm just against reinventing the wheel.

@oskarth
Contributor Author

oskarth commented Jan 1, 2018

@janherich

so we can establish some interval (let's say 2 seconds) and everything received during this interval will be accumulated into sequence which will be "committed" for processing after said interval expires.

By the time we "commit" (i.e. take from the chunked pipe) we are free to do any processing on the accumulated values (in our case, sorting according to the sender's :clock-value).

As I said, we don't want to commit them all at once after time has expired. It's simply different semantics, as far as I understand it. I explained why above.

What's the problem with the code snippet I posted?

For one, it doesn't work :) (doseq not allowed). Secondly, I don't see how to actually maintain the same invariant/interface as outlined in the test https://github.com/status-im/status-react/pull/2743/files#diff-f6b999d8c22ac3c6e7611d049e2ed855R78

Looking at both the docstring and code implementation of chunked-pipe! it seems clear to me that:

  • Either flush? is true or not

  • If it isn't, we wait for, say, 2s (or for a val on input-ch)

  • If 2s expired, and there are accumulated values, we flush them

  • Flushing means putting all accumulated values on output channel

  • Above implies that a message that comes at the tail end of a flush window won't wait the full 2s (or flush-time) before being output in the accumulated vector.

  • This is a different semantic than what this PR introduces, since each message is guaranteed to be delayed for a certain amount of time before being committed.
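
The timing difference in the last two bullets can be made concrete with a small sketch (the arrival times and the 500ms delay are illustrative assumptions, not values from the PR):

```python
DELAY = 500  # ms; illustrative flush/delay time

# (message, arrival time in ms); "a3" arrives just before the window closes
arrivals = [("a1", 0), ("a2", 120), ("a3", 480)]

# chunked-pipe! semantics: everything accumulated in [0, 500) is flushed
# together at the window boundary, t = 500
chunked_commits = [(msg, 500) for msg, _ in arrivals]

# per-message delay (this PR's semantics): each message individually waits
# the full delay, committing at arrival + DELAY
per_message_commits = [(msg, t + DELAY) for msg, t in arrivals]

print(chunked_commits)      # [('a1', 500), ('a2', 500), ('a3', 500)]
print(per_message_commits)  # [('a1', 500), ('a2', 620), ('a3', 980)]
```

Under chunking, "a3" waits only 20ms before being committed; under the per-message delay it gets its full 500ms window for logically earlier messages to arrive.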

@janherich
Contributor

@oskarth thanks for the explanation, I understand the semantic difference now.

@oskarth oskarth force-pushed the feature/spike-message-order-#2702 branch from 993d800 to 08210c0 on January 2, 2018 09:56
Includes tests and explanation of logic. Doesn't change any behavior, just
provides a new capability to be used when messages are received.
@oskarth oskarth force-pushed the feature/spike-message-order-#2702 branch from 08210c0 to bb69995 on January 2, 2018 10:01
@oskarth oskarth merged commit bb69995 into develop Jan 2, 2018
@oskarth oskarth deleted the feature/spike-message-order-#2702 branch January 2, 2018 10:06
@oskarth
Contributor Author

oskarth commented Jan 2, 2018

Merging as this PR doesn't change any behavior.

@oskarth
Contributor Author

oskarth commented Jan 3, 2018

Mini-postmortem: premature merge. Always confirm CI has built the latest commit after a rebase etc. before merging. Hopefully this will be easier with GH bot integration for visual feedback.

No idea what the connection between the tests in this PR and the following error is, though.

Error:

    Testing status-im.test.utils.pre-receiver
    undefined
    /Users/oskarth/git/status-react/target/test/status_im/utils/utils.cljs:61
    (defn http-get
    ^
    ReferenceError: window is not defined
    ...
    Error encountered performing task 'doo' with profile(s): 'test'
    Subprocess failed

Not sure what http-get has to do with this test ns though.

Quickfix in #2873

