# aiohttplib.py
import asyncio
from typing import TYPE_CHECKING, Any, Optional, Tuple, TypeVar, Union

import aiohttp
import scrapy
from itemadapter import ItemAdapter
from scrapy import signals
from scrapy.http import HtmlResponse
from scrapy.utils.python import global_object_name

from ayugespidertools.common.multiplexing import ReuseOperation
from ayugespidertools.common.params import Param
from ayugespidertools.common.typevars import AiohttpConf, AiohttpRequestArgs
from ayugespidertools.common.utils import ToolsForAyu
from ayugespidertools.config import logger

__all__ = [
    "AiohttpDownloaderMiddleware",
]

if TYPE_CHECKING:
    from scrapy import Request
    from scrapy.crawler import Crawler
    from scrapy.http import Response
    from scrapy.statscollectors import StatsCollector
    from typing_extensions import Self

    from ayugespidertools.common.typevars import slogT
    from ayugespidertools.spiders import AyuSpider

ItemAdapterT = TypeVar("ItemAdapterT", bound=ItemAdapter)
class AiohttpDownloaderMiddleware:
    """Downloader middleware that handles requests with aiohttp."""

    session: "aiohttp.ClientSession"
    priority_adjust: int
    aiohttp_cfg: AiohttpConf
    aiohttp_args: dict
    slog: "slogT"

    def _retry(
        self,
        request: scrapy.Request,
        reason: Union[int, Exception],
        spider: scrapy.Spider,
    ) -> Optional[scrapy.Request]:
        """Retry the request.

        Args:
            request: scrapy request
            reason: retry reason, a status code or an exception
            spider: scrapy spider

        Returns:
            Optional[scrapy.Request]: the request to retry, or None once retries are exhausted
        """
        retries = request.meta.get("retry_times", 0) + 1
        stats = spider.crawler.stats
        if retries <= self.aiohttp_cfg.retry_times:
            return self._retry_with_limit(request, retries, reason, stats)

        stats.inc_value("retry/max_reached")
        logger.error(f"Gave up retrying {request} (failed {retries} times): {reason}")
        return None

    def _retry_with_limit(
        self,
        request: scrapy.Request,
        retries: int,
        reason: Union[int, Exception],
        stats: "StatsCollector",
    ) -> scrapy.Request:
        logger.debug(f"Retrying {request} (failed {retries} times): {reason}")
        retry_req = request.copy()
        retry_req.meta["retry_times"] = retries
        retry_req.dont_filter = True
        # Lower the priority step by step so retries do not pile up
        retry_req.priority = request.priority + self.priority_adjust
        if isinstance(reason, Exception):
            reason = global_object_name(reason.__class__)
        stats.inc_value("retry/count")
        stats.inc_value(f"retry/reason_count/{reason}")
        return retry_req
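    # Note on the stats emitted by the retry path above (key names taken
    # directly from the inc_value calls): "retry/count" and
    # "retry/reason_count/<reason>" are incremented for each retried request,
    # and "retry/max_reached" once retry_times is exhausted.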
    def _get_args(self, key: str) -> Any:
        """Return the first non-None value for a request argument, preferring the
        per-request aiohttp meta args over the global aiohttp config."""
        data_lst = [
            self.aiohttp_args.get(key),
            getattr(self.aiohttp_cfg, key),
        ]
        return ToolsForAyu.first_not_none(data_lst=data_lst)
    def spider_opened(self, spider: "AyuSpider") -> None:
        self.slog = spider.slog
        settings = spider.crawler.settings
        # Custom global aiohttp configuration; its priority is lower than the
        # options set in aiohttp_meta
        if local_aiohttp_conf := settings.get("AIOHTTP_CONFIG", {}):
            # Any option repeated in aiohttp_meta overrides this value for the
            # current request
            self.aiohttp_cfg = AiohttpConf(
                timeout=settings.get("DOWNLOAD_TIMEOUT"),
                sleep=local_aiohttp_conf.get("sleep"),
                proxy=local_aiohttp_conf.get("proxy"),
                proxy_auth=local_aiohttp_conf.get("proxy_auth"),
                proxy_headers=local_aiohttp_conf.get("proxy_headers"),
                retry_times=local_aiohttp_conf.get(
                    "retry_times", Param.aiohttp_retry_times_default
                ),
                limit=local_aiohttp_conf.get("limit", 100),
                ssl=local_aiohttp_conf.get("ssl", True),
                verify_ssl=local_aiohttp_conf.get("verify_ssl", True),
                limit_per_host=local_aiohttp_conf.get("limit_per_host", 0),
                allow_redirects=local_aiohttp_conf.get("allow_redirects", True),
            )

            # These connector parameters apply globally and are not updated
            # from meta
            _connector = aiohttp.TCPConnector(
                ssl=self.aiohttp_cfg.ssl,
                limit=self.aiohttp_cfg.limit,
                verify_ssl=self.aiohttp_cfg.verify_ssl,
                limit_per_host=self.aiohttp_cfg.limit_per_host,
            )
            # Timeout setting; a timeout passed via AiohttpRequestArgs
            # overrides this value per request
            _timeout = aiohttp.ClientTimeout(total=self.aiohttp_cfg.timeout)
            # One shared ClientSession per spider; it is closed in spider_closed
            self.session = aiohttp.ClientSession(connector=_connector, timeout=_timeout)
        self.priority_adjust = settings.getint("RETRY_PRIORITY_ADJUST")
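    # A minimal sketch of what AIOHTTP_CONFIG in settings.py can look like; the
    # key names mirror what spider_opened reads above, the values are only
    # illustrative assumptions:
    #
    # AIOHTTP_CONFIG = {
    #     "sleep": 1,
    #     "proxy": "http://127.0.0.1:7890",
    #     "retry_times": 3,
    #     "limit": 100,
    #     "ssl": False,
    #     "verify_ssl": False,
    #     "limit_per_host": 0,
    #     "allow_redirects": True,
    # }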
    @classmethod
    def from_crawler(cls, crawler: "Crawler") -> "Self":
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        return s
    async def _request_by_aiohttp(
        self,
        aio_request_args: ItemAdapterT,
    ) -> Tuple[int, str]:
        """Send the request with aiohttp.

        Args:
            aio_request_args: plain aiohttp request arguments

        Returns:
            1). status_code: response status code
            2). response_text: response body text
        """
        try:
            async with self.session.request(**aio_request_args) as response:
                status_code = response.status
                response_text = await response.text(errors="ignore")
                return status_code, response_text
        except Exception as e:
            self.slog.error(f"aiohttp request error: {e}")
            return 504, ""
    async def process_request(
        self, request: "Request", spider: "AyuSpider"
    ) -> Union["Request", "Response", None]:
        aiohttp_options = request.meta.get("aiohttp")
        self.aiohttp_args = aiohttp_options.setdefault("args", {})

        # Set the url
        _url = self.aiohttp_args.get("url") or request.url
        aiohttp_req_args = AiohttpRequestArgs(
            url=_url,
        )

        # Set the request method
        if _method := self.aiohttp_args.get("method"):
            aiohttp_req_args.method = _method
        elif _method := str(request.method).upper():
            aiohttp_req_args.method = _method
            if _method not in {"GET", "POST"}:
                logger.error(f"Unknown request method {_method}, please check; defaulting to GET")

        # Set the request headers
        if _headers_args := self.aiohttp_args.get("headers"):
            aiohttp_req_args.headers = _headers_args
        elif _headers_args := ToolsForAyu.get_dict_form_scrapy_req_headers(
            scrapy_headers=request.headers
        ):
            aiohttp_req_args.headers = _headers_args

        # Set the request body, handling both GET and POST parameters
        if _req_data := self.aiohttp_args.get("data"):
            aiohttp_req_args.data = _req_data
        elif req_data_str := str(request.body, encoding="utf-8"):
            # A JSON string here means the scrapy body carried json.dumps data
            if ReuseOperation.judge_str_is_json(judge_str=req_data_str):
                aiohttp_req_args.data = req_data_str
            # Otherwise it was passed as dict / form data
            else:
                req_data_dict = ReuseOperation.get_req_dict_from_scrapy(
                    req_body_data_str=req_data_str
                )
                aiohttp_req_args.data = req_data_dict

        # Set the proxy
        if _proxy := self._get_args("proxy"):
            aiohttp_req_args.proxy = _proxy
        if _proxy_auth := self._get_args("proxy_auth"):
            aiohttp_req_args.proxy_auth = _proxy_auth
        if _proxy_headers := self._get_args("proxy_headers"):
            aiohttp_req_args.proxy_headers = _proxy_headers

        # Set allow_redirects
        _allow_redirects = self._get_args("allow_redirects")
        if _allow_redirects is not None:
            aiohttp_req_args.allow_redirects = _allow_redirects

        # Set cookies, preferring the cookies argument of AiohttpRequest and
        # falling back to the Cookie header when it is empty
        _ck_args = self.aiohttp_args.get("cookies")
        aiohttp_cookie_dict = _ck_args if _ck_args is not None else request.cookies
        if all([not aiohttp_cookie_dict, request.headers.get("Cookie", None)]):
            headers_cookie_str = str(request.headers.get("Cookie"), encoding="utf-8")
            aiohttp_cookie_dict = ReuseOperation.get_ck_dict_from_headers(
                headers_ck_str=headers_cookie_str
            )
        aiohttp_req_args.cookies = aiohttp_cookie_dict

        # Timeout setting
        if _timeout := self.aiohttp_args.get("timeout"):
            aiohttp_req_args.timeout = _timeout

        aio_request_args = ItemAdapter(aiohttp_req_args)
        status_code, html_content = await self._request_by_aiohttp(
            aio_request_args=aio_request_args
        )

        # Delay between requests; sleep is optional, so only pause when a
        # value was configured
        _sleep = self._get_args("sleep")
        if _sleep is not None:
            await asyncio.sleep(_sleep)

        if all([status_code == 504, not html_content]):
            spider.slog.error(f"url: {_url} returned empty content, request timed out!")
            # Reschedule the request while retries remain; otherwise fall
            # through and return the 504 response below
            if retry_req := self._retry(request=request, reason=504, spider=spider):
                return retry_req
        return HtmlResponse(
            url=_url,
            status=status_code,
            headers=aiohttp_req_args.headers,
            body=html_content,
            encoding="utf-8",
            request=request,
        )
    async def spider_closed(self, spider: "AyuSpider") -> None:
        await self.session.close()
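

# A minimal usage sketch (illustrative only). The middleware path below is a
# hypothetical example; point DOWNLOADER_MIDDLEWARES at wherever this module
# actually lives in your project. The meta["aiohttp"]["args"] keys match what
# process_request reads above (url, method, headers, data, cookies, timeout).
#
# settings.py:
#     DOWNLOADER_MIDDLEWARES = {
#         "yourproject.middlewares.aiohttplib.AiohttpDownloaderMiddleware": 543,
#     }
#
# in a spider:
#     yield scrapy.Request(
#         url="https://example.com/api",
#         meta={"aiohttp": {"args": {"timeout": 10}}},
#         callback=self.parse,
#     )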