From 796afaea84febc77d5dd0fd1cf916550796e40ee Mon Sep 17 00:00:00 2001 From: Alex Pavlov Date: Mon, 25 Nov 2024 22:17:00 +0600 Subject: [PATCH 1/6] add scheduler celery --- api_v1/users/tasks.py | 11 ++++++++++- config/celery/connection.py | 1 + config/config.py | 30 +++++++++++++++++++++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/api_v1/users/tasks.py b/api_v1/users/tasks.py index 2762e16..bb6b761 100644 --- a/api_v1/users/tasks.py +++ b/api_v1/users/tasks.py @@ -1,4 +1,4 @@ -from config import celery_app +from config import celery_app, settings import asyncio @@ -9,3 +9,12 @@ async def time_sleep_task(): """ await asyncio.sleep(2.0) return 'Task is done' + + +celery_app.conf.beat_schedule = { + 'test-every-10-seconds': { + 'task': 'llm_analizer.tasks.test', + 'schedule': settings.celery.TEST_TIMEDELTA, + 'args': ('hello',) + }, +} diff --git a/config/celery/connection.py b/config/celery/connection.py index bd3a761..7a28eb8 100644 --- a/config/celery/connection.py +++ b/config/celery/connection.py @@ -41,4 +41,5 @@ def wrapper(*args, app = Celery(__name__) app.conf.broker_url = settings.rabbit.broker_url +app.conf.timezone = settings.celery.TIMEZONE app.autodiscover_tasks(packages=['api_v1.users']) diff --git a/config/config.py b/config/config.py index c2adbe0..e764b29 100644 --- a/config/config.py +++ b/config/config.py @@ -1,7 +1,8 @@ from pathlib import Path from pydantic_settings import BaseSettings, SettingsConfigDict -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict from starlette.config import Config +from celery.schedules import crontab base_dir = Path(__file__).resolve().parent.parent @@ -11,6 +12,14 @@ config = Config('.env') +class AlembicSettings(BaseModel): + """ + Настройки Alembic + """ + CONFIG_PATH: Path = Path('alembic.ini') + MIGRATION_PATH: Path = Path('async_alembic/') + + class TestDBSettings(BaseModel): """ Настройки тестовой базы данных @@ -35,6 +44,23 @@ class DBSettings(BaseModel): url: str = f'{_engine}://{_owner}:{_password}@{_name}/{_db_name}' +class CelerySettings(BaseModel): + """ + Настройки Celery + """ + model_config = ConfigDict( + arbitrary_types_allowed=True, + ) + TIMEZONE: str = 'Europe/Moscow' + TIMEDELTA_PER_DAY: crontab = crontab(minute=0, + hour=2, + day_of_week='*/1', + day_of_month='*/1', + month_of_year='*/1', + ) + TEST_TIMEDELTA: crontab = crontab(minute='*/1') + + class RabbitSettings(BaseModel): """ Настройки RabbitMQ @@ -62,7 +88,9 @@ class Settings(BaseSettings): ) db: DBSettings = DBSettings() test_db: TestDBSettings = TestDBSettings() + celery: CelerySettings = CelerySettings() rabbit: RabbitSettings = RabbitSettings() + alembic: AlembicSettings = AlembicSettings() debug: bool = bool(int(config('DEBUG'))) API_PREFIX: str = '/api/v1' BASE_DIR: Path = base_dir From 3e41245d10829cddbcaf1e30c364f95278ffc4cd Mon Sep 17 00:00:00 2001 From: Alex Pavlov Date: Mon, 25 Nov 2024 22:18:46 +0600 Subject: [PATCH 2/6] add celery beat --- docker-compose.yml | 14 ++++++++++++++ docker/celery/beat/start | 9 +++++++++ docker/fastapi/Dockerfile | 4 ++++ 3 files changed, 27 insertions(+) create mode 100644 docker/celery/beat/start diff --git a/docker-compose.yml b/docker-compose.yml index e7549fe..cd40b36 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,20 @@ services: - db - fast_api + celery_beat: + build: + context: . + dockerfile: ./docker/fastapi/Dockerfile + command: /start-celerybeat + volumes: + - .:/app + env_file: + - .env + depends_on: + - rabbitmq + - db + - fast_api + dashboard: build: context: . diff --git a/docker/celery/beat/start b/docker/celery/beat/start new file mode 100644 index 0000000..164f293 --- /dev/null +++ b/docker/celery/beat/start @@ -0,0 +1,9 @@ +#!/bin/bash + +set -o errexit +set -o nounset + +celery -A config.celery.connection.app \ + --broker=amqp://"${RABBITMQ_DEFAULT_USER}":"${RABBITMQ_DEFAULT_PASS}"@"${RMQ_HOST}":"${RMQ_PORT}" \ + beat \ + --loglevel=info diff --git a/docker/fastapi/Dockerfile b/docker/fastapi/Dockerfile index 80defd7..0431342 100644 --- a/docker/fastapi/Dockerfile +++ b/docker/fastapi/Dockerfile @@ -37,6 +37,10 @@ COPY ./docker/celery/worker/start /start-celeryworker RUN sed -i 's/\r$//g' /start-celeryworker RUN chmod +x /start-celeryworker +COPY ./docker/celery/beat/start /start-celerybeat +RUN sed -i 's/\r$//g' /start-celerybeat +RUN chmod +x /start-celerybeat + COPY ./docker/celery/flower/start /start-flower RUN sed -i 's/\r$//g' /start-flower RUN chmod +x /start-flower From 4b87ab470e5a92fb512a46edffe56efae80e8df4 Mon Sep 17 00:00:00 2001 From: Alex Pavlov Date: Mon, 25 Nov 2024 22:22:09 +0600 Subject: [PATCH 3/6] set online alembic migrations --- api_v1/tests/conftest.py | 18 +++++++++++++++++- async_alembic/env.py | 21 ++++++++++++++++++--- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/api_v1/tests/conftest.py b/api_v1/tests/conftest.py index 932b340..c51ec67 100644 --- a/api_v1/tests/conftest.py +++ b/api_v1/tests/conftest.py @@ -9,6 +9,8 @@ from asgi_lifespan import LifespanManager from fastapi import FastAPI from sqlalchemy.pool import NullPool +from alembic.config import Config +from alembic import command from config import test_connection, settings, BaseModel from config import db_connection @@ -21,6 +23,12 @@ ) +cfg = Config(settings.alembic.CONFIG_PATH.as_posix()) +cfg.set_main_option('script_location', + settings.alembic.MIGRATION_PATH.as_posix(), + ) + + @pytest.fixture(scope='session', autouse=True) def event_loop(request): loop = asyncio.get_event_loop_policy().new_event_loop() @@ -39,7 +47,7 @@ async def app() -> AsyncGenerator[LifespanManager, Any]: async def lifespan(app: FastAPI): async with db_setup.engine.begin() as conn: await conn.run_sync(BaseModel.metadata.create_all) - sys.stdout.write('alembic upgrade head') + await conn.run_sync(alembic_do_upgrade) yield await conn.run_sync(BaseModel.metadata.drop_all) @@ -63,3 +71,11 @@ async def client(app: FastAPI) -> AsyncGenerator[httpx.AsyncClient, Any]: base_url=current_home + current_api, ) as client: yield client + + +def alembic_do_upgrade(connection): + """ + Upgrade миграция алембик + """ + cfg.attributes['connection'] = connection + command.upgrade(cfg, 'head') diff --git a/async_alembic/env.py b/async_alembic/env.py index 06b76c3..e7f47c3 100644 --- a/async_alembic/env.py +++ b/async_alembic/env.py @@ -3,7 +3,8 @@ from sqlalchemy import pool from sqlalchemy.engine import Connection -from sqlalchemy.ext.asyncio import async_engine_from_config +from sqlalchemy.ext.asyncio import async_engine_from_config, AsyncEngine +from sqlalchemy import engine_from_config from alembic import context @@ -84,8 +85,22 @@ async def run_async_migrations() -> None: def run_migrations_online() -> None: """Run migrations in 'online' mode.""" - - asyncio.run(run_async_migrations()) + connectable = context.config.attributes.get('connection', None) + if connectable is None: + connectable = AsyncEngine( + engine_from_config( + context.config.get_section( + context.config.config_ini_section, + ), + prefix='sqlalchemy.', + poolclass=pool.NullPool, + future=True, + ) + ) + if isinstance(connectable, AsyncEngine): + asyncio.run(run_async_migrations(connectable)) + else: + do_run_migrations(connectable) if context.is_offline_mode(): From 033f243ee0b289c24258a55744ebb54d574c38a6 Mon Sep 17 00:00:00 2001 From: Alex Pavlov Date: Mon, 25 Nov 2024 22:45:11 +0600 Subject: [PATCH 4/6] alembic env.py --- api_v1/tests/conftest.py | 1 - async_alembic/env.py | 17 ++++++----------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/api_v1/tests/conftest.py b/api_v1/tests/conftest.py index c51ec67..1a57854 100644 --- a/api_v1/tests/conftest.py +++ b/api_v1/tests/conftest.py @@ -1,4 +1,3 @@ -import sys import asyncio import httpx import pytest_asyncio diff --git a/async_alembic/env.py b/async_alembic/env.py index e7f47c3..bbeca65 100644 --- a/async_alembic/env.py +++ b/async_alembic/env.py @@ -59,27 +59,22 @@ def run_migrations_offline() -> None: def do_run_migrations(connection: Connection) -> None: - context.configure(connection=connection, target_metadata=target_metadata) - + context.configure( + connection=connection, + target_metadata=target_metadata, + compare_type=True, + ) with context.begin_transaction(): context.run_migrations() -async def run_async_migrations() -> None: +async def run_async_migrations(connectable) -> None: """In this scenario we need to create an Engine and associate a connection with the context. """ - - connectable = async_engine_from_config( - config.get_section(config.config_ini_section, {}), - prefix="sqlalchemy.", - poolclass=pool.NullPool, - ) - async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) - await connectable.dispose() From b4e4a3cfd2718a58ec606546cb306b27c35520e4 Mon Sep 17 00:00:00 2001 From: Alex Pavlov Date: Mon, 25 Nov 2024 22:57:20 +0600 Subject: [PATCH 5/6] alembic config --- api_v1/tests/conftest.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/api_v1/tests/conftest.py b/api_v1/tests/conftest.py index 1a57854..efa8396 100644 --- a/api_v1/tests/conftest.py +++ b/api_v1/tests/conftest.py @@ -22,7 +22,11 @@ ) -cfg = Config(settings.alembic.CONFIG_PATH.as_posix()) +cfg = Config() +cfg.set_main_option( + 'sqlalchemy.url', + settings.test_db.url, +) cfg.set_main_option('script_location', settings.alembic.MIGRATION_PATH.as_posix(), ) From 88b3abaafde8c9c8a87e2c28b9d7fb425c94394e Mon Sep 17 00:00:00 2001 From: Alex Pavlov Date: Wed, 27 Nov 2024 00:23:49 +0600 Subject: [PATCH 6/6] fixes --- api_v1/tests/conftest.py | 23 ++++------------------- config/celery/connection.py | 3 +++ 2 files changed, 7 insertions(+), 19 deletions(-) diff --git a/api_v1/tests/conftest.py b/api_v1/tests/conftest.py index efa8396..be8d0b4 100644 --- a/api_v1/tests/conftest.py +++ b/api_v1/tests/conftest.py @@ -8,8 +8,6 @@ from asgi_lifespan import LifespanManager from fastapi import FastAPI from sqlalchemy.pool import NullPool -from alembic.config import Config -from alembic import command from config import test_connection, settings, BaseModel from config import db_connection @@ -22,16 +20,6 @@ ) -cfg = Config() -cfg.set_main_option( - 'sqlalchemy.url', - settings.test_db.url, -) -cfg.set_main_option('script_location', - settings.alembic.MIGRATION_PATH.as_posix(), - ) - - @pytest.fixture(scope='session', autouse=True) def event_loop(request): loop = asyncio.get_event_loop_policy().new_event_loop() @@ -50,7 +38,6 @@ async def app() -> AsyncGenerator[LifespanManager, Any]: async def lifespan(app: FastAPI): async with db_setup.engine.begin() as conn: await conn.run_sync(BaseModel.metadata.create_all) - await conn.run_sync(alembic_do_upgrade) yield await conn.run_sync(BaseModel.metadata.drop_all) @@ -76,9 +63,7 @@ async def client(app: FastAPI) -> AsyncGenerator[httpx.AsyncClient, Any]: yield client -def alembic_do_upgrade(connection): - """ - Upgrade миграция алембик - """ - cfg.attributes['connection'] = connection - command.upgrade(cfg, 'head') +@pytest_asyncio.fixture() +async def get_async_session(): + async with db_setup.session() as session: + yield session diff --git a/config/celery/connection.py b/config/celery/connection.py index 7a28eb8..dbffffd 100644 --- a/config/celery/connection.py +++ b/config/celery/connection.py @@ -41,5 +41,8 @@ def wrapper(*args, app = Celery(__name__) app.conf.broker_url = settings.rabbit.broker_url +app.conf.result_backend = 'db+' + settings.db.url +app.conf.database_engine_options = {'echo': True} app.conf.timezone = settings.celery.TIMEZONE +app.conf.broker_connection_retry_on_startup = True app.autodiscover_tasks(packages=['api_v1.users'])