Skip to content

Commit

Permalink
added support for retryable reads
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook committed Apr 11, 2020
1 parent 4ca5d82 commit a0963b3
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 14 deletions.
60 changes: 52 additions & 8 deletions lib/mongo.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ defmodule Mongo do
alias Mongo.Events
alias Mongo.Events.CommandSucceededEvent
alias Mongo.Events.CommandFailedEvent
alias Mongo.Error

@timeout 15000 # 5000

Expand Down Expand Up @@ -391,23 +392,39 @@ defmodule Mongo do
@doc """
This function is very fundamental.
"""
def issue_command(topology_pid, cmd, type, opts) do
def issue_command(topology_pid, cmd, :read, opts) do

new_cmd = case type do
:read -> ReadPreference.add_read_preference(cmd, opts)
:write -> cmd
end
new_cmd = ReadPreference.add_read_preference(cmd, opts)

Logger.debug("issue_command: #{inspect type} #{inspect new_cmd}")
## check, if retryable reads are enabled
opts = Mongo.retryable_reads(opts)

with {:ok, session} <- Session.start_implicit_session(topology_pid, type, opts),
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
result <- exec_command_session(session, new_cmd, opts),
:ok <- Session.end_implict_session(topology_pid, session) do
case result do
{:error, error} ->
case Error.should_retry(error, cmd, opts) do
true -> issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))
false -> {:error, error}
end
_other -> result
end
else
{:new_connection, _server} ->
:timer.sleep(1000)
issue_command(topology_pid, cmd, :read, opts)
end
end
def issue_command(topology_pid, cmd, :write, opts) do
with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts),
result <- exec_command_session(session, cmd, opts),
:ok <- Session.end_implict_session(topology_pid, session) do
result
else
{:new_connection, _server} ->
:timer.sleep(1000)
issue_command(topology_pid, cmd, type, opts)
issue_command(topology_pid, cmd, :write, opts)
end
end

Expand Down Expand Up @@ -1211,6 +1228,33 @@ defmodule Mongo do
|> Stream.map(fn coll -> coll["name"] end)
end

@doc """
In case of retryable reads are enabled, the keyword `:read_counter` is added with the value of 1.
In other cases like
* `:retryable_reads` is false or nil
* `:session` is nil
* `:read_counter` is nil
the `opts` is unchanged
## Example
iex> Mongo.retryable_reads([retryable_reads: true])
[retryable_reads: true, read_counter: 1]
"""
def retryable_reads(opts) do
case opts[:read_counter] do
nil -> case opts[:retryable_reads] == true && opts[:session] == nil do
true -> opts ++ [read_counter: 1]
false -> opts
end
_other -> opts
end
end

defp get_stream(topology_pid, cmd, opts) do
Mongo.Stream.new(topology_pid, cmd, opts)
end
Expand Down
20 changes: 20 additions & 0 deletions lib/mongo/change_stream.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
defmodule Mongo.ChangeStream do

alias Mongo.Session
alias Mongo.Error

import Record, only: [defrecordp: 2]

defstruct [:topology_pid, :session, :doc, :cmd, :on_resume_token, :opts]

def new(topology_pid, cmd, on_resume_token_fun, opts) do

## check, if retryable reads are enabled
opts = Mongo.retryable_reads(opts)

with new_cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts) do
Expand All @@ -18,8 +23,15 @@ defmodule Mongo.ChangeStream do
cmd: cmd,
opts: opts
}
else
{:error, error} ->
case Error.should_retry(error, cmd, opts) do
true -> new(topology_pid, cmd, on_resume_token_fun, Keyword.put(opts, :read_counter, 2))
false -> {:error, error}
end
end
end

defimpl Enumerable do

defrecordp :change_stream, [:resume_token, :op_time, :cmd, :on_resume_token]
Expand Down Expand Up @@ -63,6 +75,12 @@ defmodule Mongo.ChangeStream do
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts) do

aggregate(topology_pid, session, doc, cmd, fun)
else
{:error, error} ->
case Error.should_retry(error, cmd, opts) do
true -> aggregate(topology_pid, cmd, fun, Keyword.put(opts, :read_counter, 2))
false -> {:error, error}
end
end
end

