Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix transport reporting to account for all probe statuses #171

Merged
merged 4 commits into from
Jan 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Liveness;
using Akka.HealthCheck.Tests.Transports;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void LivenessTransportActor_should_send_Stop_when_terminated()
{
var testTransport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
var fakeLiveness = CreateTestProbe("liveness");
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeLiveness }.ToImmutableDictionary();;
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeLiveness }.ToImmutableDictionary();

var transportActor =
Sys.ActorOf(Props.Create(() => new LivenessTransportActor(testTransport, dict, true)));
Expand All @@ -129,5 +130,79 @@ public void LivenessTransportActor_should_send_Stop_when_terminated()
AwaitCondition(() => testTransport.SystemCalls.Count == 1
&& testTransport.SystemCalls[0] == TestStatusTransport.TransportCall.Stop);
}

[Fact(DisplayName = "LivenessTransportActor with multiple probes should report correctly based on probe responses")]
public async Task LivenessTransportActorMultiProbeTest()
{
    // Arrange: one recording transport and two test probes standing in for liveness probe actors.
    var transport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
    var probeA = CreateTestProbe("liveness_1");
    var probeB = CreateTestProbe("liveness_2");
    var probes = new Dictionary<string, IActorRef>
    {
        ["first"] = probeA,
        ["second"] = probeB
    }.ToImmutableDictionary();

    var actorUnderTest =
        Sys.ActorOf(Props.Create(() => new LivenessTransportActor(transport, probes, true)));

    // Waits until the transport has recorded exactly (index + 1) calls and the
    // call stored at position `index` is the expected one.
    Task AwaitCallAsync(int index, TestStatusTransport.TransportCall expected) =>
        AwaitConditionAsync(() =>
            transport.SystemCalls.Count == index + 1
            && transport.SystemCalls[index] == expected);

    // The transport actor subscribes to each probe.
    probeA.ExpectMsg<SubscribeToLiveness>();
    probeB.ExpectMsg<SubscribeToLiveness>();

    // Only "first" has reported; "second" has not reported yet, so Stop is expected.
    actorUnderTest.Tell(new LivenessStatus(true), probeA);
    await AwaitCallAsync(0, TestStatusTransport.TransportCall.Stop);

    // Both probes now report live, so Go is expected.
    actorUnderTest.Tell(new LivenessStatus(true), probeB);
    await AwaitCallAsync(1, TestStatusTransport.TransportCall.Go);

    // Further live reports, alternating between the two probes, each produce another Go
    // (call indices 2 through 9).
    for (var callIndex = 2; callIndex < 10; callIndex++)
    {
        actorUnderTest.Tell(new LivenessStatus(true), callIndex % 2 == 0 ? probeA : probeB);
        await AwaitCallAsync(callIndex, TestStatusTransport.TransportCall.Go);
    }

    // A single failing probe is enough to produce Stop.
    actorUnderTest.Tell(new LivenessStatus(false), probeA);
    await AwaitCallAsync(10, TestStatusTransport.TransportCall.Stop);

    // Once the failing probe reports live again, Go is expected.
    actorUnderTest.Tell(new LivenessStatus(true), probeA);
    await AwaitCallAsync(11, TestStatusTransport.TransportCall.Go);

    // Killing one probe is expected to produce Stop.
    Watch(probeA);
    probeA.Tell(PoisonPill.Instance);
    ExpectTerminated(probeA);
    Unwatch(probeA);
    await AwaitCallAsync(12, TestStatusTransport.TransportCall.Stop);

    // Once the last remaining probe dies, the transport actor itself should terminate.
    Watch(probeB);
    Watch(actorUnderTest);
    probeB.Tell(PoisonPill.Instance);
    ExpectTerminated(probeB);
    ExpectTerminated(actorUnderTest);

    // The final Stop call is issued from PostStop.
    await AwaitCallAsync(14, TestStatusTransport.TransportCall.Stop);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Readiness;
using Akka.HealthCheck.Tests.Transports;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void ReadinessTransportActor_should_send_Stop_when_terminated()
{
var testTransport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
var fakeReadiness = CreateTestProbe("readiness");
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeReadiness }.ToImmutableDictionary();;
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeReadiness }.ToImmutableDictionary();

var transportActor =
Sys.ActorOf(Props.Create(() => new ReadinessTransportActor(testTransport, dict, true)));
Expand All @@ -129,5 +130,78 @@ public void ReadinessTransportActor_should_send_Stop_when_terminated()
AwaitCondition(() => testTransport.SystemCalls.Count == 1
&& testTransport.SystemCalls[0] == TestStatusTransport.TransportCall.Stop);
}

