Skip to content

Commit

Permalink
Stream commands for Redis 5 (#142)
Browse files Browse the repository at this point in the history
* First step in implementing streams

* Correct key name

* Id in time requires milliseconds

* Create Streams class

* Ensure data persists by adding a member

* Sequence numbers

* Sequence number with multiple items at the same time

* xlen

* Add ranges

* streams -> stream

* Extract Ids to new class

* Rubocop

* streams -> stream (really this time)

* More xrange

* A little more readable

* Ensure messages are strings rather than symbols

* COUNT option for xrange

* List commands that are not yet supported

* Correctly report some errors in xadd

* Add xrevrange command

* Raise error correctly

* Deal with errors

* Run Travis tests with Redis 5 server

* Use redis-cli from installed Redis

* Extract install of Redis 5 into a separate script

* Remove pry (oops)

* Travis didn't like this having execute permissions

* Run the redis 5 install without requiring execute permissions

* Allow fuzzy match between mock and real redis

* Remove pry

* Remove outdated Ruby version

* Try Ruby version without patch number

* Satisfy Rubocop

* Redis 5 RC4 released

* ... and update the travis file

* Remove WIP (I added this file by accident)

* Use gsub oon content of array if necessary

* Use system supplied version of Redis again

* First step in implementing streams

* Correct key name

* Id in time requires milliseconds

* Create Streams class

* Ensure data persists by adding a member

* Sequence numbers

* Sequence number with multiple items at the same time

* xlen

* Add ranges

* streams -> stream

* Extract Ids to new class

* Rubocop

* streams -> stream (really this time)

* More xrange

* A little more readable

* Ensure messages are strings rather than symbols

* COUNT option for xrange

* List commands that are not yet supported

* Correctly report some errors in xadd

* Add xrevrange command

* Raise error correctly

* Deal with errors

* Run Travis tests with Redis 5 server

* Use redis-cli from installed Redis

* Extract install of Redis 5 into a separate script

* Remove pry (oops)

* Travis didn't like this having execute permissions

* Run the redis 5 install without requiring execute permissions

* Allow fuzzy match between mock and real redis

* Try Ruby version without patch number

* Satisfy Rubocop

* Redis 5 RC4 released

* ... and update the travis file

* Remove WIP (I added this file by accident)

* Use gsub oon content of array if necessary

* Use system supplied version of Redis again

* Fix error with merge
  • Loading branch information
jrmhaig authored and sds committed Nov 21, 2018
1 parent 2d15877 commit 70794df
Show file tree
Hide file tree
Showing 11 changed files with 660 additions and 1 deletion.
2 changes: 2 additions & 0 deletions lib/mock_redis/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
require 'mock_redis/info_method'
require 'mock_redis/utility_methods'
require 'mock_redis/geospatial_methods'
require 'mock_redis/stream_methods'

class MockRedis
class Database
Expand All @@ -22,6 +23,7 @@ class Database
include InfoMethod
include UtilityMethods
include GeospatialMethods
include StreamMethods

attr_reader :data, :expire_times

Expand Down
73 changes: 73 additions & 0 deletions lib/mock_redis/stream.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
require 'forwardable'
require 'set'
require 'date'
require 'mock_redis/stream/id'

# TODO: Implement the following commands
#
# * xread
# * xgroup
# * xreadgroup
# * xack
# * xpending
# * xclaim
# * xinfo
# * xtrim
# * xdel
#
# For details of these commands see https://redis.io/topics/streams-intro

class MockRedis
class Stream
include Enumerable
extend Forwardable

attr_accessor :members

def_delegators :members, :empty?

def initialize
@members = Set.new
@last_id = nil
end

def last_id
@last_id.to_s
end

def add(id, values)
@last_id = MockRedis::Stream::Id.new(id, min: @last_id)
members.add [@last_id, values.map(&:to_s)]
@last_id.to_s
end

def range(start, finish, reversed, *opts_in)
opts = options opts_in, ['count']
unless opts['count'].nil? || /^\d*$/.match(opts['count'])
raise Redis::CommandError, 'ERR value is not an integer or out of range'
end
start_id = MockRedis::Stream::Id.new(start)
finish_id = MockRedis::Stream::Id.new(finish, sequence: Float::INFINITY)
items = members
.select { |m| (start_id <= m[0]) && (finish_id >= m[0]) }
.map { |m| [m[0].to_s, m[1]] }
items.reverse! if reversed
return items.first(opts['count'].to_i) if opts.key?('count')
items
end

def each
members.each { |m| yield m }
end

private

def options(opts_in, permitted)
opts_out = {}
raise Redis::CommandError, 'ERR syntax error' unless (opts_in.length % 2).zero?
opts_in.each_slice(2).map { |pair| opts_out[pair[0].downcase] = pair[1] }
raise Redis::CommandError, 'ERR syntax error' unless (opts_out.keys - permitted).empty?
opts_out
end
end
end
53 changes: 53 additions & 0 deletions lib/mock_redis/stream/id.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
class MockRedis
class Stream
class Id
include Comparable

attr_accessor :timestamp, :sequence

def initialize(id, min: nil, sequence: 0)
case id
when '*'
@timestamp = (Time.now.to_f * 1000).to_i
@sequence = 0
if self <= min
@timestamp = min.timestamp
@sequence = min.sequence + 1
end
when '-'
@timestamp = @sequence = 0
when '+'
@timestamp = @sequence = Float::INFINITY
else
if id.is_a? String
(_, @timestamp, @sequence) = id.match(/^(\d+)-?(\d+)?$/)
.to_a
if @timestamp.nil?
raise Redis::CommandError,
'ERR Invalid stream ID specified as stream command argument'
end
@timestamp = @timestamp.to_i
else
@timestamp = id
end
@sequence = @sequence.nil? ? sequence : @sequence.to_i
if self <= min
raise Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than ' \
'the target stream top item'
end
end
end

def to_s
"#{@timestamp}-#{@sequence}"
end

def <=>(other)
return 1 if other.nil?
return @sequence <=> other.sequence if @timestamp == other.timestamp
@timestamp <=> other.timestamp
end
end
end
end
72 changes: 72 additions & 0 deletions lib/mock_redis/stream_methods.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
require 'mock_redis/assertions'
require 'mock_redis/utility_methods'
require 'mock_redis/stream'

class MockRedis
module StreamMethods
include Assertions
include UtilityMethods

def xadd(key = nil, id = nil, *args)
if args.count == 0
raise Redis::CommandError,
"ERR wrong number of arguments for 'xadd' command"
end
if args.count.odd?
raise Redis::CommandError,
'ERR wrong number of arguments for XADD'
end
with_stream_at(key) do |stream|
stream.add id, args
return stream.last_id
end
end

def xlen(key = nil, *args)
if key.nil? || args.count > 0
raise Redis::CommandError,
"ERR wrong number of arguments for 'xlen' command"
end
with_stream_at(key) do |stream|
return stream.count
end
end

def xrange(key = nil, start = nil, finish = nil, *options)
if finish.nil?
raise Redis::CommandError,
"ERR wrong number of arguments for 'xrange' command"
end
with_stream_at(key) do |stream|
return stream.range(start, finish, false, *options)
end
end

def xrevrange(key = nil, finish = nil, start = nil, *options)
if start.nil?
raise Redis::CommandError,
"ERR wrong number of arguments for 'xrevrange' command"
end
with_stream_at(key) do |stream|
return stream.range(start, finish, true, *options)
end
end

private

def with_stream_at(key, &blk)
with_thing_at(key, :assert_streamy, proc { Stream.new }, &blk)
end

def streamy?(key)
data[key].nil? || data[key].is_a?(Stream)
end

def assert_streamy(key)
unless streamy?(key)
raise Redis::CommandError,
'WRONGTYPE Operation against a key holding the wrong kind of value'
end
end
end
end
1 change: 1 addition & 0 deletions mock_redis.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ Gem::Specification.new do |s|
s.add_development_dependency 'redis', '~>4.0.1'
s.add_development_dependency 'rspec', '~> 3.0'
s.add_development_dependency 'rspec-its', '~> 1.0'
s.add_development_dependency 'timecop', '~> 0.9.1'
end
85 changes: 85 additions & 0 deletions spec/commands/xadd_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
require 'spec_helper'

describe '#xadd(key, id, [field, value, ...])' do
before :all do
sleep 1 - (Time.now.to_f % 1)
@key = 'mock-redis-test:xadd'
end

before :each do
@redises._gsub(/\d{3}-\d/, '...-.')
end

it 'returns an id based on the timestamp' do
t = Time.now.to_i
expect(@redises.xadd(@key, '*', 'key', 'value')).to match(/#{t}\d{3}-0/)
end

it 'adds data with symbols' do
@redises.xadd(@key, '*', :symbol_key, :symbol_value)
expect(@redises.xrange(@key, '-', '+').last[1])
.to eq(%w[symbol_key symbol_value])
end

it 'increments the sequence number with the same timestamp' do
Timecop.freeze do
@redises.xadd(@key, '*', 'key', 'value')
expect(@redises.xadd(@key, '*', 'key', 'value')).to match(/\d+-1/)
end
end

it 'sets the id if it is given' do
expect(@redises.xadd(@key, '1234567891234-2', 'key', 'value'))
.to eq '1234567891234-2'
end

it 'accepts is as an integer' do
expect(@redises.xadd(@key, 1_234_567_891_234, 'key', 'value'))
.to eq '1234567891234-0'
end

it 'sets an id based on the timestamp if the given id is before the last' do
@redises.xadd(@key, '1234567891234-0', 'key', 'value')
expect { @redises.xadd(@key, '1234567891233-0', 'key', 'value') }
.to raise_error(
Redis::CommandError,
'ERR The ID specified in XADD is equal or smaller than the target ' \
'stream top item'
)
end

it 'caters for the current time being before the last time' do
t = (Time.now.to_f * 1000).to_i + 2000
@redises.xadd(@key, "#{t}-0", 'key', 'value')
expect(@redises.xadd(@key, '*', 'key', 'value')).to match(/#{t}-1/)
end

it 'appends a sequence number if it is missing' do
expect(@redises.xadd(@key, '1234567891234', 'key', 'value'))
.to eq '1234567891234-0'
end

it 'raises wrong number of arguments error with missing values' do
expect { @redises.xadd(@key, '*') }
.to raise_error(
Redis::CommandError,
"ERR wrong number of arguments for 'xadd' command"
)
end

it 'raises wrong number of arguments error with odd number of values' do
expect { @redises.xadd(@key, '*', 'key', 'value', 'key') }
.to raise_error(
Redis::CommandError,
'ERR wrong number of arguments for XADD'
)
end

it 'raises an invalid stream id error' do
expect { @redises.xadd(@key, 'X', 'key', 'value') }
.to raise_error(
Redis::CommandError,
'ERR Invalid stream ID specified as stream command argument'
)
end
end
36 changes: 36 additions & 0 deletions spec/commands/xlen_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
require 'spec_helper'

describe '#xlen(key)' do
before :all do
sleep 1 - (Time.now.to_f % 1)
@key = 'mock-redis-test:xlen'
end

before :each do
@redises._gsub(/\d{3}-\d/, '...-.')
end

it 'returns the number of items in the stream' do
expect(@redises.xlen(@key)).to eq 0
@redises.xadd(@key, '*', 'key', 'value')
expect(@redises.xlen(@key)).to eq 1
3.times { @redises.xadd(@key, '*', 'key', 'value') }
expect(@redises.xlen(@key)).to eq 4
end

it 'raises wrong number of arguments error with missing key' do
expect { @redises.xlen }
.to raise_error(
Redis::CommandError,
"ERR wrong number of arguments for 'xlen' command"
)
end

it 'raises wrong number of arguments error with extra arguments' do
expect { @redises.xlen(@key, 'xyz') }
.to raise_error(
Redis::CommandError,
"ERR wrong number of arguments for 'xlen' command"
)
end
end

0 comments on commit 70794df

Please sign in to comment.