Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement live query on @orbit/record-cache #718

Merged
merged 1 commit into from Mar 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/@orbit/record-cache/src/async-record-cache.ts
Expand Up @@ -38,6 +38,7 @@ import {
} from './record-accessor';
import { PatchResult } from './patch-result';
import { QueryResult, QueryResultData } from './query-result';
import { AsyncLiveQuery } from './live-query/async-live-query';

const { assert } = Orbit;

Expand Down Expand Up @@ -246,6 +247,24 @@ export abstract class AsyncRecordCache implements Evented, AsyncRecordAccessor {
return result;
}

liveQuery(
queryOrExpressions: QueryOrExpressions,
options?: object,
id?: string
): AsyncLiveQuery {
const query = buildQuery(
queryOrExpressions,
options,
id,
this.queryBuilder
);

return new AsyncLiveQuery({
cache: this,
query
});
}

/////////////////////////////////////////////////////////////////////////////
// Protected methods
/////////////////////////////////////////////////////////////////////////////
Expand Down
2 changes: 2 additions & 0 deletions packages/@orbit/record-cache/src/index.ts
Expand Up @@ -4,9 +4,11 @@ export * from './record-accessor';

export * from './async-record-cache';
export * from './async-operation-processor';
export * from './live-query/async-live-query';

export * from './sync-record-cache';
export * from './sync-operation-processor';
export * from './live-query/sync-live-query';

// Operators
export * from './operators/async-inverse-patch-operators';
Expand Down
57 changes: 57 additions & 0 deletions packages/@orbit/record-cache/src/live-query/async-live-query.ts
@@ -0,0 +1,57 @@
import { Schema, Query } from '@orbit/data';
import { QueryResult } from '../query-result';
import { AsyncRecordCache } from '../async-record-cache';
import { LiveQuery, LiveQuerySettings } from './live-query';

export interface AsyncLiveQueryUpdateSettings {
cache: AsyncRecordCache;
query: Query;
}

export class AsyncLiveQueryUpdate {
private _cache: AsyncRecordCache;
private _query: Query;

constructor(settings: AsyncLiveQueryUpdateSettings) {
this._cache = settings.cache;
this._query = settings.query;
}

query(): Promise<QueryResult> {
return this._cache.query(this._query);
}
}

export interface AsyncLiveQuerySettings extends LiveQuerySettings {
cache: AsyncRecordCache;
}

export class AsyncLiveQuery extends LiveQuery {
protected cache: AsyncRecordCache;

protected get schema(): Schema {
return this.cache.schema;
}

private get _update() {
return new AsyncLiveQueryUpdate({
cache: this.cache,
query: this._query
});
}

constructor(settings: AsyncLiveQuerySettings) {
super(settings);
this.cache = settings.cache;
}

async query(): Promise<QueryResult> {
return this._update.query();
}

subscribe(cb: (update: AsyncLiveQueryUpdate) => void): () => void {
return this._subscribe(() => {
cb(this._update);
});
}
}
182 changes: 182 additions & 0 deletions packages/@orbit/record-cache/src/live-query/live-query.ts
@@ -0,0 +1,182 @@
import Orbit, { Evented } from '@orbit/core';
import {
QueryExpression,
FindRecord,
FindRecords,
FindRelatedRecord,
FindRelatedRecords,
equalRecordIdentities,
Query,
Schema,
RecordOperation
} from '@orbit/data';

import { RecordChange, recordOperationChange } from './record-change';

const { assert } = Orbit;

export interface LiveQuerySettings {
query: Query;
}

