Skip to content

Commit

Permalink
retry hangfire jobs - #158
Browse files Browse the repository at this point in the history
* Fixed auto-retry as per this forum post: https://discuss.hangfire.io/t/recurring-jobs-do-not-automatically-get-retried-after-application-crash-net-core-service/9160
* MongoDB can't handle documents greater than 16MB
* Treat messages from one id as a group
* Kill failing messages over 4 days old
  • Loading branch information
johnml1135 committed May 8, 2024
1 parent db1f173 commit 3e19b1d
Show file tree
Hide file tree
Showing 19 changed files with 513 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ public static IMachineBuilder AddMongoDataAccess(this IMachineBuilder builder, s
)
)
);
o.AddRepository<OutboxMessage>("outbox_messages");
}
);
builder.Services.AddHealthChecks().AddMongoDb(connectionString!, name: "Mongo");
Expand All @@ -284,6 +285,9 @@ public static IMachineBuilder AddMongoDataAccess(this IMachineBuilder builder, s
throw new InvalidOperationException("Serval connection string is required");

builder.Services.AddScoped<IPlatformService, ServalPlatformService>();

builder.Services.AddSingleton<IMessageOutboxService, MessageOutboxService>();

builder
.Services.AddGrpcClient<TranslationPlatformApi.TranslationPlatformApiClient>(o =>
{
Expand Down Expand Up @@ -338,6 +342,9 @@ public static IMachineBuilder AddMongoDataAccess(this IMachineBuilder builder, s
options.Interceptors.Add<UnimplementedInterceptor>();
});
builder.AddServalPlatformService(connectionString);

builder.Services.AddHostedService<MessageOutboxHandlerService>();

engineTypes ??=
builder.Configuration?.GetSection("TranslationEngines").Get<TranslationEngineType[]?>()
?? [TranslationEngineType.SmtTransfer, TranslationEngineType.Nmt];
Expand Down
23 changes: 23 additions & 0 deletions src/SIL.Machine.AspNetCore/Models/OutboxMessage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
namespace SIL.Machine.AspNetCore.Models;

/// <summary>
/// Names the operation that an <c>OutboxMessage</c> carries (stored in its <c>Method</c> property).
/// </summary>
/// <remarks>
/// The numeric values are spelled out explicitly; they are exactly the values the compiler
/// assigns implicitly when the members are declared in this order.
/// NOTE(review): the member names look like they mirror platform service calls such as
/// <c>BuildStartedAsync</c> - confirm against the code that dispatches these messages.
/// </remarks>
public enum OutboxMessageMethod
{
    BuildStarted = 0,
    BuildCompleted = 1,
    BuildCanceled = 2,
    BuildFaulted = 3,
    BuildRestarting = 4,
    InsertPretranslations = 5,
    IncrementTranslationEngineCorpusSize = 6
}

/// <summary>
/// Entity describing one outbox message: which method it is for, the group it belongs to,
/// and its request content, together with creation time and an attempt counter.
/// </summary>
public record OutboxMessage : IEntity
{
    /// <summary>Entity identifier. Defaults to the empty string.</summary>
    public string Id { get; set; } = string.Empty;

    /// <summary>Entity revision number. Defaults to 1.</summary>
    public int Revision { get; set; } = 1;

    /// <summary>The operation this message represents. Must be set at initialization.</summary>
    public required OutboxMessageMethod Method { get; set; }

    /// <summary>
    /// Identifier of the group this message belongs to. Must be set at initialization.
    /// </summary>
    public required string GroupId { get; set; }

    /// <summary>
    /// The request content. Must be set explicitly at initialization, but may be <c>null</c>.
    /// NOTE(review): the meaning of a null value is not visible here - confirm with the
    /// outbox service implementation.
    /// </summary>
    public required string? RequestContent { get; set; }

    /// <summary>
    /// Creation timestamp. Defaults to the UTC time at which the instance is constructed.
    /// </summary>
    public DateTimeOffset Created { get; set; } = DateTimeOffset.UtcNow;

    /// <summary>
    /// Attempt counter, starting at 0.
    /// NOTE(review): presumably incremented per delivery attempt - confirm with the handler.
    /// </summary>
    public int Attempts { get; set; }
}
62 changes: 41 additions & 21 deletions src/SIL.Machine.AspNetCore/Services/ClearMLMonitorService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ public class ClearMLMonitorService(
IServiceProvider services,
IClearMLService clearMLService,
ISharedFileService sharedFileService,
IDataAccessContext dataAccessContext,
IOptions<ClearMLOptions> options,
ILogger<ClearMLMonitorService> logger
)
Expand All @@ -24,6 +25,7 @@ ILogger<ClearMLMonitorService> logger

private readonly IClearMLService _clearMLService = clearMLService;
private readonly ISharedFileService _sharedFileService = sharedFileService;
private readonly IDataAccessContext _dataAccessContext = dataAccessContext;
private readonly ILogger<ClearMLMonitorService> _logger = logger;
private readonly Dictionary<string, ProgressStatus> _curBuildStatus = new();

Expand Down Expand Up @@ -191,16 +193,22 @@ or ClearMLTaskStatus.Completed
)
{
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, cancellationToken))
return false;
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
return await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await using (await @lock.WriterLockAsync(cancellationToken: ct))
{
if (!await buildJobService.BuildJobStartedAsync(engineId, buildId, ct))
return false;
}
await platformService.BuildStartedAsync(buildId, CancellationToken.None);
await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, cancellationToken);
_logger.LogInformation("Build started ({0})", buildId);
return true;
await UpdateTrainJobStatus(platformService, buildId, new ProgressStatus(0), 0, ct);
_logger.LogInformation("Build started ({0})", buildId);
return true;
},
cancellationToken: cancellationToken
);
}

