In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from litequeue import SQLQueue

import sqlite3

In [3]:
sqlite3.sqlite_version

'3.35.5'

In [4]:
TEST_1 = "key_test_1"
TEST_2 = "key_test_2"

In [5]:
for conn_type_kwargs in (
    {"filename_or_conn": sqlite3.connect(":memory:")},
    {"filename_or_conn": ":memory:"},
    {"memory": True},
):
    q = SQLQueue(**conn_type_kwargs)
    assert (
        q.conn.isolation_level is None
    ), f"Isolation level not set properly for connection '{conn_type_kwargs}'"

In [6]:
q = SQLQueue(sqlite3.connect(":memory:"))

q.put("hello")
q.put("world")
q.put("foo")
q.put("bar")

4

In [7]:
q.pop()

{'message': 'hello',
 'message_id': '2e65052f1fa6e9b1155c3aa40d5ddab8',
 'status': 1,
 'in_time': 1628333805,
 'lock_time': 1628333805,
 'done_time': None}

In [8]:
print(q)

SQLQueue(Connection=<sqlite3.Connection object at 0x7fb9e34bde30>, items=[{'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333805,
  'message': 'hello',
  'message_id': '2e65052f1fa6e9b1155c3aa40d5ddab8',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': None,
  'message': 'world',
  'message_id': 'b41e4afeedbe2839786a86e057a93633',
  'status': 0},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': None,
  'message': 'foo',
  'message_id': '4b1c3d584bbfb43b1787cf48da491f5b',
  'status': 0},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': None,
  'message': 'bar',
  'message_id': '544c25cf44ff8929ca1b0fca7d822f38',
  'status': 0}])


In [9]:
# pop remaining
for _ in range(3):
    q.pop()


assert q.pop() is None

In [10]:
print(q)

SQLQueue(Connection=<sqlite3.Connection object at 0x7fb9e34bde30>, items=[{'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333805,
  'message': 'hello',
  'message_id': '2e65052f1fa6e9b1155c3aa40d5ddab8',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'world',
  'message_id': 'b41e4afeedbe2839786a86e057a93633',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'foo',
  'message_id': '4b1c3d584bbfb43b1787cf48da491f5b',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'bar',
  'message_id': '544c25cf44ff8929ca1b0fca7d822f38',
  'status': 1}])


In [11]:
q.put("hello")
q.put("world")
q.put("foo")
q.put("bar")

8

In [12]:
task = q.pop()

assert task["message"] == "hello"

In [13]:
q.peek()

{'message': 'world',
 'message_id': 'd4bbefebcd92a0f091eace2b93f9b906',
 'status': 0,
 'in_time': 1628333807,
 'lock_time': None,
 'done_time': None}

In [14]:
# next one that is free
assert q.peek()["message"] == "world"

# status = 0 = free
assert q.peek()["status"] == 0

In [15]:
task["message"], task["message_id"]

('hello', '8ac2b6e55eb06c906c37863d9433c9d3')

In [16]:
q.done(task["message_id"])

8

In [17]:
q.get(task["message_id"])

{'message': 'hello',
 'message_id': '8ac2b6e55eb06c906c37863d9433c9d3',
 'status': 2,
 'in_time': 1628333807,
 'lock_time': 1628333807,
 'done_time': 1628333811}

In [18]:
already_done = q.get(task["message_id"])

# stauts = 2 = done
assert already_done["status"] == 2

in_time = already_done["in_time"]
lock_time = already_done["lock_time"]
done_time = already_done["done_time"]

assert done_time >= lock_time >= in_time
print(
    f"Task {already_done['message_id']} took {done_time - lock_time} seconds to get done and was in the queue for {done_time - in_time} seconds"
)

Task 8ac2b6e55eb06c906c37863d9433c9d3 took 4 seconds to get done and was in the queue for 4 seconds


In [19]:
print(q)

