diff --git a/Gruntfile.coffee b/Gruntfile.coffee index e12fb443..9e011006 100644 --- a/Gruntfile.coffee +++ b/Gruntfile.coffee @@ -151,10 +151,10 @@ module.exports = (grunt) -> grunt.registerTask 'build-kefir', ['concat:kefir', 'uglify:kefir'] grunt.registerTask 'test', ['jasmine_node:main', 'jshint:main'] grunt.registerTask 'build-docs', ['jade:docs'] - grunt.registerTask 'release-patch', ['bump', 'release'] - grunt.registerTask 'release-minor', ['bump:minor', 'release'] - grunt.registerTask 'release-major', ['bump:major', 'release'] - grunt.registerTask 'release-pre', ['bump:prerelease', 'release'] + grunt.registerTask 'release-patch', ['bump', 'release', 'post-release'] + grunt.registerTask 'release-minor', ['bump:minor', 'release', 'post-release'] + grunt.registerTask 'release-major', ['bump:major', 'release', 'post-release'] + grunt.registerTask 'release-pre', ['bump:prerelease', 'release', 'post-release'] grunt.registerTask 'default', [ 'clean', 'build-docs', 'build-kefir', 'build-browser-tests', 'test'] diff --git a/bower.json b/bower.json index 5059dc67..085e4a03 100644 --- a/bower.json +++ b/bower.json @@ -1,6 +1,6 @@ { "name": "kefir", - "version": "1.0.0", + "version": "1.1.0", "homepage": "https://github.com/pozadi/kefir", "authors": [ "Roman Pominov " @@ -34,7 +34,7 @@ "!LICENSE.txt" ], "devDependencies": { - "bacon": "0.7.43", + "bacon": "0.7.48", "jquery": "1.11.1", "jasmine-reporters": "git@github.com:larrymyers/jasmine-reporters.git#0.4.1", "coffeescript": "git@github.com:jashkenas/coffeescript.git#1.9.0", diff --git a/changelog.md b/changelog.md index 2d3f9d61..d48a3923 100644 --- a/changelog.md +++ b/changelog.md @@ -1,3 +1,10 @@ +## 1.1.0 + + - The `Bus` and `Pool` classes are exposed as `Kefir.Bus` and `Kefir.Pool` + - A bug in `.merge` and `.zip` fixed which may cause them to not unsubscribe from their sources in very rare cases + - New method `.emitEvent` in Emitter, Emitter Object, and Bus + - New method `Kefir.repeat` + ## 1.0.0 - jQuery plugin moved to a [separate repo](https://github.com/pozadi/kefir-jquery) @@ -26,11 +33,10 @@ ## 0.5.0 - Base errors support added (i.e. errors flow through all kind of transformations/combinations) + - Properties now may have a current error (as well as current value) - New method `.onError` - New method `.offError` - - Emitter now has `.error` method for emitting errors - - Emitter object also now has `.error` method - - Properties now may have a current error (as well as current value) + - New method `.error` in Emitter, Emitter Object, and Bus - New method `Kefir.constantError` - New method `.mapErrors` - New method `.filterErrors` diff --git a/docs-src/descriptions/create.jade b/docs-src/descriptions/create.jade index 8d262ac9..34ecd8ee 100644 --- a/docs-src/descriptions/create.jade +++ b/docs-src/descriptions/create.jade @@ -3,8 +3,10 @@ h2#create-stream Create a stream +descr-method('emitter', 'emitter', 'Kefir.emitter()'). Creates an emitter, that is ordinary stream, but also has additional methods: - #[tt .emit(value)], #[tt .error(error)], and #[tt .end()]. - Once an emitter was created, one can easily emit all three kind of events to it, + #[tt .emit(value)], #[tt .error(error)], #[tt .end()], and #[tt .emitEvent()]. + The first three are pretty self-descriptive, and the last one accepts event object with same format + as in #[a(href='#on-any') onAny] method and emits the event. + Once an emitter was created, one can easily emit all three kind of events from it, using these methods. pre(title='example'). @@ -26,12 +28,12 @@ pre(title='events in time'). div p. - #[b Emitter] is the easiest way + #[img(data-emoji="point_up")] #[b Emitter] is the easiest way to create general purpose streams, but it doesn't give you control over the #[a(href='#active-state') active state] of the stream — it doesn't allows you to monitor if the stream has subscribers or not, and sub/unsub to your original source or doing other resource management based on - whether stream has subscribers or not. If you want to have that control, you should use + that. If you want to have that control, you should use #[a(href='#from-binder') fromBinder] or #[a(href='#from-sub-unsub') fromSubUnsub]. @@ -418,6 +420,91 @@ div + ++descr-method('repeat', 'repeat', 'Kefir.repeat(generator)'). + Calls #[b generator] function which supposed to return an observable. + Emits values and errors from spawned observable, when it ends + calls #[b generator] again to get new one and so on. + +p. + The #[b generator] function is called with one argument — iteration number + starting from #[tt 0]. If a falsy value returned + from #[b generator] the stream ends. + +pre(title='example') + :escapehtml + var result = Kefir.repeat(function(i) { + if (i < 3) { + return Kefir.sequentially(100, [i, i]); + } else { + return false; + } + }); + result.log(); + +pre(title='console output') + :escapehtml + > [repeat] 0 + > [repeat] 0 + > [repeat] 1 + > [repeat] 1 + > [repeat] 2 + > [repeat] 2 + > [repeat] + +pre(title='events in time'). + spawned 1: ---0---0X + spawned 2: ---1---1X + spawned 3: ---2---2X + + result: ---0---0---1---1---2---2X +div + +p. + #[img(data-emoji="point_up")] Note that with this method + it's possible to create an infinite loop. Consider this example: + +pre(title='example') + :escapehtml + var result = Kefir.repeat(function() { + return Kefir.constant(1); + }); + + // When we subscribe to it (directly or via .log) + // we already in infinite loop. + result.log(); + + // But if we limit it with .take or something it'll work just fine. + // So the `result` stream defined like this + // still may make sense depend on how we use it. + result.take(10).log(); + +p. + It's even more dangerous if #[b generator] constantly returns ended observable + with no values (e.g. #[a(href='#never') never]). + In this case #[tt .take] won't help, because you'll never get any single + value from it, but #[b generator] will be called over and over. + The only escape path here is to define an escape condition it the + #[b generator]: + +pre(title='example') + :escapehtml + var result = Kefir.repeat(function(i) { + + // Defining that a new observable will be spawned at most 10 times + if (i >= 10) { + return false; + } + + return Kefir.never(); + }); + +p. + So just be careful when using #[b repeat], + it's a little dangerous but still a great method. + + + h2#create-property Create a property diff --git a/docs-src/descriptions/emitter-object.jade b/docs-src/descriptions/emitter-object.jade index 5879b41e..b778681b 100644 --- a/docs-src/descriptions/emitter-object.jade +++ b/docs-src/descriptions/emitter-object.jade @@ -1,13 +1,17 @@ h2#emitter-object Emitter object p. - Emitter object is an object, that has three methods #[b emit], #[b error], and #[b end]. + Emitter object is an object, that has four methods for emitting events. It is used in several places in Kefir as a proxy to emit events to some observable. +p. + The methods names describe themself pretty clearly: + ul li #[tt emiter.emit(value)] accepts one argument (any value) li #[tt emiter.error(error)] accepts one argument (any value) li #[tt emiter.end()] accepts no arguments + li #[tt emiter.emitEvent(event)] accepts one agrument (event object with same format as in #[a(href='#on-any') onAny] method). Note that the #[tt current] property of #[b event object] are ignored and automatically set to appropriate value for result event. pre :escapehtml @@ -21,7 +25,7 @@ p. They both have similar methods, but #[b emitter object] isn't actually a stream, it has no stream methods or functionality. - Emitter object has only three methods, that's it. + Emitter object has only four methods, that's it. p. All #[b emitter object] methods are bound to its context, diff --git a/docs-src/includes/side-menu.jade b/docs-src/includes/side-menu.jade index ef469c7b..3ee73c1a 100644 --- a/docs-src/includes/side-menu.jade +++ b/docs-src/includes/side-menu.jade @@ -21,6 +21,7 @@ ul.toc-section +method('#from-event', 'fromEvent') +method('#from-sub-unsub', 'fromSubUnsub') +method('#from-binder', 'fromBinder') + +method('#repeat', 'repeat') a.toc-title(href='#create-property') Create a property ul.toc-section diff --git a/grunt-tasks/bump.js b/grunt-tasks/bump.js index b968b8a8..232bed64 100644 --- a/grunt-tasks/bump.js +++ b/grunt-tasks/bump.js @@ -26,9 +26,13 @@ module.exports = function(grunt){ grunt.file.write('bower.json', JSON.stringify(bower, null, ' ') + '\n'); grunt.log.ok('bumped version in bower.json to ' + pkg.version); - run('NODE_PATH=./dist grunt'); // lol + run('grunt'); // lol + run('grunt bower'); // lol run('git add .'); run('git add -f dist'); + run('git add -f index.html'); + run('git add -f bower-packages'); + run('git add -f test/in-browser/spec/KefirSpecs.js'); run('git commit -m "'+ pkg.version +'"'); run('git push'); diff --git a/grunt-tasks/post-release.js b/grunt-tasks/post-release.js new file mode 100644 index 00000000..257178ed --- /dev/null +++ b/grunt-tasks/post-release.js @@ -0,0 +1,27 @@ +var semver = require('semver'); +var shell = require('shelljs'); + + + + +module.exports = function(grunt){ + grunt.registerTask('post-release', 'cleanup repository after release', function(type){ + + function run(cmd){ + if (shell.exec(cmd, {silent:false}).code === 0){ + grunt.log.ok(cmd + ' ...success'); + } else{ + grunt.fail.warn('Failed when executing: `' + cmd + '`\n') + } + } + + + run('git rm -r dist'); + run('git rm -r bower-packages'); + run('git rm index.html'); + run('git rm test/in-browser/spec/KefirSpecs.js'); + run('git commit -m "cleanup repository after release"'); + run('git push'); + }); +}; + diff --git a/package.json b/package.json index cef2f92b..492868f1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "kefir", - "version": "1.0.0", + "version": "1.1.0", "description": "Reactive Programming library for JavaScript inspired by Bacon.js and RxJS with focus on high performance and low memory usage", "main": "dist/kefir.js", "scripts": { @@ -29,7 +29,7 @@ }, "license": "MIT", "devDependencies": { - "baconjs": "0.7.43", + "baconjs": "0.7.48", "benchmark": "1.0.0", "coffee-script": "1.9.0", "coffeeify": "1.0.0", @@ -39,14 +39,15 @@ "grunt-cli": "0.1.13", "grunt-contrib-clean": "0.6.0", "grunt-contrib-concat": "0.5.0", - "grunt-contrib-jade": "0.14.0", + "grunt-contrib-jade": "0.14.1", "grunt-contrib-jshint": "0.11.0", "grunt-contrib-uglify": "0.7.0", "grunt-contrib-watch": "0.6.1", - "grunt-jasmine-node": "0.2.1", + "grunt-jasmine-node": "git+https://git@github.com/pozadi/grunt-jasmine-node.git", "grunt-release": "0.7.0", + "jasmine-node": "1.14.5", "load-grunt-tasks": "3.1.0", - "rx": "2.3.24", + "rx": "2.3.25", "semver": "2.3.0", "shelljs": "0.3.0", "sinon": "1.12.2", diff --git a/src/interval.js b/src/interval.js index 2fe5cdbf..4854dd0f 100644 --- a/src/interval.js +++ b/src/interval.js @@ -9,7 +9,8 @@ withInterval('withInterval', { this._emitter = { emit: function(x) { $._send(VALUE, x) }, error: function(x) { $._send(ERROR, x) }, - end: function() { $._send(END) } + end: function() { $._send(END) }, + emitEvent: function(e) { $._send(e.type, e.value) } } }, _free: function() { diff --git a/src/multiple-sources.js b/src/multiple-sources.js index a7db51be..16bd62ca 100644 --- a/src/multiple-sources.js +++ b/src/multiple-sources.js @@ -98,7 +98,11 @@ inherit(_AbstractPool, Stream, { var sources = this._curSources , i; this._activating = true; - for (i = 0; i < sources.length; i++) { this._subscribe(sources[i]) } + for (i = 0; i < sources.length; i++) { + if (this._active) { + this._subscribe(sources[i]); + } + } this._activating = false; }, _onDeactivation: function() { @@ -234,6 +238,9 @@ inherit(Bus, _AbstractPool, { end: function() { this._send(END); return this; + }, + emitEvent: function(event) { + this._send(event.type, event.value); } }); @@ -370,7 +377,9 @@ inherit(Zip, Stream, { this._drainArrays(); this._aliveCount = length; for (i = 0; i < length; i++) { - this._sources[i].onAny(this._bindHandleAny(i), [this, i]); + if (this._active) { + this._sources[i].onAny(this._bindHandleAny(i), [this, i]); + } } }, diff --git a/src/one-source.js b/src/one-source.js index e65091c7..ce9909eb 100644 --- a/src/one-source.js +++ b/src/one-source.js @@ -56,7 +56,8 @@ withOneSource('withHandler', { this._emitter = { emit: function(x) { $._send(VALUE, x, $._forcedCurrent) }, error: function(x) { $._send(ERROR, x, $._forcedCurrent) }, - end: function() { $._send(END, null, $._forcedCurrent) } + end: function() { $._send(END, null, $._forcedCurrent) }, + emitEvent: function(e) { $._send(e.type, e.value, $._forcedCurrent) } } }, _free: function() { diff --git a/src/primary.js b/src/primary.js index 58ad5e5d..e580becd 100644 --- a/src/primary.js +++ b/src/primary.js @@ -16,7 +16,8 @@ inherit(FromBinder, Stream, { , emitter = { emit: function(x) { $._send(VALUE, x, isCurrent) }, error: function(x) { $._send(ERROR, x, isCurrent) }, - end: function() { $._send(END, null, isCurrent) } + end: function() { $._send(END, null, isCurrent) }, + emitEvent: function(e) { $._send(e.type, e.value, isCurrent) } }; this._unsubscribe = this._fn(emitter) || null; @@ -70,6 +71,9 @@ inherit(Emitter, Stream, { end: function() { this._send(END); return this; + }, + emitEvent: function(event) { + this._send(event.type, event.value); } }); @@ -131,3 +135,80 @@ Kefir.constantError = function(x) { return new ConstantError(x); } + + + +// Kefir.repeat(generator) + +function Repeat(generator) { + Stream.call(this); + this._generator = generator; + this._source = null; + this._inLoop = false; + this._activating = false; + this._iteration = 0; + + var $ = this; + this._$handleAny = function(event) { + $._handleAny(event); + }; +} + +inherit(Repeat, Stream, { + + _name: 'repeat', + + _handleAny: function(event) { + if (event.type === END) { + this._source = null; + this._startLoop(); + } else { + this._send(event.type, event.value, this._activating); + } + }, + + _startLoop: function() { + if (!this._inLoop) { + this._inLoop = true; + while (this._source === null && this._alive && this._active) { + this._source = this._generator(this._iteration++); + if (this._source) { + this._source.onAny(this._$handleAny); + } else { + this._send(END); + } + } + this._inLoop = false; + } + }, + + _onActivation: function() { + this._activating = true; + if (this._source) { + this._source.onAny(this._$handleAny); + } else { + this._startLoop(); + } + this._activating = false; + }, + + _onDeactivation: function() { + if (this._source) { + this._source.offAny(this._$handleAny); + } + }, + + _clear: function() { + Stream.prototype._clear.call(this); + this._generator = null; + this._source = null; + this._$handleAny = null; + } + +}); + +Kefir.repeat = function(generator) { + return new Repeat(generator); +} + + diff --git a/test/specs/bus.coffee b/test/specs/bus.coffee index 8a43d5d4..d3e6f35e 100644 --- a/test/specs/bus.coffee +++ b/test/specs/bus.coffee @@ -16,7 +16,7 @@ describe 'bus', -> it 'should not be ended', -> expect(Kefir.bus()).toEmit [] - it 'should emit values and end', -> + it 'should emit events', -> a = Kefir.bus() expect(a).toEmit [1, 2, {error: -1}, 3, ''], -> a.emit(1) @@ -25,6 +25,15 @@ describe 'bus', -> a.emit(3) a.end() + it 'should emit events via .emitEvent', -> + a = Kefir.bus() + expect(a).toEmit [1, 2, {error: -1}, 3, ''], -> + a.emitEvent({type: 'value', value: 1, current: false}) + a.emitEvent({type: 'value', value: 2, current: true}) # `current` should be ignored + a.emitEvent({type: 'error', value: -1, current: false}) + a.emitEvent({type: 'value', value: 3, current: false}) + a.emitEvent({type: 'end', value: undefined, current: false}) + # pool specs diff --git a/test/specs/emmiter.coffee b/test/specs/emmiter.coffee index 6c7ec88b..6b5d3014 100644 --- a/test/specs/emmiter.coffee +++ b/test/specs/emmiter.coffee @@ -4,11 +4,16 @@ describe 'emitter', -> it 'should return stream', -> expect(Kefir.emitter()).toBeStream() + expect(new Kefir.Emitter()).toBeStream() + + it 'should return emitter', -> + expect(Kefir.emitter()).toBeEmitter() + expect(new Kefir.Emitter()).toBeEmitter() it 'should not be ended', -> expect(Kefir.emitter()).toEmit [] - it 'should emit values and end', -> + it 'should emit events', -> a = Kefir.emitter() expect(a).toEmit [1, 2, {error: -1}, 3, ''], -> a.emit(1) @@ -16,3 +21,12 @@ describe 'emitter', -> a.error(-1) a.emit(3) a.end() + + it 'should emit events via .emitEvent', -> + a = Kefir.emitter() + expect(a).toEmit [1, 2, {error: -1}, 3, ''], -> + a.emitEvent({type: 'value', value: 1, current: false}) + a.emitEvent({type: 'value', value: 2, current: true}) # `current` should be ignored + a.emitEvent({type: 'error', value: -1, current: false}) + a.emitEvent({type: 'value', value: 3, current: false}) + a.emitEvent({type: 'end', value: undefined, current: false}) diff --git a/test/specs/from-binder.coffee b/test/specs/from-binder.coffee index c4bb286e..8589c4a6 100644 --- a/test/specs/from-binder.coffee +++ b/test/specs/from-binder.coffee @@ -66,6 +66,21 @@ describe 'fromBinder', -> ).toEmitInTime [[0, {current: 1}], [0, {currentError: -1}], [0, {current: 2}], [1000, 2], [1000, '']] + it 'should support emitter.emitEvent', -> + expect( + Kefir.fromBinder (emitter) -> + emitter.emitEvent({type: 'value', value: 1, current: true}); + emitter.emitEvent({type: 'error', value: -1, current: false}); + emitter.emitEvent({type: 'value', value: 2, current: false}); + setTimeout -> + emitter.emitEvent({type: 'value', value: 3, current: true}); + emitter.emitEvent({type: 'value', value: 4, current: false}); + emitter.emitEvent({type: 'end', value: undefined, current: false}); + , 1000 + null + ).toEmitInTime [[0, {current: 1}], [0, {currentError: -1}], [0, {current: 2}], [1000, 3], [1000, 4], [1000, '']] + + # https://github.com/pozadi/kefir/issues/35 it 'should work with .take(1) and sync emit', -> diff --git a/test/specs/merge.coffee b/test/specs/merge.coffee index 2ad18109..aba2105c 100644 --- a/test/specs/merge.coffee +++ b/test/specs/merge.coffee @@ -75,3 +75,10 @@ describe 'merge', -> b = prop() c = stream() expect(Kefir.merge([a, b, c])).errorsToFlow(c) + + it 'should work correctly when unsuscribing after one sync event', -> + a = Kefir.constant(1) + b = Kefir.interval(1000, 1) + c = a.merge(b) + activate(c.take(1)) + expect(b).not.toBeActive() diff --git a/test/specs/repeat.coffee b/test/specs/repeat.coffee new file mode 100644 index 00000000..7327563d --- /dev/null +++ b/test/specs/repeat.coffee @@ -0,0 +1,117 @@ +{stream, prop, send, activate, deactivate, Kefir} = require('../test-helpers.coffee') + +describe 'repeat', -> + + it 'should return stream', -> + expect(Kefir.repeat()).toBeStream() + + it 'should work correctly (with .constant)', -> + a = Kefir.repeat (i) -> + Kefir[if i == 2 then 'constantError' else 'constant'](i) + expect(a.take(3)).toEmit [ + {current: 0}, + {current: 1}, + {currentError: 2}, + {current: 3}, + '' + ] + + + it 'should work correctly (with .later)', -> + a = Kefir.repeat (i) -> Kefir.later(100, i) + expect(a.take(3)).toEmitInTime [ + [100, 0], + [200, 1], + [300, 2], + [300, ''] + ] + + it 'should work correctly (with .sequentially)', -> + a = Kefir.repeat (i) -> Kefir.sequentially(100, [1, 2, 3]) + expect(a.take(5)).toEmitInTime [ + [100, 1], + [200, 2], + [300, 3], + [400, 1], + [500, 2], + [500, ''] + ] + + it 'should not cause stack overflow', -> + sum = (a, b) -> a + b + genConstant = -> Kefir.constant(1) + + a = Kefir.repeat(genConstant).take(3000).reduce(sum, 0) + expect(a).toEmit [{current: 3000}, ''] + + + it 'should get new source only if previous one ended', -> + a = stream() + + callsCount = 0 + b = Kefir.repeat -> + callsCount++ + if !a._alive + a = stream() + a + + expect(callsCount).toBe(0) + activate(b) + expect(callsCount).toBe(1) + deactivate(b) + activate(b) + expect(callsCount).toBe(1) + send(a, ['']) + expect(callsCount).toBe(2) + + + + it 'should unsubscribe from source', -> + a = stream() + b = Kefir.repeat -> a + expect(b).toActivate(a) + + + + it 'should end when falsy value returned from generator', -> + a = Kefir.repeat (i) -> + if i < 3 + Kefir.constant(i) + else + false + expect(a).toEmit [ + {current: 0}, + {current: 1}, + {current: 2}, + '' + ] + + # https://github.com/baconjs/bacon.js/issues/521 + it 'should work with @AgentME\'s setup', -> + + allSpawned = [] + + i = 0 + step = -> + if ++i == 1 + a = Kefir.later(1, 'later') + allSpawned.push(a) + a + else + a = Kefir.constant(5) + b = Kefir.repeatedly(200, [6, 7, 8]) + c = a.merge(b) + allSpawned.push(a) + allSpawned.push(b) + allSpawned.push(c) + c + + expect(Kefir.repeat(step).take(2)).toEmitInTime [ + [1, 'later'], + [1, 5], + [1, ''] + ], (->), 100 + + for obs in allSpawned + expect(obs).not.toBeActive() + diff --git a/test/specs/with-handler.coffee b/test/specs/with-handler.coffee index e4b56b02..6e13010f 100644 --- a/test/specs/with-handler.coffee +++ b/test/specs/with-handler.coffee @@ -11,6 +11,9 @@ describe 'withHandler', -> when 'error' then emitter.error(event.value) when 'end' then emitter.end() + emitEventMirror = (emitter, event) -> + emitter.emitEvent(event) + duplicate = (emitter, event) -> if event.type == 'value' @@ -52,6 +55,11 @@ describe 'withHandler', -> send(a, ['']) expect(a.withHandler mirror).toEmit [''] + it 'should support emitter.emitEvent', -> + a = stream() + expect(a.withHandler emitEventMirror).toEmit [1, {error: 3}, 2, ''], -> + send(a, [1, {error: 3}, 2, '']) + describe 'property', -> @@ -74,6 +82,11 @@ describe 'withHandler', -> expect(a.withHandler duplicate).toEmit [{current: 1}, {currentError: 0}, 2, 2, {error: 4}, {error: 4}, 3, 3, ''], -> send(a, [2, {error: 4}, 3, '']) + it 'should support emitter.emitEvent', -> + a = send(prop(), [1, {error: 0}]) + expect(a.withHandler emitEventMirror).toEmit [{current: 1}, {currentError: 0}, 2, {error: 4}, 3, ''], -> + send(a, [2, {error: 4}, 3, '']) + it 'should automatically preserve isCurent (end)', -> a = prop() expect(a.withHandler mirror).toEmit [''], -> diff --git a/test/specs/with-interval.coffee b/test/specs/with-interval.coffee index f6903963..a2324a21 100644 --- a/test/specs/with-interval.coffee +++ b/test/specs/with-interval.coffee @@ -19,3 +19,18 @@ describe 'withInterval', -> expect(Kefir.withInterval(100, fn)).toEmitInTime( [[ 100, 1 ], [ 100, 2 ], [ 200, {error: -1} ], [ 300, 3 ], [ 300, 6 ], [ 300, '' ]] ) + + it 'should support emitter.emitEvent', -> + i = 0 + fn = (emitter) -> + i++ + if i == 2 + emitter.emitEvent({type: 'error', value: -1, current: false}) + else + emitter.emitEvent({type: 'value', value: i, current: true}) # current should be ignored + emitter.emitEvent({type: 'value', value: i*2, current: false}) + if i == 3 + emitter.emitEvent({type: 'end', value: undefined, current: false}) + expect(Kefir.withInterval(100, fn)).toEmitInTime( + [[ 100, 1 ], [ 100, 2 ], [ 200, {error: -1} ], [ 300, 3 ], [ 300, 6 ], [ 300, '' ]] + ) diff --git a/test/specs/zip.coffee b/test/specs/zip.coffee index 0dcf4e5c..066d0eb4 100644 --- a/test/specs/zip.coffee +++ b/test/specs/zip.coffee @@ -138,3 +138,19 @@ describe 'zip', -> activate(cb) deactivate(cb) expect(cb).toEmit [{current: [0, 1]}] + + + it 'should work correctly when unsuscribing after one sync event', -> + a0 = stream() + a = a0.toProperty(1) + b0 = stream() + b = b0.toProperty(1) + c = Kefir.zip([a, b]) + + activate(c1 = c.take(2)) + send(b0, [1, 1]) + send(a0, [1]) + deactivate(c1) + + activate(c.take(1)) + expect(b).not.toBeActive() diff --git a/test/test-helpers.coffee b/test/test-helpers.coffee index 398d9daa..0552a22b 100644 --- a/test/test-helpers.coffee +++ b/test/test-helpers.coffee @@ -92,6 +92,9 @@ beforeEach -> toBeStream: -> @message = -> "Expected #{@actual.toString()} to be instance of Stream" @actual instanceof Kefir.Stream + toBeEmitter: -> + @message = -> "Expected #{@actual.toString()} to be instance of Emitter" + @actual instanceof Kefir.Emitter toBePool: -> @message = -> "Expected #{@actual.toString()} to be instance of Pool" @actual instanceof Kefir.Pool