Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Poll jobs by comparing both priority and insert_time #344

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

my8100
Copy link
Collaborator

@my8100 my8100 commented Jun 25, 2019

This PR use project_priority_map to store (priority, -timestamp) as value,
in order to find out the queue to pop.

Fix #187 (the updated test_poll_next() demonstrates the effect).
Also, provide backward compatibility for custom SqliteSpiderQueue
and JsonSqlitePriorityQueue.

@codecov
Copy link

codecov bot commented Jun 25, 2019

Codecov Report

Merging #344 into master will decrease coverage by 0.02%.
The diff coverage is 67.64%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #344      +/-   ##
==========================================
- Coverage   68.37%   68.35%   -0.03%     
==========================================
  Files          17       17              
  Lines         860      891      +31     
  Branches      104      112       +8     
==========================================
+ Hits          588      609      +21     
- Misses        242      250       +8     
- Partials       30       32       +2
Impacted Files Coverage Δ
scrapyd/sqlite.py 89.28% <100%> (+0.39%) ⬆️
scrapyd/spiderqueue.py 95.65% <100%> (+0.41%) ⬆️
scrapyd/poller.py 74.07% <60.71%> (-12.14%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3ff7c1c...f017268. Read the comment docs.

@codecov
Copy link

codecov bot commented Jun 25, 2019

Codecov Report

Merging #344 into master will increase coverage by 0.26%.
The diff coverage is 73.33%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #344      +/-   ##
==========================================
+ Coverage   68.37%   68.63%   +0.26%     
==========================================
  Files          17       17              
  Lines         860      915      +55     
  Branches      104      117      +13     
==========================================
+ Hits          588      628      +40     
- Misses        242      252      +10     
- Partials       30       35       +5
Impacted Files Coverage Δ
scrapyd/spiderqueue.py 96% <100%> (+0.76%) ⬆️
scrapyd/poller.py 75% <62.96%> (-11.21%) ⬇️
scrapyd/sqlite.py 86.76% <79.31%> (-2.13%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4520c45...1a0cb2b. Read the comment docs.

@my8100
Copy link
Collaborator Author

my8100 commented Jun 26, 2019

@Digenis
How do you think about this?

@Digenis
Copy link
Member

Digenis commented Jun 26, 2019

Your solution implements the second idea in #187.

Your code reveals that the second idea has another drawback.
A for-loop in the poller, querying all queues,
if more processes write/read queues,
will cause race conditions to occur more often.

@Digenis
Copy link
Member

Digenis commented Jun 26, 2019

The compatibility attribute trick was a nice idea by the way

@my8100
Copy link
Collaborator Author

my8100 commented Jun 26, 2019

But the original code also iterates the queues,
and if msg is not None: can handle concurrency problem,
so can the new code.

for p, q in iteritems(self.queues):
c = yield maybeDeferred(q.count)
if c:
msg = yield maybeDeferred(q.pop)
if msg is not None: # In case of a concurrently accessed queue
returnValue(self.dq.put(self._message(msg, p)))

@my8100
Copy link
Collaborator Author

my8100 commented Jun 26, 2019

FYI: #198

@Digenis
Copy link
Member

Digenis commented Jun 26, 2019

But the original code also iterates the queues,

It was stopping on the first non-empty queue.

Unless I misunderstand returnValue(), maybe I do.
Here's what I think it does.
msg = yield maybeDeferred(q.pop) gives control to twisted to call q.pop.
Then twisted sends the result back to the function, which stores it to msg resuming execution.
yield/send, although used here only for giving/receiving control of the execution,
actually make the function a generator, so we can't use a return statement for our result.
As a workaround, twisted has returnValue() which raises an exception to stop there.
Does execution continue after returnValue() is called?

@my8100
Copy link
Collaborator Author

my8100 commented Jun 26, 2019

poll() ends when returnValue() is called.

Can you explain ‘race conditions‘ in details, maybe with an example?

@my8100
Copy link
Collaborator Author

my8100 commented Jun 26, 2019

How about the third commit?
I save the highest priority of each queue in the queue_priority_map,
and get it updated whenever needed,
so that poll() can figure out the highest priority of all queues instantly.

@Digenis
Copy link
Member

Digenis commented Jun 26, 2019

A multiple queue consumer scenario.
Suppose a slow consumer iterates all the queues
and collects the top job of each queue.
Meanwhile, a fast consumer does the same
and also pops the job with the highest priority among all queues.
The slow consumer will then get an empty result when trying to pop the same job
and will start over.
This things are more likely to happen
if the process of finding the top job of all queues takes more time / cpu circles.

@Digenis
Copy link
Member

Digenis commented Jun 26, 2019

Also, I just realized that the FIFO principal is still violated.

A limit of 2 parallel processes.
2 empty fresh queues.

On hour 13:00 projectA gets 5 jobs scheduled, with queueA ids (in sqlite) 1,2,3,4,5

By hour 13:05, projectA's job with queueA id 1 has finished and 2, 3 are running
while projectB gets 5 jobs scheduled, with queueB id 1,2,3,4,5

By hour 13:10, projectA's job with queueA id 2 finishes
and projectB job with queueB id 1 is starting.

projectA job with queue id 3, although scheduled at 13:00,
will have to wait for projectB queue id 1 and 2,
which were scheduled 10 minutes later

Of course this is not exactly the current behaviour of your branch
but may have been your next commit ­— selecting & sorting ids too.
Currently, the priority scope is global
but same priorities revert back to the old behaviour.

The in-queue id defines a priority too — the insertion order.
However, we can't compare insertion orders of different queues.
If we did, we would also end up with "arbitrary" order.
(Not exactly arbitrary but I won't open a parentheses
because it's not a useful scheduling policy in this context)

@Digenis
Copy link
Member

Digenis commented Jun 26, 2019

So, how do we solve this while keeping the multiple queue approach?
To make things easy, suppose all queues were different tables in the same db.

SELECT 'projectA', id FROM queueA
UNION ALL
SELECT 'projectB', id FROM queueB
ORDER BY priority DESC -- then what column?
-- the information about the global insertion order was never stored
LIMIT 1;

At this point, unless you have a business-critical non-upgradeable legacy system
you probably give up any attempt to do a backwards compatible fix
that'd go in 1.3.
It's dirty fixes all the way from here:

In the poller, an equivalent of ORDER BY RANDOM(), priority DESC
(which I just decided to name random-robin priority queues).

In all queues, a new datatime column, saving the insertion time,
then ORDER BY priority DESC, insert_time ASC

A master queue saving a global insertion order

CREATE TABLE queue (id INTEGER PRIMARY KEY,
                    project TEXT NOT NULL,
                    foreign_id INTEGER NOT NULL);

which is close to the unified queue idea,
only more complicated implementing all the rest simultaneously.

Of all the desperate attempts to keep backwards compatibility,
the one with the insertion time is the most acceptable,
although of debatable compatibility.
It even qualifies as something to test a unified queue against.
Multiple tables + datetime definitely add a probability of error
but FIFO will be violated only in sub-second resolutions
and bothering about this is like bothering about the time it takes
for a scheduling request to cross the network.
I'd still like to see the queue unified,
even in a single-db multiple tables schema.
Let's discuss and test the datetime trick against the single table approach.

@my8100
Copy link
Collaborator Author

my8100 commented Jun 27, 2019

A multiple queue consumer scenario.
Suppose a slow consumer iterates all the queues

So, if cancel.json is called while poll() is iterating all the queues with yield maybeDeferred,
chances are that msg = yield maybeDeferred(q.pop) may return None?

            for p, q in iteritems(self.queues):
                c = yield maybeDeferred(q.count)
                if c:
                    msg = yield maybeDeferred(q.pop)
                    if msg is not None:  # In case of a concurrently accessed queue
                        returnValue(self.dq.put(self._message(msg, p)))

@my8100
Copy link
Collaborator Author

my8100 commented Jun 27, 2019

Also, I just realized that the FIFO principal is still violated.

A limit of 2 parallel processes.
2 empty fresh queues.

On hour 13:00 projectA gets 5 jobs scheduled, with queueA ids (in sqlite) 1,2,3,4,5

By hour 13:05, projectA's job with queueA id 1 has finished and 2, 3 are running
while projectB gets 5 jobs scheduled, with queueB id 1,2,3,4,5

By hour 13:10, projectA's job with queueA id 2 finishes
and projectB job with queueB id 1 is starting.

projectA job with queue id 3, although scheduled at 13:00,
will have to wait for projectB queue id 1 and 2,
which were scheduled 10 minutes later

What are the names of these projects and the priorities of each job?

@my8100
Copy link
Collaborator Author

my8100 commented Jun 27, 2019

Even though for p, q in iteritems(self.queues): would violate FIFO when polling multiple projects,
we can still use the priority parameter to adjust the polling order
as long as we figure out the project with the highest priority of all queues.

Maybe we can/should make it in v1.3.0, which provides backward compatibility.

Check out my fourth commit, which saves the highest priority of each project in project_priority_map,
so that there's no need to iterate all the queues.

@my8100
Copy link
Collaborator Author

my8100 commented Jun 27, 2019

Changes in the fifth commit:

  1. A new column named 'insert_time' is add when creating tables in JsonSqlitePriorityQueue:
    insert_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  2. The table name in SqliteSpiderQueue now defaults to 'spider_queue_insert_time',
    in order to avoid such error when selecting the non-existing 'insert_time' column in the existing db file:
    sqlite3.OperationalError: no such column: insert_time
  3. project_priority_map now stores (priority, -timestamp) as value
    so that the priority is taken first and the FIFO principle is also respected. (see test_poll_next())
    {'project1': (0.0, -1561646348), 'project2': (1.0, -1561646349)}
  4. Backward compatibility is still provided.

@Digenis
Copy link
Member

Digenis commented Jun 27, 2019

Even if it's backwards compatible with user code, it's not with user data in the queue.

If we break compatibility, it better be once and better be worthy.

But I also don't want to let users fix it on their own.
It's annoying because most of their work would be boilerplate stuff.
Let's do something similar to what scrapy did with scrapy.contrib.
We can add a sub-package with these sample fixes
and users would only need to change configuration.
Any idea for a name?

@my8100
Copy link
Collaborator Author

my8100 commented Jun 27, 2019

Even if it's backwards compatible with user code,
it's not with user data in the queue.

But it only ignores the pending jobs on the first startup,
which may not exist at all for most cases.

Any idea for a name?

Simply use the same name ‘contrib’?
(I still don’t know how it works yet)
Or are you asking for a name of the new configuration option?

@my8100
Copy link
Collaborator Author

my8100 commented Jun 27, 2019

What about inserting a new column into the existing table via ALTER TABLE?

@my8100
Copy link
Collaborator Author

my8100 commented Jun 28, 2019

The 6th commit introduces the ensure_insert_time_column method
to add a new column named 'insert_time'
into the table of the existing db files if needed.
Also, fill in the 'insert_time' column for any pending jobs inside the table.

@my8100 my8100 changed the title Fix polling order by comparing priority of all queues Poll jobs by comparing both priority and insert_time Jun 28, 2019
@my8100 my8100 requested a review from Digenis June 29, 2019 07:55
@my8100
Copy link
Collaborator Author

my8100 commented Jun 30, 2019

@Digenis
How about the 6th commit?

@Digenis
Copy link
Member

Digenis commented Jul 1, 2019

I'm a bit busy right now

I think I'll be able to review it in the next 2 days

@@ -144,6 +147,24 @@ def clear(self):
self.conn.commit()
self.update_project_priority_map()

def ensure_insert_time_column(self):
q = "SELECT insert_time FROM %s LIMIT 1" % self.table
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also:

SELECT sql FROM sqlite_master WHERE type='table' AND name='spider_queue';
-- ⇒ CREATE TABLE spider_queue (id integer primary key, priority real key, message blob)

but is it any better?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about 'sqlite_master' before.
The ensure_insert_time_column method is updated in the 7th commit.

@@ -82,7 +83,7 @@ class JsonSqlitePriorityQueue(object):
"""SQLite priority queue. It relies on SQLite concurrency support for
providing atomic inter-process operations.
"""
queue_priority_map = {}
project_priority_map = {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is like "master table" solution I was talking about
actually implemented as a singleton to share state between all instances of the queue class
and save us lot of io and cpu cycles,
right?

Copy link
Collaborator Author

@my8100 my8100 Jul 3, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fast to find out the queue to pop in poll()
since project_priority_map is a dict like:
{'project1': (0.0, -1561646348), 'project2': (1.0, -1561646349)}

So, there's no need to introduce an actual "master table".

@@ -131,6 +131,12 @@ def clear(self):
self.conn.execute("delete from %s" % self.table)
self.conn.commit()

def get_highest_priority(self):
q = "select priority from %s order by priority desc limit 1" \
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but since additional selects are ran for almost all queue method calls
we are not saving cpu cycles or io

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduce SQLite triggers in the 8th commit,
in which project_priority_map would be updated
whenever an INSERT/UPDATE/DELETE occurs.

This is intended to speed up poll() and avoid race conditions,
rather than to save CPU cycles or io.
I think the cost is acceptable.

@Digenis
Copy link
Member

Digenis commented Jul 2, 2019

Comments are to be red in chronological order (not commit order).

I feel reluctant to even make comments.
You do understand that this will most probably only be a workaround, right?
In a contrib/ subpackage, I wouldn't mind merging it even in 1.2
but in 1.3 I'd rather see FIFO queues taking turns in round robin
and fixing the strategy in 1.4
instead of making code with so many workarounds the default in 1.3.

Remove unnecessary getattr statement

Add queue_priority_map to save highest priority

Save the highest priority of each project in project_priority_map

project_priority_map stores (priority, -timestamp) as value

Add ensure_insert_time_column()

Query sqlite_master in ensure_insert_time_column()

Introduce create_triggers()
@codecov-io
Copy link

codecov-io commented Jul 3, 2019

Codecov Report

Merging #344 into master will increase coverage by 0.61%.
The diff coverage is 80.76%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master     #344      +/-   ##
==========================================
+ Coverage   68.37%   68.98%   +0.61%     
==========================================
  Files          17       17              
  Lines         860      906      +46     
  Branches      104      116      +12     
==========================================
+ Hits          588      625      +37     
- Misses        242      247       +5     
- Partials       30       34       +4
Impacted Files Coverage Δ
scrapyd/sqlite.py 90.55% <100%> (+1.66%) ⬆️
scrapyd/spiderqueue.py 96% <100%> (+0.76%) ⬆️
scrapyd/poller.py 75% <62.96%> (-11.21%) ⬇️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 3ff7c1c...53a619b. Read the comment docs.

@my8100
Copy link
Collaborator Author

my8100 commented Jul 3, 2019

I feel reluctant to even make comments.

If so, how can we go on discussing?

You do understand that this will most probably only be a workaround, right?

No, this PR aims to fix #187 in a creative and effective way:

  1. The priority of a job is taken first and the FIFO principle is also respected for multiple projects,
    and the effect is verified in the new test_poll_next().
  2. There's no delay in poll() as project_priority_map is updated by SQLite triggers.
  3. No change to the current 'multiple queues' implementation.
  4. Backward compatibility for custom modules is provided.
  5. Backward compatibility for user data is provided.
  6. It's easy to remove the workarounds in v1.4.

In a contrib/ subpackage, I wouldn't mind merging it even in 1.2
but in 1.3 I'd rather see FIFO queues taking turns in round robin
and fixing the strategy in 1.4
instead of making code with so many workarounds the default in 1.3.

I think we should fix #187 in v1.3 as the 'priority' parameter is exposed in PR #161,
and merge PR #343 in v1.4 while removing backward compatibility.

@my8100
Copy link
Collaborator Author

my8100 commented Jul 4, 2019

The queue table name is changed from 'spider_queue' to 'spider_queue_with_triggers'.

This PR introduces SQLite triggers which are stored in the database,
Users would encounter a problem if they downgrade Scrapyd from v1.3 to v1.2:
sqlite3.OperationalError: no such function: update_project_priority_map

Renaming the table name ensures upgrade and downgrade work well anytime,
and the pending jobs could be retrieved if needed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Polling order / Scheduled jobs priorities vs queue priorities
3 participants