Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions ni_measurementlink_service/_internal/discovery_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import logging
import os
import pathlib
import subprocess
import sys
import time
import typing
from typing import Optional

Expand All @@ -26,6 +28,9 @@

_logger = logging.getLogger(__name__)

_START_SERVICE_TIMEOUT = 30.0
_START_SERVICE_POLLING_INTERVAL = 100e-3


class ServiceLocation(typing.NamedTuple):
"""Represents the location of a service."""
Expand Down Expand Up @@ -204,12 +209,78 @@ def resolve_service(self, provided_interface: str, service_class: str = "") -> S

def _get_discovery_service_address() -> str:
key_file_path = _get_key_file_path()
_ensure_discovery_service_started(key_file_path)
_logger.debug("Discovery service key file path: %s", key_file_path)
with _open_key_file(str(key_file_path)) as key_file:
key_json = json.load(key_file)
return "localhost:" + key_json["InsecurePort"]


def _ensure_discovery_service_started(key_file_path: pathlib.Path) -> None:
"""Check whether discovery service already running, if not start the discovery service."""
if _service_already_running(key_file_path):
return

exe_file_path = _get_discovery_service_location()
_start_service(exe_file_path, key_file_path)


def _get_discovery_service_location() -> pathlib.PurePath:
"""Gets the location of the discovery service process executable."""
registration_json_path = _get_registration_json_file_path()
registration_json_obj = json.loads(registration_json_path.read_text())
return registration_json_path.parent / registration_json_obj["discovery"]["path"]


def _get_registration_json_file_path() -> pathlib.Path:
if sys.platform == "win32":
return (
pathlib.Path(os.environ["ProgramW6432"])
/ "National Instruments"
/ "Shared"
/ "MeasurementLink"
/ "MeasurementLinkServices.json"
)
else:
raise NotImplementedError("Platform not supported")


def _key_file_exists(key_file_path: pathlib.Path) -> bool:
return key_file_path.is_file() and key_file_path.stat().st_size > 0


def _start_service(exe_file_path: pathlib.PurePath, key_file_path: pathlib.Path) -> None:
"""Starts the service at the specified path and wait for the service to get up and running."""
subprocess.Popen([exe_file_path], cwd=exe_file_path.parent)
# After the execution of process, check for key file existence in the path
# stop checking after 30 seconds have elapsed and throw error
timeout_time = time.time() + _START_SERVICE_TIMEOUT
while True:
try:
with _open_key_file(str(key_file_path)) as _:
return
except IOError:
pass
if time.time() >= timeout_time:
raise TimeoutError("Timed out waiting for discovery service to start")
time.sleep(_START_SERVICE_POLLING_INTERVAL)


def _service_already_running(key_file_path: pathlib.Path) -> bool:
try:
_delete_existing_key_file(key_file_path)
except IOError:
return True
return False


def _delete_existing_key_file(key_file_path: pathlib.Path) -> None:
if _key_file_exists(key_file_path):
with key_file_path.open("w") as _:
pass
key_file_path.unlink()


def _get_key_file_path(cluster_id: Optional[str] = None) -> pathlib.Path:
if cluster_id is not None:
return _get_key_file_directory() / f"DiscoveryService_{cluster_id}.json"
Expand Down