-
Notifications
You must be signed in to change notification settings - Fork 11
/
server.ex
285 lines (220 loc) · 7.35 KB
/
server.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
defmodule Crawlie.Stats.Server do
use GenServer
alias Crawlie.Page
alias Crawlie.Response
alias Crawlie.Utils
alias Crawlie.Stats.Counter, as: Count
alias Crawlie.Stats.Distribution, as: Dist
defmodule Data do
  @moduledoc """
  Container for the statistics accumulated during a single crawling session.
  """

  alias __MODULE__, as: This

  @typedoc """
  Crawling status.
  """
  @type status :: :ready | :crawling | :finished

  @typedoc """
  Stats data returned by `Crawlie.Stats.Server.get_stats/1`.
  """
  @type t :: %This{
    uris_visited: integer,
    uris_extracted: integer,
    depths_dist: map,
    retry_count_dist: map,
    bytes_received: integer,
    status_codes_dist: map,
    content_types_dist: map,
    failed_fetch_uris: MapSet.t(URI.t),
    uris_skipped: integer,
    failed_parse_uris: MapSet.t(URI.t),
    status: status,
    utimestamp_started: nil | integer,
    utimestamp_finished: nil | integer,
    usec_spent_fetching: integer,
  }

  # The trailing comment on each field names the pipeline stage that
  # updates it.
  defstruct [
    uris_visited: 0, # fetch
    uris_extracted: 0, # extract
    depths_dist: %{}, # fetch
    retry_count_dist: %{}, # fetch
    bytes_received: 0, # fetch
    status_codes_dist: %{}, # fetch
    content_types_dist: %{}, # fetch
    failed_fetch_uris: MapSet.new(), # fetch
    uris_skipped: 0, # parse
    failed_parse_uris: MapSet.new(), # parse
    status: :ready, # | :crawling | :finished # fetch, also UrlManager.shutdown_gracefully()
    utimestamp_started: nil, # see status
    utimestamp_finished: nil, # see status
    usec_spent_fetching: 0, # fetch
  ]

  @doc """
  Returns an empty stats container with all counters zeroed and status `:ready`.
  """
  @spec new() :: t
  def new(), do: %This{}

  @doc """
  Returns time spent crawling (so far), in microseconds.

  Returns 0 before crawling starts; while crawling it measures against the
  current time, and once finished it is the fixed start-to-finish span.
  """
  @spec elapsed_usec(t) :: integer
  def elapsed_usec(%This{utimestamp_started: nil}), do: 0
  def elapsed_usec(%This{utimestamp_started: start, utimestamp_finished: nil}) do
    Utils.utimestamp() - start
  end
  def elapsed_usec(%This{utimestamp_started: start, utimestamp_finished: finish}) do
    finish - start
  end

  @doc """
  Returns true if the crawling is finished.
  """
  @spec finished?(t) :: boolean
  def finished?(%This{status: :finished}), do: true
  def finished?(%This{}), do: false
end
defmodule ResponseView do
  @moduledoc false

  # Slimmed-down projection of a `Crawlie.Response`: only the fields the
  # stats server needs, so the full response body is not sent to (and
  # retained by) the server process.
  defstruct [
    :status_code,
    :content_type_simple,
    :body_length,
  ]

  # Builds a view from a full response.
  def new(%Response{status_code: code, body: body} = response) do
    %ResponseView{
      status_code: code,
      content_type_simple: Response.content_type_simple(response),
      body_length: byte_size(body),
    }
  end
end
alias __MODULE__, as: This
@moduledoc """
Tracks the crawling statistics of a particular crawling session.
"""
@ref_marker :stats
@typedoc """
Reference used in the API to query the correct instance of `Crawlie.Stats.Server`.
"""
@type ref :: {:stats, pid()}
#===========================================================================
# API Functions
#===========================================================================
@spec start_new() :: ref
@doc false
# Starts a fresh stats server under the Crawlie supervisor and wraps its
# pid in the opaque ref the rest of the API accepts.
def start_new() do
  # Parentheses added: zero-arity remote calls without parens are
  # deprecated (compiler warning since Elixir 1.4).
  {:ok, pid} = Crawlie.Supervisor.start_stats_server()
  pid_to_ref(pid)
end
@spec get_stats(ref) :: Data.t
@doc """
Returns a `t:Crawlie.Stats.Server.Data.t/0` object containing the crawling stats for a particular session.

The ref can be obtained from `Crawlie.crawl_and_track_stats/3`'s return tuple.
"""
def get_stats(ref) do
  ref
  |> ref_to_pid()
  |> GenServer.call(:get_stats)
end
@doc false
# Records a successful fetch. Only a slimmed-down `ResponseView` is cast to
# the server, not the whole response.
def fetch_succeeded(ref, page, response, duration_usec) do
  view = ResponseView.new(response)
  GenServer.cast(ref_to_pid(ref), {:fetch_succeeded, page, view, duration_usec})
