Skip to content

Commit

Permalink
chore: another refactor for another circular dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
yolile committed Dec 1, 2021
1 parent 40dd8f2 commit 0c1c12c
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 26 deletions.
21 changes: 21 additions & 0 deletions tools/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from datetime import date, datetime
from typing import Any, List, Optional

from yapw.methods.blocking import ack, publish

from tools import settings
from tools.services import commit
from tools.state import set_dataset_state, state


def parse_datetime(str_datetime: Optional[str]) -> Optional[datetime]:
Expand Down Expand Up @@ -86,3 +90,20 @@ def retrieve_samples(self) -> List[Any]:

def is_step_required(step_name: str) -> bool:
return step_name in settings.STEPS


def finish_worker(
client_state, channel, method, dataset_id, phase, routing_key=None, logger_message=None, logger=None
):
"""
Changes the dataset step status, publishes a message for the next phase and ack the received message.
"""
set_dataset_state(dataset_id, state.OK, phase)
commit()
if logger and logger_message:
logger.info(logger_message)
if routing_key:
# send message for a next phase
message = {"dataset_id": dataset_id}
publish(client_state, channel, message, routing_key)
ack(client_state, channel, method.delivery_tag)
21 changes: 1 addition & 20 deletions tools/state.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
from typing import Optional

from yapw.methods.blocking import ack, publish

from tools.services import commit, get_cursor
from tools.services import get_cursor


class state:
Expand Down Expand Up @@ -99,20 +97,3 @@ def get_dataset_progress(dataset_id: int) -> tuple:
with get_cursor() as cursor:
cursor.execute("SELECT * FROM progress_monitor_dataset WHERE dataset_id = %(id)s", {"id": dataset_id})
return cursor.fetchone()


def finish_worker(
client_state, channel, method, dataset_id, phase, routing_key=None, logger_message=None, logger=None
):
"""
Changes the dataset step status, publishes a message for the next phase and ack the received message.
"""
set_dataset_state(dataset_id, state.OK, phase)
commit()
if logger and logger_message:
logger.info(logger_message)
if routing_key:
# send message for a next phase
message = {"dataset_id": dataset_id}
publish(client_state, channel, message, routing_key)
ack(client_state, channel, method.delivery_tag)
3 changes: 1 addition & 2 deletions workers/check/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@
from dataset import processor
from tools import settings
from tools.currency_converter import bootstrap
from tools.helpers import is_step_required
from tools.helpers import finish_worker, is_step_required
from tools.services import commit, create_client
from tools.state import (
finish_worker,
get_dataset_progress,
get_processed_items_count,
get_total_items_count,
Expand Down
4 changes: 2 additions & 2 deletions workers/check/time_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@

from time_variance import processor
from tools import settings
from tools.helpers import is_step_required
from tools.helpers import finish_worker, is_step_required
from tools.services import commit, create_client
from tools.state import finish_worker, phase, set_dataset_state, state
from tools.state import phase, set_dataset_state, state

consume_routing_key = "dataset_checker"
routing_key = "time_variance_checker"
Expand Down
4 changes: 2 additions & 2 deletions workers/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import contracting_process.resource_level.report as resource_level_report
from dataset import meta_data_aggregator
from tools import settings
from tools.helpers import is_step_required
from tools.helpers import finish_worker, is_step_required
from tools.services import create_client
from tools.state import finish_worker, phase
from tools.state import phase

consume_routing_key = "time_variance_checker"
logger = logging.getLogger("pelican.workers.report")
Expand Down

0 comments on commit 0c1c12c

Please sign in to comment.