Skip to content

Commit

Permalink
fix: rejects when observables complete before they emit a value; adds…
Browse files Browse the repository at this point in the history
… toSafePromise util
  • Loading branch information
rafamel committed Oct 25, 2019
1 parent b609187 commit 0ba9472
Show file tree
Hide file tree
Showing 13 changed files with 45 additions and 21 deletions.
14 changes: 7 additions & 7 deletions packages/core/src/application/merge-intercepts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
CollectionTree,
CollectionTreeImplementation
} from '~/types';
import { mergeServiceErrors } from '~/utils';
import { mergeServiceErrors } from '~/helpers';
import { Observable, from } from 'rxjs';
import {
isTypeRequest,
Expand All @@ -14,7 +14,7 @@ import {
} from '~/inspect';
import { replace } from '~/transform';
import { allof } from '~/create';
import { take } from 'rxjs/operators';
import { toSafePromise } from '~/utils';

export function mergeIntercepts(
collection: CollectionTreeImplementation
Expand Down Expand Up @@ -64,11 +64,11 @@ export function serviceIntercepts(
errors: mergeServiceErrors(service.types.errors, intercept.errors)
},
resolve(data: any, context, info): Promise<any> {
return interceptFn(data, context, info, (data: any) => {
return from(resolve.call(this, data, context, info));
})
.pipe(take(1))
.toPromise();
return toSafePromise(
interceptFn(data, context, info, (data: any) => {
return from(resolve.call(this, data, context, info));
})
);
}
};
}
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/create/collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
MutationService,
SubscriptionService
} from '~/types';
import { emptyCollection, mergeCollection } from '~/utils';
import { emptyCollection, mergeCollection } from '~/helpers';

export { collections };

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { InterceptImplementation, CollectionTreeImplementation } from '~/types';
import { from, Observable } from 'rxjs';
import { switchMap, mergeMap } from 'rxjs/operators';
import { emptyIntercept, mergeServiceErrors } from '~/utils';
import { emptyIntercept, mergeServiceErrors } from '~/helpers';
import { isElementService, isServiceImplementation } from '~/inspect';
import { replace } from '~/transform';
import { InterceptCreateInput, HookCreateInput } from './types';
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/create/scopes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
AbstractCollectionTree,
AbstractScopeTree
} from '~/types';
import { emptyCollection, emptyScope } from '~/utils';
import { emptyCollection, emptyScope } from '~/helpers';

export type ScopeCreate<
Q extends QueryService,
Expand Down
File renamed without changes.
2 changes: 2 additions & 0 deletions packages/core/src/helpers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './empty';
export * from './merge';
File renamed without changes.
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ export * from './generate';
export * from './inspect';
export * from './transform';
export * from './types';
export * from './utils';
export * from './errors';
2 changes: 1 addition & 1 deletion packages/core/src/inspect/is/implementation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
isTypeError,
isTypeRequest
} from './element';
import { emptyCollection } from '~/utils';
import { emptyCollection } from '~/helpers';
import { traverse } from '../traverse';

export function isTreeImplementation(
Expand Down
7 changes: 2 additions & 5 deletions packages/core/src/transform/to.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
isServiceImplementation,
isServiceSubscription
} from '~/inspect';
import { take } from 'rxjs/operators';
import { toSafePromise } from '~/utils';

export function toImplementation<T extends CollectionTree>(
collection: T,
Expand Down Expand Up @@ -66,10 +66,7 @@ export function toUnary<
...element,
kind: 'query',
resolve(...args: any): Promise<any> {
return resolve
.apply(this, args)
.pipe(take(1))
.toPromise();
return toSafePromise(resolve.apply(this, args));
}
};
}) as AbstractCollectionTree<Q, M, never>;
Expand Down
3 changes: 1 addition & 2 deletions packages/core/src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export * from './empty';
export * from './merge';
export * from './safe-promise';
25 changes: 25 additions & 0 deletions packages/core/src/utils/safe-promise.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';

/**
* Converts an *Observable* into a *Promise* that will throw if the observable
* completes before it emits a value.
*/
export function toSafePromise<T>(observable: Observable<T>): Promise<T> {
return new Promise((resolve, reject) => {
const subscription = observable.pipe(take(1)).subscribe({
next(value) {
resolve(value);
setTimeout(() => subscription.unsubscribe, 0);
},
error(err) {
reject(err);
setTimeout(() => subscription.unsubscribe, 0);
},
complete() {
reject(Error(`Source completed before emitting a result`));
setTimeout(() => subscription.unsubscribe, 0);
}
});
});
}
6 changes: 3 additions & 3 deletions packages/rpc-adapter/src/resolve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import {
isServiceQuery,
isServiceMutation,
isServiceSubscription,
ApplicationService
ApplicationService,
toSafePromise
} from '@karmic/core';
import { Observable } from 'rxjs';
import { take } from 'rxjs/operators';

export default function resolve(
request: Partial<RPCRequest> & Pick<RPCRequest, 'id'>,
Expand Down Expand Up @@ -52,7 +52,7 @@ export default function resolve(
if (isServiceSubscription(service.declaration)) {
if (action === 'query') {
const obs = service.resolve(data || {}, context) as Observable<any>;
return channels.open(id, obs.pipe(take(1)).toPromise());
return channels.open(id, toSafePromise(obs));
}
if (action === 'subscribe') {
return channels.open(id, service.resolve(data || {}, context));
Expand Down

0 comments on commit 0ba9472

Please sign in to comment.