Skip to content

Commit

Permalink
fix xadd inconsistency with redis implementation
Browse files Browse the repository at this point in the history
This commit aligns id generation with the official "specification/docu":

> Both quantities are 64-bit numbers. When an ID is auto-generated, the
> first part is the Unix time in milliseconds of the Redis instance
> generating the ID. The second part is just a sequence number and is
> used in order to distinguish IDs generated in the same millisecond.

Main motivation is that the current implementation was flawed when using
capped streams by creating invalid ids (e.g. generating the same id twice)
as well as re-using mocked tests and run them in an e2e fashion.

Tests were added to cover the additional funcionality.

Links:
https://redis.io/commands/xadd/
  • Loading branch information
ullumullu committed Feb 5, 2024
1 parent 6a58955 commit c2e2b81
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 43 deletions.
32 changes: 27 additions & 5 deletions src/commands/xadd.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,40 @@ export function xadd(stream, id, ...args) {
keyId = args.shift()
}

const eventId = `${
keyId === '*' ? this.data.get(stream).length + 1 : keyId
}-0`
const list = this.data.get(stream)
// Last timestamp is relevant for auto generating valid ids
let lastTimestamp = 0
let lastCounter = 0
if (list.length > 0) {
;[lastTimestamp, lastCounter] = list[list.length - 1][0].split('-')
}

let [unixTimestamp, counter] = keyId.split('-')
if (unixTimestamp === '*') {
unixTimestamp =
Number(lastTimestamp) > Date.now() ? Number(lastTimestamp) : Date.now()
}
if (counter === undefined || counter === '*') {
counter =
Number(lastTimestamp) === Number(unixTimestamp)
? Number(lastCounter) + 1
: 0
}