SQLQueue(Connection=<sqlite3.Connection object at 0x7fb9e34bde30>, items=[{'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333805,
  'message': 'hello',
  'message_id': '2e65052f1fa6e9b1155c3aa40d5ddab8',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'world',
  'message_id': 'b41e4afeedbe2839786a86e057a93633',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'foo',
  'message_id': '4b1c3d584bbfb43b1787cf48da491f5b',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'bar',
  'message_id': '544c25cf44ff8929ca1b0fca7d822f38',
  'status': 1},
 {'done_time': 1628333811,
  'in_time': 1628333807,
  'lock_time': 1628333807,
  'message': 'hello',
  'message_id': '8ac2b6e55eb06c906c37863d9433c9d3',
  'status': 2},
 {'done_time': None,
  'in_time': 1628333807,
  'lock_time': None,
  'message': 'world',
  'message_id': 'd4bb

In [20]:
assert q.qsize() == 7

In [21]:
next_one_msg = q.peek()["message"]
next_one_id = q.peek()["message_id"]

task = q.pop()

assert task["message"] == next_one_msg
assert task["message_id"] == next_one_id

In [22]:
print(q)

SQLQueue(Connection=<sqlite3.Connection object at 0x7fb9e34bde30>, items=[{'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333805,
  'message': 'hello',
  'message_id': '2e65052f1fa6e9b1155c3aa40d5ddab8',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'world',
  'message_id': 'b41e4afeedbe2839786a86e057a93633',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'foo',
  'message_id': '4b1c3d584bbfb43b1787cf48da491f5b',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'bar',
  'message_id': '544c25cf44ff8929ca1b0fca7d822f38',
  'status': 1},
 {'done_time': 1628333811,
  'in_time': 1628333807,
  'lock_time': 1628333807,
  'message': 'hello',
  'message_id': '8ac2b6e55eb06c906c37863d9433c9d3',
  'status': 2},
 {'done_time': None,
  'in_time': 1628333807,
  'lock_time': 1628333812,
  'message': 'world',
  'message_id':

In [23]:
q.prune()

In [24]:
print(q)

SQLQueue(Connection=<sqlite3.Connection object at 0x7fb9e34bde30>, items=[{'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333805,
  'message': 'hello',
  'message_id': '2e65052f1fa6e9b1155c3aa40d5ddab8',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'world',
  'message_id': 'b41e4afeedbe2839786a86e057a93633',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'foo',
  'message_id': '4b1c3d584bbfb43b1787cf48da491f5b',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333805,
  'lock_time': 1628333806,
  'message': 'bar',
  'message_id': '544c25cf44ff8929ca1b0fca7d822f38',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333807,
  'lock_time': 1628333812,
  'message': 'world',
  'message_id': 'd4bbefebcd92a0f091eace2b93f9b906',
  'status': 1},
 {'done_time': None,
  'in_time': 1628333807,
  'lock_time': None,
  'message': 'foo',
  'message_id': '3c9c699be7b9

In [25]:
from string import ascii_lowercase, printable
from random import choice


def random_string(string_length=10, fuzz=False, space=False):
    """Generate a random string of fixed length """
    letters = ascii_lowercase
    letters = letters + " " if space else letters
    if fuzz:
        letters = printable
    return "".join(choice(letters) for i in range(string_length))

In [26]:
q = SQLQueue(":memory:", maxsize=50)

In [27]:
for i in range(50):

    q.put(random_string(20))

In [28]:
assert q.qsize() == 50

Make sure an error is raised when the queue has reached its size limit

In [29]:
import sqlite3

try:
    q.put(random_string(20))
except sqlite3.IntegrityError:  # max len reached
    assert q.full() == True
    print("test pass")

test pass


In [30]:
q.pop()

{'message': 'rziaqccoasoogmvamksl',
 'message_id': 'fe3b7b3059a454e1ffccd3fb9a30731d',
 'status': 1,
 'in_time': 1628333814,
 'lock_time': 1628333815,
 'done_time': None}

In [31]:
assert q.full() == False

In [32]:
q.put("hello")

51

In [33]:
q.empty()

False

In [34]:
assert q.empty() == False

q2 = SQLQueue(":memory:")

assert q2.empty() == True

**Random benchmarks**

Inserting 100 items in the queue. Since it will run many times, the queue will end up having a lot more than 100 items

In [35]:
import gc

In-memory SQL queue

In [36]:
q = SQLQueue(":memory:", maxsize=None)

In [37]:
gc.collect()

261

In [38]:
%%timeit -n10000 -r7

q.put(random_string(20))

36.8 µs ± 2.16 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [39]:
q.qsize()

70000

Standard python queue.

In [40]:
from queue import Queue

In [41]:
q = Queue()

In [42]:
gc.collect()

116

In [43]:
%%timeit -n10000 -r7

q.put(random_string(20))

25.1 µs ± 3.27 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


Persistent SQL queue

In [44]:
q = SQLQueue("test.queue", maxsize=None)

In [45]:
gc.collect()

69

In [46]:
%%timeit -n10000 -r7

q.put(random_string(20))

219 µs ± 19.2 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [47]:
!du -sh test.queue*

9.1M	test.queue
32K	test.queue-shm
5.0M	test.queue-wal


In [48]:
!rm test.queue*

In [49]:
assert q.conn.isolation_level is None

Creating and removing tasks

In [50]:
q = Queue()

In [51]:
gc.collect()

138

In [52]:
%%timeit -n10000 -r7

tid = random_string(20)

q.put(tid)

q.get()

q.task_done()

37.1 µs ± 16.7 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


In [53]:
q = SQLQueue(":memory:", maxsize=None)

In [54]:
gc.collect()

69

In [55]:
%%timeit -n10000 -r7

tid = random_string(20)

q.put(tid)

task = q.pop()

q.done(task["message_id"])

96.2 µs ± 7.89 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)


**RETURNING vs. TRANSACTION**

In [1]:
from litequeue import SQLQueue
import gc

from string import ascii_lowercase, printable
from random import choice


def random_string(string_length=10, fuzz=False, space=False):
    """Generate a random string of fixed length"""
    letters = ascii_lowercase
    letters = letters + " " if space else letters
    if fuzz:
        letters = printable
    return "".join(choice(letters) for i in range(string_length))

RETURNING

In [2]:
q = SQLQueue("pop_bench.db", maxsize=None)

q.pop = q._pop_returning

gc.collect()

57

In [3]:
for _ in range(10000):
    tid = random_string(60)

    q.put(tid)

In [4]:
%%time


for _ in range(8000):
    task = q.pop()

CPU times: user 557 ms, sys: 712 ms, total: 1.27 s
Wall time: 2.15 s


In [5]:
!rm pop_bench.db*

In [6]:
q = SQLQueue("pop_bench.db", maxsize=None)

q.pop = q._pop_transaction

gc.collect()

48

In [7]:
for _ in range(10000):
    tid = random_string(60)

    q.put(tid)

In [None]:
%%time


for _ in range(8000):
    task = q.pop()

In [None]:
!rm pop_bench.db*