Skip to content
This repository has been archived by the owner on Dec 7, 2023. It is now read-only.

Commit

Permalink
feat: search by any field
Browse files Browse the repository at this point in the history
  • Loading branch information
s-r-x committed Apr 8, 2022
1 parent 0a22934 commit f5d9960
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 45 deletions.
12 changes: 12 additions & 0 deletions packages/root/src/bull-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ import type {
GlobalJobCompletionCb,
} from './queue';
import type { Maybe } from './typings/utils';
// this is required due to bad bull typings
import * as Bull from 'bull';

export class BullJobAdapter extends Job {
constructor(private _job: BullJob, private _queue: Queue) {
super();
}

// getters
public get rawJob(): BullJob {
return this._job;
}

public get queue(): Queue {
return this._queue;
}
Expand Down Expand Up @@ -68,6 +75,7 @@ export class BullJobAdapter extends Job {
return this._job.timestamp || undefined;
}

// public methods
public async getState(): Promise<JobStatus> {
return this._job.getState() as any;
}
Expand Down Expand Up @@ -189,6 +197,10 @@ export class BullAdapter extends Queue {
return this.normalizeJob(job);
}
}
public jobFromJSON(json: any, jobId: JobId): Job {
// @ts-ignore
return this.normalizeJob(Bull.Job.fromJSON(this._queue, json, jobId));
}

public async getJobs(
status: JobStatus,
Expand Down
8 changes: 7 additions & 1 deletion packages/root/src/bullmq-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ export class BullMQJobAdapter extends Job {
}

// getters
public get rawJob(): BullMQJob {
return this._job;
}

public get queue(): Queue {
return this._queue;
}
Expand Down Expand Up @@ -205,7 +209,9 @@ export class BullMQAdapter extends Queue {
return this.normalizeJob(job);
}
}

