
Commit

chore: encoding error handling && type hints
shengchenyang committed Feb 27, 2024
1 parent 1fc1fcf commit d2772b5
Showing 3 changed files with 63 additions and 60 deletions.
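The encoding part of this commit amounts to tolerating undecodable bytes when reading the aiohttp response body, via response.text(errors="ignore") in the middleware diff below. A minimal standalone sketch of that idea, using an illustrative URL and no AyugeSpiderTools code:

import asyncio

import aiohttp


async def fetch_text(url: str) -> str:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            # errors="ignore" silently drops undecodable bytes instead of
            # raising UnicodeDecodeError, which is the behavior this commit relies on.
            return await response.text(errors="ignore")


if __name__ == "__main__":
    # Hypothetical target URL, for illustration only.
    print(asyncio.run(fetch_text("https://example.com")))
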
2 changes: 1 addition & 1 deletion ayugespidertools/common/typevars.py
@@ -113,8 +113,8 @@ class AiohttpConf(NamedTuple):
ssl: bool
verify_ssl: bool
limit_per_host: int
timeout: int
allow_redirects: bool
timeout: Optional[int] = None


class AlterItemTable(NamedTuple):
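The field reorder above follows from how typed NamedTuples work: once timeout gains a default of None, it must be declared after every field that has no default. A small sketch of that rule (the class name is made up for illustration):

from typing import NamedTuple, Optional


class ExampleConf(NamedTuple):
    # Fields without defaults must be declared first...
    allow_redirects: bool
    # ...and fields with defaults must follow them, hence timeout moving
    # below allow_redirects once it became Optional with a default.
    timeout: Optional[int] = None


conf = ExampleConf(allow_redirects=True)
print(conf.timeout)  # None
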
1 change: 0 additions & 1 deletion ayugespidertools/scraper/http/request/__init__.py
@@ -37,7 +37,6 @@ def __init__(
args: Optional[Union[AiohttpRequestArgs, dict]] = None,
**kwargs,
) -> None:
# Use meta to cache the scrapy meta parameters
meta = copy.deepcopy(meta) or {}
aiohttp_meta = meta.setdefault("aiohttp", {})

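For context, the __init__ above deep-copies the incoming meta and guarantees an "aiohttp" key; the middleware further down reads per-request arguments from meta["aiohttp"]["args"]. A minimal sketch of that lookup, with an illustrative timeout value:

import copy

# Shape of the meta prepared here; the timeout value is illustrative only.
meta = {"aiohttp": {"args": {"timeout": 10}}}

meta = copy.deepcopy(meta) or {}
aiohttp_meta = meta.setdefault("aiohttp", {})
aiohttp_args = aiohttp_meta.setdefault("args", {})
print(aiohttp_args.get("timeout"))  # 10
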
120 changes: 62 additions & 58 deletions ayugespidertools/scraper/middlewares/netlib/aiohttplib.py
@@ -1,9 +1,10 @@
import asyncio
from typing import TYPE_CHECKING, Optional, Tuple, TypeVar
from typing import TYPE_CHECKING, Any, Optional, Tuple, TypeVar, Union

import aiohttp
import scrapy
from itemadapter import ItemAdapter
from scrapy import signals
from scrapy.http import HtmlResponse
from scrapy.utils.python import global_object_name

@@ -18,16 +19,26 @@
]

if TYPE_CHECKING:
from aiohttp.connector import BaseConnector
from scrapy import Request
from scrapy.crawler import Crawler
from scrapy.http import Response
from scrapy.statscollectors import StatsCollector
from typing_extensions import Self

from ayugespidertools.common.typevars import slogT
from ayugespidertools.spiders import AyuSpider

ItemAdapterT = TypeVar("ItemAdapterT", bound=ItemAdapter)


class AiohttpDownloaderMiddleware:
"""Downloader middleware handling the requests with aiohttp"""

def __init__(self):
self.aiohttp_args = None
session: "aiohttp.ClientSession"
priority_adjust: int
aiohttp_cfg: AiohttpConf
aiohttp_args: dict
slog: "slogT"

def _retry(
self,
@@ -47,14 +58,20 @@ def _retry(
"""
retries = request.meta.get("retry_times", 0) + 1
stats = spider.crawler.stats
if retries <= self.retry_times:
if retries <= self.aiohttp_cfg.retry_times:
return self._retry_with_limit(request, retries, reason, stats)

stats.inc_value("retry/max_reached")
logger.error(f"Gave up retrying {request} (failed {retries} times): {reason}")
return None

def _retry_with_limit(self, request, retries, reason, stats):
def _retry_with_limit(
self,
request: scrapy.Request,
retries: int,
reason: int,
stats: "StatsCollector",
):
logger.debug(f"Retrying {request} (failed {retries} times): {reason}")
retry_req = request.copy()
retry_req.meta["retry_times"] = retries
@@ -69,21 +86,21 @@ def _retry_with_limit(self, request, retries, reason, stats):
stats.inc_value(f"retry/reason_count/{reason}")
return retry_req

def _get_args(self, key: str):
def _get_args(self, key: str) -> Any:
"""根据优先级依次获取不为 None 的请求参数"""
data_lst = [
self.aiohttp_args.get(key),
getattr(self, key),
getattr(self.aiohttp_cfg, key),
]
return ToolsForAyu.first_not_none(data_lst=data_lst)

@classmethod
def from_crawler(cls, crawler):
settings = crawler.settings
def spider_opened(self, spider: "AyuSpider") -> None:
self.slog = spider.slog
settings = spider.crawler.settings
# Custom global aiohttp configuration; lower priority than the settings in aiohttp_meta
if local_aiohttp_conf := settings.get("AIOHTTP_CONFIG", {}):
# If an option here is also set in aiohttp_meta, the current request's parameters are updated with it
_aiohttp_conf = AiohttpConf(
self.aiohttp_cfg = AiohttpConf(
timeout=settings.get("DOWNLOAD_TIMEOUT"),
sleep=local_aiohttp_conf.get("sleep"),
proxy=local_aiohttp_conf.get("proxy"),
@@ -99,63 +116,53 @@ async def process_request(self, request, spider):
allow_redirects=local_aiohttp_conf.get("allow_redirects", True),
)

# Initialize the parameters needed to build the aiohttp request
# cls.retry_http_codes = set(int(x) for x in settings.getlist('RETRY_HTTP_CODES'))
cls.timeout = _aiohttp_conf.timeout
cls.proxy = _aiohttp_conf.proxy
cls.proxy_auth = _aiohttp_conf.proxy_auth
cls.proxy_headers = _aiohttp_conf.proxy_headers
cls.sleep = _aiohttp_conf.sleep
cls.retry_times = _aiohttp_conf.retry_times
cls.limit = _aiohttp_conf.limit
cls.ssl = _aiohttp_conf.ssl
cls.verify_ssl = _aiohttp_conf.verify_ssl
cls.limit_per_host = _aiohttp_conf.limit_per_host
cls.allow_redirects = _aiohttp_conf.allow_redirects
cls.priority_adjust = settings.getint("RETRY_PRIORITY_ADJUST")
return cls()
# These parameters take effect globally and are not updated via meta
_connector = aiohttp.TCPConnector(
ssl=self.aiohttp_cfg.ssl,
limit=self.aiohttp_cfg.limit,
verify_ssl=self.aiohttp_cfg.verify_ssl,
limit_per_host=self.aiohttp_cfg.limit_per_host,
)
# Timeout setting; if timeout is also set in AiohttpRequestArgs, it overrides this value
_timeout = aiohttp.ClientTimeout(total=self.aiohttp_cfg.timeout)
self.session = aiohttp.ClientSession(connector=_connector, timeout=_timeout)
self.priority_adjust = settings.getint("RETRY_PRIORITY_ADJUST")

@classmethod
def from_crawler(cls, crawler: "Crawler") -> "Self":
s = cls()
crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
return s

async def _request_by_aiohttp(
self,
aio_request_args: ItemAdapterT,
timeout: Optional[aiohttp.ClientTimeout] = None,
connector: Optional["BaseConnector"] = None,
) -> Tuple[int, str]:
"""使用 aiohttp 来请求
Args:
aio_request_args: 普通的 aiohttp 请求参数
timeout: aiohttp.ClientSession 的 timeout 参数
connector: aiohttp connector 参数
Returns:
1). status_code: 请求状态码
2). response_text: 请求返回的文本内容
"""
try:
async with aiohttp.ClientSession(
connector=connector, timeout=timeout
) as session:
async with session.request(**aio_request_args) as response:
status_code = response.status
response_text = await response.text()
return status_code, response_text
except Exception:
async with self.session.request(**aio_request_args) as response:
status_code = response.status
response_text = await response.text(errors="ignore")
return status_code, response_text
except Exception as e:
self.slog.error(f"aiohttp 出现请求错误,Error: {e}")
return 504, ""

async def process_request(self, request, spider):
async def process_request(
self, request: "Request", spider: "AyuSpider"
) -> Union["Request", "Response", None]:
aiohttp_options = request.meta.get("aiohttp")
self.aiohttp_args = aiohttp_options.setdefault("args", {})

# Set the aiohttp request parameters according to LOCAL_AIOHTTP_CONFIG
# These parameters take effect globally and are not updated via meta
_connector = aiohttp.TCPConnector(
ssl=self.ssl,
limit=self.limit,
verify_ssl=self.verify_ssl,
limit_per_host=self.limit_per_host,
)

# Set the url
_url = self.aiohttp_args.get("url") or request.url
aiohttp_req_args = AiohttpRequestArgs(
@@ -215,18 +222,13 @@ async def process_request(self, request, spider):
)
aiohttp_req_args.cookies = aiohttp_cookie_dict

# Request timeout setting
_timeout_obj = None
# Timeout setting
if _timeout := self.aiohttp_args.get("timeout"):
aiohttp_req_args.timeout = _timeout
elif self.timeout is not None:
_timeout_obj = aiohttp.ClientTimeout(total=self.timeout)

aio_request_args = ItemAdapter(aiohttp_req_args)
status_code, html_content = await self._request_by_aiohttp(
timeout=_timeout_obj,
aio_request_args=aio_request_args,
connector=_connector,
aio_request_args=aio_request_args
)

# Request interval setting
Expand All @@ -237,12 +239,14 @@ async def process_request(self, request, spider):
spider.slog.error(f"url: {_url} 返回内容为空,请求超时!")
self._retry(request=request, reason=504, spider=spider)

body = str.encode(html_content)
return HtmlResponse(
url=_url,
status=status_code,
headers=aiohttp_req_args.headers,
body=body,
body=html_content,
encoding="utf-8",
request=request,
)

async def spider_closed(self, spider: "AyuSpider") -> None:
await self.session.close()
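
The broader refactor in this file swaps the per-request TCPConnector and ClientSession for one session shared across the spider's lifetime: built in spider_opened, closed in spider_closed, both wired up through Scrapy signals in from_crawler. A condensed sketch of that lifecycle pattern, with an illustrative class name and connector limits:

import aiohttp
from scrapy import signals


class SharedSessionMiddleware:
    """Illustrative middleware: one shared aiohttp session per spider run."""

    @classmethod
    def from_crawler(cls, crawler):
        s = cls()
        crawler.signals.connect(s.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(s.spider_closed, signal=signals.spider_closed)
        return s

    def spider_opened(self, spider):
        # Connector options apply globally and are not updated per request;
        # the limits below are illustrative values.
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=0, ssl=False)
        timeout = aiohttp.ClientTimeout(
            total=spider.crawler.settings.getfloat("DOWNLOAD_TIMEOUT")
        )
        self.session = aiohttp.ClientSession(connector=connector, timeout=timeout)

    async def spider_closed(self, spider):
        # Close the shared session when the spider finishes.
        await self.session.close()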
