Skip to content

Commit

Permalink
Updated Secret try to reconnect the client (#364)
Browse files Browse the repository at this point in the history
* Fixes #363
* Cache the secret password
   The password can change with the update secret, so here we merge the current
   configuration with the new possible configuration to use the last valid password

---------

Co-authored-by: Gabriele Santomaggio <gsantomaggio@gsantomagg492XT.vmware.com>
  • Loading branch information
Gsantomaggio and Gabriele Santomaggio committed Mar 18, 2024
1 parent 18e63d1 commit 906eb83
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 21 deletions.
31 changes: 30 additions & 1 deletion RabbitMQ.Stream.Client/ConnectionsPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ public class ConnectionPoolConfig
public byte ProducersPerConnection { get; set; } = 1;
}

public class LastSecret
{
public string Secret { get; private set; } = string.Empty;
public DateTime LastUpdate { get; private set; } = DateTime.MinValue;
public bool IsValid => LastUpdate > DateTime.MinValue && !string.IsNullOrEmpty(Secret);

public void Update(string secret)
{
Secret = secret;
LastUpdate = DateTime.UtcNow;
}
}

public class ConnectionItem
{
public ConnectionItem(string brokerInfo, byte idsPerConnection, IClient client)
Expand Down Expand Up @@ -113,6 +126,7 @@ internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
private readonly int _maxConnections;
private readonly byte _idsPerConnection;
private readonly SemaphoreSlim _semaphoreSlim = new(1, 1);
private readonly LastSecret _lastSecret = new();

/// <summary>
/// Init the pool with the max connections and the max ids per connection
Expand Down Expand Up @@ -186,6 +200,21 @@ internal async Task<IClient> GetOrCreateClient(string brokerInfo, Func<Task<ICli
}
}

public bool TryMergeClientParameters(ClientParameters clientParameters, out ClientParameters cp)
{
if (!_lastSecret.IsValid || clientParameters.Password == _lastSecret.Secret)
{
cp = clientParameters;
return false;
}

cp = clientParameters with
{
Password = _lastSecret.Secret
};
return true;
}

public void Remove(string clientId)
{
_semaphoreSlim.Wait();
Expand All @@ -208,10 +237,10 @@ public async Task UpdateSecrets(string newSecret)
await _semaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
_lastSecret.Update(newSecret);
foreach (var connectionItem in Connections.Values)
{
await connectionItem.Client.UpdateSecret(newSecret).ConfigureAwait(false);
connectionItem.Client.Parameters.Password = newSecret;
}
}
finally
Expand Down
7 changes: 7 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ RabbitMQ.Stream.Client.ConnectionsPool.MaybeClose(string clientId, string reason
RabbitMQ.Stream.Client.ConnectionsPool.Remove(string clientId) -> void
RabbitMQ.Stream.Client.ConnectionsPool.RemoveConsumerEntityFromStream(string clientId, byte id, string stream) -> void
RabbitMQ.Stream.Client.ConnectionsPool.RemoveProducerEntityFromStream(string clientId, byte id, string stream) -> void
RabbitMQ.Stream.Client.ConnectionsPool.TryMergeClientParameters(RabbitMQ.Stream.Client.ClientParameters clientParameters, out RabbitMQ.Stream.Client.ClientParameters cp) -> bool
RabbitMQ.Stream.Client.ConnectionsPool.UpdateSecrets(string newSecret) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ConsumerEvents
RabbitMQ.Stream.Client.ConsumerEvents.ConsumerEvents() -> void
Expand Down Expand Up @@ -176,6 +177,12 @@ RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.C
RabbitMQ.Stream.Client.KeyRoutingStrategy
RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func<RabbitMQ.Stream.Client.Message, string> routingKeyExtractor, System.Func<string, string, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>> routingKeyQFunc, string superStream) -> void
RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List<string> partitions) -> System.Threading.Tasks.Task<System.Collections.Generic.List<string>>
RabbitMQ.Stream.Client.LastSecret
RabbitMQ.Stream.Client.LastSecret.IsValid.get -> bool
RabbitMQ.Stream.Client.LastSecret.LastSecret() -> void
RabbitMQ.Stream.Client.LastSecret.LastUpdate.get -> System.DateTime
RabbitMQ.Stream.Client.LastSecret.Secret.get -> string
RabbitMQ.Stream.Client.LastSecret.Update(string secret) -> void
RabbitMQ.Stream.Client.MessageContext.ChunkId.get -> ulong
RabbitMQ.Stream.Client.MessageContext.ChunkMessagesCount.get -> uint
RabbitMQ.Stream.Client.MessageContext.MessageContext(ulong offset, System.TimeSpan timestamp, uint chunkMessagesCount, ulong chunkId) -> void
Expand Down
17 changes: 15 additions & 2 deletions RabbitMQ.Stream.Client/RoutingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,16 @@ private static string GetPropertyValue(IDictionary<string, string> connectionPro
public static async Task<IClient> LookupLeaderConnection(ClientParameters clientParameters,
StreamInfo metaDataInfo, ConnectionsPool pool, ILogger logger = null)
{

if (pool.TryMergeClientParameters(clientParameters, out var mergedClientParameters))
{
logger?.LogInformation("Leader Connection. Password changed Merged client parameters");
}

return await pool.GetOrCreateClient(metaDataInfo.Leader.ToString(),
async () =>
await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDataInfo), logger)
await LookupConnection(mergedClientParameters, metaDataInfo.Leader,
MaxAttempts(metaDataInfo), logger)
.ConfigureAwait(false)).ConfigureAwait(false);
}

