Skip to content

Commit

Permalink
chore: update oss pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
shengchenyang committed Mar 2, 2024
1 parent 009ac20 commit b0929d8
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 24 deletions.
5 changes: 4 additions & 1 deletion ayugespidertools/common/multiplexing.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ def fetch_local_conf(vit_dir: Union[str, "Path"], inner_settings: dict) -> dict:
"upload_fields_suffix", "_file_url"
),
"oss_fields_prefix": oss_section.get("oss_fields_prefix", "_"),
"full_link_enable": oss_section.getboolean("full_link_enable", False),
}
return inner_settings

Expand Down Expand Up @@ -224,13 +225,15 @@ def reshape_item(cls, item_dict: Dict[str, Any]) -> AlterItem:
"""
new_item = {}
notes_dic = {}
is_namedtuple = False

insert_data = cls.get_items_except_keys(
dict_conf=item_dict, keys=["_mongo_update_rule", "_table"]
)
judge_item = next(iter(insert_data.values()))
# 是 namedtuple 类型
if cls.is_namedtuple_instance(judge_item):
is_namedtuple = True
_table_name = item_dict["_table"].key_value
_table_notes = item_dict["_table"].notes
table_info = AlterItemTable(_table_name, _table_notes)
Expand All @@ -245,7 +248,7 @@ def reshape_item(cls, item_dict: Dict[str, Any]) -> AlterItem:
new_item[key] = value
notes_dic[key] = ""

return AlterItem(new_item, notes_dic, table_info)
return AlterItem(new_item, notes_dic, table_info, is_namedtuple)

@staticmethod
def is_namedtuple_instance(x: Any) -> bool:
Expand Down
2 changes: 2 additions & 0 deletions ayugespidertools/common/typevars.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ class AlterItem(NamedTuple):
new_item: dict
notes_dic: dict
table: AlterItemTable
is_namedtuple: Optional[bool] = False


@dataclass
Expand Down Expand Up @@ -196,6 +197,7 @@ class OssConf(NamedTuple):
doc: Optional[str] = None
upload_fields_suffix: Optional[str] = "_file_url"
oss_fields_prefix: Optional[str] = "_"
full_link_enable: Optional[bool] = False


class FieldAlreadyExistsError(Exception):
Expand Down
63 changes: 40 additions & 23 deletions ayugespidertools/scraper/pipelines/oss/ali.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
if TYPE_CHECKING:
from scrapy import Spider

from ayugespidertools.common.typevars import OssConf
from ayugespidertools.common.typevars import AlterItem, OssConf, slogT
from ayugespidertools.spiders import AyuSpider


async def files_download_by_scrapy(
Expand All @@ -42,37 +43,53 @@ async def files_download_by_scrapy(
class AyuAsyncOssPipeline:
oss_bucket: AliOssBase
oss_conf: "OssConf"
slog: "slogT"
full_link_enable: bool

def open_spider(self, spider):
# Prepare the pipeline from the spider: copy its OSS configuration, build the
# bucket client, and keep references to the spider's logger and link flag.
# Presumably invoked by Scrapy when the spider is opened -- TODO confirm.
def open_spider(self, spider: "AyuSpider"):
# NOTE(review): `assert` is removed when Python runs with -O, so this
# configuration check disappears in optimized runs.
assert hasattr(spider, "oss_conf"), "未配置 oss 参数!"
self.oss_conf = spider.oss_conf
# Every field of the OssConf named tuple is forwarded to AliOssBase as a
# keyword argument.
oss_conf_dict = self.oss_conf._asdict()
self.oss_bucket = AliOssBase(**oss_conf_dict)
# Cached here so _upload_process can read the flag directly from self.
self.full_link_enable = self.oss_conf.full_link_enable
self.slog = spider.slog

# Download the file behind `url`, store the response body in the OSS bucket,
# and return the reference to record on the item.
# Returns the filename produced by files_download_by_scrapy, or the result of
# oss_bucket.get_full_link(filename) when self.full_link_enable is truthy.
async def _upload_process(self, url: str, item: Any, spider: "AyuSpider"):
r, filename = await files_download_by_scrapy(spider, url, item)
# NOTE(review): put_oss is called without `await`; presumably a synchronous
# upload -- confirm it does not block the event loop for large files.
self.oss_bucket.put_oss(put_bytes=r.body, file=filename)
if self.full_link_enable:
filename = self.oss_bucket.get_full_link(filename)
return filename

# Write the OSS result for source field `key` back onto `item`, in place.
# The target field name is oss_conf.oss_fields_prefix followed by `key`.
# When `is_namedtuple` is false the raw `value` is stored; otherwise it is
# wrapped in a DataItem carrying a note that names the source field.
def _add_oss_field(
self, is_namedtuple: bool, item: Any, key: str, value: str
) -> None:
if not is_namedtuple:
item[f"{self.oss_conf.oss_fields_prefix}{key}"] = value
else:
item[f"{self.oss_conf.oss_fields_prefix}{key}"] = DataItem(
key_value=value, notes=f"{key} 对应的 oss 存储字段"
)

async def _upload_file(
self, alter_item: "AlterItem", item: Any, spider: "AyuSpider"
):
if not (new_item := alter_item.new_item):
return

async def process_item(self, item, spider):
item_dict = ReuseOperation.item_to_dict(item)
judge_item = next(iter(item_dict.values()))
file_url_keys = {
key: value
for key, value in item_dict.items()
for key, value in new_item.items()
if key.endswith(self.oss_conf.upload_fields_suffix)
}
_is_namedtuple = alter_item.is_namedtuple
for key, value in file_url_keys.items():
if all([isinstance(value, str), value]):
filename = await self._upload_process(value, item, spider)
self._add_oss_field(_is_namedtuple, item, key, filename)

if ReuseOperation.is_namedtuple_instance(judge_item):
for key, value in file_url_keys.items():
file_url = value.key_value
r, filename = await files_download_by_scrapy(spider, file_url, item)

self.oss_bucket.put_oss(put_bytes=r.body, file=filename)
item[f"{self.oss_conf.oss_fields_prefix}{key}"] = DataItem(
key_value=filename, notes=f"{key} 对应的 oss 存储字段"
)

else:
for key, value in file_url_keys.items():
r, filename = await files_download_by_scrapy(spider, value, item)

self.oss_bucket.put_oss(put_bytes=r.body, file=filename)
item[f"{self.oss_conf.oss_fields_prefix}{key}"] = filename

# Pipeline entry point for one item: upload its file fields and return it.
# The item is converted to a dict and reshaped into an AlterItem, which
# _upload_file uses to decide what to upload; results are written onto the
# original `item`, and that same object is returned.
async def process_item(self, item: Any, spider: "AyuSpider"):
item_dict = ReuseOperation.item_to_dict(item)
alter_item = ReuseOperation.reshape_item(item_dict)
await self._upload_file(alter_item, item, spider)
return item

0 comments on commit b0929d8

Please sign in to comment.