Skip to content

Commit

Permalink
[ycable] cleanup logic for creating grpc future ready (sonic-net#289)
Browse files Browse the repository at this point in the history
Signed-off-by: vaibhav-dahiya vdahiya@microsoft.com
This PR attempts to remove this call
channel_ready = grpc.channel_ready_future for the ycabled gRPC implementation. The reason to do this is incase of channel not being connected ycabled tries to reattempt the channel creation again with each RPC request from linkmgrd.
This is not the right way to maintain the channels/stubs, and actually adds unrequired overhead for ycabled. This PR supports creating channel/stub in ycabled in correct manner.

Description
Motivation and Context
Required to reduce CPU usage for ycabled

How Has This Been Tested?
Deploying the changes on Arista testbed and UT
  • Loading branch information
vdahiya12 committed Sep 7, 2022
1 parent ce3b6db commit 3acb171
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 24 deletions.
12 changes: 10 additions & 2 deletions sonic-ycabled/tests/test_y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4962,10 +4962,18 @@ def test_setup_grpc_channel_for_port(self):
with patch('ycable.ycable_utilities.y_cable_helper.y_cable_platform_sfputil') as patched_util:

patched_util.get_asic_id_for_logical_port.return_value = 0
rc = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1")
(channel, stub) = setup_grpc_channel_for_port("Ethernet0", "192.168.0.1")

assert(rc == (None, None))
assert(stub == True)
assert(channel != None)

def test_connect_channel(self):

with patch('grpc.channel_ready_future') as patched_util:

patched_util.result.return_value = 0
rc = connect_channel(patched_util, None, None)
assert(rc == None)

def test_setup_grpc_channels(self):

Expand Down
70 changes: 48 additions & 22 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ def wrapper(*args, **kwargs):

return wrapper


def retry_setup_grpc_channel_for_port(port, asic_index):

global grpc_port_stubs
Expand Down Expand Up @@ -409,34 +408,61 @@ def get_grpc_credentials(type, kvp):

return credential

def create_channel(type,level, kvp, soc_ip):
def connect_channel(channel, stub, port):

channel_ready = grpc.channel_ready_future(channel)
retries = 3

for _ in range(retries):
try:
channel_ready.result(timeout=2)
except grpc.FutureTimeoutError:
helper_logger.log_warning("gRPC port {} state changed to SHUTDOWN".format(port))
else:
break

if type == "secure":
credential = get_grpc_credentials(level, kvp)
target_name = kvp.get("grpc_ssl_credential", None)
if credential is None or target_name is None:
return (None, None)
def create_channel(type, level, kvp, soc_ip, port):

GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name)))

channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS)
else:
channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS)
#Helper callback to get an channel connectivity state
def wait_for_state_change(channel_connectivity):
if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port))
if channel_connectivity == grpc.ChannelConnectivity.CONNECTING:
helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port))
if channel_connectivity == grpc.ChannelConnectivity.READY:
helper_logger.log_notice("gRPC port {} state changed to READY".format(port))
if channel_connectivity == grpc.ChannelConnectivity.IDLE:
helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port))
if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN:
helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port))

stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)

channel_ready = grpc.channel_ready_future(channel)
if type == "secure":
credential = get_grpc_credentials(level, kvp)
target_name = kvp.get("grpc_ssl_credential", None)
if credential is None or target_name is None:
return (None, None)

try:
channel_ready.result(timeout=2)
except grpc.FutureTimeoutError:
channel = None
stub = None
else:
break
GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name)))

channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS)
else:
channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS)


stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel)


if channel is not None:
channel.subscribe(wait_for_state_change)

#connect_channel(channel, stub, port)
"""
Comment the connect channel call for now, since it is not required for normal gRPC I/O
and all use cases work without it.
TODO: check if this subroutine call can be ommitted for all use cases in future enhancements
"""

return channel, stub

Expand Down Expand Up @@ -490,12 +516,12 @@ def setup_grpc_channel_for_port(port, soc_ip):
kvp = dict(fvs)


channel, stub = create_channel(type, level, kvp, soc_ip)
channel, stub = create_channel(type, level, kvp, soc_ip, port)

if stub is None:
helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port))
if channel is None:
helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC server running ?".format(soc_ip, port))
helper_logger.log_warning("channel was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port))

return channel, stub

Expand Down

0 comments on commit 3acb171

Please sign in to comment.