Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace settings variable with function #134

Merged
merged 2 commits on Mar 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion openff/bespokefit/cli/cache.py
Expand Up @@ -24,7 +24,7 @@
exit_with_messages,
print_header,
)
from openff.bespokefit.executor.services import settings
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.qcgenerator.cache import _canonicalize_task
from openff.bespokefit.executor.utilities.redis import is_redis_available, launch_redis
from openff.bespokefit.schema.data import LocalQCData
Expand Down Expand Up @@ -160,6 +160,8 @@ def _update(

console.print(Padding("2. connecting to redis cache", (1, 0, 1, 0)))

settings = current_settings()

if launch_redis_if_unavailable and not is_redis_available(
host=settings.BEFLOW_REDIS_ADDRESS, port=settings.BEFLOW_REDIS_PORT
):
Expand Down
4 changes: 3 additions & 1 deletion openff/bespokefit/cli/executor/list.py
Expand Up @@ -41,12 +41,14 @@ def list_cli():
console = rich.get_console()
print_header(console)

from openff.bespokefit.executor.services import settings
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator.models import (
CoordinatorGETPageResponse,
)
from openff.bespokefit.executor.utilities import handle_common_errors

settings = current_settings()

base_href = (
f"http://127.0.0.1:"
f"{settings.BEFLOW_GATEWAY_PORT}"
Expand Down
28 changes: 11 additions & 17 deletions openff/bespokefit/executor/executor.py
Expand Up @@ -19,7 +19,7 @@
from rich.padding import Padding
from typing_extensions import Literal

from openff.bespokefit.executor.services import Settings, settings
from openff.bespokefit.executor.services import Settings, current_settings
from openff.bespokefit.executor.services.coordinator.models import (
CoordinatorGETResponse,
CoordinatorGETStageStatus,
Expand All @@ -41,12 +41,14 @@


def _base_endpoint():
    """Return the base URL of the locally running gateway's v1 API.

    Settings are re-read on every call so that environment-variable
    overrides applied after import are respected.
    """
    cfg = current_settings()
    return f"http://127.0.0.1:{cfg.BEFLOW_GATEWAY_PORT}{cfg.BEFLOW_API_V1_STR}/"


def _coordinator_endpoint():
    """Return the full URL of the coordinator service under the base API."""
    cfg = current_settings()
    prefix = cfg.BEFLOW_COORDINATOR_PREFIX
    return f"{_base_endpoint()}{prefix}"


Expand Down Expand Up @@ -251,6 +253,8 @@ def _cleanup_processes(self):
def _launch_redis(self):
"""Launches a redis server if an existing one cannot be found."""

settings = current_settings()

if self._launch_redis_if_unavailable and not is_redis_available(
host=settings.BEFLOW_REDIS_ADDRESS, port=settings.BEFLOW_REDIS_PORT
):
Expand All @@ -275,22 +279,12 @@ def _launch_workers(self):
BEFLOW_OPTIMIZER_WORKER_MAX_MEM=self._optimizer_worker_config.max_memory,
).apply_env():

for worker_settings, n_workers, config in (
(
settings.fragmenter_settings,
self._n_fragmenter_workers,
self._fragmenter_worker_config,
),
(
settings.qc_compute_settings,
self._n_qc_compute_workers,
self._qc_compute_worker_config,
),
(
settings.optimizer_settings,
self._n_optimizer_workers,
self._optimizer_worker_config,
),
settings = current_settings()

for worker_settings, n_workers in (
(settings.fragmenter_settings, self._n_fragmenter_workers),
(settings.qc_compute_settings, self._n_qc_compute_workers),
(settings.optimizer_settings, self._n_optimizer_workers),
):

if n_workers == 0:
Expand Down
7 changes: 5 additions & 2 deletions openff/bespokefit/executor/services/__init__.py
@@ -1,5 +1,8 @@
from openff.bespokefit.executor.services._settings import Settings

settings = Settings()

__all__ = ["settings", "Settings"]
def current_settings() -> Settings:
    """Construct and return a fresh ``Settings`` instance on each call.

    A new object is built every time (rather than sharing a module-level
    singleton), so each call reflects the environment at call time.
    """
    fresh_settings = Settings()
    return fresh_settings


__all__ = ["current_settings", "Settings"]
34 changes: 18 additions & 16 deletions openff/bespokefit/executor/services/coordinator/app.py
Expand Up @@ -10,7 +10,7 @@
from fastapi.responses import Response
from openff.toolkit.topology import Molecule

from openff.bespokefit.executor.services import settings
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator import worker
from openff.bespokefit.executor.services.coordinator.models import (
CoordinatorGETPageResponse,
Expand All @@ -32,8 +32,10 @@
_logger = logging.getLogger(__name__)
_worker_task: Optional[asyncio.Future] = None

__settings = current_settings()

__GET_TASK_IMAGE_ENDPOINT = (
"/" + settings.BEFLOW_COORDINATOR_PREFIX + "/{optimization_id}/image"
"/" + __settings.BEFLOW_COORDINATOR_PREFIX + "/{optimization_id}/image"
)


Expand All @@ -56,7 +58,7 @@ def _get_task(
return CoordinatorTask.parse_obj(pickle.loads(task_pickle))


@router.get("/" + settings.BEFLOW_COORDINATOR_PREFIX)
@router.get("/" + __settings.BEFLOW_COORDINATOR_PREFIX)
def get_optimizations(skip: int = 0, limit: int = 1000) -> CoordinatorGETPageResponse:
"""Retrieves all bespoke optimizations that have been submitted to this server."""

Expand All @@ -75,8 +77,8 @@ def get_optimizations(skip: int = 0, limit: int = 1000) -> CoordinatorGETPageRes
contents = [
Link(
self=(
f"{settings.BEFLOW_API_V1_STR}/"
f"{settings.BEFLOW_COORDINATOR_PREFIX}/"
f"{__settings.BEFLOW_API_V1_STR}/"
f"{__settings.BEFLOW_COORDINATOR_PREFIX}/"
f"{optimization_id}"
),
id=optimization_id,
Expand All @@ -89,26 +91,26 @@ def get_optimizations(skip: int = 0, limit: int = 1000) -> CoordinatorGETPageRes

return CoordinatorGETPageResponse(
self=(
f"{settings.BEFLOW_API_V1_STR}/"
f"{settings.BEFLOW_COORDINATOR_PREFIX}?skip={skip}&limit={limit}"
f"{__settings.BEFLOW_API_V1_STR}/"
f"{__settings.BEFLOW_COORDINATOR_PREFIX}?skip={skip}&limit={limit}"
),
prev=None
if prev_index >= skip
else (
f"{settings.BEFLOW_API_V1_STR}/"
f"{settings.BEFLOW_COORDINATOR_PREFIX}?skip={prev_index}&limit={limit}"
f"{__settings.BEFLOW_API_V1_STR}/"
f"{__settings.BEFLOW_COORDINATOR_PREFIX}?skip={prev_index}&limit={limit}"
),
next=None
if (next_index <= skip or next_index == n_optimizations)
else (
f"{settings.BEFLOW_API_V1_STR}/"
f"{settings.BEFLOW_COORDINATOR_PREFIX}?skip={next_index}&limit={limit}"
f"{__settings.BEFLOW_API_V1_STR}/"
f"{__settings.BEFLOW_COORDINATOR_PREFIX}?skip={next_index}&limit={limit}"
),
contents=contents,
)


@router.get("/" + settings.BEFLOW_COORDINATOR_PREFIX + "/{optimization_id}")
@router.get("/" + __settings.BEFLOW_COORDINATOR_PREFIX + "/{optimization_id}")
def get_optimization(optimization_id: str) -> CoordinatorGETResponse:
"""Retrieves a bespoke optimization that has been submitted to this server
using its unique id."""
Expand All @@ -118,15 +120,15 @@ def get_optimization(optimization_id: str) -> CoordinatorGETResponse:
)
response.links = {
"image": (
settings.BEFLOW_API_V1_STR
__settings.BEFLOW_API_V1_STR
+ __GET_TASK_IMAGE_ENDPOINT.format(optimization_id=optimization_id)
)
}

return response


@router.post("/" + settings.BEFLOW_COORDINATOR_PREFIX)
@router.post("/" + __settings.BEFLOW_COORDINATOR_PREFIX)
def post_optimization(body: CoordinatorPOSTBody) -> CoordinatorPOSTResponse:
"""Submit a bespoke optimization to be performed by the server."""

Expand Down Expand Up @@ -159,8 +161,8 @@ def post_optimization(body: CoordinatorPOSTBody) -> CoordinatorPOSTResponse:
return CoordinatorPOSTResponse(
id=task_id,
self=(
f"{settings.BEFLOW_API_V1_STR}/"
f"{settings.BEFLOW_COORDINATOR_PREFIX}/{task.id}"
f"{__settings.BEFLOW_API_V1_STR}/"
f"{__settings.BEFLOW_COORDINATOR_PREFIX}/{task.id}"
),
)

Expand Down
6 changes: 5 additions & 1 deletion openff/bespokefit/executor/services/coordinator/models.py
Expand Up @@ -2,7 +2,7 @@

from pydantic import Field

from openff.bespokefit.executor.services import settings
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator.stages import StageType
from openff.bespokefit.executor.services.models import Link, PaginatedCollection
from openff.bespokefit.executor.utilities.typing import Status
Expand Down Expand Up @@ -48,6 +48,8 @@ def from_stage(cls, stage: StageType):
else:
raise NotImplementedError()

settings = current_settings()

base_endpoint = f"{settings.BEFLOW_API_V1_STR}/"

endpoints = {
Expand Down Expand Up @@ -91,6 +93,8 @@ class CoordinatorGETResponse(Link):
@classmethod
def from_task(cls, task: "CoordinatorTask"):

settings = current_settings()

stages = [
*task.pending_stages,
*([] if task.running_stage is None else [task.running_stage]),
Expand Down
17 changes: 16 additions & 1 deletion openff/bespokefit/executor/services/coordinator/stages.py
Expand Up @@ -20,7 +20,7 @@
from qcengine.procedures.torsiondrive import TorsionDriveResult
from typing_extensions import Literal

from openff.bespokefit.executor.services import settings
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator.utils import get_cached_parameters
from openff.bespokefit.executor.services.fragmenter.models import (
FragmenterGETResponse,
Expand Down Expand Up @@ -111,6 +111,8 @@ class FragmentationStage(_Stage):

async def _enter(self, task: "CoordinatorTask"):

settings = current_settings()

async with httpx.AsyncClient() as client:

raw_response = await client.post(
Expand Down Expand Up @@ -143,6 +145,8 @@ async def _update(self):
if self.status == "errored":
return

settings = current_settings()

async with httpx.AsyncClient() as client:

raw_response = await client.get(
Expand Down Expand Up @@ -209,6 +213,9 @@ def _generate_torsion_parameters(
Returns:
The list of generated smirks patterns including any cached values, and a list of fragments which require torsiondrives.
"""

settings = current_settings()

cached_torsions = None

if is_redis_available(
Expand Down Expand Up @@ -341,6 +348,8 @@ async def _generate_parameters(

async def _enter(self, task: "CoordinatorTask"):

settings = current_settings()

fragment_stage = next(
iter(
stage
Expand Down Expand Up @@ -423,6 +432,8 @@ async def _enter(self, task: "CoordinatorTask"):

async def _update(self):

settings = current_settings()

if self.status == "errored":
return

Expand Down Expand Up @@ -537,6 +548,8 @@ async def _inject_bespoke_qc_data(

async def _enter(self, task: "CoordinatorTask"):

settings = current_settings()

completed_stages = {stage.type: stage for stage in task.completed_stages}

input_schema = task.input_schema.copy(deep=True)
Expand Down Expand Up @@ -581,6 +594,8 @@ async def _enter(self, task: "CoordinatorTask"):

async def _update(self):

settings = current_settings()

if self.status == "errored":
return

Expand Down
10 changes: 6 additions & 4 deletions openff/bespokefit/executor/services/coordinator/worker.py
Expand Up @@ -4,15 +4,17 @@

import redis

from openff.bespokefit.executor.services import settings
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator.models import CoordinatorTask

_logger = logging.getLogger(__name__)

__settings = current_settings()

redis_connection = redis.Redis(
host=settings.BEFLOW_REDIS_ADDRESS,
port=settings.BEFLOW_REDIS_PORT,
db=settings.BEFLOW_REDIS_DB,
host=__settings.BEFLOW_REDIS_ADDRESS,
port=__settings.BEFLOW_REDIS_PORT,
db=__settings.BEFLOW_REDIS_DB,
)


Expand Down
16 changes: 9 additions & 7 deletions openff/bespokefit/executor/services/fragmenter/app.py
Expand Up @@ -5,7 +5,7 @@
from openff.fragmenter.depiction import _oe_render_fragment
from openff.fragmenter.fragment import FragmentationResult

from openff.bespokefit.executor.services import settings
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.fragmenter import worker
from openff.bespokefit.executor.services.fragmenter.cache import (
cached_fragmentation_task,
Expand All @@ -20,10 +20,12 @@

router = APIRouter()

__GET_ENDPOINT = "/" + settings.BEFLOW_FRAGMENTER_PREFIX + "/{fragmentation_id}"
__settings = current_settings()

__GET_ENDPOINT = "/" + __settings.BEFLOW_FRAGMENTER_PREFIX + "/{fragmentation_id}"
__GET_FRAGMENT_IMAGE_ENDPOINT = (
"/"
+ settings.BEFLOW_FRAGMENTER_PREFIX
+ __settings.BEFLOW_FRAGMENTER_PREFIX
+ "/{fragmentation_id}/fragment/{fragment_id}/image"
)

Expand All @@ -36,14 +38,14 @@ def get_fragment(fragmentation_id: str) -> FragmenterGETResponse:

return FragmenterGETResponse(
id=fragmentation_id,
self=settings.BEFLOW_API_V1_STR
self=__settings.BEFLOW_API_V1_STR
+ __GET_ENDPOINT.format(fragmentation_id=fragmentation_id),
status=task_info["status"],
result=task_result,
error=json.dumps(task_info["error"]),
_links={
f"fragment-{i}-image": (
settings.BEFLOW_API_V1_STR
__settings.BEFLOW_API_V1_STR
+ __GET_FRAGMENT_IMAGE_ENDPOINT.format(
fragmentation_id=fragmentation_id, fragment_id=i
)
Expand All @@ -55,7 +57,7 @@ def get_fragment(fragmentation_id: str) -> FragmenterGETResponse:
)


@router.post("/" + settings.BEFLOW_FRAGMENTER_PREFIX)
@router.post("/" + __settings.BEFLOW_FRAGMENTER_PREFIX)
def post_fragment(body: FragmenterPOSTBody) -> FragmenterPOSTResponse:
# We use celery delay method in order to enqueue the task with the given
# parameters
Expand All @@ -65,7 +67,7 @@ def post_fragment(body: FragmenterPOSTBody) -> FragmenterPOSTResponse:
)
return FragmenterPOSTResponse(
id=task_id,
self=settings.BEFLOW_API_V1_STR
self=__settings.BEFLOW_API_V1_STR
+ __GET_ENDPOINT.format(fragmentation_id=task_id),
)

Expand Down