
SOW 2 #1504

Merged (30 commits) on Feb 26, 2019
Conversation

@njsmith (Contributor) commented Jan 24, 2019

So far this is very much an initial sketch, but it has the split
between the service and the API, the json-over-Unix-socket API, and a
very rudimentary attempt at redoing api_runCancel based on the
API (hidden behind a feature flag).

@robnagler: This is completely untested, because it depends on #1499. Do you want me to take a stab at rearranging the db_dir config, or do you want to do it? (I think I could get something working, but I feel like without some hints it would probably end up different from how you'd do it :-).)
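
For illustration, a minimal sketch of what one request/response over that Unix socket could look like from the Flask side; the socket path, the `cancel_job` action name, and the end-of-request convention are assumptions here, not the actual protocol in runner_client.py:

```python
# Sketch only: the real client lives in sirepo/runner_client.py; the socket
# path and the 'cancel_job' action name below are assumptions.
import json
import socket


def runner_rpc(socket_path, request):
    """Send one JSON request over a Unix-domain socket and return the decoded reply."""
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.connect(socket_path)
        sock.sendall(json.dumps(request).encode('utf-8'))
        sock.shutdown(socket.SHUT_WR)  # signal end-of-request to the daemon
        reply = b''
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            reply += chunk
        return json.loads(reply.decode('utf-8'))
    finally:
        sock.close()


# Hypothetical usage:
# runner_rpc('/srv/sirepo/db/runner.sock',
#            {'action': 'cancel_job', 'run_dir': '/srv/sirepo/db/user/x/run'})
```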

simulation_db.write_result({'state': 'canceled'}, run_dir=job_info.run_dir)
# XX TODO: this is what api_runCancel used to do, but we can't really do
# it here. What should we do? (For a cancelled job, should we just delete
# the run-dir entirely, once we have the full hash in the jid?)
@robnagler (Member) commented on the snippet above:

canceled jobs can contain usable data. When we run parallel simulations, the user can cancel at any time, and still have the results already computed. Perhaps this might be a different state.

@robnagler (Member) left a comment:

Looks good. I didn't see any issues, just some style/encapsulation comments.

I think we should drop docker next so we can get the workflow right in a real environment. We can rely on `python2 -m sirepo.sirepo_console srw run run_dir` for now.
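
As a sketch of what that assumption amounts to, the daemon would just shell out to the py2 console entry point; a blocking version is shown for clarity (the real runner supervises the process asynchronously, and the `run_simulation` name is made up):

```python
# Sketch: launch the py2 CLI for one run, using the command suggested above.
# Blocking version for clarity; the real runner supervises this asynchronously.
import subprocess


def run_simulation(run_dir, sim_type='srw'):
    cmd = ['python2', '-m', 'sirepo.sirepo_console', sim_type, 'run', str(run_dir)]
    return subprocess.call(cmd)
```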

(Resolved inline review threads on sirepo/server.py, sirepo/runner_client.py, sirepo/feature_config.py, and sirepo/pkcli/runner.py.)
@njsmith (Contributor, Author) commented Feb 8, 2019

Status: the structure's there, it starts up (relatively conveniently with the `sirepo runner dev` command I threw together), and it has code for running processes. The wiring to get server.py to actually use runner.py is the next big thing. Now I'm sorting through simulation_db.py and server.py to figure out how to get that wiring to work.

@robnagler (Member) left a comment:

Looks good. Keep at it. If you need help with simulation_db or server.py let me know.

(Resolved inline review threads on sirepo/pkcli/runner.py.)
@njsmith changed the title from "[WIP] SOW 2" to "SOW 2" on Feb 18, 2019
@njsmith (Contributor, Author) commented Feb 18, 2019

OK, I guess you'll want some tweaks but I think this has a working implementation of everything in SOW 2 now!

How to run it

We're running flask under py2, and the runner daemon under py3. We already have pyenv set up to create both a py2 and a py3 environment; now we need sirepo installed in both. So something like:

$ pyenv global py2:py3
$ pip3 install -e ~/src/radiasoft/pykern
$ mkdir -p ~/src/njsmith
$ cd ~/src/njsmith
$ gcl sirepo
$ cd sirepo
$ git checkout njs-sow-2
$ pip3 install -e .
$ pip2 install -e .
# Make sure it worked:
$ PYENV_VERSION=py3 pyenv exec sirepo --help
$ [[ $(cat $(pyenv prefix py2)/lib/python*/site-packages/sirepo*) =~ $PWD ]] && echo ok
$ [[ $(cat $(pyenv prefix py3)/lib/python*/site-packages/sirepo*) =~ $PWD ]] && echo ok

To spin up both the flask server and runner daemon to try it:

$ PYENV_VERSION=py3 SIREPO_FEATURE_CONFIG_RUNNER_DAEMON=1 pyenv exec sirepo runner dev

To run the test:

$ cd ~/src/njsmith/sirepo
$ pytest tests -k runner_test

The test assumes you have the py3 env set up. We might want to change that before merging?

Because of #1528, I haven't been able to run the full test suite.

Other notes

I ended up switching to using run_dir+hash as the unique identifier for the job... so right now the runner doesn't actually use jids at all. I think eventually we'll want to switch to using jid as the unique identifier plus have ways to convert jid to-and-from run_dir, and make the runner responsible for managing run dirs in general, but (1) for now we can't change the actual run_dir path because server.py assumes that it can calculate it locally without talking to the runner, and (2) mapping jid to-and-from run_dir isn't trivial right now. So this seemed like the simplest approach for now.
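
Concretely, the bookkeeping is keyed on the (run_dir, jhash) pair rather than the jid; a tiny illustrative sketch (the names here are not the exact ones in runner.py):

```python
# Illustrative only: a job is identified by its run_dir plus the parameter
# hash (jhash), so the same run_dir with different parameters is a new job.
def job_key(run_dir, jhash):
    return (str(run_dir), jhash)


def is_current_job(job_info, jhash):
    # True only if the job tracked for this run_dir matches the requested hash.
    return job_info is not None and job_info.jhash == jhash
```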

@robnagler (Member) commented:

I updated your comment to include a better environment test and to install the branch in py2 as well. It won't work in ~/src/radiasoft, so it's better to pull your branch and test there for now. You also need to pass the feature_config.

Here's a patch to fix a bug when running the server:

diff --git a/sirepo/simulation_db.py b/sirepo/simulation_db.py
index f7e6b3a0..965bdec2 100644
--- a/sirepo/simulation_db.py
+++ b/sirepo/simulation_db.py
@@ -918,7 +918,7 @@ def write_result(result, run_dir=None):
         # Don't overwrite first written file, because first write is
         # closest to the reason is stopped (e.g. canceled)
         return
-    result.setdefault('state', 'completed')
+    result.setdefault('state', 'complete')
     write_json(fn, result)
     write_status(result['state'], run_dir)
     input_file = json_filename(template_common.INPUT_BASE_NAME, run_dir)

When I run the test, I get:

$ pytest tests -k runner_test
================================================================================= test session starts ==================================================================================
platform linux2 -- Python 2.7.14, pytest-3.2.3, py-1.7.0, pluggy-0.4.0
rootdir: /home/vagrant/src/njsmith/sirepo, inifile:
plugins: forked-0.2
collected 37 items

tests/runner_test.py F

======================================================================================= FAILURES =======================================================================================
__________________________________________________________________________________ test_runner_myapp ___________________________________________________________________________________
Traceback (most recent call last):
  File "/home/vagrant/src/njsmith/sirepo/tests/runner_test.py", line 77, in test_runner_myapp
    run.nextRequest
  File "/home/vagrant/src/radiasoft/pykern/pykern/pkcollections.py", line 56, in __getattr__
    return self.__getattribute__(name)
AttributeError: 'Dict' object has no attribute 'nextRequest'
--------------------------------------------------------------------------------- Captured stdout call ---------------------------------------------------------------------------------
/home/vagrant/src/njsmith/sirepo/tests/runner_work/db/runner.sock
[{u'name': u'Scooby Doo', u'isExample': True, u'simulation': {u'simulationSerial': 1550518848490481, u'isExample': True, u'name': u'Scooby Doo', u'documentationUrl': u'', u'notes': u'', u'outOfSessionSimulationId': u'', u'folder': u'/', u'simulationId': u'a11pySID'}, u'last_modified': u'2019-02-18 19:40', u'folder': u'/', u'simulationId': u'a11pySID'}]
{u'simulationSerial': 1550518848490481, u'isExample': True, u'name': u'Scooby Doo', u'documentationUrl': u'', u'notes': u'', u'outOfSessionSimulationId': u'', u'folder': u'/', u'simulationId': u'a11pySID'}
{u'models': {u'simFolder': {}, u'heightWeightReport': {}, u'dog': {u'weight': 70.25, u'gender': u'male', u'breed': u'Great Dane', u'height': 81.28, u'disposition': u'friendly', u'favoriteTreat': u''}, u'simulation': {u'simulationSerial': 1550518848490481, u'outOfSessionSimulationId': u'', u'name': u'Scooby Doo', u'documentationUrl': u'', u'notes': u'', u'isExample': True, u'folder': u'/', u'simulationId': u'a11pySID'}}, u'simulationType': u'myapp', u'version': u'20190218.183041'}
{u'lastUpdateTime': 1550518848, u'nextRequest': {u'report': u'heightWeightReport', u'simulationId': u'a11pySID', u'simulationType': u'myapp', u'reportParametersHash': u'cc093c5d2ff817018618f1422b06fb8e'}, u'elapsedTime': 0, u'parametersChanged': False, u'state': u'running', u'nextRequestSeconds': 1, u'startTime': 1550518848}
{u'lastUpdateTime': 1550518848, u'nextRequest': {u'report': u'heightWeightReport', u'simulationId': u'a11pySID', u'simulationType': u'myapp', u'reportParametersHash': u'cc093c5d2ff817018618f1422b06fb8e'}, u'elapsedTime': 0, u'parametersChanged': False, u'state': u'running', u'nextRequestSeconds': 1, u'startTime': 1550518848}
{u'lastUpdateTime': 1550518849, u'nextRequest': {u'report': u'heightWeightReport', u'simulationId': u'a11pySID', u'simulationType': u'myapp', u'reportParametersHash': u'cc093c5d2ff817018618f1422b06fb8e'}, u'elapsedTime': 1, u'parametersChanged': False, u'state': u'running', u'nextRequestSeconds': 1, u'startTime': 1550518848}
{u'x_range': [0.0, 11.0], u'lastUpdateTime': 1550518849, u'title': u'Dog Height and Weight Over Time', u'x_label': u'Age (years)', u'y_label': u'', u'x_points': [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0, 11.0], u'elapsedTime': 1, u'parametersChanged': False, u'state': u'complete', u'startTime': 1550518848, u'plots': [{u'color': u'#1f77b4', u'points': [0.2585063948347131, 40.21214187401185, 62.86256433979385, 70.72255244830933, 73.34528811291915, 75.79496025061201, 76.65710318744058, 77.46562824139762, 77.54195385075052, 76.99962352192519, 77.78121331936578, 78.34547050274512], u'name': u'height', u'label': u'Height [cm]'}, {u'color': u'#ff7f0e', u'points': [1.633799347829997, 34.95554368122375, 53.53419538335956, 61.49624285854372, 63.19572998363415, 64.95538663173728, 65.17655914923125, 65.74938620364235, 66.76049226524795, 66.3744414760862, 66.16140689716958, 67.02105552245497], u'name': u'weight', u'label': u'Weight [lbs]'}], u'y_range': [0.2585063948347131, 78.34547050274512]}
================================================================================= 36 tests deselected ==================================================================================
======================================================================= 1 failed, 36 deselected in 5.96 seconds ========================================================================

@njsmith (Contributor, Author) commented Feb 19, 2019

Nice catch on the complete vs completed thing. It looks like there are several places in the existing code that are hard-coded to use completed, plus there might be existing files on disk using completed. So instead of switching the old code to use complete like in your patch, I switched the new code to using completed.

This also explains the test failure you saw: I copied the test's status-checking loop from server_test.py, so the test is looping until it sees run.state == 'completed' (i.e., what the old code uses). This didn't make any sense, because my new code was using run.state = 'complete', so the loop check could never succeed. So now I have no idea how the test ever passed for me. But the output you pasted here shows exactly the failure mode we'd expect given this issue: the test gets a response with run.state == 'complete' (last line of the captured stdout), which is where it's supposed to stop looping. But, because it's checking for run.state == 'completed', it keeps looping anyway, and then crashes trying to send the next request.

So my guess is that if you pull the latest version of my branch, the test should start passing for you.

This still doesn't explain why I'm having trouble running the other tests, but I guess it must be something unrelated...

@robnagler (Member) commented:

Leaving it as completed is fine. Could also fix the code.

> Nice catch on the complete vs completed thing.

Fail fast. Let the computer do the thinking. :)

@robnagler (Member) commented:

I realized that we should be testing JobStatus. That was the lack of fail fast. When read_status runs, it should validate against JobStatus.

@njsmith (Contributor, Author) commented Feb 20, 2019

The new code does validate against JobStatus, both when reading from the run_dir in runner.py, and when parsing off the wire in runner_client.py. The problem here is that we were correctly validating against the wrong values...
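
For illustration, that fail-fast validation looks roughly like the sketch below; the enum members and helper name are assumptions rather than the exact definitions in runner.py / runner_client.py:

```python
# Sketch of fail-fast status validation; the member list and helper name are
# assumptions, not the exact definitions in runner.py / runner_client.py.
import enum


class JobStatus(enum.Enum):
    MISSING = 'missing'
    RUNNING = 'running'
    COMPLETED = 'completed'  # note: 'completed', not 'complete'
    CANCELED = 'canceled'
    ERROR = 'error'


def parse_status(value):
    """Fail fast on any status string that is not a known JobStatus."""
    try:
        return JobStatus(value)
    except ValueError:
        raise AssertionError('unknown job status: {!r}'.format(value))
```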

@njsmith (Contributor, Author) commented Feb 20, 2019

I updated this so that the test automatically skips itself if you don't have the new py3 environment stuff set up. I guess eventually we'll want to make the py3 environment a standard thing, but I don't want to break your test suite in the meantime if this is merged :-).
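
Roughly, the skip amounts to probing for the py3 environment at the top of the test; the exact probe in runner_test.py may differ, and the `pyenv` invocation below is just one way to do it:

```python
# Sketch: skip runner_test when the py3 runner environment isn't available.
# The actual probe used in runner_test.py may differ.
import os
import subprocess

import pytest


def _have_py3_runner_env():
    env = dict(os.environ, PYENV_VERSION='py3')
    try:
        with open(os.devnull, 'w') as devnull:
            return subprocess.call(
                ['pyenv', 'exec', 'sirepo', '--help'],
                env=env, stdout=devnull, stderr=devnull,
            ) == 0
    except OSError:
        return False


def test_runner_myapp():
    if not _have_py3_runner_env():
        pytest.skip('py3 runner environment not set up')
    # ... rest of the test ...
```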

@robnagler (Member) commented:

pkdp and print are not allowed. This patch converts to pkdlog.

diff --git a/sirepo/pkcli/runner.py b/sirepo/pkcli/runner.py
index 82654a3d..c9a99adb 100644
--- a/sirepo/pkcli/runner.py
+++ b/sirepo/pkcli/runner.py
@@ -264,11 +264,11 @@ async def _handle_conn(job_tracker, lock_dict, stream):
             request = pkjson.load_any(request_bytes)
             if 'run_dir' in request:
                 request.run_dir = pkio.py_path(request.run_dir)
-            pkdp('runner request: {!r}', request)
+            pkdlog('runner request: {!r}', request)
             handler = _RPC_HANDLERS[request.action]
             async with lock_dict[request.run_dir]:
                 response = await handler(job_tracker, request)
-            pkdp('runner response: {!r}', response)
+            pkdlog('runner response: {!r}', response)
             response_bytes = pkjson.dump_bytes(response)
         except Exception as exc:
             await stream.send_all(
diff --git a/tests/runner_test.py b/tests/runner_test.py
index c5a39d94..7ee0de61 100644
--- a/tests/runner_test.py
+++ b/tests/runner_test.py
@@ -35,8 +35,9 @@ def test_runner_myapp():

     fc = srunit.flask_client()

+    from pykern.pkdebug import pkdlog
     from sirepo import srdb
-    print(srdb.runner_socket_path())
+    pkdlog(srdb.runner_socket_path())

     pkio.unchecked_remove(srdb.runner_socket_path())

@@ -56,9 +57,9 @@ def test_runner_myapp():
             {'simulationType': 'myapp',
              'search': {'simulationName': 'heightWeightReport'}},
         )
-        print(data)
+        pkdlog(data)
         data = data[0].simulation
-        print(data)
+        pkdlog(data)
         data = fc.sr_get(
             'simulationData',
             params=dict(
@@ -67,7 +68,7 @@ def test_runner_myapp():
                 simulation_type='myapp',
             ),
         )
-        print(data)
+        pkdlog(data)
         run = fc.sr_post(
             'runSimulation',
             dict(
@@ -78,13 +79,13 @@ def test_runner_myapp():
                 simulationType=data.simulationType,
             ),
         )
-        print(run)
+        pkdlog(run)
         for _ in range(10):
             run = fc.sr_post(
                 'runStatus',
                 run.nextRequest
             )
-            print(run)
+            pkdlog(run)
             if run.state == 'completed':
                 break
             time.sleep(1)

(Resolved inline review thread on sirepo/pkcli/runner.py.)
@robnagler (Member) left a comment:

Some comments; I'll probably have more. One general comment: I think the code makes assumptions about the state of the global data structures on return from an await that can't be made. I would assume that anything can happen once you release the thread of control. Therefore, you need to validate jhash to be sure that the job hasn't been switched out. I could be wrong; I haven't followed the LockDict code, so perhaps it guarantees something about the globals.

pkdp('runner response: {!r}', response)
response_bytes = pkjson.dump_bytes(response)
except Exception as exc:
await stream.send_all(
@robnagler (Member) commented on this snippet:

If there is an error on the stream itself, you can't send to it. It's safer to protect against that rather than raise another exception.

I think it shouldn't send back the error string; instead, just send back "error". It breaks the abstraction to respond with raw errors. Rather, just log the error here. If there's a known error, translate it to a known error code for the sender.

Logging might look like: pkdlog('Request={} error={} stack={}', request, exc, pkdexc())
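
A sketch of that shape (log the details server-side, return only a generic error token to the caller); pkdlog/pkdexc and pkjson.dump_bytes are the pykern calls used elsewhere in this thread, while the surrounding names are assumptions:

```python
# Sketch of the suggested pattern: log details server-side, send the caller
# only a generic error token. Handler and variable names are assumptions.
from pykern import pkjson
from pykern.pkdebug import pkdexc, pkdlog


async def _run_handler(handler, job_tracker, request):
    try:
        response = await handler(job_tracker, request)
    except Exception as exc:
        pkdlog('request={} error={} stack={}', request, exc, pkdexc())
        # don't leak the raw exception text across the wire
        response = {'error_code': 'error'}
    return pkjson.dump_bytes(response)
```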

@njsmith (Contributor, Author) replied:

> If there is an error on the stream itself, you can't send to it. It's safer to protect against that rather than raise another exception.

In Python 3 we have implicit exception chaining, so if stream.send_all raises an exception, then both the original exception+traceback and the new exception+traceback will be logged.

Also, we're already logging any leaking exceptions with _catch_and_log_errors, so that's not an issue.

Easy enough to just send back "error" though, I'll do that.

@njsmith (Contributor, Author) added:

Actually, we might as well delete this bit and make the error indicator be "empty response" – adding fancier error handling code just produces more chances for things to go wrong :-)

@njsmith (Contributor, Author) added:

> In Python 3 we have implicit exception chaining, so if stream.send_all raises an exception, then both the original exception+traceback and the new exception+traceback will be logged.

...actually, I'm wrong, because pkdexc is buggy on py3 :-( radiasoft/pykern#29

@robnagler (Member) replied:

> If there is an error on the stream itself, you can't send to it. It's safer to protect against that rather than raise another exception.

> Easy enough to just send back "error" though, I'll do that.

To be clearer: the stream is in error, so you shouldn't send on it, because you'll get a cascade that could be avoided. I feel like in error code we have to take pains to avoid that type of cascade, because it makes debugging harder, even if py3 gives you all the tracebacks.

(Additional resolved inline review threads on sirepo/pkcli/runner.py.)
# everyone the job is done, no matter what happened
job_info = self.jobs.pop(run_dir, None)
if job_info is not None:
job_info.finished.set()
@robnagler (Member) commented on this snippet:

Need to validate the jhash, because it could be a different job.
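
A sketch of the check being asked for, following the attribute names in the snippet above; the jhash guard is the addition being requested:

```python
# Sketch: only tear down the tracked job if it is still the job we started.
# Attribute names follow the snippet above; the jhash comparison is the
# addition being requested.
def _job_finished(self, run_dir, jhash):
    job_info = self.jobs.get(run_dir)
    if job_info is None or job_info.jhash != jhash:
        # a different job now owns this run_dir; leave its state alone
        return
    del self.jobs[run_dir]
    job_info.finished.set()
```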

@njsmith (Contributor, Author) commented Feb 25, 2019 via email

@robnagler (Member) commented:

I see that lock_dict serializes RPC calls. I think _run_job is in a separate thread, which means that the comments about jhash in _run_job are valid. If an exception is thrown, there's no guarantee that the _run_job thread owns the process. I assume that cancel goes through finally, which is the code I'm concerned about.

@njsmith (Contributor, Author) commented Feb 26, 2019

> pkdp and print are not allowed. This patch converts to pkdlog.

In a pytest test, the normal behavior of print (and any output, for that matter) is that pytest captures it by default and only shows it on the console if the test fails. This is really handy because it means you can leave in lots of chatty prints to help debug when things do go wrong, but without creating a bunch of noise during regular runs. (E.g., that's part of how I could debug your failures in runner_test.py.)

It looks like pkdlog somehow circumvents pytest's capturing, so the chatty prints are dumped to the console regardless of whether there's a problem or not.

Would you rather we remove the chatty prints to keep the tests silent, keep them to make debugging easier, or figure out a way to get pytest's output capturing to work properly?

@njsmith (Contributor, Author) commented Feb 26, 2019

OK, I think I addressed most of your comments. The big thing left is that we still have race conditions around start_job. (The other race conditions are fixed by the LockDict stuff.) I think we mostly get away with it for now, because the frontend is still trying hard to avoid tickling the backend's race conditions. But we should fix them. This needs a holistic approach. I think the two invariants we want are:

  • only runner.py, and the jobs it runs, are allowed to write to the run_dir
  • there are never two jobs running at the same time in the same run_dir

Together, these will fix the race conditions on disk, and the race conditions in memory. (In particular, note that if we enforce that there are never two jobs running at the same time in the same dir, then all the issues around one job's shutdown overlapping with another job's startup and causing stuff to be scribbled on the wrong job_info object just goes away.)
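
For context, the LockDict idea mentioned above is one lock per run_dir, created on demand, so RPCs touching the same run_dir serialize while different run_dirs proceed concurrently; a minimal sketch of that shape (the class in this branch may differ):

```python
# Minimal sketch of the per-run_dir locking idea; the LockDict in this
# branch may be implemented differently.
import collections

import trio


class LockDict(collections.defaultdict):
    def __init__(self):
        super().__init__(trio.Lock)  # each new key lazily gets its own lock


# usage inside the daemon:
#   async with lock_dict[request.run_dir]:
#       response = await handler(job_tracker, request)
```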

This will require some rearranging:

  • Right now, after server.py decides that it needs to start a new job, it directly blows away the run_dir and writes new stuff into it. If two server.py threads decide to do this at the same time, then bad stuff happens. We need to switch to server.py writing the job info into a temp dir, and then passing that to runner.py to set up the real run_dir.
    • There may be some other cases where server.py writes directly to run_dir, but I guess we'll deal with the rest later when we're figuring out how to handle NERSC...
  • runner.py needs to get some logic around start_job to check for existing jobs and handle them appropriately. This has some complications to make sure we get right: the policy for handling existing jobs should involve the serial, I guess (?) – not really sure what that is, but it certainly sounds relevant now that you've mentioned it :-) – and if we decide to cancel the old job then we need to make sure to wait for it to fully exit before starting a new one.

This is all doable, but it's complicated enough that I think we should bump it into SOW-3.
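
Purely to make that shape concrete, a speculative sketch of the start_job check described above (the serial comparison and the cancel-and-wait helper are assumptions to be settled in SOW-3):

```python
# Speculative sketch of the start_job policy described above; the serial
# comparison and the _cancel_and_wait/_start_process helpers are assumptions.
async def start_job(self, run_dir, jhash, serial):
    existing = self.jobs.get(run_dir)
    if existing is not None:
        if existing.jhash == jhash:
            return  # the requested job is already running
        if existing.serial > serial:
            return  # request is stale; keep the newer job
        # cancel the old job and wait for it to fully exit before starting
        await self._cancel_and_wait(existing)
    await self._start_process(run_dir, jhash)  # hypothetical helper
```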

@robnagler (Member) commented:

> This is all doable, but it's complicated enough that I think we should bump it into SOW-3.

I agree about the race condition and SOW-3. I'd like to discuss the solution, because we have to start decoupling based on the runner class (NERSC, subprocess, or Docker).

A key factor for NERSC (coming up soon now) is that a "job" is really anything that touches run_dir. This is also key for the runner in general, because we want to run any "codes" inside a separate container to ensure encapsulation and to allow sirepo to migrate to py3. Therefore, I think we need to generalize the job to "do something with this particular run_dir, which might already exist."

@robnagler (Member) commented:

> In a pytest test, the normal behavior of print (and any output, for that matter) is that pytest captures it by default and only shows it on the console if the test fails.

We should have that as a goal without print, because pytest doesn't tell you what line number the print is on. The reason for pkdp, pkdlog, and pkdc is that I don't have to write anything to explain what's going on, because you can jump right to the line and see what the code is actually doing (as opposed to what the string says it is doing :).

> It looks like pkdlog somehow circumvents pytest's capturing, so the chatty prints are dumped to the console regardless of whether there's a problem or not.

There are numerous bugs around how pytest and pykern interact, e.g. pytest breaks capsys, plus a bunch of nulls problems that I have yet to write up. Anyway, I agree with the need for better logging.

Let's leave in the pkdlogs as is for now, because you don't see any test output when running python setup.py tests, which is the most common mode. Having it chatty for an individual test run is tolerable until radiasoft/pykern#31 is fixed.

@njsmith (Contributor, Author) commented Feb 26, 2019

> A key factor for NERSC (coming up soon now) is that a "job" is really anything that touches run_dir. This is also key for the runner in general, because we want to run any "codes" inside a separate container to ensure encapsulation and to allow sirepo to migrate to py3. Therefore, I think we need to generalize the job to "do something with this particular run_dir, which might already exist."

I think we'll want to distinguish between the current kind of job, and the data extraction jobs, because they have fundamentally different interactions with the run dir lifecycle. For example, the current "status check" operation is basically a check of what kind of run dir we have; checking on the status of a particular data extraction job will need very different inputs and outputs.

> Let's leave in the pkdlogs as is for now

Sounds good.

@robnagler merged commit f4302ad into radiasoft:master on Feb 26, 2019