private async Task<bool> TrainJobCompletedAsync(
Expand Down Expand Up @@ -252,12 +260,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildFaultedAsync(buildId, message, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildFaultedAsync(buildId, message, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogError("Build faulted ({0}). Error: {1}", buildId, message);
Expand All @@ -282,12 +296,18 @@ CancellationToken cancellationToken
IDistributedReaderWriterLock @lock = await lockFactory.CreateAsync(engineId, cancellationToken);
await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
{
await platformService.BuildCanceledAsync(buildId, cancellationToken);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await _dataAccessContext.WithTransactionAsync(
async (ct) =>
{
await platformService.BuildCanceledAsync(buildId, ct);
await buildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: cancellationToken
);
}
_logger.LogInformation("Build canceled ({0})", buildId);
Expand Down
51 changes: 36 additions & 15 deletions src/SIL.Machine.AspNetCore/Services/HangfireBuildJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ public abstract class HangfireBuildJob(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob> logger
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, buildJobService, logger)
) : HangfireBuildJob<object?>(platformService, engines, lockFactory, dataAccessContext, buildJobService, logger)
{
public virtual Task RunAsync(
string engineId,
Expand All @@ -23,13 +24,15 @@ public abstract class HangfireBuildJob<T>(
IPlatformService platformService,
IRepository<TranslationEngine> engines,
IDistributedReaderWriterLockFactory lockFactory,
IDataAccessContext dataAccessContext,
IBuildJobService buildJobService,
ILogger<HangfireBuildJob<T>> logger
)
{
protected IPlatformService PlatformService { get; } = platformService;
protected IRepository<TranslationEngine> Engines { get; } = engines;
protected IDistributedReaderWriterLockFactory LockFactory { get; } = lockFactory;
protected IDataAccessContext DataAccessContext { get; } = dataAccessContext;
protected IBuildJobService BuildJobService { get; } = buildJobService;
protected ILogger<HangfireBuildJob<T>> Logger { get; } = logger;

Expand Down Expand Up @@ -69,12 +72,18 @@ await using (await @lock.WriterLockAsync(cancellationToken: cancellationToken))
completionStatus = JobCompletionStatus.Canceled;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildCanceledAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogInformation("Build canceled ({0})", buildId);
Expand All @@ -86,8 +95,14 @@ await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.No
completionStatus = JobCompletionStatus.Restarting;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildRestartingAsync(buildId, CancellationToken.None);
await BuildJobService.BuildJobRestartingAsync(engineId, buildId, CancellationToken.None);
},
cancellationToken: CancellationToken.None
);
}
throw;
}
Expand All @@ -101,12 +116,18 @@ await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.No
completionStatus = JobCompletionStatus.Faulted;
await using (await @lock.WriterLockAsync(cancellationToken: CancellationToken.None))
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
await DataAccessContext.WithTransactionAsync(
async (ct) =>
{
await PlatformService.BuildFaultedAsync(buildId, e.Message, CancellationToken.None);
await BuildJobService.BuildJobFinishedAsync(
engineId,
buildId,
buildComplete: false,
CancellationToken.None
);
},
cancellationToken: CancellationToken.None
);
}
Logger.LogError(0, e, "Build faulted ({0})", buildId);
Expand Down
11 changes: 11 additions & 0 deletions src/SIL.Machine.AspNetCore/Services/IMessageOutboxService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace SIL.Machine.AspNetCore.Services;

/// <summary>
/// Contract for adding messages to the message outbox.
/// </summary>
public interface IMessageOutboxService
{
    /// <summary>
    /// Enqueues a message for the given method and group.
    /// </summary>
    /// <param name="method">The operation the message represents.</param>
    /// <param name="groupId">Identifier of the group the message belongs to.</param>
    /// <param name="requestContent">The request content to carry in the message.</param>
    /// <param name="cancellationToken">Token used to cancel the operation.</param>
    /// <returns>A task representing the asynchronous enqueue operation.</returns>
    Task EnqueueMessageAsync(
        OutboxMessageMethod method,
        string groupId,
        string requestContent,
        CancellationToken cancellationToken
    );
}
Loading

0 comments on commit 3e19b1d

Please sign in to comment.