Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aiogrpc RuntimeError #698

Open
ShafSpecs opened this issue May 14, 2024 · 28 comments · May be fixed by #712
Open

aiogrpc RuntimeError #698

ShafSpecs opened this issue May 14, 2024 · 28 comments · May be fixed by #712

Comments

@ShafSpecs
Copy link
Contributor

Hello, so I was building a simple GCS. A simple UI on the front and a python server at the back.

Using websocket to handle commands and Server-Sent Events (SSE) for unidirectional information (telemetry). Whilst building out the telemetry aspect, I noticed that RuntimeError randomly got thrown.

Exception ignored in: <function WrappedIterator.__del__ at 0x00000202DCC768E0>
Traceback (most recent call last):
  File "C:\Users\user\app\.venv\Lib\site-packages\aiogrpc\utils.py", line 168, in __del__
    self._stream_executor.shutdown()
  File "C:\Python312\Lib\concurrent\futures\thread.py", line 238, in shutdown
    t.join()
  File "C:\Python312\Lib\threading.py", line 1144, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

Depending on how I structured the SSE route, it got thrown frequently or once in a while. A basic structure:

# sse route
    async def event_generator():
        while True:
            if await request.is_disconnected():
                print("Gracefully stopping stream...")
                break

            # uncommenting these lines instead gets the error thrown more frequently
            #
            # battery = ensure_future(get_battery())
            # gps = ensure_future(get_gps())
            # position = ensure_future(get_position())
            # heading = ensure_future(get_heading())
            # flight_mode = ensure_future(get_flight_mode())
            # fw_metrics = ensure_future(get_fw_metrics())
            # # is_in_air = ensure_future(get_is_in_air())
            # # landed_state = ensure_future(get_landed_state())
            # # mission_progress = ensure_future(get_mission_progress())
            # home = ensure_future(get_home())
            # health = ensure_future(get_health())
            # # status = ensure_future(get_status_text())
            # rc_status = ensure_future(get_rc_status())

            battery = await get_battery()
            gps = await get_gps()
            position = await get_position()
            heading = await get_heading()
            flight_mode = await get_flight_mode()
            fw_metrics = await get_fw_metrics()
            # is_in_air = ensure_future(get_is_in_air())
            # landed_state = ensure_future(get_landed_state())
            # mission_progress = ensure_future(get_mission_progress())
            home = await get_home()
            health = await get_health()
            # status = ensure_future(get_status_text())
            rc_status = await get_rc_status()

            yield encode({
                "event": "telemetry",
                "data": {
                    "battery": battery,
                    "gps": gps,
                    "position": position,
                    "heading": heading,
                    "flight_mode": flight_mode,
                    "fw_metrics": fw_metrics,
                    # "is_in_air": await is_in_air,
                    # "landed_state": await landed_state,
                    # "mission_progress": await mission_progress,
                    "home": home,
                    "health": health,
                    # "info": info,
                    # "status": await status,
                    "rc_status": rc_status,
                }
            }).decode("utf-8")

            await sleep(.5)

    res = EventSourceResponse(
        content=event_generator(),
        status_code=200,
        media_type="text/event-stream",
        headers={
            "Content-Type": "text/event-stream",
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Credentials": "true",
        }
    )
    return res

The error comes from mavsdk dependency: aiogrpc. I am not well-versed with multi-threading in python but I assume the issue stems from aiogrpc creating its own threads and attempting to join them. Is there any way I can disable that? Possibly use asyncio fully for running tasks on a separate thread? Or maybe find a fix I am overlooking. Thanks

@ShafSpecs
Copy link
Contributor Author

Digging further, it seems the error is thrown (raised) internally as wrapping the whole code in a try...except block doesn't catch it.

@julianoes
Copy link
Collaborator

I have seen similar annoying exceptions when using it more recently.

Can you run mavsdk_server separate and check whether it doesn't crash or lockup?

@julianoes
Copy link
Collaborator

I ended up doing something like this, not sure if that would help your case:

# When mavsdk_server, which runs separately, crashes, we need to redo all
# mavsdk subscriptions. To do so, it's easiest just to restart this app
# entirely. We can detect that mavsdk_server has restarted by reading through
# the# aiogrpc.utils logs. Unfortunately, I didn't find a better way to catch
# this case.
class ErrorLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        if "grpc exception" in log_entry and "StatusCode.UNAVAILABLE" in log_entry:
            print("Triggering restart due to gRPC failure.")
            loop = asyncio.get_event_loop()
            loop.stop()


