Race Conditions when using OpCodes and session.status #607

Closed
jlashner opened this issue Jan 5, 2024 · 9 comments

Comments

jlashner (Collaborator) commented Jan 5, 2024

I'm encountering the following issue when trying to fix tests for the HWP PMX agent.

The following test fails on the assertion assert resp.session['op_code'] == OpCode.SUCCEEDED.value

@pytest.mark.integtest
def test_hwp_rotation_set_on(wait_for_crossbar, kikusui_emu, run_agent, client):
    responses = {'output 1': '',
                 'output?': '1'}

    kikusui_emu.define_responses(responses)
    resp = client.set_on()
    print(resp)
    print(resp.session)
    assert resp.status == ocs.OK
    assert resp.session['op_code'] == OpCode.SUCCEEDED.value

with the following error message:

default response set to 'None'
msg='output 1'
response=''
msg='output?'
response='1'
OCSReply: OK : Operation "set_on" is currently not running (SUCCEEDED).
  set_on[session=2]; status=starting for 1.0 s
  messages (1 of 1):
    1704472073.685 Status is now "starting".
  other keys in .session: op_code, data
{'session_id': 2, 'op_name': 'set_on', 'op_code': 2, 'status': 'starting', 'success': True, 'start_time': 1704472073.6851099, 'end_time': None, 'data': None, 'messages': [[1704472073.6851099, 'Status is now "starting".']]}
F

================================================== FAILURES ==================================================
__________________________________________ test_hwp_rotation_set_on __________________________________________

wait_for_crossbar = None, kikusui_emu = <socs.testing.device_emulator.DeviceEmulator object at 0x7ff6fdd3bd60>
run_agent = None, client = OCSClient('hwp-pmx')

    @pytest.mark.integtest
    def test_hwp_rotation_set_on(wait_for_crossbar, kikusui_emu, run_agent, client):
        responses = {'output 1': '',
                     'output?': '1'}
    
        kikusui_emu.define_responses(responses)
        resp = client.set_on()
        print(resp)
        print(resp.session)
    
        # import time
        # time.sleep(1)
        # resp = client.set_on.status()
    
        # print(resp)
        # print(resp.session)
    
        assert resp.status == ocs.OK
>       assert resp.session['op_code'] == OpCode.SUCCEEDED.value
E       assert 2 == 5
E        +  where 5 = <OpCode.SUCCEEDED: 5>.value
E        +    where <OpCode.SUCCEEDED: 5> = OpCode.SUCCEEDED

integration/test_hwp_pmx_agent_integration.py:52: AssertionError

Based on the logs, the session status in resp is still in the starting state instead of done, and the OpCode is STARTING.

The following test passes:

@pytest.mark.integtest
def test_hwp_rotation_set_on(wait_for_crossbar, kikusui_emu, run_agent, client):
    responses = {'output 1': '',
                 'output?': '1'}

    kikusui_emu.define_responses(responses)
    resp = client.set_on()
    import time
    time.sleep(1)
    resp = client.set_on.status()
    assert resp.status == ocs.OK
    assert resp.session['op_code'] == OpCode.SUCCEEDED.value

I think the change that instigated this issue is switching tasks from being blocking to non-blocking.

I think this is due to a race condition between when wait resolves and when the session status is updated. This points to a slightly deeper OCS issue that I think @mhasself has experience with (based on comments in the source).

There may be a way to fix this in OCS through use of callback-chains or something, but for now I think we need to be really careful about using OpCode or session.status in tests or for control flow.
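For tests, a safer pattern than trusting the session returned directly by the call is to poll the operation's status until the op_code reaches a terminal value, which is effectively what the passing example above does. A minimal sketch of such a helper (the helper itself is illustrative, not part of OCS; it assumes ocs.base.OpCode defines FAILED and EXPIRED alongside SUCCEEDED):

import time

from ocs.base import OpCode

# Terminal op codes; anything else means the operation is still in flight.
TERMINAL_CODES = {OpCode.SUCCEEDED.value, OpCode.FAILED.value, OpCode.EXPIRED.value}


def wait_for_terminal_opcode(op, timeout=5.0, interval=0.1):
    """Poll op.status() until session['op_code'] is terminal or timeout elapses."""
    deadline = time.time() + timeout
    while True:
        resp = op.status()
        if resp.session.get('op_code') in TERMINAL_CODES or time.time() > deadline:
            return resp
        time.sleep(interval)


# Example usage in a test:
#     client.set_on()
#     resp = wait_for_terminal_opcode(client.set_on)
#     assert resp.session['op_code'] == OpCode.SUCCEEDED.value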

mhasself (Member) commented Jan 5, 2024

Cool -- that code comment says "you can't trust session.d for purpose X" and then about 8 lines later we use session.d for purpose X ...

I think a quick fix is to add a second Deferred to session (session.d_done) and then at the end of these blocks, resolve it. Then wait should check and/or wait on session.d_done instead of session.d. That has proper ordering.
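A rough sketch of that idea (illustrative only; names and structure are simplified, not the actual OCS OpSession):

from twisted.internet import defer


class OpSession:
    def __init__(self):
        self.d = defer.Deferred()        # existing: fires when the op returns
        self.d_done = defer.Deferred()   # proposed: fires only once the session is final

    def _finalize(self, success):
        # Hypothetical finalization step: update success and status first...
        self.success = success
        self.status = 'done'
        # ...and only then signal that the encoded session is safe to read.
        self.d_done.callback(self)


# wait() would then yield session.d_done instead of session.d.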

jlashner (Collaborator, Author) commented Jan 5, 2024

I was thinking something along those lines, but another issue I've seen is that set_status can be asynchronous, for instance if you try to run:

session.set_status('running')
print(session.status)

the status will not update immediately, since the update is scheduled to run in the main thread.
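Presumably the update gets handed off to the reactor with something like callFromThread (that mechanism is an assumption here, not taken from the OCS source), which would explain why the very next line still sees the old value:

from twisted.internet import reactor


class FakeSession:
    """Toy stand-in for an OCS session, to illustrate the scheduling delay."""
    status = 'starting'

    def set_status(self, status):
        # The actual attribute update runs later, in the reactor thread.
        reactor.callFromThread(setattr, self, 'status', status)


session = FakeSession()
session.set_status('running')
print(session.status)   # still 'starting'; the reactor hasn't run the update yet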

As in the testing example above, when the response is returned from wait (without the sleep), it looks like this:

{'session_id': 2, 'op_name': 'set_on', 'op_code': 2, 'status': 'starting', 'success': True, 'start_time': 1704472073.6851099, 'end_time': None, 'data': None, 'messages': [[1704472073.6851099, 'Status is now "starting".']]}

Meaning that session.success was updated in the _handle_task_return_val function, but the status has not been updated, presumably because that update was still waiting to run on the main thread.

So I'm not entirely sure that running a callback after _handle_task_return_val will ensure session.status has been updated.

mhasself (Member) commented Jan 5, 2024

Ah, yes. I see what you mean. That will require a bit more thought to get the final Deferred to fire only once the state is fully up to date.

mhasself (Member) commented Jan 6, 2024

Ok, actually, I really don't understand how this could happen. And I also can't reproduce it. That session dict you show should really be impossible.

I think _handle_task_return_val will always run in the reactor, even if your task was in a worker thread -- but definitely if your task was registered with blocking=False! There are no other threads for the callback to run in, in that case. And I don't understand how you can get success=True and status='starting' then.

But that puzzle aside ^^^, I do acknowledge these other more clear-cut problems:

  1. There's a race condition in wait(). I think the consequences of that should be that the encoded session has success=None and status = 'starting' / 'running'. I.e. the session snapshot is taken a moment too soon, but it is internally consistent.
  2. session.set_status(...) is asynchronous -- but only if you're running in a worker thread, not the reactor. If you're in the reactor (including in Task code registered with blocking=False), it should be instantaneous. Actually, if you're in a worker thread, set_status returns a Deferred and you can yield on it. But no code does that!
  3. There is an invalid encoded session pushed out at one point. It's due to the "add_message" call, between success=ok and set_status('done'). But this invalid session just gets put on the agent's main feed -- which, like, does anyone use that? Certainly http client.wait() does not.

Let's hack on it next week (if we don't figure it out before then).

jlashner (Collaborator, Author) commented Jan 6, 2024

Ok, I was really curious, so I started poking around a bit more and I think I know why this is happening...

This is due to the new structure of the OCS agents. For reference, here is the task that's causing this issue:

@defer.inlineCallbacks
def set_on(self, session, params):
    """set_on()

    **Task** - Turn on the PMX Kikusui.
    """
    action = Actions.SetOn(**params)
    self.action_queue.put(action)
    session.data = yield action.deferred
    return True, 'Set PMX Kikusui on'

I think what's happening is that the set_on function starts in the reactor thread, but yields to action.deferred, which is then fired by the main operation. That operation is blocking, so it runs in a worker thread. This means that when the callback is called, we are not in the reactor thread, so everything after the yield statement in the task, including _handle_task_return_val, runs in the worker thread of the main process...

Really stretching my intuition for python and twisted here
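As a general Twisted behavior (standalone illustration, not socs code), a Deferred's callbacks run synchronously in whichever thread fires .callback(), so firing action.deferred from the worker thread drags everything after the yield into that thread:

import threading
import time

from twisted.internet import defer, reactor, threads


@defer.inlineCallbacks
def task():
    d = defer.Deferred()

    def fire_from_worker():
        time.sleep(0.1)       # let the generator reach the yield first
        d.callback('result')  # callbacks run synchronously in this worker thread

    threads.deferToThread(fire_from_worker)
    value = yield d
    # Everything after the yield now runs in the worker thread, not the reactor.
    print('after yield:', value, 'in thread', threading.current_thread().name)
    reactor.callFromThread(reactor.stop)


reactor.callWhenRunning(task)
reactor.run()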

To fix this, maybe the best way is to wrap action.deferred.callback in callFromThread?

mhasself (Member) commented Jan 8, 2024

> To fix this, maybe the best way is to wrap action.deferred.callback in callFromThread?

Ahhh, that makes some sense then. Yes, wrap all three uses of action.deferred in callFromThread and I guess that should clean up the behavior.
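A sketch of what that might look like on the worker-thread side (the helper below is hypothetical, not the actual socs change; action.deferred is the attribute used in the task above):

from twisted.internet import reactor
from twisted.python.failure import Failure


def resolve_action(action, result=None, error=None):
    """Hypothetical helper: resolve an Action's deferred from a worker thread.

    Firing the Deferred via callFromThread keeps all downstream callbacks,
    including _handle_task_return_val, in the reactor thread.
    """
    if error is not None:
        reactor.callFromThread(action.deferred.errback, Failure(error))
    else:
        reactor.callFromThread(action.deferred.callback, result)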

jlashner (Collaborator, Author) commented Jan 8, 2024

Confirmed: using callFromThread on the callback makes the session self-consistent in the test above. I think the wait race conditions are probably still worth thinking about, but for the time being I'll add this everywhere we use this structure.

mhasself (Member) commented

I think ocs:371 means we can close this?

jlashner (Collaborator, Author) commented

Yep!
