Skip to content

Commit

Permalink
implement live query on @orbit/record-cache
Browse files Browse the repository at this point in the history
  • Loading branch information
tchak committed Dec 29, 2019
1 parent d9d7bcb commit e29d57a
Show file tree
Hide file tree
Showing 8 changed files with 839 additions and 0 deletions.
19 changes: 19 additions & 0 deletions packages/@orbit/record-cache/src/async-record-cache.ts
Expand Up @@ -38,6 +38,7 @@ import {
} from './record-accessor';
import { PatchResult } from './patch-result';
import { QueryResult, QueryResultData } from './query-result';
import { AsyncLiveQuery } from './live-query/async-live-query';

const { assert } = Orbit;

Expand Down Expand Up @@ -246,6 +247,24 @@ export abstract class AsyncRecordCache implements Evented, AsyncRecordAccessor {
return result;
}

liveQuery(
queryOrExpressions: QueryOrExpressions,
options?: object,
id?: string
): AsyncLiveQuery {
const query = buildQuery(
queryOrExpressions,
options,
id,
this.queryBuilder
);

return new AsyncLiveQuery({
cache: this,
query
});
}

/////////////////////////////////////////////////////////////////////////////
// Protected methods
/////////////////////////////////////////////////////////////////////////////
Expand Down
32 changes: 32 additions & 0 deletions packages/@orbit/record-cache/src/live-query/async-live-query.ts
@@ -0,0 +1,32 @@
import { RecordException } from '@orbit/data';
import { QueryResult } from '../query-result';
import { AsyncRecordCache } from '../async-record-cache';
import { LiveQuery, LiveQuerySettings } from './live-query';

export interface AsyncLiveQuerySettings extends LiveQuerySettings {
cache: AsyncRecordCache;
}

export class AsyncLiveQuery extends LiveQuery {
cache: AsyncRecordCache;

constructor(settings: AsyncLiveQuerySettings) {
super(settings);
this.cache = settings.cache;
}

get schema() {
return this.cache.schema;
}

executeQuery(
onNext: (result: QueryResult) => void,
onError?: (error: RecordException) => void
): void {
this.cache.query(this.query).then(onNext, onError || defaultOnError);
}
}

function defaultOnError(error: RecordException): void {
throw error;
}
187 changes: 187 additions & 0 deletions packages/@orbit/record-cache/src/live-query/live-query.ts
@@ -0,0 +1,187 @@
import { Evented } from '@orbit/core';
import {
QueryExpression,
FindRecord,
FindRecords,
FindRelatedRecord,
FindRelatedRecords,
equalRecordIdentities,
Query,
Schema,
RecordOperation,
RecordException
} from '@orbit/data';

import { QueryResult } from '../query-result';
import { recordOperationChange, RecordChange } from './utils';

export interface LiveQuerySettings {
query: Query;
}

export class LiveQuery {
cache: Evented;
schema: Schema;
query: Query;

executeQuery(
onNext: (result: QueryResult) => void,
onError?: (error: RecordException) => void
): void {
throw new TypeError('executeQuery: Not Implemented.');
}

on(
onNext: (result: QueryResult) => void,
onError?: (error: RecordException) => void
): () => void {
const executeQuery = onceTick(() => this.executeQuery(onNext, onError));
const unsubscribePatch = this.cache.on(
'patch',
(operation: RecordOperation) => {
if (this.match(operation)) {
executeQuery();
}
}
);

const unsubscribeReset = this.cache.on('reset', () => {
executeQuery();
});

executeQuery();

return function unsubscribe() {
cancelTick(executeQuery);
unsubscribePatch();
unsubscribeReset();
};
}

constructor(settings: LiveQuerySettings) {
this.query = settings.query;
}

match(operation: RecordOperation): boolean {
const change = recordOperationChange(operation);
return !!this.query.expressions.find(expression =>
this._queryExpressionMatchChange(expression, change)
);
}

protected _queryExpressionMatchChange(
expression: QueryExpression,
change: RecordChange
): boolean {
switch (expression.op) {
case 'findRecord':
return this._findRecordQueryExpressionMatchChange(
expression as FindRecord,
change
);
case 'findRecords':
return this._findRecordsQueryExpressionMatchChange(
expression as FindRecords,
change
);
case 'findRelatedRecord':
return this._findRelatedRecordQueryExpressionMatchChange(
expression as FindRelatedRecord,
change
);
case 'findRelatedRecords':
return this._findRelatedRecordsQueryExpressionMatchChange(
expression as FindRelatedRecords,
change
);
default:
return true;
}
}

protected _findRecordQueryExpressionMatchChange(
expression: FindRecord,
change: RecordChange
): boolean {
return equalRecordIdentities(expression.record, change);
}

protected _findRecordsQueryExpressionMatchChange(
expression: FindRecords,
change: RecordChange
): boolean {
if (expression.type) {
return expression.type === change.type;
} else if (expression.records) {
for (let record of expression.records) {
if (record.type === change.type) {
return true;
}
}
return false;
}
return true;
}

protected _findRelatedRecordQueryExpressionMatchChange(
expression: FindRelatedRecord,
change: RecordChange
): boolean {
return (
equalRecordIdentities(expression.record, change) &&
(change.relationships.includes(expression.relationship) || change.remove)
);
}

protected _findRelatedRecordsQueryExpressionMatchChange(
expression: FindRelatedRecords,
change: RecordChange
): boolean {
const { type } = this.schema.getRelationship(
expression.record.type,
expression.relationship
);

if (Array.isArray(type) && type.find(type => type === change.type)) {
return true;
} else if (type === change.type) {
return true;
}

return (
equalRecordIdentities(expression.record, change) &&
(change.relationships.includes(expression.relationship) || change.remove)
);
}
}

