Skip to content

Commit

Permalink
Poll jobs by comparing both priority and insert_time
Browse files Browse the repository at this point in the history
Remove unnecessary getattr statement

Add queue_priority_map to save highest priority

Save the highest priority of each project in project_priority_map

project_priority_map stores (priority, -timestamp) as value

Add ensure_insert_time_column()

Query sqlite_master in ensure_insert_time_column()

Introduce create_triggers()
  • Loading branch information
my8100 committed Jul 3, 2019
1 parent 3ff7c1c commit 1a0cb2b
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 20 deletions.
37 changes: 33 additions & 4 deletions scrapyd/poller.py
Expand Up @@ -12,17 +12,35 @@ def __init__(self, config):
self.config = config
self.update_projects()
self.dq = DeferredQueue()
# For backward compatibility with custom SqliteSpiderQueue and JsonSqlitePriorityQueue
# TODO: remove it and add method get_project_with_highest_priority in ISpiderQueue in 1.4
self.support_comparing_priorities = None

@inlineCallbacks
def poll(self):
if not self.dq.waiting:
return
for p, q in iteritems(self.queues):
c = yield maybeDeferred(q.count)
if c:

if self.support_comparing_priorities is None:
self.test_comparing_priorities()

project_with_highest_priority = None
if self.support_comparing_priorities:
for p, q in iteritems(self.queues):
project_with_highest_priority = q.get_project_with_highest_priority()
break
if project_with_highest_priority:
q = self.queues[project_with_highest_priority]
msg = yield maybeDeferred(q.pop)
if msg is not None: # In case of a concurrently accessed queue
returnValue(self.dq.put(self._message(msg, p)))
returnValue(self.dq.put(self._message(msg, project_with_highest_priority)))
if not self.support_comparing_priorities or not project_with_highest_priority:
for p, q in iteritems(self.queues):
c = yield maybeDeferred(q.count)
if c:
msg = yield maybeDeferred(q.pop)
if msg is not None: # In case of a concurrently accessed queue
returnValue(self.dq.put(self._message(msg, p)))

def next(self):
return self.dq.get()
Expand All @@ -35,3 +53,14 @@ def _message(self, queue_msg, project):
d['_project'] = project
d['_spider'] = d.pop('name')
return d

def test_comparing_priorities(self):
for p, q in iteritems(self.queues):
try:
getattr(q, 'get_project_with_highest_priority')
getattr(q.q, 'project_priority_map')
except AttributeError:
self.support_comparing_priorities = False
else:
self.support_comparing_priorities = True
return
7 changes: 7 additions & 0 deletions scrapyd/spiderqueue.py
Expand Up @@ -29,3 +29,10 @@ def remove(self, func):

def clear(self):
self.q.clear()

def get_project_with_highest_priority(self):
if self.q.project_priority_map:
return sorted(self.q.project_priority_map,
key=lambda x: self.q.project_priority_map[x], reverse=True)[0]
else:
return None
44 changes: 42 additions & 2 deletions scrapyd/sqlite.py
@@ -1,5 +1,6 @@
import sqlite3
import json
import os
try:
from collections.abc import MutableMapping
except ImportError:
Expand Down Expand Up @@ -82,19 +83,29 @@ class JsonSqlitePriorityQueue(object):
"""SQLite priority queue. It relies on SQLite concurrency support for
providing atomic inter-process operations.
"""
project_priority_map = {}

def __init__(self, database=None, table="queue"):
self.database = database or ':memory:'
self.table = table
if database:
dbname = os.path.split(database)[-1]
self.project = os.path.splitext(dbname)[0]
else:
self.project = self.database
# about check_same_thread: http://twistedmatrix.com/trac/ticket/4040
self.conn = sqlite3.connect(self.database, check_same_thread=False)
q = "create table if not exists %s (id integer primary key, " \
"priority real key, message blob)" % table
"priority real key, message blob, insert_time TIMESTAMP)" % table
self.conn.execute(q)
self.ensure_insert_time_column() # Backward compatibility for scrapyd<1.3.0
self.create_triggers()
self.update_project_priority_map()

