Skip to content

Commit

Permalink
Retry Hangfire jobs - #158
Browse files Browse the repository at this point in the history
* Fixed auto-retry as per this forum post: https://discuss.hangfire.io/t/recurring-jobs-do-not-automatically-get-retried-after-application-crash-net-core-service/9160
* MongoDB can't handle documents larger than 16 MB
* Treat messages from one id as a group
* Kill failing messages over 4 days old
* Universal message incrementer
* Add testing
  • Loading branch information
johnml1135 committed Jun 3, 2024
1 parent a19d577 commit ba69059
Show file tree
Hide file tree
Showing 22 changed files with 789 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ public static IMachineBuilder AddMongoDataAccess(this IMachineBuilder builder, s
)
)
);
o.AddRepository<OutboxMessage>("outbox_messages");
o.AddRepository<Sequence>("outbox_message_index");
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand All @@ -284,6 +286,9 @@ public static IMachineBuilder AddMongoDataAccess(this IMachineBuilder builder, s
throw new InvalidOperationException("Serval connection string is required");

builder.Services.AddScoped<IPlatformService, ServalPlatformService>();

builder.Services.AddSingleton<IMessageOutboxService, MessageOutboxService>();

builder
.Services.AddGrpcClient<TranslationPlatformApi.TranslationPlatformApiClient>(o =>
{
Expand Down Expand Up @@ -338,6 +343,9 @@ public static IMachineBuilder AddMongoDataAccess(this IMachineBuilder builder, s
options.Interceptors.Add<UnimplementedInterceptor>();
});
builder.AddServalPlatformService(connectionString);

builder.Services.AddHostedService<MessageOutboxHandlerService>();

engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
Expand Down
23 changes: 23 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace SIL.Machine.AspNetCore.Models;

/// <summary>
/// Discriminator carried by an <c>OutboxMessage</c> (its <c>Method</c> property) that names
/// the operation the message stands for.
/// </summary>
/// <remarks>
/// The numeric values are written out explicitly and are exactly the ones the compiler
/// would assign implicitly (0..6), so inserting or reordering members later cannot
/// silently renumber existing ones.
/// NOTE(review): the member names look like they mirror platform-service calls
/// (build lifecycle, pretranslation insert, corpus-size increment) — confirm against the
/// component that dispatches outbox messages.
/// </remarks>
public enum OutboxMessageMethod
{
    BuildStarted = 0,
    BuildCompleted = 1,
    BuildCanceled = 2,
    BuildFaulted = 3,
    BuildRestarting = 4,
    InsertPretranslations = 5,
    IncrementTranslationEngineCorpusSize = 6,
}

/// <summary>
/// Entity describing one queued outbox message: which <see cref="OutboxMessageMethod"/> it
/// stands for, the group it belongs to, its request payload, and bookkeeping about when it
/// was created and how often it has been attempted.
/// </summary>
/// <remarks>
/// Member declaration order is kept as-is on purpose: it determines the order in which a
/// record prints its members.
/// </remarks>
public record OutboxMessage : IEntity
{
    /// <summary>Entity identifier; defaults to the empty string.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Entity revision counter; a new message starts at revision 1.</summary>
    public int Revision { get; set; } = 1;

    /// <summary>The operation this message represents. Must be set by the creator.</summary>
    public required OutboxMessageMethod Method { get; set; }

    /// <summary>
    /// Identifier of the group this message belongs to. Must be set by the creator.
    /// NOTE(review): presumably messages sharing a group id are handled together/in order —
    /// confirm against the outbox handler.
    /// </summary>
    public required string GroupId { get; set; }

    /// <summary>
    /// Request payload. It must be assigned explicitly by the creator, but
    /// <see langword="null"/> is an allowed value.
    /// </summary>
    public required string? RequestContent { get; set; }

    /// <summary>Creation timestamp; defaults to the UTC time at which the instance is constructed.</summary>
    public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;

    /// <summary>Attempt counter; starts at zero (the default for <see cref="int"/>).</summary>
    public int Attempts { get; set; }
}
16 changes: 16 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/Sequence.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace SIL.Machine.AspNetCore.Models;

/// <summary>
/// Entity holding a named integer counter: a <see cref="Context"/> label together with the
/// <see cref="CurrentIndex"/> value stored for it.
/// </summary>
public record Sequence : IEntity
{
    /// <summary>Entity identifier; defaults to the empty string.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Entity revision counter.</summary>
    public int Revision { get; set; }

    /// <summary>Label identifying which counter this is; defaults to the empty string.</summary>
    public string Context { get; set; } = string.Empty;

    /// <summary>The index value currently stored for <see cref="Context"/>.</summary>
    public int CurrentIndex { get; set; }

    /// <summary>
    /// Renders <paramref name="value"/> as lower-case hexadecimal, left-padded with zeros to
    /// at least 24 digits (e.g. <c>1</c> becomes <c>000000000000000000000001</c>).
    /// </summary>
    /// <param name="value">The index to format.</param>
    /// <returns>The zero-padded hexadecimal text of <paramref name="value"/>.</returns>
    /// <remarks>
    /// NOTE(review): 24 hex digits matches the textual length of a MongoDB ObjectId, which is
    /// presumably why this width was chosen — confirm with the callers.
    /// </remarks>
    public static string IndexToObjectIdString(int value) => value.ToString("x24");
}
62 changes: 41 additions & 21 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public class ClearMLMonitorService(
IServiceProvider services,
IClearMLService clearMLService,
ISharedFileService sharedFileService,
IDataAccessContext dataAccessContext,
IOptions<ClearMLOptions> options,
ILogger<ClearMLMonitorService> logger
)
Expand All @@ -24,6 +25,7 @@ ILogger<ClearMLMonitorService> logger

private readonly IClearMLService _clearMLService = clearMLService;
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly ILogger<ClearMLMonitorService> _logger = logger;
private readonly Dictionary<string, ProgressStatus> _curBuildStatus = new();

Expand Down Expand Up @@ -191,16 +193,22 @@ or ClearMLTaskStatus.Completed
)
{
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, cancellationToken))
return false;
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
return await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await using (await @lock.WriterLockAsync(cancellationToken: ct))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken);
_logger.LogInformation("Build started ({0})", buildId);
return true;
await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, ct);
_logger.LogInformation("Build started ({0})", buildId);
return true;
},
cancellationToken: cancellationToken
);
}

