Commit

feat: add es support
shengchenyang committed Jan 6, 2024
1 parent 712a25f commit c4d048e
Showing 12 changed files with 164 additions and 6 deletions.
2 changes: 1 addition & 1 deletion ayugespidertools/__init__.py
@@ -13,4 +13,4 @@
"__version__",
]

__version__ = "3.9.3"
__version__ = "3.9.4rc1"
14 changes: 14 additions & 0 deletions ayugespidertools/common/multiplexing.py
@@ -90,6 +90,20 @@ def fetch_local_conf(vit_dir: Union[str, "Path"], inner_settings: dict) -> dict:
"database": postgres_section.get("database", ""),
"charset": postgres_section.get("charset", "UTF8"),
}
if "elasticsearch" in cfg:
es_section = cfg["elasticsearch"]
inner_settings["ES_CONFIG"] = {
"hosts": es_section.get("hosts", None),
"index_class": json.loads(
es_section.get(
"index_class", '{"settings":{"number_of_shards": 2}}'
)
),
"user": es_section.get("user", None),
"password": es_section.get("password", None),
"init": es_section.getboolean("init", False),
"verify_certs": es_section.getboolean("verify_certs", False),
}
if "oracle" in cfg:
oracle_section = cfg["oracle"]
inner_settings["ORACLE_CONFIG"] = {
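For reference, with the [elasticsearch] template section added to VIT/.conf later in this commit, the elasticsearch branch above yields roughly this settings entry (values are the template placeholders):

ES_CONFIG = {
    "hosts": "https://127.0.0.1:9200",
    "index_class": {"settings": {"number_of_shards": 2}},
    "user": "elastic",
    "password": "elastic",
    "init": False,
    "verify_certs": False,
}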
47 changes: 44 additions & 3 deletions ayugespidertools/common/spiderconf.py
@@ -6,6 +6,7 @@
from ayugespidertools.common.typevars import (
    DatabaseEngineClass,
    DynamicProxyConf,
    ESConf,
    ExclusiveProxyConf,
    KafkaConf,
    MongoDBConf,
@@ -18,13 +19,16 @@
from ayugespidertools.config import logger

try:
    from elasticsearch_dsl import connections
    from oracledb.exceptions import DatabaseError
except ImportError:
    # pip install ayugespidertools[database]
    # pip install ayugespidertools[database]  # oracledb ImportError
    # pip install elasticsearch_dsl  # elasticsearch_dsl ImportError
    pass

__all__ = [
    "get_spider_conf",
    "ESConfCreator",
    "MysqlConfCreator",
    "MongoDBConfCreator",
    "MQConfCreator",
@@ -37,6 +41,7 @@
]

if TYPE_CHECKING:
    from elasticsearch import Elasticsearch
    from scrapy.settings import Settings
    from sqlalchemy.engine.base import Connection as SqlalchemyConnectT
    from sqlalchemy.engine.base import Engine as SqlalchemyEngineT
@@ -65,7 +70,8 @@ def get_engine(
        self, db_conf: SpiderConf, db_engine_enabled: bool
    ) -> Tuple[Optional["SqlalchemyEngineT"], Optional["SqlalchemyConnectT"]]:
        """Fetch the sqlalchemy db_engine and db_engine_conn for each tool.
        The tools that need this are mysql, postgresql, mongodb, and oracle; the others do not.
        The tools that need this are mysql, postgresql, oracle, and elasticsearch; the others do not.
        Note that elasticsearch no longer goes through sqlalchemy.
        """
        pass

@@ -142,7 +148,7 @@ def get_conn_conf(

    def get_engine(
        self, db_conf: PostgreSQLConf, db_engine_enabled: bool
    ) -> Tuple[Optional["SqlalchemyEngineT"], Optional["SqlalchemyConnectT"]]:
        postgres_engine = postgres_engine_conn = None
        if db_engine_enabled:
            postgres_url = (
@@ -157,6 +163,36 @@ def get_engine(
        return postgres_engine, postgres_engine_conn


class ESConfProduct(Product):
    def get_conn_conf(
        self, settings: "Settings", remote_option: dict
    ) -> Optional[ESConf]:
        if settings.get("APP_CONF_MANAGE", False):
            remote_conf = ToolsForAyu.fetch_remote_conf(
                conf_name="elasticsearch", **remote_option
            )
            return ESConf(**remote_conf) if remote_conf else None

        local_conf = settings.get("ES_CONFIG")
        return ESConf(**local_conf) if local_conf else None

    def get_engine(
        self, db_conf: ESConf, db_engine_enabled: bool
    ) -> Tuple[Optional["Elasticsearch"], Optional["Elasticsearch"]]:
        if db_engine_enabled:
            _hosts_lst = db_conf.hosts.split(",")
            if any([db_conf.user is not None, db_conf.password is not None]):
                http_auth = (db_conf.user, db_conf.password)
            else:
                http_auth = None
            # create_connection registers the client globally and returns it,
            # so the same object serves as both engine and connection here.
            client = connections.create_connection(
                hosts=_hosts_lst,
                http_auth=http_auth,
                verify_certs=db_conf.verify_certs,
            )
            return client, client
        return None, None


class OracleConfProduct(Product):
    def get_conn_conf(
        self, settings: "Settings", remote_option: dict
@@ -277,6 +313,11 @@ def create_product(self) -> Product:
        return PostgreSQLConfProduct()


class ESConfCreator(Creator):
    def create_product(self) -> Product:
        return ESConfProduct()


class OracleConfCreator(Creator):
    def create_product(self) -> Product:
        return OracleConfProduct()
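A note on get_engine returning the same object twice: elasticsearch_dsl keeps a module-level connection registry, and create_connection registers the client under the "default" alias as well as returning it, so there is no separate engine/connection pair as with sqlalchemy. A minimal standalone sketch (host and credentials are placeholders):

from elasticsearch_dsl import connections

client = connections.create_connection(
    hosts=["https://127.0.0.1:9200"],
    http_auth=("elastic", "elastic"),
    verify_certs=False,
)
print(client.ping())  # the registered client is also what Document.save() uses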
9 changes: 9 additions & 0 deletions ayugespidertools/common/typevars.py
@@ -71,6 +71,15 @@ class PostgreSQLConf(NamedTuple):
    charset: str = "UTF8"


class ESConf(NamedTuple):
    hosts: str
    index_class: dict
    user: Optional[str] = None
    password: Optional[str] = None
    init: bool = False
    verify_certs: bool = False


class OracleConf(NamedTuple):
    host: str
    port: int
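A small usage sketch of the new ESConf; hosts stays a comma-separated string (placeholder addresses) because its consumers split it themselves:

es_conf = ESConf(
    hosts="https://127.0.0.1:9200,https://127.0.0.2:9200",
    index_class={"settings": {"number_of_shards": 2}},
)
assert es_conf.hosts.split(",")[1] == "https://127.0.0.2:9200"
assert es_conf.user is None and es_conf.init is False  # optional fields default off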
1 change: 1 addition & 0 deletions ayugespidertools/common/utils.py
@@ -27,6 +27,7 @@
"mongodb",
"mysql",
"postgresql",
"elasticsearch",
"oracle",
"rabbitmq",
"kafka",
2 changes: 1 addition & 1 deletion ayugespidertools/items.py
@@ -29,7 +29,7 @@ class DataItem(NamedTuple):
"""

key_value: Any
notes: str = ""
notes: Any = ""


class ItemMeta(ABCMeta):
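Widening notes from str to Any lets an item field carry an elasticsearch-dsl field type for the new es pipeline below. A hypothetical item, assuming the AyuItem/DataItem usage from the project docs:

from elasticsearch_dsl import Text

from ayugespidertools.items import AyuItem, DataItem

demo_item = AyuItem(
    _table=DataItem("demo_table"),
    title=DataItem("some title", Text()),  # notes now holds an es field type
)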
2 changes: 2 additions & 0 deletions ayugespidertools/pipelines.py
@@ -1,4 +1,5 @@
from ayugespidertools.scraper.pipelines.download.file import FilesDownloadPipeline
from ayugespidertools.scraper.pipelines.es import AyuESPipeline
from ayugespidertools.scraper.pipelines.mongo.asynced import AyuAsyncMongoPipeline
from ayugespidertools.scraper.pipelines.mongo.fantasy import AyuFtyMongoPipeline
from ayugespidertools.scraper.pipelines.mongo.twisted import AyuTwistedMongoPipeline
@@ -19,6 +20,7 @@

__all__ = [
    "AyuFtyMysqlPipeline",
    "AyuESPipeline",
    "AyuStatisticsMysqlPipeline",
    "AyuTurboMysqlPipeline",
    "AyuTwistedMysqlPipeline",
2 changes: 2 additions & 0 deletions ayugespidertools/scraper/pipelines/__init__.py
@@ -3,6 +3,7 @@
# Don"t forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
from ayugespidertools.scraper.pipelines.download.file import FilesDownloadPipeline
from ayugespidertools.scraper.pipelines.es import AyuESPipeline
from ayugespidertools.scraper.pipelines.mongo.asynced import AyuAsyncMongoPipeline
from ayugespidertools.scraper.pipelines.mongo.fantasy import AyuFtyMongoPipeline
from ayugespidertools.scraper.pipelines.mongo.twisted import AyuTwistedMongoPipeline
@@ -23,6 +24,7 @@

__all__ = [
    "FilesDownloadPipeline",
    "AyuESPipeline",
    "AyuAsyncMongoPipeline",
    "AyuFtyMongoPipeline",
    "AyuTwistedMongoPipeline",
62 changes: 62 additions & 0 deletions ayugespidertools/scraper/pipelines/es/__init__.py
@@ -0,0 +1,62 @@
from typing import TYPE_CHECKING, Optional

from ayugespidertools.common.multiplexing import ReuseOperation

try:
    from elasticsearch_dsl import Document, connections

    def dynamic_es_document(class_name, fields, index_settings=None):
        """Build a Document subclass at runtime from item fields and an
        optional Index definition (name, settings, ...)."""
        class_attrs = fields.copy()

        if index_settings:
            class_attrs["Index"] = type("Index", (), index_settings)

        return type(class_name, (Document,), class_attrs)

except ImportError:
    # pip install elasticsearch-dsl
    pass

__all__ = ["AyuESPipeline"]

if TYPE_CHECKING:
    from ayugespidertools.common.typevars import ESConf


class AyuESPipeline:
    def __init__(self):
        self.es_conf: Optional["ESConf"] = None
        self.es_type = None

    def open_spider(self, spider):
        # Verify the spider was configured before reading the attribute.
        assert hasattr(spider, "es_conf"), "elasticsearch connection info is not configured!"
        self.es_conf = spider.es_conf
        _hosts_lst = self.es_conf.hosts.split(",")
        if any([self.es_conf.user is not None, self.es_conf.password is not None]):
            http_auth = (self.es_conf.user, self.es_conf.password)
        else:
            http_auth = None
        connections.create_connection(
            hosts=_hosts_lst,
            http_auth=http_auth,
            verify_certs=self.es_conf.verify_certs,
        )

    def process_item(self, item, spider):
        item_dict = ReuseOperation.item_to_dict(item)
        alert_item = ReuseOperation.reshape_item(item_dict)
        if not (new_item := alert_item.new_item):
            return

        if not self.es_type:
            # Build the Document class once, on the first item: field types come
            # from each DataItem's notes, the index name from the item's _table.
            fields_definition = {k: v.notes for k, v in item_dict.items()}
            es_index_define = self.es_conf.index_class
            es_index_define["name"] = alert_item.table.name
            self.es_type = dynamic_es_document(
                "ESType", fields_definition, es_index_define
            )
            if self.es_conf.init:
                self.es_type.init()
        es_item = self.es_type(**new_item)
        es_item.save()
        return item
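To make dynamic_es_document concrete: given an item whose DataItem.notes slots carry elasticsearch-dsl field types, the first process_item call builds the equivalent of the hand-written Document below (field names are hypothetical; the index name comes from the item's _table and the settings from index_class in .conf):

from elasticsearch_dsl import Document, Keyword, Text

class ESType(Document):
    title = Text()
    tag = Keyword()

    class Index:
        name = "demo_table"
        settings = {"number_of_shards": 2}

When init=true in .conf, ESType.init() creates the index the first time; each item is then indexed via ESType(**new_item).save().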
12 changes: 12 additions & 0 deletions ayugespidertools/scraper/spiders/__init__.py
@@ -7,6 +7,7 @@
from ayugespidertools.common.multiplexing import ReuseOperation
from ayugespidertools.common.spiderconf import (
    DynamicProxyCreator,
    ESConfCreator,
    ExclusiveProxyCreator,
    KafkaConfCreator,
    MongoDBConfCreator,
@@ -26,6 +27,7 @@
if TYPE_CHECKING:
    import logging

    from elasticsearch import Elasticsearch
    from loguru import Logger
    from scrapy.crawler import Crawler
    from scrapy.settings import BaseSettings
@@ -47,6 +49,8 @@ def __init__(self, *args: Any, **kwargs: Any):
        self.postgres_engine_conn: Optional["SqlalchemyConnectT"] = None
        self.oracle_engine: Optional["SqlalchemyEngineT"] = None
        self.oracle_engine_conn: Optional["SqlalchemyConnectT"] = None
        self.es_engine: Optional["Elasticsearch"] = None
        self.es_engine_conn: Optional["Elasticsearch"] = None

    @property
    def slog(self) -> Union["Logger", "logging.LoggerAdapter"]:
@@ -118,6 +122,14 @@ def from_crawler(cls, crawler: "Crawler", *args: Any, **kwargs: Any) -> "Self":
                db_engine_enabled=_db_engine_enabled,
            )

        if es_conf := get_spider_conf(ESConfCreator(), crawler.settings, remote_option):
            spider.es_conf = es_conf
            spider.es_engine, spider.es_engine_conn = get_sqlalchemy_conf(
                creator=ESConfCreator(),
                db_conf=es_conf,
                db_engine_enabled=_db_engine_enabled,
            )

        if oracle_conf := get_spider_conf(
            OracleConfCreator(), crawler.settings, remote_option
        ):
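A hedged sketch of what the new attributes provide inside a spider callback, assuming the database-engine switch above is enabled in the project settings:

def parse(self, response):
    # es_engine and es_engine_conn are the same registered elasticsearch-py
    # client (or None when the switch is off), unlike the sqlalchemy pairs.
    if self.es_engine is not None:
        self.slog.info(f"es cluster: {self.es_engine.info()}")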
15 changes: 15 additions & 0 deletions ayugespidertools/templates/project/module/VIT/.conf
@@ -65,6 +65,21 @@
;port=5432
;charset=UTF8

; Configuration for storing data in elasticsearch:
; index_class defines the es Index class. Everything except name can be
; set here; see the elasticsearch-dsl docs for details. If name is set in
; index_class anyway, it is overridden by _table from the AyuItem.
; init controls whether the es index is created. Normally enable it only
; on the project's first run, or create the index manually and leave this
; option false permanently.

;[elasticsearch]
;hosts=https://127.0.0.1:9200
;index_class={"settings":{"number_of_shards": 2}}
;user=elastic
;password=elastic
;init=false
;verify_certs=false
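; Note: hosts accepts a comma-separated list for multiple nodes,
; e.g. (placeholder addresses):
;hosts=https://127.0.0.1:9200,https://127.0.0.2:9200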

;[mq]
;virtualhost=
;queue=
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "AyugeSpiderTools"
version = "3.9.3"
version = "3.9.4rc1"
description = "A scrapy extension library: extends Scrapy functionality to save you effort."
authors = ["ayuge <ayugesheng@gmail.com>"]
maintainers = ["ayuge <ayugesheng@gmail.com>"]
