diff --git a/api_v1/tests/conftest.py b/api_v1/tests/conftest.py index 932b340..be8d0b4 100644 --- a/api_v1/tests/conftest.py +++ b/api_v1/tests/conftest.py @@ -1,4 +1,3 @@ -import sys import asyncio import httpx import pytest_asyncio @@ -39,7 +38,6 @@ async def app() -> AsyncGenerator[LifespanManager, Any]: async def lifespan(app: FastAPI): async with db_setup.engine.begin() as conn: await conn.run_sync(BaseModel.metadata.create_all) - sys.stdout.write('alembic upgrade head') yield await conn.run_sync(BaseModel.metadata.drop_all) @@ -63,3 +61,9 @@ async def client(app: FastAPI) -> AsyncGenerator[httpx.AsyncClient, Any]: base_url=current_home + current_api, ) as client: yield client + + +@pytest_asyncio.fixture() +async def get_async_session(): + async with db_setup.session() as session: + yield session diff --git a/api_v1/users/tasks.py b/api_v1/users/tasks.py index 2762e16..bb6b761 100644 --- a/api_v1/users/tasks.py +++ b/api_v1/users/tasks.py @@ -1,4 +1,4 @@ -from config import celery_app +from config import celery_app, settings import asyncio @@ -9,3 +9,12 @@ async def time_sleep_task(): """ await asyncio.sleep(2.0) return 'Task is done' + + +celery_app.conf.beat_schedule = { + 'test-every-10-seconds': { + 'task': 'llm_analizer.tasks.test', + 'schedule': settings.celery.TEST_TIMEDELTA, + 'args': ('hello',) + }, +} diff --git a/async_alembic/env.py b/async_alembic/env.py index 06b76c3..bbeca65 100644 --- a/async_alembic/env.py +++ b/async_alembic/env.py @@ -3,7 +3,8 @@ from sqlalchemy import pool from sqlalchemy.engine import Connection -from sqlalchemy.ext.asyncio import async_engine_from_config +from sqlalchemy.ext.asyncio import async_engine_from_config, AsyncEngine +from sqlalchemy import engine_from_config from alembic import context @@ -58,34 +59,43 @@ def run_migrations_offline() -> None: def do_run_migrations(connection: Connection) -> None: - context.configure(connection=connection, target_metadata=target_metadata) - + context.configure( + connection=connection, + target_metadata=target_metadata, + compare_type=True, + ) with context.begin_transaction(): context.run_migrations() -async def run_async_migrations() -> None: +async def run_async_migrations(connectable) -> None: """In this scenario we need to create an Engine and associate a connection with the context. """ - - connectable = async_engine_from_config( - config.get_section(config.config_ini_section, {}), - prefix="sqlalchemy.", - poolclass=pool.NullPool, - ) - async with connectable.connect() as connection: await connection.run_sync(do_run_migrations) - await connectable.dispose() def run_migrations_online() -> None: """Run migrations in 'online' mode.""" - - asyncio.run(run_async_migrations()) + connectable = context.config.attributes.get('connection', None) + if connectable is None: + connectable = AsyncEngine( + engine_from_config( + context.config.get_section( + context.config.config_ini_section, + ), + prefix='sqlalchemy.', + poolclass=pool.NullPool, + future=True, + ) + ) + if isinstance(connectable, AsyncEngine): + asyncio.run(run_async_migrations(connectable)) + else: + do_run_migrations(connectable) if context.is_offline_mode(): diff --git a/config/celery/connection.py b/config/celery/connection.py index bd3a761..dbffffd 100644 --- a/config/celery/connection.py +++ b/config/celery/connection.py @@ -41,4 +41,8 @@ def wrapper(*args, app = Celery(__name__) app.conf.broker_url = settings.rabbit.broker_url +app.conf.result_backend = 'db+' + settings.db.url +app.conf.database_engine_options = {'echo': True} +app.conf.timezone = settings.celery.TIMEZONE +app.conf.broker_connection_retry_on_startup = True app.autodiscover_tasks(packages=['api_v1.users']) diff --git a/config/config.py b/config/config.py index c2adbe0..e764b29 100644 --- a/config/config.py +++ b/config/config.py @@ -1,7 +1,8 @@ from pathlib import Path from pydantic_settings import BaseSettings, SettingsConfigDict -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict from starlette.config import Config +from celery.schedules import crontab base_dir = Path(__file__).resolve().parent.parent @@ -11,6 +12,14 @@ config = Config('.env') +class AlembicSettings(BaseModel): + """ + Настройки Alembic + """ + CONFIG_PATH: Path = Path('alembic.ini') + MIGRATION_PATH: Path = Path('async_alembic/') + + class TestDBSettings(BaseModel): """ Настройки тестовой базы данных @@ -35,6 +44,23 @@ class DBSettings(BaseModel): url: str = f'{_engine}://{_owner}:{_password}@{_name}/{_db_name}' +class CelerySettings(BaseModel): + """ + Настройки Celery + """ + model_config = ConfigDict( + arbitrary_types_allowed=True, + ) + TIMEZONE: str = 'Europe/Moscow' + TIMEDELTA_PER_DAY: crontab = crontab(minute=0, + hour=2, + day_of_week='*/1', + day_of_month='*/1', + month_of_year='*/1', + ) + TEST_TIMEDELTA: crontab = crontab(minute='*/1') + + class RabbitSettings(BaseModel): """ Настройки RabbitMQ @@ -62,7 +88,9 @@ class Settings(BaseSettings): ) db: DBSettings = DBSettings() test_db: TestDBSettings = TestDBSettings() + celery: CelerySettings = CelerySettings() rabbit: RabbitSettings = RabbitSettings() + alembic: AlembicSettings = AlembicSettings() debug: bool = bool(int(config('DEBUG'))) API_PREFIX: str = '/api/v1' BASE_DIR: Path = base_dir diff --git a/docker-compose.yml b/docker-compose.yml index e7549fe..cd40b36 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -42,6 +42,20 @@ services: - db - fast_api + celery_beat: + build: + context: . + dockerfile: ./docker/fastapi/Dockerfile + command: /start-celerybeat + volumes: + - .:/app + env_file: + - .env + depends_on: + - rabbitmq + - db + - fast_api + dashboard: build: context: . diff --git a/docker/celery/beat/start b/docker/celery/beat/start new file mode 100644 index 0000000..164f293 --- /dev/null +++ b/docker/celery/beat/start @@ -0,0 +1,9 @@ +#!/bin/bash + +set -o errexit +set -o nounset + +celery -A config.celery.connection.app \ + --broker=amqp://"${RABBITMQ_DEFAULT_USER}":"${RABBITMQ_DEFAULT_PASS}"@"${RMQ_HOST}":"${RMQ_PORT}" \ + beat \ + --loglevel=info diff --git a/docker/fastapi/Dockerfile b/docker/fastapi/Dockerfile index 80defd7..0431342 100644 --- a/docker/fastapi/Dockerfile +++ b/docker/fastapi/Dockerfile @@ -37,6 +37,10 @@ COPY ./docker/celery/worker/start /start-celeryworker RUN sed -i 's/\r$//g' /start-celeryworker RUN chmod +x /start-celeryworker +COPY ./docker/celery/beat/start /start-celerybeat +RUN sed -i 's/\r$//g' /start-celerybeat +RUN chmod +x /start-celerybeat + COPY ./docker/celery/flower/start /start-flower RUN sed -i 's/\r$//g' /start-flower RUN chmod +x /start-flower