Skip to content

Commit e2514dd

Browse files
committed
fix: hold lock on reddit id during write operations
These changes will serialize operations against the specific reddit post id in-process. There was a race condition where the background processor could pull in a reddit post id for processing, read all of the links, make the reddit API calls, and then remove the link from the queue. Specifically, the race condition is between the reddit API calls and removing the link from the queue (making it as processed). While the database operations are (under normal circumstances) extremely fast, the reddit API operations have consistently profiled as relatively slow (averaging 250ms per call), and each link can require up to 4 API calls to be made. This leaves a non-deterministic window where additional links sent to the API will never be posted.
1 parent a19b2a1 commit e2514dd

6 files changed

Lines changed: 90 additions & 58 deletions

File tree

ApplicationData/ApplicationData.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
</PackageReference>
2525
<PackageReference Include="Microsoft.Data.Sqlite" Version="8.0.2" />
2626
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="8.0.0" />
27+
<PackageReference Include="Nito.AsyncEx" Version="5.1.2" />
2728
<PackageReference Include="SnooBrowser" Version="3.1.1" />
2829
<PackageReference Include="System.Linq.Async" Version="6.0.1" />
2930
</ItemGroup>
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
using Microsoft.Extensions.Caching.Memory;
2+
using Nito.AsyncEx;
3+
4+
namespace ApplicationData.Services;
5+
6+
public sealed class ResourceAccessManager(IMemoryCache _memoryCache)
7+
{
8+
private readonly AsyncLock _resourceWriteLock = new();
9+
10+
public async Task<IDisposable> ObtainExclusiveAccess(string resourceId, CancellationToken cancellationToken)
11+
{
12+
var resourceLock = await GetOrCreateResourceLock(resourceId, cancellationToken);
13+
return await resourceLock.LockAsync(cancellationToken);
14+
}
15+
16+
public async Task<T> WithExclusiveAccess<T>(string resourceId, Func<Task<T>> action, CancellationToken cancellationToken)
17+
{
18+
var resourceLock = await GetOrCreateResourceLock(resourceId, cancellationToken);
19+
using var _ = await resourceLock.LockAsync(cancellationToken);
20+
21+
return await action();
22+
}
23+
24+
public async Task WithExclusiveAccess(string resourceId, Func<Task> action, CancellationToken cancellationToken) =>
25+
await WithExclusiveAccess(
26+
resourceId,
27+
async () =>
28+
{
29+
await action();
30+
return 0;
31+
},
32+
cancellationToken);
33+
34+
private async Task<AsyncLock> GetOrCreateResourceLock(string resourceId, CancellationToken cancellationToken)
35+
{
36+
using var _ = await _resourceWriteLock.LockAsync(cancellationToken);
37+
38+
var resourceLock = _memoryCache.GetOrCreate(
39+
resourceId,
40+
entry =>
41+
{
42+
entry.SetSlidingExpiration(TimeSpan.FromMinutes(10));
43+
return new AsyncLock();
44+
});
45+
46+
if (resourceLock is null)
47+
throw new InvalidOperationException($"Resource lock was not created or retrieved (ResourceId={resourceId}");
48+
49+
return resourceLock;
50+
}
51+
}

BackgroundProcessor/BackgroundProcessor.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
<UserSecretsId>dotnet-BackgroundProcessor-35541A68-9074-4C0A-9003-D5B4779CC1E8</UserSecretsId>
88
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
99
<LangVersion>12</LangVersion>
10+
<OutputType>Library</OutputType>
1011
</PropertyGroup>
1112

1213
<ItemGroup>

BackgroundProcessor/Processors/LinkProcessor.cs

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,35 +12,17 @@ namespace BackgroundProcessor.Processors;
1212

1313
// ReSharper disable once UnusedType.Global
1414
// it is auto-created
15-
public class LinkProcessor : IBackgroundProcessor
15+
public class LinkProcessor(
16+
ILogger<LinkProcessor> _logger,
17+
LinkProvider _linkProvider,
18+
RedditCommentProvider _commentProvider,
19+
CommentBrowser _commentBrowser,
20+
UserProvider _userProvider,
21+
TemplateCache _templateCache,
22+
IDbConnectionFactory _dbConnectionFactory,
23+
ResourceAccessManager _resourceAccessManager
24+
) : IBackgroundProcessor
1625
{
17-
private readonly ILogger<LinkProcessor> _logger;
18-
private readonly LinkProvider _linkProvider;
19-
private readonly RedditCommentProvider _commentProvider;
20-
private readonly CommentBrowser _commentBrowser;
21-
private readonly UserProvider _userProvider;
22-
private readonly TemplateCache _templateCache;
23-
private readonly IDbConnectionFactory _dbConnectionFactory;
24-
25-
public LinkProcessor(
26-
ILogger<LinkProcessor> logger,
27-
LinkProvider linkProvider,
28-
RedditCommentProvider commentProvider,
29-
CommentBrowser commentBrowser,
30-
UserProvider userProvider,
31-
TemplateCache templateCache,
32-
IDbConnectionFactory dbConnectionFactory
33-
)
34-
{
35-
_logger = logger;
36-
_linkProvider = linkProvider;
37-
_commentProvider = commentProvider;
38-
_commentBrowser = commentBrowser;
39-
_userProvider = userProvider;
40-
_templateCache = templateCache;
41-
_dbConnectionFactory = dbConnectionFactory;
42-
}
43-
4426
/// <inheritdoc />
4527
public async Task Process(CancellationToken cancellationToken)
4628
{
@@ -50,6 +32,8 @@ public async Task Process(CancellationToken cancellationToken)
5032

5133
foreach (var item in postIdsWithPendingChanges)
5234
{
35+
using var redditPostIdLock = await _resourceAccessManager.ObtainExclusiveAccess(item.RedditPostId, cancellationToken);
36+
5337
var links = await _linkProvider.GetLinksByRedditPostId(item.RedditPostId);
5438
var maybeExistingComment = await _commentProvider.FindCommentIdByPostId(item.RedditPostId);
5539

BackgroundProcessor/Program.cs

Lines changed: 0 additions & 18 deletions
This file was deleted.

WebApi/Controllers/LinkController.cs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,21 @@ public class LinkController : ApiController
2121
{
2222
private readonly LinkProvider _linkProvider;
2323
private readonly SubmissionBrowser _submissionBrowser;
24+
private readonly ResourceAccessManager _resourceAccessManager;
2425

2526
/// <summary>
2627
/// C'tor
2728
/// </summary>
2829
public LinkController(
2930
IHttpContextAccessor httpContextAccessor,
3031
LinkProvider linkProvider,
31-
SubmissionBrowser submissionBrowser
32+
SubmissionBrowser submissionBrowser,
33+
ResourceAccessManager resourceAccessManager
3234
) : base(httpContextAccessor)
3335
{
3436
_linkProvider = linkProvider;
3537
_submissionBrowser = submissionBrowser;
38+
_resourceAccessManager = resourceAccessManager;
3639
}
3740

3841
/// <summary>
@@ -47,7 +50,7 @@ SubmissionBrowser submissionBrowser
4750
[SwaggerResponse((int)HttpStatusCode.Conflict,
4851
description: "Conflict. This user has already provided this combination of reddit post ID, URL, and link type.",
4952
typeof(LinkAlreadyExistsError))]
50-
public async Task<IActionResult> SubmitLink([FromBody] SubmitLinkRequest linkRequest)
53+
public async Task<IActionResult> SubmitLink([FromBody] SubmitLinkRequest linkRequest, CancellationToken cancellationToken)
5154
{
5255
var validUrl = GetValidUriOrFail(linkRequest.LinkUrl);
5356
var linkKind = GetLinkKindOrFail(linkRequest.LinkType!.Value);
@@ -60,12 +63,16 @@ public async Task<IActionResult> SubmitLink([FromBody] SubmitLinkRequest linkReq
6063
RedditPostId: linkRequest.RedditPostId!
6164
));
6265

63-
var createResult = await _linkProvider.CreateLink(new NewLink(
64-
redditPostId: linkRequest.RedditPostId,
65-
linkUrl: validUrl.OriginalString,
66-
linkKind,
67-
ownerUserId: UserId
68-
));
66+
var createResult = await _resourceAccessManager.WithExclusiveAccess(
67+
linkRequest.RedditPostId!,
68+
async () =>
69+
await _linkProvider.CreateLink(new NewLink(
70+
redditPostId: linkRequest.RedditPostId,
71+
linkUrl: validUrl.OriginalString,
72+
linkKind,
73+
ownerUserId: UserId
74+
)),
75+
cancellationToken);
6976

7077
if (createResult.IsFailure)
7178
{
@@ -93,7 +100,7 @@ public async Task<IActionResult> SubmitLink([FromBody] SubmitLinkRequest linkReq
93100
[SwaggerResponse((int)HttpStatusCode.NotFound,
94101
description: "Not Found. This user does not have this combination of reddit post ID, URL, and link type.",
95102
typeof(LinkNotFoundError))]
96-
public async Task<IActionResult> DeleteLinkByLinkData([FromBody] DeleteLinkRequest linkRequest)
103+
public async Task<IActionResult> DeleteLinkByLinkData([FromBody] DeleteLinkRequest linkRequest, CancellationToken cancellationToken)
97104
{
98105
var validUrl = GetValidUriOrFail(linkRequest.LinkUrl);
99106
var linkKind = GetLinkKindOrFail(linkRequest.LinkType!.Value);
@@ -105,7 +112,10 @@ public async Task<IActionResult> DeleteLinkByLinkData([FromBody] DeleteLinkReque
105112
Url: validUrl.OriginalString
106113
));
107114

108-
await _linkProvider.DeleteLinkById(link.LinkId);
115+
await _resourceAccessManager.WithExclusiveAccess(
116+
linkRequest.RedditPostId,
117+
() => _linkProvider.DeleteLinkById(link.LinkId),
118+
cancellationToken);
109119

110120
return new OkObjectResult(new LinkDeleteQueuedSuccessfully(
111121
Message: TranslatedStrings.LinkController.LinkDeleted,
@@ -125,15 +135,18 @@ public async Task<IActionResult> DeleteLinkByLinkData([FromBody] DeleteLinkReque
125135
[SwaggerResponse((int)HttpStatusCode.NotFound,
126136
description: "Not Found. This user does not have this combination of reddit post ID, URL, and link type.",
127137
typeof(LinkIdNotFoundError))]
128-
public async Task<IActionResult> DeleteLinkById([FromRoute] int linkId)
138+
public async Task<IActionResult> DeleteLinkById([FromRoute] int linkId, CancellationToken cancellationToken)
129139
{
130140
if (!(await _linkProvider.FindLinkById(UserId, linkId)).Try(out var link))
131141
return new NotFoundObjectResult(new LinkIdNotFoundError(
132142
Message: TranslatedStrings.LinkController.LinkIdNotFound(linkId),
133143
LinkId: linkId
134144
));
135145

136-
await _linkProvider.DeleteLinkById(link.LinkId);
146+
await _resourceAccessManager.WithExclusiveAccess(
147+
link.RedditPostId,
148+
() => _linkProvider.DeleteLinkById(link.LinkId),
149+
cancellationToken);
137150

138151
return new OkObjectResult(new LinkDeleteQueuedSuccessfully(
139152
Message: TranslatedStrings.LinkController.LinkDeleted,

0 commit comments

Comments
 (0)