Skip to content

Commit

Permalink
optimize: postgresql AsyncConnection upgraded to AsyncConnectionPool
Browse files Browse the repository at this point in the history
  • Loading branch information
shengchenyang committed Dec 22, 2023
1 parent f7b6511 commit 341e768
Showing 1 changed file with 10 additions and 16 deletions.
26 changes: 10 additions & 16 deletions ayugespidertools/scraper/pipelines/postgres/asynced.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
from typing import TYPE_CHECKING, Optional

import psycopg
from psycopg_pool import AsyncConnectionPool
from scrapy.utils.defer import deferred_from_coro

from ayugespidertools.common.expend import PostgreSQLPipeEnhanceMixin
Expand All @@ -21,7 +20,7 @@ class AyuAsyncPostgresPipeline(PostgreSQLPipeEnhanceMixin):
def __init__(self) -> None:
self.postgres_conf: Optional["PostgreSQLConf"] = None
self.slog = None
self.conn = None
self.pool = None
self.running_tasks: set = set()

def open_spider(self, spider):
Expand All @@ -31,32 +30,27 @@ def open_spider(self, spider):
return deferred_from_coro(self._open_spider(spider))

async def _open_spider(self, spider):
self.conn = await psycopg.AsyncConnection.connect(
self.pool = AsyncConnectionPool(
f"dbname={self.postgres_conf.database} "
f"user={self.postgres_conf.user} "
f"host={self.postgres_conf.host} "
f"port={self.postgres_conf.port} "
f"password={self.postgres_conf.password}",
autocommit=True,
open=False,
)
await self.pool.open()

async def insert_item(self, item_dict) -> None:
async with self.conn.cursor() as cursor:
async def process_item(self, item, spider):
async with self.pool.connection() as conn:
item_dict = ReuseOperation.item_to_dict(item)
alter_item = ReuseOperation.reshape_item(item_dict)
new_item = alter_item.new_item
sql = self._get_sql_by_item(table=alter_item.table.name, item=new_item)
await cursor.execute(sql, tuple(new_item.values()))

async def process_item(self, item, spider):
item_dict = ReuseOperation.item_to_dict(item)
task = asyncio.create_task(self.insert_item(item_dict))
self.running_tasks.add(task)
await task
task.add_done_callback(lambda t: self.running_tasks.discard(t))
await conn.execute(sql, tuple(new_item.values()))
return item

async def _close_spider(self):
await self.conn.close()
await self.pool.close()

def close_spider(self, spider):
return deferred_from_coro(self._close_spider())

0 comments on commit 341e768

Please sign in to comment.