chore: unify oss and file download pipeline styles & fix bugs
shengchenyang committed Mar 4, 2024
1 parent 61ceb65 commit f0f1b2f
Showing 2 changed files with 48 additions and 80 deletions.
85 changes: 27 additions & 58 deletions ayugespidertools/scraper/pipelines/download/file.py
@@ -1,55 +1,16 @@
-import hashlib
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 
-import scrapy
-from scrapy.http.request import NO_CALLBACK
-from scrapy.utils.defer import maybe_deferred_to_future
-from scrapy.utils.python import to_bytes
-
 from ayugespidertools.common.multiplexing import ReuseOperation
-from ayugespidertools.common.utils import ToolsForAyu
 from ayugespidertools.config import logger
 from ayugespidertools.items import DataItem
+from ayugespidertools.scraper.pipelines.oss.ali import files_download_by_scrapy
 
-__all__ = [
-    "FilesDownloadPipeline",
-    "files_download_by_scrapy",
-]
+__all__ = ["FilesDownloadPipeline"]
 
 if TYPE_CHECKING:
-    from scrapy import Spider
-
-    from ayugespidertools.common.typevars import DataItemModeStr
-
-
-async def files_download_by_scrapy(
-    spider: "Spider",
-    file_path: str,
-    file_url: str,
-    item: Any,
-    key: str,
-    mode: "DataItemModeStr" = "namedtuple",
-):
-    request = scrapy.Request(file_url, callback=NO_CALLBACK)
-    response = await maybe_deferred_to_future(spider.crawler.engine.download(request))
-    if response.status != 200:
-        return item
-
-    headers_dict = ToolsForAyu.get_dict_form_scrapy_req_headers(
-        scrapy_headers=response.headers
-    )
-    content_type = headers_dict.get("Content-Type")
-    file_format = content_type.split("/")[-1].replace("jpeg", "jpg")
-    file_guid = hashlib.sha1(to_bytes(file_url)).hexdigest()
-    filename = f"{file_path}/{file_guid}.{file_format}"
-    Path(filename).write_bytes(response.body)
-
-    # Store file in item.
-    if mode == "namedtuple":
-        item[key] = DataItem(key_value=filename, notes=f"{key} 文件存储路径")
-    else:
-        item[key] = filename
+    from ayugespidertools.common.typevars import AlterItem
+    from ayugespidertools.spiders import AyuSpider
 
 
 class FilesDownloadPipeline:
@@ -66,21 +27,29 @@ def from_crawler(cls, crawler):
             Path(_file_path).mkdir(parents=True)
         return cls(file_path=_file_path)
 
-    async def process_item(self, item, spider):
-        item_dict = ReuseOperation.item_to_dict(item)
-        judge_item = next(iter(item_dict.values()))
+    async def _download_and_add_field(
+        self, alter_item: "AlterItem", item: Any, spider: "AyuSpider"
+    ) -> None:
+        if not (new_item := alter_item.new_item):
+            return
 
         file_url_keys = {
-            key: value for key, value in item_dict.items() if key.endswith("_file_url")
+            key: url for key, url in new_item.items() if key.endswith("_file_url")
        }
-        if ReuseOperation.is_namedtuple_instance(judge_item):
-            for key, value in file_url_keys.items():
-                await files_download_by_scrapy(
-                    spider, self.file_path, value.key_value, item, f"{key}_local"
-                )
-        else:
-            for key, value in file_url_keys.items():
-                await files_download_by_scrapy(
-                    spider, self.file_path, value, item, f"{key}_local", "normal"
-                )
+        _is_namedtuple = alter_item.is_namedtuple
+        for key, url in file_url_keys.items():
+            if all([isinstance(url, str), url]):
+                _, filename = await files_download_by_scrapy(spider, url)
+                # Store file in item
+                if not _is_namedtuple:
+                    item[f"{key}_local"] = filename
+                else:
+                    item[f"{key}_local"] = DataItem(
+                        key_value=filename, notes=f"{key} 文件存储路径"
+                    )
+
+    async def process_item(self, item: Any, spider: "AyuSpider"):
+        item_dict = ReuseOperation.item_to_dict(item)
+        alter_item = ReuseOperation.reshape_item(item_dict)
+        await self._download_and_add_field(alter_item, item, spider)
         return item
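
For orientation, here is a minimal, self-contained sketch of the flow both pipelines now share: a single helper downloads through the running Scrapy engine and derives a deterministic filename, while each caller decides where the bytes go (local disk here, an OSS bucket in AyuAsyncOssPipeline). The pipeline class, save directory, and item fields below are illustrative assumptions rather than code from this commit, and for completeness the sketch also checks the response status and persists the body itself:

import hashlib
from pathlib import Path

import scrapy
from scrapy import Spider
from scrapy.http.request import NO_CALLBACK
from scrapy.utils.defer import maybe_deferred_to_future
from scrapy.utils.python import to_bytes


async def download_by_scrapy(spider: Spider, url: str):
    """Fetch url via the running engine; return (response, derived filename)."""
    request = scrapy.Request(url, callback=NO_CALLBACK)
    response = await maybe_deferred_to_future(spider.crawler.engine.download(request))
    # Derive an extension from Content-Type and a stable name from the url hash.
    content_type = response.headers.get("Content-Type", b"").decode()
    file_format = content_type.split("/")[-1].replace("jpeg", "jpg")
    file_guid = hashlib.sha1(to_bytes(url)).hexdigest()
    return response, f"{file_guid}.{file_format}"


class DemoFilesPipeline:
    """Illustrative stand-in for FilesDownloadPipeline (plain dict items only)."""

    def __init__(self, file_path: str = "./downloads") -> None:
        self.file_path = file_path

    async def process_item(self, item, spider):
        for key, url in dict(item).items():
            if key.endswith("_file_url") and isinstance(url, str) and url:
                response, filename = await download_by_scrapy(spider, url)
                if response.status == 200:
                    # The helper only names the file; persisting is on the caller.
                    Path(self.file_path, filename).write_bytes(response.body)
                    item[f"{key}_local"] = f"{self.file_path}/{filename}"
        return item

Note that the shared helper only returns the response and a filename; whether the bytes go to disk or to object storage is the caller's choice, which is what lets both pipelines reuse it.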
43 changes: 21 additions & 22 deletions ayugespidertools/scraper/pipelines/oss/ali.py
@@ -1,5 +1,5 @@
 import hashlib
-from typing import TYPE_CHECKING, Any
+from typing import TYPE_CHECKING, Any, Tuple
 
 import scrapy
 from scrapy.http.request import NO_CALLBACK
@@ -11,31 +11,30 @@
 from ayugespidertools.extras.oss import AliOssBase
 from ayugespidertools.items import DataItem
 
-__all__ = ["AyuAsyncOssPipeline"]
+__all__ = [
+    "AyuAsyncOssPipeline",
+    "files_download_by_scrapy",
+]
 
 if TYPE_CHECKING:
     from scrapy import Spider
+    from scrapy.http.response import Response
 
     from ayugespidertools.common.typevars import AlterItem, OssConf, slogT
     from ayugespidertools.spiders import AyuSpider
 
 
 async def files_download_by_scrapy(
-    spider: "Spider",
-    file_url: str,
-    item: Any,
-):
-    request = scrapy.Request(file_url, callback=NO_CALLBACK)
+    spider: "Spider", url: str
+) -> Tuple["Response", str]:
+    request = scrapy.Request(url, callback=NO_CALLBACK)
     response = await maybe_deferred_to_future(spider.crawler.engine.download(request))
-    if response.status != 200:
-        return item
 
     headers_dict = ToolsForAyu.get_dict_form_scrapy_req_headers(
         scrapy_headers=response.headers
     )
     content_type = headers_dict.get("Content-Type")
     file_format = content_type.split("/")[-1].replace("jpeg", "jpg")
-    file_guid = hashlib.sha1(to_bytes(file_url)).hexdigest()
+    file_guid = hashlib.sha1(to_bytes(url)).hexdigest()
     filename = f"{file_guid}.{file_format}"
     return response, filename

@@ -54,38 +53,38 @@ def open_spider(self, spider: "AyuSpider"):
         self.full_link_enable = self.oss_conf.full_link_enable
         self.slog = spider.slog
 
-    async def _upload_process(self, url: str, item: Any, spider: "AyuSpider"):
-        r, filename = await files_download_by_scrapy(spider, url, item)
+    async def _upload_process(self, url: str, spider: "AyuSpider") -> str:
+        r, filename = await files_download_by_scrapy(spider, url)
         self.oss_bucket.put_oss(put_bytes=r.body, file=filename)
         if self.full_link_enable:
             filename = self.oss_bucket.get_full_link(filename)
         return filename
 
     def _add_oss_field(
-        self, is_namedtuple: bool, item: Any, key: str, value: str
+        self, is_namedtuple: bool, item: Any, key: str, filename: str
     ) -> None:
         if not is_namedtuple:
-            item[f"{self.oss_conf.oss_fields_prefix}{key}"] = value
+            item[f"{self.oss_conf.oss_fields_prefix}{key}"] = filename
         else:
             item[f"{self.oss_conf.oss_fields_prefix}{key}"] = DataItem(
-                key_value=value, notes=f"{key} 对应的 oss 存储字段"
+                key_value=filename, notes=f"{key} 对应的 oss 存储字段"
             )
 
     async def _upload_file(
         self, alter_item: "AlterItem", item: Any, spider: "AyuSpider"
-    ):
+    ) -> None:
         if not (new_item := alter_item.new_item):
             return
 
         file_url_keys = {
-            key: value
-            for key, value in new_item.items()
+            key: url
+            for key, url in new_item.items()
             if key.endswith(self.oss_conf.upload_fields_suffix)
         }
         _is_namedtuple = alter_item.is_namedtuple
-        for key, value in file_url_keys.items():
-            if all([isinstance(value, str), value]):
-                filename = await self._upload_process(value, item, spider)
+        for key, url in file_url_keys.items():
+            if all([isinstance(url, str), url]):
+                filename = await self._upload_process(url, spider)
                 self._add_oss_field(_is_namedtuple, item, key, filename)
 
     async def process_item(self, item: Any, spider: "AyuSpider"):
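With the helper now exported from ayugespidertools.scraper.pipelines.oss.ali, both pipelines can be enabled side by side. A hypothetical wiring in a spider's custom_settings (the pipeline paths come from this repo, but the priority numbers are arbitrary choices):

ITEM_PIPELINES = {
    "ayugespidertools.scraper.pipelines.download.file.FilesDownloadPipeline": 300,
    "ayugespidertools.scraper.pipelines.oss.ali.AyuAsyncOssPipeline": 301,
}

With this ordering, FilesDownloadPipeline fills the {field}_local paths first, and AyuAsyncOssPipeline then uploads whichever fields match its configured upload_fields_suffix and records the returned storage keys on the item.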
