/
WorkerActor.cs
101 lines (81 loc) · 3.21 KB
/
WorkerActor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
using System.Runtime.CompilerServices;
using Akka.Actor;
using Akka.Jobs.Interfaces;
using Akka.Jobs.Models;
using Akka.Jobs.Theater.ActorQueries.Messages.States;
using Akka.Jobs.Theater.Master.Groups.Workers.Messages;
using Microsoft.Extensions.DependencyInjection;
// ReSharper disable ClassNeverInstantiated.Global
[assembly:InternalsVisibleTo("Job.Tests")]
namespace Akka.Jobs.Theater.Master.Groups.Workers;
/// <summary>
/// Actor that executes one <see cref="IJob{TIn,TOut}"/> run and answers state queries about it.
/// </summary>
/// <remarks>
/// On construction the actor asks its parent for work by sending
/// <see cref="GiveMeWorkerDoJobCommand"/>. The job instance is resolved from a dedicated
/// dependency-injection scope in <see cref="PreStart"/>, and that scope is disposed in
/// <see cref="PostStop"/>. After a job run completes without throwing, the actor sends itself
/// a <see cref="PoisonPill"/>.
/// </remarks>
/// <typeparam name="TIn">Job input type.</typeparam>
/// <typeparam name="TOut">Job result type.</typeparam>
internal sealed class WorkerActor<TIn, TOut> : ReceiveActor
    where TIn : IJobInput
    where TOut : IJobResult
{
    /// <summary>Reported when no <see cref="IJob{TIn,TOut}"/> could be resolved from the scope.</summary>
    private const string JobNullError =
        "Error, _job cannot be null. Interface IJob<TIn, TOut> has not been registered.";

    /// <summary>Reported when the job is available but no usable job id has been stored yet.</summary>
    private const string JobIdNotAssignedError =
        "Error, _jobId is not set. The worker has not accepted a job command with a valid job id yet.";

    private readonly IServiceScope _scope;

    // Captured once in the constructor so it can be used from task continuations,
    // where the actor context is no longer available.
    private readonly IActorRef _self;

    private string? _jobId;
    private IJob<TIn, TOut>? _job;

    /// <summary>
    /// Creates the DI scope, registers the message handlers and requests work from the parent.
    /// </summary>
    /// <param name="serviceProvider">Provider used to create the scope the job is resolved from.</param>
    public WorkerActor(IServiceProvider serviceProvider)
    {
        _self = Self;
        _scope = serviceProvider.CreateScope();

        // Commands
        // The job task is piped back to this actor rather than awaited, so the handler returns
        // immediately and queries can still be answered while the job is running.
        Receive<WorkerDoJobCommand<TIn>>(msg =>
        {
            WorkerDoJobCommandHandlerAsync(msg).PipeTo(_self);
        });

        // Queries
        Receive<ReadWorkerInfoCommand>(ReadWorkerInfoCommandHandler);

        // Internal
        // A faulted job task is expected to arrive here as Status.Failure via PipeTo.
        Receive<Status.Failure>(Failed);

        Context.Parent.Tell(new GiveMeWorkerDoJobCommand());
    }

    /// <summary>
    /// Re-raises the failure carried by <paramref name="msg"/> inside the actor.
    /// </summary>
    /// <param name="msg">Failure message delivered to this actor.</param>
    /// <exception cref="Exception">
    /// The original cause (with its stack trace preserved), or a generic exception when the
    /// failure carries no cause.
    /// </exception>
    private void Failed(Status.Failure msg)
    {
        if (msg.Cause is null)
        {
            throw new Exception("Unknown error, msg.Cause == null");
        }

        // A plain "throw msg.Cause" would overwrite the stack trace recorded where the job
        // actually failed; ExceptionDispatchInfo keeps it intact.
        System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(msg.Cause).Throw();
    }

    /// <summary>
    /// Replies to the sender with the current state of the job, or with an error response when
    /// the job or its id is not available.
    /// </summary>
    /// <param name="command">Query carrying the request id echoed back in the response.</param>
    private void ReadWorkerInfoCommandHandler(ReadWorkerInfoCommand command)
    {
        if (_job is null)
        {
            Sender.Tell(new RespondWorkerInfo<TOut>(false, command.RequestId, JobNullError));
            return;
        }

        if (string.IsNullOrWhiteSpace(_jobId))
        {
            Sender.Tell(new RespondWorkerInfo<TOut>(false, command.RequestId, JobIdNotAssignedError));
            return;
        }

        var currentState = _job.GetCurrentState(_jobId);
        Sender.Tell(new RespondWorkerInfo<TOut>(command.RequestId, currentState));
    }

    /// <summary>
    /// Runs the job described by <paramref name="command"/> and notifies the original requester.
    /// </summary>
    /// <remarks>
    /// For a "create" command the requester is told the job was created before it runs and gets
    /// no completion message. Otherwise the requester receives the result once the job finishes,
    /// unless cancellation was requested. If the job throws, the stop request at the end is not
    /// reached and the exception is left on the returned task.
    /// </remarks>
    /// <param name="command">Job command with input, job id, requester and cancellation source.</param>
    private async Task WorkerDoJobCommandHandlerAsync(WorkerDoJobCommand<TIn> command)
    {
        var job = _job;
        if (job is null)
        {
            command.DoJobCommandSender.Tell(command.IsCreateCommand
                ? new JobCreatedCommandResult(false, JobNullError, command.JobId)
                : new JobDoneCommandResult(false, JobNullError, command.JobId));
            _self.Tell(PoisonPill.Instance);
            return;
        }

        // Everything up to the first await runs synchronously inside the actor's message
        // handler, so Context is still valid here.
        _jobId = command.JobId;
        Context.Parent.Tell(new TrySaveWorkerActorRefCommand(_self, _jobId, command.DoJobCommandSender));

        if (command.IsCreateCommand)
        {
            command.DoJobCommandSender.Tell(new JobCreatedCommandResult(true, "", _jobId));
        }

        var token = command.CancellationTokenSource.Token;
        var jobResult = await job.DoAsync(command.JobInput, token);

        if (!command.IsCreateCommand && !token.IsCancellationRequested)
        {
            command.DoJobCommandSender.Tell(new JobDoneCommandResult(jobResult, "Ok", command.JobId));
        }

        // Use the captured reference: after the await the actor context may not be available.
        _self.Tell(PoisonPill.Instance);
    }

    /// <summary>
    /// Resolves the job from the actor's scope; the field stays <c>null</c> when
    /// <see cref="IJob{TIn,TOut}"/> is not registered.
    /// </summary>
    protected override void PreStart()
    {
        _job = _scope.ServiceProvider.GetService<IJob<TIn, TOut>>();
    }

    /// <summary>Disposes the scope created in the constructor.</summary>
    protected override void PostStop()
    {
        _scope.Dispose();
    }
}