
feat: Batched Uploads and Green Threads #14

Merged
merged 21 commits into from Feb 19, 2019

Conversation

william-silversmith
Contributor

@william-silversmith william-silversmith commented Feb 15, 2019

This contains a few features.

  1. Send messages to AWS SQS using send_message_batch (via tq.insert_all), which allows sending up to 10 messages per request. This results in roughly a 10x speedup.
  2. Add GreenTaskQueue as an option. Users need to manually monkey patch their application to make it work. This can be useful in NUMA or vCPU environments.
  3. Add delay_seconds to insert and insert_all, which allows you to delay a message's visibility in the queue.
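As a rough sketch of the batching pattern in item 1 (the chunking helper and the commented-out boto3 call are illustrative assumptions, not this PR's actual code):

```python
# SQS's send_message_batch accepts at most 10 entries per call, so tasks
# must be chunked into groups of 10. `sqs` and `queue_url` below are
# hypothetical names, not part of this PR's API.
AWS_BATCH_SIZE = 10  # hard limit imposed by SQS

def batch_entries(bodies, delay_seconds=0):
    """Yield lists of at most 10 SQS batch entries from an iterable of message bodies."""
    batch = []
    for i, body in enumerate(bodies):
        batch.append({
            'Id': str(i),                   # must be unique within one batch request
            'MessageBody': body,
            'DelaySeconds': delay_seconds,  # hide the message for N seconds
        })
        if len(batch) == AWS_BATCH_SIZE:
            yield batch
            batch = []
    if batch:
        yield batch

# Hypothetical usage with a boto3 SQS client:
# for entries in batch_entries(task_bodies, delay_seconds=30):
#     sqs.send_message_batch(QueueUrl=queue_url, Entries=entries)
```

Each round trip now carries up to 10 messages instead of one, which is where the 10x speedup comes from.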

Contributor

@nkemnitz nkemnitz left a comment

I can test this change along with my next test run once the tests are passing.

Splitting the features into separate PRs would help me speed up review and testing.

try:
    iter(obj)
    if type(obj) is dict:
        return [ obj ]
Contributor

list(obj) does what you want, [obj] just puts the dictionary itself in a list.
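A quick illustration of the difference being discussed (not code from the PR):

```python
d = {'segid': 1, 'mip': 0}

keys = list(d)     # iterates the dict, producing its keys: ['segid', 'mip']
wrapped = [d]      # wraps the dict itself in a list: [{'segid': 1, 'mip': 0}]
```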

Contributor Author

That's actually what I wanted for this case.

Contributor

Got it. If it's not a "general purpose" function, you could try to hide it a bit from the user - maybe name it _to_iterable. Not crucial, though.

@@ -6,6 +6,17 @@

from .secrets import aws_credentials


def toiter(obj):
Contributor

Would change the name to to_iterable - you are not returning an iterator.

Contributor Author

Now I am!
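A plausible reconstruction of the function after this exchange (a sketch, not the PR's exact code): a lone dict is treated as a single task, and the result is now a true iterator.

```python
def toiter(obj):
    """Return an iterator over obj, treating a lone dict as a single item."""
    if isinstance(obj, dict):
        return iter([ obj ])
    try:
        return iter(obj)      # already iterable (list, generator, ...)
    except TypeError:
        return iter([ obj ])  # scalar: wrap it
```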

AWS_BATCH_SIZE = 10

resps = []
for i in range(0, len(tasks), AWS_BATCH_SIZE):
Contributor

Iterables are not guaranteed to have len() (generators, for example).

Contributor Author

This is true and a headache. I should call it "tolist".
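One way around the missing len() is to slice the iterable lazily, so generators work too (a sketch using itertools, not necessarily the PR's implementation):

```python
import itertools

AWS_BATCH_SIZE = 10

def chunked(iterable, size=AWS_BATCH_SIZE):
    """Yield lists of up to `size` items from any iterable; never calls len()."""
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            return
        yield chunk
```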

def check_monkey_patch_status(self):
    import gevent.monkey
    if not gevent.monkey.is_module_patched("socket"):
        print(yellow("""
Contributor

yellow is not defined

Contributor Author

fixed.
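The fix presumably defines a small ANSI helper along these lines (a hypothetical sketch; the actual definition is not shown in this thread):

```python
def yellow(text):
    """Wrap text in ANSI escape codes so terminals render it in yellow."""
    return "\033[93m" + text + "\033[m"
```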

@william-silversmith
Contributor Author

william-silversmith commented Feb 17, 2019

@nkemnitz latest changes include allowing the use of generators instead of lists in insert_all for both GreenTaskQueue and TaskQueue. Additionally, we do not monkey patch automatically. Instead, if the socket standard library is not monkey patched and you are using GreenTaskQueue, we issue a warning containing instructions on how to monkey patch your application. TaskQueue continues to work.

Generators are necessary for two reasons when the number of tasks becomes large. First, they allow the upload to begin almost immediately. Previously, while batching was an option, people typically created lists of hundreds of thousands or millions of elements before submitting them for upload. Building those lists took many seconds, adding substantial latency. Second, lists of millions of items would often cause MemoryErrors, requiring more complex batching logic. Now, as long as the user structures their submission logic as a generator, both issues are eliminated.
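From the user's side, the generator pattern described above might look like this (GreenTaskQueue and insert_all are names from this PR; the payloads and queue name are made up):

```python
import itertools

def create_tasks():
    """Lazily yield task payloads; nothing is materialized up front."""
    for i in itertools.count():
        yield { 'task_id': i, 'op': 'print' }

# Hypothetical usage -- upload begins immediately and memory stays
# bounded by the batch size rather than the total task count:
# with GreenTaskQueue('queue-name') as tq:
#     tq.insert_all(create_tasks())

# The consumer peels off one batch at a time from the generator:
gen = create_tasks()
first_batch = list(itertools.islice(gen, 10))
```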

@@ -6,7 +6,7 @@
import pytest

import taskqueue
-from taskqueue import RegisteredTask, TaskQueue, MockTask, PrintTask, LocalTaskQueue, MockTaskQueue
+from taskqueue import RegisteredTask, GreenTaskQueue, TaskQueue, MockTask, PrintTask, LocalTaskQueue, MockTaskQueue
Contributor Author

No monkey patching here, so GreenTaskQueue will simply behave as a single-threaded TaskQueue. Monkey patching was disrupting LocalTaskQueue.

@william-silversmith william-silversmith force-pushed the wms_green_threads branch 3 times, most recently from df58d58 to eb688c7 Compare February 18, 2019 04:09
@william-silversmith william-silversmith force-pushed the wms_green_threads branch 3 times, most recently from 8384f8a to e06192c Compare February 18, 2019 04:26
@william-silversmith william-silversmith merged commit d88fcdf into master Feb 19, 2019
@william-silversmith william-silversmith deleted the wms_green_threads branch February 19, 2019 01:16