Combine channels into single archive, match patterns when consumed
This fixes most of the performance issues discovered with #bc09255.
Instead of storing messages in a per-channel key (which required slow
unioning and re-sorting during every call to consume), I discovered
that archiving them together and matching the channel pattern
message-by-message is significantly faster. Consumers recovering from
failure can now actually catch up to pub/sub, bringing sustainable
performance back to around 35k/sec.

Along with storing messages in a single archive, it made sense to
split the archive into multiple buckets over time, allowing for easier
expiration and migration. Currently each archive bucket holds 100,000
messages; this may be adjusted in the future as I start using remq in
production.
kainosnoema committed Feb 16, 2013
1 parent bc09255 commit 9388ed0
Showing 4 changed files with 73 additions and 52 deletions.
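
For concreteness, the bucket arithmetic described above looks like this as a standalone sketch (the key names mirror publish.lua below; the message-ids are made up):

``` lua
-- Sketch: which archive bucket a message-id lands in, with the
-- 100,000-message bucket size introduced by this commit
local per_bucket = 100000
local function bucket_key(id)
  return 'remq:archive:' .. math.floor(id / per_bucket) * per_bucket
end

print(bucket_key(1))      --> remq:archive:0
print(bucket_key(99999))  --> remq:archive:0
print(bucket_key(100000)) --> remq:archive:100000
print(bucket_key(250123)) --> remq:archive:200000
```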
Readme.md: 12 additions & 11 deletions
@@ -2,18 +2,19 @@
 
 Remq (pronounced 'rem-que') is two things: (1) A [Redis](http://redis.io)-based
 protocol defined by a collection of Lua scripts (this project) which effectively
-turn Redis into a capable message queue broker for fast, reliable inter-service
+turn Redis into a durable message queue broker for fast, reliable inter-service
 communication. (2) Multiple client libraries using these scripts for building
 fast, persisted pub/sub message queues.
 
 - Producers publish any string to a message channel and receive a unique message-id
-- Consumers subscribe to message channels via polling with a cursor (allowing resume), or via Redis pub/sub
-- Consumers can subscribe to multible channels using Redis key globbing (ie. `'events.*'`)
-- Able to sustain ~10k messages/sec (bursts to ~30k/sec) on loopback interface (1 producer -> 1 consumer)
-- Consistent performance if Redis has enough memory (tested up to ~15m messages, 3GB in memory)
-- Channels may be flushed of old messages periodically to maintain performance
+- Consumers subscribe to message channels via vanilla Redis pub/sub for instant delivery
+- Subscribe to multiple channels using Redis key globbing (ie. `'events.*'`)
+- Replay archived messages from a given message-id for failure recovery
+- Able to sustain ~35k messages/sec on loopback interface (1 producer -> 1 consumer)
+- Consistent performance up to system memory limit (tested to ~25m messages in 4GB memory)
+- Channels may be flushed of old messages periodically to reduce memory footprint
 
-**WARNING**: In early-stage development, API not locked. If you've used a previous
+**WARNING**: In early-stage development, API not stable. If you've used a previous
 version of these scripts, you'll most likely have to clear all previously
 published messages in order to upgrade to the latest version.
 
@@ -33,7 +34,7 @@ Raw Redis syntax:
 **Producer:**
 ``` sh
 redis> EVAL <publish.lua> 0 <channel> <message>
-# returns a unique message id
+# returns a unique message-id
 ```
 
 **Consumer:**
@@ -47,8 +48,8 @@ redis> EVAL <consume.lua> 0 <pattern> <cursor> <limit>
 # messages are returned in the format "<channel>@<id>\n<message>"
 ```
 
-**Purge:**
+**Flush:**
 ``` sh
-redis> EVAL <flush.lua> 0 <pattern> [BEFORE <id> (or) KEEP <count>]
+redis> EVAL <flush.lua> 0 <pattern> <before-message-id>
 # returns the count of messages flushed
 ```
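
Both consume and pub/sub deliver messages carrying this header, so a client needs to split it back apart. A minimal parsing sketch in Lua (the channel name and payload here are invented):

``` lua
-- Minimal sketch: splitting the "<channel>@<id>\n<message>" header
-- documented above (example values are invented)
local raw = 'events.signup@42\nhello world'
local channel, id, body = raw:match('^(.-)@(%d+)\n(.*)$')

print(channel) --> events.signup
print(id)      --> 42
print(body)    --> hello world
```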
scripts/consume.lua: 24 additions & 24 deletions
@@ -1,31 +1,31 @@
 local pattern, cursor, limit = ARGV[1], ARGV[2], ARGV[3]
-local pattern_key = 'remq:channel:' .. pattern
 
-limit = math.min(limit or 1000, 1000)
+-- convert Redis-style globbing to a Lua Pattern
+pattern = '^' .. pattern:gsub('%.', '%%.'):gsub('%*', '.*') .. '@\%d+\n'
 
--- for results from multiple channels, we'll merge them into a single set
--- zunionstore is not optimal here since we only need a subset of matching sets
-local union_key = pattern_key .. '@' .. (redis.call('get', 'remq:id') or 0)
-local channel_keys = redis.call('keys', pattern_key)
-for i=1,#channel_keys do
-  local key = channel_keys[i]
-  local channel = key:gsub('remq:channel:', '')
-  local msgs_ids = redis.call(
-    'zrangebyscore', key, '(' .. cursor, '+inf', 'WITHSCORES', 'LIMIT', 0, limit
+cursor = math.max(cursor or 0, 0)
+limit = math.min(math.max(limit or 1000, 0), 1000)
+
+local matched, per_loop, per_bucket = {}, limit, 100000
+while true do
+  local bucket = math.floor(cursor / per_bucket) * per_bucket
+  local unfiltered = redis.call(
+    'zrangebyscore', 'remq:archive:' .. bucket,
+    '(' .. cursor, '+inf', 'LIMIT', 0, per_loop
   )
-  for i=1,#msgs_ids do
-    if i % 2 == 0 then
-      -- add a header in the format: "<channel>@<id>\n<message>"
-      local msg = channel .. '@' .. msgs_ids[i] .. '\n' .. msgs_ids[i - 1]
-      redis.call('zadd', union_key, msgs_ids[i], msg)
-    end
-  end
-end
 
-local msgs = redis.call(
-  'zrangebyscore', union_key, '(' .. cursor, '+inf', 'LIMIT', 0, limit
-)
+  if #unfiltered == 0 then
+    return matched -- end of the timeline
+  end
 
-redis.call('del', union_key) -- remove the union key
+  for i=1, #unfiltered do
+    if unfiltered[i]:match(pattern) then
+      matched[#matched + 1] = unfiltered[i]
+      if #matched == limit then
+        return matched -- reached the limit
+      end
+    end
+  end
 
-return msgs
+  cursor = cursor + per_loop
+end
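
The heart of the new consume path is the glob-to-Lua-pattern conversion at the top of the script. A standalone sketch of how it behaves (written here with plain `%d` escaping; the example channels are invented):

``` lua
-- Sketch of the Redis-glob -> Lua-pattern conversion: '.' is escaped
-- to '%.', '*' becomes '.*', and the result is anchored at the start
-- and required to be followed by the "@<id>\n" header delimiter
local glob = 'events.*'
local pattern = '^' .. glob:gsub('%.', '%%.'):gsub('%*', '.*') .. '@%d+\n'
-- pattern is now '^events%..*@%d+\n'

print(('events.signup@42\nhi'):match(pattern) ~= nil) --> true
print(('other.signup@42\nhi'):match(pattern) ~= nil)  --> false
```

Each call pages through the archive bucket by bucket, applying this match to every candidate message, so consuming no longer requires the union-and-re-sort pass described in the commit message.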
scripts/flush.lua: 29 additions & 12 deletions
@@ -1,16 +1,33 @@
-local pattern, cmd, value = ARGV[1], ARGV[2], ARGV[3]
-local pattern_key = 'remq:channel:' .. pattern
+local pattern, cursor = ARGV[1], ARGV[2]
 
-local channel_keys = redis.call('keys', pattern_key)
+-- convert Redis-style globbing to a Lua Pattern
+pattern = '^' .. pattern:gsub('%.', '%%.'):gsub('%*', '.*') .. '@\%d+\n'
 
-local count = 0
-for i=1,#channel_keys do
-  local key = channel_keys[i]
-  if cmd == 'BEFORE' then
-    count = count + redis.call('zremrangebyscore', key, '-inf', '(' .. value)
-  elseif cmd == 'KEEP' then
-    count = count + redis.call('zremrangebyrank', key, 0, 0 - (value - 1))
+local flushed, per_loop, per_bucket = 0, 1000, 100000
+while true do
+  local bucket = math.floor((cursor - 1) / per_bucket) * per_bucket
+  local prev_cursor = 0 + cursor - math.min(cursor - bucket, per_loop)
+
+  local unfiltered = redis.call(
+    'zrangebyscore', 'remq:archive:' .. bucket,
+    prev_cursor, '(' .. cursor
+  )
+
+  if #unfiltered == 0 then
+    return flushed -- end of the timeline
+  end
+
+  local matched = {}
+  for i=1, #unfiltered do
+    if unfiltered[i]:match(pattern) then
+      matched[#matched + 1] = unfiltered[i]
+    end
+  end
+
+  if #matched > 0 then
+    redis.call('zrem', 'remq:archive:' .. bucket, unpack(matched))
+    flushed = flushed + #matched
   end
-end
 
-return count
+  cursor = prev_cursor
+end
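
flush.lua walks the archive backward from the supplied message-id, and each step is clamped at a bucket boundary so a single ZRANGEBYSCORE call never spans two buckets. A standalone sketch of just that cursor arithmetic (the starting id is made up):

``` lua
-- Sketch of flush.lua's backward cursor walk
local per_loop, per_bucket = 1000, 100000
local cursor = 200500

for _ = 1, 3 do
  local bucket = math.floor((cursor - 1) / per_bucket) * per_bucket
  local prev_cursor = cursor - math.min(cursor - bucket, per_loop)
  print(bucket, prev_cursor)
  cursor = prev_cursor
end
--> 200000  200000  (first step clamps at the bucket edge)
--> 100000  199000  (then it moves a full per_loop per iteration)
--> 100000  198000
```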
scripts/publish.lua: 8 additions & 5 deletions
@@ -1,12 +1,15 @@
 local channel, msg = ARGV[1], ARGV[2]
-local channel_key = 'remq:channel:' .. channel
 
 -- ids are an incrementing double precision integer
-local id = redis.call('incr', 'remq:id')
+local id, per_bucket = redis.call('incr', 'remq:message-id'), 100000
 
-redis.call('zadd', channel_key, id, msg) -- add to channel
+-- prefix message with header in the format: "<channel>@<id>\n<message>"
+msg = channel .. '@' .. id .. '\n' .. msg
+
+-- split into buckets every 100,000 to allow 4x10^14 (400 trillion) messages
+local bucket = 'remq:archive:' .. math.floor(id / per_bucket) * per_bucket
 
--- publish using pub/sub with header in the format: "<channel>@<id>\n<message>"
-redis.call('publish', channel_key, channel .. '@' .. id .. '\n' .. msg)
+redis.call('zadd', bucket, id, msg) -- add to bucket
+redis.call('publish', 'remq:channel:' .. channel, msg) -- publish to pub/sub
 
 return id

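Putting the pieces together, one publish now touches exactly two keys: the archive bucket and the pub/sub channel. A worked example with made-up values:

``` lua
-- Worked example (made-up values): the side effects of one publish
local channel, msg, per_bucket = 'events.signup', 'hello', 100000
local id = 123456 -- pretend INCR remq:message-id returned this

msg = channel .. '@' .. id .. '\n' .. msg
local bucket = 'remq:archive:' .. math.floor(id / per_bucket) * per_bucket

-- publish.lua then runs, in effect:
--   ZADD remq:archive:100000 123456 "events.signup@123456\nhello"
--   PUBLISH remq:channel:events.signup "events.signup@123456\nhello"
```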