Skip to content

Commit

Permalink
Batch longpoll messages to resolve race conditions (#5283)
Browse files Browse the repository at this point in the history
* Batch longpoll messages to resolve race condition on publish
  • Loading branch information
chrismccord committed Feb 17, 2023
1 parent 2c80cca commit 2674c6e
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 10 deletions.
37 changes: 33 additions & 4 deletions assets/js/phoenix/longpoll.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ export default class LongPoll {
this.token = null
this.skipHeartbeat = true
this.reqs = new Set()
this.awaitingBatchAck = false
this.currentBatch = null
this.currentBatchTimer = null
this.batchBuffer = []
this.onopen = function (){ } // noop
this.onerror = function (){ } // noop
this.onmessage = function (){ } // noop
Expand Down Expand Up @@ -45,7 +49,7 @@ export default class LongPoll {
isActive(){ return this.readyState === SOCKET_STATES.open || this.readyState === SOCKET_STATES.connecting }

poll(){
this.ajax("GET", null, () => this.ontimeout(), resp => {
this.ajax("GET", "application/json", null, () => this.ontimeout(), resp => {
if(resp){
var {status, token, messages} = resp
this.token = token
Expand Down Expand Up @@ -100,11 +104,33 @@ export default class LongPoll {
})
}

// we collect all pushes within the current event loop by
// setTimeout 0, which optimizes back-to-back procedural
// pushes against an empty buffer
send(body){
this.ajax("POST", body, () => this.onerror("timeout"), resp => {
if(this.currentBatch){
this.currentBatch.push(body)
} else if(this.awaitingBatchAck){
this.batchBuffer.push(body)
} else {
this.currentBatch = [body]
this.currentBatchTimer = setTimeout(() => {
this.batchSend(this.currentBatch)
this.currentBatch = null
}, 0)
}
}

batchSend(messages){
this.awaitingBatchAck = true
this.ajax("POST", "application/ndjson", messages.join("\n"), () => this.onerror("timeout"), resp => {
this.awaitingBatchAck = false
if(!resp || resp.status !== 200){
this.onerror(resp && resp.status)
this.closeAndRetry(1011, "internal server error", false)
} else if(this.batchBuffer.length > 0){
this.batchSend(this.batchBuffer)
this.batchBuffer = []
}
})
}
Expand All @@ -113,20 +139,23 @@ export default class LongPoll {
for(let req of this.reqs){ req.abort() }
this.readyState = SOCKET_STATES.closed
let opts = Object.assign({code: 1000, reason: undefined, wasClean: true}, {code, reason, wasClean})
this.batchBuffer = []
clearTimeout(this.currentBatchTimer)
this.currentBatchTimer = null
if(typeof(CloseEvent) !== "undefined"){
this.onclose(new CloseEvent("close", opts))
} else {
this.onclose(opts)
}
}

ajax(method, body, onCallerTimeout, callback){
ajax(method, contentType, body, onCallerTimeout, callback){
let req
let ontimeout = () => {
this.reqs.delete(req)
onCallerTimeout()
}
req = Ajax.request(method, this.endpointURL(), "application/json", body, this.timeout, ontimeout, resp => {
req = Ajax.request(method, this.endpointURL(), contentType, body, this.timeout, ontimeout, resp => {
this.reqs.delete(req)
if(this.isActive()){ callback(resp) }
})
Expand Down
16 changes: 15 additions & 1 deletion lib/phoenix/transports/long_poll.ex
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,21 @@ defmodule Phoenix.Transports.LongPoll do
defp publish(conn, server_ref, endpoint, opts) do
case read_body(conn, []) do
{:ok, body, conn} ->
status = transport_dispatch(endpoint, server_ref, body, opts)
# we need to match on both v1 and v2 protocol, as well as wrap for backwards compat
batch =
case get_req_header(conn, "content-type") do
["application/ndjson"] -> String.split(body, ["\n", "\r\n"])
_ -> [body]
end

{conn, status} =
Enum.reduce_while(batch, {conn, nil}, fn msg, {conn, _status} ->
case transport_dispatch(endpoint, server_ref, msg, opts) do
:ok -> {:cont, {conn, :ok}}
:request_timeout = timeout -> {:halt, {conn, timeout}}
end
end)

conn |> put_status(status) |> status_json()

_ ->
Expand Down
79 changes: 74 additions & 5 deletions test/phoenix/integration/long_poll_channels_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,13 @@ defmodule Phoenix.Integration.LongPollChannelsTest do
"""
def poll(method, path, vsn, params, json \\ nil, headers \\ %{}) do
{serializer, json} = serializer(vsn, json)
headers = Map.merge(%{"content-type" => "application/json"}, headers)
headers =
if is_list(json) do
Map.merge(%{"content-type" => "application/ndjson"}, headers)
else
Map.merge(%{"content-type" => "application/json"}, headers)
end

body = encode(serializer, json)
query_string = params |> Map.put("vsn", vsn) |> URI.encode_query()
url = "http://127.0.0.1:#{@port}#{path}/longpoll?" <> query_string
Expand All @@ -168,8 +174,12 @@ defmodule Phoenix.Integration.LongPollChannelsTest do
end

defp serializer("2." <> _, json), do: {V2.JSONSerializer, json}
defp serializer(_, json) do
{V1.JSONSerializer, json && Map.delete(json, "join_ref")}
defp serializer(_, nil), do: {V1.JSONSerializer, nil}
defp serializer(_, batch) when is_list(batch) do
{V1.JSONSerializer, for(msg <- batch, do: Map.delete(msg, "join_ref"))}
end
defp serializer(_, %{} = json) do
{V1.JSONSerializer, json}
end

defp decode_body(serializer, %{} = resp) do
Expand All @@ -184,13 +194,19 @@ defmodule Phoenix.Integration.LongPollChannelsTest do

defp encode(_vsn, nil), do: ""

defp encode(V2.JSONSerializer, map) do
defp encode(V2.JSONSerializer = serializer, batch) when is_list(batch) do
batch
|> Enum.map(&encode(serializer, &1))
|> Enum.join("\n")
end

defp encode(V2.JSONSerializer, %{} = map) do
Phoenix.json_library().encode!(
[map["join_ref"], map["ref"], map["topic"], map["event"], map["payload"]]
)
end

defp encode(V1.JSONSerializer, map), do: Phoenix.json_library().encode!(map)
defp encode(V1.JSONSerializer, %{} = map), do: Phoenix.json_library().encode!(map)

@doc """
Joins a long poll socket.
Expand Down Expand Up @@ -385,6 +401,59 @@ defmodule Phoenix.Integration.LongPollChannelsTest do
end
end

test "#{@mode}: lonpoll publishing batch events on v2 protocol" do
vsn = "2.0.0"
Phoenix.PubSub.subscribe(__MODULE__, "room:lobby")
session = join("/ws", "room:lobby", vsn, "1", @mode)
# Publish successfully
resp =
poll(:post, "/ws", vsn, session, [
%{
"topic" => "room:lobby",
"event" => "new_msg",
"ref" => "2",
"join_ref" => "1",
"payload" => %{"body" => "hi1"}
},
%{
"topic" => "room:lobby",
"event" => "new_msg",
"ref" => "3",
"join_ref" => "1",
"payload" => %{"body" => "hi2"}
}
])


assert resp.body["status"] == 200
assert_receive %Broadcast{event: "new_msg", payload: %{"body" => "hi1"}}
assert_receive %Broadcast{event: "new_msg", payload: %{"body" => "hi2"}}

# Get published message
resp = poll(:get, "/ws", vsn, session)
assert resp.body["status"] == 200

assert [
_phx_reply,
_user_entered,
_joined,
%Message{
topic: "room:lobby",
event: "new_msg",
payload: %{"body" => "hi1", "transport" => ":longpoll"},
ref: nil,
join_ref: "1"
},
%Message{
topic: "room:lobby",
event: "new_msg",
payload: %{"body" => "hi2", "transport" => ":longpoll"},
ref: nil,
join_ref: "1"
}
] = resp.body["messages"]
end

test "#{@mode}: shuts down after timeout" do
session = join("/ws", "room:lobby", @vsn, "1", @mode)

Expand Down

0 comments on commit 2674c6e

Please sign in to comment.