public jobFromJSON(json: any, jobId: JobId): Job {
return this.normalizeJob(BullMQJob.fromJSON(this._queue, json, jobId));
}
public async getJobs(
types: JobStatus | JobStatus[],
start?: number,
Expand Down
66 changes: 28 additions & 38 deletions packages/root/src/data-search.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import isempty from 'lodash/isEmpty';
import { Readable } from 'stream';
import jsonata from 'jsonata';
import { JsonService } from './services/json';
import { DEFAULT_DATA_SEARCH_SCAN_COUNT } from './constants';
import type { Queue as BullQueue, JobStatus } from './queue';
import type { Queue, JobStatus, Job } from './queue';
import type { Maybe } from './typings/utils';

type TSearchArgs = {
Expand All @@ -14,14 +13,11 @@ type TSearchArgs = {
scanCount?: number;
};

type TJobExcerpt = {
data: string;
id: string;
};
type TJobsList = Job[];

export class DataSearcher {
constructor(private _queue: BullQueue) {}
async search(args: TSearchArgs) {
export class PowerSearch {
constructor(private _queue: Queue) {}
async search(args: TSearchArgs): Promise<TJobsList> {
let expr: jsonata.Expression;
try {
expr = jsonata(args.search);
Expand All @@ -32,24 +28,20 @@ export class DataSearcher {
if (!it) return [];
const start = args.offset;
const end = args.limit + start;
const acc: string[] = [];
const acc: TJobsList = [];
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/label
mainLoop: for await (const jobs of it.generator()) {
for (const job of jobs) {
const matched = this._matchData(job.data, expr);
if (matched) {
acc.push(job.id);
if (this._evalSearch(job, expr)) {
acc.push(job);
}
if (acc.length >= end) {
break mainLoop;
}
}
}
it.destroy();
const jobs = await Promise.all(
acc.slice(start, end).map((id) => this._queue.getJob(id))
);
return jobs;
return acc.slice(start, end);
}
private _getIterator(args: TSearchArgs): Maybe<AbstractIterator> {
const redisKey = this._queue.toKey(args.status);
Expand All @@ -67,9 +59,9 @@ export class DataSearcher {
return new ListIterator(this._queue, redisKey, config);
}
}
private _matchData(data: string, expr: jsonata.Expression): boolean {
private _evalSearch(job: Job, expr: jsonata.Expression): boolean {
try {
const result = expr.evaluate(JsonService.maybeParse(data));
const result = expr.evaluate(job.rawJob);
if (!result) return false;
return typeof result === 'object' ? !isempty(result) : !!result;
} catch (_e) {
Expand All @@ -84,30 +76,28 @@ type TIteratorConfig = {

abstract class AbstractIterator {
protected _scanCount: number;
constructor(protected _queue: BullQueue, config: TIteratorConfig) {
constructor(protected _queue: Queue, config: TIteratorConfig) {
this._scanCount = config.scanCount || DEFAULT_DATA_SEARCH_SCAN_COUNT;
}
protected async _extractJobsData(ids: string[]): Promise<TJobExcerpt[]> {
protected async _extractJobs(ids: string[]): Promise<TJobsList> {
const client = await this._queue.client;
const pipeline = client.pipeline();
ids.forEach((id) => pipeline.hmget(this._queue.toKey(id), 'data'));
const data = await pipeline.exec();
return data.reduce((acc, [error, [jobData]], idx) => {
if (!error && jobData && jobData !== '{}' && jobData !== '[]') {
acc.push({
data: jobData,
id: ids[idx],
});
ids.forEach((id) => pipeline.hgetall(this._queue.toKey(id)));
const jobs = await pipeline.exec();

return jobs.reduce((acc, [error, job], idx) => {
if (!error && job) {
acc.push(this._queue.jobFromJSON(job, ids[idx]));
}
return acc;
}, [] as TJobExcerpt[]);
}, [] as TJobsList);
}
abstract generator(): AsyncGenerator<TJobExcerpt[]>;
abstract generator(): AsyncGenerator<TJobsList>;
abstract destroy(): void;
}
class SetIterator extends AbstractIterator {
private _stream: Readable;
constructor(queue: BullQueue, private _key: string, config: TIteratorConfig) {
constructor(queue: Queue, private _key: string, config: TIteratorConfig) {
super(queue, config);
}
async *generator() {
Expand All @@ -120,8 +110,8 @@ class SetIterator extends AbstractIterator {
const filteredIds = (ids as string[]).filter(
(_k: string, idx) => !(idx % 2)
);
const data = await this._extractJobsData(filteredIds);
yield data;
const jobs = await this._extractJobs(filteredIds);
yield jobs;
this._stream.resume();
}
}
Expand All @@ -134,7 +124,7 @@ class SetIterator extends AbstractIterator {
class ListIterator extends AbstractIterator {
private _ids: string[];
private _cursor = 0;
constructor(queue: BullQueue, private _key: string, config: TIteratorConfig) {
constructor(queue: Queue, private _key: string, config: TIteratorConfig) {
super(queue, config);
}
async *generator() {
Expand All @@ -146,9 +136,9 @@ class ListIterator extends AbstractIterator {
if (isempty(ids)) {
return;
}
const data = await this._extractJobsData(ids);
this._incCursor(data.length);
yield data;
const jobs = await this._extractJobs(ids);
this._incCursor(jobs.length);
yield jobs;
} catch (e) {
console.error(e);
return;
Expand Down
4 changes: 2 additions & 2 deletions packages/root/src/gql/data-sources/bull/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { DataSource } from 'apollo-datasource';
import { JsonService } from '../../../services/json';
import { OrderEnum } from '../../../typings/gql';
import redisInfo from 'redis-info';
import { DataSearcher } from '../../../data-search';
import { PowerSearch } from '../../../data-search';
import isNil from 'lodash/isNil';
import { BullMonitorError } from '../../../errors';
import { BullErrorEnum as ErrorEnum } from './errors-enum';
Expand Down Expand Up @@ -85,7 +85,7 @@ export class BullDataSource extends DataSource {
return job ? [job] : [];
} else if (dataSearch) {
if (status) {
const searcher = new DataSearcher(bullQueue);
const searcher = new PowerSearch(bullQueue);
return await searcher
.search({
status,
Expand Down
2 changes: 2 additions & 0 deletions packages/root/src/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ export abstract class Job {
public abstract get processedOn(): Maybe<number>;
public abstract get finishedOn(): Maybe<number>;
public abstract get timestamp(): Maybe<number>;
public abstract get rawJob(): any;

public abstract getState(): Promise<JobStatus>;
public abstract moveToCompleted(returnValue?: unknown): Promise<unknown>;
Expand Down Expand Up @@ -80,6 +81,7 @@ export abstract class Queue {
public abstract isPaused(): Promise<boolean>;

public abstract getJob(id: JobId): Promise<Maybe<Job>>;
public abstract jobFromJSON(json: any, jobId: JobId): Job;
public abstract getJobs(
types: JobStatus | JobStatus[],
start?: number,
Expand Down
2 changes: 1 addition & 1 deletion packages/ui/src/components/CloseableTip/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { StorageConfig } from '@/config/storage';

type TProps = {
className?: string;
tip: string;
tip: any;
persistKey: string;
};
export default function CloseableTip(props: TProps) {
Expand Down
20 changes: 18 additions & 2 deletions packages/ui/src/screens/jobs/Filters/DataSearch/Tip.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,27 @@ type TProps = {
className?: string;
};
const DataSearchTip = ({ className }: TProps) => {
const text = (
<>
Search is powered by{' '}
<a target="__blank" href="https://docs.jsonata.org/overview.html">
jsonata
</a>
. Check out the{' '}
<a
target="__blank"
href="https://github.com/s-r-x/bull-monitor/blob/main/search-examples.md"
>
examples
</a>
.
</>
);
return (
<CloseableTip
className={className}
persistKey="data-text-search-v2"
tip='Data search is powered by jsonata(https://docs.jsonata.org/overview.html). Example query for data {"user": {"profile": {"name": "ilya", "age": 30}}} -> user.profile[name="ilya" and age>=30]'
persistKey="data-text-search-v3"
tip={text}
/>
);
};
Expand Down
2 changes: 1 addition & 1 deletion packages/ui/src/screens/jobs/Filters/DataSearch/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ const DataSearch = ({ className }: TProps) => {
className={className}
value={search}
onChange={onChange}
label="Data search"
label="Search"
variant="outlined"
id="jobs-filters_data-search-key"
autoComplete="off"
Expand Down
35 changes: 35 additions & 0 deletions search-examples.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
## bull-monitor search

Let's take this job as an example:

```json
{
"name": "my-job",
"data": {
"hello": {
"to": "world"
}
},
"opts": {
"attempts": 2,
"delay": 1000
}
}
```
To find a job by data we can utilize this query:
```
data.*.hello.to="world"
```
By name:
```
name="my-job"
```
One or more attempts:
```
opts.attempts >= 1
```
Put it all together:
```
data.*.hello.to="world" and name="my-job" or opts.attempts >= 1
```

0 comments on commit f5d9960

Please sign in to comment.