
Intermittent TestStreamManager "job already exists" error on Windows #6933

@mhucka

Description of the issue

A recent run of ci.yaml produced a pytest error on Windows:

_ TestStreamManager.test_submit_job_already_exist_expects_get_result_request __
[gw0] win32 -- Python 3.10.11 C:\hostedtoolcache\windows\Python\3.10.11\x64\python.exe

self = <cirq_google.engine.stream_manager_test.TestStreamManager object at 0x000002AB0FE40970>
client_constructor = <MagicMock name='QuantumEngineServiceAsyncClient' spec='QuantumEngineServiceAsyncClient' id='2934272411344'>

    @mock.patch.object(quantum, 'QuantumEngineServiceAsyncClient', autospec=True)
    def test_submit_job_already_exist_expects_get_result_request(self, client_constructor):
        """Verifies the behavior when the client receives a JOB_ALREADY_EXISTS error.
    
        This error is only expected to be triggered in the following race condition:
        1. The client sends a CreateQuantumProgramAndJobRequest.
        2. The client's stream disconnects.
        3. The client retries with a new stream and a GetQuantumResultRequest.
        4. The job doesn't exist yet, and the client receives a "job not found" error.
        5. Scheduler creates the program and job.
        6. The client retries with a CreateJobRequest and fails with a "job already exists" error.
    
        The JOB_ALREADY_EXISTS error from `CreateQuantumJobRequest` is only possible if the job
        doesn't exist yet at the last `GetQuantumResultRequest`.
        """
        expected_result = quantum.QuantumResult(parent='projects/proj/programs/prog/jobs/job0')
        fake_client, manager = setup(client_constructor)
    
        async def test():
            async with duet.timeout_scope(5):
                actual_result_future = manager.submit(
                    REQUEST_PROJECT_NAME, REQUEST_PROGRAM, REQUEST_JOB0
                )
                await fake_client.wait_for_requests()
                await fake_client.reply(google_exceptions.ServiceUnavailable('unavailable'))
                await fake_client.wait_for_requests()
                # Trigger a retry with `CreateQuantumJobRequest`.
                await fake_client.reply(
                    quantum.QuantumRunStreamResponse(
                        error=quantum.StreamError(code=quantum.StreamError.Code.JOB_DOES_NOT_EXIST)
                    )
                )
                await fake_client.wait_for_requests()
                await fake_client.reply(
                    quantum.QuantumRunStreamResponse(
                        error=quantum.StreamError(code=quantum.StreamError.Code.JOB_ALREADY_EXISTS)
                    )
                )
                await fake_client.wait_for_requests()
                await fake_client.reply(quantum.QuantumRunStreamResponse(result=expected_result))
                actual_result = await actual_result_future
                manager.stop()
    
                assert actual_result == expected_result
                assert len(fake_client.all_stream_requests) == 4
                assert 'create_quantum_program_and_job' in fake_client.all_stream_requests[0]
                assert 'get_quantum_result' in fake_client.all_stream_requests[1]
                assert 'create_quantum_job' in fake_client.all_stream_requests[2]
                assert 'get_quantum_result' in fake_client.all_stream_requests[3]
    
>       duet.run(test)

cirq-google\cirq_google\engine\stream_manager_test.py:647: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\duet\api.py:60: in run
    scheduler.tick()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\duet\impl.py:406: in tick
    task.advance()
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\duet\impl.py:129: in advance
    f = next(self._generator)
cirq-google\cirq_google\engine\stream_manager_test.py:620: in test
    await fake_client.wait_for_requests()
cirq-google\cirq_google\engine\stream_manager_test.py:142: in wait_for_requests
    requests.append(await self._request_buffer.__anext__())
C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\duet\aitertools.py:109: in __anext__
    await self._waiter
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <AwaitableFuture at 0x2ab2fffc3a0 state=cancelled>

    def __await__(self) -> Generator["AwaitableFuture[T]", None, T]:
>       yield self
E       TimeoutError

C:\hostedtoolcache\windows\Python\3.10.11\x64\lib\site-packages\duet\futuretools.py:87: TimeoutError

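This seems to be a random failure: the traceback shows a TimeoutError raised while awaiting `fake_client.wait_for_requests()` (stream_manager_test.py:620) inside the test's 5-second `duet.timeout_scope`. It occurred in the CI run for PR #6900. The log can be found at
https://github.com/quantumlib/Cirq/actions/runs/12695629658/job/35387939180?pr=6900.

Although this was a random flake, we should look into whether there's a way to guard against it.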
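
One possible guard, sketched here only as an illustration, is to rerun the test when it times out. The `retry_on_timeout` decorator below is hypothetical and is not part of Cirq or duet. It assumes the exception is the built-in `TimeoutError`, which the traceback suggests but does not prove. Retrying hides a flake rather than fixing its cause, so this is a stopgap at best.

```python
import functools
import sys


def retry_on_timeout(attempts=3):
    """Hypothetical helper: rerun a test function when it raises TimeoutError."""

    def decorator(test_fn):
        @functools.wraps(test_fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, attempts + 1):
                try:
                    return test_fn(*args, **kwargs)
                except TimeoutError:
                    # Give up and surface the error once the budget is spent.
                    if attempt == attempts:
                        raise
                    print(
                        f"{test_fn.__name__}: timeout on attempt {attempt}, retrying",
                        file=sys.stderr,
                    )

        return wrapper

    return decorator


# Stand-in for a flaky test: times out twice, then succeeds.
calls = {"count": 0}


@retry_on_timeout(attempts=3)
def flaky_test():
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError
    return "passed"


result = flaky_test()
```

Other failures, such as assertion errors, are not caught and still fail on the first attempt.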

How to reproduce the issue

I can't reproduce this deliberately. The frequency of occurrence is unknown.
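
To get a rough estimate of the frequency, the single test could be run many times and the failures counted. The sketch below is a minimal, untested-on-CI illustration. `count_failures` is a hypothetical helper, and the test ID is assembled from the file path, class name, and test name shown in the traceback. It assumes pytest is invoked from the repository root.

```python
import subprocess
import sys


def count_failures(command, runs):
    """Run `command` `runs` times and return how many runs exited non-zero."""
    failures = 0
    for _ in range(runs):
        completed = subprocess.run(
            command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL
        )
        if completed.returncode != 0:
            failures += 1
    return failures


# Test ID assembled from the path and names in the traceback (an assumption
# about how the test is addressed from the repository root).
TEST_ID = (
    "cirq-google/cirq_google/engine/stream_manager_test.py"
    "::TestStreamManager::test_submit_job_already_exist_expects_get_result_request"
)
PYTEST_COMMAND = [sys.executable, "-m", "pytest", "-q", TEST_ID]

# Example call (not executed here; the run count is arbitrary):
#   failures = count_failures(PYTEST_COMMAND, runs=200)
```

Since the failure was seen on Windows, a run on a Windows machine or CI runner is more likely to be informative than one on another platform.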

Cirq version

1.5.0.dev

Labels

kind/bug-report (Something doesn't seem to work.), triage/discuss (Needs decision / discussion, bring these up during Cirq Cynque)