export abstract class LiveQuery {
protected cache: Evented;
protected schema: Schema;

protected _query: Query;
protected _subscribe(onNext: () => void): () => void {
const execute = onceTick(onNext);

const unsubscribePatch = this.cache.on(
'patch',
(operation: RecordOperation) => {
if (this.operationRelevantForQuery(operation)) {
execute();
}
}
);

const unsubscribeReset = this.cache.on('reset', () => {
execute();
});

function unsubscribe() {
cancelTick(execute);
unsubscribePatch();
unsubscribeReset();
}

return unsubscribe;
}

constructor(settings: LiveQuerySettings) {
assert(
'Only single expression queries are supported on LiveQuery',
settings.query.expressions.length === 1
);
this._query = settings.query;
}

operationRelevantForQuery(operation: RecordOperation): boolean {
const change = recordOperationChange(operation);
const expression = this._query.expressions[0];
return this.queryExpressionRelevantForChange(expression, change);
}

protected queryExpressionRelevantForChange(
expression: QueryExpression,
change: RecordChange
): boolean {
switch (expression.op) {
case 'findRecord':
return this.findRecordQueryExpressionRelevantForChange(
expression as FindRecord,
change
);
case 'findRecords':
return this.findRecordsQueryExpressionRelevantForChange(
expression as FindRecords,
change
);
case 'findRelatedRecord':
return this.findRelatedRecordQueryExpressionRelevantForChange(
expression as FindRelatedRecord,
change
);
case 'findRelatedRecords':
return this.findRelatedRecordsQueryExpressionRelevantForChange(
expression as FindRelatedRecords,
change
);
default:
return true;
}
}

protected findRecordQueryExpressionRelevantForChange(
expression: FindRecord,
change: RecordChange
): boolean {
return equalRecordIdentities(expression.record, change);
}

protected findRecordsQueryExpressionRelevantForChange(
expression: FindRecords,
change: RecordChange
): boolean {
if (expression.type) {
return expression.type === change.type;
} else if (expression.records) {
for (let record of expression.records) {
if (record.type === change.type) {
return true;
}
}
return false;
}
return true;
}

protected findRelatedRecordQueryExpressionRelevantForChange(
expression: FindRelatedRecord,
change: RecordChange
): boolean {
return (
equalRecordIdentities(expression.record, change) &&
(change.relationships.includes(expression.relationship) || change.remove)
);
}

protected findRelatedRecordsQueryExpressionRelevantForChange(
expression: FindRelatedRecords,
change: RecordChange
): boolean {
const { type } = this.schema.getRelationship(
expression.record.type,
expression.relationship
);

if (Array.isArray(type) && type.find(type => type === change.type)) {
return true;
} else if (type === change.type) {
return true;
}

return (
equalRecordIdentities(expression.record, change) &&
(change.relationships.includes(expression.relationship) || change.remove)
);
}
}

const isNode =
typeof process === 'object' && typeof process.nextTick === 'function';
let resolvedPromise: Promise<void>;
const nextTick = isNode
? function(fn: () => void) {
if (!resolvedPromise) {
resolvedPromise = Promise.resolve();
}
resolvedPromise.then(() => {
process.nextTick(fn);
});
}
: window.setImmediate || setTimeout;

function onceTick(fn: () => void) {
return function tick() {
if (!ticks.has(tick)) {
ticks.add(tick);
nextTick(() => {
fn();
cancelTick(tick);
});
}
};
}

function cancelTick(tick: () => void) {
ticks.delete(tick);
}

const ticks = new WeakSet();
68 changes: 68 additions & 0 deletions packages/@orbit/record-cache/src/live-query/record-change.ts
@@ -0,0 +1,68 @@
import {
Record,
cloneRecordIdentity,
RecordIdentity,
RecordOperation
} from '@orbit/data';

export interface RecordChange extends RecordIdentity {
keys: string[];
attributes: string[];
relationships: string[];
meta: string[];
links: string[];
remove: boolean;
}

export function recordOperationChange(
operation: RecordOperation
): RecordChange {
const record = operation.record as Record;
const change: RecordChange = {
...cloneRecordIdentity(record),
remove: false,
keys: [],
attributes: [],
relationships: [],
meta: [],
links: []
};

switch (operation.op) {
case 'addRecord':
case 'updateRecord':
if (record.keys) {
change.keys = Object.keys(record.keys);
}
if (record.attributes) {
change.attributes = Object.keys(record.attributes);
}
if (record.relationships) {
change.relationships = Object.keys(record.relationships);
}
if (record.meta) {
change.meta = Object.keys(record.meta);
}
if (record.links) {
change.links = Object.keys(record.links);
}
break;
case 'replaceAttribute':
change.attributes = [operation.attribute];
break;
case 'replaceKey':
change.keys = [operation.key];
break;
case 'replaceRelatedRecord':
case 'replaceRelatedRecords':
case 'addToRelatedRecords':
case 'removeFromRelatedRecords':
change.relationships = [operation.relationship];
break;
case 'removeRecord':
change.remove = true;
break;
}

return change;
}