Skip to content

Commit

Permalink
Fix transport reporting to account for all probe statuses (#171)
Browse files Browse the repository at this point in the history
* Fix transport actor to account for all reported statuses

* Fix terminate check and add unit tests
  • Loading branch information
Arkatufus committed Jan 4, 2023
1 parent 85bde87 commit 3d59378
Show file tree
Hide file tree
Showing 5 changed files with 228 additions and 33 deletions.
78 changes: 76 additions & 2 deletions src/Akka.HealthCheck.Tests/Liveness/LivenessTransportActorSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Liveness;
using Akka.HealthCheck.Tests.Transports;
Expand Down Expand Up @@ -63,7 +64,6 @@ public void LivenessTransportActor_should_crash_when_Stop_or_Go_failure()
});
}


[Fact(DisplayName =
"LivenessTransportActor should crash and try to stop signal upon timeout during signal change")]
public void LivenessTransportActor_should_crash_when_Timedout()
Expand Down Expand Up @@ -116,7 +116,7 @@ public void LivenessTransportActor_should_send_Stop_when_terminated()
{
var testTransport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
var fakeLiveness = CreateTestProbe("liveness");
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeLiveness }.ToImmutableDictionary();;
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeLiveness }.ToImmutableDictionary();

var transportActor =
Sys.ActorOf(Props.Create(() => new LivenessTransportActor(testTransport, dict, true)));
Expand All @@ -129,5 +129,79 @@ public void LivenessTransportActor_should_send_Stop_when_terminated()
AwaitCondition(() => testTransport.SystemCalls.Count == 1
&& testTransport.SystemCalls[0] == TestStatusTransport.TransportCall.Stop);
}

[Fact(DisplayName = "LivenessTransportActor with multiple probes should report correctly based on probe responses")]
public async Task LivenessTransportActorMultiProbeTest()
{
    // Two fake liveness probes are registered under the names "first" and "second".
    var transport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
    var firstProbe = CreateTestProbe("liveness_1");
    var secondProbe = CreateTestProbe("liveness_2");
    var probeMap = new Dictionary<string, IActorRef>
    {
        { "first", firstProbe },
        { "second", secondProbe }
    };
    var probes = probeMap.ToImmutableDictionary();

    var transportActor =
        Sys.ActorOf(Props.Create(() => new LivenessTransportActor(transport, probes, true)));

    // Waits until the transport has recorded exactly (index + 1) calls
    // and the call stored at position `index` equals `expected`.
    async Task ExpectCallAsync(int index, TestStatusTransport.TransportCall expected)
    {
        await AwaitConditionAsync(() =>
            transport.SystemCalls.Count == index + 1
            && transport.SystemCalls[index] == expected);
    }

    firstProbe.ExpectMsg<SubscribeToLiveness>();
    secondProbe.ExpectMsg<SubscribeToLiveness>();

    // Only "first" has reported live; "second" has not reported yet, so Stop is expected.
    transportActor.Tell(new LivenessStatus(true), firstProbe);
    await ExpectCallAsync(0, TestStatusTransport.TransportCall.Stop);

    // Both probes have now reported live, so Go is expected.
    transportActor.Tell(new LivenessStatus(true), secondProbe);
    await ExpectCallAsync(1, TestStatusTransport.TransportCall.Go);

    // Every further live report (alternating between the two probes) should produce another Go.
    for (var callIndex = 2; callIndex < 10; callIndex++)
    {
        var reporter = callIndex % 2 == 0 ? firstProbe : secondProbe;
        transportActor.Tell(new LivenessStatus(true), reporter);
        await ExpectCallAsync(callIndex, TestStatusTransport.TransportCall.Go);
    }

    // A single failing probe is enough to produce Stop.
    transportActor.Tell(new LivenessStatus(false), firstProbe);
    await ExpectCallAsync(10, TestStatusTransport.TransportCall.Stop);

    // Once the failing probe reports live again, Go is expected.
    transportActor.Tell(new LivenessStatus(true), firstProbe);
    await ExpectCallAsync(11, TestStatusTransport.TransportCall.Go);

    // Killing one probe should produce Stop.
    Watch(firstProbe);
    firstProbe.Tell(PoisonPill.Instance);
    ExpectTerminated(firstProbe);
    Unwatch(firstProbe);
    await ExpectCallAsync(12, TestStatusTransport.TransportCall.Stop);

    // Once the last remaining probe dies, the transport actor itself should terminate.
    Watch(secondProbe);
    Watch(transportActor);
    secondProbe.Tell(PoisonPill.Instance);
    ExpectTerminated(secondProbe);
    ExpectTerminated(transportActor);

    // The final Stop call is issued from PostStop.
    await ExpectCallAsync(13, TestStatusTransport.TransportCall.Stop);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using System.Linq;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.HealthCheck.Readiness;
using Akka.HealthCheck.Tests.Transports;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void ReadinessTransportActor_should_send_Stop_when_terminated()
{
var testTransport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
var fakeReadiness = CreateTestProbe("readiness");
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeReadiness }.ToImmutableDictionary();;
var dict = new Dictionary<string, IActorRef> { ["default"] = fakeReadiness }.ToImmutableDictionary();

var transportActor =
Sys.ActorOf(Props.Create(() => new ReadinessTransportActor(testTransport, dict, true)));
Expand All @@ -129,5 +130,78 @@ public void ReadinessTransportActor_should_send_Stop_when_terminated()
AwaitCondition(() => testTransport.SystemCalls.Count == 1
&& testTransport.SystemCalls[0] == TestStatusTransport.TransportCall.Stop);
}