end
@doc false
# Reports a fetch failure; the server tracks at most
# `max_failed_uris_to_track` failed uris.
def fetch_failed(ref, page, max_failed_uris_to_track) do
  ref
  |> ref_to_pid()
  |> GenServer.cast({:fetch_failed, page, max_failed_uris_to_track})
end
@doc false
# Reports a parse failure; the server tracks at most
# `max_failed_uris_to_track` failed uris.
def parse_failed(ref, page, max_failed_uris_to_track) do
  ref
  |> ref_to_pid()
  |> GenServer.cast({:parse_failed, page, max_failed_uris_to_track})
end
@doc false
# Counts a skipped page. The page itself is not needed by the server,
# hence the ignored second argument.
def page_skipped(ref, _page) do
  ref
  |> ref_to_pid()
  |> GenServer.cast(:page_skipped)
end
@doc false
# Adds `count` to the running total of extracted uris.
def uris_extracted(ref, count) do
  ref
  |> ref_to_pid()
  |> GenServer.cast({:uris_extracted, count})
end
@doc false
# Signals the server that crawling is over (idempotent on the server side).
def finished(ref) do
  ref
  |> ref_to_pid()
  |> GenServer.cast(:finished)
end
#===========================================================================
# Business logic
#===========================================================================
@doc false
# The server state *is* the stats data, so `:get_stats` simply replies
# with the current state, unchanged.
def handle_call(:get_stats, _from, data) do
  {:reply, data, data}
end
@doc false
# Folds a successful fetch into the stats: time spent, payload size, visit
# count, and the depth / retry / status-code / content-type distributions.
def handle_cast({:fetch_succeeded, %Page{} = page, %ResponseView{} = view, duration_usec}, data) do
  %Page{retries: retries, depth: depth} = page
  %ResponseView{
    status_code: status_code,
    content_type_simple: content_type,
    body_length: body_length,
  } = view

  updated =
    data
    |> maybe_start_crawling()
    |> Count.inc(:usec_spent_fetching, duration_usec)
    |> Count.inc(:bytes_received, body_length)
    |> Count.inc(:uris_visited)
    |> Dist.add(:depths_dist, depth)
    |> Dist.add(:retry_count_dist, retries)
    |> Dist.add(:status_codes_dist, status_code)
    |> Dist.add(:content_types_dist, content_type)

  {:noreply, updated}
end
# Remembers the uri of a failed fetch, but only while the tracked set is
# still below `max_uris_to_track` (caps memory usage).
def handle_cast({:fetch_failed, %Page{uri: uri}, max_uris_to_track}, %Data{} = data) do
  tracked = data.failed_fetch_uris

  tracked =
    if MapSet.size(tracked) < max_uris_to_track do
      MapSet.put(tracked, uri)
    else
      tracked
    end

  {:noreply, %Data{data | failed_fetch_uris: tracked}}
end
# Remembers the uri of a page that failed to parse, but only while the
# tracked set is still below `max_uris_to_track` (caps memory usage).
def handle_cast({:parse_failed, %Page{uri: uri}, max_uris_to_track}, %Data{} = data) do
  tracked = data.failed_parse_uris

  tracked =
    if MapSet.size(tracked) < max_uris_to_track do
      MapSet.put(tracked, uri)
    else
      tracked
    end

  {:noreply, %Data{data | failed_parse_uris: tracked}}
end
# A skipped page only bumps the `uris_skipped` counter.
def handle_cast(:page_skipped, data) do
  {:noreply, Count.inc(data, :uris_skipped)}
end
# Adds the batch of freshly extracted uris to the running total.
def handle_cast({:uris_extracted, count}, data) do
  {:noreply, Count.inc(data, :uris_extracted, count)}
end
# Duplicate :finished notifications are ignored, so the finish timestamp
# is recorded exactly once.
def handle_cast(:finished, %Data{status: :finished} = data), do: {:noreply, data}

def handle_cast(:finished, %Data{} = data) do
  finished_data =
    %Data{data | status: :finished, utimestamp_finished: Utils.utimestamp()}

  {:noreply, finished_data}
end
#===========================================================================
# Plumbing
#===========================================================================
@doc false
# The initial GenServer state is an empty stats container.
def init([]) do
  {:ok, This.Data.new()}
end
@doc false
# Started without a registered name: callers address the server by pid,
# wrapped in a ref (see `pid_to_ref/1`).
def start_link(), do: GenServer.start_link(__MODULE__, [])
#===========================================================================
# Internal Functions
#===========================================================================
# Wraps a pid in a tagged tuple so callers hold an opaque ref rather than
# a bare pid.
defp pid_to_ref(pid), do: {@ref_marker, pid}

# Inverse of `pid_to_ref/1`.
defp ref_to_pid({@ref_marker, pid}), do: pid
# The first recorded fetch flips the session from :ready to :crawling and
# stamps the start time; on any other status this is a no-op.
defp maybe_start_crawling(%Data{status: :ready} = data) do
  %Data{data | status: :crawling, utimestamp_started: Utils.utimestamp()}
end

defp maybe_start_crawling(%Data{} = data), do: data
end