Skip to content

Commit

Permalink
feat: refactor zrange (#1267)
Browse files Browse the repository at this point in the history
  • Loading branch information
marcoreni committed Apr 8, 2023
1 parent 8859c8f commit 0d0de54
Show file tree
Hide file tree
Showing 8 changed files with 231 additions and 171 deletions.
158 changes: 137 additions & 21 deletions src/commands/zrange-command.common.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import orderBy from 'lodash.orderby';

export function slice(arr, start, end) {
return arr.slice(start, end === -1 ? undefined : end + 1)
}
Expand Down Expand Up @@ -49,29 +51,143 @@ export function filterPredicate(min, max) {
}
}

// this emulates the way Redis handles parsing these arguments
// see https://github.com/antirez/redis/blob/06d490342f51cff316588a7a45124cc410b7d050/src/t_zset.c#L2556
export function getWithScoresAndLimit(args) {
let remaining = args.length
let pos = 0
let withScores = false
let limit = null
let offset = null

while (remaining > 0) {
if (remaining >= 1 && args[pos].toUpperCase() === 'WITHSCORES') {
withScores = true
pos += 1
remaining -= 1
} else if (remaining >= 3 && args[pos].toUpperCase() === 'LIMIT') {
offset = parseInt(args[pos + 1], 10)
limit = parseInt(args[pos + 2], 10)
pos += 3
remaining -= 3
function streq(a, b) {
return a.toString().toLowerCase() === b.toString().toLowerCase()
}

export const DIRECTION_REVERSE = 'reverse';
const DIRECTION_FORWARD = 'forward';

export const RANGE_RANK = 'rank';
export const RANGE_LEX = 'lex';
export const RANGE_SCORE = 'score';

/** https://github.com/redis/redis/blob/f651708a19b4fc8137eec13180fcea39e68fb284/src/t_zset.c#L3589 */
export function zrangeBaseCommand(args, argsStart = 0, store = false, inputRange = null, inputDirection = null) {
const key = args[argsStart];
const map = this.data.get(key)
if (!map) {
return []
}

// @TODO investigate a more stable way to detect sorted lists
if (this.data.has(key) && !(this.data.get(key) instanceof Map)) {
return []
}

let withScores = false;
let offset = 0;
let limit = null;
let direction = inputDirection || DIRECTION_FORWARD;
let range = inputRange || RANGE_RANK;
let start;
let end;

let minIdx = argsStart + 1;
let maxIdx = argsStart + 2;

/* Step 1: Skip the <src> <min> <max> args and parse remaining optional arguments. */
for (let j = argsStart + 3; j < args.length; j++) {
const leftargs = args.length - j - 1;

if (!store && streq(args[j], 'withscores')) {
withScores = 1;
} else if (streq(args[j], 'limit') && leftargs >= 2) {
offset = parseInt(args[j + 1], 10);
limit = parseInt(args[j + 2], 10);
if (Number.isNaN(offset) || Number.isNaN(limit)) {
throw new Error('ERR syntax error')
}

j += 2;
} else if (!inputDirection && streq(args[j], 'rev')) {
direction = DIRECTION_REVERSE;
} else if (!inputRange && streq(args[j] , 'bylex')) {
range = RANGE_LEX;
} else if (!inputRange && streq(args[j], 'byscore')) {
range = RANGE_SCORE;
} else {
throw new Error('ERR syntax error')
throw new Error('ERR syntax error');
}
}

return { withScores, limit, offset }
if (limit !== null && range === RANGE_RANK) {
throw new Error('ERR syntax error, LIMIT is only supported in combination with either BYSCORE or BYLEX');
}

if (withScores && range === RANGE_LEX) {
throw new Error('ERR syntax error, WITHSCORES not supported in combination with BYLEX');
}

if (direction === DIRECTION_REVERSE && (range === RANGE_SCORE || range === RANGE_LEX)) {
/* Range is given as [max,min] */
const tmp = maxIdx;
maxIdx = minIdx;
minIdx = tmp;
}


/* Step 2: Parse the range. */
switch (range) {
case RANGE_RANK:
/* Z[REV]RANGE, ZRANGESTORE [REV]RANGE */
start = parseInt(args[minIdx], 10);
end = parseInt(args[maxIdx], 10);

if (Number.isNaN(start) || Number.isNaN(end)) {
throw new Error('ERR syntax error ');
}
break;

case RANGE_SCORE:
/* Z[REV]RANGEBYSCORE, ZRANGESTORE [REV]RANGEBYSCORE */
start = parseLimit(args[minIdx]);
end = parseLimit(args[maxIdx]);

break;
// FIXME: handle RANGE_LEX
// case ZRANGE_LEX:
// /* Z[REV]RANGEBYLEX, ZRANGESTORE [REV]RANGEBYLEX */
// if (zslParseLexRange(c->argv[minidx], c->argv[maxidx], &lexrange) != C_OK) {
// addReplyError(c, "min or max not valid string range item");
// return;
// }
// break;
// }
default:
throw new Error('ERR syntax error');
}

/* Step 3: Lookup the key and get the range. */
let sort;
if (direction === DIRECTION_REVERSE) {
sort = ['desc', 'desc']
}

let ordered;
if (range === RANGE_SCORE) {
const filteredArray = Array.from(map.values()).filter(
filterPredicate(start, end)
)

ordered = orderBy(filteredArray, ['score', 'value'], sort);
}
// FIXME: handle RANGE_LEX
else {
ordered = slice(
orderBy(Array.from(map.values()), ['score', 'value'], sort),
start,
end
)
}

if (limit !== null) {
ordered = offsetAndLimit(ordered, offset, limit)
}

if (withScores) {
return ordered.flatMap(it => [it.value, `${it.score}`])
}

return ordered.map(it => it.value);
}
34 changes: 3 additions & 31 deletions src/commands/zrange.js
Original file line number Diff line number Diff line change
@@ -1,36 +1,8 @@
import orderBy from 'lodash.orderby'

import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'
import { slice } from './zrange-command.common'

export function zrange(key, s, e, withScores) {
const map = this.data.get(key)
if (!map) {
return []
}

// @TODO investigate a more stable way to detect sorted lists
if (this.data.has(key) && !(this.data.get(key) instanceof Map)) {
return []
}

const start = parseInt(s, 10)
const end = parseInt(e, 10)

const ordered = slice(
orderBy(Array.from(map.values()), ['score', 'value']),
start,
end
)

if (
typeof withScores === 'string' &&
withScores.toUpperCase() === 'WITHSCORES'
) {
return ordered.flatMap(it => [it.value, `${it.score}`])
}
import { zrangeBaseCommand } from './zrange-command.common';

return ordered.map(it => it.value)
export function zrange(...args) {
return zrangeBaseCommand.call(this, args);
}

export function zrangeBuffer(...args) {
Expand Down
41 changes: 3 additions & 38 deletions src/commands/zrangebyscore.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,10 @@
import orderBy from 'lodash.orderby'

import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'
import {
filterPredicate,
getWithScoresAndLimit,
offsetAndLimit,
parseLimit,
RANGE_SCORE, zrangeBaseCommand,
} from './zrange-command.common'

export function zrangebyscore(key, inputMin, inputMax, ...args) {
const map = this.data.get(key)
if (!map) {
return []
}

if (this.data.has(key) && !(this.data.get(key) instanceof Map)) {
return []
}

const { withScores, limit, offset } = getWithScoresAndLimit(args)

const min = parseLimit(inputMin)
const max = parseLimit(inputMax)
const filteredArray = Array.from(map.values()).filter(
filterPredicate(min, max)
)

let ordered = orderBy(filteredArray, ['score', 'value'])
if (withScores) {
if (limit !== null) {
ordered = offsetAndLimit(ordered, offset, limit)
}

return ordered.flatMap(it => [it.value, it.score])
}

const results = ordered.map(it => it.value)
if (limit !== null) {
return offsetAndLimit(results, offset, limit)
}
return results
export function zrangebyscore(...args) {
return zrangeBaseCommand.call(this, args, 0, false, RANGE_SCORE);
}

export function zrangebyscoreBuffer(...args) {
Expand Down
35 changes: 3 additions & 32 deletions src/commands/zrevrange.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,8 @@
import orderBy from 'lodash.orderby'

import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'
import { slice } from './zrange-command.common'

export function zrevrange(key, s, e, w) {
const map = this.data.get(key)
if (!map) {
return []
}

// @TODO investigate a more stable way to detect sorted lists
if (this.data.has(key) && !(this.data.get(key) instanceof Map)) {
return []
}

const start = parseInt(s, 10)
const end = parseInt(e, 10)

let val = orderBy(
Array.from(map.values()),
['score', 'value'],
['desc', 'desc']
).map(it => {
if (w) {
return [it.value, it.score]
}

return [it.value]
})

val = slice(val, start, end)
import { DIRECTION_REVERSE, zrangeBaseCommand } from './zrange-command.common'

return [].concat(...val)
export function zrevrange(...args) {
return zrangeBaseCommand.call(this, args, 0, false, null, DIRECTION_REVERSE);
}

export function zrevrangeBuffer(...args) {
Expand Down
42 changes: 3 additions & 39 deletions src/commands/zrevrangebyscore.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,9 @@
import orderBy from 'lodash.orderby'

import { convertStringToBuffer } from '../commands-utils/convertStringToBuffer'
import {
filterPredicate,
getWithScoresAndLimit,
offsetAndLimit,
parseLimit,
} from './zrange-command.common'

export function zrevrangebyscore(key, inputMax, inputMin, ...args) {
const map = this.data.get(key)
if (!map) {
return []
}

if (this.data.has(key) && !(this.data.get(key) instanceof Map)) {
return []
}

const { withScores, limit, offset } = getWithScoresAndLimit(args)

const min = parseLimit(inputMin)
const max = parseLimit(inputMax)
const filteredArray = Array.from(map.values()).filter(
filterPredicate(min, max)
)

let ordered = orderBy(filteredArray, ['score', 'value'], ['desc', 'desc'])
if (withScores) {
if (limit !== null) {
ordered = offsetAndLimit(ordered, offset, limit)
}
import { DIRECTION_REVERSE, RANGE_SCORE, zrangeBaseCommand } from './zrange-command.common';

return ordered.flatMap(it => [it.value, it.score])
}

const results = ordered.map(it => it.value)
if (limit !== null) {
return offsetAndLimit(results, offset, limit)
}
return results
export function zrevrangebyscore(...args) {
return zrangeBaseCommand.call(this, args, 0, false, RANGE_SCORE, DIRECTION_REVERSE);
}

export function zrevrangebyscoreBuffer(...args) {
Expand Down

0 comments on commit 0d0de54

Please sign in to comment.