
Add marine_pool #19

Merged
assafrabin merged 6 commits into master from feature/marine_pool
Apr 19, 2020

Conversation

@assafrabin
Collaborator

No description provided.

@tomlegkov tomlegkov linked an issue Apr 4, 2020 that may be closed by this pull request
Collaborator

@yehonatanz yehonatanz left a comment

In addition to previous comments, I'd like to see at least one test dedicated to marine pool.

return MarinePool(marine_so_path)


@pytest.fixture(scope="session", params=[False, True], ids=["marine", "marine_pool"])
Collaborator

Wouldn't it be more straightforward to pass the fixture names as params and use request.getfixturevalue?

Collaborator Author

I don't think so. That would add a line and a parameter (request) to each test.

Collaborator

I meant in here, not in every test

return []

ctx = multiprocessing.get_context("spawn")
pool = ctx.Pool(self.process_count)
Collaborator

Are you spawning a pool each time filter_and_parse is called?
Please don't.
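
One way to avoid creating a pool on every call is to build it once in the constructor and reuse it. A minimal sketch, assuming a hypothetical `PooledWorker` class and a stand-in `_double` task in place of the real Marine call; the `pool_factory` parameter is also an assumption, added so the pool type can be swapped:

```python
import multiprocessing
import multiprocessing.pool
from typing import Callable, List


def _spawn_pool(process_count: int) -> multiprocessing.pool.Pool:
    # Mirrors the "spawn" start method used in this PR.
    return multiprocessing.get_context("spawn").Pool(process_count)


def _double(value: int) -> int:
    # Stand-in for the real per-packet work (hypothetical).
    return value * 2


class PooledWorker:
    """Hypothetical wrapper that owns a single long-lived pool."""

    def __init__(
        self,
        process_count: int = 4,
        pool_factory: Callable[[int], multiprocessing.pool.Pool] = _spawn_pool,
    ) -> None:
        # The pool is created once here, not inside run().
        self._pool = pool_factory(process_count)

    def run(self, values: List[int]) -> List[int]:
        if len(values) == 0:
            return []
        # Every call reuses the same workers.
        return self._pool.map(_double, values)

    def close(self) -> None:
        # Stop accepting work and wait for the workers to exit.
        self._pool.close()
        self._pool.join()
```

The cost of starting workers is then paid once per `PooledWorker` rather than once per `run()` call, at the price of having to call `close()` when done.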

if len(packets) == 0:
    return []

ctx = multiprocessing.get_context("spawn")
Collaborator

Why not just use pool = multiprocessing.Pool(count)?

Collaborator Author

The default start method is "fork" and not "spawn". I assumed @tomlegkov chose spawn for a reason...

Collaborator

Yeah, he explained it on Slack.
Add a comment explaining it so some wise-ass won't change it in the future without understanding why.
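
A sketch of the kind of comment being requested, using the rationale that appears later in this PR (child processes should not get the already initialized marine from the parent process). The helper name `make_pool_context` is hypothetical:

```python
import multiprocessing
from multiprocessing.context import BaseContext


def make_pool_context() -> BaseContext:
    # Deliberately "spawn" rather than the platform default: a forked child
    # would inherit the parent's already initialized marine, while a spawned
    # child starts from a fresh interpreter. Do not switch this to "fork"
    # without understanding that.
    return multiprocessing.get_context("spawn")
```

Using `get_context` keeps the choice local to this pool instead of changing the start method for the whole program.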

ctx = multiprocessing.get_context("spawn")
pool = ctx.Pool(self.process_count)
chunk_size = int(math.ceil(len(packets) / float(self.process_count)))
packet_chunks = [
Collaborator

multiprocessing.Pool.map already splits the input into batches for you and exposes a clean API.
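
For comparison, a sketch of letting `Pool.map` do the batching through its `chunksize` argument instead of slicing the list by hand. `parse_packet` and `parse_all` are hypothetical stand-ins, not the PR's functions:

```python
import math
import multiprocessing.pool
from typing import List


def parse_packet(packet: bytes) -> int:
    # Stand-in for the real per-packet work (hypothetical).
    return len(packet)


def parse_all(
    pool: multiprocessing.pool.Pool, packets: List[bytes], process_count: int
) -> List[int]:
    if len(packets) == 0:
        return []
    # Same chunk size the manual code computed, handed to map() instead.
    chunk_size = math.ceil(len(packets) / process_count)
    # map() splits the input into chunks and returns results in input order.
    return pool.map(parse_packet, packets, chunksize=chunk_size)
```

If `chunksize` is omitted, `map` picks a chunk size on its own, so the explicit computation is optional.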

@@ -1,4 +1,5 @@
from .marine import Marine
from .marine_pool import MarinePool
Collaborator

<NotSerious>
MarinePool should be called Lagoon
</NotSerious>

@assafrabin assafrabin force-pushed the feature/marine_pool branch from e3de20f to a53b964 Compare April 5, 2020 15:49
Collaborator

@yehonatanz yehonatanz left a comment

I had some notes.
I still think that making init_marine idempotent at the C level (with an is_initialized flag or something) is the best option though.
@tomlegkov What do you think?
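
The change proposed here would live in the C library, which is not shown in this thread. As a Python-level analogue only, an idempotent initializer guarded by a flag might look like this; `init_once` and `_is_initialized` are hypothetical names:

```python
import threading
from typing import Callable

_init_lock = threading.Lock()
_is_initialized = False


def init_once(init_func: Callable[[], None]) -> bool:
    """Run init_func on the first call only; return True if it ran."""
    global _is_initialized
    with _init_lock:
        if _is_initialized:
            # Already initialized: calling again is a harmless no-op.
            return False
        init_func()
        # Set the flag only after init_func completed without raising.
        _is_initialized = True
        return True
```

With a guard like this, it would not matter whether a worker inherited an initialized library or not, since a second initialization attempt does nothing.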



class MarinePool:
    _marine_instance = None
Collaborator

_marine_instance: ClassVar[Optional[Marine]] = None

)

@staticmethod
def _init_marine(lib_path):
Collaborator

Switch to classmethod if Pool supports it.

ctx = multiprocessing.get_context("spawn")
# Using spawn so child processes won't get the already initialized marine from the parent process.
self.pool = ctx.Pool(self.process_count)
self.pool.map(MarinePool._init_marine, repeat(self._lib_path, self.process_count))
Collaborator

Pool accepts an initializer argument for exactly this purpose.

MarinePool._marine_instance = Marine(lib_path)

@staticmethod
def _filter_and_parse(*args):
Collaborator

Switch to classmethod if Pool supports it.

Owner

@tomlegkov tomlegkov left a comment

Please also add MarinePool to the benchmark


def __init__(self, lib_path: str, process_count: int = 4):
    self._lib_path = lib_path
    self.process_count = process_count
Owner

Why is process_count not private? Do we want to support editing it outside of the constructor?
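
If the count is not meant to change after construction, one option is a private attribute behind a read-only property. A sketch with a hypothetical `PoolConfig` class standing in for `MarinePool`:

```python
class PoolConfig:
    """Hypothetical stand-in for MarinePool's constructor arguments."""

    def __init__(self, lib_path: str, process_count: int = 4) -> None:
        self._lib_path = lib_path
        self._process_count = process_count

    @property
    def process_count(self) -> int:
        # Readable from outside, but there is no setter, so assignment fails.
        return self._process_count
```

Assigning to `process_count` on an instance then raises `AttributeError`, which makes it explicit that resizing an existing pool is not supported.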

MarinePool._marine_instance = Marine(lib_path)

@staticmethod
def _filter_and_parse(*args):
Owner

What's the advantage of using *args over explicitly stating the parameters?
Also missing return type
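
A sketch of the explicit-signature alternative. The parameter names and the return type below are assumptions for illustration, not Marine's actual API; `Pool.starmap` unpacks each argument tuple, so the worker function does not need `*args`:

```python
import multiprocessing.pool
from typing import List, Optional, Tuple


def filter_and_parse_one(packet: bytes, bpf: Optional[str]) -> Tuple[bool, int]:
    # Hypothetical worker: passes when no filter is given or the packet is non-empty.
    passed = bpf is None or len(packet) > 0
    return passed, len(packet)


def run_all(
    pool: multiprocessing.pool.Pool, packets: List[bytes], bpf: Optional[str]
) -> List[Tuple[bool, int]]:
    # starmap calls filter_and_parse_one(packet, bpf) for each tuple.
    return pool.starmap(filter_and_parse_one, [(packet, bpf) for packet in packets])
```

With named parameters and a return annotation, a type checker can verify both the call site and the shape of the results.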

)

@staticmethod
def _init_marine(lib_path):
Owner

Add type annotations.

@tomlegkov
Owner

tomlegkov commented Apr 6, 2020

To add to what @yehonatanz said, I think what's missing is a complete test for MarinePool, showing that packets stay sorted even when some don't pass the filter (maybe with different process counts), etc.
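
A sketch of the ordering check described here, using `ThreadPool` as an in-process stand-in for `MarinePool` and a hypothetical `passes_filter` predicate; a real test would run against `MarinePool` itself:

```python
import multiprocessing.pool
from typing import List, Tuple


def passes_filter(packet: bytes) -> Tuple[bool, bytes]:
    # Hypothetical filter: only even-length packets pass.
    return len(packet) % 2 == 0, packet


def check_order_preserved(process_count: int) -> List[bool]:
    # Packets of length 0..19, so passing and failing packets alternate.
    packets = [bytes(n) for n in range(20)]
    with multiprocessing.pool.ThreadPool(process_count) as pool:
        results = pool.map(passes_filter, packets)
    # One result per input, in input order, including packets that failed.
    assert [packet for _, packet in results] == packets
    return [passed for passed, _ in results]
```

Running the check with several process counts, for example 1, 2 and 4, covers the case where the input does not divide evenly between workers.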

@assafrabin assafrabin force-pushed the feature/marine_pool branch from 5dfc53a to fb729fd Compare April 19, 2020 10:18
@assafrabin assafrabin force-pushed the feature/marine_pool branch from 2c41cec to 8b5c429 Compare April 19, 2020 18:07
@assafrabin assafrabin merged commit 7807c76 into master Apr 19, 2020
@tomlegkov tomlegkov deleted the feature/marine_pool branch February 5, 2021 18:43

Development

Successfully merging this pull request may close these issues.

Add option to run Marine in a process pool

3 participants