def put(self, message, priority=0.0):
args = (priority, self.encode(message))
q = "insert into %s (priority, message) values (?,?)" % self.table
q = "insert into %s (priority, message, insert_time) values (?,?, CURRENT_TIMESTAMP)" \
% self.table
self.conn.execute(q, args)
self.conn.commit()

Expand Down Expand Up @@ -131,6 +142,35 @@ def clear(self):
self.conn.execute("delete from %s" % self.table)
self.conn.commit()

def ensure_insert_time_column(self):
q = "SELECT sql FROM sqlite_master WHERE type='table' AND name='%s'" % self.table
if 'insert_time TIMESTAMP' not in self.conn.execute(q).fetchone()[0]:
q = "ALTER TABLE %s ADD COLUMN insert_time TIMESTAMP" % self.table
self.conn.execute(q)
q = "UPDATE %s SET insert_time=CURRENT_TIMESTAMP" % self.table
self.conn.execute(q)
self.conn.commit()

def create_triggers(self):
self.conn.create_function("update_project_priority_map", 0, self.update_project_priority_map)
for action in ['INSERT', 'UPDATE', 'DELETE']:
name = 'trigger_on_%s' % action.lower()
self.conn.execute("""
CREATE TRIGGER IF NOT EXISTS %s AFTER %s ON %s
BEGIN
SELECT update_project_priority_map();
END;
""" % (name, action, self.table))

def update_project_priority_map(self):
q = "select priority, strftime('%%s', insert_time) from %s order by priority desc limit 1" \
% self.table
result = self.conn.execute(q).fetchone()
if result is None:
self.project_priority_map.pop(self.project, None)
else:
self.project_priority_map[self.project] = (result[0], -int(result[-1]))

def __len__(self):
q = "select count(*) from %s" % self.table
return self.conn.execute(q).fetchone()[0]
Expand Down
39 changes: 25 additions & 14 deletions scrapyd/tests/test_poller.py
@@ -1,4 +1,5 @@
import os
import time

from twisted.trial import unittest
from twisted.internet.defer import Deferred
Expand Down Expand Up @@ -28,30 +29,40 @@ def test_interface(self):
verifyObject(IPoller, self.poller)

def test_poll_next(self):
cfg = {'mybot1': 'spider1',
'mybot2': 'spider2'}
priority = 0
for prj, spd in cfg.items():
cfg = [('mybot2', 'spider2', 0), # second
('mybot1', 'spider2', 0.0), # third
('mybot1', 'spider1', -1), # fourth
('mybot1', 'spider3', 1.0)] # first
for prj, spd, priority in cfg:
self.queues[prj].add(spd, priority)
if prj == 'mybot2':
time.sleep(1.5) # ensure different timestamp

d1 = self.poller.next()
d2 = self.poller.next()
d3 = self.poller.next()
d4 = self.poller.next()
d5 = self.poller.next()
self.failUnless(isinstance(d1, Deferred))
self.failIf(hasattr(d1, 'result'))

# poll once
# first poll
self.poller.poll()
self.failUnless(hasattr(d1, 'result') and getattr(d1, 'called', False))
self.assertEqual(d1.result, {'_project': 'mybot1', '_spider': 'spider3'})

# which project got run: project1 or project2?
self.failUnless(d1.result.get('_project'))
prj = d1.result['_project']
self.failUnlessEqual(d1.result['_spider'], cfg.pop(prj))
# second poll
self.poller.poll()
self.assertEqual(d2.result, {'_project': 'mybot2', '_spider': 'spider2'})

# third poll
self.poller.poll()
self.assertEqual(d3.result, {'_project': 'mybot1', '_spider': 'spider2'})

self.queues[prj].pop()
# fourth poll
self.poller.poll()
self.assertEqual(d4.result, {'_project': 'mybot1', '_spider': 'spider1'})

# poll twice
# check that the other project's spider got to run
# final poll
self.poller.poll()
prj, spd = cfg.popitem()
self.failUnlessEqual(d2.result, {'_project': prj, '_spider': spd})
self.failIf(hasattr(d5, 'result'))

0 comments on commit 1a0cb2b

Please sign in to comment.