Expand Down Expand Up @@ -102,6 +120,8 @@ defmodule Mongo.ChangeStream do
end
end



@doc """
Calls the GetCore-Command
See https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst
Expand Down
37 changes: 32 additions & 5 deletions lib/mongo/error.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
defmodule Mongo.Error do
defexception [:message, :code, :host, :error_labels, :resumable]

alias Mongo.Events

defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads]

@host_unreachable 6
@host_not_found 7
Expand All @@ -19,6 +22,10 @@ defmodule Mongo.Error do
@retry_change_stream 234
@failed_to_satisfy_read_preference 133

@retryable_reads [@interrupted_at_shutdown, @interrupted_due_to_repl_state_change, @not_master,
@not_master_no_slaveok, @not_master_or_secondary, @primary_stepped_down,
@host_not_found, @host_unreachable , @network_timeout, @socket_exception]

@resumable [@host_unreachable, @host_not_found, @network_timeout, @shutdown_in_progress, @primary_stepped_down,
@exceeded_time_limit, @socket_exception, @not_master, @interrupted_at_shutdown, @interrupted_at_shutdown,
@interrupted_due_to_repl_state_change, @not_master_no_slaveok, @not_master_or_secondary, @stale_shard_version,
Expand All @@ -29,7 +36,8 @@ defmodule Mongo.Error do
code: number,
host: String.t,
error_labels: [String.t] | nil,
resumable: boolean
resumable: boolean,
retryable_reads: boolean
}

def message(e) do
Expand All @@ -48,9 +56,10 @@ defmodule Mongo.Error do
end

def exception(%{"code" => code, "errmsg" => msg} = doc) do
errorLabels = doc["errorLabels"] || []
resumable = Enum.any?(@resumable, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "ResumableChangeStreamError"))
%Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable}
errorLabels = doc["errorLabels"] || []
resumable = Enum.any?(@resumable, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "ResumableChangeStreamError"))
retryable_reads = Enum.any?(@retryable_reads, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableReadError"))
%Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable, retryable_reads: retryable_reads}
end
def exception(message: message, code: code) do
%Mongo.Error{message: message, code: code, resumable: Enum.any?(@resumable, &(&1 == code))}
Expand All @@ -59,6 +68,24 @@ defmodule Mongo.Error do
def exception(message) do
%Mongo.Error{message: message, resumable: false}
end

@doc """
Return true if the error is retryale for read operations.
"""
def should_retry(%Mongo.Error{retryable_reads: true}, cmd, opts) do
[{command_name,_}|_] = cmd

result = (command_name != :getMore and opts[:read_counter] == 1)
if result do
Events.notify(%Mongo.Events.RetryReadEvent{command_name: command_name, command: cmd}, :commands)
end

result
end
def should_retry(_error, _cmd, _opts) do
false
end

end

defmodule Mongo.WriteError do
Expand Down
9 changes: 9 additions & 0 deletions lib/mongo/events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,15 @@ defmodule Mongo.Events do
]
end

defmodule RetryReadEvent do
@moduledoc false

defstruct [
:command, ## Returns the command.
:command_name, ## Returns the command name.
]
end

# Published when server description changes, but does NOT include changes to
# the RTT
defmodule ServerDescriptionChangedEvent do
Expand Down
11 changes: 11 additions & 0 deletions lib/mongo/stream.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Mongo.Stream do

alias Mongo.Session
alias Mongo.Error

import Record, only: [defrecordp: 2]

Expand All @@ -9,6 +10,10 @@ defmodule Mongo.Stream do
alias Mongo.Session

def new(topology_pid, cmd, opts) do

## check, if retryable reads are enabled
opts = Mongo.retryable_reads(opts)

with cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
{:ok,
Expand All @@ -19,6 +24,12 @@ defmodule Mongo.Stream do
"firstBatch" => docs}}} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do

%Mongo.Stream{topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, docs: docs, opts: opts}
else
{:error, error} ->
case Error.should_retry(error, cmd, opts) do
true -> new(topology_pid, cmd, Keyword.put(opts, :read_counter, 2))
false -> {:error, error}
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion test/mongo/cursor_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ defmodule Mongo.CursorTest do

# issue #35: Crash executing find function without enough permission
test "matching errors in the next function of the stream api", c do
assert {:error, %Mongo.Error{__exception__: true, code: 2, error_labels: [], host: nil, message: "unknown operator: $gth", resumable: false}} == Mongo.find(c.pid, "test", [_id: ["$gth": 1]])
assert {:error, %Mongo.Error{__exception__: true, code: 2, error_labels: '', host: nil, message: "unknown operator: $gth", resumable: false, retryable_reads: false}} == Mongo.find(c.pid, "test", [_id: ["$gth": 1]])
end

end
96 changes: 96 additions & 0 deletions test/mongo/retryable_reads_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
defmodule Mongo.RetryableReadsTest do
use ExUnit.Case

alias Mongo.Error
alias Mongo.Session

setup_all do
assert {:ok, pid} = Mongo.TestConnection.connect
Mongo.drop_database(pid)
{:ok, [pid: pid]}
end

setup do
{:ok, catcher} = EventCatcher.start_link()

on_exit(fn -> EventCatcher.stop(catcher) end)

[catcher: catcher]
end

test "find_one", %{pid: top, catcher: catcher} do

coll = "retryable_reads_1"
Mongo.insert_one(top, coll, %{name: "Greta", age: 10})
Mongo.insert_one(top, coll, %{name: "Tom", age: 13})
Mongo.insert_one(top, coll, %{name: "Waldo", age: 5})
Mongo.insert_one(top, coll, %{name: "Oska", age: 3})

assert {:ok, 4} == Mongo.count(top, coll, %{})

cmd = [
configureFailPoint: "failCommand",
mode: [times: 1],
data: [errorCode: 6, failCommands: ["find"]]
]

Mongo.admin_command(top, cmd)
{:error, %Error{code: 6, retryable_reads: true}} = Mongo.find_one(top, coll, %{"name" => "Waldo"})

Mongo.admin_command(top, cmd)
assert %{"_id" => _id, "age" => 5, "name" => "Waldo"} = Mongo.find_one(top, coll, %{"name" => "Waldo"}, retryable_reads: true)

assert [:find | _] = EventCatcher.retryable_read_events(catcher) |> Enum.map(fn event -> event.command_name end)
end

test "find_one in transaction", %{pid: top, catcher: catcher} do

coll = "retryable_reads_2"
Mongo.insert_one(top, coll, %{name: "Greta", age: 10})
Mongo.insert_one(top, coll, %{name: "Tom", age: 13})
Mongo.insert_one(top, coll, %{name: "Waldo", age: 5})
Mongo.insert_one(top, coll, %{name: "Oska", age: 3})

assert {:ok, 4} == Mongo.count(top, coll, %{})

cmd = [
configureFailPoint: "failCommand",
mode: [times: 1],
data: [errorCode: 6, failCommands: ["find"]]
]

{:ok, session} = Session.start_session(top, :read, [])

Mongo.admin_command(top, cmd)
{:error, %Error{code: 6, retryable_reads: true}} = Mongo.find_one(top, coll, %{"name" => "Waldo"}, retryable_reads: true, session: session)

Session.end_session(top, session)

assert [] = EventCatcher.retryable_read_events(catcher) |> Enum.map(fn event -> event.command_name end)
end

test "count", %{pid: top, catcher: catcher} do

coll = "retryable_reads_count"
Mongo.insert_one(top, coll, %{name: "Greta", age: 10})
Mongo.insert_one(top, coll, %{name: "Tom", age: 13})
Mongo.insert_one(top, coll, %{name: "Waldo", age: 5})
Mongo.insert_one(top, coll, %{name: "Oska", age: 3})

assert {:ok, 4} == Mongo.count(top, coll, %{})

cmd = [
configureFailPoint: "failCommand",
mode: [times: 1],
data: [errorCode: 6, failCommands: ["count"]]
]

Mongo.admin_command(top, cmd)
{:error, %Error{code: 6, retryable_reads: true}} = Mongo.count(top, coll, %{})

Mongo.admin_command(top, cmd)
assert {:ok, 4} == Mongo.count(top, coll, %{}, retryable_reads: true)

assert [:count | _] = EventCatcher.retryable_read_events(catcher) |> Enum.map(fn event -> event.command_name end)
end
end
Loading

0 comments on commit a0963b3

Please sign in to comment.