Commit

feat: add elasticsearch async
shengchenyang committed Jan 8, 2024
1 parent 48c6cee commit 7651dd3
Showing 4 changed files with 93 additions and 0 deletions.
2 changes: 2 additions & 0 deletions ayugespidertools/pipelines.py
@@ -1,5 +1,6 @@
 from ayugespidertools.scraper.pipelines.download.file import FilesDownloadPipeline
 from ayugespidertools.scraper.pipelines.es import AyuESPipeline
+from ayugespidertools.scraper.pipelines.es.asynced import AyuAsyncESPipeline
 from ayugespidertools.scraper.pipelines.mongo.asynced import AyuAsyncMongoPipeline
 from ayugespidertools.scraper.pipelines.mongo.fantasy import AyuFtyMongoPipeline
 from ayugespidertools.scraper.pipelines.mongo.twisted import AyuTwistedMongoPipeline
@@ -21,6 +22,7 @@
 __all__ = [
     "AyuFtyMysqlPipeline",
     "AyuESPipeline",
+    "AyuAsyncESPipeline",
     "AyuStatisticsMysqlPipeline",
     "AyuTurboMysqlPipeline",
     "AyuTwistedMysqlPipeline",
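With the export in place, the new pipeline can be switched on from a project's Scrapy settings. A minimal sketch, assuming the conventional setup for coroutine-based pipelines (the priority value 300 is arbitrary; the TWISTED_REACTOR line reflects Scrapy's requirement for awaiting native asyncio coroutines, which this pipeline uses):

# settings.py — hedged sketch for enabling AyuAsyncESPipeline.
ITEM_PIPELINES = {
    "ayugespidertools.pipelines.AyuAsyncESPipeline": 300,  # priority is arbitrary
}
# The pipeline calls asyncio.create_task, so the asyncio reactor is required.
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
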
2 changes: 2 additions & 0 deletions ayugespidertools/scraper/pipelines/__init__.py
@@ -4,6 +4,7 @@
 # See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
 from ayugespidertools.scraper.pipelines.download.file import FilesDownloadPipeline
 from ayugespidertools.scraper.pipelines.es import AyuESPipeline
+from ayugespidertools.scraper.pipelines.es.asynced import AyuAsyncESPipeline
 from ayugespidertools.scraper.pipelines.mongo.asynced import AyuAsyncMongoPipeline
 from ayugespidertools.scraper.pipelines.mongo.fantasy import AyuFtyMongoPipeline
 from ayugespidertools.scraper.pipelines.mongo.twisted import AyuTwistedMongoPipeline
@@ -25,6 +26,7 @@
 __all__ = [
     "FilesDownloadPipeline",
     "AyuESPipeline",
+    "AyuAsyncESPipeline",
     "AyuAsyncMongoPipeline",
     "AyuFtyMongoPipeline",
     "AyuTwistedMongoPipeline",
80 changes: 80 additions & 0 deletions ayugespidertools/scraper/pipelines/es/asynced.py
@@ -0,0 +1,80 @@
import asyncio
from typing import TYPE_CHECKING, Optional

from elasticsearch import AsyncElasticsearch
from elasticsearch.helpers import async_bulk
from scrapy.utils.defer import deferred_from_coro

from ayugespidertools.common.multiplexing import ReuseOperation
from ayugespidertools.scraper.pipelines.es import dynamic_es_document

__all__ = ["AyuAsyncESPipeline"]

if TYPE_CHECKING:
    from ayugespidertools.common.typevars import ESConf


class AyuAsyncESPipeline:
    def __init__(self) -> None:
        self.es_conf: Optional["ESConf"] = None
        self.slog = None
        self.client = None
        self.es_type = None
        self.running_tasks: set = set()

    def open_spider(self, spider):
        # Check the attribute exists before reading it.
        assert hasattr(spider, "es_conf"), "elasticsearch connection info is not configured!"
        self.es_conf = spider.es_conf
        _hosts_lst = self.es_conf.hosts.split(",")
        if any([self.es_conf.user is not None, self.es_conf.password is not None]):
            http_auth = (self.es_conf.user, self.es_conf.password)
        else:
            http_auth = None
        self.client = AsyncElasticsearch(
            hosts=_hosts_lst,
            basic_auth=http_auth,
            verify_certs=self.es_conf.verify_certs,
            ca_certs=self.es_conf.ca_certs,
            client_cert=self.es_conf.client_cert,
            client_key=self.es_conf.client_key,
            ssl_assert_fingerprint=self.es_conf.ssl_assert_fingerprint,
        )

    async def process_item(self, item, spider):
        item_dict = ReuseOperation.item_to_dict(item)
        alert_item = ReuseOperation.reshape_item(item_dict)
        if not (new_item := alert_item.new_item):
            return

        _index = alert_item.table.name
        if not self.es_type:
            # Build the document class once, from the first item's field notes.
            fields_definition = {k: v.notes for k, v in item_dict.items()}
            es_index_define = self.es_conf.index_class
            es_index_define["name"] = _index
            self.es_type = dynamic_es_document(
                "ESType", fields_definition, es_index_define
            )
            if self.es_conf.init:
                self.es_type.init()

        task = asyncio.create_task(self.insert_item(new_item, _index))
        self.running_tasks.add(task)
        # Register the cleanup callback before awaiting, so the task is
        # discarded from the set as soon as it finishes.
        task.add_done_callback(lambda t: self.running_tasks.discard(t))
        await task
        return item

    async def insert_item(self, new_item, index):
        async def gendata():
            yield {
                "_index": index,
                # _source carries the document body; a plain "doc" key would
                # end up nested inside the indexed document.
                "_source": new_item,
            }

        await async_bulk(self.client, gendata())

    async def _close_spider(self):
        await self.client.close()

    def close_spider(self, spider):
        return deferred_from_coro(self._close_spider())
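
The pipeline pulls every connection detail from spider.es_conf, whose ESConf type is only imported under TYPE_CHECKING above. A hypothetical stand-in showing just the fields the code reads — the names come from the attribute accesses in open_spider and process_item, while the defaults are assumptions, and the real ESConf in ayugespidertools.common.typevars may differ:

from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ESConfSketch:
    # Hypothetical mirror of the attributes AyuAsyncESPipeline reads.
    hosts: str  # comma-separated, e.g. "https://127.0.0.1:9200"
    index_class: dict = field(default_factory=dict)  # index metadata handed to dynamic_es_document
    user: Optional[str] = None  # user/password form the basic-auth pair
    password: Optional[str] = None
    init: bool = False  # create the index mapping when the first item arrives
    verify_certs: bool = False
    ca_certs: Optional[str] = None
    client_cert: Optional[str] = None
    client_key: Optional[str] = None
    ssl_assert_fingerprint: Optional[str] = None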
9 changes: 9 additions & 0 deletions ayugespidertools/scraper/pipelines/es/fantasy.py
@@ -0,0 +1,9 @@
from ayugespidertools.scraper.pipelines.es import AyuESPipeline

__all__ = ["AyuFtyESPipeline"]


class AyuFtyESPipeline(AyuESPipeline):
    """Elasticsearch storage scenario"""
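
Both AyuFtyESPipeline's parent class and the new async pipeline build their document class via dynamic_es_document, which this commit imports but does not show. A plausible sketch of such a helper, assuming elasticsearch-dsl style Document classes — hypothetical, since the real implementation lives in ayugespidertools/scraper/pipelines/es/__init__.py and may differ:

from elasticsearch_dsl import Document, Text


def dynamic_es_document(class_name, fields_definition, index_define=None):
    # One class attribute per item field; fall back to Text() when a field
    # carries no explicit elasticsearch-dsl field type.
    attrs = {k: v if v is not None else Text() for k, v in fields_definition.items()}
    if index_define:
        # elasticsearch-dsl reads index name/settings from an inner Index class.
        attrs["Index"] = type("Index", (), index_define)
    # Use Document's own metaclass so the mapping machinery runs.
    return type(Document)(class_name, (Document,), attrs)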
