Skip to content

Commit 50f1bbb

Browse files
committed
feat: use local queue and cache for local dev server instead of Redis one
1 parent f45468b commit 50f1bbb

File tree

10 files changed

+624
-242
lines changed

10 files changed

+624
-242
lines changed

.eslintrc.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ module.exports = {
1616
"no-mixed-operators": 0,
1717
"no-else-return": 0,
1818
"prefer-promise-reject-errors": 0,
19+
"no-plusplus": 0,
1920
"operator-linebreak": ["error", "after"],
2021
'max-len': ['error', 120, 2, {
2122
ignoreUrls: true,
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
const crypto = require('crypto');
2+
3+
class BaseQueueDriver {
4+
redisHash(queryKey) {
5+
return typeof queryKey === 'string' && queryKey.length < 256 ?
6+
queryKey :
7+
crypto.createHash('md5').update(JSON.stringify(queryKey)).digest("hex");
8+
}
9+
}
10+
11+
module.exports = BaseQueueDriver;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
class LocalCacheDriver {
2+
constructor() {
3+
this.store = {};
4+
}
5+
6+
async get(key) {
7+
if (this.store[key] && this.store[key].exp < new Date().getTime()) {
8+
delete this.store[key];
9+
}
10+
return this.store[key] && this.store[key].value;
11+
}
12+
13+
async set(key, value, expiration) {
14+
this.store[key] = {
15+
value,
16+
exp: new Date().getTime() + expiration * 1000
17+
};
18+
}
19+
20+
async remove(key) {
21+
delete this.store[key];
22+
}
23+
}
24+
25+
module.exports = LocalCacheDriver;
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
const R = require('ramda');
2+
const BaseQueueDriver = require('./BaseQueueDriver');
3+
4+
class LocalQueueDriverConnection {
5+
constructor(driver, options) {
6+
this.redisQueuePrefix = options.redisQueuePrefix;
7+
this.continueWaitTimeout = options.continueWaitTimeout;
8+
this.orphanedTimeout = options.orphanedTimeout;
9+
this.heartBeatTimeout = options.heartBeatTimeout;
10+
this.concurrency = options.concurrency;
11+
this.driver = driver;
12+
this.results = driver.results;
13+
this.resultPromises = driver.resultPromises;
14+
this.queryDef = driver.queryDef;
15+
this.toProcess = driver.toProcess;
16+
this.recent = driver.recent;
17+
this.active = driver.active;
18+
}
19+
20+
getResultPromise(resultListKey) {
21+
if (!this.resultPromises[resultListKey]) {
22+
let resolveMethod = null;
23+
this.resultPromises[resultListKey] = new Promise(resolve => {
24+
resolveMethod = resolve;
25+
});
26+
this.resultPromises[resultListKey].resolve = resolveMethod;
27+
}
28+
return this.resultPromises[resultListKey];
29+
}
30+
31+
async getResultBlocking(queryKey, continueWaitTimeout) {
32+
const resultListKey = this.resultListKey(queryKey);
33+
const timeoutPromise = (timeout) => new Promise((resolve) => setTimeout(() => resolve(null), timeout));
34+
35+
const res = await Promise.race([
36+
this.getResultPromise(resultListKey),
37+
timeoutPromise(continueWaitTimeout || this.continueWaitTimeout * 1000),
38+
]);
39+
if (res) {
40+
delete this.resultPromises[resultListKey];
41+
}
42+
return res;
43+
}
44+
45+
async getResult(queryKey) {
46+
const resultListKey = this.resultListKey(queryKey);
47+
if (this.resultPromises[resultListKey]) {
48+
return this.getResultBlocking(queryKey, 5);
49+
}
50+
return null;
51+
}
52+
53+
queueArray(queueObj, orderFilterLessThan) {
54+
return R.pipe(
55+
R.values,
56+
R.filter(orderFilterLessThan ? q => q.order < orderFilterLessThan : R.identity),
57+
R.sortBy(q => q.order),
58+
R.map(q => q.key)
59+
)(queueObj);
60+
}
61+
62+
addToQueue(keyScore, queryKey, time, queryHandler, query, priority, options) {
63+
const queryQueueObj = {
64+
queryHandler, query, queryKey, stageQueryKey: options.stageQueryKey, priority
65+
};
66+
const key = this.redisHash(queryKey);
67+
if (!this.queryDef[key]) {
68+
this.queryDef[key] = queryQueueObj;
69+
}
70+
let added = 0;
71+
if (!this.toProcess[key]) {
72+
this.toProcess[key] = {
73+
order: keyScore,
74+
key
75+
};
76+
added = 1;
77+
}
78+
this.recent[key] = { order: time, key };
79+
80+
return [added, null, null, Object.keys(this.toProcess).length]; // TODO nulls
81+
}
82+
83+
getToProcessQueries() {
84+
return this.queueArray(this.toProcess);
85+
}
86+
87+
getActiveQueries() {
88+
return this.queueArray(this.active);
89+
}
90+
91+
async getQueryAndRemove(queryKey) {
92+
const key = this.redisHash(queryKey);
93+
const query = this.queryDef[key];
94+
delete this.active[key];
95+
delete this.toProcess[key];
96+
delete this.recent[key];
97+
delete this.queryDef[key];
98+
return [query];
99+
}
100+
101+
setResultAndRemoveQuery(queryKey, executionResult) {
102+
const key = this.redisHash(queryKey);
103+
const promise = this.getResultPromise(this.resultListKey(queryKey));
104+
delete this.active[key];
105+
delete this.toProcess[key];
106+
delete this.recent[key];
107+
delete this.queryDef[key];
108+
promise.resolve(executionResult);
109+
}
110+
111+
removeQuery(queryKey) {
112+
const key = this.redisHash(queryKey);
113+
delete this.active[key];
114+
delete this.toProcess[key];
115+
delete this.recent[key];
116+
delete this.queryDef[key];
117+
}
118+
119+
getOrphanedQueries() {
120+
return this.queueArray(this.recent, new Date().getTime() - this.orphanedTimeout * 1000);
121+
}
122+
123+
getStalledQueries() {
124+
return this.queueArray(this.active, new Date().getTime() - this.heartBeatTimeout * 1000);
125+
}
126+
127+
async getQueryStageState() {
128+
return [this.queueArray(this.active), this.queueArray(this.toProcess), R.clone(this.queryDef)];
129+
}
130+
131+
async getQueryDef(queryKey) {
132+
return this.queryDef[this.redisHash(queryKey)];
133+
}
134+
135+
updateHeartBeat(queryKey) {
136+
const key = this.redisHash(queryKey);
137+
if (this.active[key]) {
138+
this.active[key] = { key, order: new Date().getTime() };
139+
}
140+
}
141+
142+
retrieveForProcessing(queryKey) {
143+
const key = this.redisHash(queryKey);
144+
let added = 0;
145+
if (Object.keys(this.active).length < this.concurrency && !this.active[key]) {
146+
this.active[key] = { key, order: new Date().getTime() };
147+
added = 1;
148+
}
149+
return [added, null, this.queueArray(this.active), Object.keys(this.toProcess).length]; // TODO nulls
150+
}
151+
152+
async optimisticQueryUpdate(queryKey, toUpdate) {
153+
const key = this.redisHash(queryKey);
154+
this.queryDef[key] = { ...this.queryDef[key], ...toUpdate };
155+
}
156+
157+
release() {
158+
}
159+
160+
queryRedisKey(queryKey, suffix) {
161+
return `${this.redisQueuePrefix}_${this.redisHash(queryKey)}_${suffix}`;
162+
}
163+
164+
resultListKey(queryKey) {
165+
return this.queryRedisKey(queryKey, 'RESULT');
166+
}
167+
168+
redisHash(queryKey) {
169+
return this.driver.redisHash(queryKey);
170+
}
171+
}
172+
173+
class LocalQueueDriver extends BaseQueueDriver {
174+
constructor(options) {
175+
super();
176+
this.options = options;
177+
this.results = {};
178+
this.resultPromises = {};
179+
this.queryDef = {};
180+
this.toProcess = {};
181+
this.recent = {};
182+
this.active = {};
183+
}
184+
185+
createConnection() {
186+
return new LocalQueueDriverConnection(this, this.options);
187+
}
188+
}
189+
190+
module.exports = LocalQueueDriver;

packages/cubejs-query-orchestrator/orchestrator/PreAggregations.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const crypto = require('crypto');
22
const R = require('ramda');
3-
const redis = require('redis');
3+
const RedisCacheDriver = require('./RedisCacheDriver');
4+
const LocalCacheDriver = require('./LocalCacheDriver');
45

56
const QueryCache = require('./QueryCache');
67
const ContinueWaitError = require('./ContinueWaitError');
@@ -50,15 +51,15 @@ class PreAggregationLoadCache {
5051
this.queryCache = queryCache;
5152
this.preAggregations = preAggregations;
5253
this.queryResults = {};
53-
this.redisClient = preAggregations.redisClient;
54+
this.cacheDriver = preAggregations.cacheDriver;
5455
}
5556

5657
async tablesFromCache(schema) {
57-
let tables = JSON.parse(await this.redisClient.getAsync(this.tablesRedisKey()));
58+
let tables = await this.cacheDriver.get(this.tablesRedisKey());
5859
if (!tables) {
5960
const client = await this.driverFactory();
6061
tables = await client.getTablesQuery(schema);
61-
await this.redisClient.setAsync(this.tablesRedisKey(), JSON.stringify(tables), 'EX', 120);
62+
await this.cacheDriver.set(this.tablesRedisKey(), tables, 120);
6263
}
6364
return tables;
6465
}
@@ -106,7 +107,7 @@ class PreAggregationLoadCache {
106107
this.tables = undefined;
107108
this.queryStageState = undefined;
108109
this.versionEnries = undefined;
109-
await this.redisClient.delAsync(this.tablesRedisKey());
110+
await this.cacheDriver.remove(this.tablesRedisKey());
110111
}
111112
}
112113

@@ -312,7 +313,9 @@ class PreAggregations {
312313
this.queryCache = queryCache;
313314
this.refreshErrors = {}; // TODO should be in redis
314315
this.tablesUsedInQuery = {}; // TODO should be in redis
315-
this.redisClient = redis.createClient(process.env.REDIS_URL);
316+
this.cacheDriver = process.env.NODE_ENV === 'production' || process.env.REDIS_URL ?
317+
new RedisCacheDriver() :
318+
new LocalCacheDriver();
316319
}
317320

318321
loadAllPreAggregationsIfNeeded (queryBody) {

packages/cubejs-query-orchestrator/orchestrator/QueryCache.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
1-
const redis = require('redis');
21
const crypto = require('crypto');
32
const QueryQueue = require('./QueryQueue');
43
const ContinueWaitError = require('./ContinueWaitError');
4+
const RedisCacheDriver = require('./RedisCacheDriver');
5+
const LocalCacheDriver = require('./LocalCacheDriver');
56

67
class QueryCache {
78
constructor(redisPrefix, clientFactory, logger, options) {
89
this.options = options || {};
910
this.redisPrefix = redisPrefix;
1011
this.driverFactory = clientFactory;
1112
this.logger = logger;
12-
this.redisClient = redis.createClient(process.env.REDIS_URL);
13+
this.cacheDriver = process.env.NODE_ENV === 'production' || process.env.REDIS_URL ?
14+
new RedisCacheDriver() :
15+
new LocalCacheDriver();
1316
}
1417

1518
cachedQueryResult (queryBody, preAggregationsTablesToTempTables) {
@@ -193,15 +196,15 @@ class QueryCache {
193196
result: res,
194197
renewalKey
195198
};
196-
return this.redisClient.setAsync(redisKey, JSON.stringify(result), 'EX', expiration)
199+
return this.cacheDriver.set(redisKey, result, expiration)
197200
.then(() => {
198201
this.logger('Renewed', { cacheKey });
199202
return res
200203
});
201204
}).catch(e => {
202205
if (!(e instanceof ContinueWaitError)) {
203206
this.logger('Dropping Cache', { cacheKey, error: e.stack || e });
204-
this.redisClient.delAsync(redisKey)
207+
this.cacheDriver.remove(redisKey)
205208
.catch(e => this.logger('Error removing key', { cacheKey, error: e.stack || e }));
206209
}
207210
throw e;
@@ -213,9 +216,9 @@ class QueryCache {
213216
return fetchNew();
214217
}
215218

216-
return this.redisClient.getAsync(redisKey).then(res => {
219+
return this.cacheDriver.get(redisKey).then(res => {
217220
if (res) {
218-
const parsedResult = JSON.parse(res);
221+
const parsedResult = res;
219222
const renewedAgo = (new Date()).getTime() - parsedResult.time;
220223
this.logger('Found cache entry', {
221224
cacheKey,

0 commit comments

Comments
 (0)