Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chainable methods on subscriptions #155

Merged
merged 2 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .changeset/subscription-chain.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
"@effection/subscription": minor
---
Add a free `subscribe` function, allow chaining of `map`, `filter` etc on subscriptions and deprecate chaining on subscribables.
76 changes: 76 additions & 0 deletions packages/subscription/src/chainable-subscription.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import { Operation } from 'effection';
import { Subscription } from './subscription';
import { DeepPartial, matcher } from './match';

export class ChainableSubscription<T,TReturn> implements Subscription<T,TReturn> {
constructor(private subscription: Subscription<T, TReturn>) {}

filter(predicate: (value: T) => boolean): ChainableSubscription<T, TReturn> {
let { subscription } = this;
return new ChainableSubscription({
*next() {
while(true) {
let result = yield subscription.next();
if(result.done) {
return result;
} else if(predicate(result.value)) {
return result;
}
}
}
});
}

match(reference: DeepPartial<T>): ChainableSubscription<T,TReturn> {
return this.filter(matcher(reference));
}

map<R>(mapper: (value: T) => R): ChainableSubscription<R, TReturn> {
let { subscription } = this;
return new ChainableSubscription({
*next() {
while(true) {
let result = yield subscription.next();
if(result.done) {
return result;
} else {
return { done: false, value: mapper(result.value) };
}
}
}
});
}

*first(): Operation<T | undefined> {
let result: IteratorResult<T,TReturn> = yield this.subscription.next();
if(result.done) {
return undefined;
} else {
return result.value;
}
}

*expect(): Operation<T> {
let result: IteratorResult<T,TReturn> = yield this.subscription.next();
if(result.done) {
throw new Error('expected subscription to contain a value');
} else {
return result.value;
}
}

*forEach(visit: (value: T) => Operation<void>): Operation<TReturn> {
while (true) {
let result: IteratorResult<T,TReturn> = yield this.subscription.next();
if(result.done) {
return result.value;
} else {
yield visit(result.value);
}
}
}

next() {
return this.subscription.next();
}
}
11 changes: 11 additions & 0 deletions packages/subscription/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
export { Subscription, createSubscription } from './subscription';
export { SymbolSubscribable, SubscriptionSource, forEach, Subscribable } from './subscribable';
export { ChainableSubscription } from './chainable-subscription';

import { Operation } from 'effection';
import { subscribe as rawSubscribe } from './subscribable';
import { ChainableSubscription } from './chainable-subscription';
import { SubscriptionSource } from './subscribable';

export function* subscribe<T, TReturn>(source: SubscriptionSource<T,TReturn>): Operation<ChainableSubscription<T,TReturn>> {
let inner = yield rawSubscribe(source);
return new ChainableSubscription<T, TReturn>(inner);
}
2 changes: 1 addition & 1 deletion packages/subscription/src/subscribable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export class Chain<T, TReturn> implements Subscribable<T,TReturn> {
}
}

