Possibility of removing bus sockets #61

Closed
2 of 6 tasks
shikokuchuo opened this issue Apr 3, 2023 · 31 comments

@shikokuchuo
Contributor

Prework

Description

I was contemplating how well things scale up in general, and came to the realisation that your bus sockets and condition variables (CVs) now duplicate what already happens at the main mirai socket.

What you can do instead: before launching, check that the online status is 0 (checking for and killing the process if it is still alive), then record the instance number.
Then if the instance number increases, the server has been 'discovered'.

This seems to remove an unnecessary complication.
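A minimal sketch of the check described above, in R. It assumes the status for one socket is available as a named vector with `online` and `instance` entries (modelled loosely on the daemons() status matrix; the exact names are assumptions), and the helper names are hypothetical, not part of mirai or crew.

```r
# Hypothetical helpers illustrating instance-counter discovery.
# `status` stands in for one row of the daemons() status matrix;
# the entry names "online" and "instance" are assumptions.

# Before launching: nothing should be connected at this socket.
ready_to_launch <- function(status) {
  status[["online"]] == 0L
}

# After launching: the server counts as "discovered" once the
# instance counter has moved past the value recorded at launch.
server_discovered <- function(status, instance_at_launch) {
  status[["instance"]] > instance_at_launch
}

before <- c(online = 0L, instance = 2L)
recorded <- before[["instance"]]
after <- c(online = 1L, instance = 3L)
ready_to_launch(before)
# [1] TRUE
server_discovered(after, recorded)
# [1] TRUE
```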

@wlandau
Owner

wlandau commented Apr 3, 2023

It is so tempting to simplify crew to use the instance # column. That would really streamline and clean up the integration. However, I still prefer your breakthrough idea from #31 (comment). Without a token to explicitly identify a specific instance of a mirai server, I would worry about race conditions like #31 (comment) (also c.f. #31 (comment) and #31 (comment)). Whether based on condition variables or nanonext::stat(), the instance counter from daemons() gives the same result, right? Or am I missing something?

To do away with bus sockets in crew, I think I would need a way to supply a custom token to server() and then see that same token reflected back in daemons() somehow. I wonder if there is a fancy feature of websocket paths that can be cleverly leveraged to support this without compromising efficiency.

@wlandau wlandau closed this as completed Apr 3, 2023
@shikokuchuo
Contributor Author

shikokuchuo commented Apr 3, 2023

Regarding your idea to use the websocket path: what if the dispatcher's ws URL path consisted of the token?
Then, after a server disconnects (due to a timeout, etc.), the dispatcher would automatically replace the listener with one using a new token, instead of listening at the same address.

I can use your current method to generate the tokens at the dispatcher. crew would then get them from a daemons() call and pass them in as the url argument to server().
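To make the proposal concrete, here is a rough R sketch of token-based URLs. The token generator is hypothetical: it simply produces 40 random hex characters to mirror the shape of the paths that appear later in this thread, and uses R's ordinary RNG, so it is not cryptographically secure. mirai's actual token method may differ.

```r
# Hypothetical token generator: 40 random hex characters.
# Uses R's default RNG; illustration only, not a secure token source.
new_token <- function(n = 40L) {
  paste(sample(c(0:9, letters[1:6]), n, replace = TRUE), collapse = "")
}

# Hypothetical URL builder: the token forms the websocket path.
token_url <- function(host, port, token) {
  sprintf("ws://%s:%d/%s", host, as.integer(port), token)
}

# On disconnect, the dispatcher would listen at a URL with a fresh
# token instead of reusing the old address.
old_url <- token_url("10.0.0.9", 50594L, new_token())
new_url <- token_url("10.0.0.9", 50594L, new_token())
```

A server still holding `old_url` would then have nothing to connect to once the listener has moved to `new_url`.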

@wlandau
Owner

wlandau commented Apr 3, 2023

That sounds promising. Assuming a websocket path always looks like "ws://x.x.x.x:port/index/token" and the token rotates on disconnect, crew could record the token at launch time and then always infer the correct state from a call to daemons():

  1. If the token from launch time disagrees with current token from daemons(), then the worker is disconnected, and crew needs to relaunch it if the task load is large enough.
  2. If the token from launch time agrees with the current token from daemons(), then:
    A. If status_online is 1, then the worker is connected and running.
    B. If status_online is 0, then the worker may be in the process of starting, or it may have failed to start.
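The decision list above can be sketched as a small R function. This is a hypothetical helper, not crew's implementation: it assumes the token is the last path segment of the URL and that `online` is the 0/1 value from the status matrix.

```r
# Hypothetical: take the token as the last path segment of the URL,
# e.g. "ws://x.x.x.x:port/index/token" -> "token".
url_token <- function(url) sub("^.*/", "", url)

# Hypothetical: infer a worker's state from the URL recorded at launch,
# the URL currently reported by daemons(), and the online status.
worker_state <- function(launch_url, current_url, online) {
  if (!identical(url_token(launch_url), url_token(current_url))) {
    return("disconnected")          # case 1: the token has rotated
  }
  if (online == 1L) {
    "running"                       # case 2A
  } else {
    "starting_or_failed"            # case 2B: cannot tell which yet
  }
}

worker_state("ws://10.0.0.9:5555/aaa", "ws://10.0.0.9:5555/bbb", 0L)
# [1] "disconnected"
```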

I am wondering what to do in the case of 2B. If a worker fails to connect within the expiry window, I think I would need the token to rotate somehow. That way, if the worker starts after the expiry window, it will connect to a socket where nobody is listening, and it will promptly exit because asyncdial is FALSE. (This assumes the dispatcher stops listening to the old socket when a token rotates.) This would ensure subsequent launches do not have duplicate instances at the same socket.

What would be the best way to get the token to rotate after an expiry window? Only crew knows when the expiry window starts (on launch) so it may be tricky. How hard would it be for mirai to allow the user to manually rotate the token of a "ws://x.x.x.x:port/index/token" socket with a given index?

@shikokuchuo
Contributor Author

Rotating after an expiry window would be non-optimal with the new dispatcher design.

I can have the dispatcher listeners accept only one connection (dropping new connections before they are added, effectively locking the socket). This would be new functionality added to nanonext.

Then for 2B if a subsequent worker tries to connect, it will fail and exit.

All in all, this seems like a good solution. It prevents two servers from dialling into the same socket accidentally (or intentionally), and the token also provides some obfuscation of the URL from a security perspective.

@wlandau
Owner

wlandau commented Apr 3, 2023

> I can have the dispatcher listeners accept only one connection (dropping new connections before they are added, effectively locking the socket). This would be new functionality added to nanonext.

Yes, I think that would work well!

@wlandau wlandau reopened this Apr 3, 2023
@shikokuchuo
Contributor Author

This is implemented in mirai 0.8.2.9001 (commit 2541b9e). Set token = TRUE at the dispatcher.

The socket locking turned out to be quite troublesome. It was perfectly effective for preventing additional connections at the dispatcher. However the retry mechanisms for the server trying to connect can be quite aggressive and end up interfering with the connections. Not reliable enough for our purposes.

Instead I have chosen to use the mirai bus socket interface.

Now, if you pass an integer as the third argument to daemons() and leave the first two missing, it sends a command to the dispatcher to replace the socket with that index with a new one.

daemons(,,3L)

You get the new socket URL as the return value at $daemons.

This interface slots into the existing one well, but I can create a new function for this if you prefer!

@wlandau
Owner

wlandau commented Apr 4, 2023

Amazing! This is exactly what I need, and it will simplify crew beautifully. Thank you so much!

> This interface slots into the existing one well, but I can create a new function for this if you prefer!

Overloading the dispatcher argument this way is indeed an unusual way of doing things, but I think it's fine as long as it is documented. It could be clearer if implemented with its own argument or function, but it works well in my local tests.

@shikokuchuo
Contributor Author

Great! I just wanted to check it was all good first. The overloading is indeed different... but I will open up a new interface for this to be cleaner.

wlandau-lilly pushed a commit that referenced this issue Apr 4, 2023
@shikokuchuo
Contributor Author

shikokuchuo commented Apr 4, 2023

Implemented as the function saisei() in shikokuchuo/mirai@c265c46.
再生 (saisei) means "regenerate", or literally "re-birth".

@shikokuchuo
Contributor Author

I've also implemented the locked sockets for when token = TRUE, for extra safety. This will prevent 'rogue' servers from connecting to the dispatcher if there is already a connection. At worst, if a second server tries to connect at the same URL, it will knock both servers out, but it shouldn't affect the others.

@wlandau
Owner

wlandau commented Apr 4, 2023

That's perfect!

I have been slowly working my way through the lower-level data structures of crew, and I am just about ready to refactor the controllers. I am amazed by how much things naturally simplify when the websocket path is a property of the worker instance instead of the whole worker.

@shikokuchuo
Contributor Author

shikokuchuo commented Apr 5, 2023

I have renamed the headers for the daemons() status matrix. These really shouldn't change going forward - just they were quite inconsistent to begin with...

I have taken out 'busy', as it was just the difference between assigned and complete (assigned minus complete); it didn't actively monitor 'busy' status.
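For illustration, the removed 'busy' figure can still be derived from the remaining columns. The matrix below is a mock: the column names "online", "instance", "assigned" and "complete" are assumptions standing in for the renamed daemons() headers.

```r
# Mock status matrix, one row per server socket.
# Column names are assumptions, not necessarily mirai's exact headers.
status <- matrix(
  c(1L, 1L, 5L, 4L,
    0L, 2L, 3L, 3L),
  nrow = 2L, byrow = TRUE,
  dimnames = list(NULL, c("online", "instance", "assigned", "complete"))
)

# Tasks assigned but not yet completed. This is a derived count,
# not a live measurement of whether each server is busy.
busy <- status[, "assigned"] - status[, "complete"]
busy
# [1] 1 0
```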

Please feel free to update crew. I'm testing the dev nanonext on some other workflows, and it seems on track for a release next week. If so, mirai can be released any time thereafter, ready for your next release.

This will break the existing crew, so I think I will have to ask for a manual dispensation when I send mirai to CRAN.
I have added a workaround that tests for the existence of crew_controller_callr. Once the new version of crew is available, it can be removed.

@wlandau
Owner

wlandau commented Apr 5, 2023

I am almost ready to merge my updates and close this issue, but I am having trouble with shikokuchuo/mirai#47. That should be the final obstacle before a much simpler and more robust crew.

@wlandau
Owner

wlandau commented Apr 5, 2023

Working through the tests, I am getting a lot of crashes in RStudio on my local macbook, and I am frequently getting 'errorValue' int 5 | Timed out from daemons(). I am also seeing tasks that have trouble completing (task$data unresolved). This is hard to reproduce.

@wlandau
Owner

wlandau commented Apr 5, 2023

I am also getting crashes sometimes after I manually terminate the dispatcher in htop and then restart my R session.

@shikokuchuo
Contributor Author

@wlandau
Owner

wlandau commented Apr 5, 2023

shikokuchuo/mirai#48 reproduces some of the same errors, and the reprex only uses mirai and nanonext.

@shikokuchuo
Contributor Author

Hopefully my fix in 0.8.2.9007 also eliminates the crashes you have been experiencing.

Making a record of this one (the one I caught running crew tests):

*** caught segfault ***
address 0x7f673c021000, cause 'invalid permissions'
malloc(): unsorted double linked list corrupted
Aborted

@wlandau
Owner

wlandau commented Apr 6, 2023

Thank you so much! Like I mentioned at shikokuchuo/mirai#48, although not entirely gone, the errors are greatly reduced on my end. I am now experiencing new ones at shikokuchuo/mirai#50.

@wlandau
Owner

wlandau commented Apr 6, 2023

This branch of crew currently fails on the second transient worker:

library(crew)
x <- crew_controller_local(tasks_max = 1L)
x$start()
x$push(command = ps::ps_pid())
Sys.sleep(5)
x$pop()
#> # A tibble: 1 × 11
#>   name                             command result seconds   seed error trace…¹ warni…² launc…³ worker insta…⁴
#>   <chr>                            <chr>   <list>   <dbl>  <int> <chr> <chr>   <chr>   <chr>    <int> <chr>  
#> 1 bea03b0cc5adc7a10fe8b60a1f0e126… ps::ps… <int>        0 6.70e8 NA    NA      NA      aed93c…      1 e86f73…
#> # … with abbreviated variable names ¹traceback, ²warnings, ³launcher, ⁴instance
x$push(command = ps::ps_pid())
Sys.sleep(5)
x$pop() # should not be NULL
#> NULL
x$queue[[1]]$handle[[1]]$data # should be resolved
#> 'unresolved' logi NA
x$terminate()

Before I terminated the controller, the second instance of x$launcher$workers$handle[[1]]$get_result() showed:

Error: 
! in callr subprocess.
Caused by error in `socket(protocol = "rep", dial = url, autostart = asyncdial || …`:
! 15 | Address invalid

That's a clue.

@wlandau
Owner

wlandau commented Apr 6, 2023

In my last test, the socket from daemons() is "ws://10.0.0.9:50594/db42f70af6b31c05d739f7c25637080dbf3d388f", but the socket from saisei() is "ws://10.0.0.9:0/e96f4a6af7dbb640603bbb1d1cfa82b022f1c8ee", with port 0. I wonder if that explains it.
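A quick way to spot this symptom is to pull the port out of each URL and compare. This is a hypothetical diagnostic helper, not part of mirai or crew. In NNG, port 0 requests "any free port", so a URL that still shows 0 has presumably not been updated with the port that was actually bound.

```r
# Hypothetical diagnostic: extract the port number from a ws:// URL.
url_port <- function(url) {
  as.integer(sub("^ws://[^:/]+:([0-9]+)(/.*)?$", "\\1", url))
}

old <- "ws://10.0.0.9:50594/db42f70af6b31c05d739f7c25637080dbf3d388f"
new <- "ws://10.0.0.9:0/e96f4a6af7dbb640603bbb1d1cfa82b022f1c8ee"
url_port(new) == url_port(old)
# [1] FALSE
```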

@wlandau
Owner

wlandau commented Apr 6, 2023

Oddly enough, even after shikokuchuo/mirai#51 is fixed, I am still getting "'errorValue' int 7 | Object closed" on half of my transient worker launches. Based on my tests so far, I suspect it is because saisei() can rotate the websocket even when a worker is still using it.

@shikokuchuo
Contributor Author

The way you phrased the problem makes it somewhat clearer. saisei() won't rotate the socket while a worker is using it, but a message could conceivably be queued at a socket waiting for a worker; if saisei() then switches the socket, the message is returned as 7 | Object closed. Does that make sense?

I suspect the answer will be to rotate just the listener rather than the entire socket.

@wlandau
Owner

wlandau commented Apr 6, 2023

Yes, that explanation is consistent with what I am seeing. Anything I will need to do differently in crew at this point?

If it helps, the code for #61 does not actually rely on having a different websocket path, only on the online and instance counters being zero. As long as those counters are zero when I launch the worker, the websocket path can be the same as the previous one.

@wlandau
Owner

wlandau commented Apr 6, 2023

So I would be fine if saisei() just resets the counters rather than the paths. I didn't know I could relax this assumption until I coded most of #61.

@shikokuchuo
Contributor Author

That should be fine, as I can reset the counter in the CV, so the behaviour should match what it is now. The path will still change, though.

This is an NNG concept: a socket can have any number of dialers and listeners attached to it, for really complex applications!

@wlandau
Owner

wlandau commented Apr 6, 2023

Sounds great! I was going to clarify that I still need an indication from saisei() as to whether the reset succeeded or failed, but if saisei() still returns a path on success and NULL on failure, then I will still know.
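To make the success/failure check concrete, here is a hypothetical crew-side wrapper in R. The `reset` argument stands in for a call such as saisei(i), which per this thread returns the new URL on success and NULL on failure; it is passed in as a function so the sketch runs without mirai. The exact saisei() signature is not shown here and should not be inferred from this sketch.

```r
# Hypothetical wrapper: reset socket `i` and only hand back a URL
# when the reset succeeded. `reset` is a stand-in for saisei(i).
reset_socket <- function(i, reset) {
  url <- reset(i)
  if (is.null(url)) {
    warning("could not reset socket ", i, "; not relaunching worker")
    return(NULL)
  }
  url  # the next worker would be launched against this URL
}

# Stub that always succeeds, for illustration only.
ok <- function(i) sprintf("ws://127.0.0.1:5555/%d", i)
reset_socket(3L, ok)
# [1] "ws://127.0.0.1:5555/3"
```

Keeping the NULL check in one place means a failed reset can never silently lead to a worker being launched against a stale socket.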

Applications with many listeners and many dialers sound complicated indeed!

@shikokuchuo
Contributor Author

Take f1e961f v0.8.2.9012 for a spin. I really hope this works!

@wlandau
Owner

wlandau commented Apr 6, 2023

Flawless! The transient worker throughput test now runs all tasks in a timely manner and shows the exact right number of worker launches!

@wlandau
Owner

wlandau commented Apr 6, 2023

I know this was a tough one for both of us, so I want to thank you again for sticking with me and rooting out the cause. crew is in such a strong position now that it relies completely on mirai and does not need any custom bus sockets.

@shikokuchuo
Contributor Author

You're welcome! I really hate to leave things unfinished... so I'm also glad we persevered!

2 participants