aio_logger = logging.getLogger("aiogrpc.utils")
aio_logger.addHandler(ErrorLogHandler())

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented May 15, 2024

I have seen similar annoying exceptions when using it more recently.

Can you run mavsdk_server separate and check whether it doesn't crash or lockup?

That's what I am doing. I am using windows which means I have to start the server process right before each connection:

async def start_mavsdk_server(connection_url: str, port: int):
    cmd = [resource_path('include/mavsdk_server.exe'), "-p", str(port), connection_url]

    mavsdk_server_process = Popen(
        cmd,
        stdout=PIPE,
        stderr=PIPE,
        universal_newlines=True,
        start_new_session=True,
    )

    return mavsdk_server_process

Still raises the exception

@ShafSpecs
Copy link
Contributor Author

I ended up doing something like this, not sure if that would help your case:

# When mavsdk_server, which runs separately, crashes, we need to redo all
# mavsdk subscriptions. To do so, it's easiest just to restart this app
# entirely. We can detect that mavsdk_server has restarted by reading through
# the# aiogrpc.utils logs. Unfortunately, I didn't find a better way to catch
# this case.
class ErrorLogHandler(logging.Handler):
    def emit(self, record):
        log_entry = self.format(record)
        if "grpc exception" in log_entry and "StatusCode.UNAVAILABLE" in log_entry:
            print("Triggering restart due to gRPC failure.")
            loop = asyncio.get_event_loop()
            loop.stop()


aio_logger = logging.getLogger("aiogrpc.utils")
aio_logger.addHandler(ErrorLogHandler())

Would try adding this to my event generator and seeing if it helps 👍. Does this restart the entire app or just the mavsdk server? Due to the nature of sockets, restarting the entire app would pose an issue due to socket client behaviour.

Would I also need to restart mavsdk server manually after stopping the event loop?

@julianoes
Copy link
Collaborator

For me this restarts the full app which is a quart server in my case.

@dp1140a
Copy link

dp1140a commented May 20, 2024

I get a similar error when running the examples. However it seems to be intermittent (The worst kind of error :) ) Takeoff and Land:

#!/usr/bin/env python3

import asyncio
from mavsdk import System


async def run():

    drone = System()
    await drone.connect(system_address="udp://:14540")

    status_text_task = asyncio.ensure_future(print_status_text(drone))

    print("Waiting for drone to connect...")
    async for state in drone.core.connection_state():
        if state.is_connected:
            print(f"-- Connected to drone!")
            break

    print("Waiting for drone to have a global position estimate...")
    async for health in drone.telemetry.health():
        if health.is_global_position_ok and health.is_home_position_ok:
            print("-- Global position estimate OK")
            break

    print("-- Arming")
    await drone.action.arm()

    print("-- Taking off")
    await drone.action.takeoff()

    await asyncio.sleep(10)

    print("-- Landing")
    await drone.action.land()

    #status_text_task.cancel()


async def print_status_text(drone):
    try:
        async for status_text in drone.telemetry.status_text():
            print(f"Status: {status_text.type}: {status_text.text}")
    except asyncio.CancelledError:
        return


if __name__ == "__main__":
    # Run the asyncio loop
    asyncio.run(run())

It runs seemingly ok. Drone takes off and lands. but then it throws the error:

python test1.py 
Waiting for drone to connect...
-- Connected to drone!
Waiting for drone to have a global position estimate...
-- Global position estimate OK
-- Arming
-- Taking off
Status: INFO: Armed by external command	
Status: INFO: Using default takeoff altitude: 10.0 m	
Status: INFO: [logger] ./log/2024-05-20/23_47_00.ulg	
Status: INFO: Using default takeoff altitude: 10.0 m	
Status: INFO: Takeoff detected	
-- Landing
Exception ignored in: <function WrappedIterator.__del__ at 0x710c78765750>
Traceback (most recent call last):
  File "/home/dave/.local/lib/python3.10/site-packages/aiogrpc/utils.py", line 168, in __del__
    self._stream_executor.shutdown()
  File "/usr/lib/python3.10/concurrent/futures/thread.py", line 235, in shutdown
    t.join()
  File "/usr/lib/python3.10/threading.py", line 1093, in join
    raise RuntimeError("cannot join current thread")