function subscribe<T, TReturn>(source: SubscriptionSource<T,TReturn>): Operation<Subscription<T,TReturn>> {
export function subscribe<T, TReturn>(source: SubscriptionSource<T,TReturn>): Operation<Subscription<T,TReturn>> {
if (isSubscribable<T,TReturn>(source)) {
let subscriber = getSubscriber<T,TReturn>(source);
if (subscriber) {
Expand Down
3 changes: 2 additions & 1 deletion packages/subscription/src/subscription.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Operation, resource } from 'effection';

import { ChainableSubscription } from './chainable-subscription';
import { Semaphore } from './semaphore';

export type Subscriber<T,TReturn> = (publish: (value: T) => void) => Operation<TReturn>;
Expand Down Expand Up @@ -27,7 +28,7 @@ export function createSubscription<T, TReturn>(subscribe: Subscriber<T,TReturn>)
return wait.then(() => results.shift() as IteratorResult<T,TReturn>);
};

let subscription = yield resource({ next }, function*() {
let subscription = yield resource(new ChainableSubscription({ next }), function*() {
try {
let value = yield subscribe((value: T) => publish(value));
results.push({ done: true, value });
Expand Down
119 changes: 119 additions & 0 deletions packages/subscription/test/chainable-subscription.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import * as expect from 'expect';
import { describe, it, beforeEach } from 'mocha';
import { spawn } from './helpers';

import { Operation } from 'effection';
import { createSubscription, ChainableSubscription, Subscribable, SymbolSubscribable, forEach } from '../src/index';

interface Thing {
name: string;
type: string;
}

function* stuff(): Operation<ChainableSubscription<Thing, number>> {
return yield createSubscription(function*(publish) {
publish({name: 'bob', type: 'person' });
publish({name: 'alice', type: 'person' });
publish({name: 'world', type: 'planet' });
return 3;
})
}

function* emptySubscription(): Operation<ChainableSubscription<Thing, number>> {
return yield createSubscription(function*(publish) {
return 12;
})
}

describe('chainable subscriptions', () => {
let subscription: ChainableSubscription<Thing, number>;

beforeEach(async () => {
subscription = await spawn(stuff);
});

describe('forEach', () => {
let values: Thing[];
let result: number;
beforeEach(async () => {
values = [];
result = await spawn(subscription.forEach(function*(item) { values.push(item); }));
});

it('iterates through all members of the subscribable', () => {
expect(values).toEqual([
{name: 'bob', type: 'person' },
{name: 'alice', type: 'person' },
{name: 'world', type: 'planet' },
])
});

it('returns the original result', () => {
expect(result).toEqual(3);
});
});

describe('map', () => {
it('maps over the values', async () => {
let mapped = subscription.map(item => `hello ${item.name}`);
await expect(spawn(mapped.next())).resolves.toEqual({ done: false, value: 'hello bob' });
await expect(spawn(mapped.next())).resolves.toEqual({ done: false, value: 'hello alice' });
await expect(spawn(mapped.next())).resolves.toEqual({ done: false, value: 'hello world' });
await expect(spawn(mapped.next())).resolves.toEqual({ done: true, value: 3 });
});
});

describe('filter', () => {
it('filters the values', async () => {
let filtered = subscription.filter(item => item.type === 'person');
await expect(spawn(filtered.next())).resolves.toEqual({ done: false, value: { name: 'bob', type: 'person' } });
await expect(spawn(filtered.next())).resolves.toEqual({ done: false, value: { name: 'alice', type: 'person' } });
await expect(spawn(filtered.next())).resolves.toEqual({ done: true, value: 3 });
});
});

describe('match', () => {
it('filters the values based on the given pattern', async () => {
let matched = subscription.match({ type: 'person' });
await expect(spawn(matched.next())).resolves.toEqual({ done: false, value: { name: 'bob', type: 'person' } });
await expect(spawn(matched.next())).resolves.toEqual({ done: false, value: { name: 'alice', type: 'person' } });
await expect(spawn(matched.next())).resolves.toEqual({ done: true, value: 3 });
});

it('can work on nested items', async () => {
let matched = subscription.map(item => ({ thing: item })).match({ thing: { type: 'person' } });
await expect(spawn(matched.next())).resolves.toEqual({ done: false, value: { thing: { name: 'bob', type: 'person' } } });
await expect(spawn(matched.next())).resolves.toEqual({ done: false, value: { thing: { name: 'alice', type: 'person' } } });
await expect(spawn(matched.next())).resolves.toEqual({ done: true, value: 3 });
});
});

describe('first', () => {
it('returns the first item in the subscription', async () => {
await expect(spawn(subscription.first())).resolves.toEqual({ name: 'bob', type: 'person' });
});

it('returns undefined if the subscription is empty', async () => {
let subscription = await spawn(emptySubscription);
await expect(spawn(subscription.first())).resolves.toEqual(undefined);
});
});

describe('expect', () => {
it('returns the first item in the subscription', async () => {
await expect(spawn(subscription.expect())).resolves.toEqual({ name: 'bob', type: 'person' });
});

it('throws an error if the subscription is empty', async () => {
let subscription = await spawn(emptySubscription);
await spawn(function*() {
try {
yield subscription.expect();
throw new Error('unreachable');
} catch(e) {
expect(e.message).toEqual('expected subscription to contain a value');
}
});
});
});
});
74 changes: 74 additions & 0 deletions packages/subscription/test/subscribe.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import * as expect from 'expect';
import { describe, it, beforeEach } from 'mocha';
import { spawn } from './helpers';

import { Operation } from 'effection';

import { subscribe, ChainableSubscription, Subscription, createSubscription, Subscribable, SymbolSubscribable, forEach } from '../src/index';

interface Thing {
name: string;
type: string;
}
const subscribableWithSymbol = {
[SymbolSubscribable](): Operation<Subscription<Thing, number>> {
return createSubscription(function*(publish) {
publish({name: 'bob', type: 'person' });
publish({name: 'alice', type: 'person' });
publish({name: 'world', type: 'planet' });
return 3;
});
}
};

function* subscribableAsOperation(): Operation<Subscription<Thing, number>> {
return yield createSubscription(function*(publish) {
publish({name: 'sally', type: 'person' });
publish({name: 'jupiter', type: 'planet' });
return 12;
});
}

describe('subscribe', () => {
describe('with symbol subscribable', () => {
let subscription: ChainableSubscription<Thing, number>;

beforeEach(async () => {
subscription = await spawn(subscribe(subscribableWithSymbol));
});

it('iterates through all members of the subscribable', async () => {
await expect(spawn(subscription.next())).resolves.toEqual({ done: false, value: { name: 'bob', type: 'person' } });
await expect(spawn(subscription.next())).resolves.toEqual({ done: false, value: { name: 'alice', type: 'person' } });
await expect(spawn(subscription.next())).resolves.toEqual({ done: false, value: { name: 'world', type: 'planet' } });
await expect(spawn(subscription.next())).resolves.toEqual({ done: true, value: 3 });
});

it('is chainable', async () => {
let filteredSubscription = subscription.filter((t) => t.type === 'person');
await expect(spawn(filteredSubscription.next())).resolves.toEqual({ done: false, value: { name: 'bob', type: 'person' } });
await expect(spawn(filteredSubscription.next())).resolves.toEqual({ done: false, value: { name: 'alice', type: 'person' } });
await expect(spawn(filteredSubscription.next())).resolves.toEqual({ done: true, value: 3 });
});
});

describe('with operation subscribable', () => {
let subscription: ChainableSubscription<Thing, number>;

beforeEach(async () => {
subscription = await spawn(subscribe(subscribableAsOperation));
});

it('iterates through all members of the subscribable', async () => {
await expect(spawn(subscription.next())).resolves.toEqual({ done: false, value: { name: 'sally', type: 'person' } });
await expect(spawn(subscription.next())).resolves.toEqual({ done: false, value: { name: 'jupiter', type: 'planet' } });
await expect(spawn(subscription.next())).resolves.toEqual({ done: true, value: 12 });
});

it('is chainable', async () => {
let filteredSubscription = subscription.filter((t) => t.type === 'person');
await expect(spawn(filteredSubscription.next())).resolves.toEqual({ done: false, value: { name: 'sally', type: 'person' } });
await expect(spawn(filteredSubscription.next())).resolves.toEqual({ done: true, value: 12 });
});
});
});