
Parallel push to share #3164

Merged
merged 12 commits into trunk from 22-06-28-push-conc on Jun 30, 2022

Conversation

@mitchellwrosen (Member) commented on Jun 28, 2022

Overview

This PR implements a parallel push, similar to the implementation used in #3153, but a little simpler.

  • The main thread pulls chunks of hashes off of a set and assigns them to one-shot workers.
  • Each worker downloads the chunk of entities, inserts them into the database, elaborates the hashes of the subset of those entities that went into temp storage, and appends the resulting set of hashes to the work queue.

Experimentally, it seems to run about 1.8x as fast as the serial implementation on trunk; a rough sketch of the dispatch loop is below. The next step might be to move to a streaming endpoint, which could reuse a lot of this implementation and would also let us get rid of the server-side sqlite write locks.
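
For concreteness, here is a minimal, hedged sketch of that dispatch loop in Haskell, assuming the ki 1.x API. The names parallelPush, processChunk, maxWorkers, and chunkSize are made up for illustration; the real code additionally threads a dedupe set, worker ids, and the failure TMVar discussed below.

{-# LANGUAGE BlockArguments #-}

import Control.Concurrent.STM
import Data.Set (Set)
import qualified Data.Set as Set
import qualified Ki

parallelPush ::
  Int ->                             -- maximum number of concurrent workers
  Int ->                             -- how many hashes each worker takes per chunk
  (Set String -> IO (Set String)) -> -- process one chunk; return new hashes to enqueue
  Set String ->                      -- initial hashes
  IO ()
parallelPush maxWorkers chunkSize processChunk initialHashes = do
  workQueue <- newTVarIO initialHashes
  runningVar <- newTVarIO (0 :: Int)
  Ki.scoped \scope -> do
    let loop = do
          maybeChunk <-
            atomically do
              queue <- readTVar workQueue
              running <- readTVar runningVar
              if Set.null queue
                then if running == 0 then pure Nothing else retry
                else
                  if running >= maxWorkers
                    then retry
                    else do
                      -- Pull a chunk of hashes off the set and claim a worker slot.
                      let (chunk, rest) = Set.splitAt chunkSize queue
                      writeTVar workQueue rest
                      modifyTVar' runningVar (+ 1)
                      pure (Just chunk)
          case maybeChunk of
            Nothing -> pure ()
            Just chunk -> do
              -- One-shot worker: process the chunk, then append the resulting
              -- hashes to the work queue and release the worker slot in one
              -- transaction, so the dispatcher never observes "empty and idle"
              -- while more work is still about to arrive.
              _ <- Ki.fork scope do
                moreHashes <- processChunk chunk
                atomically do
                  modifyTVar' workQueue (Set.union moreHashes)
                  modifyTVar' runningVar (subtract 1)
              loop
    loop
    atomically (Ki.awaitAll scope)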

@mitchellwrosen marked this pull request as ready for review on June 30, 2022 at 17:42

doJob :: [STM UploadDispatcherJob] -> IO Bool
doJob jobs =
atomically (asum jobs) >>= \case
Contributor:

Thanks for trying it this way, I find it clearer 👍🏼

doJob jobs =
atomically (asum jobs) >>= \case
UploadDispatcherReturnFailure -> pure False
UploadDispatcherForkWorkerWhenAvailable hashes -> doJob [forkWorkerMode hashes, checkForFailureMode]
Contributor:

Hrmm, was it intentional to check for failure only if we fail to start a new job?

I'm confused about the semantics of checking for failure: it seems in some spots it bails unconditionally, and in spots like this one it bails only conditionally.

Member Author:

Yeah, this ordering was intentional, but I think it would be correct either way. It's just a question of whether we want to be paranoid and re-check something we just checked (whether the failure TMVar has been filled), or only fall back to checking it if we end up blocked waiting for a chance to fork a worker (because N workers already exist).
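
To make that ordering concrete, here is a small, self-contained illustration (not the PR's actual code) of how an asum of STM alternatives behaves: it is left-biased orElse, so the second alternative is only consulted while the first is blocked in retry. The Job type and the two actions below are placeholders for the real dispatcher jobs.

import Control.Concurrent.STM
import Data.Foldable (asum)

data Job = ForkWorker | Bail
  deriving (Show)

-- Succeeds as soon as a worker slot is free; otherwise retries.
tryForkWorker :: TVar Int -> Int -> STM Job
tryForkWorker runningVar maxWorkers = do
  running <- readTVar runningVar
  check (running < maxWorkers)
  pure ForkWorker

-- Succeeds only once some worker has recorded a failure.
checkForFailure :: TMVar () -> STM Job
checkForFailure failedVar = do
  _ <- readTMVar failedVar
  pure Bail

-- With the slot check listed first, failure is only noticed while we are
-- blocked waiting for a worker slot; listing checkForFailure first would
-- re-check the failure var before every fork instead.
nextJob :: TVar Int -> Int -> TMVar () -> IO Job
nextJob runningVar maxWorkers failedVar =
  atomically (asum [tryForkWorker runningVar maxWorkers, checkForFailure failedVar])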

dedupeVar <- newTVarIO Set.empty
nextWorkerIdVar <- newTVarIO 0
workersVar <- newTVarIO Set.empty
workerFailedVar <- newEmptyTMVarIO
Contributor:

Hrmm, ki propagates exceptions from threads to the parent right?
I wonder if we should just throw an exception on failure rather than needing to remember to manually check a failure var every time before we do something.

I notice lower down you only bail on failure if there isn't more work to do, is there a particular reason for that?

Exiting with an exception would maybe mean we have some entities downloaded that we didn't insert, but honestly that seems like a reasonable sacrifice to make for simplicity and avoiding a foot-gun of forgetting to check for errors and rolling forward in some sort of invalid state.

It's also worth noting that the http client is already throwing exceptions on timeouts, 500s, or bad gateways, so the workerFailedVar is only going to handle a subset of "blessed" failures, which I think just adds some additional confusion; unless there's some other reason for it, I'd say one failure mode is easier to maintain than two.
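
For what it's worth, a hedged sketch of the exception-based alternative being suggested, assuming the ki 1.x API: an exception raised in a forked thread is re-thrown to the thread that opened the scope, so the whole push would abort without any manual failure-var checks. The PushFailed type and the shape of the upload actions are placeholders.

{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE LambdaCase #-}

import Control.Concurrent.STM (atomically)
import Control.Exception (Exception, throwIO)
import qualified Ki

newtype PushFailed = PushFailed String
  deriving (Show)

instance Exception PushFailed

pushAllChunks :: [IO (Either String ())] -> IO ()
pushAllChunks uploads =
  Ki.scoped \scope -> do
    mapM_
      (\upload ->
         Ki.fork scope do
           upload >>= \case
             Left err -> throwIO (PushFailed err) -- propagates to the parent scope
             Right () -> pure ())
      uploads
    atomically (Ki.awaitAll scope)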

Member Author:

Hmm... I'm not sure I agree. I think returning failures from the endpoint as 200s, so we could pattern match on them, was done to make the code easier to understand, so we don't have to know what exception types might be thrown, and where.

I agree it's confusing right now -- maybe we shouldn't have a catchSyncExceptions thing that only catches some subset of exceptions these actions might throw? Couldn't we return these as Lefts too, and pass them around manually? (Or use ExceptT syntactic sugar).

Contributor:

Couldn't we return these as Lefts too, and pass them around manually? (Or use ExceptT syntactic sugar).

If you'd prefer to do it that way then yes, you could catch all of those error cases and pass around the Eithers, but it's definitely a bit annoying. Ideally you'd do all of the "generic" error handling in the servant http client itself, but since each endpoint has its own custom sum type of the errors/responses it can return, you'd need to convert the generic http errors into the endpoint's error type for each and every endpoint (doable, but annoying). Alternatively, you'd have to defer catching the errors until outside of the creation of the http client, which I don't like because it makes it very likely that some implementations will forget to catch certain error conditions.

I'm a big fan of using Exceptions for exceptional cases, as long as the set of "expected" exception types is well-bounded and well-known.

Another option would be to recognize the difficulties introduced by "200 Error" and try going back the other way, where we just send error codes with a good message and have the client propagate those errors.
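
As a concrete illustration of the per-endpoint conversion option mentioned above, here is a hedged sketch; TransportException, UploadError, and callUpload are placeholder names, and in the real code the caught type would be whatever exception the servant client actually throws.

{-# LANGUAGE LambdaCase #-}

import Control.Exception (Exception, try)

-- Stand-in for whatever exception type the underlying http client throws on
-- timeouts, 500s, or bad gateways.
newtype TransportException = TransportException String
  deriving (Show)

instance Exception TransportException

data UploadError
  = UploadErrorTransport TransportException -- generic transport failure, folded in
  | UploadErrorOutOfDate                    -- example of an endpoint-specific failure

data UploadResponse = UploadResponse

-- Wrap one endpoint's call so its callers only ever see that endpoint's own
-- error sum type; this is the per-endpoint conversion described above as
-- doable but annoying.
callUpload :: IO UploadResponse -> IO (Either UploadError UploadResponse)
callUpload doCall =
  try doCall >>= \case
    Left err -> pure (Left (UploadErrorTransport err))
    Right response -> pure (Right response)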

-- Delete the hashes of the entities we just uploaded from the `dedupe` set, because they will never be relevant for any
-- subsequent deduping operations. If we didn't delete from the `dedupe` set, this algorithm would still be
-- correct, it would just use an unbounded amount of memory to remember all the hashes we've uploaded so far.
whenJust maybeYoungestWorkerThatWasAlive \youngestWorkerThatWasAlive -> do
Contributor:

Hrmm, I understand what this is doing; but it's very complex, and even though we have all the necessary information in the client, it kind of seems like an implementation detail of the server.

I suppose this is something else that can go away when we move to streaming right?

Member Author:

Yeah, this can go away when we move to streaming, because the server won't have multiple simultaneous transactions open that all correspond to our push.

@ChrisPenner (Contributor):

I don't love having multiple routes for errors to propagate; I think it's a foot-gun trap waiting to be sprung, but we can revisit that later, I suppose.

@mitchellwrosen (Member Author):

Sure. Wait, it's unclear to me what you mean: what's the trap? (Anyway, yeah, we can revisit it.)

@mitchellwrosen merged commit ed94820 into trunk on Jun 30, 2022
@mitchellwrosen deleted the 22-06-28-push-conc branch on June 30, 2022 at 21:21