-
Notifications
You must be signed in to change notification settings - Fork 21
/
DownloadCoordinator.cs
202 lines (164 loc) · 7.59 KB
/
DownloadCoordinator.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// -----------------------------------------------------------------------
// <copyright file="DownloadCoordinator.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2019 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------
using System;
using Akka.Actor;
using Akka.Event;
using Akka.Streams;
using Akka.Streams.Dsl;
using WebCrawler.Shared.IO.Messages;
using WebCrawler.Shared.State;
namespace WebCrawler.Shared.IO
{
/// <summary>
/// Actor responsible for using Akka.Streams to execute download and parsing of all content.
/// Can be remote-deployed to other systems.
/// Publishes statistics updates to its parent.
/// </summary>
/// <summary>
/// Actor responsible for using Akka.Streams to execute download and parsing of all content.
/// Can be remote-deployed to other systems.
/// Publishes statistics updates to its parent.
/// </summary>
public class DownloadCoordinator : ReceiveActor
{
    /// <summary>
    /// Fallback parallelism for the download stages when the constructor is given
    /// a non-positive <c>maxConcurrentDownloads</c> value.
    /// </summary>
    private const int DefaultMaxConcurrentDownloads = 50;

    protected readonly IActorRef Commander;
    protected readonly IActorRef DownloadsTracker;
    protected readonly long MaxConcurrentDownloads;
    private readonly ILoggingAdapter _logger = Context.GetLogger();

    // Handle for the recurring PublishStatsTick schedule; created in PreStart,
    // cancelled in PostStop. May be null if PreStart never ran.
    private ICancelable _publishStatsTask;

    protected IActorRef DownloaderRouter;
    protected CrawlJob Job;
    protected IActorRef ParserRouter;

    // Materialized Source.ActorRef that feeds CrawlDocuments into the stream graph.
    protected IActorRef SourceActor;
    protected CrawlJobStats Stats;

    public DownloadCoordinator(CrawlJob job, IActorRef commander, IActorRef downloadsTracker,
        long maxConcurrentDownloads)
    {
        Job = job;
        DownloadsTracker = downloadsTracker;
        MaxConcurrentDownloads = maxConcurrentDownloads;
        Commander = commander;
        Stats = new CrawlJobStats(Job);

        // BUGFIX: the maxConcurrentDownloads constructor argument was previously
        // stored but ignored — both download stages were hard-coded to
        // DefaultMaxConcurrentDownloads. Honor the caller's setting, clamping to
        // int range and falling back to the default for non-positive values.
        var downloadParallelism = MaxConcurrentDownloads > 0
            ? (int)Math.Min(MaxConcurrentDownloads, int.MaxValue)
            : DefaultMaxConcurrentDownloads;

        // Sinks that route stream results back to this actor as messages;
        // each signals StreamCompleteTick when its stream finishes.
        var selfHtmlSink = Sink.ActorRef<CheckDocuments>(Self, StreamCompleteTick.Instance);
        var selfDocSink = Sink.ActorRef<CompletedDocument>(Self, StreamCompleteTick.Instance);
        var selfImgSink = Sink.ActorRef<CompletedDocument>(Self, StreamCompleteTick.Instance);

        // HTML pipeline: classify -> throttle (30 elements / 5s, burst 100) -> download.
        var htmlFlow = Flow.Create<CrawlDocument>().Via(DownloadFlow.SelectDocType())
            .Throttle(30, TimeSpan.FromSeconds(5), 100, ThrottleMode.Shaping)
            .Via(DownloadFlow.ProcessHtmlDownloadFor(downloadParallelism, HttpClientFactory.GetClient()));

        // Image pipeline: classify -> throttle (30 elements / 1s, burst 100) -> download -> finalize.
        var imageFlow = Flow.Create<CrawlDocument>()
            .Via(DownloadFlow.SelectDocType())
            .Throttle(30, TimeSpan.FromSeconds(1), 100, ThrottleMode.Shaping)
            .Via(DownloadFlow.ProcessImageDownloadFor(downloadParallelism, HttpClientFactory.GetClient()))
            .Via(DownloadFlow.ProcessCompletedDownload());

        // Buffer up to 5000 pending documents; drop the newest on overflow.
        var source = Source.ActorRef<CrawlDocument>(5000, OverflowStrategy.DropTail);

        var graph = GraphDsl.Create(source, (builder, s) =>
        {
            // html flows
            var downloadHtmlFlow = builder.Add(htmlFlow);
            var downloadBroadcast = builder.Add(new Broadcast<DownloadHtmlResult>(2));
            var completedDownload = builder.Add(DownloadFlow.ProcessCompletedHtmlDownload());
            var parseCompletedDownload = builder.Add(ParseFlow.GetParseFlow(Job));
            var htmlSink = builder.Add(selfHtmlSink);
            var docSink = builder.Add(selfDocSink);

            // Each downloaded HTML result is both recorded as completed and parsed for new links.
            builder.From(downloadHtmlFlow).To(downloadBroadcast);
            builder.From(downloadBroadcast.Out(0)).To(completedDownload.Inlet);
            builder.From(downloadBroadcast.Out(1)).To(parseCompletedDownload.Inlet);
            builder.From(parseCompletedDownload).To(htmlSink);
            builder.From(completedDownload).To(docSink);

            // image flows
            var imgSink = builder.Add(selfImgSink);
            var downloadImageFlow = builder.Add(imageFlow);
            builder.From(downloadImageFlow).To(imgSink);

            // Every incoming document is offered to BOTH pipelines; SelectDocType
            // inside each flow filters to the relevant document kind.
            var sourceBroadcast = builder.Add(new Broadcast<CrawlDocument>(2));
            builder.From(sourceBroadcast.Out(0)).To(downloadImageFlow.Inlet);
            builder.From(sourceBroadcast.Out(1)).To(downloadHtmlFlow.Inlet);
            builder.From(s.Outlet).To(sourceBroadcast.In);

            return ClosedShape.Instance;
        });

        // Materializing yields the Source.ActorRef endpoint used to push work into the graph.
        SourceActor = Context.Materializer().Materialize(graph);
        Receiving();
    }

