/
CrawlerLocks.cs
123 lines (114 loc) · 5.03 KB
/
CrawlerLocks.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
namespace tbm.Crawler.Tieba.Crawl;
public class CrawlerLocks : WithLogTrace
{
    /// <summary>All lock type names that may be passed as <c>lockType</c> to the constructor.</summary>
    public static List<string> RegisteredCrawlerLocks { get; } = new() {"thread", "threadLate", "reply", "subReply"};

    /// <summary>Identifies a crawl target: a forum, optionally narrowed to a thread and then a post.</summary>
    public record LockId(Fid Fid, Tid? Tid = null, Pid? Pid = null)
    {
        public override string ToString() => $"f{Fid}" + (Tid == null ? "" : $" t{Tid}") + (Pid == null ? "" : $" p{Pid}");
    }

    // Pages currently being crawled per LockId; inner value is the acquire timestamp used for timeout stealing.
    // Locking discipline: always take lock(_crawling) first, then the per-id inner dictionary lock.
    private readonly ConcurrentDictionary<LockId, ConcurrentDictionary<Page, Time>> _crawling = new();
    // inner value of field _failed with type ushort refers to failed times on this page and lockId before retry
    private readonly ConcurrentDictionary<LockId, ConcurrentDictionary<Page, FailureCount>> _failed = new();
    private readonly ILogger<CrawlerLocks> _logger;
    private readonly IConfigurationSection _config;

    /// <summary>The lock type name this instance was constructed with (one of <see cref="RegisteredCrawlerLocks"/>).</summary>
    public string LockType { get; }

    /// <summary>Reads config section <c>CrawlerLocks:{lockType}</c> and starts periodic trace logging.</summary>
    public CrawlerLocks(ILogger<CrawlerLocks> logger, IConfiguration config, string lockType)
    {
        _logger = logger;
        _config = config.GetSection($"CrawlerLocks:{lockType}");
        LockType = lockType;
        InitLogTrace(_config);
    }

    /// <summary>Emits a snapshot of crawling/failed counts; both outer locks are held so the snapshot is consistent.</summary>
    protected override void LogTrace()
    {
        if (!ShouldLogTrace()) return;
        lock (_crawling)
        lock (_failed)
        {
            _logger.LogTrace("Lock: type={} crawlingIdCount={} crawlingPageCount={} crawlingPageCountsKeyById={} failedIdCount={} failedPageCount={} failures={}", LockType,
                _crawling.Count, _crawling.Values.Select(d => d.Count).Sum(),
                Helper.UnescapedJsonSerialize(_crawling.ToDictionary(i => i.Key.ToString(), i => i.Value.Count)),
                _failed.Count, _failed.Values.Select(d => d.Count).Sum(),
                Helper.UnescapedJsonSerialize(_failed.ToDictionary(i => i.Key.ToString(), i => i.Value)));
        }
    }

    /// <summary>
    /// Tries to lock every page in <paramref name="pages"/> under <paramref name="lockId"/>.
    /// Returns the subset that was successfully locked (pages already locked by another crawler
    /// are excluded, unless their lock is older than LockTimeoutSec, in which case it is stolen).
    /// </summary>
    public IEnumerable<Page> AcquireRange(LockId lockId, IEnumerable<Page> pages)
    {
        var lockFreePages = pages.ToHashSet();
        lock (_crawling)
        { // lock the entire ConcurrentDictionary since following bulk insert should be a single atomic operation
            Helper.GetNowTimestamp(out var now);
            if (!_crawling.ContainsKey(lockId))
            { // if no one is locking any page in lockId, just insert pages then return it as is
                var pageTimeDict = lockFreePages.Select(i => KeyValuePair.Create(i, now));
                var newPage = new ConcurrentDictionary<Page, Time>(pageTimeDict);
                if (_crawling.TryAdd(lockId, newPage)) return lockFreePages;
            }
            // hoisted out of the loop: both are invariant while holding lock(_crawling),
            // so avoid a dictionary lookup and a config parse per page
            var pagesLock = _crawling[lockId];
            var lockTimeout = _config.GetValue<Time>("LockTimeoutSec", 300); // 5 minutes
            lockFreePages.ToList().ForEach(page => // iterate on copy in order to mutate the original lockFreePages
            {
                lock (pagesLock)
                {
                    if (pagesLock.TryAdd(page, now)) return;
                    // when page is locking: steal the lock if it has timed out, otherwise report the page as unavailable
                    if (pagesLock[page] < now - lockTimeout)
                        pagesLock[page] = now;
                    else _ = lockFreePages.Remove(page);
                }
            });
        }
        return lockFreePages;
    }

    /// <summary>Releases the locks on <paramref name="pages"/> held under <paramref name="lockId"/>; warns on double release.</summary>
    public void ReleaseRange(LockId lockId, IEnumerable<Page> pages)
    {
        var pageList = pages.ToList(); // materialize once: used for both logging and removal
        lock (_crawling)
        {
            if (!_crawling.TryGetValue(lockId, out var pagesLock))
            {
                // string.Join renders the page numbers; logging the raw IEnumerable would only emit its type name
                _logger.LogWarning("Try to release a crawling page lock {} in {} id {} more than once",
                    string.Join(',', pageList), LockType, lockId);
                return;
            }
            lock (pagesLock)
            {
                pageList.ForEach(i => pagesLock.TryRemove(i, out _));
                if (pagesLock.IsEmpty) _ = _crawling.TryRemove(lockId, out _);
            }
        }
    }

    /// <summary>
    /// Records a failed crawl of <paramref name="page"/> for later retry, unless
    /// <paramref name="failureCount"/> has reached the configured MaxRetryTimes.
    /// </summary>
    public void AcquireFailed(LockId lockId, Page page, FailureCount failureCount)
    {
        var maxRetry = _config.GetValue<FailureCount>("MaxRetryTimes", 5);
        if (failureCount >= maxRetry)
        {
            _logger.LogInformation("Retry for previous failed crawling of page {} in {} id {} has been canceled since it's reaching the configured max retry times {}",
                page, LockType, lockId, maxRetry);
            return;
        }
        lock (_failed)
        {
            if (_failed.TryGetValue(lockId, out var pagesLock))
            {
                // overwrite any stale count so the latest failure count wins
                lock (pagesLock) if (!pagesLock.TryAdd(page, failureCount)) pagesLock[page] = failureCount;
            }
            else
            {
                var newPage = new ConcurrentDictionary<Page, FailureCount> { [page] = failureCount };
                _ = _failed.TryAdd(lockId, newPage);
            }
        }
    }

    /// <summary>Atomically drains all recorded failures, returning a deep copy so callers can retry without holding any lock.</summary>
    public Dictionary<LockId, Dictionary<Page, FailureCount>> RetryAllFailed()
    {
        lock (_failed)
        {
            var deepCloneOfFailed = _failed.ToDictionary(i => i.Key, i =>
            {
                lock (i.Value) return new Dictionary<Page, FailureCount>(i.Value);
            });
            _failed.Clear();
            return deepCloneOfFailed;
        }
    }
}