let resolvedPromise: Promise<void>;
const nextTick =
typeof process === 'object' && typeof process.nextTick === 'function'
? function(fn: () => void) {
if (!resolvedPromise) {
resolvedPromise = Promise.resolve();
}
resolvedPromise.then(() => {
process.nextTick(fn);
});
}
: window.setImmediate || setTimeout;

function onceTick(fn: () => void) {
return function tick() {
if (!ticks.has(tick)) {
ticks.add(tick);
nextTick(() => {
fn();
cancelTick(tick);
});
}
};
}

function cancelTick(tick: () => void) {
ticks.delete(tick);
}

const ticks = new WeakSet();
36 changes: 36 additions & 0 deletions packages/@orbit/record-cache/src/live-query/sync-live-query.ts
@@ -0,0 +1,36 @@
import { RecordException } from '@orbit/data';
import { QueryResult } from '../query-result';
import { SyncRecordCache } from '../sync-record-cache';
import { LiveQuery, LiveQuerySettings } from './live-query';

export interface SyncLiveQuerySettings extends LiveQuerySettings {
cache: SyncRecordCache;
}

export class SyncLiveQuery extends LiveQuery {
cache: SyncRecordCache;

constructor(settings: SyncLiveQuerySettings) {
super(settings);
this.cache = settings.cache;
}

get schema() {
return this.cache.schema;
}

executeQuery(
onNext: (result: QueryResult) => void,
onError?: (error: RecordException) => void
): void {
try {
onNext(this.cache.query(this.query));
} catch (error) {
(onError || defaultOnError)(error);
}
}
}

function defaultOnError(error: RecordException) {
throw error;
}
58 changes: 58 additions & 0 deletions packages/@orbit/record-cache/src/live-query/utils.ts
@@ -0,0 +1,58 @@
import {
Record,
cloneRecordIdentity,
RecordIdentity,
RecordOperation
} from '@orbit/data';

export interface RecordChange extends RecordIdentity {
keys: string[];
attributes: string[];
relationships: string[];
remove: boolean;
}

export function recordOperationChange(
operation: RecordOperation
): RecordChange {
const record = operation.record as Record;
const change: RecordChange = {
...cloneRecordIdentity(record),
remove: false,
keys: [],
attributes: [],
relationships: []
};

switch (operation.op) {
case 'addRecord':
case 'updateRecord':
if (record.keys) {
change.keys = Object.keys(record.keys);
}
if (record.attributes) {
change.attributes = Object.keys(record.attributes);
}
if (record.relationships) {
change.relationships = Object.keys(record.relationships);
}
break;
case 'replaceAttribute':
change.attributes = [operation.attribute];
break;
case 'replaceKey':
change.keys = [operation.key];
break;
case 'replaceRelatedRecord':
case 'replaceRelatedRecords':
case 'addToRelatedRecords':
case 'removeFromRelatedRecords':
change.relationships = [operation.relationship];
break;
case 'removeRecord':
change.remove = true;
break;
}

return change;
}
19 changes: 19 additions & 0 deletions packages/@orbit/record-cache/src/sync-record-cache.ts
Expand Up @@ -38,6 +38,7 @@ import {
} from './record-accessor';
import { PatchResult } from './patch-result';
import { QueryResult, QueryResultData } from './query-result';
import { SyncLiveQuery } from './live-query/sync-live-query';

const { assert } = Orbit;

Expand Down Expand Up @@ -242,6 +243,24 @@ export abstract class SyncRecordCache implements Evented, SyncRecordAccessor {
return result;
}

liveQuery(
queryOrExpressions: QueryOrExpressions,
options?: object,
id?: string
): SyncLiveQuery {
const query = buildQuery(
queryOrExpressions,
options,
id,
this.queryBuilder
);

return new SyncLiveQuery({
cache: this,
query
});
}

/////////////////////////////////////////////////////////////////////////////
// Protected methods
/////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit e29d57a

Please sign in to comment.