[Fact(DisplayName = "ReadinessTransportActor with multiple probes should report correctly based on probe responses")]
public async Task ReadinessTransportActorMultiProbeTest()
{
    // Two fake readiness probes are registered under the names "first" and "second".
    var transport = new TestStatusTransport(new TestStatusTransportSettings(true, true, TimeSpan.Zero));
    var firstProbe = CreateTestProbe("readiness_1");
    var secondProbe = CreateTestProbe("readiness_2");
    var probeMap = new Dictionary<string, IActorRef>
    {
        { "first", firstProbe },
        { "second", secondProbe }
    };
    var probes = probeMap.ToImmutableDictionary();

    var transportActor =
        Sys.ActorOf(Props.Create(() => new ReadinessTransportActor(transport, probes, true)));

    // Waits until the transport has recorded exactly (index + 1) calls
    // and the call stored at position `index` equals `expected`.
    async Task ExpectCallAsync(int index, TestStatusTransport.TransportCall expected)
    {
        await AwaitConditionAsync(() =>
            transport.SystemCalls.Count == index + 1
            && transport.SystemCalls[index] == expected);
    }

    firstProbe.ExpectMsg<SubscribeToReadiness>();
    secondProbe.ExpectMsg<SubscribeToReadiness>();

    // Only "first" has reported ready; "second" has not reported yet, so Stop is expected.
    transportActor.Tell(new ReadinessStatus(true), firstProbe);
    await ExpectCallAsync(0, TestStatusTransport.TransportCall.Stop);

    // Both probes have now reported ready, so Go is expected.
    transportActor.Tell(new ReadinessStatus(true), secondProbe);
    await ExpectCallAsync(1, TestStatusTransport.TransportCall.Go);

    // Every further ready report (alternating between the two probes) should produce another Go.
    for (var callIndex = 2; callIndex < 10; callIndex++)
    {
        var reporter = callIndex % 2 == 0 ? firstProbe : secondProbe;
        transportActor.Tell(new ReadinessStatus(true), reporter);
        await ExpectCallAsync(callIndex, TestStatusTransport.TransportCall.Go);
    }

    // A single failing probe is enough to produce Stop.
    transportActor.Tell(new ReadinessStatus(false), firstProbe);
    await ExpectCallAsync(10, TestStatusTransport.TransportCall.Stop);

    // Once the failing probe reports ready again, Go is expected.
    transportActor.Tell(new ReadinessStatus(true), firstProbe);
    await ExpectCallAsync(11, TestStatusTransport.TransportCall.Go);

    // Killing one probe should produce Stop.
    Watch(firstProbe);
    firstProbe.Tell(PoisonPill.Instance);
    ExpectTerminated(firstProbe);
    Unwatch(firstProbe);
    await ExpectCallAsync(12, TestStatusTransport.TransportCall.Stop);

    // Once the last remaining probe dies, the transport actor itself should terminate.
    Watch(secondProbe);
    Watch(transportActor);
    secondProbe.Tell(PoisonPill.Instance);
    ExpectTerminated(secondProbe);
    ExpectTerminated(transportActor);

    // The final Stop call is issued from PostStop.
    await ExpectCallAsync(13, TestStatusTransport.TransportCall.Stop);
}
}
}
39 changes: 28 additions & 11 deletions src/Akka.HealthCheck/Transports/LivenessTransportActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public sealed class LivenessTransportActor : ReceiveActor
{
private const int LivenessTimeout = 1000;
private readonly List<IActorRef> _livenessProbes;
private readonly Dictionary<string, LivenessStatus> _statuses = new ();
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly IStatusTransport _statusTransport;
private readonly bool _logInfo;
Expand All @@ -31,25 +32,34 @@ public LivenessTransportActor(IStatusTransport statusTransport, ImmutableDiction
{
_statusTransport = statusTransport;
var probeReverseLookup = livenessProbes.ToImmutableDictionary(kvp => kvp.Value, kvp => kvp.Key);
foreach (var kvp in livenessProbes)
{
Context.Watch(kvp.Value);
_statuses[kvp.Key] = new LivenessStatus(false, $"Probe {kvp.Key} starting up.");
}
_livenessProbes = livenessProbes.Values.ToList();
_logInfo = log;

ReceiveAsync<LivenessStatus>(async status =>
{
var probeName = probeReverseLookup[Sender];
if (_logInfo)
_log.Info("Received liveness status from probe [{0}]. Live: {1}, Message: {2}", probeName,
status.IsLive, status.StatusMessage);
var cts = new CancellationTokenSource(LivenessTimeout);
using var cts = new CancellationTokenSource(LivenessTimeout);
TransportWriteStatus writeStatus;
try
{
if (status.IsLive)
writeStatus = await _statusTransport.Go($"[{probeName}] {status.StatusMessage}", cts.Token);
if (_logInfo)
_log.Info("Received liveness status from probe [{0}]. Live: {1}, Message: {2}", probeName,
status.IsLive, status.StatusMessage);
_statuses[probeName] = status;
var statusMessage = string.Join(
Environment.NewLine,
_statuses.Select(kvp => $"[{kvp.Key}][{(kvp.Value.IsLive ? "Live" : "Not Live")}] {kvp.Value.StatusMessage}"));
if (_statuses.Values.All(s => s.IsLive))
writeStatus = await _statusTransport.Go(statusMessage, cts.Token);
else
writeStatus = await _statusTransport.Stop($"[{probeName}] {status.StatusMessage}", cts.Token);
writeStatus = await _statusTransport.Stop(statusMessage, cts.Token);
}
catch (Exception e)
{
Expand All @@ -76,12 +86,20 @@ public LivenessTransportActor(IStatusTransport statusTransport, ImmutableDiction

Receive<Terminated>(t =>
{
var probeName = probeReverseLookup[t.ActorRef];
if (_logInfo)
_log.Info("Liveness probe {0} terminated", probeName);
_livenessProbes.Remove(t.ActorRef);
if (_livenessProbes.Count == 0)
{
_log.Warning("All liveness probe actors terminated! Shutting down.");
Context.Stop(Self);
}
else
{
Self.Tell(new LivenessStatus(false, "Probe terminated"), t.ActorRef);
}
});
}

Expand All @@ -96,8 +114,7 @@ protected override void PreStart()

protected override void PostStop()
{
var cts = new CancellationTokenSource(LivenessTimeout);

using var cts = new CancellationTokenSource(LivenessTimeout);
try
{
_statusTransport.Stop(null, cts.Token).Wait(cts.Token);
Expand Down
39 changes: 28 additions & 11 deletions src/Akka.HealthCheck/Transports/ReadinessTransportActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,42 @@ public sealed class ReadinessTransportActor : ReceiveActor
private const int ReadinessTimeout = 1000;
private readonly ILoggingAdapter _log = Context.GetLogger();
private readonly List<IActorRef> _readinessProbes;
private readonly Dictionary<string, ReadinessStatus> _statuses = new ();
private readonly IStatusTransport _statusTransport;
private readonly bool _logInfo;

public ReadinessTransportActor(IStatusTransport statusTransport, ImmutableDictionary<string, IActorRef> readinessProbe, bool log)
{
_statusTransport = statusTransport;
var probeReverseLookup = readinessProbe.ToImmutableDictionary(kvp => kvp.Value, kvp => kvp.Key);
foreach (var kvp in readinessProbe)
{
Context.Watch(kvp.Value);
_statuses[kvp.Key] = new ReadinessStatus(false, $"Probe {kvp.Key} starting up.");
}
_readinessProbes = readinessProbe.Values.ToList();
_logInfo = log;

ReceiveAsync<ReadinessStatus>(async status =>
{
var probeName = probeReverseLookup[Sender];
if (_logInfo)
_log.Info("Received readiness status from probe [{0}]. Ready: {1}, Message: {2}", probeName,
status.IsReady, status.StatusMessage);
var cts = new CancellationTokenSource(ReadinessTimeout);
using var cts = new CancellationTokenSource(ReadinessTimeout);
TransportWriteStatus writeStatus;
try
{
if (status.IsReady)
writeStatus = await _statusTransport.Go($"[{probeName}] {status.StatusMessage}", cts.Token);
if (_logInfo)
_log.Info("Received readiness status from probe [{0}]. Ready: {1}, Message: {2}", probeName,
status.IsReady, status.StatusMessage);
_statuses[probeName] = status;
var statusMessage = string.Join(
Environment.NewLine,
_statuses.Select(kvp => $"[{kvp.Key}][{(kvp.Value.IsReady ? "Ready" : "Not Ready")}] {kvp.Value.StatusMessage}"));
if (_statuses.Values.All(s => s.IsReady))
writeStatus = await _statusTransport.Go(statusMessage, cts.Token);
else
writeStatus = await _statusTransport.Stop($"[{probeName}] {status.StatusMessage}", cts.Token);
writeStatus = await _statusTransport.Stop(statusMessage, cts.Token);
}
catch (Exception e)
{
Expand All @@ -76,12 +86,20 @@ public ReadinessTransportActor(IStatusTransport statusTransport, ImmutableDictio

Receive<Terminated>(t =>
{
var probeName = probeReverseLookup[t.ActorRef];
if (_logInfo)
_log.Info("Readiness probe {0} terminated", probeName);
_readinessProbes.Remove(t.ActorRef);
if (_readinessProbes.Count == 0)
{
_log.Warning("All readiness probe actors terminated! Shutting down.");
Context.Stop(Self);
}
else
{
Self.Tell(new ReadinessStatus(false, "Probe terminated"), t.ActorRef);
}
});
}

Expand All @@ -96,8 +114,7 @@ protected override void PreStart()

protected override void PostStop()
{
var cts = new CancellationTokenSource(ReadinessTimeout);

using var cts = new CancellationTokenSource(ReadinessTimeout);
try
{
_statusTransport.Stop(null, cts.Token).Wait(cts.Token);
Expand Down

0 comments on commit 3d59378

Please sign in to comment.