if (list.length > 0 && list[0][0] === `${eventId}`) {
const sequentialIdProvided =
Number(unixTimestamp) > Number(lastTimestamp) ||
(Number(unixTimestamp) === Number(lastTimestamp) &&
Number(counter) >= Number(lastCounter))
if (!sequentialIdProvided) {
throw new Error(
'ERR The ID specified in XADD is equal or smaller than the target stream top item'
)
}

this.data.set(`stream:${stream}:${eventId}`, { polled: false })
const eventId = `${unixTimestamp}-${counter}`
this.data.set(`stream:${stream}:${eventId}`, {
polled: false,
})

let newData = list.concat([[`${eventId}`, [...args]]])
if (threshold && newData.length > threshold) {
Expand Down
105 changes: 95 additions & 10 deletions test/integration/commands/xadd.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,87 @@ import Redis from 'ioredis'

describe('xadd', () => {
const redis = new Redis()
if (!process.env.IS_E2E) jest.useFakeTimers()
const fixedTimestamp = Date.now()

afterAll(() => {
redis.disconnect()
if (!process.env.IS_E2E) jest.useRealTimers()
})

it('should add events with a given and sequential id to a stream', () => {
return redis
.xadd('stream', '0-1', 'key', 'val')
.then(id => {
expect(id).toBe('0-1')
})
.then(() => redis.xadd('stream', '0-2', 'key', 'val'))
.then(id => {
expect(id).toBe('0-2')
})
.then(() => redis.xadd('stream', '1-0', 'key', 'val'))
.then(id => {
expect(id).toBe('1-0')
})
.then(() => redis.xadd('stream', '1337-1337', 'key', 'val'))
.then(id => {
expect(id).toBe('1337-1337')
})
})

it('should only increment counter when last timestamp is greater than current date', () => {
return redis
.xadd('stream', `${fixedTimestamp + 100000}-0`, 'key', 'val')
.then(id => {
expect(id).toBe(`${fixedTimestamp + 100000}-0`)
})
.then(() => redis.xadd('stream', '*', 'key', 'val'))
.then(id => {
expect(id).toBe(`${fixedTimestamp + 100000}-1`)
})
})

it('should correctly generate ids on capped streams', () => {
return redis
.xadd('stream', 'MAXLEN', '=', '2', '*', 'key', 'val')
.then(id => {
expect(id).toBe(`${fixedTimestamp}-0`)
})
.then(() => redis
.xadd('stream', 'MAXLEN', '=', '1', '*', 'key', 'val'))
.then(id => {
expect(id).toBe(`${fixedTimestamp}-1`)
}).then(() => redis
.xadd('stream', 'MAXLEN', '=', '2', '*', 'key', 'val'))
.then(id => {
expect(id).toBe(`${fixedTimestamp}-2`)
}).then(() => redis
.xadd('stream', 'MAXLEN', '=', '1', '*', 'key', 'val'))
.then(id => {
expect(id).toBe(`${fixedTimestamp}-3`)
})
})

// @TODO Rewrite test so it runs on a real Redis instance
;(process.env.IS_E2E ? it.skip : it)('should add events to a stream', () => {
;
(process.env.IS_E2E ? it.skip : it)('should add events to a stream', () => {
return redis
.xadd('stream', '*', 'key', 'val')
.then(id => {
expect(id).toBe('1-0')
expect(redis.data.get('stream')).toEqual([['1-0', ['key', 'val']]])
expect(id).toBe(`${fixedTimestamp}-0`)
expect(redis.data.get('stream')).toEqual([
[`${fixedTimestamp}-0`, ['key', 'val']],
])
expect(redis.data.get(`stream:stream:${id}`)).toEqual({
polled: false,
})
})
.then(() => redis.xadd('stream', '*', 'key', 'val'))
.then(id => {
expect(id).toBe('2-0')
expect(id).toBe(`${fixedTimestamp}-1`)
expect(redis.data.get('stream')).toEqual([
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
[`${fixedTimestamp}-0`, ['key', 'val']],
[`${fixedTimestamp}-1`, ['key', 'val']],
])
expect(redis.data.get(`stream:stream:${id}`)).toEqual({
polled: false,
Expand All @@ -40,10 +100,24 @@ describe('xadd', () => {
)
)
.then(id => {
expect(id).toBe('3-0')
expect(id).toBe(`${fixedTimestamp}-2`)
expect(redis.data.get('stream')).toEqual([
[`${fixedTimestamp}-1`, ['key', 'val']],
[`${fixedTimestamp}-2`, ['reading', '{"key": "value"}']],
])
expect(redis.data.get(`stream:stream:${id}`)).toEqual({
polled: false,
})
// Advancing the time will reset the counter to 0
jest.advanceTimersByTime(1)
})
.then(() => redis.xadd('stream', '*', 'key', 'val'))
.then(id => {
expect(id).toBe(`${fixedTimestamp + 1}-0`)
expect(redis.data.get('stream')).toEqual([
['2-0', ['key', 'val']],
['3-0', ['reading', '{"key": "value"}']],
[`${fixedTimestamp}-1`, ['key', 'val']],
[`${fixedTimestamp}-2`, ['reading', '{"key": "value"}']],
[`${fixedTimestamp + 1}-0`, ['key', 'val']],
])
expect(redis.data.get(`stream:stream:${id}`)).toEqual({
polled: false,
Expand Down Expand Up @@ -74,4 +148,15 @@ describe('xadd', () => {
)
)
})
})

it('should throw when adding a smaller id', () => {
redis
.xadd('stream', '0-2', 'key', 'value')
.then(() => redis.xadd('stream', '0-1', 'key', 'value'))
.catch(err =>
expect(err.message).toBe(
'ERR The ID specified in XADD is equal or smaller than the target stream top item'
)
)
})
})
90 changes: 62 additions & 28 deletions test/integration/commands/xread.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@ import Redis from 'ioredis'

describe('xread', () => {
const redis = new Redis()
if (!process.env.IS_E2E) jest.useFakeTimers({
doNotFake: ['setTimeout', 'performance'],
})
const fixedTimestamp = Date.now()

afterAll(() => {
redis.disconnect()
if (!process.env.IS_E2E) jest.useRealTimers()
})

// @TODO Rewrite test so it runs on a real Redis instance
Expand All @@ -13,26 +19,34 @@ describe('xread', () => {
const redis = new Redis({
data: {
stream: [
['1-0', ['key', 'val']],
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
['4-0', ['key', 'val']],
[`${fixedTimestamp}-0`, ['key', 'val']],
[`${fixedTimestamp}-1`, ['key', 'val']],
[`${fixedTimestamp}-2`, ['key', 'val']],
[`${fixedTimestamp}-3`, ['key', 'val']],
],
'stream:stream:1-0': { polled: false },
'stream:stream:2-0': { polled: false },
'stream:stream:3-0': { polled: false },
'stream:stream:4-0': { polled: false },
[`stream:stream:${fixedTimestamp}-0`]: {
polled: false,
},
[`stream:stream:${fixedTimestamp}-1`]: {
polled: false,
},
[`stream:stream:${fixedTimestamp}-2`]: {
polled: false,
},
[`stream:stream:${fixedTimestamp}-3`]: {
polled: false,
},
},
})
return redis
.xread('COUNT', '2', 'STREAMS', 'stream', '2-0')
.xread('COUNT', '2', 'STREAMS', 'stream', `${fixedTimestamp}-1`)
.then(events =>
expect(events).toEqual([
[
'stream',
[
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
[`${fixedTimestamp}-1`, ['key', 'val']],
[`${fixedTimestamp}-2`, ['key', 'val']],
],
],
])
Expand All @@ -52,10 +66,18 @@ describe('xread', () => {
['3-0', ['key', 'val']],
],
'other-stream': [['1-0', ['key', 'val']]],
'stream:stream:1-0': { polled: false },
'stream:stream:2-0': { polled: false },
'stream:stream:3-0': { polled: false },
'stream:other-stream:1-0': { polled: false },
'stream:stream:1-0': {
polled: false,
},
'stream:stream:2-0': {
polled: false,
},
'stream:stream:3-0': {
polled: false,
},
'stream:other-stream:1-0': {
polled: false,
},
},
})
return redis
Expand Down Expand Up @@ -87,10 +109,18 @@ describe('xread', () => {
['3-0', ['key', 'val']],
],
'other-stream': [['1-0', ['key', 'val']]],
'stream:stream:1-0': { polled: false },
'stream:stream:2-0': { polled: false },
'stream:stream:3-0': { polled: false },
'stream:other-stream:1-0': { polled: false },
'stream:stream:1-0': {
polled: false,
},
'stream:stream:2-0': {
polled: false,
},
'stream:stream:3-0': {
polled: false,
},
'stream:other-stream:1-0': {
polled: false,
},
},
})
return redis
Expand Down Expand Up @@ -119,7 +149,7 @@ describe('xread', () => {
.then(row => {
const [[stream, [[id, values]]]] = row
expect(stream).toBe('stream')
expect(id).toBe('1-0')
expect(id).toBe(`${fixedTimestamp}-0`)
expect(values).toEqual(['key', 'val'])
})
return redis.xadd('stream', '*', 'key', 'val').then(() => op)
Expand All @@ -131,11 +161,11 @@ describe('xread', () => {
'should block reads till data becomes available since an id',
() => {
const op = redis
.xread('BLOCK', '0', 'STREAMS', 'stream', '2-0')
.xread('BLOCK', '0', 'STREAMS', 'stream', `${fixedTimestamp}-1`)
.then(row => {
const [[stream, [[id, values]]]] = row
expect(stream).toBe('stream')
expect(id).toBe('2-0')
expect(id).toBe(`${fixedTimestamp}-1`)
expect(values).toEqual(['key', 'val'])
})
return redis
Expand All @@ -146,7 +176,6 @@ describe('xread', () => {
)

it('should block reads on a stream with a time out', () => {
const redis = new Redis()
const before = performance.now()
return redis
.xread('BLOCK', '500', 'STREAMS', 'empty-stream', '$')
Expand All @@ -158,7 +187,6 @@ describe('xread', () => {
})

it('should block reads on multiple streams with a time out', () => {
const redis = new Redis()
const before = performance.now()
return redis
.xread(
Expand Down Expand Up @@ -202,7 +230,7 @@ describe('xread', () => {
const after = performance.now()
expect(after - before >= 100).toBe(true)
expect(row).toEqual([
['empty-stream-2', [['1-0', ['key', 'val']]]],
['empty-stream-2', [[`${fixedTimestamp}-0`, ['key', 'val']]]],
['empty-stream', []],
])
})
Expand All @@ -220,9 +248,15 @@ describe('xread', () => {
['2-0', ['key', 'val']],
['3-0', ['key', 'val']],
],
'stream:stream:1-0': { polled: false },
'stream:stream:2-0': { polled: false },
'stream:stream:3-0': { polled: false },
'stream:stream:1-0': {
polled: false,
},
'stream:stream:2-0': {
polled: false,
},
'stream:stream:3-0': {
polled: false,
},
},
})
return redis.xread('STREAMS', 'stream', '2-0').then(events =>
Expand Down

0 comments on commit c2e2b81

Please sign in to comment.