Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 59 additions & 45 deletions ni_measurementlink_service/_internal/service_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Callable, List, Optional

import grpc
from deprecation import deprecated
from grpc.framework.foundation import logging_pool

from ni_measurementlink_service._internal.discovery_client import (
Expand All @@ -28,29 +29,49 @@


class GrpcService:
"""Class that manages hosting the measurement as service and closing service.

Attributes:
discovery_client (DiscoveryClient, optional): Instance of Discovery Client.
Defaults to None.
"""
"""Manages the gRPC server lifetime and registration."""

def __init__(self, discovery_client: Optional[DiscoveryClient] = None) -> None:
"""Initialize Service Manager.

Args:
discovery_client (DiscoveryClient, optional): Instance of Discovery Client.
Defaults to None.

servicer(MeasurementServiceServicer): The gRPC implementation class of the service.
Used in tests.

port(str) : The port number of the hosted service.Used in Tests.

"""
self.discovery_client = discovery_client or DiscoveryClient()
"""Initialize the service."""
self._discovery_client = discovery_client or DiscoveryClient()
self._server: Optional[grpc.Server] = None
self._service_location: Optional[ServiceLocation] = None
self._registration_id = ""

@property
@deprecated(
deprecated_in="1.3.0-dev0",
details="This property should not be public and will be removed in a later release.",
)
def discovery_client(self) -> DiscoveryClient:
"""Client for accessing the MeasurementLink discovery service."""
return self._discovery_client

@property
@deprecated(
deprecated_in="1.3.0-dev0",
details="Use service_location instead.",
)
def port(self) -> str:
"""The insecure port."""
return self.service_location.insecure_port

@property
@deprecated(
deprecated_in="1.3.0-dev0",
details="This property should not be public and will be removed in a later release.",
)
def server(self) -> Optional[grpc.Server]:
"""The gRPC server."""
return self._server

@property
def service_location(self) -> ServiceLocation:
"""The location of the service on the network."""
if self._service_location is None:
raise RuntimeError("Measurement service not running")
return self._service_location

def start(
self,
measurement_info: MeasurementInfo,
Expand All @@ -59,26 +80,15 @@ def start(
output_parameter_list: List[ParameterMetadata],
measure_function: Callable,
) -> str:
"""Host a gRPC service with the registered measurement method.

Args:
measurement_info (MeasurementInfo): Measurement info

service_info (ServiceInfo): Service info

configuration_parameter_list (List): List of configuration parameters.

output_parameter_list (List): List of output parameters.

measure_function (Callable): Registered measurement function.
"""Start the gRPC server and register it with the discovery service.

Returns:
int: The port number of the server
The insecure port.
"""
interceptors: List[grpc.ServerInterceptor] = []
if ServerLogger.is_enabled():
interceptors.append(ServerLogger())
self.server = grpc.server(
self._server = grpc.server(
logging_pool.pool(max_workers=10),
interceptors=interceptors,
options=[
Expand All @@ -95,7 +105,7 @@ def start(
measure_function,
)
v1_measurement_service_pb2_grpc.add_MeasurementServiceServicer_to_server(
servicer_v1, self.server
servicer_v1, self._server
)
elif interface == _V2_INTERFACE:
servicer_v2 = MeasurementServiceServicerV2(
Expand All @@ -105,26 +115,30 @@ def start(
measure_function,
)
v2_measurement_service_pb2_grpc.add_MeasurementServiceServicer_to_server(
servicer_v2, self.server
servicer_v2, self._server
)
else:
raise ValueError(
f"Unknown interface was provided in the .serviceconfig file: {interface}"
)
port = str(self.server.add_insecure_port("[::]:0"))
self.server.start()
port = str(self._server.add_insecure_port("[::]:0"))
self._server.start()
_logger.info("Measurement service hosted on port: %s", port)

service_location = ServiceLocation("localhost", port, "")
self._registration_id = self.discovery_client.register_service(
service_info, service_location
self._service_location = ServiceLocation("localhost", port, "")
self._registration_id = self._discovery_client.register_service(
service_info, self.service_location
)

self.port = port
return port

def stop(self) -> None:
"""Close the Service after un-registering with discovery service and cleanups."""
self.discovery_client.unregister_service(self._registration_id)
self.server.stop(5)
"""Unregister and stop the gRPC server."""
if self._registration_id:
self._discovery_client.unregister_service(self._registration_id)
if self._server is not None:
self._server.stop(5)

self._registration_id = ""
self._server = None
self._service_location = None
_logger.info("Measurement service closed.")
18 changes: 4 additions & 14 deletions ni_measurementlink_service/measurement/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,8 @@ def service_location(self) -> ServiceLocation:
"""The location of the service on the network."""
with self._initialization_lock:
if self._grpc_service is None:
raise RuntimeError(
"Measurement service not running. Call host_service() before querying the service_location."
)

return ServiceLocation(
location="localhost",
insecure_port=self._grpc_service.port,
ssl_authenticated_port="",
)
raise RuntimeError("Measurement service not running")
return self._grpc_service.service_location

def register_measurement(self, measurement_function: _F) -> _F:
"""Register a function as the measurement function for a measurement service.
Expand Down Expand Up @@ -485,8 +478,5 @@ def get_channel(self, provided_interface: str, service_class: str = "") -> grpc.
Exception: If service_class is not specified and there is more than one matching service
registered.
"""
service_location = self.grpc_service.discovery_client.resolve_service(
provided_interface, service_class
)

return self.channel_pool.get_channel(target=service_location.insecure_address)
service_location = self.discovery_client.resolve_service(provided_interface, service_class)
return self.channel_pool.get_channel(service_location.insecure_address)