private async Task<bool> TrainJobCompletedAsync(
Expand Down Expand Up @@ -252,12 +260,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildFaultedAsync(buildId, message, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogError("Build faulted ({0}). Error: {1}", buildId, message);
Expand All @@ -282,12 +296,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildCanceledAsync(buildId, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogInformation("Build canceled ({0})", buildId);
Expand Down
51 changes: 36 additions & 15 deletions src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ public abstract class HangfireBuildJob(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob> logger
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, buildJobService, logger)
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger)
{
public virtual Task RunAsync(
string engineId,
Expand All @@ -23,13 +24,15 @@ public abstract class HangfireBuildJob<T>(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob<T>> logger
)
{
protected IPlatformService PlatformService { get; } = platformService;
protected IRepository<TranslationEngine> Engines { get; } = engines;
protected IDistributedReaderWriterLockFactory LockFactory { get; } = lockFactory;
protected IDataAccessContext DataAccessContext { get; } = dataAccessContext;
protected IBuildJobService BuildJobService { get; } = buildJobService;
protected ILogger<HangfireBuildJob<T>> Logger { get; } = logger;

Expand Down Expand Up @@ -69,12 +72,18 @@ await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
completionStatus = JobCompletionStatus.Canceled;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogInformation("Build canceled ({0})", buildId);
Expand All @@ -86,8 +95,14 @@ await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.No
completionStatus = JobCompletionStatus.Restarting;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
},
cancellationToken: CancellationToken.None
);
}
throw;
}
Expand All @@ -101,12 +116,18 @@ await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.No
completionStatus = JobCompletionStatus.Faulted;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogError(0, e, "Build faulted ({0})", buildId);
Expand Down
11 changes: 11 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace SIL.Machine.AspNetCore.Services;

/// <summary>
/// Contract for adding messages to the message outbox.
/// </summary>
public interface IMessageOutboxService
{
    /// <summary>
    /// Enqueues a message in the outbox.
    /// </summary>
    /// <param name="method">The operation the message represents.</param>
    /// <param name="groupId">Identifier of the group the message belongs to.</param>
    /// <param name="requestContent">The request payload to store with the message.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>
    /// A task producing a string.
    /// NOTE(review): presumably the id of the enqueued message — confirm against the implementation.
    /// </returns>
    // Interface members are public by default, so no access modifier is written here.
    Task<string> EnqueueMessageAsync(
        OutboxMessageMethod method,
        string groupId,
        string requestContent,
        CancellationToken cancellationToken
    );
}
Loading

0 comments on commit ba69059

Please sign in to comment.