[Fact(DisplayName = "ReadinessTransportActor with multiple probes should report correctly based on probe responses")]
public async Task ReadinessTransportActorMultiProbeTest()
{
    // Arrange: one recording transport and two test probes standing in for readiness probe actors.
    var transport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
    var probeA = CreateTestProbe("readiness_1");
    var probeB = CreateTestProbe("readiness_2");
    var probes = new Dictionary<string, IActorRef>
    {
        ["first"] = probeA,
        ["second"] = probeB
    }.ToImmutableDictionary();

    var actorUnderTest =
        Sys.ActorOf(Props.Create(() => new ReadinessTransportActor(transport, probes, true)));

    // Waits until the transport has recorded exactly (index + 1) calls and the
    // call stored at position `index` is the expected one.
    Task AwaitCallAsync(int index, TestStatusTransport.TransportCall expected) =>
        AwaitConditionAsync(() =>
            transport.SystemCalls.Count == index + 1
            && transport.SystemCalls[index] == expected);

    // The transport actor subscribes to each probe.
    probeA.ExpectMsg<SubscribeToReadiness>();
    probeB.ExpectMsg<SubscribeToReadiness>();

    // Only "first" has reported; "second" has not reported yet, so Stop is expected.
    actorUnderTest.Tell(new ReadinessStatus(true), probeA);
    await AwaitCallAsync(0, TestStatusTransport.TransportCall.Stop);

    // Both probes now report ready, so Go is expected.
    actorUnderTest.Tell(new ReadinessStatus(true), probeB);
    await AwaitCallAsync(1, TestStatusTransport.TransportCall.Go);

    // Further ready reports, alternating between the two probes, each produce another Go
    // (call indices 2 through 9).
    for (var callIndex = 2; callIndex < 10; callIndex++)
    {
        actorUnderTest.Tell(new ReadinessStatus(true), callIndex % 2 == 0 ? probeA : probeB);
        await AwaitCallAsync(callIndex, TestStatusTransport.TransportCall.Go);
    }

    // A single failing probe is enough to produce Stop.
    actorUnderTest.Tell(new ReadinessStatus(false), probeA);
    await AwaitCallAsync(10, TestStatusTransport.TransportCall.Stop);

    // Once the failing probe reports ready again, Go is expected.
    actorUnderTest.Tell(new ReadinessStatus(true), probeA);
    await AwaitCallAsync(11, TestStatusTransport.TransportCall.Go);

    // Killing one probe is expected to produce Stop.
    Watch(probeA);
    probeA.Tell(PoisonPill.Instance);
    ExpectTerminated(probeA);
    Unwatch(probeA);
    await AwaitCallAsync(12, TestStatusTransport.TransportCall.Stop);

    // Once the last remaining probe dies, the transport actor itself should terminate.
    Watch(probeB);
    Watch(actorUnderTest);
    probeB.Tell(PoisonPill.Instance);
    ExpectTerminated(probeB);
    ExpectTerminated(actorUnderTest);

    // The final Stop call is issued from PostStop.
    await AwaitCallAsync(14, TestStatusTransport.TransportCall.Stop);
}
}
}
39 changes: 28 additions & 11 deletions src/Akka.HealthCheck/Transports/LivenessTransportActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public sealed class LivenessTransportActor : ReceiveActor
{
private const int LivenessTimeout = 1000;
private readonly List<IActorRef> _livenessProbes;
private readonly Dictionary<string, LivenessStatus> _statuses = new ();
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IStatusTransport _statusTransport;
private readonly bool _logInfo;
Expand All @@ -31,25 +32,34 @@ public LivenessTransportActor(IStatusTransport statusTransport, ImmutableDiction
{
_statusTransport = statusTransport;
var probeReverseLookup = livenessProbes.ToImmutableDictionary(kvp => kvp.Value, kvp => kvp.Key);
foreach (var kvp in livenessProbes)
{
Context.Watch(kvp.Value);
_statuses[kvp.Key] = new LivenessStatus(false, $"Probe {kvp.Key} starting up.");
}
_livenessProbes = livenessProbes.Values.ToList();
_logInfo = log;

ReceiveAsync<LivenessStatus>(async status =>
{
var probeName = probeReverseLookup[Sender];

if (_logInfo)
_log.Info("Received liveness status from probe [{0}]. Live: {1}, Message: {2}", probeName,
status.IsLive, status.StatusMessage);

var cts = new CancellationTokenSource(LivenessTimeout);
using var cts = new CancellationTokenSource(LivenessTimeout);
TransportWriteStatus writeStatus;
try
{
if (status.IsLive)
writeStatus = await _statusTransport.Go($"[{probeName}] {status.StatusMessage}", cts.Token);
if (_logInfo)
_log.Info("Received liveness status from probe [{0}]. Live: {1}, Message: {2}", probeName,
status.IsLive, status.StatusMessage);

_statuses[probeName] = status;
var statusMessage = string.Join(
Environment.NewLine,
_statuses.Select(kvp => $"[{kvp.Key}][{(kvp.Value.IsLive ? "Live" : "Not Live")}] {kvp.Value.StatusMessage}"));

if (_statuses.Values.All(s => s.IsLive))
writeStatus = await _statusTransport.Go(statusMessage, cts.Token);
else
writeStatus = await _statusTransport.Stop($"[{probeName}] {status.StatusMessage}", cts.Token);
writeStatus = await _statusTransport.Stop(statusMessage, cts.Token);
}
catch (Exception e)
{
Expand All @@ -76,12 +86,20 @@ public LivenessTransportActor(IStatusTransport statusTransport, ImmutableDiction

Receive<Terminated>(t =>
{
var probeName = probeReverseLookup[t.ActorRef];
if (_logInfo)
_log.Info("Liveness probe {0} terminated", probeName);

_livenessProbes.Remove(t.ActorRef);
if (_livenessProbes.Count == 0)
{
_log.Warning("All liveness probe actors terminated! Shutting down.");
Context.Stop(Self);
}
else
{
Self.Tell(new LivenessStatus(false, "Probe terminated"), t.ActorRef);
}
});
}

Expand All @@ -96,8 +114,7 @@ protected override void PreStart()

protected override void PostStop()
{
var cts = new CancellationTokenSource(LivenessTimeout);

using var cts = new CancellationTokenSource(LivenessTimeout);
try
{
_statusTransport.Stop(null, cts.Token).Wait(cts.Token);
Expand Down
39 changes: 28 additions & 11 deletions src/Akka.HealthCheck/Transports/ReadinessTransportActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,42 @@ public sealed class ReadinessTransportActor : ReceiveActor
private const int ReadinessTimeout = 1000;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly List<IActorRef> _readinessProbes;
private readonly Dictionary<string, ReadinessStatus> _statuses = new ();
private readonly IStatusTransport _statusTransport;
private readonly bool _logInfo;

public ReadinessTransportActor(IStatusTransport statusTransport, ImmutableDictionary<string, IActorRef> readinessProbe, bool log)
{
_statusTransport = statusTransport;
var probeReverseLookup = readinessProbe.ToImmutableDictionary(kvp => kvp.Value, kvp => kvp.Key);
foreach (var kvp in readinessProbe)
{
Context.Watch(kvp.Value);
_statuses[kvp.Key] = new ReadinessStatus(false, $"Probe {kvp.Key} starting up.");
}
_readinessProbes = readinessProbe.Values.ToList();
_logInfo = log;

ReceiveAsync<ReadinessStatus>(async status =>
{
var probeName = probeReverseLookup[Sender];

if (_logInfo)
_log.Info("Received readiness status from probe [{0}]. Ready: {1}, Message: {2}", probeName,
status.IsReady, status.StatusMessage);

var cts = new CancellationTokenSource(ReadinessTimeout);
using var cts = new CancellationTokenSource(ReadinessTimeout);
TransportWriteStatus writeStatus;
try
{
if (status.IsReady)
writeStatus = await _statusTransport.Go($"[{probeName}] {status.StatusMessage}", cts.Token);
if (_logInfo)
_log.Info("Received readiness status from probe [{0}]. Ready: {1}, Message: {2}", probeName,
status.IsReady, status.StatusMessage);

_statuses[probeName] = status;
var statusMessage = string.Join(
Environment.NewLine,
_statuses.Select(kvp => $"[{kvp.Key}][{(kvp.Value.IsReady ? "Ready" : "Not Ready")}] {kvp.Value.StatusMessage}"));

if (_statuses.Values.All(s => s.IsReady))
writeStatus = await _statusTransport.Go(statusMessage, cts.Token);
else
writeStatus = await _statusTransport.Stop($"[{probeName}] {status.StatusMessage}", cts.Token);
writeStatus = await _statusTransport.Stop(statusMessage, cts.Token);
}
catch (Exception e)
{
Expand All @@ -76,12 +86,20 @@ public ReadinessTransportActor(IStatusTransport statusTransport, ImmutableDictio

Receive<Terminated>(t =>
{
var probeName = probeReverseLookup[t.ActorRef];
if (_logInfo)
_log.Info("Readiness probe {0} terminated", probeName);

_readinessProbes.Remove(t.ActorRef);
if (_readinessProbes.Count == 0)
{
_log.Warning("All readiness probe actors terminated! Shutting down.");
Context.Stop(Self);
}
else
{
Self.Tell(new ReadinessStatus(false, "Probe terminated"), t.ActorRef);
}
});
}

Expand All @@ -96,8 +114,7 @@ protected override void PreStart()

protected override void PostStop()
{
var cts = new CancellationTokenSource(ReadinessTimeout);

using var cts = new CancellationTokenSource(ReadinessTimeout);
try
{
_statusTransport.Stop(null, cts.Token).Wait(cts.Token);
Expand Down