Skip to content

Commit

Permalink
kafka init (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
grigoriev-semyon committed Aug 19, 2023
1 parent e652514 commit abedb46
Show file tree
Hide file tree
Showing 14 changed files with 411 additions and 17 deletions.
61 changes: 49 additions & 12 deletions .github/workflows/build_and_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ jobs:
name: Testing
url: https://api.test.profcomff.com/userdata
env:
CONTAINER_NAME: com_profcomff_api_userdata_test
API_CONTAINER_NAME: com_profcomff_api_userdata_test
WORKER_CONTAINER_NAME: com_profcomff_worker_userdata_test
permissions:
packages: read

Expand All @@ -72,25 +73,43 @@ jobs:
--rm \
--network=web \
--env DB_DSN=${{ secrets.DB_DSN }} \
--name ${{ env.CONTAINER_NAME }}_migration \
--name ${{ env.API_CONTAINER_NAME }}_migration \
--workdir="/" \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:test \
alembic upgrade head
- name: Run new version
id: run_prod
- name: Run new version API
id: run_test_api
run: |
docker stop ${{ env.CONTAINER_NAME }} || true && docker rm ${{ env.CONTAINER_NAME }} || true
docker stop ${{ env.API_CONTAINER_NAME }} || true && docker rm ${{ env.API_CONTAINER_NAME }} || true
docker run \
--detach \
--restart always \
--network=web \
--env DB_DSN='${{ secrets.DB_DSN }}' \
--env ROOT_PATH='/userdata' \
--env GUNICORN_CMD_ARGS='--log-config logging_test.conf' \
--name ${{ env.CONTAINER_NAME }} \
--name ${{ env.API_CONTAINER_NAME }} \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:test
- name: Run new version worker
id: run_test_worker
run: |
docker stop ${{ env.WORKER_CONTAINER_NAME }} || true && docker rm ${{ env.WORKER_CONTAINER_NAME }} || true
docker run \
--detach \
--restart always \
--network=web \
--env DB_DSN='${{ secrets.DB_DSN }}' \
--env KAFKA_DSN='${{ secrets.KAFKA_DSN }}' \
--env KAFKA_LOGIN='${{ secrets.KAFKA_LOGIN }}' \
--env KAFKA_PASSWORD='${{ secrets.KAFKA_PASSWORD }}' \
--env KAFKA_GROUP_ID='${{ vars.KAFKA_GROUP_ID }}' \
--env KAFKA_TOPICS='${{ vars.KAFKA_TOPICS }}' \
--name ${{ env.WORKER_CONTAINER_NAME }} \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:test python -m userdata_api start --instance worker

deploy-production:
name: Deploy Production
needs: build-and-push-image
Expand All @@ -100,7 +119,8 @@ jobs:
name: Production
url: https://api.profcomff.com/userdata
env:
CONTAINER_NAME: com_profcomff_api_userdata
API_CONTAINER_NAME: com_profcomff_api_userdata
WORKER_CONTAINER_NAME: com_profcomff_worker_userdata
permissions:
packages: read

Expand All @@ -114,21 +134,38 @@ jobs:
--rm \
--network=web \
--env DB_DSN=${{ secrets.DB_DSN }} \
--name ${{ env.CONTAINER_NAME }}_migration \
--name ${{ env.API_CONTAINER_NAME }}_migration \
--workdir="/" \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest \
alembic upgrade head
- name: Run new version
id: run_test
- name: Run new version API
id: run_prod_api
run: |
docker stop ${{ env.CONTAINER_NAME }} || true && docker rm ${{ env.CONTAINER_NAME }} || true
docker stop ${{ env.API_CONTAINER_NAME }} || true && docker rm ${{ env.API_CONTAINER_NAME }} || true
docker run \
--detach \
--restart always \
--network=web \
--env DB_DSN='${{ secrets.DB_DSN }}' \
--env ROOT_PATH='/userdata' \
--env GUNICORN_CMD_ARGS='--log-config logging_prod.conf' \
--name ${{ env.CONTAINER_NAME }} \
--name ${{ env.API_CONTAINER_NAME }} \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
- name: Run new version worker
id: run_prod_worker
run: |
docker stop ${{ env.WORKER_CONTAINER_NAME }} || true && docker rm ${{ env.WORKER_CONTAINER_NAME }} || true
docker run \
--detach \
--restart always \
--network=web \
--env DB_DSN='${{ secrets.DB_DSN }}' \
--env KAFKA_DSN='${{ secrets.KAFKA_DSN }}' \
--env KAFKA_LOGIN='${{ secrets.KAFKA_LOGIN }}' \
--env KAFKA_PASSWORD='${{ secrets.KAFKA_PASSWORD }}' \
--env KAFKA_GROUP_ID='${{ vars.KAFKA_GROUP_ID }}' \
--env KAFKA_TOPICS='${{ vars.KAFKA_TOPICS }}' \
--name ${{ env.WORKER_CONTAINER_NAME }} \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest python -m userdata_api start --instance worker
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,19 @@
```
4. Запускайте приложение!
```console
foo@bar:~$ python -m userdata_api
foo@bar:~$ python -m userdata_api start --instance api -- запустит АПИ
foo@bar:~$ python -m userdata_api start --instance worker -- запустит Kafka worker
```
Приложение состоит из двух частей - АПИ и Kafka worker'а.

АПИ нужно для управления структурой пользовательских данных -
контроль над категориями данных, параметрами, источниками данных.
Также, в АПИ пользовательские данные может слать
сам пользователь(владелец этих данных), а также админ

Kafka worker нужен для того, чтобы разгребать поступающие от OAuth
методов авторизации AuthAPI пользовательские данные

## ENV-file description
- `DB_DSN=postgresql://postgres@localhost:5432/postgres` – Данные для подключения к БД
2 changes: 1 addition & 1 deletion migrations/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from alembic import context
from sqlalchemy import engine_from_config, pool

from settings import get_settings
from userdata_api.models.base import Base
from userdata_api.settings import get_settings


# this is the Alembic Config object, which provides
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ pydantic[dotenv]
SQLAlchemy
uvicorn
pydantic-settings
event_schema_profcomff
confluent_kafka
7 changes: 7 additions & 0 deletions userdata_api/settings.py → settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ class Settings(BaseSettings):
"""Application settings"""

DB_DSN: PostgresDsn = 'postgresql://postgres@localhost:5432/postgres'

KAFKA_DSN: str | None = None
KAFKA_LOGIN: str | None = None
KAFKA_PASSWORD: str | None = None
KAFKA_TOPICS: list[str] | None = None
KAFKA_GROUP_ID: str | None = None

ROOT_PATH: str = '/' + os.getenv("APP_NAME", "")

CORS_ALLOW_ORIGINS: list[str] = ['*']
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from settings import get_settings
from userdata_api.models.db import *
from userdata_api.routes.base import app
from userdata_api.settings import get_settings
from userdata_api.utils.utils import random_string


Expand Down
Empty file added tests/test_worker/__init__.py
Empty file.
110 changes: 110 additions & 0 deletions tests/test_worker/test_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import pytest
import sqlalchemy.exc
from event_schema.auth import UserLogin

from userdata_api.models.db import Category, Info, Param, Source
from userdata_api.utils.utils import random_string
from worker.user import patch_user_info


@pytest.fixture()
def category(dbsession):
name = f"test{random_string()}"
dbsession.add(
_cat := Category(
name=name, read_scope=f"testscope.{random_string()}", update_scope=f"testscope.{random_string()}"
)
)
dbsession.commit()
yield _cat
dbsession.delete(_cat)
dbsession.commit()


@pytest.fixture()
def param(dbsession, category):
time_ = f"test{random_string()}"
dbsession.add(
_par := Param(name=f"test{time_}", category_id=category.id, type="last", changeable=True, is_required=True)
)
dbsession.commit()
yield _par
dbsession.delete(_par)
dbsession.commit()


@pytest.fixture()
def source(dbsession):
time_ = f"test{random_string()}"
__source = Source(name=f"test{time_}", trust_level=8)
dbsession.add(__source)
dbsession.commit()
yield __source
dbsession.delete(__source)
dbsession.commit()


@pytest.fixture()
def info(param, source, dbsession):
time_ = f"test{random_string()}"
__info = Info(value=f"test{time_}", source_id=source.id, param_id=param.id, owner_id=1)
dbsession.add(__info)
dbsession.commit()
yield __info
try:
dbsession.delete(__info)
dbsession.commit()
except sqlalchemy.exc.Any:
pass


def test_create(param, source, dbsession):
with pytest.raises(sqlalchemy.exc.NoResultFound):
dbsession.query(Info).filter(Info.param_id == param.id, Info.source_id == source.id, Info.value == "test").one()
patch_user_info(
UserLogin.model_validate(
{"items": [{"category": param.category.name, "param": param.name, "value": "test"}], "source": source.name}
),
1,
session=dbsession,
)
info = (
dbsession.query(Info).filter(Info.param_id == param.id, Info.source_id == source.id, Info.value == "test").one()
)
assert info
dbsession.delete(info)
dbsession.commit()


def test_update(info, dbsession):
assert info.value != "updated"
patch_user_info(
UserLogin.model_validate(
{
"items": [{"category": info.category.name, "param": info.param.name, "value": "updated"}],
"source": info.source.name,
}
),
1,
session=dbsession,
)

dbsession.expire(info)
assert info.value == "updated"


def test_delete(info, dbsession):
assert info.is_deleted is False
patch_user_info(
UserLogin.model_validate(
{
"items": [{"category": info.category.name, "param": info.param.name, "value": None}],
"source": info.source.name,
}
),
1,
session=dbsession,
)

dbsession.expire(info)
assert info.is_deleted is True
20 changes: 19 additions & 1 deletion userdata_api/__main__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
import argparse

import uvicorn

from userdata_api.routes.base import app
from worker.consumer import process


def get_args():
parser = argparse.ArgumentParser()
subparsers = parser.add_subparsers(dest='command')

start = subparsers.add_parser("start")
start.add_argument('--instance', type=str, required=True)

return parser.parse_args()


if __name__ == '__main__':
uvicorn.run(app)
args = get_args()
match args.instance:
case "api":
uvicorn.run(app)
case "worker":
process()
2 changes: 1 addition & 1 deletion userdata_api/routes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi_sqlalchemy import DBSessionMiddleware

from settings import get_settings
from userdata_api import __version__
from userdata_api.settings import get_settings

from .category import category
from .param import param
Expand Down
Empty file added worker/__init__.py
Empty file.
40 changes: 40 additions & 0 deletions worker/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import logging
from typing import Any

import pydantic
from event_schema.auth import UserLogin, UserLoginKey
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from settings import get_settings
from worker.kafka import KafkaConsumer

from .user import patch_user_info


log = logging.getLogger(__name__)
settings = get_settings()
consumer = KafkaConsumer()

_engine = create_engine(str(settings.DB_DSN), pool_pre_ping=True, isolation_level="AUTOCOMMIT")
_Session = sessionmaker(bind=_engine)


def process_models(key: Any, value: Any) -> tuple[UserLoginKey | None, UserLogin | None]:
try:
return UserLoginKey.model_validate(key), UserLogin.model_validate(value)
except pydantic.ValidationError:
log.error(f"Validation error occurred, {key=}, {value=}", exc_info=False)
return None, None


def process_message(message: tuple[Any, Any]) -> None:
processed_k, processed_v = process_models(*message)
if not (processed_k and processed_v):
return
patch_user_info(processed_v, processed_k.user_id, session=_Session())


def process():
for message in consumer.listen():
process_message(message)
Loading

0 comments on commit abedb46

Please sign in to comment.