Connection manager #278

Merged: 13 commits into redis-rs:master on Feb 26, 2020

Conversation

@dbrgn (Contributor) commented Jan 30, 2020

Early draft of a ConnectionManager that would address the reconnect behavior discussed in #218.

Would be great to get some feedback on it. @Marwes you've written something like this in the past, right?

To test, run cargo run --example async-connection-loss --features="tokio-rt-core" reconnect and then restart and/or stop redis. Output for a 2s server downtime:

PING
Query result: Ok("PONG")
PING
Query result: Ok("PONG")
PING
Connection lost, reconnecting
Reconnecting failed: Connection refused (os error 111)
Query result: Err(broken pipe)
PING
Connection lost, reconnecting
Reconnecting failed: Connection refused (os error 111)
Query result: Err(broken pipe)
PING
Connection lost, reconnecting
Query result: Err(broken pipe)
PING
Query result: Ok("PONG")

I'll add some comments inline.

Based on #276.
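
The example essentially boils down to a loop like the following (a minimal sketch; the constructor name is an assumption based on this draft and may not match the final API):

// Sketch only: the manager wraps a MultiplexedConnection and reconnects in
// the background when a command fails with a connection error.
use std::time::Duration;

async fn ping_loop(client: redis::Client) -> redis::RedisResult<()> {
    let mut manager = client.get_tokio_connection_manager().await?;
    loop {
        let result: redis::RedisResult<String> =
            redis::cmd("PING").query_async(&mut manager).await;
        println!("Query result: {:?}", result);
        // tokio::time::sleep on current tokio; delay_for on the tokio 0.2
        // that this PR targets.
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}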

@dbrgn requested a review from @Marwes on January 30, 2020, 15:10
@dbrgn (Contributor, author) commented on a diff in src/aio.rs:

The error will be passed on to the caller unmodified. I think this is the only feasible generic solution: if the connection fails after Redis has already processed the request, automatic retrying might have unintended consequences, depending on the application logic.

This way, the application must deal with the error, but reconnecting will happen automatically.
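
As an illustration (a sketch, not code from this PR), a caller that knows its command is idempotent can retry once and rely on the reconnect that was triggered in the background:

// Sketch only. Assumes a ConnectionManager roughly like the one proposed here,
// and uses RedisError::is_connection_dropped to classify the failure.
use redis::{aio::ConnectionManager, RedisResult};

async fn read_counter(mut manager: ConnectionManager) -> RedisResult<Option<i64>> {
    match redis::cmd("GET").arg("counter").query_async(&mut manager).await {
        // GET is idempotent, so retrying after a dropped connection is safe.
        // The failed call has already kicked off a reconnect in the background.
        Err(e) if e.is_connection_dropped() => {
            redis::cmd("GET").arg("counter").query_async(&mut manager).await
        }
        other => other,
    }
}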

@dbrgn (Contributor, author) commented Feb 3, 2020

I have to say I'm a bit stuck here.

In the current version of the code, ConnectionManager is not yet cloneable. Once #[derive(Clone)] is enabled, it's possible to clone the ConnectionManager, which is fine in general because it wraps a Shared instance.

However, once a manager instance is cloned, if instance A replaces the shared future with the reconnect future, instance B will still have a reference to the old shared future, right?

That would mean that we would need some kind of wrapper type with interior mutability. For example an Arc:

#[derive(Clone)]
pub struct ConnectionManager(Arc<ConnectionManagerInner>);

struct ConnectionManagerInner {
    connection_info: ConnectionInfo,
    connection: MultiplexedConnection,
}

Now all accesses to the connection need to go through the Arc. For read-only access that's easy:

fn get_db(&self) -> i64 {
    self.0.connection.db
}

However, for mutable access the compiler complains that the Arc contents cannot be borrowed mutably. For example, the reconnect method can no longer replace the connection:

error[E0594]: cannot assign to data in an `Arc`
   --> src/aio.rs:620:13
    |
620 |             self.0.connection = new_connection;
    |             ^^^^^^^^^^^^^^^^^ cannot assign
    |
    = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `std::sync::Arc<aio::connection_manager::ConnectionManagerInner>`

I could now wrap the entire ConnectionManagerInner in something like a Mutex or a RwLock, but then we lose the advantages that we got from wrapping MultiplexedConnection.

Am I on the wrong path? 🙂

@Marwes (Collaborator) commented Feb 3, 2020

No, you will need some sort of interior mutability. https://docs.rs/arc-swap/0.4.4/arc_swap/index.html works well and you can use compare_and_swap to prevent multiple reconnections.
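
For reference, the compare_and_swap idea in isolation looks roughly like this (a generic sketch against the arc-swap API, with strings standing in for connections):

use std::sync::Arc;

use arc_swap::ArcSwap;

// Whoever still holds the value they originally read gets to install the
// replacement; everyone else loses the race and must not reconnect again.
fn try_swap(slot: &ArcSwap<String>, seen: &Arc<String>, replacement: Arc<String>) -> bool {
    let previous = slot.compare_and_swap(seen, replacement);
    // compare_and_swap returns what was in the slot before the call; if that
    // is still `seen`, this caller won and is responsible for the reconnect.
    Arc::ptr_eq(&previous, seen)
}

fn main() {
    let slot = ArcSwap::from(Arc::new(String::from("dead connection")));
    let seen = slot.load_full();
    assert!(try_swap(&slot, &seen, Arc::new(String::from("reconnect future"))));
    assert!(!try_swap(&slot, &seen, Arc::new(String::from("another attempt"))));
}

In the manager, `seen` would be the connection (or connection future) the failing command was using, and `replacement` the new reconnect future.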

@dbrgn (Contributor, author) commented Feb 3, 2020

@Marwes thanks for pointing out arc_swap, looks great! However, req_packed_command takes &mut self as an argument. This would mean that I'd either have to write to the ArcSwap for every dispatched message, or clone the MultiplexedConnection for every dispatched message.

From the ArcSwap documentation on performance characteristics:

Write operations are considered expensive. A write operation is more expensive than access to an uncontended Mutex and on some architectures even slower than uncontended RwLock.

Cloning the MultiplexedConnection would mean cloning an mpsc::Sender as well as the db i64. Does that sound reasonable? It would be great if we could opt in to automatic reconnection without having to pay that cost, but I'm not sure that's possible. (Caching the cloned connection would mean that the reference could become stale...)

@Marwes (Collaborator) commented Feb 3, 2020

Cloning a MultiplexedConnection is pretty cheap. It might be possible to do better by adding methods that work with &self, which this could call, but that would need https://github.com/tokio-rs/tokio/pull/2041/files.

@Marwes (Collaborator) commented Feb 3, 2020

Perhaps it could cache the connection, but still do a read on the arc_swap to check that the connection isn't stale?
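
i.e. roughly (a sketch of the caching idea with made-up names, not code from this PR):

use std::sync::Arc;

use arc_swap::ArcSwap;

// Each handle keeps a cheap clone of the connection it last saw and only pays
// for an Arc clone when the shared slot actually changed.
struct CachedHandle<T> {
    shared: Arc<ArcSwap<T>>, // one slot shared by all clones of the manager
    cached: Arc<T>,          // the value this handle saw last
}

impl<T> CachedHandle<T> {
    fn current(&mut self) -> &T {
        // `load` is a cheap read of the slot.
        let latest = self.shared.load();
        if !Arc::ptr_eq(&*latest, &self.cached) {
            // Stale: a reconnect installed a new value, so refresh the cache.
            self.cached = Arc::clone(&*latest);
        }
        &self.cached
    }
}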

@dbrgn (Contributor, author) commented Feb 3, 2020

I managed to get things working with arc_swap. Very nice library, thanks for the recommendation! Also thanks for the help with the pull request 🙂

I'll work on the remaining open issues tomorrow:

  • Ensure that multiple connections are not established in parallel
  • Reconnect asynchronously (don't .await after reconnecting)
  • Potentially cache connections

Open questions:

  • If a reconnection attempt is in progress, should the current connection be considered dead? Or could it be that connection errors are transient? In the former case, commands should be delayed until the new connection is established, while in the latter case all commands should be forwarded to the old connection, with the possibility that they fail as well.
  • Is triggering a reconnect attempt on every failed command OK, or should there be some kind of throttling? Throttling might result in more failed connection attempts than not doing it, but at the same time the server is not hammered with connection attempts.
  • Is the separate connection-manager feature good? Should it be named differently?
  • This is probably non-trivial to unit-test, right?

@Marwes (Collaborator) commented Feb 3, 2020

This is probably non-trivial to unit-test, right?

You could make ConnectionManager generic over T: ConnectionLike and supply a mocked connection instead of MultiplexedConnection.
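
Roughly like this (a sketch using a simplified stand-in trait rather than the real redis::aio::ConnectionLike, and a synchronous mock to keep it short):

// The trait below is a simplified stand-in so the example is self-contained.
trait Conn {
    fn ping(&mut self) -> Result<String, String>;
}

struct Manager<C: Conn> {
    conn: C,
    reconnects: usize,
}

impl<C: Conn> Manager<C> {
    fn ping(&mut self) -> Result<String, String> {
        let result = self.conn.ping();
        if result.is_err() {
            // In the real code this would swap in a new connection future.
            self.reconnects += 1;
        }
        result
    }
}

// Test-only mock that fails a configurable number of times.
struct FlakyConn {
    failures_left: usize,
}

impl Conn for FlakyConn {
    fn ping(&mut self) -> Result<String, String> {
        if self.failures_left > 0 {
            self.failures_left -= 1;
            Err("broken pipe".into())
        } else {
            Ok("PONG".into())
        }
    }
}

#[test]
fn reconnects_after_connection_loss() {
    let mut manager = Manager { conn: FlakyConn { failures_left: 1 }, reconnects: 0 };
    assert!(manager.ping().is_err());
    assert_eq!(manager.reconnects, 1);
    assert_eq!(manager.ping().unwrap(), "PONG");
}

The real trait is async, but the shape of the test stays the same.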

@Marwes (Collaborator) commented Feb 3, 2020

Is triggering a reconnect attempt on every failed command OK, or should there be some kind of throttling? Throttling might result in more failed connection attempts than not doing it, but at the same time the server is not hammered with connection attempts.

A circuit breaker could be built on top of this, but the hard part is figuring out a good strategy for it, or making the strategy pluggable, which is complicated in its own right :/

Is the separate connection-manager feature good? Should it be named differently?

Sounds good to me.

@Marwes (Collaborator) commented Feb 3, 2020

If a reconnection attempt is in progress, should the current connection be considered dead? Or could it be that connection errors are transient? In the former case, commands should be delayed until the new connection is established, while in the latter case all commands should be forwarded to the old connection, with the possibility that they fail as well.

As I see it, as soon as a reconnection attempt is started there is no way to retrieve an old connection since the ArcSwap has been updated at that point.

@dbrgn (Contributor, author) commented Feb 6, 2020

TIL that ArcSwap does not behave like a pointer and should not be cloned in our case 🤦‍♂️ 😄

I guess we should use connection: Arc<ArcSwap<SharedRedisFuture<MultiplexedConnection>>> for our connection.

I got a version with async reconnection and non-parallel reconnections to work, will have to clean up the code and push it later.
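
A tiny demonstration of the difference, with integers standing in for connection futures (sketch only; it relies on ArcSwap's Clone impl snapshotting the current value, which is exactly what bit us here):

use std::sync::Arc;

use arc_swap::ArcSwap;

fn main() {
    // Cloning an ArcSwap copies the *current* value into a brand new,
    // independent slot:
    let a = ArcSwap::from(Arc::new(1));
    let b = a.clone();
    a.store(Arc::new(2));
    assert_eq!(**b.load(), 1); // b never sees the update

    // Wrapping the ArcSwap itself in an Arc shares one slot between clones:
    let a = Arc::new(ArcSwap::from(Arc::new(1)));
    let b = Arc::clone(&a);
    a.store(Arc::new(2));
    assert_eq!(**b.load(), 2); // b sees the update
}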

@dbrgn (Contributor, author) commented Feb 10, 2020

Ok, I think I now have a version that does what it should! It's now ready to review.

Currently implemented behavior (a rough code sketch follows the list):

  • When creating an instance of the ConnectionManager, an initial connection will be established and awaited. Connection errors will be returned directly.
  • When a command sent to the server fails with an error that represents a "connection dropped" condition, that error will be passed on to the user, but it will trigger a reconnection in the background.
  • The reconnect code will atomically swap the current (dead) connection with a future that will eventually resolve to a MultiplexedConnection or to a RedisError.
  • All commands that are issued after the reconnect process has been initiated will have to await the connection future.
  • If reconnecting fails, all pending commands will be failed as well. A new reconnection attempt will be triggered if the error is an I/O error.
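
A rough sketch of this flow, with stand-in types so it stays self-contained (the real code stores a shared, boxed future of a MultiplexedConnection and a cloneable error type instead):

use std::sync::Arc;

use arc_swap::ArcSwap;
use futures::future::{BoxFuture, FutureExt, Shared};

#[derive(Clone)]
struct FakeConn; // stand-in for MultiplexedConnection

impl FakeConn {
    async fn connect() -> Result<FakeConn, String> {
        Ok(FakeConn) // pretend to dial the server
    }
    async fn ping(&self) -> Result<String, String> {
        Ok("PONG".into())
    }
}

type ConnFuture = Shared<BoxFuture<'static, Result<FakeConn, String>>>;

#[derive(Clone)]
struct Manager {
    // All clones share one slot holding the current connection future.
    connection: Arc<ArcSwap<ConnFuture>>,
}

impl Manager {
    async fn new() -> Result<Self, String> {
        let future: ConnFuture = FakeConn::connect().boxed().shared();
        // Await the initial connection so construction errors surface directly.
        future.clone().await?;
        Ok(Manager {
            connection: Arc::new(ArcSwap::from(Arc::new(future))),
        })
    }

    async fn ping(&self) -> Result<String, String> {
        // Commands await whatever future is currently in the slot; while a
        // reconnect is in flight, that is the pending reconnect future.
        let current = self.connection.load_full();
        match (*current).clone().await {
            Ok(conn) => match conn.ping().await {
                Err(e) => {
                    // Connection dropped: return the error, reconnect in background.
                    self.reconnect(current);
                    Err(e)
                }
                ok => ok,
            },
            Err(e) => {
                // The previous reconnect failed; report that and try again.
                self.reconnect(current);
                Err(format!("Reconnecting failed: {}", e))
            }
        }
    }

    fn reconnect(&self, seen: Arc<ConnFuture>) {
        let new_future: ConnFuture = FakeConn::connect().boxed().shared();
        // Only the task that first saw the dead connection installs the new
        // future; later tasks lose the compare_and_swap race and just await it.
        let previous = self
            .connection
            .compare_and_swap(&seen, Arc::new(new_future.clone()));
        if Arc::ptr_eq(&*previous, &seen) {
            tokio::spawn(async move {
                let _ = new_future.await;
            });
        }
    }
}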

This can be tested by decreasing the delay in examples/async-connection-loss.rs to a very low value like 1ms. (I removed most debug prints from the committed code, but left in a few lines.) Here's a test run (3 futures being resolved in parallel every 1ms):

Initially, the connection still works:

> PING
> PING
> PING
< Ok("PONG")
< Ok("PONG")
< Ok("PONG")

Then I restarted the Redis server. In the debug output, you can see the pointer to the connection future before and after being swapped.

If the log output contains "Swap happened", then a stale connection was replaced with a connection future. If the log output contains "Swap did not happen", then another task was faster in creating a connection future; in that case, the slower task does not establish a connection itself but awaits the future installed by the faster one.

> PING
> PING
> PING
Connection lost, reconnecting
Prepare reconnect
Swap happened
    Current: 0x558835c06c50
        New: 0x558835c077d0
Connection lost, reconnecting
Prepare reconnect
Swap did not happen
Connection lost, reconnecting
Prepare reconnect
Swap did not happen
< Err(Connection reset by peer (os error 104))
< Err(broken pipe)
< Err(broken pipe)

On the next command a few ms later, the server was not yet back up, so the reconnect future fails and is immediately replaced by a new connection future:

> PING
> PING
> PING
Reconnect failed, reconnecting
Prepare reconnect
Swap happened
    Current: 0x558835c077d0
        New: 0x558835c01a10
Reconnect failed, reconnecting
Prepare reconnect
Swap did not happen
Reconnect failed, reconnecting
Prepare reconnect
Swap did not happen
< Err(Reconnecting failed: Connection refused (os error 111))
< Err(Reconnecting failed: Connection refused (os error 111))
< Err(Reconnecting failed: Connection refused (os error 111))

Sometimes the reconnection future fails so fast that the next (concurrent) task already sees the new, failed future:

> PING
> PING
> PING
Reconnect failed, reconnecting
Prepare reconnect
Swap happened
    Current: 0x558835c02ab0
        New: 0x558835c02a10
Reconnect failed, reconnecting
Prepare reconnect
Swap happened
    Current: 0x558835c02a10
        New: 0x558835c07800
Reconnect failed, reconnecting
Prepare reconnect
Swap did not happen
< Err(Reconnecting failed: Connection refused (os error 111))
< Err(Reconnecting failed: Connection refused (os error 111))
< Err(Reconnecting failed: Connection refused (os error 111))
Reconnecting

After a few dozen milliseconds, the server is back up and the connection future resolves to a working MultiplexedConnection.

> PING
> PING
> PING
Reconnect failed, reconnecting
Prepare reconnect
Swap happened
Reconnecting
< Err(Reconnecting failed: Connection refused (os error 111))
< Ok("PONG")
< Ok("PONG")

Not yet implemented:

  • Connection caching. Is it worth it?
  • Throttling of reconnections. Maybe just delaying the reconnection attempt by ~50ms would help? (A possible shape for this is sketched below.)
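
A possible shape for that throttling (hypothetical helper; tokio::time::sleep is the current tokio API, the tokio 0.2 used here calls it delay_for):

use std::{future::Future, time::Duration};

// Hypothetical throttling helper: wait a fixed 50 ms before every reconnection
// attempt so a flapping server is not hammered with dials.
async fn throttled<C, E, Fut>(connect: impl FnOnce() -> Fut) -> Result<C, E>
where
    Fut: Future<Output = Result<C, E>>,
{
    tokio::time::sleep(Duration::from_millis(50)).await;
    connect().await
}

The reconnect future would then wrap its dial in throttled(...) instead of connecting immediately.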

TODO:

  • Remove debug print calls
  • Squash commits

Note: The rustfmt failures happen because Rust 1.41 introduced a slight change in formatting rules. Maybe the rustfmt check should be pinned to a specific Rust version; the code can then be reformatted when bumping that version.

@dbrgn marked this pull request as ready for review on February 10, 2020, 16:43
This way, the first task that notices a connection loss will initiate a
reconnection, while the other tasks wait for the connection future to
complete.

In order to avoid having to lock a mutex for replacing the future,
the arc-swap crate is used. It performs well in read-often update-seldom
scenarios.
@dbrgn (Contributor, author) commented Feb 24, 2020

Rebased against master. I also added a commit that removes the debug prints (it should be squashed anyway, but can be temporarily reverted for easier testing).

@Marwes is there anything else for me to do here?

@Marwes merged commit be33c83 into redis-rs:master on Feb 26, 2020
@Marwes (Collaborator) commented Feb 26, 2020

Looks good! Thanks!

@dbrgn deleted the connection-manager branch on February 26, 2020, 15:16
@dbrgn (Contributor, author) commented Feb 26, 2020

On this occasion, thanks @Marwes for the fantastic mentoring and quick feedback here - I probably wouldn't have managed to get to this solution without you 🙂

@Terkwood (Contributor) commented

Late to the party, but this looks like a great addition! Thank you!
