
Custom serializers #143

Merged: 19 commits merged into python-arq:master on Aug 11, 2019
Conversation

@rubik (Contributor) commented Aug 10, 2019

Changes

I would like to propose this change, which allows the user to specify custom serializers as an alternative to pickle, which remains the default.

The public API changes only very slightly: there are new _serialize and _deserialize arguments to create_pool and Worker. The rest of the changes essentially reduce to passing these through to the pickle functions.

For backward compatibility, I have left the pickle_job, etc. functions in arq.jobs with the same names, even though those names are slightly inappropriate now.

Rationale

The main reason I am proposing this change is to allow better memory usage. The default serialization method, pickle, is quite space-inefficient. Since the job data is essentially JSON, we can attain much better memory usage by switching to a more appropriate serialization method.

In my case, by using msgpack instead of pickle, I am seeing an improvement of 47%-49% in memory usage, which is quite significant. At tens to hundreds of thousands of tasks, this means hundreds of MBs, if not GBs, in savings.
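As a rough sanity check of the size difference, serialized payloads can be compared directly. The sketch below uses stdlib json as a stand-in for msgpack so it runs without extra dependencies; the job payload shown is a hypothetical example, not arq's exact internal format.

```python
# Compare per-job payload sizes: pickle vs a JSON-style encoding.
# (json stands in for msgpack here; the job dict is illustrative only.)
import json
import pickle

job = {
    'function': 'foobar',
    'args': [1, 2, 3],
    'kwargs': {'retry': True},
    'enqueue_time_ms': 1565000000000,
}

pickled = pickle.dumps(job)
packed = json.dumps(job).encode()

# both representations must round-trip the same data
assert pickle.loads(pickled) == job
assert json.loads(packed) == job

print(f'pickle: {len(pickled)} bytes, json: {len(packed)} bytes')
```

The exact ratio depends on the payload shape; msgpack's binary encoding typically does better than either of the above on string-heavy job data.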

Usage

The usage is quite simple. Existing code works as usual, but one can use a different serialization method as follows:

import msgpack
from arq.connections import create_pool, RedisSettings

pool = await create_pool(RedisSettings(), _serialize=msgpack.packb, _deserialize=msgpack.unpackb)

class WorkerSettings:
    serialize = msgpack.packb
    deserialize = msgpack.unpackb

Tests and documentation

The tests pass, except for two that also fail on master. If this change is approved, I can add a paragraph to the documentation on custom serialization functions.

@rubik changed the title from "Feat custom serializer" to "Custom serializers" on Aug 10, 2019
codecov bot commented Aug 10, 2019

Codecov Report

Merging #143 into master will decrease coverage by 1.59%.
The diff coverage is 81.03%.

@@            Coverage Diff            @@
##           master     #143     +/-   ##
=========================================
- Coverage   98.74%   97.15%   -1.6%     
=========================================
  Files           8        8             
  Lines         638      667     +29     
  Branches       90       95      +5     
=========================================
+ Hits          630      648     +18     
- Misses          6       12      +6     
- Partials        2        7      +5

codecov bot commented Aug 10, 2019

Codecov Report

Merging #143 into master will decrease coverage by 0.69%.
The diff coverage is 92.53%.

@@            Coverage Diff            @@
##           master     #143     +/-   ##
=========================================
- Coverage   98.74%   98.05%   -0.7%     
=========================================
  Files           8        8             
  Lines         638      667     +29     
  Branches       90       95      +5     
=========================================
+ Hits          630      654     +24     
- Misses          6       10      +4     
- Partials        2        3      +1

@samuelcolvin (Member) left a comment
Sounds good to me; it will need HISTORY and docs updates and a few tweaks, suggested below.

def __init__(
    self,
    pool_or_conn,
    _serialize: Optional[Callable[[Any], bytes]] = None,
Suggested change
_serialize: Optional[Callable[[Any], bytes]] = None,
_job_serializer: Optional[Callable[[Any], bytes]] = None,

Might be even clearer?


and same obviously for _deserialize -> _job_deserializer.

arq/jobs.py Outdated
pass


def pickle_job(
let's rename unpickle_job to deserialize_job, better to get it right now.

Change should be pretty explicit and easy to fix.

arq/jobs.py Outdated
pass


def pickle_job(function_name: str, args: tuple, kwargs: dict, job_try: int, enqueue_time_ms: int):
class PickleError(SerializationError):
remove.

@samuelcolvin (Member) left a comment

and please update HISTORY.rst.

Otherwise LGTM.

    self,
    pool_or_conn,
    _job_serializer: Optional[Callable[[Any], bytes]] = None,
    _job_deserializer: Optional[Callable[[bytes], Any]] = None,
Suggested change
_job_deserializer: Optional[Callable[[bytes], Any]] = None,
_job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None,

Or am I missing something?

def __init__(
    self,
    pool_or_conn,
    _job_serializer: Optional[Callable[[Any], bytes]] = None,
Suggested change
_job_serializer: Optional[Callable[[Any], bytes]] = None,
_job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None,

I think?

and obviously elsewhere where it's used, might be worth having these types defined once, eg. in jobs.py?
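Defining the types once might look like the sketch below. The alias names and the fallback-to-pickle helpers are hypothetical illustrations, not code from the PR.

```python
# Sketch: shared type aliases for job (de)serializers, e.g. in arq/jobs.py.
# Names (Serializer, Deserializer) are hypothetical, not from the PR.
import pickle
from typing import Any, Callable, Dict, Optional

Serializer = Callable[[Dict[str, Any]], bytes]
Deserializer = Callable[[bytes], Dict[str, Any]]

def serialize_job(job: Dict[str, Any], serializer: Optional[Serializer] = None) -> bytes:
    # fall back to pickle when no custom serializer is supplied
    return (serializer or pickle.dumps)(job)

def deserialize_job(raw: bytes, deserializer: Optional[Deserializer] = None) -> Dict[str, Any]:
    # symmetric fallback for reading jobs back
    return (deserializer or pickle.loads)(raw)
```

Every signature in connections.py and worker.py could then reference `Serializer`/`Deserializer` instead of repeating the full `Callable[...]` spelling.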

arq/jobs.py Outdated
    job_try: int,
    enqueue_time_ms: int,
    *,
    _serialize: Optional[Callable[[Any], bytes]] = None,
Suggested change
_serialize: Optional[Callable[[Any], bytes]] = None,
serialize: Optional[Callable[[Any], bytes]] = None,

since this is explicitly required as a keyword arg, I don't think you need to make it "private".

tests/test_worker.py (resolved)
@rubik (Contributor, Author) commented Aug 10, 2019

@samuelcolvin I addressed your suggestions. You said the serializer functions in arq.jobs should be public and therefore not start with an underscore. Following that reasoning, the same correction needs to be applied to arq.connections.create_pool; that also makes the arguments match the parameters of arq.worker.Worker. This was done in a separate commit, 0cdaf4b, so if you don't agree I can revert it.

I don't think the HISTORY.rst change belongs in this PR; you could do it in a separate commit, also because I don't know when you will release the next version (though I hope very soon).

@samuelcolvin (Member) commented

The way I work, every commit to code has an entry in HISTORY.rst; that's the case for all my projects, so I'm afraid it's a requirement.

Please add the following to history

v0.17 (unreleased)
..................
* custom serializers, eg. to use msgpack rather than pickle, #143 by @rubik 

@samuelcolvin (Member) left a comment
otherwise looking good.

@@ -141,29 +164,33 @@ class ArqRedis(Redis):
        password=settings.password,
        timeout=settings.conn_timeout,
        encoding='utf8',
-       commands_factory=ArqRedis,
+       commands_factory=functools.partial(
+           ArqRedis, _job_serializer=job_serializer, _job_deserializer=job_deserializer
Suggested change
ArqRedis, _job_serializer=job_serializer, _job_deserializer=job_deserializer
ArqRedis, job_serializer=job_serializer, job_deserializer=job_deserializer

I think, given the rename?
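The `functools.partial` pattern in the diff above can be illustrated in isolation. The sketch below uses a stand-in class, not the real `ArqRedis` (which takes aioredis connection arguments), and arbitrary stand-in serializer callables.

```python
# Sketch: functools.partial pre-binds keyword arguments on a factory,
# so callers can keep invoking it with only the usual positional args.
import functools

class FakeRedis:
    # stand-in for ArqRedis; the real class has a different signature
    def __init__(self, address, job_serializer=None, job_deserializer=None):
        self.address = address
        self.job_serializer = job_serializer
        self.job_deserializer = job_deserializer

# bind the serializer kwargs once; repr/eval are just stand-in callables
factory = functools.partial(FakeRedis, job_serializer=repr, job_deserializer=eval)

# the pool machinery can now call the factory as if it were the bare class
conn = factory('redis://localhost:6379')
```

This is why the diff swaps `commands_factory=ArqRedis` for a partial: the pool code calling the factory does not need to know about the serializer arguments at all.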

worker: Worker = worker(functions=[foobar])
with pytest.raises(SerializationError) as exc_info:
    await worker.main()
    assert exc_info.value.startswith('unable to deserialize job: ')
Suggested change
    assert exc_info.value.startswith('unable to deserialize job: ')
assert exc_info.value.startswith('unable to deserialize job: ')

this is indented wrongly I think.

)
with pytest.raises(SerializationError) as exc_info:
    await worker.main()
    assert exc_info.value.startswith('unable to deserialize job: ')
Suggested change
    assert exc_info.value.startswith('unable to deserialize job: ')
assert exc_info.value.startswith('unable to deserialize job: ')

@rubik (Contributor, Author) commented Aug 10, 2019

@samuelcolvin Done. Then after you merge this PR I'll add it for the other one.

@rubik commented Aug 11, 2019

@samuelcolvin I fixed the code as per your latest feedback.

@rubik commented Aug 11, 2019

It seems some tests fail on Python 3.8, but I don't think the errors are related to this PR.

arq/connections.py (review threads resolved)
@samuelcolvin samuelcolvin merged commit f0fa671 into python-arq:master Aug 11, 2019
@samuelcolvin (Member) commented

Great, thanks. I'll deploy as soon as the other PR is passing and I can merge it.

@rubik commented Aug 11, 2019

@samuelcolvin Thanks for taking the time to review it multiple times.
