Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to anyio 3.x #33

Merged
merged 5 commits into from
Apr 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
matrix:
os: [ubuntu-latest]
python-version: ['3.6', '3.7', '3.8'] # 'pypy-3.6'
backend: [asyncio, curio, trio, uvloop]
backend: [asyncio, trio, uvloop]
steps:
- uses: actions/checkout@v3
- name: Setup Python
Expand All @@ -29,7 +29,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
backend: [asyncio, curio, trio, uvloop]
backend: [asyncio, trio, uvloop]
steps:
- uses: actions/checkout@v3
- name: Setup Python
Expand Down
23 changes: 13 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
Asynchronous pure Python gRPC client and server implementation supporting
[asyncio](https://docs.python.org/3/library/asyncio.html),
[uvloop](https://github.com/MagicStack/uvloop),
[curio](https://github.com/dabeaz/curio) and
[trio](https://github.com/python-trio/trio) (achieved with [anyio](https://github.com/agronholm/anyio) compatibility layer).

## Requirements
Expand All @@ -29,7 +28,7 @@ Latest development version:
pip install git+https://github.com/standy66/purerpc.git
```

By default purerpc uses asyncio event loop, if you want to use uvloop, curio or trio, please install them manually.
By default purerpc uses asyncio event loop, if you want to use uvloop or trio, please install them manually.

## protoc plugin

Expand Down Expand Up @@ -57,9 +56,9 @@ Just mark yielding coroutines with `@async_generator` decorator and use `await y
### Server

```python
from purerpc import Server
from greeter_pb2 import HelloRequest, HelloReply
from greeter_grpc import GreeterServicer
from purerpc import Server


class Greeter(GreeterServicer):
Expand All @@ -71,15 +70,17 @@ class Greeter(GreeterServicer):
yield HelloReply(message=f"Hello, {message.name}")


server = Server(50055)
server.add_service(Greeter().service)
server.serve(backend="asyncio") # backend can also be one of: "uvloop", "curio", "trio"
if __name__ == '__main__':
server = Server(50055)
server.add_service(Greeter().service)
# NOTE: if you already have an async loop running, use "await server.serve_async()"
import anyio
anyio.run(server.serve_async) # or set explicit backend="asyncio" or "trio"
```

### Client

```python
import anyio
import purerpc
from greeter_pb2 import HelloRequest, HelloReply
from greeter_grpc import GreeterStub
Expand All @@ -90,7 +91,7 @@ async def gen():
yield HelloRequest(name=str(i))


async def main():
async def listen():
async with purerpc.insecure_channel("localhost", 50055) as channel:
stub = GreeterStub(channel)
reply = await stub.SayHello(HelloRequest(name="World"))
Expand All @@ -100,8 +101,10 @@ async def main():
print(reply.message)


if __name__ == "__main__":
anyio.run(main, backend="asyncio") # backend can also be one of: "uvloop", "curio", "trio"
if __name__ == '__main__':
# NOTE: if you already have an async loop running, use "await listen()"
import anyio
anyio.run(listen) # or set explicit backend="asyncio" or "trio"
```

You can mix server and client code, for example make a server that requests something using purerpc from another gRPC server, etc.
Expand Down
3 changes: 1 addition & 2 deletions misc/greeter/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ async def worker(channel):


async def main_coro():
# await curio.spawn(print_memory_growth_statistics(), daemon=True)
async with purerpc.insecure_channel("localhost", 50055) as channel:
for _ in range(100):
start = time.time()
async with anyio.create_task_group() as task_group:
for _ in range(100):
await task_group.spawn(worker, channel)
task_group.start_soon(worker, channel)
print("RPS: {}".format(10000 / (time.time() - start)))


Expand Down
10 changes: 5 additions & 5 deletions misc/greeter/failing_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async def do_load_unary(result_queue, stub, num_requests, message_size):
result = (await stub.SayHello(greeter_pb2.HelloRequest(name=message))).message
assert (len(result) == message_size)
avg_latency = (time.time() - start) / num_requests
await result_queue.put(avg_latency)
await result_queue.send(avg_latency)


async def do_load_stream(result_queue, stub, num_requests, message_size):
Expand All @@ -30,7 +30,7 @@ async def do_load_stream(result_queue, stub, num_requests, message_size):
avg_latency = (time.time() - start) / num_requests
await stream.close()
await stream.receive_message()
await result_queue.put(avg_latency)
await result_queue.send(avg_latency)


async def worker(port, num_concurrent_streams, num_requests_per_stream,
Expand All @@ -45,17 +45,17 @@ async def worker(port, num_concurrent_streams, num_requests_per_stream,
raise ValueError(f"Unknown load type: {load_type}")
for idx in range(num_rounds):
start = time.time()
task_results = anyio.create_queue(sys.maxsize)
send_queue, receive_queue = anyio.create_memory_object_stream(max_buffer_size=sys.maxsize)
agronholm marked this conversation as resolved.
Show resolved Hide resolved
async with anyio.create_task_group() as task_group:
for _ in range(num_concurrent_streams):
await task_group.spawn(load_fn, task_results, stub, num_requests_per_stream, message_size)
task_group.start_soon(load_fn, send_queue, stub, num_requests_per_stream, message_size)
end = time.time()

rps = num_concurrent_streams * num_requests_per_stream / (end - start)

latencies = []
for _ in range(num_concurrent_streams):
latencies.append(await task_results.get())
latencies.append(await receive_queue.receive())

print("Round", idx, "rps", rps, "avg latency", 1000 * sum(latencies) / len(latencies))

Expand Down
10 changes: 5 additions & 5 deletions misc/greeter/test_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async def do_load_unary(result_queue, stub, num_requests, message_size):
result = (await stub.SayHello(HelloRequest(name=message))).message
assert (len(result) == message_size)
avg_latency = (time.time() - start) / num_requests
await result_queue.put(avg_latency)
await result_queue.send(avg_latency)


async def do_load_stream(result_queue, stub, num_requests, message_size):
Expand All @@ -43,7 +43,7 @@ async def do_load_stream(result_queue, stub, num_requests, message_size):
avg_latency = (time.time() - start) / num_requests
await stream.close()
await stream.receive_message()
await result_queue.put(avg_latency)
await result_queue.send(avg_latency)


async def worker(port, queue, num_concurrent_streams, num_requests_per_stream,
Expand All @@ -58,16 +58,16 @@ async def worker(port, queue, num_concurrent_streams, num_requests_per_stream,
raise ValueError(f"Unknown load type: {load_type}")
for _ in range(num_rounds):
start = time.time()
task_results = anyio.create_queue(sys.maxsize)
send_queue, receive_queue = anyio.create_memory_object_stream(max_buffer_size=sys.maxsize)
async with anyio.create_task_group() as task_group:
for _ in range(num_concurrent_streams):
await task_group.spawn(load_fn, task_results, stub, num_requests_per_stream, message_size)
task_group.start_soon(load_fn, send_queue, stub, num_requests_per_stream, message_size)
end = time.time()
rps = num_concurrent_streams * num_requests_per_stream / (end - start)
queue.put(rps)
results = []
for _ in range(num_concurrent_streams):
results.append(await task_results.get())
results.append(await receive_queue.receive())
queue.put(results)
queue.close()
queue.join_thread()
Expand Down
18 changes: 13 additions & 5 deletions requirements_test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,23 @@
#
# pip-compile --extra=test --output-file=requirements_test.txt setup.py
#
anyio==1.4.0
anyio==3.5.0
# via purerpc (setup.py)
async-exit-stack==1.0.1
# via purerpc (setup.py)
async-generator==1.10
# via
# anyio
# purerpc (setup.py)
# trio
attrs==21.4.0
# via
# outcome
# pytest
# trio
curio==1.5
# via purerpc (setup.py)
cffi==1.15.0
# via cryptography
cryptography==36.0.2
# via trustme
grpcio==1.44.0
# via
# grpcio-tools
Expand All @@ -36,6 +37,7 @@ idna==3.3
# via
# anyio
# trio
# trustme
importlib-metadata==4.11.3
# via
# pluggy
Expand All @@ -54,6 +56,8 @@ protobuf==3.20.0
# purerpc (setup.py)
py==1.11.0
# via pytest
pycparser==2.21
# via cffi
pyparsing==3.0.8
# via packaging
pytest==7.1.1
Expand All @@ -74,8 +78,12 @@ tomli==2.0.1
# via pytest
trio==0.20.0
# via purerpc (setup.py)
trustme==0.9.0
# via purerpc (setup.py)
typing-extensions==4.1.1
# via importlib-metadata
# via
# anyio
# importlib-metadata
uvloop==0.16.0
# via purerpc (setup.py)
zipp==3.8.0
Expand Down
18 changes: 14 additions & 4 deletions requirements_test_py36.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,30 @@
#
# pip-compile --output-file=requirements_test_py36.txt requirements_test.in setup.py
#
anyio==1.4.0
anyio==3.5.0
# via purerpc (setup.py)
async-exit-stack==1.0.1
# via purerpc (setup.py)
async-generator==1.10
# via
# anyio
# purerpc (setup.py)
# trio
attrs==21.4.0
# via
# outcome
# pytest
# trio
cffi==1.15.0
# via cryptography
contextvars==2.4
# via
# anyio
# sniffio
# trio
curio==1.4
# via -r requirements_test.in
cryptography==36.0.2
# via trustme
dataclasses==0.8
# via anyio
grpcio==1.44.0
# via
# -r requirements_test.in
Expand All @@ -40,6 +44,7 @@ idna==3.3
# via
# anyio
# trio
# trustme
immutables==0.17
# via contextvars
importlib-metadata==4.8.3
Expand All @@ -60,6 +65,8 @@ protobuf==3.19.4
# purerpc (setup.py)
py==1.11.0
# via pytest
pycparser==2.21
# via cffi
pyparsing==3.0.8
# via packaging
pytest==7.0.1
Expand All @@ -80,8 +87,11 @@ tomli==1.2.3
# via pytest
trio==0.19.0
# via -r requirements_test.in
trustme==0.9.0
# via -r requirements_test.in
typing-extensions==4.1.1
# via
# anyio
# immutables
# importlib-metadata
uvloop==0.14.0
Expand Down
6 changes: 3 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def main():
version=__version__,
license="Apache License Version 2.0",
description=("Asynchronous pure Python gRPC client and server implementation "
"supporting asyncio, uvloop, curio and trio"),
"supporting asyncio, uvloop, trio"),
long_description='%s\n%s' % (
re.compile('^.. start-badges.*^.. end-badges', re.M | re.S).sub('', read('README.md')),
re.sub(':[a-z]+:`~?(.*?)`', r'``\1``', read('RELEASE.md'))
Expand Down Expand Up @@ -64,7 +64,7 @@ def main():
install_requires=[
"h2>=3.1.0,<4",
"protobuf>=3.5.1",
"anyio>=1.0.0,<2", # TODO: anyio 3.x upgrade
"anyio>=3.0.0",
"async_exit_stack>=1.0.1",
"tblib>=1.3.2",
"async_generator>=1.10",
Expand All @@ -79,8 +79,8 @@ def main():
"grpcio_tools>=1.25.0", # same here
"uvloop",
"trio>=0.11",
"curio>=0.9",
"python-forge>=18.6",
"trustme",
]
},
)
Expand Down
6 changes: 3 additions & 3 deletions src/purerpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ def __init__(self, host, port, ssl_context=None):
super().__init__()
self._host = host
self._port = port
self._ssl = ssl_context
self._ssl_context = ssl_context
self._grpc_socket = None

async def __aenter__(self):
await super().__aenter__() # Does nothing
socket = await anyio.connect_tcp(self._host, self._port,
ssl_context=self._ssl,
autostart_tls=self._ssl is not None,
ssl_context=self._ssl_context,
tls=self._ssl_context is not None,
tls_standard_compatible=False)
config = GRPCConfiguration(client_side=True)
self._grpc_socket = await self.enter_async_context(GRPCProtoSocket(config, socket))
Expand Down