-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement ordered concurrency #31
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This handles threads correctly, as parent message are handled before the children (as all are handled sequentially), right?
In TS camelCase is used, rather than snake_case, but good job! Also kudos for code which passes all pre-commit checks and tests 👍
https://ci.netzbegruenung.verdigado.net/repos/938/pipeline/1240
src/app.ts
Outdated
const user_queue: RcUser[] = [] | ||
const room_queue: RcRoom[] = [] | ||
const messages_per_room: Map<string, RcMessage[]> = new Map() | ||
|
||
const rl = new lineByLine(`./inputs/${entities[entity].filename}`) | ||
|
||
let line: false | Buffer | ||
while ((line = rl.next())) { | ||
const item = JSON.parse(line.toString()) | ||
switch (entity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I am thinking to move the switch (entity)
outside and looping and handling the queue inside each case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, threadmessages are handled just like any message, so the sequence is the same as from the RC export.
I converted the variables to camelCase. Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now I am thinking to move the
switch (entity)
outside and looping and handling the queue inside each case
I've thought of it, but that would add as many loops as there are cases, not sure which of the two versions would be faster.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As the function is called 3 times, each with one entity type, it would check 3 times and loop within it. We also don't need to check non-affected queues, then.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In 9fa8e03,
64935fd
and 7a7fdec :
I moved the switch outside of the loop. For users and rooms, the loop is handled by the PromisePool, so no need to use queues anymore. For messages, to maintain the sequence, I kept the two steps of populating the Map and then loading.
Because PromisePool
loops over an iterator, I added a wrapper to turn lineByLine
into an iterator (its .next()
method made me think it was implemented as an iterator, but it isn't).
I feel the message handling can still be improved, maybe avoiding the first step of loading. To take care of #3 (comment), cut the messages in batches or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel the message handling can still be improved, maybe avoiding the first step of loading. To take care of #3 (comment), cut the messages in batches or something.
I added batches for the message handling in bee1c2f.
Tested with:
./reset.sh && npm run compile && systemd-run --scope -p MemoryMax=75M -p MemorySwapMax=1K --user node dist/app.js
As discussed in #22 and #3 (comment):
Concurrency on message handling broke the server-side ordering, because the server orders the messages in the sequence they arrived in.
To add a bit of concurrency anyways, the idea is to group messages per room and handle the messages in each room sequentially, while handling the rooms in parallel. Of course you would need a very large number of rooms for this to make a great difference.
To note that I am new to typescript...