    protected override void PreStart()
    {
        // Schedule regular stats updates (every 250ms, published to ourselves).
        _publishStatsTask = new Cancelable(Context.System.Scheduler);
        Context.System.Scheduler.ScheduleTellRepeatedly(TimeSpan.FromMilliseconds(250),
            TimeSpan.FromMilliseconds(250), Self, PublishStatsTick.Instance, Self, _publishStatsTask);
    }

    protected override void PreRestart(Exception reason, object message)
    {
        // Don't dispose of children on restart — just run the normal stop cleanup.
        PostStop();
    }

    protected override void PostStop()
    {
        // Best-effort cancellation of the recurring stats task. Null-conditional
        // guards against PostStop running before PreStart ever did; the empty
        // catch is deliberate — a failed cancel must not disturb actor shutdown.
        try
        {
            _publishStatsTask?.Cancel();
        }
        catch
        {
        }
    }

    /// <summary>
    /// Installs the actor's message handlers.
    /// </summary>
    private void Receiving()
    {
        Receive<PublishStatsTick>(stats =>
        {
            if (!Stats.IsEmpty)
            {
                _logger.Info("Publishing {0} to parent", Stats);
                Commander.Tell(Stats.Copy());

                // Reset our stats after publishing so each update is a delta.
                Stats = Stats.Reset();
            }
        });

        // Received word from a ParseWorker that we need to check for new documents.
        Receive<CheckDocuments>(documents =>
        {
            // Forward this onto the downloads tracker, but have it reply back to our
            // parent router so the work might get distributed more evenly.
            DownloadsTracker.Tell(documents, Context.Parent);
        });

        // Update our local stats.
        Receive<DiscoveredDocuments>(discovered => { Stats = Stats.WithDiscovered(discovered); });

        // Received word from the DownloadTracker that we need to process some docs.
        Receive<ProcessDocuments>(process =>
        {
            foreach (var doc in process.Documents)
                SourceActor.Tell(doc);
        });

        // Hand the work off to the downloaders via the materialized stream source.
        Receive<IDownloadDocument>(download => { SourceActor.Tell(download.Document); });

        Receive<CompletedDocument>(completed =>
        {
            _logger.Info("Logging completed download {0} bytes {1}", completed.Document.DocumentUri,
                completed.NumBytes);
            Stats = Stats.WithCompleted(completed);
            _logger.Info("Total stats {0}", Stats);
        });

        Receive<StreamCompleteTick>(_ => { _logger.Info("Stream has completed. No more messages to process."); });
    }

    #region Constants

    public const string Downloader = "downloader";
    public const string Parser = "parser";

    #endregion

    #region Messages

    /// <summary>
    /// Used to signal that it's time to publish to the JobMaster.
    /// </summary>
    public class PublishStatsTick
    {
        private PublishStatsTick()
        {
        }

        public static PublishStatsTick Instance { get; } = new PublishStatsTick();
    }

    /// <summary>
    /// Signal from a materialized stream sink that its stream has completed.
    /// </summary>
    public class StreamCompleteTick
    {
        private StreamCompleteTick()
        {
        }

        // Unified with PublishStatsTick's singleton style (get-only property);
        // source-compatible with existing Instance field usage.
        public static StreamCompleteTick Instance { get; } = new StreamCompleteTick();
    }

    #endregion
}
}