RuntimeError: cannot join current thread

@julianoes
Copy link
Collaborator

@JonasVautherin any ideas?

@ShafSpecs
Copy link
Contributor Author

Getting a new error, still stemming from aiogrpc:

__anext__ grpc exception
Traceback (most recent call last):
  File "C:\Users\user\project\.venv\Lib\site-packages\aiogrpc\utils.py", line 145, in cb
    fut.result()
  File "C:\Python312\Lib\concurrent\futures\thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user\project\.venv\Lib\site-packages\aiogrpc\utils.py", line 131, in _next
    return next(self._iterator)
           ^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\user\project\.venv\Lib\site-packages\grpc\_channel.py", line 542, in __next__
    return self._next()
           ^^^^^^^^^^^^
  File "C:\Users\user\project\.venv\Lib\site-packages\grpc\_channel.py", line 968, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "Unexpected error in RPC handling"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2024-05-28T11:11:28.0840526+00:00", grpc_status:2, grpc_message:"Unexpected error in RPC handling"}"
>

This occurs more rarely, but it gives my systems a bit of unpredictability when trying to read telemetry consistently

Any ideas? cc @JonasVautherin

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented May 28, 2024

Overall, I am thinking if it's possible to dump aiogrpc (which hasn't been updated in 4 years) and roll with something like: grpclib which still receives updates and works natively with asyncio? I am not sure how tightly coupled this project is with aiogrpc. Would take a look at the feasibility of such conversion 👍

@ShafSpecs
Copy link
Contributor Author

Overall, I am thinking if it's possible to dump aiogrpc (which hasn't been updated in 4 years) and roll with something like: grpclib which still receives updates and works natively with asyncio? I am not sure how tightly coupled this project is with aiogrpc. Would take a look at the feasibility of such conversion 👍

It doesn't look too complicated to facilitate a switch, but I do not know what I am doing yet. aiogrpc is used in just the async plugin manager. Not sure about the pbs files generation though, as grpclib does have a protoc plugin: https://github.com/vmagamedov/grpclib#protoc-plugin

@julianoes
Copy link
Collaborator

I didn't realize we could swap out aiogrpc. That sounds reasonable.

@ShafSpecs
Copy link
Contributor Author

I didn't realize we could swap out aiogrpc. That sounds reasonable.

Is this something you intend on working on in the near future?

@julianoes
Copy link
Collaborator

Probably not in the next couple of weeks. Is this urgent for you, or are you asking to try yourself?

@ShafSpecs
Copy link
Contributor Author

Probably not in the next couple of weeks. Is this urgent for you, or are you asking to try yourself?

Both. It's a bit urgent, and I am wondering if I can do it. Asides changing imports in the async plugin manager, I am not sure if I would need to re-generate any file or the current proto files generation are good to go

@JonasVautherin
Copy link
Collaborator

If it has a similar API to aiogrpc, maybe it's actually easy to swap 👍. I would say just try it and see where it goes 😊.

@julianoes
Copy link
Collaborator

I wouldn't know but all I find is this: https://github.com/search?q=repo%3Amavlink%2FMAVSDK-Python%20aiogrpc&type=code
Does that make sense @JonasVautherin? I don't actually know all the pieces.

But at @ShafSpecs, I assume you're aware of how to do the generation as outlined here:
https://github.com/mavlink/MAVSDK-Python?tab=readme-ov-file#generate-the-code

@ShafSpecs
Copy link
Contributor Author

But at @ShafSpecs, I assume you're aware of how to do the generation as outlined here: https://github.com/mavlink/MAVSDK-Python?tab=readme-ov-file#generate-the-code

I am on windows, and it just hit me at the moment that I won't be able to run the necessary scripts. Maybe I could convert them to ps scripts? I don't know and the bash scripts seem to utilise native unix modules. I might just wait out the few weeks before you can implement it

@julianoes
Copy link
Collaborator

I would just do it in WSL2 or docker if I was you, but up to you.

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented Jun 17, 2024

Apparently, the official grpc package supports asyncio now so I didn't even need an external dependency or re-build protoc files. Ran a few examples locally and most work (though a few examples, like calibration, freezes indefinitely for some reason - seems like out of scope for this issue tho)! Is there a way I can move this local mavsdk into my original project to compare vs current release?