Expand All @@ -202,9 +209,15 @@ await LookupConnection(clientParameters, metaDataInfo.Leader, MaxAttempts(metaDa
{
try
{
if (pool.TryMergeClientParameters(clientParameters, out var mergedClientParameters))
{
logger?.LogInformation("Replicas Connections. Password changed Merged client parameters");
}

return await pool.GetOrCreateClient(broker.ToString(),
async () =>
await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo),
await LookupConnection(mergedClientParameters, broker,
MaxAttempts(metaDataInfo),
logger)
.ConfigureAwait(false)).ConfigureAwait(false);
}
Expand Down
25 changes: 20 additions & 5 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ internal void Validate()
/// </summary>
public SslOption Ssl { get; set; } = new();

public IList<EndPoint> Endpoints { get; set; } = new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) };
public IList<EndPoint> Endpoints { get; set; } =
new List<EndPoint> { new IPEndPoint(IPAddress.Loopback, 5552) };

public AddressResolver AddressResolver { get; set; }
public string ClientProvidedName { get; set; } = "dotnet-stream-locator";
Expand Down Expand Up @@ -155,12 +156,22 @@ private async Task MayBeReconnectLocator()

public async Task UpdateSecret(string newSecret)
{
// store the old password just in case it will fail to update the secret
var oldSecret = _clientParameters.Password;
_clientParameters.Password = newSecret;
_client.Parameters.Password = newSecret;
await MayBeReconnectLocator().ConfigureAwait(false);
if (_client.IsClosed)
{
// it can happen during some network problem or server rebooting
// even the _clientParameters.Password could be invalid we restore the
// the old one just to be consistent
_clientParameters.Password = oldSecret;
_client.Parameters.Password = oldSecret;
throw new UpdateSecretFailureException("Cannot update a closed connection.");
}

await _client.UpdateSecret(newSecret).ConfigureAwait(false);
_clientParameters.Password = newSecret;
_client.Parameters.Password = newSecret;
await PoolConsumers.UpdateSecrets(newSecret).ConfigureAwait(false);
await PoolProducers.UpdateSecrets(newSecret).ConfigureAwait(false);
}
Expand Down Expand Up @@ -373,7 +384,9 @@ public async Task CreateSuperStream(SuperStreamSpec spec)
{
spec.Validate();
await MayBeReconnectLocator().ConfigureAwait(false);
var response = await _client.CreateSuperStream(spec.Name, spec.GetPartitions(), spec.GetBindingKeys(), spec.Args).ConfigureAwait(false);
var response = await _client
.CreateSuperStream(spec.Name, spec.GetPartitions(), spec.GetBindingKeys(), spec.Args)
.ConfigureAwait(false);
if (response.ResponseCode is ResponseCode.Ok or ResponseCode.StreamAlreadyExists)
{
return;
Expand Down Expand Up @@ -473,7 +486,8 @@ public async Task DeleteSuperStream(string superStream)
return;
}

throw new DeleteStreamException($"Failed to delete super stream, error code: {response.ResponseCode.ToString()}");
throw new DeleteStreamException(
$"Failed to delete super stream, error code: {response.ResponseCode.ToString()}");
}

public async Task<StreamStats> StreamStats(string stream)
Expand Down Expand Up @@ -593,6 +607,7 @@ public StreamSystemInitialisationException(string error) : base(error)
{
}
}

public class UpdateSecretFailureException : ProtocolException
{
public UpdateSecretFailureException(string s)
Expand Down
13 changes: 0 additions & 13 deletions Tests/SystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,6 @@ public async void UpdateSecretWithInvalidSecretShouldThrowAuthenticationFailureE
await streamSystem.Close();
}

[Fact]
public async void UpdateSecretForClosedConnectionShouldThrowUpdateSecretFailureException()
{
var config = new StreamSystemConfig { UserName = "guest", Password = "guest" }; // specified for readability
var streamSystem = await StreamSystem.Create(config);

await streamSystem.Close();
await Assert.ThrowsAsync<UpdateSecretFailureException>(
async () => { await streamSystem.UpdateSecret("guest"); }
);
await streamSystem.Close();
}

[Fact]
public async void CreateExistStreamIdempotentShouldNoRaiseExceptions()
{
Expand Down

0 comments on commit 906eb83

Please sign in to comment.