/
client.py
66 lines (54 loc) · 2.72 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import functools
import anyio
import async_exit_stack
from purerpc.grpc_proto import GRPCProtoSocket
from purerpc.grpclib.config import GRPCConfiguration
from purerpc.rpc import RPCSignature, Cardinality
from purerpc.wrappers import ClientStubUnaryUnary, ClientStubStreamStream, ClientStubUnaryStream, \
ClientStubStreamUnary
class _Channel(async_exit_stack.AsyncExitStack):
    """Client-side channel: an async exit stack that owns one gRPC socket.

    Entering the channel opens a TCP connection to the stored host and port
    (starting TLS right away when an SSL context was supplied), wraps it in a
    ``GRPCProtoSocket`` and registers that socket on this exit stack.
    """

    def __init__(self, host, port, ssl_context=None):
        """Remember the connection parameters; nothing is opened here.

        Args:
            host: Remote host handed to ``anyio.connect_tcp``.
            port: Remote port handed to ``anyio.connect_tcp``.
            ssl_context: Optional SSL context; ``None`` disables TLS.
        """
        super().__init__()
        self._host, self._port = host, port
        self._ssl = ssl_context
        # Populated by __aenter__; stays None until the channel is entered.
        self._grpc_socket = None

    async def __aenter__(self):
        """Connect to the remote endpoint and return this channel.

        Returns:
            The channel itself, with ``_grpc_socket`` set to the entered
            ``GRPCProtoSocket``.
        """
        await super().__aenter__()  # Does nothing
        # TLS is started automatically only when an SSL context was given.
        use_tls = self._ssl is not None
        stream = await anyio.connect_tcp(
            self._host,
            self._port,
            ssl_context=self._ssl,
            autostart_tls=use_tls,
            tls_standard_compatible=False,
        )
        grpc_config = GRPCConfiguration(client_side=True)
        # NOTE(review): `stream` itself is not pushed on the exit stack; if
        # entering GRPCProtoSocket raises, confirm who closes the connection.
        grpc_socket = GRPCProtoSocket(grpc_config, stream)
        self._grpc_socket = await self.enter_async_context(grpc_socket)
        return self
def insecure_channel(host, port):
    """Build a channel to ``host``/``port`` that uses no SSL context.

    The returned ``_Channel`` is not connected yet; the connection is made
    when the channel is entered as an async context manager.
    """
    channel = _Channel(host, port, ssl_context=None)
    return channel
def secure_channel(host, port, ssl_context):
    """Build a channel to ``host``/``port`` that uses ``ssl_context``.

    The returned ``_Channel`` is not connected yet; the connection is made
    when the channel is entered as an async context manager.
    """
    channel = _Channel(host, port, ssl_context=ssl_context)
    return channel
class Client:
    """Client for one named service, issuing requests over a channel."""

    def __init__(self, service_name: str, channel: _Channel):
        """Bind the client to a service name and a channel.

        Args:
            service_name: Service name sent with every request.
            channel: Channel whose ``_grpc_socket`` carries the requests; it
                must have been entered before ``rpc`` is awaited.
        """
        self.service_name = service_name
        self.channel = channel

    async def rpc(self, method_name: str, request_type, response_type, metadata=None):
        """Start a request for ``method_name`` and return its stream.

        Args:
            method_name: Name of the method on the service.
            request_type: Request message class; its
                ``DESCRIPTOR.full_name`` is sent as the message type.
            response_type: Message type the returned stream is told to
                expect.
            metadata: Optional custom metadata; ``None`` is sent as ``()``.

        Returns:
            The stream returned by the socket's ``start_request``, after
            ``expect_message_type(response_type)`` has been called on it.
        """
        message_type = request_type.DESCRIPTOR.full_name
        custom_metadata = () if metadata is None else metadata
        start_request = self.channel._grpc_socket.start_request
        authority = "{}:{}".format(self.channel._host, self.channel._port)
        # NOTE(review): the scheme is always "http", even for channels
        # created with an SSL context -- confirm this is intended.
        stream = await start_request(
            "http",
            self.service_name,
            method_name,
            message_type,
            authority,
            custom_metadata=custom_metadata,
        )
        stream.expect_message_type(response_type)
        return stream

    def get_method_stub(self, method_name: str, signature: RPCSignature):
        """Return the client stub wrapper matching ``signature.cardinality``.

        The stub wraps ``self.rpc`` pre-bound to the method name and the
        signature's request/response types. Any cardinality that is not one
        of the three streaming kinds gets the unary-unary stub.
        """
        stream_fn = functools.partial(
            self.rpc, method_name, signature.request_type, signature.response_type
        )
        # Checked in order with ==, mirroring an if/elif chain.
        streaming_stubs = (
            (Cardinality.STREAM_STREAM, ClientStubStreamStream),
            (Cardinality.UNARY_STREAM, ClientStubUnaryStream),
            (Cardinality.STREAM_UNARY, ClientStubStreamUnary),
        )
        for cardinality, stub_cls in streaming_stubs:
            if signature.cardinality == cardinality:
                return stub_cls(stream_fn)
        return ClientStubUnaryUnary(stream_fn)