I also noticed that when I setup the package for local development, it correctly links to mavsdk_server, meaning I don't need to start it up myself. But in the actual package, it needs to be manually started on Windows. Do you know the reason why? I suppose I can also try and tackle that issue too

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented Jun 18, 2024

Ran a few examples locally and most work (though a few examples, like calibration, freezes indefinitely for some reason - seems like out of scope for this issue tho)!

Checking the git history, most of the grpc files were generated ~5 yrs ago (as well as outdated dependencies). I am trying to regenerate it on my end but running into an issue with protoc-gen-mavsdk. Could regenerating all the files possibly fix the issue? I mean a few things might have changed between now and then. Not sure though. The freezing is very worrying

@JonasVautherin
Copy link
Collaborator

Not sure what you mean. Most of these files are autogenerated. Some haven't changed in years, but that means that the API is stable 😊.

Don't forget that MAVSDK-Python just relays the API to MAVSDK-C++. E.g. the API of the camera plugin hasn't changed much (that's API stability), but the implementation (in C++) has changed.

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented Jun 18, 2024

Not sure what you mean. Most of these files are autogenerated. Some haven't changed in years, but that means that the API is stable 😊.

Oh okay. There is a dependency conflict currently in the project that was making me wonder, in the dev requirements - caused by protoc-gen-mavsdk. The outdated dependencies made me wonder if perhaps it's not the mavlink APIs but rather python APIs that is causing most of the issues
image

Note

I tried to vary versions, but protoc-gen-mavsdk depends on a very old version of the protobuf package

grpcio and grpcio-tools seem to have updated a lot of their APIs, python included (deprecation of the distutils package, for example in >=3.11). I am wondering wether those upgrades and deprecations is what is leading to random behaviours from the generated python files. I had to make some changes to even make setup.py and a few build files to make building work. I am not sure if this would fix things, but I am suggesting updating protoc-gen-mavsdk generally and re-running run_protoc.sh.

Edit:
You worked on protoc-gen-mavsdk pb plugin quite a lot, Jonas 😄. Impressive! I have no idea what I am looking at. But the protobuf version seems to be limited on purpose

protobuf>=3.13,<=3.20.1
jinja2>=2.11

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented Jun 18, 2024

Attempted to fix the issues I noticed in the pb_plugin in this PR: mavlink/MAVSDK-Proto#347. It should fix the issues with dependency conflict. Not sure if there are affecting breaking changes though, the tests all pass now though

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented Jun 20, 2024

I was able to regenerate all files as well as remove aiogrpc completely, in spite of all that, there are a few examples that still hang up indefinitely (e.g calibration)

In light of this, I have decided to open a PR (soon) in this repo to address the aiogrpc issue and leave the rest for another time. The freezing is a baffling issue that seems to have no reason, and it also means that some features of mavsdk unusable (e.g mission tracking, etc.). Thank you

@ShafSpecs ShafSpecs linked a pull request Jun 20, 2024 that will close this issue
@ShafSpecs
Copy link
Contributor Author

@JonasVautherin you work at Auterion, right? Love the company btw, a favorite of mine.

How do you guys handle the MAVLink part of communication? Do you use the CPP version of MAVSDK straight up? Or mavlink? I want to work with mavsdk without having to pre-check working/non-working plugins. Which would you advise? Thanks for the help

@julianoes
Copy link
Collaborator

How do you guys handle the MAVLink part of communication? Do you use the CPP version of MAVSDK straight up?

What exactly do you mean? I mostly use MAVSDK C++ unless I specifically need Python. Use cases for that for me are quick scripting, or recently a quart webserver backend.

@ShafSpecs
Copy link
Contributor Author

ShafSpecs commented Jun 22, 2024

What exactly do you mean? I mostly use MAVSDK C++ unless I specifically need Python. Use cases for that for me are quick scripting, or recently a quart webserver backend.

Ah okay. I was taking a look at the python and the cpp of mavsdk. And the cpp seems more fleshed out. I was using electron and Python previously for my simple gcs but I am considering Qt instead thanks to the C++ backend - with mavsdk-cpp

Plus, it allows me to learn cpp. I'm much familiar with Rust than CPP

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants