Skip to content

Commit 83ee344

Browse files
committed
fix: don't halt all processing if error occurs
1 parent 907e86e commit 83ee344

1 file changed

Lines changed: 57 additions & 28 deletions

File tree

BackgroundProcessor/Processors/LinkProcessor.cs

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -52,50 +52,79 @@ public async Task Process(CancellationToken cancellationToken)
5252

5353
foreach (var item in postIdsWithPendingChanges)
5454
{
55-
using var redditPostIdLock = await _resourceAccessManager.ObtainExclusiveAccess(item.RedditPostId, cancellationToken);
55+
await WithLogging(
56+
item.RedditPostId,
57+
item.QueuedItemId,
58+
async ct =>
59+
{
60+
using var redditPostIdLock = await _resourceAccessManager.ObtainExclusiveAccess(item.RedditPostId, cancellationToken);
61+
await ProcessLink(item.RedditPostId, item.QueuedItemId, ct);
62+
},
63+
cancellationToken);
64+
}
65+
}
5666

57-
var links = await _linkProvider.GetLinksByRedditPostId(item.RedditPostId);
58-
var maybeExistingComment = await _commentProvider.FindCommentIdByPostId(item.RedditPostId);
67+
private async Task WithLogging(string redditPostId, int queuedItemId, Func<CancellationToken, Task> action, CancellationToken cancellationToken)
68+
{
69+
using var _ = _logger.BeginScope(new Dictionary<string, object>
70+
{
71+
["RedditPostId"] = redditPostId,
72+
["QueuedItemId"] = queuedItemId,
73+
});
5974

60-
await using var connection = _dbConnectionFactory.CreateConnection();
61-
await using var lazyTx = LazyDbTransaction.CreateFrom(connection, IsolationLevel.Serializable);
75+
try
76+
{
77+
await action(cancellationToken);
78+
}
79+
catch (Exception ex)
80+
{
81+
_logger.LogError(ex, "An error occurred while processing link");
82+
}
83+
}
6284

63-
if (!links.Any())
85+
private async Task ProcessLink(string redditPostId, int queuedItemId, CancellationToken cancellationToken)
86+
{
87+
var links = await _linkProvider.GetLinksByRedditPostId(redditPostId);
88+
var maybeExistingComment = await _commentProvider.FindCommentIdByPostId(redditPostId);
89+
90+
await using var connection = _dbConnectionFactory.CreateConnection();
91+
await using var lazyTx = LazyDbTransaction.CreateFrom(connection, IsolationLevel.Serializable);
92+
93+
if (!links.Any())
94+
{
95+
if (maybeExistingComment.HasValue)
96+
await _commentBrowser.DeleteComment(CommentThing.CreateFromShortId(maybeExistingComment.Value));
97+
}
98+
else
99+
{
100+
var message = await BuildComment(links,
101+
getUsername: async userId => (await _userProvider.FindUserByIdIncludeDeleted(userId)).Value.DisplayUsername,
102+
cancellationToken);
103+
104+
if (maybeExistingComment.HasValue)
64105
{
65-
if (maybeExistingComment.HasValue)
66-
await _commentBrowser.DeleteComment(CommentThing.CreateFromShortId(maybeExistingComment.Value));
106+
var result = await _commentBrowser.EditComment(CommentThing.CreateFromShortId(maybeExistingComment.Value), message);
107+
await TryDistinguishStickyAndLockLogFailure(result.ParentId, result.CommentId);
67108
}
68109
else
69110
{
70-
var message = await BuildComment(links,
71-
getUsername: async userId => (await _userProvider.FindUserByIdIncludeDeleted(userId)).Value.DisplayUsername,
72-
cancellationToken);
73-
74-
if (maybeExistingComment.HasValue)
75-
{
76-
var result = await _commentBrowser.EditComment(CommentThing.CreateFromShortId(maybeExistingComment.Value), message);
77-
await TryDistinguishStickyAndLockLogFailure(result.ParentId, result.CommentId);
78-
}
79-
else
80-
{
81-
var result = await _commentBrowser.SubmitComment(LinkThing.CreateFromShortId(item.RedditPostId), message);
82-
await TryDistinguishStickyAndLockLogFailure(result.ParentId, result.CommentId);
111+
var result = await _commentBrowser.SubmitComment(LinkThing.CreateFromShortId(redditPostId), message);
112+
await TryDistinguishStickyAndLockLogFailure(result.ParentId, result.CommentId);
83113

84114
#pragma warning disable IDISP001
85-
var innerTx = await lazyTx.GetOrCreateTx(cancellationToken);
115+
var innerTx = await lazyTx.GetOrCreateTx(cancellationToken);
86116
#pragma warning restore IDISP001
87-
await RedditCommentProvider.CreateOrUpdateLinkedComment(innerTx, item.RedditPostId, result.CommentId.ShortId);
88-
}
117+
await RedditCommentProvider.CreateOrUpdateLinkedComment(innerTx, redditPostId, result.CommentId.ShortId);
89118
}
119+
}
90120

91121
#pragma warning disable IDISP001
92-
var tx = await lazyTx.GetOrCreateTx(cancellationToken);
122+
var tx = await lazyTx.GetOrCreateTx(cancellationToken);
93123
#pragma warning restore IDISP001
94124

95-
await LinkProvider.MarkRedditPostIdAsProcessed(tx, item.QueuedItemId);
125+
await LinkProvider.MarkRedditPostIdAsProcessed(tx, queuedItemId);
96126

97-
await tx.Commit(cancellationToken);
98-
}
127+
await tx.Commit(cancellationToken);
99128
}
100129

101130
private async Task<string> BuildComment(IReadOnlyCollection<Link> links, Func<int, Task<string>> getUsername, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)